diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 875b8db2..d82c2792 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -65,6 +65,7 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + noise_cancellation_leave_open: bool = False, **kwargs: Any, ) -> None: """Initialize an `AudioStream` instance. @@ -81,6 +82,9 @@ def __init__( noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. + noise_cancellation_leave_open (bool): + When the audio stream closes, leaves the FrameProcessor in an unclosed state so it + can be used with another AudioStream. Example: ```python @@ -113,11 +117,13 @@ def __init__( self._audio_filter_module: str | None = None self._audio_filter_options: dict[str, Any] | None = None self._processor: FrameProcessor[AudioFrame] | None = None + self._processor_leave_open = False if isinstance(noise_cancellation, NoiseCancellationOptions): self._audio_filter_module = noise_cancellation.module_id self._audio_filter_options = noise_cancellation.options elif isinstance(noise_cancellation, FrameProcessor): self._processor = noise_cancellation + self._processor_leave_open = noise_cancellation_leave_open self._task = self._loop.create_task(self._run()) self._task.add_done_callback(task_done_logger) @@ -132,6 +138,9 @@ def __init__( self._ffi_handle = FfiHandle(stream.handle.id) self._info = stream.info + if self._track is not None: + self._track._register_audio_stream(self) + @classmethod def from_participant( cls, @@ -144,6 +153,7 @@ def from_participant( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + noise_cancellation_leave_open: bool = False, ) -> AudioStream: """Create an `AudioStream` from a participant's audio track. @@ -179,8 +189,9 @@ def from_participant( track=None, # type: ignore sample_rate=sample_rate, num_channels=num_channels, - noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + noise_cancellation=noise_cancellation, + noise_cancellation_leave_open=noise_cancellation_leave_open, ) @classmethod @@ -194,6 +205,7 @@ def from_track( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + noise_cancellation_leave_open: bool = False, ) -> AudioStream: """Create an `AudioStream` from an existing audio track. @@ -203,9 +215,12 @@ def from_track( capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded). sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000. num_channels (int, optional): The number of audio channels. Defaults to 1. - noise_cancellation (Optional[NoiseCancellationOptions], optional): - If noise cancellation is used, pass a `NoiseCancellationOptions` instance + noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): + If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. + noise_cancellation_leave_open (bool): + When the audio stream closes, leaves the FrameProcessor in an unclosed state so it + can be used with another AudioStream. Returns: AudioStream: An instance of `AudioStream` that can be used to receive audio frames. @@ -225,10 +240,31 @@ def from_track( capacity=capacity, sample_rate=sample_rate, num_channels=num_channels, - noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + noise_cancellation=noise_cancellation, + noise_cancellation_leave_open=noise_cancellation_leave_open, ) + def _on_processor_stream_info_updated( + self, + *, + room_name: str, + participant_identity: str, + publication_sid: str, + ) -> None: + if self._processor is None: + return + self._processor._on_stream_info_updated( + room_name=room_name, + participant_identity=participant_identity, + publication_sid=publication_sid, + ) + + def _on_processor_credentials_updated(self, *, token: str, url: str) -> None: + if self._processor is None: + return + self._processor._on_credentials_updated(token=token, url=url) + def __del__(self) -> None: FfiClient.instance.queue.unsubscribe(self._ffi_queue) @@ -303,6 +339,10 @@ async def aclose(self) -> None: This method cleans up resources associated with the audio stream and waits for any pending operations to complete. """ + if self._processor is not None and not self._processor_leave_open: + self._processor._close() + if self._track is not None: + self._track._unregister_audio_stream(self) self._ffi_handle.dispose() await self._task diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index e3619d9c..f3808b16 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -733,10 +733,14 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None: sid = event.local_track_published.track_sid lpublication = self.local_participant.track_publications[sid] ltrack = lpublication.track + if ltrack is not None: + ltrack._set_room(self) self.emit("local_track_published", lpublication, ltrack) elif which == "local_track_unpublished": sid = event.local_track_unpublished.publication_sid lpublication = self.local_participant.track_publications[sid] + if lpublication.track is not None: + lpublication.track._set_room(None) self.emit("local_track_unpublished", lpublication) elif which == "local_track_republished": # The SDK auto-republished a local track during a full @@ -777,10 +781,12 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None: rpublication._subscribed = True if track_info.kind == TrackKind.KIND_VIDEO: remote_video_track = RemoteVideoTrack(owned_track_info) + remote_video_track._set_room(self) rpublication._track = remote_video_track self.emit("track_subscribed", remote_video_track, rpublication, rparticipant) elif track_info.kind == TrackKind.KIND_AUDIO: remote_audio_track = RemoteAudioTrack(owned_track_info) + remote_audio_track._set_room(self) rpublication._track = remote_audio_track self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant) elif which == "track_unsubscribed": @@ -788,6 +794,8 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None: rparticipant = self._remote_participants[identity] rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid] rtrack = rpublication.track + if rtrack is not None: + rtrack._set_room(None) rpublication._track = None rpublication._subscribed = False self.emit("track_unsubscribed", rtrack, rpublication, rparticipant) diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 0eeb3465..3cffb702 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -12,7 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, List, Union +from __future__ import annotations + +import weakref +from typing import TYPE_CHECKING, List, Optional, Union from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import track_pb2 as proto_track @@ -20,6 +23,8 @@ if TYPE_CHECKING: from .audio_source import AudioSource + from .audio_stream import AudioStream + from .room import Room from .video_source import VideoSource @@ -27,6 +32,78 @@ class Track: def __init__(self, owned_info: proto_track.OwnedTrack): self._info = owned_info.info self._ffi_handle = FfiHandle(owned_info.handle.id) + self._room_ref: Optional[weakref.ref[Room]] = None + self._audio_streams: weakref.WeakSet[AudioStream] = weakref.WeakSet() + + def _resolve_room(self) -> Optional[Room]: + return self._room_ref() if self._room_ref is not None else None + + def _set_room(self, room: Optional[Room]) -> None: + old_room = self._resolve_room() + if old_room is not room: + if old_room is not None: + old_room.off("token_refreshed", self._on_room_token_refreshed) + if room is not None: + room.on("token_refreshed", self._on_room_token_refreshed) + + self._room_ref = weakref.ref(room) if room is not None else None + + for stream in self._audio_streams: + self._push_processor_metadata_to_stream(stream, room) + + def _on_room_token_refreshed(self) -> None: + room = self._resolve_room() + if room is None or room._token is None or room._server_url is None: + return + for stream in self._audio_streams: + stream._on_processor_credentials_updated(token=room._token, url=room._server_url) + + def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional[Room]) -> None: + if room is None: + # track left a room — clear processor's room context + # FIXME: This isn't really good, and I can't figure out what should happen here + # Closing the processor doesn't work (the track could get added to another room later) + # Empty values like this don't work, because it causes a drm::Error in the plugin + # Talk to lukas about this in a 1:1 and see if he can think of anything better + stream._on_processor_stream_info_updated( + room_name="", participant_identity="", publication_sid="" + ) + # stream._on_processor_credentials_updated(token="", url="") + return + + identity = "" + pub_sid = "" + track_sid = self.sid + if track_sid: + for participant in room.remote_participants.values(): + publication = participant.track_publications.get(track_sid) + if publication is not None: + identity, pub_sid = participant.identity, publication.sid + break + else: + local = room._local_participant + if local is not None: + for local_publication in local.track_publications.values(): + if local_publication.sid == track_sid: + identity, pub_sid = local.identity, local_publication.sid + break + + stream._on_processor_stream_info_updated( + room_name=room.name, + participant_identity=identity, + publication_sid=pub_sid, + ) + if room._token is not None and room._server_url is not None: + stream._on_processor_credentials_updated(token=room._token, url=room._server_url) + + def _register_audio_stream(self, stream: AudioStream) -> None: + self._audio_streams.add(stream) + room = self._resolve_room() + if room is not None: + self._push_processor_metadata_to_stream(stream, room) + + def _unregister_audio_stream(self, stream: AudioStream) -> None: + self._audio_streams.discard(stream) @property def sid(self) -> str: