fix: AgentLoop array-input TypeError + truly streaming stream()#1
fix: AgentLoop array-input TypeError + truly streaming stream()#1
Conversation
Agent-Logs-Url: https://github.com/sk-wang/hao-code/sessions/d14fc524-6624-4dd7-bc43-d614a3ab0d93 Co-authored-by: sk-wang <14271914+sk-wang@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Fixes two SDK/agent-loop issues: (1) a crash when the first user input is provided as a mixed content-block array (text + image), and (2) stream() APIs that were buffering all events until completion rather than yielding in real time.
Changes:
- Prevent
AgentLoop::run()session-title generation from calling string functions on array$userInputby extracting text from content blocks. - Rework
HaoCode::stream()andConversation::stream()to yield text/tool events in real time via a Fiber + queue mechanism.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| app/Services/Agent/AgentLoop.php | Extracts title text safely from array content blocks to avoid TypeError. |
| app/Sdk/HaoCode.php | Implements true streaming by yielding messages as callbacks arrive (Fiber + SplQueue). |
| app/Sdk/Conversation.php | Mirrors the Fiber-based real-time streaming behavior for multi-turn conversations. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| $fiber->start(); | ||
|
|
||
| foreach ($messages as $msg) { | ||
| yield $msg; | ||
| while (! $fiber->isTerminated()) { | ||
| while (! $queue->isEmpty()) { | ||
| yield $queue->dequeue(); | ||
| } | ||
| if (! $fiber->isTerminated()) { | ||
| $fiber->resume(); | ||
| } | ||
| } |
There was a problem hiding this comment.
The Fiber only advances when the consumer continues iterating this Generator. If a caller breaks out early (or the Generator is garbage-collected) the Fiber can remain suspended inside an onText/onTool* callback, leaving AgentLoop::run() mid-flight and the underlying streaming HTTP response potentially un-cancelled. Consider wrapping the streaming loop in a try/finally that aborts/cancels the run (e.g., $loop->abort() and, if the Fiber is suspended, $fiber->throw(...) to unwind) so resources are released even on early termination.
| while (! $fiber->isTerminated()) { | ||
| while (! $queue->isEmpty()) { | ||
| yield $queue->dequeue(); | ||
| } | ||
| if (! $fiber->isTerminated()) { | ||
| $fiber->resume(); | ||
| } | ||
| } | ||
|
|
||
| yield Message::result( | ||
| text: $response, | ||
| usage: self::extractUsage($loop), | ||
| cost: $loop->getEstimatedCost(), | ||
| sessionId: $loop->getSessionManager()->getSessionId(), | ||
| ); | ||
| } catch (\Throwable $e) { | ||
| yield Message::error($e->getMessage()); | ||
| // Drain any messages enqueued before the fiber's final termination | ||
| while (! $queue->isEmpty()) { | ||
| yield $queue->dequeue(); | ||
| } | ||
|
|
||
| if ($thrownException !== null) { | ||
| yield Message::error($thrownException->getMessage()); | ||
|
|
||
| return; | ||
| } | ||
|
|
||
| yield Message::result( | ||
| text: $response ?? '', | ||
| usage: self::extractUsage($loop), | ||
| cost: $loop->getEstimatedCost(), | ||
| sessionId: $loop->getSessionManager()->getSessionId(), | ||
| ); | ||
| } |
There was a problem hiding this comment.
The Fiber-driven streaming loop is not covered by a test that verifies messages are yielded incrementally (i.e., before the full AgentLoop::run() completes). Current SDK tests collect all messages after iteration completes, which would still pass even if stream() buffered everything. Adding an assertion that the first yielded Message arrives before the final result (e.g., timing-based with mocked SSE delays, or using a controllable mock stream) would prevent regressions back to buffered behavior.
| $fiber->start(); | ||
|
|
||
| while (! $fiber->isTerminated()) { | ||
| while (! $queue->isEmpty()) { | ||
| yield $queue->dequeue(); | ||
| } | ||
| if (! $fiber->isTerminated()) { | ||
| $fiber->resume(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Same early-termination issue as HaoCode::stream(): if the caller stops iterating this Generator, the Fiber can remain suspended and the underlying streaming request may not be cancelled. Please add a try/finally to ensure the AgentLoop run is aborted/cancelled and the Fiber is unwound/terminated when the Generator is closed early.
| while (! $fiber->isTerminated()) { | ||
| while (! $queue->isEmpty()) { | ||
| yield $queue->dequeue(); | ||
| } | ||
| if (! $fiber->isTerminated()) { | ||
| $fiber->resume(); | ||
| } | ||
| } | ||
|
|
||
| yield Message::result( | ||
| text: $response, | ||
| usage: [ | ||
| 'input_tokens' => $this->loop->getTotalInputTokens(), | ||
| 'output_tokens' => $this->loop->getTotalOutputTokens(), | ||
| 'cache_creation_tokens' => $this->loop->getCacheCreationTokens(), | ||
| 'cache_read_tokens' => $this->loop->getCacheReadTokens(), | ||
| ], | ||
| cost: $this->loop->getEstimatedCost(), | ||
| sessionId: $this->loop->getSessionManager()->getSessionId(), | ||
| ); | ||
| } catch (\Throwable $e) { | ||
| yield Message::error($e->getMessage()); | ||
| // Drain any messages enqueued before the fiber's final termination | ||
| while (! $queue->isEmpty()) { | ||
| yield $queue->dequeue(); | ||
| } | ||
|
|
||
| if ($thrownException !== null) { | ||
| yield Message::error($thrownException->getMessage()); | ||
|
|
||
| return; | ||
| } | ||
|
|
||
| yield Message::result( | ||
| text: $response ?? '', | ||
| usage: [ | ||
| 'input_tokens' => $this->loop->getTotalInputTokens(), | ||
| 'output_tokens' => $this->loop->getTotalOutputTokens(), | ||
| 'cache_creation_tokens' => $this->loop->getCacheCreationTokens(), | ||
| 'cache_read_tokens' => $this->loop->getCacheReadTokens(), | ||
| ], | ||
| cost: $this->loop->getEstimatedCost(), | ||
| sessionId: $this->loop->getSessionManager()->getSessionId(), | ||
| ); | ||
| } |
There was a problem hiding this comment.
Conversation::stream() behavior changed from buffered to real-time yielding, but existing tests don’t assert incremental delivery (they only validate the final collected sequence). Add a test that would fail if stream() buffers until completion (e.g., mocked SSE with intentional delays and measuring yield timing, or a mock stream that blocks until the test advances it).
| if (is_string($userInput)) { | ||
| $rawTitle = $userInput; | ||
| } else { | ||
| // Array of content blocks (e.g. text + image): extract text parts. | ||
| // Each block is normally an array like ['type'=>'text','text'=>'...'], | ||
| // but guard against bare strings that may appear in mixed inputs. | ||
| $texts = array_filter( | ||
| array_map(fn ($block) => is_string($block) ? $block : (is_array($block) ? ($block['text'] ?? null) : null), $userInput), | ||
| fn ($t) => is_string($t) && $t !== '', | ||
| ); | ||
| $rawTitle = implode(' ', $texts); | ||
| } | ||
| $firstInput = mb_substr($rawTitle, 0, 80); |
There was a problem hiding this comment.
There’s no test covering the updated auto-title path when $userInput is an array of mixed content blocks. Since this change fixes a concrete TypeError crash, consider adding a unit test that calls AgentLoop::run() with array userInput (including non-text blocks) and asserts it does not throw and that SessionManager::setTitle() is called with the extracted/truncated text.
Two bugs: a crash when
$userInputis an image+text content block array, andstream()methods that buffer all messages untilrun()completes—defeating the purpose of a streaming API.Bug 1 —
TypeErrorinAgentLoop::run()with array$userInputmb_substr($userInput, 0, 80)in the session-title auto-generation path crashes when input is a mixed content-block array (e.g. text + image). Fixed to extract text from content blocks, with a guard for bare-string elements:Bug 2 —
stream()was buffering, not streamingBoth
HaoCode::stream()andConversation::stream()collected all events into a$messages[]array inside callbacks and yielded them as a batch only after the blocking$loop->run()returned. Callbacks fired in real time but callers saw a batch dump.Fix: Run
$loop->run()inside a PHP Fiber. Each callback (onText,onToolStart,onToolComplete) enqueues its message and callsFiber::getCurrent()?->suspend(), handing control back to the Generator which drains the queue and yields immediately before calling$fiber->resume().Requires PHP ≥ 8.1 (Fibers); project requires
^8.2.