-
Notifications
You must be signed in to change notification settings - Fork 117
Fully configure frame processors when they are used directly on an audio stream #679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
1egoman
wants to merge
16
commits into
main
Choose a base branch
from
frame-processor-on-audio-stream
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+130
−5
Open
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
538fc13
feat: add MVP of propagating room downwards from room -> track -> aud…
1egoman 7c7eaa4
feat: call _on_stream_info_updated with parent room reference on audi…
1egoman 12718d1
feat: call _on_credentials_updated with token / server url extracted …
1egoman af26b3d
fix: remove debugging logs
1egoman 5ecca5d
fix: address lint errors
1egoman af56d61
feat: only call frame processor handlers if room is set
1egoman f62c247
fix: properly intercept room refresh token events
1egoman e7ab10e
feat: add from __future__ import annotations to remove string types
1egoman f7f422d
fix: address incorrect docs
1egoman 24f2b6e
refactor: centralize frame processor state logic into Track, not Audi…
1egoman ad32574
feat: add auto cleanup of FrameProcessor as opt-out
1egoman ce5e793
fix: disable no-op credentials push
1egoman b9f34d0
fix: move processor close from __del__ to aclose
1egoman 4c73cc8
fix: proxy throgh noise_cancellation_leave_open into AudioStream.from…
1egoman 22f4896
fix: include missed noise_cancellation_leave_open in from_track
1egoman 2dbe350
fix: address type checker warning
1egoman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,21 +12,98 @@ | |
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from typing import TYPE_CHECKING, List, Union | ||
| from __future__ import annotations | ||
|
|
||
| import weakref | ||
| from typing import TYPE_CHECKING, List, Optional, Union | ||
| from ._ffi_client import FfiHandle, FfiClient | ||
| from ._proto import ffi_pb2 as proto_ffi | ||
| from ._proto import track_pb2 as proto_track | ||
| from ._proto import stats_pb2 as proto_stats | ||
|
|
||
| if TYPE_CHECKING: | ||
| from .audio_source import AudioSource | ||
| from .audio_stream import AudioStream | ||
| from .room import Room | ||
| from .video_source import VideoSource | ||
|
|
||
|
|
||
| class Track: | ||
| def __init__(self, owned_info: proto_track.OwnedTrack): | ||
| self._info = owned_info.info | ||
| self._ffi_handle = FfiHandle(owned_info.handle.id) | ||
| self._room_ref: Optional[weakref.ref[Room]] = None | ||
| self._audio_streams: weakref.WeakSet[AudioStream] = weakref.WeakSet() | ||
|
|
||
| def _resolve_room(self) -> Optional[Room]: | ||
| return self._room_ref() if self._room_ref is not None else None | ||
|
|
||
| def _set_room(self, room: Optional[Room]) -> None: | ||
| old_room = self._resolve_room() | ||
| if old_room is not room: | ||
| if old_room is not None: | ||
| old_room.off("token_refreshed", self._on_room_token_refreshed) | ||
| if room is not None: | ||
| room.on("token_refreshed", self._on_room_token_refreshed) | ||
|
|
||
| self._room_ref = weakref.ref(room) if room is not None else None | ||
|
|
||
| for stream in self._audio_streams: | ||
| self._push_processor_metadata_to_stream(stream, room) | ||
|
|
||
| def _on_room_token_refreshed(self) -> None: | ||
| room = self._resolve_room() | ||
| if room is None or room._token is None or room._server_url is None: | ||
| return | ||
| for stream in self._audio_streams: | ||
| stream._on_processor_credentials_updated(token=room._token, url=room._server_url) | ||
|
|
||
| def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional[Room]) -> None: | ||
| if room is None: | ||
| # track left a room — clear processor's room context | ||
| # FIXME: This isn't really good, and I can't figure out what should happen here | ||
| # Closing the processor doesn't work (the track could get added to another room later) | ||
| # Empty values like this don't work, because it causes a drm::Error in the plugin | ||
| # Talk to lukas about this in a 1:1 and see if he can think of anything better | ||
| stream._on_processor_stream_info_updated( | ||
| room_name="", participant_identity="", publication_sid="" | ||
| ) | ||
| # stream._on_processor_credentials_updated(token="", url="") | ||
| return | ||
|
Comment on lines
+63
to
+72
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the biggest thing still to be determined: I'm not exactly sure what to do when a track is removed from a room while an
Curious what others think here and if there's an approach which I have missed. |
||
|
|
||
| identity = "" | ||
| pub_sid = "" | ||
| track_sid = self.sid | ||
| if track_sid: | ||
| for participant in room.remote_participants.values(): | ||
| publication = participant.track_publications.get(track_sid) | ||
| if publication is not None: | ||
| identity, pub_sid = participant.identity, publication.sid | ||
| break | ||
| else: | ||
| local = room._local_participant | ||
| if local is not None: | ||
| for local_publication in local.track_publications.values(): | ||
| if local_publication.sid == track_sid: | ||
| identity, pub_sid = local.identity, local_publication.sid | ||
| break | ||
|
|
||
| stream._on_processor_stream_info_updated( | ||
| room_name=room.name, | ||
| participant_identity=identity, | ||
| publication_sid=pub_sid, | ||
| ) | ||
| if room._token is not None and room._server_url is not None: | ||
| stream._on_processor_credentials_updated(token=room._token, url=room._server_url) | ||
|
|
||
| def _register_audio_stream(self, stream: AudioStream) -> None: | ||
| self._audio_streams.add(stream) | ||
| room = self._resolve_room() | ||
| if room is not None: | ||
| self._push_processor_metadata_to_stream(stream, room) | ||
|
|
||
| def _unregister_audio_stream(self, stream: AudioStream) -> None: | ||
| self._audio_streams.discard(stream) | ||
|
|
||
| @property | ||
| def sid(self) -> str: | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 Processor closed before _run task finishes, allowing _process() calls on closed processor
In
aclose(),self._processor._close()is called at line 343 beforeawait self._taskat line 347. Although these are synchronous calls that happen atomically before the event loop yields, onceawait self._taskyields control, the_runloop (audio_stream.py:312-334) resumes and may process audio frames that were already buffered in_ffi_queuebefore the EOS event arrives. Those frames reachself._processor._process(frame)at line 322, which is now called on a closed processor. While thetry/exceptat line 321-327 catches Python exceptions, if the processor's_close()releases native resources that_process()depends on, this could cause undefined behavior or a crash that isn't caught by the exception handler. The fix is to move the processor close to afterawait self._taskso the_runloop has fully terminated before the processor is torn down.Was this helpful? React with 👍 or 👎 to provide feedback.