Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
355b47f
add pre-connect audio buffer
longcw Apr 29, 2025
b732d68
read buffer as a list
longcw Apr 29, 2025
ca15aa0
clean logs
longcw Apr 29, 2025
c161ebd
add wait_for_data for PreConnectAudioData
longcw Apr 29, 2025
b77db38
Merge remote-tracking branch 'origin/main' into longc/pre-connect-audio
longcw Apr 29, 2025
4b0073f
support multi participant
longcw Apr 29, 2025
ad3e6b7
update PreConnectAudioData
longcw Apr 29, 2025
1ea9d6b
move PreConnectAudioHandler to room io
longcw Apr 30, 2025
13171f9
update comments
longcw Apr 30, 2025
f066d26
update comments
longcw Apr 30, 2025
92fa347
clean up timeout
longcw Apr 30, 2025
ca5537e
check PRE_CONNECT_AUDIO_ATTRIBUTE == true
longcw Apr 30, 2025
7da1d04
add warning for PreConnectAudioHandler
longcw May 1, 2025
f1a7df1
Merge remote-tracking branch 'origin/main' into longc/pre-connect-aud…
longcw May 1, 2025
420f153
Merge remote-tracking branch 'origin/main' into longc/pre-connect-aud…
longcw May 1, 2025
5dc0b23
update logs and timeout
longcw May 1, 2025
439f377
Merge remote-tracking branch 'origin/main' into longc/pre-connect-aud…
longcw May 5, 2025
c964dd3
update audio buffer timeout
longcw May 5, 2025
181fed8
pass publication to forward task
longcw May 5, 2025
b3125e0
Merge remote-tracking branch 'origin/main' into longc/pre-connect-aud…
longcw May 7, 2025
fb6aefd
use track id as the buffer key
longcw May 7, 2025
ed6c8c1
check AudioTrackFeature TF_PRECONNECT_BUFFER
longcw May 8, 2025
8e733f9
Merge remote-tracking branch 'origin/main' into longc/pre-connect-aud…
longcw May 8, 2025
cc04cb1
ruff
longcw May 8, 2025
2b73403
upgrade livekit sdk
longcw May 8, 2025
ba90cf5
Merge remote-tracking branch 'origin/main' into longc/pre-connect-aud…
longcw May 13, 2025
73fe674
set pre_connect_audio default True
longcw May 13, 2025
9086ba1
remove pre connect audio buffer example
longcw May 13, 2025
56e278e
log warning for connection order only the buffer used
longcw May 13, 2025
cc093f5
set default timeout for pre-connect buffer to 3s
longcw May 14, 2025
c4308f1
Merge remote-tracking branch 'origin/main' into longc/pre-connect-aud…
longcw May 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions examples/voice_agents/pre_connect_audio_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import logging

from dotenv import load_dotenv

from livekit.agents import Agent, AgentSession, JobContext, JobProcess, WorkerOptions, cli
from livekit.agents.voice.room_io import RoomInputOptions, RoomIO
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel

logger = logging.getLogger("pre-connect-audio-agent")

load_dotenv()


# This example demonstrates the pre-connect audio buffer for instant connect feature.
# It captures what users say during connection time so they don't need to wait for the connection.
# The process works in three steps:
# 1. RoomIO is set up with pre_connect_audio=True
# 2. When connecting to the room, the client sends any audio spoken before connection
# 3. This pre-connection audio is combined with new audio after connection is established
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider just omitting the specific example and ensuring the other examples are compatible instead



class MyAgent(Agent):
def __init__(self) -> None:
super().__init__(
instructions="Your name is Kelly. You would interact with users via voice."
"with that in mind keep your responses concise and to the point."
"You are curious and friendly, and have a sense of humor.",
)


def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
session = AgentSession(
vad=ctx.proc.userdata["vad"],
# any combination of STT, LLM, TTS, or realtime API can be used
llm=openai.LLM(model="gpt-4o-mini"),
stt=deepgram.STT(model="nova-3", language="multi"),
tts=openai.TTS(voice="ash"),
# use LiveKit's turn detection model
turn_detection=MultilingualModel(),
)

# create and start room_io with pre-connect audio enabled to register the byte stream handler
room_io = RoomIO(
agent_session=session,
room=ctx.room,
input_options=RoomInputOptions(pre_connect_audio=True, pre_connect_audio_timeout=5.0),
)
await room_io.start()

# connect to room to notify the client to send pre-connect audio buffer,
await ctx.connect()

# put the time consuming model/knowledge loading here
# user audio buffering starts after the room_io is started

await session.start(agent=MyAgent())


if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
87 changes: 82 additions & 5 deletions livekit-agents/livekit/agents/voice/room_io/_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Iterable
from typing import Generic, TypeVar, Union

from typing_extensions import override

import livekit.rtc as rtc
from livekit.rtc._proto.track_pb2 import AudioTrackFeature

from ...log import logger
from ...utils import aio, log_exceptions
from ..io import AudioInput, VideoInput
from ._pre_connect_audio import PreConnectAudioHandler

T = TypeVar("T", bound=Union[rtc.AudioFrame, rtc.VideoFrame])

Expand Down Expand Up @@ -126,14 +128,15 @@ async def _forward_task(
self,
old_task: asyncio.Task | None,
stream: rtc.VideoStream | rtc.AudioStream,
track_source: rtc.TrackSource.ValueType,
publication: rtc.RemoteTrackPublication,
participant: rtc.RemoteParticipant,
) -> None:
if old_task:
await aio.cancel_and_wait(old_task)

extra = {
"participant": self._participant_identity,
"source": rtc.TrackSource.Name(track_source),
"participant": participant.identity,
"source": rtc.TrackSource.Name(publication.source),
}
logger.debug("start reading stream", extra=extra)
async for event in stream:
Expand Down Expand Up @@ -172,7 +175,7 @@ def _on_track_available(
self._stream = self._create_stream(track)
self._publication = publication
self._forward_atask = asyncio.create_task(
self._forward_task(self._forward_atask, self._stream, publication.source)
self._forward_task(self._forward_atask, self._stream, publication, participant)
)
return True

Expand Down Expand Up @@ -202,13 +205,15 @@ def __init__(
sample_rate: int,
num_channels: int,
noise_cancellation: rtc.NoiseCancellationOptions | None,
pre_connect_audio_handler: PreConnectAudioHandler | None,
) -> None:
_ParticipantInputStream.__init__(
self, room=room, track_source=rtc.TrackSource.SOURCE_MICROPHONE
)
self._sample_rate = sample_rate
self._num_channels = num_channels
self._noise_cancellation = noise_cancellation
self._pre_connect_audio_handler = pre_connect_audio_handler

@override
def _create_stream(self, track: rtc.Track) -> rtc.AudioStream:
Expand All @@ -219,6 +224,78 @@ def _create_stream(self, track: rtc.Track) -> rtc.AudioStream:
noise_cancellation=self._noise_cancellation,
)

@override
async def _forward_task(
self,
old_task: asyncio.Task | None,
stream: rtc.AudioStream,
publication: rtc.RemoteTrackPublication,
participant: rtc.RemoteParticipant,
) -> None:
if (
self._pre_connect_audio_handler
and publication.track
and AudioTrackFeature.TF_PRECONNECT_BUFFER in publication.audio_features
):
try:
duration = 0
frames = await self._pre_connect_audio_handler.wait_for_data(publication.track.sid)
for frame in self._resample_frames(frames):
if self._attached:
await self._data_ch.send(frame)
duration += frame.duration
if frames:
logger.debug(
"pre-connect audio buffer pushed",
extra={
"duration": duration,
"track_id": publication.track.sid,
"participant": participant.identity,
},
)

except asyncio.TimeoutError:
logger.warning(
"timeout waiting for pre-connect audio buffer",
extra={
"duration": duration,
"track_id": publication.track.sid,
"participant": participant.identity,
},
)

except Exception as e:
logger.error(
"error reading pre-connect audio buffer",
extra={
"error": e,
"track_id": publication.track.sid,
"participant": participant.identity,
},
)

await super()._forward_task(old_task, stream, publication, participant)

def _resample_frames(self, frames: Iterable[rtc.AudioFrame]) -> Iterable[rtc.AudioFrame]:
resampler: rtc.AudioResampler | None = None
for frame in frames:
if (
not resampler
and self._sample_rate is not None
and frame.sample_rate != self._sample_rate
):
resampler = rtc.AudioResampler(
input_rate=frame.sample_rate, output_rate=self._sample_rate
)

if resampler:
yield from resampler.push(frame)
else:
yield frame

if resampler:
yield from resampler.flush()


class _ParticipantVideoInputStream(_ParticipantInputStream[rtc.VideoFrame], VideoInput):
def __init__(self, room: rtc.Room) -> None:
Expand Down
117 changes: 117 additions & 0 deletions livekit-agents/livekit/agents/voice/room_io/_pre_connect_audio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import asyncio
import contextlib
import time
from dataclasses import dataclass, field

from livekit import rtc

from ..agent import logger, utils

PRE_CONNECT_AUDIO_BUFFER_STREAM = "lk.agent.pre-connect-audio-buffer"


@dataclass
class _PreConnectAudioBuffer:
timestamp: float
frames: list[rtc.AudioFrame] = field(default_factory=list)


class PreConnectAudioHandler:
def __init__(self, room: rtc.Room, *, timeout: float, max_delta_s: float = 1.0):
self._room = room
self._timeout = timeout
self._max_delta_s = max_delta_s

# track id -> buffer
self._buffers: dict[str, asyncio.Future[_PreConnectAudioBuffer]] = {}
self._tasks: set[asyncio.Task] = set()

def register(self):
def _handler(reader: rtc.ByteStreamReader, participant_id: str):
task = asyncio.create_task(self._read_audio_task(reader, participant_id))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)

def _on_timeout():
logger.warning(
"pre-connect audio received but not completed in time",
extra={"participant": participant_id},
)
if not task.done():
task.cancel()

timeout_handle = asyncio.get_event_loop().call_later(self._timeout, _on_timeout)
task.add_done_callback(lambda _: timeout_handle.cancel())

try:
self._room.register_byte_stream_handler(PRE_CONNECT_AUDIO_BUFFER_STREAM, _handler)
except ValueError:
logger.warning(
f"pre-connect audio handler for {PRE_CONNECT_AUDIO_BUFFER_STREAM} "
"already registered, ignoring"
)

async def aclose(self):
self._room.unregister_byte_stream_handler(PRE_CONNECT_AUDIO_BUFFER_STREAM)
await utils.aio.cancel_and_wait(*self._tasks)

async def wait_for_data(self, track_id: str) -> list[rtc.AudioFrame]:
self._buffers.setdefault(track_id, asyncio.Future())
fut = self._buffers[track_id]

try:
if fut.done():
buf = fut.result()
if (delta := time.time() - buf.timestamp) > self._max_delta_s:
logger.warning(
"pre-connect audio buffer is too old",
extra={"track_id": track_id, "delta_time": delta},
)
return []
return buf.frames

buf = await asyncio.wait_for(fut, self._timeout)
return buf.frames
finally:
self._buffers.pop(track_id)

@utils.log_exceptions(logger=logger)
async def _read_audio_task(self, reader: rtc.ByteStreamReader, participant_id: str):
if not (track_id := reader.info.attributes.get("trackId")):
logger.warning(
"pre-connect audio received but no trackId", extra={"participant": participant_id}
)
return

if (fut := self._buffers.get(track_id)) and fut.done():
# reset the buffer if it's already set
self._buffers.pop(track_id)
self._buffers.setdefault(track_id, asyncio.Future())
fut = self._buffers[track_id]

buf = _PreConnectAudioBuffer(timestamp=time.time())
try:
sample_rate = int(reader.info.attributes["sampleRate"])
num_channels = int(reader.info.attributes["channels"])

duration = 0
audio_stream = utils.audio.AudioByteStream(sample_rate, num_channels)
async for chunk in reader:
for frame in audio_stream.push(chunk):
buf.frames.append(frame)
duration += frame.duration

for frame in audio_stream.flush():
buf.frames.append(frame)
duration += frame.duration

logger.debug(
"pre-connect audio received",
extra={"duration": duration, "track_id": track_id, "participant": participant_id},
)

with contextlib.suppress(asyncio.InvalidStateError):
fut.set_result(buf)
except Exception as e:
with contextlib.suppress(asyncio.InvalidStateError):
fut.set_exception(e)
20 changes: 20 additions & 0 deletions livekit-agents/livekit/agents/voice/room_io/room_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..events import AgentStateChangedEvent, UserInputTranscribedEvent
from ..io import AudioInput, AudioOutput, TextOutput, VideoInput
from ..transcription import TranscriptSynchronizer
from ._pre_connect_audio import PreConnectAudioHandler

if TYPE_CHECKING:
from ..agent_session import AgentSession
Expand Down Expand Up @@ -70,6 +71,10 @@ class RoomInputOptions:
participant_identity: NotGivenOr[str] = NOT_GIVEN
"""The participant to link to. If not provided, link to the first participant.
Can be overridden by the `participant` argument of RoomIO constructor or `set_participant`."""
pre_connect_audio: bool = True
"""Pre-connect audio enabled or not."""
pre_connect_audio_timeout: float = 5.0
"""The pre-connect audio will be ignored if it doesn't arrive within this time."""
Copy link
Member

@theomonnom theomonnom May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the timeout? ideally we know if we should wait for it based on the attributes. (So exposing a configuration for it isn't very useful, we can have a hard limit in the worst scenario)

Suggested change
pre_connect_audio_timeout: float = 5.0
"""The pre-connect audio will be ignored if it doesn't arrive within this time."""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the byte stream is not stable in my test, a short timeout is needed IMO. otherwise user will need to wait for tens of seconds to get the response from the agent. I think it's better expose this to user if they want to decrease or increase the value based on needs.

Copy link
Member

@theomonnom theomonnom May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, my intuition is that it should just work OOTB since it's going to be a features implemented in all our client SDKs, so this value could just be internal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still prefer to expose this option, to let users know that enabling this feature is not free, there is a timeout if the buffer is missing

Copy link
Contributor

@bcherry bcherry May 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds like a client side bug? If the attribute is set that means the client will send the stream. if the stream never comes that's definitely a bug and we should fix it. do you have more info on what you mean by "the byte stream is not stable in my test"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the configurable timeout still needed with the other fixed to SFU?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's not needed? There is still a chance we cannot receive the buffer bc of the network right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just asking back to @theomonnom 's original question: do users need to think about this or can it just be internal? it's confusing anyways because it's not clear from the name if this covers the time from agent start to stream arrive or stream end. it actually turns out it's just the time from stream header to stream trailer.

no strong feelings here just re-raising the original question.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to stream end. Sometimes stream head received but the tail lost, we don't rely on the stream head for anything.

I still think it's better to expose the timeout option, mainly to notify the user there is a timeout if the buffer is not arrived in time, if the buffer lost there will be a side effect.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing tail seems to be correlated with the way of sending (frame-by-frame vs max stream chunk size).



@dataclass
Expand Down Expand Up @@ -125,8 +130,19 @@ def __init__(
self._tasks: set[asyncio.Task] = set()
self._update_state_task: asyncio.Task | None = None

self._pre_connect_audio_handler: PreConnectAudioHandler | None = None

async def start(self) -> None:
# -- create inputs --
if self._input_options.pre_connect_audio:
self._pre_connect_audio_handler = PreConnectAudioHandler(
room=self._room,
timeout=self._input_options.pre_connect_audio_timeout,
)
if self._room.isconnected():
logger.warning("pre-connect audio handler registered after room is connected")
self._pre_connect_audio_handler.register()

if self._input_options.text_enabled:
try:
self._room.register_text_stream_handler(TOPIC_CHAT, self._on_user_text_input)
Expand All @@ -144,6 +160,7 @@ async def start(self) -> None:
sample_rate=self._input_options.audio_sample_rate,
num_channels=self._input_options.audio_num_channels,
noise_cancellation=self._input_options.noise_cancellation,
pre_connect_audio_handler=self._pre_connect_audio_handler,
)

# -- create outputs --
Expand Down Expand Up @@ -209,6 +226,9 @@ async def aclose(self) -> None:
if self._init_atask:
await utils.aio.cancel_and_wait(self._init_atask)

if self._pre_connect_audio_handler:
await self._pre_connect_audio_handler.aclose()

if self._audio_input:
await self._audio_input.aclose()
if self._video_input:
Expand Down
2 changes: 1 addition & 1 deletion livekit-agents/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ classifiers = [
]
dependencies = [
"click~=8.1",
"livekit>=1.0.6,<2",
"livekit>=1.0.7,<2",
"livekit-api>=1.0.2,<2",
"livekit-protocol~=1.0",
"protobuf>=3",
Expand Down
Loading