Skip to content
107 changes: 90 additions & 17 deletions livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import asyncio
from typing import Optional, AsyncIterator
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional

from ._ffi_client import FfiHandle, FfiClient
from ._ffi_client import FfiClient, FfiHandle
from ._proto import audio_frame_pb2 as proto_audio_frame
from ._proto import ffi_pb2 as proto_ffi
from ._proto.track_pb2 import TrackSource
from ._utils import RingQueue, task_done_logger
from .audio_frame import AudioFrame
from .participant import Participant
from .track import Track
from dataclasses import dataclass


@dataclass
Expand All @@ -39,30 +43,93 @@ def __init__(
capacity: int = 0,
sample_rate: int = 48000,
num_channels: int = 1,
**kwargs,
) -> None:
self._track = track
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self._track = track
self._track: Track | None = track

self._sample_rate = sample_rate
self._num_channels = num_channels
self._loop = loop or asyncio.get_event_loop()
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
self._queue: RingQueue[AudioFrameEvent] = RingQueue(capacity)

req = proto_ffi.FfiRequest()
new_audio_stream = req.new_audio_stream
new_audio_stream.track_handle = track._ffi_handle.handle
new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
new_audio_stream.sample_rate = sample_rate
new_audio_stream.num_channels = num_channels
resp = FfiClient.instance.request(req)

stream_info = resp.new_audio_stream.stream
self._ffi_handle = FfiHandle(stream_info.handle.id)
self._info = stream_info
self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)

self._task = self._loop.create_task(self._run())
self._task.add_done_callback(task_done_logger)

stream: Any = None
if "participant" in kwargs:
stream = self._create_owned_stream_from_participant(
participant=kwargs["participant"], track_source=kwargs["track_source"]
)
else:
stream = self._create_owned_stream()
self._ffi_handle = FfiHandle(stream.handle.id)
self._info = stream.info

@classmethod
def from_participant(
    cls,
    participant: Participant,
    track_source: TrackSource.ValueType,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    capacity: int = 0,
    sample_rate: int = 48000,
    num_channels: int = 1,
) -> AudioStream:
    """Build an audio stream from a participant and one of its track sources.

    Every argument is forwarded to the constructor by keyword; no track is
    supplied (``track=None``).

    Args:
        participant: Forwarded as the ``participant`` keyword.
        track_source: Forwarded as the ``track_source`` keyword.
        loop: Forwarded as ``loop``.
        capacity: Forwarded as ``capacity``.
        sample_rate: Forwarded as ``sample_rate``.
        num_channels: Forwarded as ``num_channels``.

    Returns:
        A new instance of ``cls``.
    """
    # Construct through ``cls`` instead of the hard-coded class name so that
    # calling this on a subclass yields an instance of that subclass.
    return cls(
        participant=participant,
        track_source=track_source,
        loop=loop,
        capacity=capacity,
        track=None,  # type: ignore
        sample_rate=sample_rate,
        num_channels=num_channels,
    )

@classmethod
def from_track(
    cls,
    track: Track,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    capacity: int = 0,
    sample_rate: int = 48000,
    num_channels: int = 1,
) -> AudioStream:
    """Build an audio stream from a track.

    Every argument is forwarded to the constructor by keyword.

    Args:
        track: Forwarded as ``track``.
        loop: Forwarded as ``loop``.
        capacity: Forwarded as ``capacity``.
        sample_rate: Forwarded as ``sample_rate``.
        num_channels: Forwarded as ``num_channels``.

    Returns:
        A new instance of ``cls``.
    """
    # Construct through ``cls`` instead of the hard-coded class name so that
    # calling this on a subclass yields an instance of that subclass.
    return cls(
        track=track,
        loop=loop,
        capacity=capacity,
        sample_rate=sample_rate,
        num_channels=num_channels,
    )

def __del__(self) -> None:
    """Unsubscribe this stream's FFI event queue when the object is finalized."""
    # The finalizer also runs for an object whose ``__init__`` raised before
    # the subscription was stored, so the attribute may be missing.
    ffi_queue = getattr(self, "_ffi_queue", None)
    if ffi_queue is None:
        return
    FfiClient.instance.queue.unsubscribe(ffi_queue)

def _create_owned_stream(self) -> Any:
    """Request a native audio stream for ``self._track`` through ``FfiClient``.

    The request carries the track's FFI handle together with the sample rate
    and channel count stored on this instance.

    Returns:
        The ``stream`` field of the ``new_audio_stream`` response.
    """
    request = proto_ffi.FfiRequest()
    params = request.new_audio_stream
    params.track_handle = self._track._ffi_handle.handle
    params.sample_rate = self._sample_rate
    params.num_channels = self._num_channels
    params.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
    response = FfiClient.instance.request(request)
    return response.new_audio_stream.stream

def _create_owned_stream_from_participant(
    self, participant: Participant, track_source: TrackSource.ValueType
) -> Any:
    """Request a native audio stream for a participant's track source.

    Args:
        participant: Its ``_ffi_handle.handle`` is written to the request's
            ``participant_handle`` field.
        track_source: Written to the request's ``track_source`` field.

    Returns:
        The ``stream`` field of the ``audio_stream_from_participant`` response.
    """
    # NOTE(review): unlike the track-based request, this one does not forward
    # self._sample_rate / self._num_channels -- confirm that this is intended.
    request = proto_ffi.FfiRequest()
    params = request.audio_stream_from_participant
    params.participant_handle = participant._ffi_handle.handle
    params.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
    params.track_source = track_source
    response = FfiClient.instance.request(request)
    return response.audio_stream_from_participant.stream

async def _run(self):
while True:
event = await self._ffi_queue.wait_for(self._is_event)
Expand All @@ -74,6 +141,7 @@ async def _run(self):
event = AudioFrameEvent(frame)
self._queue.put(event)
elif audio_event.HasField("eos"):
self._queue.put(None)
break

FfiClient.instance.queue.unsubscribe(self._ffi_queue)
Expand All @@ -91,4 +159,9 @@ def __aiter__(self) -> AsyncIterator[AudioFrameEvent]:
async def __anext__(self) -> AudioFrameEvent:
    """Return the next queued audio frame event.

    Raises:
        StopAsyncIteration: If the reader task has already finished, or if
            the item taken from the queue is the ``None`` end-of-stream
            sentinel.
    """
    if self._task.done():
        raise StopAsyncIteration

    # The reader enqueues ``None`` on end-of-stream; it must terminate the
    # iteration instead of being handed to the consumer as a frame event.
    item = await self._queue.get()
    if item is None:
        raise StopAsyncIteration

    return item
2 changes: 1 addition & 1 deletion livekit-rtc/livekit/rtc/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from .e2ee import E2EEManager, E2EEOptions
from .participant import LocalParticipant, Participant, RemoteParticipant
from .track import RemoteAudioTrack, RemoteVideoTrack
from .track_publication import TrackPublication, RemoteTrackPublication
from .track_publication import RemoteTrackPublication, TrackPublication
from .transcription import TranscriptionSegment

EventTypes = Literal[
Expand Down
107 changes: 89 additions & 18 deletions livekit-rtc/livekit/rtc/video_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Optional, AsyncIterator
from typing import Any, AsyncIterator, Optional

from ._ffi_client import FfiHandle, FfiClient
from ._ffi_client import FfiClient, FfiHandle
from ._proto import ffi_pb2 as proto_ffi
from ._proto import video_frame_pb2 as proto_video_frame
from ._proto.track_pb2 import TrackSource
from ._utils import RingQueue, task_done_logger
from .participant import Participant
from .track import Track
from .video_frame import VideoFrame

Expand All @@ -37,34 +41,96 @@ class VideoStream:
def __init__(
    self,
    track: Track,
    *,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    capacity: int = 0,
    format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
    **kwargs,
) -> None:
    """Open a video stream for a track, or for a participant's track source.

    Args:
        track: Track to stream from; stored as ``self._track``.
        loop: Event loop used for the FFI queue subscription and the reader
            task. Defaults to ``asyncio.get_event_loop()``.
        capacity: Passed to ``RingQueue`` and stored as ``self._capacity``.
        format: Optional buffer type; stored as ``self._format``.
        **kwargs: When ``participant`` is present, the stream is created
            from ``kwargs["participant"]`` and ``kwargs["track_source"]``
            instead of from ``track``.

    Raises:
        KeyError: If ``participant`` is given without ``track_source``.
    """
    self._loop = loop or asyncio.get_event_loop()
    self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
    # Built exactly once; ``None`` is an allowed item type of the queue.
    self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
    self._track = track
    self._format = format
    self._capacity = capacity

    stream: Any = None
    if "participant" in kwargs:
        stream = self._create_owned_stream_from_participant(
            participant=kwargs["participant"], track_source=kwargs["track_source"]
        )
    else:
        stream = self._create_owned_stream()

    self._ffi_handle = FfiHandle(stream.handle.id)
    self._info = stream.info

    # Start the reader only after the stream handle has been obtained.
    self._task = self._loop.create_task(self._run())
    self._task.add_done_callback(task_done_logger)

@classmethod
def from_participant(
    cls,
    participant: Participant,
    track_source: TrackSource.ValueType,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
    capacity: int = 0,
) -> VideoStream:
    """Build a video stream from a participant and one of its track sources.

    Every argument is forwarded to the constructor by keyword; no track is
    supplied (``track=None``).

    Args:
        participant: Forwarded as the ``participant`` keyword.
        track_source: Forwarded as the ``track_source`` keyword.
        loop: Forwarded as ``loop``.
        format: Forwarded as ``format``.
        capacity: Forwarded as ``capacity``.

    Returns:
        A new instance of ``cls``.
    """
    # Construct through ``cls`` instead of the hard-coded class name so that
    # calling this on a subclass yields an instance of that subclass.
    return cls(
        participant=participant,
        track_source=track_source,
        loop=loop,
        capacity=capacity,
        format=format,
        track=None,  # type: ignore
    )

@classmethod
def from_track(
    cls,
    track: Track,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
    capacity: int = 0,
) -> VideoStream:
    """Build a video stream from a track.

    Every argument is forwarded to the constructor by keyword.

    Args:
        track: Forwarded as ``track``.
        loop: Forwarded as ``loop``.
        format: Forwarded as ``format``.
        capacity: Forwarded as ``capacity``.

    Returns:
        A new instance of ``cls``.
    """
    # Construct through ``cls`` instead of the hard-coded class name so that
    # calling this on a subclass yields an instance of that subclass.
    return cls(
        track=track,
        loop=loop,
        capacity=capacity,
        format=format,
    )

def __del__(self) -> None:
    """Unsubscribe this stream's FFI event queue when the object is finalized."""
    # The finalizer also runs for an object whose ``__init__`` raised before
    # the subscription was stored, so the attribute may be missing.
    ffi_queue = getattr(self, "_ffi_queue", None)
    if ffi_queue is None:
        return
    FfiClient.instance.queue.unsubscribe(ffi_queue)

def _create_owned_stream(self) -> Any:
    """Request a native video stream for ``self._track`` through ``FfiClient``.

    The request carries the track's FFI handle, asks for normalized strides,
    and includes ``self._format`` only when one was configured.

    Returns:
        The ``stream`` field of the ``new_video_stream`` response.
    """
    req = proto_ffi.FfiRequest()
    new_video_stream = req.new_video_stream
    # Read the track and format from the instance: this method takes no
    # ``track`` argument, and a bare ``format`` would be the builtin function.
    new_video_stream.track_handle = self._track._ffi_handle.handle
    new_video_stream.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
    if self._format is not None:
        new_video_stream.format = self._format
    new_video_stream.normalize_stride = True

    resp = FfiClient.instance.request(req)
    return resp.new_video_stream.stream

def __del__(self) -> None:
    """Unsubscribe this stream's FFI event queue when the object is finalized."""
    # The finalizer also runs for an object whose ``__init__`` raised before
    # the subscription was stored, so the attribute may be missing.
    ffi_queue = getattr(self, "_ffi_queue", None)
    if ffi_queue is None:
        return
    FfiClient.instance.queue.unsubscribe(ffi_queue)
def _create_owned_stream_from_participant(
    self, participant: Participant, track_source: TrackSource.ValueType
) -> Any:
    """Request a native video stream for a participant's track source.

    Args:
        participant: Its ``_ffi_handle.handle`` is written to the request's
            ``participant_handle`` field.
        track_source: Written to the request's ``track_source`` field.

    Returns:
        The ``stream`` field of the ``video_stream_from_participant`` response.
    """
    request = proto_ffi.FfiRequest()
    params = request.video_stream_from_participant
    params.participant_handle = participant._ffi_handle.handle
    params.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
    params.track_source = track_source
    params.normalize_stride = True
    # The format is optional; leave the field unset when none was configured.
    requested_format = self._format
    if requested_format is not None:
        params.format = requested_format
    response = FfiClient.instance.request(request)
    return response.video_stream_from_participant.stream

async def _run(self) -> None:
while True:
Expand Down Expand Up @@ -100,4 +166,9 @@ def __aiter__(self) -> AsyncIterator[VideoFrameEvent]:
async def __anext__(self) -> VideoFrameEvent:
    """Return the next queued video frame event.

    Raises:
        StopAsyncIteration: If the reader task has already finished, or if
            the item taken from the queue is ``None``.
    """
    if self._task.done():
        raise StopAsyncIteration

    # The queue may hold ``None`` alongside frame events; a ``None`` item
    # terminates the iteration instead of being handed to the consumer.
    item = await self._queue.get()
    if item is None:
        raise StopAsyncIteration

    return item
Loading