Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
noise_cancellation_leave_open: bool = False,
**kwargs: Any,
) -> None:
"""Initialize an `AudioStream` instance.
Expand All @@ -81,6 +82,9 @@ def __init__(
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
created by the noise cancellation module.
noise_cancellation_leave_open (bool):
When the audio stream closes, leaves the FrameProcessor in an unclosed state so it
can be used with another AudioStream.

Example:
```python
Expand Down Expand Up @@ -113,11 +117,13 @@ def __init__(
self._audio_filter_module: str | None = None
self._audio_filter_options: dict[str, Any] | None = None
self._processor: FrameProcessor[AudioFrame] | None = None
self._processor_leave_open = False
if isinstance(noise_cancellation, NoiseCancellationOptions):
self._audio_filter_module = noise_cancellation.module_id
self._audio_filter_options = noise_cancellation.options
elif isinstance(noise_cancellation, FrameProcessor):
self._processor = noise_cancellation
self._processor_leave_open = noise_cancellation_leave_open

self._task = self._loop.create_task(self._run())
self._task.add_done_callback(task_done_logger)
Expand All @@ -132,6 +138,9 @@ def __init__(
self._ffi_handle = FfiHandle(stream.handle.id)
self._info = stream.info

if self._track is not None:
self._track._register_audio_stream(self)

@classmethod
def from_participant(
cls,
Expand All @@ -144,6 +153,7 @@ def from_participant(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
noise_cancellation_leave_open: bool = False,
) -> AudioStream:
"""Create an `AudioStream` from a participant's audio track.

Expand Down Expand Up @@ -179,8 +189,9 @@ def from_participant(
track=None, # type: ignore
sample_rate=sample_rate,
num_channels=num_channels,
noise_cancellation=noise_cancellation,
frame_size_ms=frame_size_ms,
noise_cancellation=noise_cancellation,
noise_cancellation_leave_open=noise_cancellation_leave_open,
)

@classmethod
Expand All @@ -194,6 +205,7 @@ def from_track(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
noise_cancellation_leave_open: bool = False,
) -> AudioStream:
"""Create an `AudioStream` from an existing audio track.

Expand All @@ -203,9 +215,12 @@ def from_track(
capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels (int, optional): The number of audio channels. Defaults to 1.
noise_cancellation (Optional[NoiseCancellationOptions], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` instance
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
created by the noise cancellation module.
noise_cancellation_leave_open (bool):
When the audio stream closes, leaves the FrameProcessor in an unclosed state so it
can be used with another AudioStream.

Returns:
AudioStream: An instance of `AudioStream` that can be used to receive audio frames.
Expand All @@ -225,10 +240,31 @@ def from_track(
capacity=capacity,
sample_rate=sample_rate,
num_channels=num_channels,
noise_cancellation=noise_cancellation,
frame_size_ms=frame_size_ms,
noise_cancellation=noise_cancellation,
noise_cancellation_leave_open=noise_cancellation_leave_open,
)

def _on_processor_stream_info_updated(
    self,
    *,
    room_name: str,
    participant_identity: str,
    publication_sid: str,
) -> None:
    """Forward updated stream identity to the attached frame processor.

    Does nothing when this stream has no frame processor.

    Args:
        room_name: Name of the room passed on to the processor.
        participant_identity: Participant identity passed on to the processor.
        publication_sid: Publication SID passed on to the processor.
    """
    processor = self._processor
    if processor is not None:
        processor._on_stream_info_updated(
            room_name=room_name,
            participant_identity=participant_identity,
            publication_sid=publication_sid,
        )

def _on_processor_credentials_updated(self, *, token: str, url: str) -> None:
    """Relay connection credentials to the attached frame processor.

    Does nothing when this stream has no frame processor.

    Args:
        token: Token passed on to the processor.
        url: URL passed on to the processor.
    """
    processor = self._processor
    if processor is not None:
        processor._on_credentials_updated(token=token, url=url)

def __del__(self) -> None:
FfiClient.instance.queue.unsubscribe(self._ffi_queue)

Expand Down Expand Up @@ -303,6 +339,10 @@ async def aclose(self) -> None:
This method cleans up resources associated with the audio stream and waits for
any pending operations to complete.
"""
if self._processor is not None and not self._processor_leave_open:
self._processor._close()
if self._track is not None:
self._track._unregister_audio_stream(self)
self._ffi_handle.dispose()
await self._task
Comment on lines +342 to 347
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Processor closed before _run task finishes, allowing _process() calls on closed processor

In aclose(), self._processor._close() is called at line 343 before await self._task at line 347. Although these are synchronous calls that happen atomically before the event loop yields, once await self._task yields control, the _run loop (audio_stream.py:312-334) resumes and may process audio frames that were already buffered in _ffi_queue before the EOS event arrives. Those frames reach self._processor._process(frame) at line 322, which is now called on a closed processor. While the try/except at line 321-327 catches Python exceptions, if the processor's _close() releases native resources that _process() depends on, this could cause undefined behavior or a crash that isn't caught by the exception handler. The fix is to move the processor close to after await self._task so the _run loop has fully terminated before the processor is torn down.

Suggested change
if self._processor is not None and not self._processor_leave_open:
self._processor._close()
if self._track is not None:
self._track._unregister_audio_stream(self)
self._ffi_handle.dispose()
await self._task
if self._track is not None:
self._track._unregister_audio_stream(self)
self._ffi_handle.dispose()
await self._task
if self._processor is not None and not self._processor_leave_open:
self._processor._close()
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


Expand Down
8 changes: 8 additions & 0 deletions livekit-rtc/livekit/rtc/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,10 +733,14 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None:
sid = event.local_track_published.track_sid
lpublication = self.local_participant.track_publications[sid]
ltrack = lpublication.track
if ltrack is not None:
ltrack._set_room(self)
self.emit("local_track_published", lpublication, ltrack)
elif which == "local_track_unpublished":
sid = event.local_track_unpublished.publication_sid
lpublication = self.local_participant.track_publications[sid]
if lpublication.track is not None:
lpublication.track._set_room(None)
self.emit("local_track_unpublished", lpublication)
elif which == "local_track_republished":
# The SDK auto-republished a local track during a full
Expand Down Expand Up @@ -777,17 +781,21 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None:
rpublication._subscribed = True
if track_info.kind == TrackKind.KIND_VIDEO:
remote_video_track = RemoteVideoTrack(owned_track_info)
remote_video_track._set_room(self)
rpublication._track = remote_video_track
self.emit("track_subscribed", remote_video_track, rpublication, rparticipant)
elif track_info.kind == TrackKind.KIND_AUDIO:
remote_audio_track = RemoteAudioTrack(owned_track_info)
remote_audio_track._set_room(self)
rpublication._track = remote_audio_track
self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant)
elif which == "track_unsubscribed":
identity = event.track_unsubscribed.participant_identity
rparticipant = self._remote_participants[identity]
rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid]
rtrack = rpublication.track
if rtrack is not None:
rtrack._set_room(None)
rpublication._track = None
rpublication._subscribed = False
self.emit("track_unsubscribed", rtrack, rpublication, rparticipant)
Expand Down
79 changes: 78 additions & 1 deletion livekit-rtc/livekit/rtc/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,98 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, List, Union
from __future__ import annotations

import weakref
from typing import TYPE_CHECKING, List, Optional, Union
from ._ffi_client import FfiHandle, FfiClient
from ._proto import ffi_pb2 as proto_ffi
from ._proto import track_pb2 as proto_track
from ._proto import stats_pb2 as proto_stats

if TYPE_CHECKING:
from .audio_source import AudioSource
from .audio_stream import AudioStream
from .room import Room
from .video_source import VideoSource


class Track:
def __init__(self, owned_info: proto_track.OwnedTrack):
    """Build a track from the owned-track message returned over FFI.

    Args:
        owned_info: Message carrying the track's `info` and the `handle`
            whose id is wrapped in an `FfiHandle`.
    """
    self._info = owned_info.info
    handle_id = owned_info.handle.id
    self._ffi_handle = FfiHandle(handle_id)
    # The room is only weakly referenced; None until `_set_room` is called.
    self._room_ref: Optional[weakref.ref[Room]] = None
    # Audio streams registered on this track, held weakly.
    self._audio_streams: weakref.WeakSet[AudioStream] = weakref.WeakSet()

def _resolve_room(self) -> Optional[Room]:
    """Return the room this track currently points at, or None.

    The room is stored as a weak reference, so None is returned both when no
    room has been set and when the referenced room no longer exists.
    """
    ref = self._room_ref
    if ref is None:
        return None
    return ref()

def _set_room(self, room: Optional[Room]) -> None:
    """Attach this track to `room`, or detach it when `room` is None.

    When the room actually changes, the "token_refreshed" listener is moved
    from the previous room to the new one. The weak room reference is then
    replaced and every registered audio stream is handed the new room so its
    processor metadata can be refreshed.

    Args:
        room: The room the track now belongs to, or None if it left its room.
    """
    previous = self._resolve_room()
    if previous is not room:
        # Move the token-refresh subscription only on a real room change.
        if previous is not None:
            previous.off("token_refreshed", self._on_room_token_refreshed)
        if room is not None:
            room.on("token_refreshed", self._on_room_token_refreshed)

    if room is None:
        self._room_ref = None
    else:
        self._room_ref = weakref.ref(room)

    # Streams are notified even when the room is unchanged.
    for audio_stream in self._audio_streams:
        self._push_processor_metadata_to_stream(audio_stream, room)

def _on_room_token_refreshed(self) -> None:
    """Push the room's current token and server URL to every registered stream.

    Used as the "token_refreshed" listener on the track's room. Nothing is
    sent when the room is gone or when it has no token or server URL.
    """
    room = self._resolve_room()
    if room is None:
        return
    if room._token is None or room._server_url is None:
        return
    for audio_stream in self._audio_streams:
        audio_stream._on_processor_credentials_updated(
            token=room._token,
            url=room._server_url,
        )

def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional[Room]) -> None:
if room is None:
# track left a room — clear processor's room context
# FIXME: This isn't really good, and I can't figure out what should happen here
# Closing the processor doesn't work (the track could get added to another room later)
# Empty values like this don't work, because it causes a drm::Error in the plugin
# Talk to lukas about this in a 1:1 and see if he can think of anything better
stream._on_processor_stream_info_updated(
room_name="", participant_identity="", publication_sid=""
)
# stream._on_processor_credentials_updated(token="", url="")
return
Comment on lines +63 to +72
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the biggest thing still to be determined: I'm not exactly sure what to do when a track is removed from a room while an AudioStream that is within that track has a FrameProcessor registered.

  • Initial thought: set empty metadata with something like stream._on_processor_credentials_updated(token="", url=""). However, this causes the ai-coustics plugin to throw an error because the "" string cannot be parsed as a URL.
  • Next thought: disable the FrameProcessor by setting enabled to False in this situation (and probably log a warning that this is being done?). The big problem with this is that it would overzealously disable FrameProcessors which don't need credentials to work (like the already-existing Krisp VIVA FrameProcessor).
  • Other idea: Maybe modify the FrameProcessor interface to make token / url Optional[str]? But that would be a breaking API change, so that's probably out...

Curious what others think here and if there's an approach which I have missed.


identity = ""
pub_sid = ""
track_sid = self.sid
if track_sid:
for participant in room.remote_participants.values():
publication = participant.track_publications.get(track_sid)
if publication is not None:
identity, pub_sid = participant.identity, publication.sid
break
else:
local = room._local_participant
if local is not None:
for local_publication in local.track_publications.values():
if local_publication.sid == track_sid:
identity, pub_sid = local.identity, local_publication.sid
break

stream._on_processor_stream_info_updated(
room_name=room.name,
participant_identity=identity,
publication_sid=pub_sid,
)
if room._token is not None and room._server_url is not None:
stream._on_processor_credentials_updated(token=room._token, url=room._server_url)

def _register_audio_stream(self, stream: AudioStream) -> None:
    """Start tracking `stream` and give it the current room's metadata.

    The stream is always added to the track's stream set. Processor metadata
    is pushed only when the track currently resolves to a room.

    Args:
        stream: The audio stream that was created on this track.
    """
    self._audio_streams.add(stream)
    current_room = self._resolve_room()
    if current_room is None:
        return
    self._push_processor_metadata_to_stream(stream, current_room)

def _unregister_audio_stream(self, stream: AudioStream) -> None:
    """Stop tracking `stream`; a stream that was never registered is ignored.

    Args:
        stream: The audio stream to drop from this track's stream set.
    """
    registered = self._audio_streams
    registered.discard(stream)

@property
def sid(self) -> str:
Expand Down
Loading