diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index f01403f5..2432fad7 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -12,16 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import asyncio -from typing import Optional, AsyncIterator +from dataclasses import dataclass +from typing import Any, AsyncIterator, Optional -from ._ffi_client import FfiHandle, FfiClient +from ._ffi_client import FfiClient, FfiHandle from ._proto import audio_frame_pb2 as proto_audio_frame from ._proto import ffi_pb2 as proto_ffi +from ._proto.track_pb2 import TrackSource from ._utils import RingQueue, task_done_logger from .audio_frame import AudioFrame +from .participant import Participant from .track import Track -from dataclasses import dataclass @dataclass @@ -39,30 +43,95 @@ def __init__( capacity: int = 0, sample_rate: int = 48000, num_channels: int = 1, + **kwargs, ) -> None: - self._track = track + self._track: Track | None = track + self._sample_rate = sample_rate + self._num_channels = num_channels self._loop = loop or asyncio.get_event_loop() self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop) - self._queue: RingQueue[AudioFrameEvent] = RingQueue(capacity) - - req = proto_ffi.FfiRequest() - new_audio_stream = req.new_audio_stream - new_audio_stream.track_handle = track._ffi_handle.handle - new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE - new_audio_stream.sample_rate = sample_rate - new_audio_stream.num_channels = num_channels - resp = FfiClient.instance.request(req) - - stream_info = resp.new_audio_stream.stream - self._ffi_handle = FfiHandle(stream_info.handle.id) - self._info = stream_info + self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity) self._task = self._loop.create_task(self._run()) self._task.add_done_callback(task_done_logger) + stream: Any = None + if "participant" in kwargs: + stream = self._create_owned_stream_from_participant( + participant=kwargs["participant"], track_source=kwargs["track_source"] + ) + else: + stream = self._create_owned_stream() + self._ffi_handle = FfiHandle(stream.handle.id) + self._info = stream.info + + @classmethod + def from_participant( + cls, + *, + participant: Participant, + track_source: TrackSource.ValueType, + loop: Optional[asyncio.AbstractEventLoop] = None, + capacity: int = 0, + sample_rate: int = 48000, + num_channels: int = 1, + ) -> AudioStream: + return AudioStream( + participant=participant, + track_source=track_source, + loop=loop, + capacity=capacity, + track=None, # type: ignore + sample_rate=sample_rate, + num_channels=num_channels, + ) + + @classmethod + def from_track( + cls, + *, + track: Track, + loop: Optional[asyncio.AbstractEventLoop] = None, + capacity: int = 0, + sample_rate: int = 48000, + num_channels: int = 1, + ) -> AudioStream: + return AudioStream( + track=track, + loop=loop, + capacity=capacity, + sample_rate=sample_rate, + num_channels=num_channels, + ) + def __del__(self) -> None: FfiClient.instance.queue.unsubscribe(self._ffi_queue) + def _create_owned_stream(self) -> Any: + req = proto_ffi.FfiRequest() + new_audio_stream = req.new_audio_stream + new_audio_stream.track_handle = self._track._ffi_handle.handle + new_audio_stream.sample_rate = self._sample_rate + new_audio_stream.num_channels = self._num_channels + new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE + resp = FfiClient.instance.request(req) + return resp.new_audio_stream.stream + + def _create_owned_stream_from_participant( + self, participant: Participant, track_source: TrackSource.ValueType + ) -> Any: + req = proto_ffi.FfiRequest() + audio_stream_from_participant = req.audio_stream_from_participant + audio_stream_from_participant.participant_handle = ( + participant._ffi_handle.handle + ) + audio_stream_from_participant.type = ( + proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE + ) + audio_stream_from_participant.track_source = track_source + resp = FfiClient.instance.request(req) + return resp.audio_stream_from_participant.stream + async def _run(self): while True: event = await self._ffi_queue.wait_for(self._is_event) @@ -74,6 +143,7 @@ async def _run(self): event = AudioFrameEvent(frame) self._queue.put(event) elif audio_event.HasField("eos"): + self._queue.put(None) break FfiClient.instance.queue.unsubscribe(self._ffi_queue) @@ -91,4 +161,9 @@ def __aiter__(self) -> AsyncIterator[AudioFrameEvent]: async def __anext__(self) -> AudioFrameEvent: if self._task.done(): raise StopAsyncIteration - return await self._queue.get() + + item = await self._queue.get() + if item is None: + raise StopAsyncIteration + + return item diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index df5693ee..d774cb17 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -30,7 +30,7 @@ from .e2ee import E2EEManager, E2EEOptions from .participant import LocalParticipant, Participant, RemoteParticipant from .track import RemoteAudioTrack, RemoteVideoTrack -from .track_publication import TrackPublication, RemoteTrackPublication +from .track_publication import RemoteTrackPublication, TrackPublication from .transcription import TranscriptionSegment EventTypes = Literal[ diff --git a/livekit-rtc/livekit/rtc/video_stream.py b/livekit-rtc/livekit/rtc/video_stream.py index adc2368e..e9f28735 100644 --- a/livekit-rtc/livekit/rtc/video_stream.py +++ b/livekit-rtc/livekit/rtc/video_stream.py @@ -12,14 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import asyncio from dataclasses import dataclass -from typing import Optional, AsyncIterator +from typing import Any, AsyncIterator, Optional -from ._ffi_client import FfiHandle, FfiClient +from ._ffi_client import FfiClient, FfiHandle from ._proto import ffi_pb2 as proto_ffi from ._proto import video_frame_pb2 as proto_video_frame +from ._proto.track_pb2 import TrackSource from ._utils import RingQueue, task_done_logger +from .participant import Participant from .track import Track from .video_frame import VideoFrame @@ -37,34 +41,98 @@ class VideoStream: def __init__( self, track: Track, - *, loop: Optional[asyncio.AbstractEventLoop] = None, capacity: int = 0, format: Optional[proto_video_frame.VideoBufferType.ValueType] = None, + **kwargs, ) -> None: - self._track = track self._loop = loop or asyncio.get_event_loop() self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop) - self._queue: RingQueue[VideoFrameEvent] = RingQueue(capacity) + self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity) + self._track: Track | None = track + self._format = format + self._capacity = capacity + self._format = format + stream: Any = None + if "participant" in kwargs: + stream = self._create_owned_stream_from_participant( + participant=kwargs["participant"], track_source=kwargs["track_source"] + ) + else: + stream = self._create_owned_stream() + + self._ffi_handle = FfiHandle(stream.handle.id) + self._info = stream.info + + self._task = self._loop.create_task(self._run()) + self._task.add_done_callback(task_done_logger) + + @classmethod + def from_participant( + cls, + *, + participant: Participant, + track_source: TrackSource.ValueType, + loop: Optional[asyncio.AbstractEventLoop] = None, + format: Optional[proto_video_frame.VideoBufferType.ValueType] = None, + capacity: int = 0, + ) -> VideoStream: + return VideoStream( + participant=participant, + track_source=track_source, + loop=loop, + capacity=capacity, + format=format, + track=None, # type: ignore + ) + + @classmethod + def from_track( + cls, + *, + track: Track, + loop: Optional[asyncio.AbstractEventLoop] = None, + format: Optional[proto_video_frame.VideoBufferType.ValueType] = None, + capacity: int = 0, + ) -> VideoStream: + return VideoStream( + track=track, + loop=loop, + capacity=capacity, + format=format, + ) + def __del__(self) -> None: + FfiClient.instance.queue.unsubscribe(self._ffi_queue) + + def _create_owned_stream(self) -> Any: req = proto_ffi.FfiRequest() new_video_stream = req.new_video_stream - new_video_stream.track_handle = track._ffi_handle.handle + new_video_stream.track_handle = self._track._ffi_handle.handle new_video_stream.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE - if format is not None: - new_video_stream.format = format + if self._format is not None: + new_video_stream.format = self._format new_video_stream.normalize_stride = True - resp = FfiClient.instance.request(req) + return resp.new_video_stream.stream - stream_info = resp.new_video_stream.stream - self._ffi_handle = FfiHandle(stream_info.handle.id) - self._info = stream_info.info - self._task = self._loop.create_task(self._run()) - self._task.add_done_callback(task_done_logger) - - def __del__(self) -> None: - FfiClient.instance.queue.unsubscribe(self._ffi_queue) + def _create_owned_stream_from_participant( + self, participant: Participant, track_source: TrackSource.ValueType + ) -> Any: + req = proto_ffi.FfiRequest() + video_stream_from_participant = req.video_stream_from_participant + video_stream_from_participant.participant_handle = ( + participant._ffi_handle.handle + ) + video_stream_from_participant.type = ( + proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE + ) + video_stream_from_participant.track_source = track_source + video_stream_from_participant.normalize_stride = True + if self._format is not None: + video_stream_from_participant.format = self._format + resp = FfiClient.instance.request(req) + return resp.video_stream_from_participant.stream async def _run(self) -> None: while True: @@ -100,4 +168,9 @@ def __aiter__(self) -> AsyncIterator[VideoFrameEvent]: async def __anext__(self) -> VideoFrameEvent: if self._task.done(): raise StopAsyncIteration - return await self._queue.get() + + item = await self._queue.get() + if item is None: + raise StopAsyncIteration + + return item