3131 LLMResponseCompletedEvent ,
3232 RealtimeUserSpeechTranscriptionEvent ,
3333 RealtimeAgentSpeechTranscriptionEvent ,
34+ RealtimeAudioOutputEvent ,
3435)
3536from ..llm .llm import AudioLLM , LLM , VideoLLM
3637from ..llm .realtime import Realtime
7071tracer : Tracer = trace .get_tracer ("agents" )
7172
7273
73-
7474class Agent :
7575 """
7676 Agent class makes it easy to build your own video AI.
@@ -227,7 +227,9 @@ def __init__(
227227
228228 async def _finish_llm_turn (self ):
229229 if self ._pending_turn is None or self ._pending_turn .response is None :
230- raise ValueError ("Finish LLM turn should only be called after self._pending_turn is set" )
230+ raise ValueError (
231+ "Finish LLM turn should only be called after self._pending_turn is set"
232+ )
231233 turn = self ._pending_turn
232234 self ._pending_turn = None
233235 event = turn .response
@@ -252,6 +254,7 @@ def setup_event_handling(self):
252254 self .events .subscribe (self ._on_turn_event )
253255
254256 if self .stt :
257+
255258 @self .stt .events .subscribe
256259 async def on_turn_ended (event : TurnEndedEvent ):
257260 logger .info ("Received TurnEndedEvent %s" , event )
@@ -322,10 +325,11 @@ async def on_stt_transcript_event_create_response(event: STTTranscriptEvent):
322325
323326 # if turn detection is disabled, treat the transcript event as an end of turn
324327 if not self .turn_detection_enabled :
325- self .events .send (TurnEndedEvent (
326- participant = event .participant ,
327- ))
328-
328+ self .events .send (
329+ TurnEndedEvent (
330+ participant = event .participant ,
331+ )
332+ )
329333
330334 # TODO: chat event handling needs work
331335
@@ -634,7 +638,12 @@ async def _apply(self, function_name: str, *args, **kwargs):
634638 ):
635639 func = getattr (subclass , function_name )
636640 if func is not None :
637- await func (* args , ** kwargs )
641+ try :
642+ await func (* args , ** kwargs )
643+ except Exception as e :
644+ self .logger .exception (
645+ f"Error calling { function_name } on { subclass .__class__ .__name__ } : { e } "
646+ )
638647
639648 def _end_tracing (self ):
640649 if self ._root_span is not None :
@@ -879,7 +888,10 @@ async def _reply_to_audio_consumer(self) -> None:
879888 pcm , participant , conversation = self .conversation
880889 )
881890
882- if participant and getattr (participant , "user_id" , None ) != self .agent_user .id :
891+ if (
892+ participant
893+ and getattr (participant , "user_id" , None ) != self .agent_user .id
894+ ):
883895 # first forward to processors
884896 # Extract audio bytes for processors using the proper PCM data structure
885897 # PCM data has: format, sample_rate, samples, pts, dts, time_base
@@ -1044,6 +1056,8 @@ async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None
10441056 self .logger .info (
10451057 f"👉 Turn started - participant speaking { participant_id } : { event .confidence } "
10461058 )
1059+ if self ._audio_track is not None :
1060+ await self ._audio_track .flush ()
10471061 else :
10481062 # Agent itself started speaking - this is normal
10491063 participant_id = (
@@ -1078,9 +1092,15 @@ async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None
10781092 self ._pending_user_transcripts [participant .user_id ] = ""
10791093 # cancel the old task if the text changed in the meantime
10801094
1081- if self ._pending_turn is not None and self ._pending_turn .input != transcript :
1082- logger .debug ("Eager turn and completed turn didn't match. Cancelling in flight response. %s vs %s " ,
1083- self ._pending_turn .input , transcript )
1095+ if (
1096+ self ._pending_turn is not None
1097+ and self ._pending_turn .input != transcript
1098+ ):
1099+ logger .debug (
1100+ "Eager turn and completed turn didn't match. Cancelling in flight response. %s vs %s " ,
1101+ self ._pending_turn .input ,
1102+ transcript ,
1103+ )
10841104 if self ._pending_turn .task :
10851105 self ._pending_turn .task .cancel ()
10861106
@@ -1092,18 +1112,22 @@ async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None
10921112 input = transcript ,
10931113 participant = event .participant ,
10941114 started_at = datetime .datetime .now (),
1095- turn_finished = not event .eager_end_of_turn
1115+ turn_finished = not event .eager_end_of_turn ,
10961116 )
10971117 self ._pending_turn = llm_turn
1098- task = asyncio .create_task (self .simple_response (transcript , event .participant ))
1118+ task = asyncio .create_task (
1119+ self .simple_response (transcript , event .participant )
1120+ )
10991121 llm_turn .task = task
11001122 elif self ._pending_turn .input == transcript :
11011123 # same text as pending turn
11021124 is_finished = not event .eager_end_of_turn
11031125 now = datetime .datetime .now ()
11041126 elapsed = now - self ._pending_turn .started_at
1105- logger .debug ("Marking eager turn as completed. Eager turn detection saved %.2f" ,
1106- elapsed .total_seconds () * 1000 )
1127+ logger .debug (
1128+ "Marking eager turn as completed. Eager turn detection saved %.2f" ,
1129+ elapsed .total_seconds () * 1000 ,
1130+ )
11071131
11081132 if is_finished :
11091133 self ._pending_turn .turn_finished = True
@@ -1113,8 +1137,9 @@ async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None
11131137 @property
11141138 def turn_detection_enabled (self ):
11151139 # return true if either turn detection or stt provide turn detection capabilities
1116- return self .turn_detection is not None or (self .stt is not None and self .stt .turn_detection )
1117-
1140+ return self .turn_detection is not None or (
1141+ self .stt is not None and self .stt .turn_detection
1142+ )
11181143
11191144 @property
11201145 def publish_audio (self ) -> bool :
@@ -1246,30 +1271,17 @@ def _validate_configuration(self):
12461271 def _prepare_rtc (self ):
12471272 # Variables are now initialized in __init__
12481273
1249- # Set up audio track if TTS is available
12501274 if self .publish_audio :
1251- if _is_audio_llm (self .llm ):
1252- self ._audio_track = self .llm .output_audio_track
1253- self .logger .info ("🎵 Using Realtime provider output track for audio" )
1254- elif self .audio_publishers :
1255- # Get the first audio publisher to create the track
1256- audio_publisher = self .audio_publishers [0 ]
1257- self ._audio_track = audio_publisher .publish_audio_track ()
1258- self .logger .info ("🎵 Audio track initialized from audio publisher" )
1259- else :
1260- # Default to WebRTC-friendly format unless configured differently
1261- framerate = 48000
1262- stereo = True
1263- self ._audio_track = self .edge .create_audio_track (
1264- framerate = framerate , stereo = stereo
1265- )
1266- # Inform TTS of desired output format so it can resample accordingly
1267- if self .tts :
1268- channels = 2 if stereo else 1
1269- self .tts .set_output_format (
1270- sample_rate = framerate ,
1271- channels = channels ,
1272- )
1275+ framerate = 48000
1276+ stereo = True
1277+ self ._audio_track = self .edge .create_audio_track (
1278+ framerate = framerate , stereo = stereo
1279+ )
1280+
1281+ @self .events .subscribe
1282+ async def forward_audio (event : RealtimeAudioOutputEvent ):
1283+ if self ._audio_track is not None :
1284+ await self ._audio_track .write (event .data )
12731285
12741286 # Set up video track if video publishers are available
12751287 if self .publish_video :
0 commit comments