Fix/tool race conditions #258
Changes from all commits: 36e7eaf, 7edc01c, 75ece1b, 4077ac6, 96c89d1, 3aebce6, 91c9c6f, 28c9b17, dca833e
New changeset file:
```diff
@@ -0,0 +1,19 @@
+---
+'@tanstack/ai': patch
+'@tanstack/ai-client': patch
+---
+
+fix: improve tool execution reliability and prevent race conditions
+
+- Fix client tool execution race conditions by tracking pending tool executions
+- Prevent duplicate continuation attempts with continuationPending flag
+- Guard against concurrent stream processing in streamResponse
+- Add approval info to ToolCall type for server-side decision tracking
+- Include approval info in model message conversion for approval workflows
+- Check ModelMessage format for approval info extraction in chat activity
+
+This change improves the reliability of tool execution, especially for:
+
+- Client tools with async execute functions
+- Approval-based tool workflows
+- Sequential tool execution scenarios
```
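To make the first bullet concrete, here is a minimal sketch of the race (simplified names, not the package's actual stream loop): a fire-and-forget tool execution can lose to stream finalization, whereas tracking pending promises lets finalization wait.

```typescript
// Simplified sketch, not the actual ChatClient stream loop.
type Chunk = { type: string; toolCallId?: string }

async function processStreamNaive(
  chunks: AsyncIterable<Chunk>,
  executeTool: (id: string) => Promise<void>,
  finalizeStream: () => void,
) {
  for await (const chunk of chunks) {
    if (chunk.type === 'tool-call' && chunk.toolCallId) {
      // Fire-and-forget: nothing awaits this promise...
      void executeTool(chunk.toolCallId)
    }
  }
  // ...so finalization can run before the tool settles: the race.
  finalizeStream()
}

async function processStreamTracked(
  chunks: AsyncIterable<Chunk>,
  executeTool: (id: string) => Promise<void>,
  finalizeStream: () => void,
) {
  const pending = new Map<string, Promise<void>>()
  for await (const chunk of chunks) {
    if (chunk.type === 'tool-call' && chunk.toolCallId) {
      const id = chunk.toolCallId
      // Track each execution; drop it from the map when it settles.
      pending.set(id, executeTool(id).finally(() => pending.delete(id)))
    }
  }
  // Wait for every still-pending execution before finalizing.
  if (pending.size > 0) {
    await Promise.all(pending.values())
  }
  finalizeStream()
}
```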
packages/typescript/ai-client/src/chat-client.ts
```diff
@@ -29,6 +29,10 @@ export class ChatClient {
   private currentStreamId: string | null = null
   private currentMessageId: string | null = null
   private postStreamActions: Array<() => Promise<void>> = []
+  // Track pending client tool executions to await them before stream finalization
+  private pendingToolExecutions: Map<string, Promise<void>> = new Map()
+  // Flag to deduplicate continuation checks during action draining
+  private continuationPending = false

   private callbacksRef: {
     current: {
```
```diff
@@ -133,31 +137,41 @@ export class ChatClient {
          )
        }
      },
-     onToolCall: async (args: {
+     onToolCall: (args: {
        toolCallId: string
        toolName: string
        input: any
      }) => {
        // Handle client-side tool execution automatically
        const clientTool = this.clientToolsRef.current.get(args.toolName)
-       if (clientTool?.execute) {
-         try {
-           const output = await clientTool.execute(args.input)
-           await this.addToolResult({
-             toolCallId: args.toolCallId,
-             tool: args.toolName,
-             output,
-             state: 'output-available',
-           })
-         } catch (error: any) {
-           await this.addToolResult({
-             toolCallId: args.toolCallId,
-             tool: args.toolName,
-             output: null,
-             state: 'output-error',
-             errorText: error.message,
-           })
-         }
-       }
+       const executeFunc = clientTool?.execute
+       if (executeFunc) {
+         // Create and track the execution promise
+         const executionPromise = (async () => {
+           try {
+             const output = await executeFunc(args.input)
+             await this.addToolResult({
+               toolCallId: args.toolCallId,
+               tool: args.toolName,
+               output,
+               state: 'output-available',
+             })
+           } catch (error: any) {
+             await this.addToolResult({
+               toolCallId: args.toolCallId,
+               tool: args.toolName,
+               output: null,
+               state: 'output-error',
+               errorText: error.message,
+             })
+           } finally {
+             // Remove from pending when complete
+             this.pendingToolExecutions.delete(args.toolCallId)
+           }
+         })()
+
+         // Track the pending execution
+         this.pendingToolExecutions.set(args.toolCallId, executionPromise)
+       }
      },
      onApprovalRequest: (args: {
```
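The callback is deliberately no longer `async`: it registers the execution promise instead of awaiting it, so the stream loop keeps consuming chunks while the tool runs, and capturing `executeFunc` once avoids re-reading `clientTool.execute` after the check. For context, here is a hypothetical client tool with an async `execute` — the tool-definition shape below is assumed for illustration, not taken from this diff:

```typescript
// Hypothetical client tool shape for illustration; the real @tanstack/ai
// client-tool helper may differ. An async execute like this is exactly what
// pendingToolExecutions now tracks until its result is reported.
const lookupOrderTool = {
  name: 'lookupOrder',
  execute: async (input: { orderId: string }): Promise<{ status: string }> => {
    // Simulated async work (e.g. an API call) that can outlive the stream
    const res = await fetch(`/api/orders/${encodeURIComponent(input.orderId)}`)
    if (!res.ok) throw new Error(`Order lookup failed: ${res.status}`)
    return res.json()
  },
}
```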
```diff
@@ -238,6 +252,12 @@ export class ChatClient {
       await new Promise((resolve) => setTimeout(resolve, 0))
     }

+    // Wait for all pending tool executions to complete before finalizing
+    // This ensures client tools finish before we check for continuation
+    if (this.pendingToolExecutions.size > 0) {
+      await Promise.all(this.pendingToolExecutions.values())
+    }
+
     // Finalize the stream
     this.processor.finalizeStream()

```
```diff
@@ -298,21 +318,35 @@ export class ChatClient {
     const messages = this.processor.getMessages()
     this.processor.setMessages([...messages, uiMessage])

+    // If stream is in progress, queue the response for after it ends
+    if (this.isLoading) {
+      this.queuePostStreamAction(() => this.streamResponse())
+      return
+    }
     await this.streamResponse()
   }

   /**
    * Stream a response from the LLM
    */
   private async streamResponse(): Promise<void> {
+    // Guard against concurrent streams - if already loading, skip
+    if (this.isLoading) {
+      return
+    }
     this.setIsLoading(true)
     this.setStatus('submitted')
     this.setError(undefined)
     this.abortController = new AbortController()
+    // Reset pending tool executions for the new stream
+    this.pendingToolExecutions.clear()
+    let streamCompletedSuccessfully = false

     try {
       // Get model messages for the LLM
       const modelMessages = this.processor.toModelMessages()
+      // Get UIMessages with parts (preserves approval state and client tool results)
+      const messages = this.processor.getMessages()

       // Call onResponse callback
       await this.callbacksRef.current.onResponse()
```
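The diff calls `queuePostStreamAction` and `drainPostStreamActions` without showing their bodies; a plausible minimal shape for that queue-then-drain pattern (assumed, not copied from the package) is:

```typescript
// Assumed shape of the post-stream action queue; the real implementation
// in @tanstack/ai-client may differ.
type PostStreamAction = () => Promise<void>

class PostStreamQueue {
  private actions: Array<PostStreamAction> = []

  queue(action: PostStreamAction): void {
    this.actions.push(action)
  }

  // Drain in FIFO order; actions queued *while* draining are picked up too,
  // which is why checkForContinuation needs the continuationPending flag.
  async drain(): Promise<void> {
    while (this.actions.length > 0) {
      const action = this.actions.shift()!
      await action()
    }
  }
}
```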
```diff
@@ -325,12 +359,13 @@

       // Connect and stream
       const stream = this.connection.connect(
-        modelMessages,
+        messages,
         bodyWithConversationId,
         this.abortController.signal,
       )

       await this.processStream(stream)
+      streamCompletedSuccessfully = true
     } catch (err) {
       if (err instanceof Error) {
         if (err.name === 'AbortError') {
```
```diff
@@ -346,6 +381,20 @@
     // Drain any actions that were queued while the stream was in progress
     await this.drainPostStreamActions()
+
+    // Continue conversation if the stream ended with a tool result (server tool completed)
+    if (streamCompletedSuccessfully) {
+      const messages = this.processor.getMessages()
+      const lastPart = messages.at(-1)?.parts.at(-1)
+
+      if (lastPart?.type === 'tool-result' && this.shouldAutoSend()) {
+        try {
+          await this.checkForContinuation()
+        } catch (error) {
+          console.error('Failed to continue flow after tool result:', error)
+        }
+      }
+    }

     }
   }
```
Comment on lines +384 to +397

Contributor

🧩 Analysis chain — scripts executed to verify the change:

```sh
# Locate the file under review
find . -name "chat-client.ts" -type f

# Read the lines around the new continuation block (384-397)
sed -n '370,420p' packages/typescript/ai-client/src/chat-client.ts | cat -n

# Inspect the drainPostStreamActions method
rg -n "drainPostStreamActions" packages/typescript/ai-client/src/chat-client.ts -A 20

# Inspect the checkForContinuation method
rg -n "checkForContinuation" packages/typescript/ai-client/src/chat-client.ts -A 15

# Find tests that cover tool continuation and nested streams
find . -name "*.test.ts" -o -name "*.spec.ts" | xargs rg -l "tool.*result|continuation" | head -5

# Check the server-client sequence e2e test for nested tool scenarios
head -100 packages/typescript/smoke-tests/e2e/tests/tools/server-client-sequence.spec.ts
rg -n "tool.*result|nested|continuation" packages/typescript/smoke-tests/e2e/tests/tools/server-client-sequence.spec.ts -B 2 -A 5

# Review where streamCompletedSuccessfully is set and used
rg -n "streamCompletedSuccessfully" packages/typescript/ai-client/src/chat-client.ts -B 5 -A 2
sed -n '300,400p' packages/typescript/ai-client/src/chat-client.ts | cat -n
```

Consider adding a clarifying comment about state synchronization after nested streams. The implementation correctly handles nested streams from `drainPostStreamActions()` and `checkForContinuation()`, but the interplay is subtle; a comment along these lines would clarify the pattern for future maintainers.
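The bot's concrete suggestion was not captured when this page was saved. As an illustrative sketch only, a comment like the following over the continuation block (lines +384 to +397) would make the invariant explicit:

```typescript
// Illustrative sketch only -- not the reviewer's actual suggested text.
//
// NOTE on state synchronization: drainPostStreamActions() above may itself
// run a nested streamResponse(). By the time we get here,
// this.processor.getMessages() reflects any messages produced by those
// nested streams, while streamCompletedSuccessfully refers only to THIS
// stream, so a stream that errored or was aborted never triggers a
// continuation.
if (streamCompletedSuccessfully) {
  const messages = this.processor.getMessages()
  const lastPart = messages.at(-1)?.parts.at(-1)
  // ... continuation check as in the diff above
}
```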
```diff
@@ -490,8 +539,18 @@ export class ChatClient {
    * Check if we should continue the flow and do so if needed
    */
   private async checkForContinuation(): Promise<void> {
+    // Prevent duplicate continuation attempts
+    if (this.continuationPending || this.isLoading) {
+      return
+    }
+
     if (this.shouldAutoSend()) {
-      await this.streamResponse()
+      this.continuationPending = true
+      try {
+        await this.streamResponse()
+      } finally {
+        this.continuationPending = false
+      }
     }
   }

```
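The `continuationPending` guard is an instance of a generic re-entrancy flag. As a standalone sketch (illustrative, not the package's API):

```typescript
// Generic re-entrancy guard: duplicate calls become no-ops while work runs,
// and try/finally releases the flag even if the work throws.
class OnceAtATime {
  private pending = false

  async run(work: () => Promise<void>): Promise<void> {
    if (this.pending) return // duplicate attempt: skip
    this.pending = true
    try {
      await work()
    } finally {
      this.pending = false
    }
  }
}

// Usage: the second call is ignored while the first is still in flight.
const guard = new OnceAtATime()
void guard.run(async () => { /* streamResponse() */ })
void guard.run(async () => { /* skipped: guard is pending */ })
```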