Skip to content

Commit e55c0f4

Browse files
fix: handle errors occurred in non-blocking sendMessage (#187)
# Description This PR fixes the error of server crash in case of error in the event loop for non blocking event, if the first reply to the client has been already sent. Two different error handling strategies are implemented based on the origin of the error: 1. If the error originates from the pushNotification service, the loop is not interrupted, and the error is simply printed on the console error output 2. If the error is originating from the ExecutionEventQueue or from a taskStore operation (resultManager interaction), the error is interrupting the loop. In the catch clause a TaskStatusUpdateEvent is generated, with taskStatus equal to "failed", if an already existing Task has been processed by the ResultManager. The new TaskStatusUpdateEvent is saved in the taskStore. If a new error occurs within this operation, it is simply printed on the console error output. Fixes #186 🦕
1 parent dc9f7dc commit e55c0f4

File tree

3 files changed

+154
-7
lines changed

3 files changed

+154
-7
lines changed

src/server/request_handler/default_request_handler.ts

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,11 @@ export class DefaultRequestHandler implements A2ARequestHandler {
167167
for await (const event of eventQueue.events()) {
168168
await resultManager.processEvent(event);
169169

170-
await this._sendPushNotificationIfNeeded(event);
170+
try {
171+
await this._sendPushNotificationIfNeeded(event);
172+
} catch (error) {
173+
console.error(`Error sending push notification: ${error}`);
174+
}
171175

172176
if (options?.firstResultResolver && !firstResultSent) {
173177
let firstResult: Message | Task | undefined;
@@ -187,13 +191,15 @@ export class DefaultRequestHandler implements A2ARequestHandler {
187191
A2AError.internalError('Execution finished before a message or task was produced.')
188192
);
189193
}
190-
} catch (error) {
194+
} catch (error: any) {
191195
console.error(`Event processing loop failed for task ${taskId}:`, error);
192-
if (options?.firstResultRejector && !firstResultSent) {
193-
options.firstResultRejector(error);
194-
}
195-
// re-throw error for blocking case to catch
196-
throw error;
196+
this._handleProcessingError(
197+
error,
198+
resultManager,
199+
firstResultSent,
200+
taskId,
201+
options?.firstResultRejector
202+
);
197203
} finally {
198204
this.eventBusManager.cleanupByTaskId(taskId);
199205
}
@@ -621,4 +627,54 @@ export class DefaultRequestHandler implements A2ARequestHandler {
621627
// Send push notification in the background.
622628
this.pushNotificationSender?.send(task);
623629
}
630+
631+
private async _handleProcessingError(
632+
error: any,
633+
resultManager: ResultManager,
634+
firstResultSent: boolean,
635+
taskId: string,
636+
firstResultRejector?: (reason: any) => void
637+
): Promise<void> {
638+
// Non-blocking case with with first result not sent
639+
if (firstResultRejector && !firstResultSent) {
640+
firstResultRejector(error);
641+
return;
642+
}
643+
644+
// re-throw error for blocking case to catch
645+
if (!firstResultRejector) {
646+
throw error;
647+
}
648+
649+
// Non-blocking case with first result already sent
650+
const currentTask = resultManager.getCurrentTask();
651+
if (currentTask) {
652+
const statusUpdateFailed: TaskStatusUpdateEvent = {
653+
taskId: currentTask.id,
654+
contextId: currentTask.contextId,
655+
status: {
656+
state: 'failed',
657+
message: {
658+
kind: 'message',
659+
role: 'agent',
660+
messageId: uuidv4(),
661+
parts: [{ kind: 'text', text: `Event processing loop failed: ${error.message}` }],
662+
taskId: currentTask.id,
663+
contextId: currentTask.contextId,
664+
},
665+
timestamp: new Date().toISOString(),
666+
},
667+
kind: 'status-update',
668+
final: true,
669+
};
670+
671+
try {
672+
await resultManager.processEvent(statusUpdateFailed);
673+
} catch (error) {
674+
console.error(`Event processing loop failed for task ${taskId}: ${error.message}`);
675+
}
676+
} else {
677+
console.error(`Event processing loop failed for task ${taskId}: ${error.message}`);
678+
}
679+
}
624680
}

test/server/default_request_handler.spec.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import {
4040
} from './mocks/agent-executor.mock.js';
4141
import { MockPushNotificationSender } from './mocks/push_notification_sender.mock.js';
4242
import { ServerCallContext } from '../../src/server/context.js';
43+
import { MockTaskStore } from './mocks/task_store.mock.js';
44+
import { TextPart } from 'genkit/model';
4345

4446
describe('DefaultRequestHandler as A2ARequestHandler', () => {
4547
let handler: A2ARequestHandler;
@@ -277,6 +279,87 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => {
277279
assert.equal(saveSpy.secondCall.args[0].status.state, 'completed');
278280
});
279281

282+
it('sendMessage: (non-blocking) should handle failure in event loop after successfull task event', async () => {
283+
clock = sinon.useFakeTimers();
284+
285+
const mockTaskStore = new MockTaskStore();
286+
const handler = new DefaultRequestHandler(
287+
testAgentCard,
288+
mockTaskStore,
289+
mockAgentExecutor,
290+
executionEventBusManager
291+
);
292+
293+
const params: MessageSendParams = {
294+
message: createTestMessage('msg-nonblock', 'Do a long task'),
295+
configuration: {
296+
blocking: false,
297+
acceptedOutputModes: [],
298+
},
299+
};
300+
301+
const taskId = 'task-nonblock-123';
302+
const contextId = 'ctx-nonblock-abc';
303+
(mockAgentExecutor as MockAgentExecutor).execute.callsFake(async (ctx, bus) => {
304+
// First event is the task creation, which should be returned immediately
305+
bus.publish({
306+
id: taskId,
307+
contextId,
308+
status: { state: 'submitted' },
309+
kind: 'task',
310+
});
311+
312+
// Simulate work before publishing more events
313+
await clock.tickAsync(500);
314+
315+
bus.publish({
316+
taskId,
317+
contextId,
318+
kind: 'status-update',
319+
status: { state: 'completed' },
320+
final: true,
321+
});
322+
bus.finished();
323+
});
324+
325+
let finalTaskSaved: Task | undefined;
326+
const errorMessage = 'Error thrown on saving completed task notification';
327+
(mockTaskStore as MockTaskStore).save.callsFake(async (task) => {
328+
if (task.status.state == 'completed') {
329+
throw new Error(errorMessage);
330+
}
331+
332+
if (task.status.state == 'failed') {
333+
finalTaskSaved = task;
334+
}
335+
});
336+
337+
// This call should return as soon as the first 'task' event is published
338+
const immediateResult = await handler.sendMessage(params);
339+
340+
// Assert that we got the initial task object back right away
341+
const taskResult = immediateResult as Task;
342+
assert.equal(taskResult.kind, 'task');
343+
assert.equal(taskResult.id, taskId);
344+
assert.equal(
345+
taskResult.status.state,
346+
'submitted',
347+
"Should return immediately with 'submitted' state"
348+
);
349+
350+
// Allow the background processing to complete
351+
await clock.runAllAsync();
352+
353+
assert.equal(finalTaskSaved!.status.state, 'failed');
354+
assert.equal(finalTaskSaved!.id, taskId);
355+
assert.equal(finalTaskSaved!.contextId, contextId);
356+
assert.equal(finalTaskSaved!.status.message!.role, 'agent');
357+
assert.equal(
358+
(finalTaskSaved!.status.message!.parts[0] as TextPart).text,
359+
`Event processing loop failed: ${errorMessage}`
360+
);
361+
});
362+
280363
it('sendMessage: should handle agent execution failure for non-blocking calls', async () => {
281364
const errorMessage = 'Agent failed!';
282365
(mockAgentExecutor as MockAgentExecutor).execute.rejects(new Error(errorMessage));
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import sinon, { SinonStub } from 'sinon';
2+
import { Task } from '../../../src/index.js';
3+
import { TaskStore } from '../../../src/server/store.js';
4+
5+
export class MockTaskStore implements TaskStore {
6+
public save: SinonStub<[Task], Promise<void>> = sinon.stub();
7+
public load: SinonStub<[string], Promise<Task | undefined>> = sinon.stub();
8+
}

0 commit comments

Comments
 (0)