@@ -454,45 +454,16 @@ public Flowable<Event> runLive(InvocationContext invocationContext) {
454
454
455
455
String eventIdForSendData = Event .generateEventId ();
456
456
LlmAgent agent = (LlmAgent ) invocationContext .agent ();
457
- BaseLlm llm =
458
- agent . resolvedModel (). model (). isPresent ()
459
- ? agent . resolvedModel (). model (). get ()
460
- : LlmRegistry . getLlm ( agent . resolvedModel (). modelName (). get ());
457
+ BaseLlm llm = agent . resolvedModel (). model ()
458
+ . orElseGet (() -> LlmRegistry . getLlm ( agent . resolvedModel (). modelName ()
459
+ . orElseThrow ()));
460
+
461
461
BaseLlmConnection connection = llm .connect (llmRequestAfterPreprocess );
462
462
Completable historySent =
463
463
llmRequestAfterPreprocess .contents ().isEmpty ()
464
464
? Completable .complete ()
465
465
: Completable .defer (
466
- () -> {
467
- Span sendDataSpan =
468
- Telemetry .getTracer ().spanBuilder ("send_data" ).startSpan ();
469
- try (Scope scope = sendDataSpan .makeCurrent ()) {
470
- return connection
471
- .sendHistory (llmRequestAfterPreprocess .contents ())
472
- .doOnComplete (
473
- () -> {
474
- try (Scope innerScope = sendDataSpan .makeCurrent ()) {
475
- Telemetry .traceSendData (
476
- invocationContext ,
477
- eventIdForSendData ,
478
- llmRequestAfterPreprocess .contents ());
479
- }
480
- })
481
- .doOnError (
482
- error -> {
483
- sendDataSpan .setStatus (
484
- StatusCode .ERROR , error .getMessage ());
485
- sendDataSpan .recordException (error );
486
- try (Scope innerScope = sendDataSpan .makeCurrent ()) {
487
- Telemetry .traceSendData (
488
- invocationContext ,
489
- eventIdForSendData ,
490
- llmRequestAfterPreprocess .contents ());
491
- }
492
- })
493
- .doFinally (sendDataSpan ::end );
494
- }
495
- });
466
+ () -> sendHistory (invocationContext , connection , llmRequestAfterPreprocess , eventIdForSendData ));
496
467
497
468
Flowable <LiveRequest > liveRequests = invocationContext .liveRequestQueue ().get ().get ();
498
469
Disposable sendTask =
@@ -588,7 +559,33 @@ public void onError(Throwable e) {
588
559
});
589
560
}
590
561
591
- /**
562
+ private static Completable sendHistory (
563
+ final InvocationContext invocationContext ,
564
+ final BaseLlmConnection connection ,
565
+ final LlmRequest llmRequestAfterPreprocess ,
566
+ final String eventIdForSendData ) {
567
+ Span sendDataSpan = Telemetry .getTracer ().spanBuilder ("send_data" ).startSpan ();
568
+ try (Scope scope = sendDataSpan .makeCurrent ()) {
569
+ return connection
570
+ .sendHistory (llmRequestAfterPreprocess .contents ())
571
+ .doOnEvent (
572
+ error -> {
573
+ if (error != null ) {
574
+ sendDataSpan .setStatus (StatusCode .ERROR , error .getMessage ());
575
+ sendDataSpan .recordException (error );
576
+ }
577
+ try (Scope innerScope = sendDataSpan .makeCurrent ()) {
578
+ Telemetry .traceSendData (
579
+ invocationContext ,
580
+ eventIdForSendData ,
581
+ llmRequestAfterPreprocess .contents ());
582
+ }
583
+ })
584
+ .doFinally (sendDataSpan ::end );
585
+ }
586
+ }
587
+
588
+ /**
592
589
* Builds an {@link Event} from LLM response, request, and base event data.
593
590
*
594
591
* <p>Populates the event with LLM output and tool function call metadata.
0 commit comments