diff libervia/frontends/tools/webrtc.py @ 4233:d01b8d002619

cli (call, file), frontends: implement webRTC data channel transfer: - file send/receive commands now support webRTC transfer. In `send` command, the `--webrtc` flag is currently used to activate it. - WebRTC related code has been factorized and moved to `libervia.frontends.tools.webrtc*` modules. rel 442
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 13:43:09 +0200
parents fe29fbdabce6
children 79c8a70e1813
line wrap: on
line diff
--- a/libervia/frontends/tools/webrtc.py	Sat Apr 06 12:59:50 2024 +0200
+++ b/libervia/frontends/tools/webrtc.py	Sat Apr 06 13:43:09 2024 +0200
@@ -16,11 +16,10 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from collections.abc import Awaitable
 import gi
-gi.require_versions({
-    "Gst": "1.0",
-    "GstWebRTC": "1.0"
-})
+
+gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
 from gi.repository import Gst, GstWebRTC, GstSdp
 
 from libervia.backend.core import exceptions
@@ -33,22 +32,23 @@
         "your system (e.g., `python3-gst-1.0` on Debian and derivatives)."
     )
 import asyncio
-from dataclasses import dataclass
 from datetime import datetime
 import logging
-from random import randint
 import re
 from typing import Callable
 from urllib.parse import quote_plus
 
 from libervia.backend.tools.common import data_format
-from libervia.frontends.tools import aio, display_servers
+from libervia.frontends.tools import aio, display_servers, jid
+from .webrtc_models import AppSinkData, CallData
+from .webrtc_screenshare import DesktopPortal
 
 current_server = display_servers.detect()
 if current_server == display_servers.X11:
     # GSTreamer's ximagesrc documentation asks to run this function
     import ctypes
-    ctypes.CDLL('libX11.so.6').XInitThreads()
+
+    ctypes.CDLL("libX11.so.6").XInitThreads()
 
 
 log = logging.getLogger(__name__)
@@ -57,194 +57,18 @@
 
 SOURCES_AUTO = "auto"
 SOURCES_TEST = "test"
+SOURCES_DATACHANNEL = "datachannel"
 SINKS_APP = "app"
 SINKS_AUTO = "auto"
 SINKS_TEST = "test"
-
-
-class ScreenshareError(Exception):
-    pass
-
-
-@dataclass
-class AppSinkData:
-    local_video_cb: Callable
-    remote_video_cb: Callable|None
-
-
-class DesktopPortal:
-
-    def __init__(self, webrtc: "WebRTC"):
-        import dbus
-        from dbus.mainloop.glib import DBusGMainLoop
-        # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
-        self.dbus = dbus
-        self.webrtc = webrtc
-        self.sources_type = dbus.UInt32(7)
-        DBusGMainLoop(set_as_default=True)
-        self.session_bus = dbus.SessionBus()
-        portal_object = self.session_bus.get_object(
-            'org.freedesktop.portal.Desktop',
-            '/org/freedesktop/portal/desktop'
-        )
-        self.screencast_interface = dbus.Interface(
-            portal_object,
-            'org.freedesktop.portal.ScreenCast'
-        )
-        self.session_interface = None
-        self.session_signal = None
-        self.handle_counter = 0
-        self.session_handle = None
-        self.stream_data: dict|None = None
-
-    @property
-    def handle_token(self):
-        self.handle_counter += 1
-        return f"libervia{self.handle_counter}"
-
-    def on_session_closed(self, details: dict) -> None:
-        if self.session_interface is not None:
-            self.session_interface = None
-            self.webrtc.desktop_sharing = False
-            if self.session_signal is not None:
-                self.session_signal.remove()
-                self.session_signal = None
-
-
-    async def dbus_call(self, method_name: str, *args) -> dict:
-        """Call a screenshare portal method
-
-        This method handle the signal response.
-        @param method_name: method to call
-        @param args: extra args
-            `handle_token` will be automatically added to the last arg (option dict)
-        @return: method result
-        """
-        if self.session_handle is not None:
-            self.end_screenshare()
-        method = getattr(self.screencast_interface, method_name)
-        options = args[-1]
-        reply_fut = asyncio.Future()
-        signal_fut = asyncio.Future()
-        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
-        handle_token = self.handle_token
-        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
-        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
-        signal_match = None
-
-        def on_signal(response, results):
-            assert signal_match is not None
-            signal_match.remove()
-            if response == 0:
-                signal_fut.set_result(results)
-            elif response == 1:
-                signal_fut.set_exception(
-                    exceptions.CancelError("Cancelled by user.")
-                )
-            else:
-                signal_fut.set_exception(ScreenshareError(
-                    f"Can't get signal result"
-                ))
-
-        signal_match = self.session_bus.add_signal_receiver(
-            on_signal,
-            signal_name="Response",
-            dbus_interface="org.freedesktop.portal.Request",
-            path=path
-        )
-
-        options["handle_token"] = handle_token
-
-        method(
-            *args,
-            reply_handler=reply_fut.set_result,
-            error_handler=reply_fut.set_exception
-        )
-        try:
-            await reply_fut
-        except Exception as e:
-            raise ScreenshareError(f"Can't ask screenshare permission: {e}")
-        return await signal_fut
-
-    async def request_screenshare(self) -> dict:
-        session_data = await self.dbus_call(
-            "CreateSession",
-            {
-                "session_handle_token": str(randint(1, 2**32)),
-            }
-        )
-        try:
-            session_handle = session_data["session_handle"]
-        except KeyError:
-            raise ScreenshareError("Can't get session handle")
-        self.session_handle = session_handle
-
-
-        await self.dbus_call(
-            "SelectSources",
-            session_handle,
-            {
-                "multiple": True,
-                "types": self.sources_type,
-                "modal": True
-            }
-        )
-        screenshare_data = await self.dbus_call(
-            "Start",
-            session_handle,
-            "",
-            {}
-        )
-
-        session_object = self.session_bus.get_object(
-            'org.freedesktop.portal.Desktop',
-            session_handle
-        )
-        self.session_interface = self.dbus.Interface(
-            session_object,
-            'org.freedesktop.portal.Session'
-        )
-
-        self.session_signal = self.session_bus.add_signal_receiver(
-            self.on_session_closed,
-            signal_name="Closed",
-            dbus_interface="org.freedesktop.portal.Session",
-            path=session_handle
-        )
-
-        try:
-            node_id, stream_data = screenshare_data["streams"][0]
-            source_type = int(stream_data["source_type"])
-        except (IndexError, KeyError):
-            raise ScreenshareError("Can't parse stream data")
-        self.stream_data = stream_data = {
-            "session_handle": session_handle,
-            "node_id": node_id,
-            "source_type": source_type
-        }
-        try:
-            height = int(stream_data["size"][0])
-            weight = int(stream_data["size"][1])
-        except (IndexError, KeyError):
-            pass
-        else:
-            stream_data["size"] = (height, weight)
-
-        return self.stream_data
-
-    def end_screenshare(self) -> None:
-        """Close a running screenshare session, if any."""
-        if self.session_interface is None:
-            return
-        self.session_interface.Close()
-        self.on_session_closed({})
+SINKS_DATACHANNEL = "datachannel"
 
 
 class WebRTC:
     """GSTreamer based WebRTC implementation for audio and video communication.
 
     This class encapsulates the WebRTC functionalities required for initiating and
-    handling audio and video calls.
+    handling audio and video calls, and data channels.
     """
 
     def __init__(
@@ -255,8 +79,15 @@
         sinks: str = SINKS_AUTO,
         appsink_data: AppSinkData | None = None,
         reset_cb: Callable | None = None,
-        merge_pip: bool|None = None,
-        target_size: tuple[int, int]|None = None,
+        merge_pip: bool | None = None,
+        target_size: tuple[int, int] | None = None,
+        call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None,
+        dc_open_cb: (
+            Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
+        ) = None,
+        dc_on_data_channel: (
+            Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
+        ) = None,
     ) -> None:
         """Initializes a new WebRTC instance.
 
@@ -277,6 +108,11 @@
             when ``merge_pip`` is set.
             None to autodetect (not real autodetection implemeted yet, default to
             (1280,720)).
+        @param call_start_cb: Called when call is started.
+        @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL).
+            This callback will be run in a GStreamer thread.
+        @param dc_on_data_channel: Called when Data Channel is created (for SINKS_DATACHANNEL).
+            This callback will be run in a GStreamer thread.
         """
         self.main_loop = asyncio.get_event_loop()
         self.bridge = bridge
@@ -289,11 +125,20 @@
         self.sources = sources
         self.sinks = sinks
         if target_size is None:
-            target_size=(1280, 720)
+            target_size = (1280, 720)
         self.target_width, self.target_height = target_size
         if merge_pip is None:
             merge_pip = sinks == SINKS_AUTO
         self.merge_pip = merge_pip
+        if call_start_cb is None:
+            call_start_cb = self._call_start
+        self.call_start_cb = call_start_cb
+        if sources == SOURCES_DATACHANNEL:
+            assert dc_open_cb is not None
+            self.dc_open_cb = dc_open_cb
+        if sinks == SINKS_DATACHANNEL:
+            assert dc_on_data_channel is not None
+            self.dc_on_data_channel = dc_on_data_channel
         if sinks == SINKS_APP:
             if (
                 merge_pip
@@ -387,13 +232,12 @@
                 )
             self.bindings[key] = cb
 
-
     def generate_dot_file(
         self,
         filename: str = "pipeline",
         details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL,
         with_timestamp: bool = True,
-        bin_: Gst.Bin|None = None,
+        bin_: Gst.Bin | None = None,
     ) -> None:
         """Generate Dot File for debugging
 
@@ -412,7 +256,7 @@
         if bin_ is None:
             bin_ = self.pipeline
         if with_timestamp:
-            timestamp = datetime.now().isoformat(timespec='milliseconds')
+            timestamp = datetime.now().isoformat(timespec="milliseconds")
             filename = f"{timestamp}_filename"
 
         Gst.debug_bin_to_dot_file(bin_, details, filename)
@@ -556,7 +400,7 @@
         self.local_candidates_buffer = {}
         self.ufrag: str | None = None
         self.pwd: str | None = None
-        self.callee: str | None = None
+        self.callee: jid.JID | None = None
         self._media_types = None
         self._media_types_inv = None
         self._sdp_set: bool = False
@@ -576,7 +420,6 @@
         if self.reset_cb is not None:
             self.reset_cb()
 
-
     async def setup_call(
         self,
         role: str,
@@ -598,76 +441,84 @@
         """
         assert role in ("initiator", "responder")
         self.role = role
-        if audio_pt is None or video_pt is None:
-            raise NotImplementedError("None value is not handled yet")
 
-        if self.sources == SOURCES_AUTO:
-            video_source_elt = "v4l2src"
-            audio_source_elt = "pulsesrc"
-        elif self.sources == SOURCES_TEST:
-            video_source_elt = "videotestsrc is-live=true pattern=ball"
-            audio_source_elt = "audiotestsrc"
+        if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL:
+            # Setup pipeline for datachannel only, no media streams.
+            self.gst_pipe_desc = f"""
+            webrtcbin name=sendrecv bundle-policy=max-bundle
+            """
         else:
-            raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}')
-
+            if audio_pt is None or video_pt is None:
+                raise NotImplementedError("None value is not handled yet")
 
-        if self.sinks == SINKS_APP:
-            local_video_sink_elt = (
-                "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
-                "sync=True"
-            )
-        elif self.sinks == SINKS_AUTO:
-            local_video_sink_elt = "autovideosink"
-        else:
-            raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")
+            if self.sources == SOURCES_AUTO:
+                video_source_elt = "v4l2src"
+                audio_source_elt = "pulsesrc"
+            elif self.sources == SOURCES_TEST:
+                video_source_elt = "videotestsrc is-live=true pattern=ball"
+                audio_source_elt = "audiotestsrc"
+            else:
+                raise exceptions.InternalError(
+                    f'Unknown "sources" value: {self.sources!r}'
+                )
 
-        if self.merge_pip:
-            extra_elt = (
-                "compositor name=compositor background=black "
-                f"! video/x-raw,width={self.target_width},height={self.target_height},"
-                "framerate=30/1 "
-                f"! {local_video_sink_elt}"
-            )
-            local_video_sink_elt = "compositor.sink_1"
-        else:
-            extra_elt = ""
+            if self.sinks == SINKS_APP:
+                local_video_sink_elt = (
+                    "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
+                    "sync=True"
+                )
+            elif self.sinks == SINKS_AUTO:
+                local_video_sink_elt = "autovideosink"
+            else:
+                raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")
 
-        self.gst_pipe_desc = f"""
-        webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle
+            if self.merge_pip:
+                extra_elt = (
+                    "compositor name=compositor background=black "
+                    f"! video/x-raw,width={self.target_width},height={self.target_height},"
+                    "framerate=30/1 "
+                    f"! {local_video_sink_elt}"
+                )
+                local_video_sink_elt = "compositor.sink_1"
+            else:
+                extra_elt = ""
 
-        input-selector name=video_selector
-        ! videorate
-        ! video/x-raw,framerate=30/1
-        ! tee name=t
+            self.gst_pipe_desc = f"""
+            webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle
 
-        {extra_elt}
+            input-selector name=video_selector
+            ! videorate
+            ! video/x-raw,framerate=30/1
+            ! tee name=t
 
-        {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
-        videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector.
+            {extra_elt}
+
+            {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
+            videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector.
 
-        t.
-        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
-        ! videoconvert
-        ! vp8enc deadline=1 keyframe-max-dist=60
-        ! rtpvp8pay picture-id-mode=15-bit
-        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
-        ! sendrecv.
+            t.
+            ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
+            ! videoconvert
+            ! vp8enc deadline=1 keyframe-max-dist=60
+            ! rtpvp8pay picture-id-mode=15-bit
+            ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
+            ! sendrecv.
 
-        t.
-        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
-        ! videoconvert
-        ! {local_video_sink_elt}
+            t.
+            ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
+            ! videoconvert
+            ! {local_video_sink_elt}
 
-        {audio_source_elt} name=audio_src
-        ! valve
-        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
-        ! audioconvert
-        ! audioresample
-        ! opusenc audio-type=voice
-        ! rtpopuspay
-        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
-        ! sendrecv.
-        """
+            {audio_source_elt} name=audio_src
+            ! valve
+            ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
+            ! audioconvert
+            ! audioresample
+            ! opusenc audio-type=voice
+            ! rtpopuspay
+            ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
+            ! sendrecv.
+            """
 
         log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
 
@@ -680,16 +531,29 @@
         if not self.pipeline:
             raise exceptions.InternalError("Failed to create Gstreamer pipeline.")
 
+        if not isinstance(self.pipeline, Gst.Pipeline):
+            # in the case of Data Channel there is a single element, and Gst.parse_launch
+            # doesn't create a Pipeline in this case, so we do it manually.
+            pipeline = Gst.Pipeline()
+            pipeline.add(self.pipeline)
+            self.pipeline = pipeline
+
         self.webrtcbin = self.pipeline.get_by_name("sendrecv")
-        self.video_src = self.pipeline.get_by_name("video_src")
-        self.muted_src = self.pipeline.get_by_name("muted_src")
-        self.video_selector = self.pipeline.get_by_name("video_selector")
-        self.audio_valve = self.pipeline.get_by_name("audio_valve")
+        if self.webrtcbin is None:
+            raise exceptions.InternalError("Can't get the pipeline.")
 
-        if self.video_muted:
-            self.on_video_mute(True)
-        if self.audio_muted:
-            self.on_audio_mute(True)
+        # For datachannel setups, media source, selector, and sink elements are not
+        # created
+        if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL:
+            self.video_src = self.pipeline.get_by_name("video_src")
+            self.muted_src = self.pipeline.get_by_name("muted_src")
+            self.video_selector = self.pipeline.get_by_name("video_selector")
+            self.audio_valve = self.pipeline.get_by_name("audio_valve")
+
+            if self.video_muted:
+                self.on_video_mute(True)
+            if self.audio_muted:
+                self.on_audio_mute(True)
 
         # set STUN and TURN servers
         external_disco = data_format.deserialise(
@@ -719,7 +583,7 @@
                     log.warning(f"Erreur while adding TURN server {url}")
 
         # local video feedback
-        if self.sinks == SINKS_APP:
+        if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL:
             assert self.appsink_data is not None
             local_video_sink = self.pipeline.get_by_name("local_video_sink")
             local_video_sink.set_property("emit-signals", True)
@@ -746,6 +610,24 @@
             "notify::ice-connection-state", self.on_ice_connection_state
         )
 
+        if self.sources == SOURCES_DATACHANNEL:
+            # Data channel configuration for compatibility with browser defaults
+            data_channel_options = Gst.Structure.new_empty("data-channel-options")
+            data_channel_options.set_value("ordered", True)
+            data_channel_options.set_value("protocol", "")
+
+            # Create the data channel
+            self.pipeline.set_state(Gst.State.READY)
+            self.data_channel = self.webrtcbin.emit(
+                "create-data-channel", "file", data_channel_options
+            )
+            if self.data_channel is None:
+                log.error("Failed to create data channel")
+                return
+            self.data_channel.connect("on-open", self.dc_open_cb)
+        if self.sinks == SINKS_DATACHANNEL:
+            self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel)
+
     def start_pipeline(self) -> None:
         """Starts the GStreamer pipeline."""
         log.debug("starting the pipeline")
@@ -813,7 +695,7 @@
         elif isinstance(dest, Gst.Element):
             return source.link(dest)
         else:
-            log.error(f"Unexpected type for dest: {type(sink)}")
+            log.error(f"Unexpected type for dest: {type(dest)}")
             return False
 
         return True
@@ -941,7 +823,6 @@
 
                 self.pipeline.add(q, conv, videoscale, capsfilter)
 
-
                 self.pipeline.sync_children_states()
                 ret = pad.link(q.get_static_pad("sink"))
                 if ret != Gst.PadLinkReturn.OK:
@@ -997,6 +878,11 @@
         decodebin.sync_state_with_parent()
         pad.link(decodebin.get_static_pad("sink"))
 
+    async def _call_start(self, callee: jid.JID, call_data: dict, profile: str) -> str:
+        return await self.bridge.call_start(
+            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
+        )
+
     async def _start_call(self) -> None:
         """Initiate the call.
 
@@ -1004,8 +890,9 @@
         local ICE candidates, they are sent as part of the initiation.
         """
         assert self.callee
-        self.sid = await self.bridge.call_start(
-            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
+        assert self.call_start_cb is not None
+        self.sid = await self.call_start_cb(
+            self.callee, {"sdp": self.offer}, self.profile
         )
         if self.local_candidates_buffer:
             log.debug(
@@ -1083,6 +970,9 @@
             f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
         )
         parsed_candidate = self.parse_ice_candidate(candidate_sdp)
+        if parsed_candidate is None:
+            log.warning(f"Can't parse candidate: {candidate_sdp}")
+            return
         try:
             media_type = self.media_types[mline_index]
         except KeyError:
@@ -1129,7 +1019,7 @@
                 except Exception as e:
                     raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
                 self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp)
-                log.debug(
+                log.warning(
                     f"Remote ICE candidate added. MLine Index: {mline_index}, "
                     f"{candidate_sdp}"
                 )
@@ -1178,7 +1068,9 @@
         @param muted: True if video is muted.
         """
         if self.video_selector is not None:
-            current_source = None if muted else "desktop" if self.desktop_sharing else "video"
+            current_source = (
+                None if muted else "desktop" if self.desktop_sharing else "video"
+            )
             self.switch_video_source(current_source)
             state = "muted" if muted else "unmuted"
             log.info(f"Video is now {state}")
@@ -1201,9 +1093,7 @@
         except exceptions.CancelError:
             self.desktop_sharing = False
             return
-        self.desktop_sharing_data = {
-            "path": str(screenshare_data["node_id"])
-        }
+        self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])}
         self.do_desktop_switch(desktop_active)
 
     def do_desktop_switch(self, desktop_active: bool) -> None:
@@ -1216,7 +1106,7 @@
         self.switch_video_source(source)
         self.desktop_sharing = desktop_active
 
-    def switch_video_source(self, source: str|None) -> None:
+    def switch_video_source(self, source: str | None) -> None:
         """Activates the specified source while deactivating the others.
 
         @param source: 'desktop', 'video', 'muted' or None for muted source.
@@ -1252,18 +1142,18 @@
             if self.desktop_sink_pad:
                 pad = self.desktop_sink_pad
             else:
-               log.error(f"No desktop pad available")
-               pad = None
+                log.error(f"No desktop pad available")
+                pad = None
         else:
             pad_name = f"sink_{['video', 'muted'].index(source)}"
             pad = self.video_selector.get_static_pad(pad_name)
 
         if pad is not None:
-           self.video_selector.props.active_pad = pad
+            self.video_selector.props.active_pad = pad
 
         self.pipeline.set_state(Gst.State.PLAYING)
 
-    def _setup_desktop_source(self, properties: dict[str, object]|None) -> None:
+    def _setup_desktop_source(self, properties: dict[str, object] | None) -> None:
         """Set up a new desktop source.
 
         @param properties: The properties to set on the desktop source.
@@ -1287,7 +1177,9 @@
         video_convert.link(queue)
 
         sink_pad_template = self.video_selector.get_pad_template("sink_%u")
-        self.desktop_sink_pad = self.video_selector.request_pad(sink_pad_template, None, None)
+        self.desktop_sink_pad = self.video_selector.request_pad(
+            sink_pad_template, None, None
+        )
         queue_src_pad = queue.get_static_pad("src")
         queue_src_pad.link(self.desktop_sink_pad)
 
@@ -1327,3 +1219,114 @@
     async def end_call(self) -> None:
         """Stop streaming and clean instance"""
         self.reset_instance()
+
+
+class WebRTCCall:
+    """Helper class to create and handle WebRTC.
+
+    This class handles signals and communication of connection data with backend.
+
+    """
+
+    def __init__(
+        self,
+        bridge,
+        profile: str,
+        callee: jid.JID,
+        on_call_setup_cb: Callable | None = None,
+        on_call_ended_cb: Callable | None = None,
+        **kwargs,
+    ):
+        """Create and setup a webRTC instance
+
+        @param bridge: async Bridge.
+        @param profile: profile making or receiving the call
+        @param callee: peer jid
+        @param kwargs: extra kw args to use when instantiating WebRTC
+        """
+        self.profile = profile
+        self.webrtc = WebRTC(bridge, profile, **kwargs)
+        self.webrtc.callee = callee
+        self.on_call_setup_cb = on_call_setup_cb
+        self.on_call_ended_cb = on_call_ended_cb
+        bridge.register_signal(
+            "ice_candidates_new", self.on_ice_candidates_new, "plugin"
+        )
+        bridge.register_signal("call_setup", self.on_call_setup, "plugin")
+        bridge.register_signal("call_ended", self.on_call_ended, "plugin")
+
+    @classmethod
+    async def make_webrtc_call(
+        cls, bridge, profile: str, call_data: CallData, **kwargs
+    ) -> "WebRTCCall":
+        """Create the webrtc_call instance
+
+        @param call_data: Call data of the command
+        @param kwargs: extra args used to instantiate WebRTCCall
+
+        """
+        webrtc_call = cls(bridge, profile, call_data.callee, **call_data.kwargs, **kwargs)
+        if call_data.sid is None:
+            # we are making the call
+            await webrtc_call.start()
+        else:
+            # we are receiving the call
+            webrtc_call.sid = call_data.sid
+            if call_data.action_id is not None:
+                await bridge.action_launch(
+                    call_data.action_id,
+                    data_format.serialise({"cancelled": False}),
+                    profile,
+                )
+        return webrtc_call
+
+    @property
+    def sid(self) -> str | None:
+        return self.webrtc.sid
+
+    @sid.setter
+    def sid(self, new_sid: str | None) -> None:
+        self.webrtc.sid = new_sid
+
+    async def on_ice_candidates_new(
+        self, sid: str, candidates_s: str, profile: str
+    ) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        self.webrtc.on_ice_candidates_new(
+            data_format.deserialise(candidates_s),
+        )
+
+    async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        setup_data = data_format.deserialise(setup_data_s)
+        try:
+            role = setup_data["role"]
+            sdp = setup_data["sdp"]
+        except KeyError:
+            log.error(f"Invalid setup data received: {setup_data}")
+            return
+        if role == "initiator":
+            self.webrtc.on_accepted_call(sdp, profile)
+        elif role == "responder":
+            await self.webrtc.answer_call(sdp, profile)
+        else:
+            log.error(f"Invalid role received during setup: {setup_data}")
+        if self.on_call_setup_cb is not None:
+            await aio.maybe_async(self.on_call_setup_cb(sid, profile))
+
+    async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        await self.webrtc.end_call()
+        if self.on_call_ended_cb is not None:
+            await aio.maybe_async(self.on_call_ended_cb(sid, profile))
+
+    async def start(self):
+        """Start a call.
+
+        To be used only if we are initiator
+        """
+        await self.webrtc.setup_call("initiator")
+        self.webrtc.start_pipeline()