add pre-connected audio buffer #2171
New file (65 lines):

```python
import logging

from dotenv import load_dotenv

from livekit.agents import Agent, AgentSession, JobContext, JobProcess, WorkerOptions, cli
from livekit.agents.voice.room_io import RoomInputOptions, RoomIO
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel

logger = logging.getLogger("pre-connect-audio-agent")

load_dotenv()


# This example demonstrates the pre-connect audio buffer for the instant-connect feature.
# It captures what users say while the connection is being established, so they don't have
# to wait for the connection. The process works in three steps:
# 1. RoomIO is set up with pre_connect_audio=True
# 2. When connecting to the room, the client sends any audio spoken before the connection
# 3. The pre-connection audio is combined with new audio once the connection is established


class MyAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions="Your name is Kelly. You will interact with users via voice. "
            "With that in mind, keep your responses concise and to the point. "
            "You are curious and friendly, and have a sense of humor.",
        )


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
    session = AgentSession(
        vad=ctx.proc.userdata["vad"],
        # any combination of STT, LLM, TTS, or realtime API can be used
        llm=openai.LLM(model="gpt-4o-mini"),
        stt=deepgram.STT(model="nova-3", language="multi"),
        tts=openai.TTS(voice="ash"),
        # use LiveKit's turn detection model
        turn_detection=MultilingualModel(),
    )

    # create and start room_io with pre-connect audio enabled to register the byte stream handler
    room_io = RoomIO(
        agent_session=session,
        room=ctx.room,
        input_options=RoomInputOptions(pre_connect_audio=True, pre_connect_audio_timeout=5.0),
    )
    await room_io.start()

    # connect to the room to notify the client to send the pre-connect audio buffer
    await ctx.connect()

    # put time-consuming model/knowledge loading here;
    # user audio buffering starts once room_io is started

    await session.start(agent=MyAgent())


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
```
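For reference, the same options can be used to lengthen the wait or to opt out of the feature entirely; a minimal sketch (the 10-second value is purely illustrative, not a recommendation):

```python
from livekit.agents.voice.room_io import RoomInputOptions

# wait longer for the buffered audio, e.g. on slow client networks
patient_opts = RoomInputOptions(pre_connect_audio=True, pre_connect_audio_timeout=10.0)

# or disable pre-connect audio altogether
disabled_opts = RoomInputOptions(pre_connect_audio=False)
```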
New file (117 lines):

```python
import asyncio
import contextlib
import time
from dataclasses import dataclass, field

from livekit import rtc

from ..agent import logger, utils

PRE_CONNECT_AUDIO_BUFFER_STREAM = "lk.agent.pre-connect-audio-buffer"


@dataclass
class _PreConnectAudioBuffer:
    timestamp: float
    frames: list[rtc.AudioFrame] = field(default_factory=list)


class PreConnectAudioHandler:
    def __init__(self, room: rtc.Room, *, timeout: float, max_delta_s: float = 1.0):
        self._room = room
        self._timeout = timeout
        self._max_delta_s = max_delta_s

        # track id -> buffer
        self._buffers: dict[str, asyncio.Future[_PreConnectAudioBuffer]] = {}
        self._tasks: set[asyncio.Task] = set()

    def register(self):
        def _handler(reader: rtc.ByteStreamReader, participant_id: str):
            task = asyncio.create_task(self._read_audio_task(reader, participant_id))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

            def _on_timeout():
                logger.warning(
                    "pre-connect audio received but not completed in time",
                    extra={"participant": participant_id},
                )
                if not task.done():
                    task.cancel()

            timeout_handle = asyncio.get_event_loop().call_later(self._timeout, _on_timeout)
            task.add_done_callback(lambda _: timeout_handle.cancel())

        try:
            self._room.register_byte_stream_handler(PRE_CONNECT_AUDIO_BUFFER_STREAM, _handler)
        except ValueError:
            logger.warning(
                f"pre-connect audio handler for {PRE_CONNECT_AUDIO_BUFFER_STREAM} "
                "already registered, ignoring"
            )

    async def aclose(self):
        self._room.unregister_byte_stream_handler(PRE_CONNECT_AUDIO_BUFFER_STREAM)
        await utils.aio.cancel_and_wait(*self._tasks)

    async def wait_for_data(self, track_id: str) -> list[rtc.AudioFrame]:
        self._buffers.setdefault(track_id, asyncio.Future())
        fut = self._buffers[track_id]

        try:
            if fut.done():
                buf = fut.result()
                if (delta := time.time() - buf.timestamp) > self._max_delta_s:
                    logger.warning(
                        "pre-connect audio buffer is too old",
                        extra={"track_id": track_id, "delta_time": delta},
                    )
                    return []
                return buf.frames

            buf = await asyncio.wait_for(fut, self._timeout)
            return buf.frames
        finally:
            self._buffers.pop(track_id)

    @utils.log_exceptions(logger=logger)
    async def _read_audio_task(self, reader: rtc.ByteStreamReader, participant_id: str):
        if not (track_id := reader.info.attributes.get("trackId")):
            logger.warning(
                "pre-connect audio received but no trackId", extra={"participant": participant_id}
            )
            return

        if (fut := self._buffers.get(track_id)) and fut.done():
            # reset the buffer if it's already set
            self._buffers.pop(track_id)
        self._buffers.setdefault(track_id, asyncio.Future())
        fut = self._buffers[track_id]

        buf = _PreConnectAudioBuffer(timestamp=time.time())
        try:
            sample_rate = int(reader.info.attributes["sampleRate"])
            num_channels = int(reader.info.attributes["channels"])

            duration = 0
            audio_stream = utils.audio.AudioByteStream(sample_rate, num_channels)
            async for chunk in reader:
                for frame in audio_stream.push(chunk):
                    buf.frames.append(frame)
                    duration += frame.duration

            for frame in audio_stream.flush():
                buf.frames.append(frame)
                duration += frame.duration

            logger.debug(
                "pre-connect audio received",
                extra={"duration": duration, "track_id": track_id, "participant": participant_id},
            )

            with contextlib.suppress(asyncio.InvalidStateError):
                fut.set_result(buf)
        except Exception as e:
            with contextlib.suppress(asyncio.InvalidStateError):
                fut.set_exception(e)
```
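A rough sketch of how the buffered frames are expected to be consumed on the agent side: once a participant's audio track is subscribed, `wait_for_data()` is awaited with the track SID and the returned frames are emitted ahead of the live audio. The generator below is illustrative only (the real merging happens inside RoomIO's audio input, which is not part of this diff), and the import path for `PreConnectAudioHandler` is assumed:

```python
import asyncio

from livekit import rtc

# assumed import path; the handler is defined alongside room_io in this PR
from livekit.agents.voice.room_io._pre_connect_audio import PreConnectAudioHandler


async def audio_with_pre_connect_buffer(
    handler: PreConnectAudioHandler, track: rtc.RemoteAudioTrack
):
    """Hypothetical generator: buffered pre-connect frames first, then the live track."""
    try:
        # frames the client captured before the connection was fully established
        buffered = await handler.wait_for_data(track.sid)
    except asyncio.TimeoutError:
        buffered = []  # the buffer never arrived in time; fall back to live audio only

    for frame in buffered:
        yield frame

    # switch over to the live audio track
    async for event in rtc.AudioStream(track):
        yield event.frame
```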
Changes to the room_io module (two hunks):

```diff
@@ -19,6 +19,7 @@
 from ..events import AgentStateChangedEvent, UserInputTranscribedEvent
 from ..io import AudioInput, AudioOutput, TextOutput, VideoInput
 from ..transcription import TranscriptSynchronizer
+from ._pre_connect_audio import PreConnectAudioHandler

 if TYPE_CHECKING:
     from ..agent_session import AgentSession
@@ -70,6 +71,10 @@ class RoomInputOptions:
     participant_identity: NotGivenOr[str] = NOT_GIVEN
     """The participant to link to. If not provided, link to the first participant.
     Can be overridden by the `participant` argument of RoomIO constructor or `set_participant`."""
+    pre_connect_audio: bool = True
+    """Pre-connect audio enabled or not."""
+    pre_connect_audio_timeout: float = 5.0
+    """The pre-connect audio will be ignored if it doesn't arrive within this time."""
```
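A guess at how these options are threaded into the handler when RoomIO starts; the actual wiring is not included in this diff, so the helper name and import path below are hypothetical:

```python
from typing import Optional

from livekit import rtc
from livekit.agents.voice.room_io import RoomInputOptions

# assumed import path for the handler introduced in this PR
from livekit.agents.voice.room_io._pre_connect_audio import PreConnectAudioHandler


def maybe_create_pre_connect_handler(
    room: rtc.Room, opts: RoomInputOptions
) -> Optional[PreConnectAudioHandler]:
    """Hypothetical helper mirroring what RoomIO.start() is expected to do."""
    if not opts.pre_connect_audio:
        return None
    handler = PreConnectAudioHandler(room=room, timeout=opts.pre_connect_audio_timeout)
    handler.register()  # registers the "lk.agent.pre-connect-audio-buffer" byte stream handler
    return handler
```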
The review thread below is attached to the new `pre_connect_audio_timeout` lines:

The byte stream is not stable in my tests, so a short timeout is needed IMO; otherwise the user would have to wait tens of seconds to get a response from the agent. I think it's better to expose this to users so they can decrease or increase the value based on their needs.

Right, my intuition is that it should just work OOTB, since it's going to be a feature implemented in all our client SDKs, so this value could just be internal.

I still prefer to expose this option, to let users know that enabling this feature is not free: there is a timeout if the buffer is missing.

Sounds like a client-side bug? If the attribute is set, that means the client will send the stream; if the stream never comes, that's definitely a bug and we should fix it. Do you have more info on what you mean by "the byte stream is not stable in my test"?

Is the configurable timeout still needed with the other fix to the SFU?

Why wouldn't it be needed? There is still a chance we can't receive the buffer because of the network, right?

Just going back to @theomonnom's original question: do users need to think about this, or can it just be internal? It's confusing anyway because it isn't clear from the name whether this covers the time from agent start to stream arrival, or to stream end; it actually turns out to be just the time from stream header to stream trailer.

No strong feelings here, just re-raising the original question.

It's to stream end. Sometimes the stream head is received but the tail is lost; we don't rely on the stream head for anything.

I still think it's better to expose the timeout option, mainly to let the user know there is a timeout if the buffer doesn't arrive in time, and that a lost buffer has a side effect.
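To make that window concrete: in `register()` the timer starts when the byte-stream handler fires (the stream header has arrived) and is cancelled when the read task finishes (the trailer has been read). A self-contained toy illustration of that pattern, with hypothetical names and illustrative delays:

```python
import asyncio


async def main() -> None:
    async def read_stream() -> None:  # stands in for _read_audio_task
        await asyncio.sleep(0.1)  # pretend the stream takes 100 ms from header to trailer

    # the handler fires once the stream header arrives and kicks off the read task
    task = asyncio.create_task(read_stream())

    def on_timeout() -> None:
        if not task.done():
            task.cancel()  # give up on the pre-connect buffer

    # the clock starts here...
    handle = asyncio.get_event_loop().call_later(5.0, on_timeout)
    # ...and stops as soon as the whole stream, including the trailer, has been read
    task.add_done_callback(lambda _: handle.cancel())

    await task


if __name__ == "__main__":
    asyncio.run(main())
```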

The missing tail seems to be correlated with the way of sending (frame-by-frame vs. max stream chunk size).

I'd consider just omitting the specific example and ensuring the other examples are compatible instead.