Fully configure frame processors when they are used directly on an audio stream#679
Fully configure frame processors when they are used directly on an audio stream#6791egoman wants to merge 16 commits into
Conversation
…io stream And extracting metadata from that room that can be fed into the frame processor.
3e5a9ab to
f62c247
Compare
…oStream This makes it less complex.
The agents sdk can pass this opt-out flag so that it can reuse the frame processor across many audio tracks
Need to think about this a bit more, this pattern as written won't work, since the FrameProcessor today can't have a set of no-op credentials pushed.
| # track left a room — clear processor's room context | ||
| # FIXME: This isn't really good, and I can't figure out what should happen here | ||
| # Closing the processor doesn't work (the track could get added to another room later) | ||
| # Empty values like this don't work, because it causes a drm::Error in the plugin | ||
| # Talk to lukas about this in a 1:1 and see if he can think of anything better | ||
| stream._on_processor_stream_info_updated( | ||
| room_name="", participant_identity="", publication_sid="" | ||
| ) | ||
| # stream._on_processor_credentials_updated(token="", url="") | ||
| return |
There was a problem hiding this comment.
This is the biggest thing still to be determined: I'm not exactly sure what to do when a track is removed from a room while an AudioStream that is within that track has a FrameProcessor registered.
- Initial thought: set empty metadata with something like
stream._on_processor_credentials_updated(token="", url=""). However, this causes the ai-coustics plugin to throw an error because the""string cannot be parsed as a URL. - Next thought: disable the FrameProcessor by setting
enabledtoFalsein this situation (probably also log a warning too that this is being done?). The big problem with this is it would overzelously disableFrameProcessors which don't need credentials to work (like the already-existing Krisp VIVAFrameProcessor). - Other idea: Maybe modify the
FrameProcessorinterface to maketoken/urlOptional[str]? But that would be a breaking api change, so that's probably out...
Curious what others think here and if there's an approach which I have missed.
| if self._processor is not None and not self._processor_leave_open: | ||
| self._processor._close() | ||
| if self._track is not None: | ||
| self._track._unregister_audio_stream(self) | ||
| self._ffi_handle.dispose() | ||
| await self._task |
There was a problem hiding this comment.
🔴 Processor closed before _run task finishes, allowing _process() calls on closed processor
In aclose(), self._processor._close() is called at line 343 before await self._task at line 347. Although these are synchronous calls that happen atomically before the event loop yields, once await self._task yields control, the _run loop (audio_stream.py:312-334) resumes and may process audio frames that were already buffered in _ffi_queue before the EOS event arrives. Those frames reach self._processor._process(frame) at line 322, which is now called on a closed processor. While the try/except at line 321-327 catches Python exceptions, if the processor's _close() releases native resources that _process() depends on, this could cause undefined behavior or a crash that isn't caught by the exception handler. The fix is to move the processor close to after await self._task so the _run loop has fully terminated before the processor is torn down.
| if self._processor is not None and not self._processor_leave_open: | |
| self._processor._close() | |
| if self._track is not None: | |
| self._track._unregister_audio_stream(self) | |
| self._ffi_handle.dispose() | |
| await self._task | |
| if self._track is not None: | |
| self._track._unregister_audio_stream(self) | |
| self._ffi_handle.dispose() | |
| await self._task | |
| if self._processor is not None and not self._processor_leave_open: | |
| self._processor._close() |
Was this helpful? React with 👍 or 👎 to provide feedback.
Updates the python sdk so that
FrameProcessor-based noise cancellation providers can be used directly onAudioStream, without having to go through the agent's RoomIO to be able to initialize itself with credentials.For example, with this change, something like the below becomes possible:
The way this works -
Tracks now keep track of which room they are part of (holding aweakrefvalue). When the room a track is in changes, it computes new frame processor options and sends these to anyAudioStreams which are associated with the track.The
noise_cancellation_leave_openparameter allows the agents sdk to call thisfrom_trackmethod with a frame processor which remains open across the whole session, and won't be auto-closed when the track is closed.This goes along with https://github.com/livekit/agents/pull/new/move-frame-processor-metadata-to-client-sdk, which removes the relevant event handling logic in the agents sdk. I will make this as a follow up once I can determine this pull request is generally moving in a good direction. Also I will follow up with a node version of this once the python one is in a good state.
Todo