Mercurial > libervia-backend
diff libervia/frontends/tools/webrtc.py @ 4233:d01b8d002619
cli (call, file), frontends: implement WebRTC data channel transfer:
- file send/receive commands now support WebRTC transfer. In the `send` command, the
`--webrtc` flag is currently used to activate it.
- WebRTC-related code has been factored out and moved to `libervia.frontends.tools.webrtc*`
modules.
rel 442
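
For illustration, sending a file over the new data channel transfer might look like this from the command line (a hedged sketch: the `li` entry point and the argument order follow the usual Libervia CLI conventions and are assumed here; only the `--webrtc` flag itself is introduced by this changeset):

    li file send --webrtc some_file.txt louise@example.org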
author   | Goffi <goffi@goffi.org>
date     | Sat, 06 Apr 2024 13:43:09 +0200
parents  | fe29fbdabce6
children | 79c8a70e1813
--- a/libervia/frontends/tools/webrtc.py	Sat Apr 06 12:59:50 2024 +0200
+++ b/libervia/frontends/tools/webrtc.py	Sat Apr 06 13:43:09 2024 +0200
@@ -16,11 +16,10 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+from collections.abc import Awaitable
 import gi
-gi.require_versions({
-    "Gst": "1.0",
-    "GstWebRTC": "1.0"
-})
+
+gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
 
 from gi.repository import Gst, GstWebRTC, GstSdp
 from libervia.backend.core import exceptions
@@ -33,22 +32,23 @@
         "your system (e.g., `python3-gst-1.0` on Debian and derivatives)."
     )
 
 import asyncio
-from dataclasses import dataclass
 from datetime import datetime
 import logging
-from random import randint
 import re
 from typing import Callable
 from urllib.parse import quote_plus
 
 from libervia.backend.tools.common import data_format
-from libervia.frontends.tools import aio, display_servers
+from libervia.frontends.tools import aio, display_servers, jid
+from .webrtc_models import AppSinkData, CallData
+from .webrtc_screenshare import DesktopPortal
 
 current_server = display_servers.detect()
 if current_server == display_servers.X11:
     # GSTreamer's ximagesrc documentation asks to run this function
     import ctypes
-    ctypes.CDLL('libX11.so.6').XInitThreads()
+
+    ctypes.CDLL("libX11.so.6").XInitThreads()
 
 log = logging.getLogger(__name__)
 
@@ -57,194 +57,18 @@
 
 SOURCES_AUTO = "auto"
 SOURCES_TEST = "test"
+SOURCES_DATACHANNEL = "datachannel"
 SINKS_APP = "app"
 SINKS_AUTO = "auto"
 SINKS_TEST = "test"
-
-
-class ScreenshareError(Exception):
-    pass
-
-
-@dataclass
-class AppSinkData:
-    local_video_cb: Callable
-    remote_video_cb: Callable|None
-
-
-class DesktopPortal:
-
-    def __init__(self, webrtc: "WebRTC"):
-        import dbus
-        from dbus.mainloop.glib import DBusGMainLoop
-        # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
-        self.dbus = dbus
-        self.webrtc = webrtc
-        self.sources_type = dbus.UInt32(7)
-        DBusGMainLoop(set_as_default=True)
-        self.session_bus = dbus.SessionBus()
-        portal_object = self.session_bus.get_object(
-            'org.freedesktop.portal.Desktop',
-            '/org/freedesktop/portal/desktop'
-        )
-        self.screencast_interface = dbus.Interface(
-            portal_object,
-            'org.freedesktop.portal.ScreenCast'
-        )
-        self.session_interface = None
-        self.session_signal = None
-        self.handle_counter = 0
-        self.session_handle = None
-        self.stream_data: dict|None = None
-
-    @property
-    def handle_token(self):
-        self.handle_counter += 1
-        return f"libervia{self.handle_counter}"
-
-    def on_session_closed(self, details: dict) -> None:
-        if self.session_interface is not None:
-            self.session_interface = None
-            self.webrtc.desktop_sharing = False
-            if self.session_signal is not None:
-                self.session_signal.remove()
-                self.session_signal = None
-
-
-    async def dbus_call(self, method_name: str, *args) -> dict:
-        """Call a screenshare portal method
-
-        This method handle the signal response.
-        @param method_name: method to call
-        @param args: extra args
-            `handle_token` will be automatically added to the last arg (option dict)
-        @return: method result
-        """
-        if self.session_handle is not None:
-            self.end_screenshare()
-        method = getattr(self.screencast_interface, method_name)
-        options = args[-1]
-        reply_fut = asyncio.Future()
-        signal_fut = asyncio.Future()
-        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
-        handle_token = self.handle_token
-        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
-        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
-        signal_match = None
-
-        def on_signal(response, results):
-            assert signal_match is not None
-            signal_match.remove()
-            if response == 0:
-                signal_fut.set_result(results)
-            elif response == 1:
-                signal_fut.set_exception(
-                    exceptions.CancelError("Cancelled by user.")
-                )
-            else:
-                signal_fut.set_exception(ScreenshareError(
-                    f"Can't get signal result"
-                ))
-
-        signal_match = self.session_bus.add_signal_receiver(
-            on_signal,
-            signal_name="Response",
-            dbus_interface="org.freedesktop.portal.Request",
-            path=path
-        )
-
-        options["handle_token"] = handle_token
-
-        method(
-            *args,
-            reply_handler=reply_fut.set_result,
-            error_handler=reply_fut.set_exception
-        )
-        try:
-            await reply_fut
-        except Exception as e:
-            raise ScreenshareError(f"Can't ask screenshare permission: {e}")
-        return await signal_fut
-
-    async def request_screenshare(self) -> dict:
-        session_data = await self.dbus_call(
-            "CreateSession",
-            {
-                "session_handle_token": str(randint(1, 2**32)),
-            }
-        )
-        try:
-            session_handle = session_data["session_handle"]
-        except KeyError:
-            raise ScreenshareError("Can't get session handle")
-        self.session_handle = session_handle
-
-
-        await self.dbus_call(
-            "SelectSources",
-            session_handle,
-            {
-                "multiple": True,
-                "types": self.sources_type,
-                "modal": True
-            }
-        )
-        screenshare_data = await self.dbus_call(
-            "Start",
-            session_handle,
-            "",
-            {}
-        )
-
-        session_object = self.session_bus.get_object(
-            'org.freedesktop.portal.Desktop',
-            session_handle
-        )
-        self.session_interface = self.dbus.Interface(
-            session_object,
-            'org.freedesktop.portal.Session'
-        )
-
-        self.session_signal = self.session_bus.add_signal_receiver(
-            self.on_session_closed,
-            signal_name="Closed",
-            dbus_interface="org.freedesktop.portal.Session",
-            path=session_handle
-        )
-
-        try:
-            node_id, stream_data = screenshare_data["streams"][0]
-            source_type = int(stream_data["source_type"])
-        except (IndexError, KeyError):
-            raise ScreenshareError("Can't parse stream data")
-        self.stream_data = stream_data = {
-            "session_handle": session_handle,
-            "node_id": node_id,
-            "source_type": source_type
-        }
-        try:
-            height = int(stream_data["size"][0])
-            weight = int(stream_data["size"][1])
-        except (IndexError, KeyError):
-            pass
-        else:
-            stream_data["size"] = (height, weight)
-
-        return self.stream_data
-
-    def end_screenshare(self) -> None:
-        """Close a running screenshare session, if any."""
-        if self.session_interface is None:
-            return
-        self.session_interface.Close()
-        self.on_session_closed({})
+SINKS_DATACHANNEL = "datachannel"
 
 
 class WebRTC:
     """GSTreamer based WebRTC implementation for audio and video communication.
 
     This class encapsulates the WebRTC functionalities required for initiating and
-    handling audio and video calls.
+    handling audio and video calls, and data channels.
""" def __init__( @@ -255,8 +79,15 @@ sinks: str = SINKS_AUTO, appsink_data: AppSinkData | None = None, reset_cb: Callable | None = None, - merge_pip: bool|None = None, - target_size: tuple[int, int]|None = None, + merge_pip: bool | None = None, + target_size: tuple[int, int] | None = None, + call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None, + dc_open_cb: ( + Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None + ) = None, + dc_on_data_channel: ( + Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None + ) = None, ) -> None: """Initializes a new WebRTC instance. @@ -277,6 +108,11 @@ when ``merge_pip`` is set. None to autodetect (not real autodetection implemeted yet, default to (1280,720)). + @param call_start_cb: Called when call is started. + @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL). + This callback will be run in a GStreamer thread. + @param dc_open_cb: Called when Data Channel is created (for SINKS_DATACHANNEL). + This callback will be run in a GStreamer thread. """ self.main_loop = asyncio.get_event_loop() self.bridge = bridge @@ -289,11 +125,20 @@ self.sources = sources self.sinks = sinks if target_size is None: - target_size=(1280, 720) + target_size = (1280, 720) self.target_width, self.target_height = target_size if merge_pip is None: merge_pip = sinks == SINKS_AUTO self.merge_pip = merge_pip + if call_start_cb is None: + call_start_cb = self._call_start + self.call_start_cb = call_start_cb + if sources == SOURCES_DATACHANNEL: + assert dc_open_cb is not None + self.dc_open_cb = dc_open_cb + if sinks == SINKS_DATACHANNEL: + assert dc_on_data_channel is not None + self.dc_on_data_channel = dc_on_data_channel if sinks == SINKS_APP: if ( merge_pip @@ -387,13 +232,12 @@ ) self.bindings[key] = cb - def generate_dot_file( self, filename: str = "pipeline", details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL, with_timestamp: bool = True, - bin_: Gst.Bin|None = None, + bin_: Gst.Bin | None = None, ) -> None: """Generate Dot File for debugging @@ -412,7 +256,7 @@ if bin_ is None: bin_ = self.pipeline if with_timestamp: - timestamp = datetime.now().isoformat(timespec='milliseconds') + timestamp = datetime.now().isoformat(timespec="milliseconds") filename = f"{timestamp}_filename" Gst.debug_bin_to_dot_file(bin_, details, filename) @@ -556,7 +400,7 @@ self.local_candidates_buffer = {} self.ufrag: str | None = None self.pwd: str | None = None - self.callee: str | None = None + self.callee: jid.JID | None = None self._media_types = None self._media_types_inv = None self._sdp_set: bool = False @@ -576,7 +420,6 @@ if self.reset_cb is not None: self.reset_cb() - async def setup_call( self, role: str, @@ -598,76 +441,84 @@ """ assert role in ("initiator", "responder") self.role = role - if audio_pt is None or video_pt is None: - raise NotImplementedError("None value is not handled yet") - if self.sources == SOURCES_AUTO: - video_source_elt = "v4l2src" - audio_source_elt = "pulsesrc" - elif self.sources == SOURCES_TEST: - video_source_elt = "videotestsrc is-live=true pattern=ball" - audio_source_elt = "audiotestsrc" + if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL: + # Setup pipeline for datachannel only, no media streams. 
+            self.gst_pipe_desc = f"""
+            webrtcbin name=sendrecv bundle-policy=max-bundle
+            """
         else:
-            raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}')
-
+            if audio_pt is None or video_pt is None:
+                raise NotImplementedError("None value is not handled yet")
 
-        if self.sinks == SINKS_APP:
-            local_video_sink_elt = (
-                "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
-                "sync=True"
-            )
-        elif self.sinks == SINKS_AUTO:
-            local_video_sink_elt = "autovideosink"
-        else:
-            raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")
+            if self.sources == SOURCES_AUTO:
+                video_source_elt = "v4l2src"
+                audio_source_elt = "pulsesrc"
+            elif self.sources == SOURCES_TEST:
+                video_source_elt = "videotestsrc is-live=true pattern=ball"
+                audio_source_elt = "audiotestsrc"
+            else:
+                raise exceptions.InternalError(
+                    f'Unknown "sources" value: {self.sources!r}'
+                )
 
-        if self.merge_pip:
-            extra_elt = (
-                "compositor name=compositor background=black "
-                f"! video/x-raw,width={self.target_width},height={self.target_height},"
-                "framerate=30/1 "
-                f"! {local_video_sink_elt}"
-            )
-            local_video_sink_elt = "compositor.sink_1"
-        else:
-            extra_elt = ""
+            if self.sinks == SINKS_APP:
+                local_video_sink_elt = (
+                    "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
+                    "sync=True"
+                )
+            elif self.sinks == SINKS_AUTO:
+                local_video_sink_elt = "autovideosink"
+            else:
+                raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")
 
-        self.gst_pipe_desc = f"""
-        webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle
+            if self.merge_pip:
+                extra_elt = (
+                    "compositor name=compositor background=black "
+                    f"! video/x-raw,width={self.target_width},height={self.target_height},"
+                    "framerate=30/1 "
+                    f"! {local_video_sink_elt}"
+                )
+                local_video_sink_elt = "compositor.sink_1"
+            else:
+                extra_elt = ""
 
-        input-selector name=video_selector
-        ! videorate
-        ! video/x-raw,framerate=30/1
-        ! tee name=t
+            self.gst_pipe_desc = f"""
+            webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle
 
-        {extra_elt}
+            input-selector name=video_selector
+            ! videorate
+            ! video/x-raw,framerate=30/1
+            ! tee name=t
 
-        {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
-        videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector.
+            {extra_elt}
+
+            {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
+            videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector.
 
-        t.
-        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
-        ! videoconvert
-        ! vp8enc deadline=1 keyframe-max-dist=60
-        ! rtpvp8pay picture-id-mode=15-bit
-        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
-        ! sendrecv.
+            t.
+            ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
+            ! videoconvert
+            ! vp8enc deadline=1 keyframe-max-dist=60
+            ! rtpvp8pay picture-id-mode=15-bit
+            ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
+            ! sendrecv.
 
-        t.
-        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
-        ! videoconvert
-        ! {local_video_sink_elt}
+            t.
+            ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
+            ! videoconvert
+            ! {local_video_sink_elt}
 
-        {audio_source_elt} name=audio_src
-        ! valve
-        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
-        ! audioconvert
-        ! audioresample
-        ! opusenc audio-type=voice
-        ! rtpopuspay
-        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
-        ! sendrecv.
-        """
+            {audio_source_elt} name=audio_src
+            ! valve
+            ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
+            ! audioconvert
+            ! audioresample
+            ! opusenc audio-type=voice
+            ! rtpopuspay
+            ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
+            ! sendrecv.
+            """
 
         log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
 
@@ -680,16 +531,29 @@
         if not self.pipeline:
             raise exceptions.InternalError("Failed to create Gstreamer pipeline.")
 
+        if not isinstance(self.pipeline, Gst.Pipeline):
+            # in the case of Data Channel there is a single element, and Gst.parse_launch
+            # doesn't create a Pipeline in this case, so we do it manually.
+            pipeline = Gst.Pipeline()
+            pipeline.add(self.pipeline)
+            self.pipeline = pipeline
+
         self.webrtcbin = self.pipeline.get_by_name("sendrecv")
-        self.video_src = self.pipeline.get_by_name("video_src")
-        self.muted_src = self.pipeline.get_by_name("muted_src")
-        self.video_selector = self.pipeline.get_by_name("video_selector")
-        self.audio_valve = self.pipeline.get_by_name("audio_valve")
+        if self.webrtcbin is None:
+            raise exceptions.InternalError("Can't get the pipeline.")
 
-        if self.video_muted:
-            self.on_video_mute(True)
-        if self.audio_muted:
-            self.on_audio_mute(True)
+        # For datachannel setups, media source, selector, and sink elements are not
+        # created
+        if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL:
+            self.video_src = self.pipeline.get_by_name("video_src")
+            self.muted_src = self.pipeline.get_by_name("muted_src")
+            self.video_selector = self.pipeline.get_by_name("video_selector")
+            self.audio_valve = self.pipeline.get_by_name("audio_valve")
+
+            if self.video_muted:
+                self.on_video_mute(True)
+            if self.audio_muted:
+                self.on_audio_mute(True)
 
         # set STUN and TURN servers
         external_disco = data_format.deserialise(
@@ -719,7 +583,7 @@
                 log.warning(f"Erreur while adding TURN server {url}")
 
         # local video feedback
-        if self.sinks == SINKS_APP:
+        if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL:
             assert self.appsink_data is not None
             local_video_sink = self.pipeline.get_by_name("local_video_sink")
             local_video_sink.set_property("emit-signals", True)
@@ -746,6 +610,24 @@
             "notify::ice-connection-state", self.on_ice_connection_state
         )
 
+        if self.sources == SOURCES_DATACHANNEL:
+            # Data channel configuration for compatibility with browser defaults
+            data_channel_options = Gst.Structure.new_empty("data-channel-options")
+            data_channel_options.set_value("ordered", True)
+            data_channel_options.set_value("protocol", "")
+
+            # Create the data channel
+            self.pipeline.set_state(Gst.State.READY)
+            self.data_channel = self.webrtcbin.emit(
+                "create-data-channel", "file", data_channel_options
+            )
+            if self.data_channel is None:
+                log.error("Failed to create data channel")
+                return
+            self.data_channel.connect("on-open", self.dc_open_cb)
+        if self.sinks == SINKS_DATACHANNEL:
+            self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel)
+
     def start_pipeline(self) -> None:
         """Starts the GStreamer pipeline."""
         log.debug("starting the pipeline")
@@ -813,7 +695,7 @@
         elif isinstance(dest, Gst.Element):
             return source.link(dest)
         else:
-            log.error(f"Unexpected type for dest: {type(sink)}")
+            log.error(f"Unexpected type for dest: {type(dest)}")
             return False
 
         return True
@@ -941,7 +823,6 @@
 
             self.pipeline.add(q, conv, videoscale, capsfilter)
 
-
            self.pipeline.sync_children_states()
             ret = pad.link(q.get_static_pad("sink"))
             if ret != Gst.PadLinkReturn.OK:
@@ -997,6 +878,11 @@
         decodebin.sync_state_with_parent()
         pad.link(decodebin.get_static_pad("sink"))
 
+    async def _call_start(self, callee: jid.JID, call_data: dict, profile: str) -> str:
+        return await self.bridge.call_start(
+            str(callee), data_format.serialise(call_data), profile
+        )
+
     async def _start_call(self) -> None:
         """Initiate the call.
 
@@ -1004,8 +890,9 @@
         local ICE candidates, they are sent as part of the initiation.
         """
         assert self.callee
-        self.sid = await self.bridge.call_start(
-            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
+        assert self.call_start_cb is not None
+        self.sid = await self.call_start_cb(
+            self.callee, {"sdp": self.offer}, self.profile
         )
         if self.local_candidates_buffer:
             log.debug(
@@ -1083,6 +970,9 @@
             f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
         )
         parsed_candidate = self.parse_ice_candidate(candidate_sdp)
+        if parsed_candidate is None:
+            log.warning(f"Can't parse candidate: {candidate_sdp}")
+            return
         try:
             media_type = self.media_types[mline_index]
         except KeyError:
@@ -1129,7 +1019,7 @@
         except Exception as e:
             raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
         self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp)
-        log.debug(
+        log.warning(
             f"Remote ICE candidate added. MLine Index: {mline_index}, "
             f"{candidate_sdp}"
         )
@@ -1178,7 +1068,9 @@
         @param muted: True if video is muted.
         """
         if self.video_selector is not None:
-            current_source = None if muted else "desktop" if self.desktop_sharing else "video"
+            current_source = (
+                None if muted else "desktop" if self.desktop_sharing else "video"
+            )
             self.switch_video_source(current_source)
             state = "muted" if muted else "unmuted"
             log.info(f"Video is now {state}")
@@ -1201,9 +1093,7 @@
             except exceptions.CancelError:
                 self.desktop_sharing = False
                 return
-            self.desktop_sharing_data = {
-                "path": str(screenshare_data["node_id"])
-            }
+            self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])}
 
         self.do_desktop_switch(desktop_active)
 
    def do_desktop_switch(self, desktop_active: bool) -> None:
@@ -1216,7 +1106,7 @@
         self.switch_video_source(source)
         self.desktop_sharing = desktop_active
 
-    def switch_video_source(self, source: str|None) -> None:
+    def switch_video_source(self, source: str | None) -> None:
         """Activates the specified source while deactivating the others.
 
         @param source: 'desktop', 'video', 'muted' or None for muted source.
@@ -1252,18 +1142,18 @@
             if self.desktop_sink_pad:
                 pad = self.desktop_sink_pad
             else:
-                log.error(f"No desktop pad available")
-                pad = None
+                log.error(f"No desktop pad available")
+                pad = None
         else:
             pad_name = f"sink_{['video', 'muted'].index(source)}"
             pad = self.video_selector.get_static_pad(pad_name)
 
         if pad is not None:
-            self.video_selector.props.active_pad = pad
+            self.video_selector.props.active_pad = pad
 
         self.pipeline.set_state(Gst.State.PLAYING)
 
-    def _setup_desktop_source(self, properties: dict[str, object]|None) -> None:
+    def _setup_desktop_source(self, properties: dict[str, object] | None) -> None:
         """Set up a new desktop source.
 
         @param properties: The properties to set on the desktop source.
@@ -1287,7 +1177,9 @@
         video_convert.link(queue)
 
         sink_pad_template = self.video_selector.get_pad_template("sink_%u")
-        self.desktop_sink_pad = self.video_selector.request_pad(sink_pad_template, None, None)
+        self.desktop_sink_pad = self.video_selector.request_pad(
+            sink_pad_template, None, None
+        )
         queue_src_pad = queue.get_static_pad("src")
         queue_src_pad.link(self.desktop_sink_pad)
@@ -1327,3 +1219,114 @@
     async def end_call(self) -> None:
         """Stop streaming and clean instance"""
         self.reset_instance()
+
+
+class WebRTCCall:
+    """Helper class to create and handle WebRTC.
+
+    This class handles signals and communication of connection data with backend.
+
+    """
+
+    def __init__(
+        self,
+        bridge,
+        profile: str,
+        callee: jid.JID,
+        on_call_setup_cb: Callable | None = None,
+        on_call_ended_cb: Callable | None = None,
+        **kwargs,
+    ):
+        """Create and setup a webRTC instance
+
+        @param bridge: async Bridge.
+        @param profile: profile making or receiving the call
+        @param callee: peer jid
+        @param kwargs: extra kw args to use when instantiating WebRTC
+        """
+        self.profile = profile
+        self.webrtc = WebRTC(bridge, profile, **kwargs)
+        self.webrtc.callee = callee
+        self.on_call_setup_cb = on_call_setup_cb
+        self.on_call_ended_cb = on_call_ended_cb
+        bridge.register_signal(
+            "ice_candidates_new", self.on_ice_candidates_new, "plugin"
+        )
+        bridge.register_signal("call_setup", self.on_call_setup, "plugin")
+        bridge.register_signal("call_ended", self.on_call_ended, "plugin")
+
+    @classmethod
+    async def make_webrtc_call(
+        cls, bridge, profile: str, call_data: CallData, **kwargs
+    ) -> "WebRTCCall":
+        """Create the webrtc_call instance
+
+        @param call_data: Call data of the command
+        @param kwargs: extra args used to instantiate WebRTCCall
+
+        """
+        webrtc_call = cls(bridge, profile, call_data.callee, **call_data.kwargs, **kwargs)
+        if call_data.sid is None:
+            # we are making the call
+            await webrtc_call.start()
+        else:
+            # we are receiving the call
+            webrtc_call.sid = call_data.sid
+            if call_data.action_id is not None:
+                await bridge.action_launch(
+                    call_data.action_id,
+                    data_format.serialise({"cancelled": False}),
+                    profile,
+                )
+        return webrtc_call
+
+    @property
+    def sid(self) -> str | None:
+        return self.webrtc.sid
+
+    @sid.setter
+    def sid(self, new_sid: str | None) -> None:
+        self.webrtc.sid = new_sid
+
+    async def on_ice_candidates_new(
+        self, sid: str, candidates_s: str, profile: str
+    ) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        self.webrtc.on_ice_candidates_new(
+            data_format.deserialise(candidates_s),
+        )
+
+    async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        setup_data = data_format.deserialise(setup_data_s)
+        try:
+            role = setup_data["role"]
+            sdp = setup_data["sdp"]
+        except KeyError:
+            log.error(f"Invalid setup data received: {setup_data}")
+            return
+        if role == "initiator":
+            self.webrtc.on_accepted_call(sdp, profile)
+        elif role == "responder":
+            await self.webrtc.answer_call(sdp, profile)
+        else:
+            log.error(f"Invalid role received during setup: {setup_data}")
+        if self.on_call_setup_cb is not None:
+            await aio.maybe_async(self.on_call_setup_cb(sid, profile))
+
+    async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        await self.webrtc.end_call()
+        if self.on_call_ended_cb is not None:
+            await aio.maybe_async(self.on_call_ended_cb(sid, profile))
+
+    async def start(self):
+        """Start a call.
+
+        To be used only if we are initiator
+        """
+        await self.webrtc.setup_call("initiator")
+        self.webrtc.start_pipeline()
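
To see how the new pieces fit together, here is a minimal usage sketch for the data-channel sender side. Assumptions are flagged explicitly: an already-connected async bridge and profile, a placeholder callee `louise@example.org`, and `CallData`'s optional fields (`sid`, `action_id`, `kwargs`) defaulting as defined in the new `webrtc_models` module. Although the type hints above declare an awaitable callback, `dc_open_cb` is connected through a GObject signal and invoked from a GStreamer thread, so the sketch uses a plain function and would hand real transfer work back to the asyncio loop.

    import asyncio

    from libervia.frontends.tools import jid
    from libervia.frontends.tools.webrtc import SOURCES_DATACHANNEL, WebRTCCall
    from libervia.frontends.tools.webrtc_models import CallData


    def on_dc_open(data_channel) -> None:
        # Hypothetical example callback: runs in a GStreamer thread once the
        # channel is negotiated; real code would schedule its transfer logic
        # back on the asyncio loop instead of sending inline.
        data_channel.send_string("hello over the data channel")


    async def send_demo(bridge, profile: str) -> None:
        # "louise@example.org" is a placeholder; CallData's other fields are
        # assumed to default to None/empty.
        call_data = CallData(callee=jid.JID("louise@example.org"))
        await WebRTCCall.make_webrtc_call(
            bridge,
            profile,
            call_data,
            sources=SOURCES_DATACHANNEL,
            dc_open_cb=on_dc_open,
        )
        # Keep the loop alive while ICE negotiation and the transfer proceed.
        await asyncio.sleep(30)

Passing `sources=SOURCES_DATACHANNEL` makes `setup_call` build the media-less `webrtcbin` pipeline and create the `file` data channel, while the default `call_start_cb` sends the offer through `bridge.call_start` as before.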