changeset 509:f0ce49b360c8

calls: move webrtc code to core: WebRTC code which can be used in several frontends has been factorized and moved to common `frontends.tools`. Test have been updated consequently. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 13:41:07 +0100
parents d87b9a6b0b69
children 97ab236e8f20
files libervia/desktop_kivy/plugins/plugin_wid_calls.py tests/unit/test_plugin_calls.py
diffstat 2 files changed, 47 insertions(+), 929 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/desktop_kivy/plugins/plugin_wid_calls.py	Wed Oct 25 15:29:33 2023 +0200
+++ b/libervia/desktop_kivy/plugins/plugin_wid_calls.py	Wed Nov 01 13:41:07 2023 +0100
@@ -1,8 +1,6 @@
 from dataclasses import dataclass
 from pathlib import Path
-import re
 from typing import Optional, Callable
-from urllib.parse import quote_plus
 from functools import partial
 
 # from gi.repository import GLib
@@ -37,7 +35,7 @@
 from libervia.backend.core import exceptions
 from libervia.backend.tools.common import data_format
 from libervia.frontends.quick_frontend import quick_widgets
-from libervia.frontends.tools import jid, aio
+from libervia.frontends.tools import aio, jid, webrtc
 
 from libervia.desktop_kivy import G
 
@@ -103,388 +101,46 @@
 
     test_mode: bool = False
 
+    PROXIED_PROPERTIES = {'audio_muted', 'callee', 'sid', 'video_muted'}
+    PROXIED_METHODS = {'answer_call', 'end_call', 'on_accepted_call', 'on_ice_candidates_new', 'setup_call', 'start_pipeline'}
+
     def __init__(self, parent_calls: "Calls", profile: str) -> None:
         self.parent_calls = parent_calls
         self.profile = profile
-        self.pipeline = None
-        self.reset_instance()
-
-    @property
-    def sdp_set(self):
-        return self._sdp_set
-
-    @sdp_set.setter
-    def sdp_set(self, is_set: bool):
-        self._sdp_set = is_set
-        if is_set:
-            self.on_ice_candidates_new(self.remote_candidates_buffer)
-            for data in self.remote_candidates_buffer.values():
-                data["candidates"].clear()
-
-    @property
-    def media_types(self):
-        if self._media_types is None:
-            raise Exception("self._media_types should not be None!")
-        return self._media_types
-
-    @media_types.setter
-    def media_types(self, new_media_types: dict) -> None:
-        self._media_types = new_media_types
-        self._media_types_inv = {v: k for k, v in new_media_types.items()}
-
-    @property
-    def media_types_inv(self) -> dict:
-        if self._media_types_inv is None:
-            raise Exception("self._media_types_inv should not be None!")
-        return self._media_types_inv
-
-    def get_sdp_mline_index(self, media_type: str) -> int:
-        """Gets the sdpMLineIndex for a given media type.
-
-        @param media_type: The type of the media.
-        """
-        for index, m_type in self.media_types.items():
-            if m_type == media_type:
-                return index
-        raise ValueError(f"Media type '{media_type}' not found")
-
-    def _set_media_types(self, offer_sdp: str) -> None:
-        """Sets media types from offer SDP
-
-        @param offer: RTC session description containing the offer
-        """
-        sdp_lines = offer_sdp.splitlines()
-        media_types = {}
-        mline_index = 0
-
-        for line in sdp_lines:
-            if line.startswith("m="):
-                media_types[mline_index] = line[2 : line.find(" ")]
-                mline_index += 1
-
-        self.media_types = media_types
-
-    def _a_call(self, method, *args, **kwargs):
-        """Call an async method in main thread"""
-
-        def wrapper(__):
-            aio.run_async(method(*args, **kwargs))
-            return False
-
-        Clock.schedule_once(wrapper)
-
-    def get_payload_types(
-        self, sdpmsg, video_encoding: str, audio_encoding: str
-    ) -> dict[str, int | None]:
-        """Find the payload types for the specified video and audio encoding.
-
-        Very simplistically finds the first payload type matching the encoding
-        name. More complex applications will want to match caps on
-        profile-level-id, packetization-mode, etc.
-        """
-        # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
-        # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
-        video_pt = None
-        audio_pt = None
-        for i in range(0, sdpmsg.medias_len()):
-            media = sdpmsg.get_media(i)
-            for j in range(0, media.formats_len()):
-                fmt = media.get_format(j)
-                if fmt == "webrtc-datachannel":
-                    continue
-                pt = int(fmt)
-                caps = media.get_caps_from_media(pt)
-                s = caps.get_structure(0)
-                encoding_name = s["encoding-name"]
-                if video_pt is None and encoding_name == video_encoding:
-                    video_pt = pt
-                elif audio_pt is None and encoding_name == audio_encoding:
-                    audio_pt = pt
-        return {video_encoding: video_pt, audio_encoding: audio_pt}
-
-    def parse_ice_candidate(self, candidate_string):
-        """Parses the ice candidate string.
-
-        @param candidate_string: The ice candidate string to be parsed.
-        """
-        pattern = re.compile(
-            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
-            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
-            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
-            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
-        )
-        match = pattern.match(candidate_string)
-        if match:
-            candidate_dict = match.groupdict()
-
-            # Apply the correct types to the dictionary values
-            candidate_dict["component_id"] = int(candidate_dict["component_id"])
-            candidate_dict["priority"] = int(candidate_dict["priority"])
-            candidate_dict["port"] = int(candidate_dict["port"])
-
-            if candidate_dict["rel_port"]:
-                candidate_dict["rel_port"] = int(candidate_dict["rel_port"])
-
-            if candidate_dict["generation"]:
-                candidate_dict["generation"] = candidate_dict["generation"]
-
-            # Remove None values
-            return {k: v for k, v in candidate_dict.items() if v is not None}
-        else:
-            log.warning(f"can't parse candidate: {candidate_string!r}")
-            return None
-
-    def build_ice_candidate(self, parsed_candidate):
-        """Builds ICE candidate
-
-        @param parsed_candidate: Dictionary containing parsed ICE candidate
-        """
-        base_format = (
-            "candidate:{foundation} {component_id} {transport} {priority} "
-            "{address} {port} typ {type}"
-        )
-
-        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
-            base_format += " raddr {rel_addr} rport {rel_port}"
-
-        if parsed_candidate.get("generation"):
-            base_format += " generation {generation}"
-
-        return base_format.format(**parsed_candidate)
-
-    def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
-        """Retrieves ICE password and user fragment for SDP offer.
-
-        @param sdp: The Session Description Protocol offer string.
-        @return: ufrag and pwd
-        @raise ValueError: Can't extract ufrag and password
-        """
-        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
-        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
-
-        if ufrag_line and pwd_line:
-            ufrag = self.ufrag = ufrag_line.group(1)
-            pwd = self.pwd = pwd_line.group(1)
-            return ufrag, pwd
-        else:
-            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
-            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
-
-    def reset_instance(self):
-        """Inits or resets the instance variables to their default state."""
-        self.role: str | None = None
-        if self.pipeline is not None:
-            self.pipeline.set_state(Gst.State.NULL)
-        self.pipeline = None
-        self.texture_map: dict[VideoStreamWidget, TextureData] = {}
-        self._remote_video_pad = None
-        self.sid: str | None = None
-        self.offer: str | None = None
-        self.local_candidates_buffer = {}
-        self.ufrag: str | None = None
-        self.pwd: str | None = None
-        self.callee: str | None = None
-        self._media_types = None
-        self._media_types_inv = None
-        self._sdp_set: bool = False
-        self.remote_candidates_buffer: dict[str, dict[str, list]] = {
-            "audio": {"candidates": []},
-            "video": {"candidates": []},
-        }
-        self._media_types = None
-        self._media_types_inv = None
-        self.audio_valve = None
-        self.video_valve = None
+        self.webrtc = webrtc.WebRTC(
+            G.host.a_bridge,
+            profile,
+            sinks=webrtc.SINKS_TEST if self.test_mode else webrtc.SINKS_APP,
+            appsink_data=webrtc.AppSinkData(
+                local_video_cb=partial(
+                    self.on_new_sample,
+                    update_sample_method=self.update_sample,
+                    video_widget=self.parent_calls.local_video
+                ),
+                remote_video_cb=partial(
+                    self.on_new_sample,
+                    update_sample_method=self.update_sample,
+                    video_widget=self.parent_calls.remote_video
+                )
+            ),
+            reset_cb=self.on_reset
 
-    async def setup_call(
-        self,
-        role: str,
-        audio_pt: int | None = 96,
-        video_pt: int | None = 97,
-    ) -> None:
-        """Sets up the call.
-
-        This method establishes the Gstreamer pipeline for audio and video communication.
-        The method also manages STUN and TURN server configurations, signal watchers, and
-        various connection handlers for the webrtcbin.
-
-        @param role: The role for the call, either 'initiator' or 'responder'.
-        @param audio_pt: The payload type for the audio stream.
-        @param video_pt: The payload type for the video stream
-
-        @raises NotImplementedError: If audio_pt or video_pt is set to None.
-        @raises AssertionError: If the role is not 'initiator' or 'responder'.
-        """
-        assert role in ("initiator", "responder")
-        self.role = role
-        if audio_pt is None or video_pt is None:
-            raise NotImplementedError("None value is not handled yet")
-
-        if self.test_mode:
-            video_source_elt = "videotestsrc is-live=true pattern=ball"
-            audio_source_elt = "audiotestsrc"
-        else:
-            video_source_elt = "v4l2src"
-            audio_source_elt = "pulsesrc"
-
-        self.gst_pipe_desc = f"""
-        webrtcbin latency=100 name=sendrecv bundle-policy=max-compat
-
-        input-selector name=video_selector
-        ! videorate
-        ! video/x-raw,framerate=30/1
-        ! tee name=t
-
-        {video_source_elt} name=video_src ! queue ! video_selector.
-        videotestsrc is-live=true pattern=black ! queue ! video_selector.
+        )
+        self.pipeline = None
 
-        t.
-        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
-        ! videoconvert
-        ! vp8enc deadline=1 keyframe-max-dist=60
-        ! rtpvp8pay picture-id-mode=15-bit
-        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
-        ! sendrecv.
-
-        t.
-        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0
-        ! videoconvert
-        ! appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 sync=True
-
-        {audio_source_elt} name=audio_src
-        ! valve name=audio_valve
-        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0
-        ! audioconvert
-        ! audioresample
-        ! opusenc audio-type=voice
-        ! rtpopuspay
-        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
-        ! sendrecv.
-        """
-
-        log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
-
-        # Create the pipeline
-        self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
-        if not self.pipeline:
-            raise exceptions.InternalError("Failed to create Gstreamer pipeline.")
-
-        self.webrtc = self.pipeline.get_by_name("sendrecv")
-        self.video_src = self.pipeline.get_by_name("video_src")
-        self.video_selector = self.pipeline.get_by_name("video_selector")
-        self.audio_valve = self.pipeline.get_by_name("audio_valve")
-
-        if self.parent_calls.video_muted:
-            self.on_video_mute(True)
-        if self.parent_calls.audio_muted:
-            self.on_audio_mute(True)
-
-        # set STUN and TURN servers
-        external_disco = data_format.deserialise(
-            await G.host.a_bridge.external_disco_get("", self.profile), type_check=list
-        )
+    def __getattr__(self, name):
+        if name in self.PROXIED_PROPERTIES or name in self.PROXIED_METHODS:
+            return getattr(self.webrtc, name)
+        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
 
-        for server in external_disco:
-            if server["type"] == "stun":
-                if server["transport"] == "tcp":
-                    log.info(
-                        "ignoring TCP STUN server, GStreamer only support one STUN server"
-                    )
-                url = f"stun://{server['host']}:{server['port']}"
-                log.debug(f"adding stun server: {url}")
-                self.webrtc.set_property("stun-server", url)
-            elif server["type"] == "turn":
-                url = "{scheme}://{username}:{password}@{host}:{port}".format(
-                    scheme="turns" if server["transport"] == "tcp" else "turn",
-                    username=quote_plus(server["username"]),
-                    password=quote_plus(server["password"]),
-                    host=server["host"],
-                    port=server["port"],
-                )
-                log.debug(f"adding turn server: {url}")
-
-                if not self.webrtc.emit("add-turn-server", url):
-                    log.warning(f"Erreur while adding TURN server {url}")
-
-        # local video feedback
-        local_video_sink = self.pipeline.get_by_name("local_video_sink")
-        local_video_sink.set_property("emit-signals", True)
-        local_video_sink.connect(
-            "new-sample",
-            self.on_new_sample,
-            self.update_sample,
-            self.parent_calls.local_video,
-        )
-        local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
-        local_video_sink.set_property("caps", local_video_sink_caps)
-
-        # Create bus and associate signal watchers
-        self.bus = self.pipeline.get_bus()
-        if not self.bus:
-            log.error("Failed to get bus from pipeline.")
-            return
-
-        self.bus.add_signal_watch()
-        self.webrtc.connect("pad-added", self.on_pad_added)
-        self.bus.connect("message::error", self.on_bus_error)
-        self.bus.connect("message::eos", self.on_bus_eos)
-        self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed)
-        self.webrtc.connect("on-ice-candidate", self.on_ice_candidate)
-        self.webrtc.connect(
-            "notify::ice-gathering-state", self.on_ice_gathering_state_change
-        )
-        self.webrtc.connect("notify::ice-connection-state", self.on_ice_connection_state)
+    def __setattr__(self, name, value):
+        if name in self.PROXIED_PROPERTIES:
+            setattr(self.webrtc, name, value)
+        else:
+            super().__setattr__(name, value)
 
-    def start_pipeline(self) -> None:
-        """Starts the GStreamer pipeline."""
-        log.debug("starting the pipeline")
-        self.pipeline.set_state(Gst.State.PLAYING)
-
-    def on_negotiation_needed(self, webrtc):
-        """Initiate SDP offer when negotiation is needed."""
-        log.debug("Negotiation needed.")
-        if self.role == "initiator":
-            log.debug("Creating offer…")
-            promise = Gst.Promise.new_with_change_func(self.on_offer_created)
-            self.webrtc.emit("create-offer", None, promise)
-
-    def on_offer_created(self, promise):
-        """Callback for when SDP offer is created."""
-        log.info("on_offer_created called")
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-        reply = promise.get_reply()
-        if reply is None:
-            log.error("Promise reply is None. Offer creation might have failed.")
-            return
-        offer = reply["offer"]
-        self.offer = offer.sdp.as_text()
-        log.info(f"SDP offer created: \n{self.offer}")
-        self._set_media_types(self.offer)
-        promise = Gst.Promise.new()
-        self.webrtc.emit("set-local-description", offer, promise)
-        promise.interrupt()
-        self._a_call(self._start_call)
-
-    def on_answer_set(self, promise):
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-
-    def on_answer_created(self, promise, _, __):
-        """Callback for when SDP answer is created."""
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-        reply = promise.get_reply()
-        answer = reply["answer"]
-        promise = Gst.Promise.new()
-        self.webrtc.emit("set-local-description", answer, promise)
-        promise.interrupt()
-        answer_sdp = answer.sdp.as_text()
-        log.info(f"SDP answer set: \n{answer_sdp}")
-        self.sdp_set = True
-        self._a_call(G.host.a_bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)
-
-    def on_offer_set(self, promise):
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
-        self.webrtc.emit("create-answer", None, promise)
+    def on_reset(self):
+        self.texture_map: dict[VideoStreamWidget, TextureData] = {}
 
     def on_new_sample(
         self,
@@ -583,311 +239,6 @@
             if buf is not None and mapinfo is not None:
                 buf.unmap(mapinfo)
 
-    def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
-        """Handle the stream from the remote decodebin.
-
-        This method processes the incoming stream from the remote decodebin, determining
-        whether it's video or audio. It then sets up the appropriate GStreamer elements
-        for video/audio processing and adds them to the pipeline.
-
-        @param pad: The Gst.Pad from the remote decodebin producing the stream.
-        """
-        assert self.pipeline is not None
-        if not pad.has_current_caps():
-            log.error(f"{pad} has no caps, ignoring")
-            return
-
-        self.pipeline.set_state(Gst.State.PAUSED)
-        caps = pad.get_current_caps()
-        assert len(caps)
-        s = caps[0]
-        name = s.get_name()
-        log.debug(f"====> NAME START: {name}")
-
-        q = Gst.ElementFactory.make("queue")
-        q.set_property("max-size-time", 0)
-        q.set_property("max-size-bytes", 0)
-        q.set_property("max-size-buffers", 5)
-
-        if name.startswith("video"):
-            log.debug("===> VIDEO OK")
-
-            self._remote_video_pad = pad
-
-            # Check and log the original size of the video
-            width = s.get_int("width").value
-            height = s.get_int("height").value
-            log.info(f"Original video size: {width}x{height}")
-
-            # This is a fix for an issue found with Movim on desktop: a non standard
-            # resolution is used (990x557) resulting in bad alignement and no color in
-            # rendered image
-            adjust_resolution = width % 4 != 0 or height % 4 != 0
-            if adjust_resolution:
-                log.info("non standard resolution, we need to adjust size")
-                width = (width + 3) // 4 * 4
-                height = (height + 3) // 4 * 4
-                log.info(f"Adjusted video size: {width}x{height}")
-
-            conv = Gst.ElementFactory.make("videoconvert")
-            remote_video_sink = Gst.ElementFactory.make("appsink")
-
-            appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
-            remote_video_sink.set_property("caps", appsink_caps)
-
-            remote_video_sink.set_property("emit-signals", True)
-            remote_video_sink.set_property("drop", True)
-            remote_video_sink.set_property("max-buffers", 1)
-            remote_video_sink.set_property("sync", True)
-            remote_video_sink.connect(
-                "new-sample",
-                self.on_new_sample,
-                self.update_sample,
-                self.parent_calls.remote_video,
-            )
-
-            if adjust_resolution:
-                videoscale = Gst.ElementFactory.make("videoscale")
-                adjusted_caps = Gst.Caps.from_string(
-                    f"video/x-raw,width={width},height={height}"
-                )
-                capsfilter = Gst.ElementFactory.make("capsfilter")
-                capsfilter.set_property("caps", adjusted_caps)
-
-                self.pipeline.add(q, conv, videoscale, capsfilter, remote_video_sink)
-                self.pipeline.sync_children_states()
-                pad.link(q.get_static_pad("sink"))
-                q.link(conv)
-                conv.link(videoscale)
-                videoscale.link(capsfilter)
-                capsfilter.link(remote_video_sink)
-            else:
-                self.pipeline.add(q, conv, remote_video_sink)
-                self.pipeline.sync_children_states()
-                pad.link(q.get_static_pad("sink"))
-                q.link(conv)
-                conv.link(remote_video_sink)
-
-        elif name.startswith("audio"):
-            log.debug("===> Audio OK")
-            conv = Gst.ElementFactory.make("audioconvert")
-            resample = Gst.ElementFactory.make("audioresample")
-            remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
-            self.pipeline.add(q, conv, resample, remote_audio_sink)
-            self.pipeline.sync_children_states()
-            pad.link(q.get_static_pad("sink"))
-            q.link(conv)
-            conv.link(resample)
-            resample.link(remote_audio_sink)
-        self.pipeline.set_state(Gst.State.PLAYING)
-
-    def on_pad_added(self, __, pad: Gst.Pad) -> None:
-        """Handle the addition of a new pad to the element.
-
-        When a new source pad is added to the element, this method creates a decodebin,
-        connects it to handle the stream, and links the pad to the decodebin.
-
-        @param __: Placeholder for the signal source. Not used in this method.
-        @param pad: The newly added pad.
-        """
-
-        log.debug("on_pad_added")
-        if pad.direction != Gst.PadDirection.SRC:
-            return
-
-        decodebin = Gst.ElementFactory.make("decodebin")
-        decodebin.connect("pad-added", self.on_remote_decodebin_stream)
-        self.pipeline.add(decodebin)
-        decodebin.sync_state_with_parent()
-        pad.link(decodebin.get_static_pad("sink"))
-
-    async def _start_call(self) -> None:
-        """Initiate the call.
-
-        Initiates a call with the callee using the stored offer. If there are any buffered
-        local ICE candidates, they are sent as part of the initiation.
-        """
-        assert self.callee is not None
-        self.sid = await G.host.a_bridge.call_start(
-            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
-        )
-        if self.local_candidates_buffer:
-            log.debug(
-                f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
-            )
-            if self.pwd is None:
-                sdp = self.webrtc.props.local_description.sdp.as_text()
-                self.extract_ufrag_pwd(sdp)
-            ice_data = {}
-            for media_type, candidates in self.local_candidates_buffer.items():
-                ice_data[media_type] = {
-                    "ufrag": self.ufrag,
-                    "pwd": self.pwd,
-                    "candidates": candidates,
-                }
-            await G.host.a_bridge.ice_candidates_add(
-                self.sid, data_format.serialise(ice_data), self.profile
-            )
-            self.local_candidates_buffer.clear()
-
-    def _remote_sdp_set(self, promise) -> None:
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-        self.sdp_set = True
-
-    def on_accepted_call(self, sdp: str, profile: str) -> None:
-        """Outgoing call has been accepted.
-
-        @param sdp: The SDP answer string received from the other party.
-        @param profile: Profile used for the call.
-        """
-        log.debug(f"SDP answer received: \n{sdp}")
-
-        __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
-        answer = GstWebRTC.WebRTCSessionDescription.new(
-            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
-        )
-        promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
-        self.webrtc.emit("set-remote-description", answer, promise)
-
-    async def answer_call(self, sdp: str, profile: str) -> None:
-        """Answer an incoming call
-
-        @param sdp: The SDP offer string received from the initiator.
-        @param profile: Profile used for the call.
-
-        @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in
-            payload types.
-        """
-        log.debug(f"SDP offer received: \n{sdp}")
-        self._set_media_types(sdp)
-        __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
-        payload_types = self.get_payload_types(
-            offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
-        )
-        assert "VP8" in payload_types
-        assert "OPUS" in payload_types
-        await self.setup_call(
-            "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
-        )
-        self.start_pipeline()
-        offer = GstWebRTC.WebRTCSessionDescription.new(
-            GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
-        )
-        promise = Gst.Promise.new_with_change_func(self.on_offer_set)
-        self.webrtc.emit("set-remote-description", offer, promise)
-
-    def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
-        """Handles the on-ice-candidate signal of webrtcbin.
-
-        @param webrtc: The webrtcbin element.
-        @param mlineindex: The mline index.
-        @param candidate: The ICE candidate.
-        """
-        log.debug(
-            f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
-        )
-        parsed_candidate = self.parse_ice_candidate(candidate_sdp)
-        try:
-            media_type = self.media_types[mline_index]
-        except KeyError:
-            raise exceptions.InternalError("can't find media type")
-
-        if self.sid is None:
-            log.debug("buffering local ICE candidate")
-            self.local_candidates_buffer.setdefault(media_type, []).append(
-                parsed_candidate
-            )
-        else:
-            sdp = self.webrtc.props.local_description.sdp.as_text()
-            assert sdp is not None
-            ufrag, pwd = self.extract_ufrag_pwd(sdp)
-            ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
-            self._a_call(
-                G.host.a_bridge.ice_candidates_add,
-                self.sid,
-                data_format.serialise({media_type: ice_data}),
-                self.profile,
-            )
-
-    def on_ice_candidates_new(self, candidates: dict) -> None:
-        """Handle new ICE candidates.
-
-        @param candidates: A dictionary containing media types ("audio" or "video") as
-            keys and corresponding ICE data as values.
-
-        @raise exceptions.InternalError: Raised when sdp mline index is not found.
-        """
-        if not self.sdp_set:
-            log.debug("buffering remote ICE candidate")
-            for media_type in ("audio", "video"):
-                media_candidates = candidates.get(media_type)
-                if media_candidates:
-                    buffer = self.remote_candidates_buffer[media_type]
-                    buffer["candidates"].extend(media_candidates["candidates"])
-            return
-        for media_type, ice_data in candidates.items():
-            for candidate in ice_data["candidates"]:
-                candidate_sdp = self.build_ice_candidate(candidate)
-                try:
-                    mline_index = self.get_sdp_mline_index(media_type)
-                except Exception as e:
-                    raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
-                self.webrtc.emit("add-ice-candidate", mline_index, candidate_sdp)
-                log.debug(
-                    f"Remote ICE candidate added. MLine Index: {mline_index}, "
-                    f"{candidate_sdp}"
-                )
-
-    def on_ice_gathering_state_change(self, pspec, __):
-        state = self.webrtc.get_property("ice-gathering-state")
-        log.debug(f"ICE gathering state changed to {state}")
-
-    def on_ice_connection_state(self, pspec, __):
-        state = self.webrtc.props.ice_connection_state
-        if state == GstWebRTC.WebRTCICEConnectionState.FAILED:
-            log.error("ICE connection failed")
-        log.info(f"ICE connection state changed to {state}")
-
-    def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
-        """Handles the GStreamer bus error messages.
-
-        @param bus: The GStreamer bus.
-        @param message: The error message.
-        """
-        err, debug = message.parse_error()
-        log.error(f"Error from {message.src.get_name()}: {err.message}")
-        log.error(f"Debugging info: {debug}")
-
-    def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
-        """Handles the GStreamer bus eos messages.
-
-        @param bus: The GStreamer bus.
-        @param message: The eos message.
-        """
-        log.info("End of stream")
-
-    def on_audio_mute(self, muted: bool) -> None:
-        if self.audio_valve is not None:
-            self.audio_valve.set_property("drop", muted)
-            state = "muted" if muted else "unmuted"
-            log.info(f"audio is now {state}")
-
-    def on_video_mute(self, muted: bool) -> None:
-        if self.video_selector is not None:
-            # when muted, we switch to a black image and deactivate the camera
-            if not muted:
-                self.video_src.set_state(Gst.State.PLAYING)
-            pad = self.video_selector.get_static_pad("sink_1" if muted else "sink_0")
-            self.video_selector.props.active_pad = pad
-            if muted:
-                self.video_src.set_state(Gst.State.NULL)
-            state = "muted" if muted else "unmuted"
-            log.info(f"video is now {state}")
-
-    async def end_call(self) -> None:
-        """Stop streaming and clean instance"""
-        self.reset_instance()
-
 
 class Calls(
         quick_widgets.QuickWidget,
@@ -918,10 +269,6 @@
         self.header_input_add_extra(call_btn)
         self.webrtc = WebRTC(self, self.profile)
         self.previous_fullscreen = None
-        self.bind(
-            audio_muted=lambda __, value: self.webrtc.on_audio_mute(value),
-            video_muted=lambda __, value: self.webrtc.on_video_mute(value),
-        )
         self.reset_instance()
 
     @property
@@ -1029,6 +376,12 @@
             self.screen_manager.transition.direction = "down"
             self.screen_manager.current = "search"
 
+    def on_audio_muted(self, instance, muted: bool) -> None:
+        self.webrtc.audio_muted = muted
+
+    def on_video_muted(self, instance, muted: bool) -> None:
+        self.webrtc.video_muted = muted
+
     def on_fullscreen(self, instance, fullscreen: bool) -> None:
         if fullscreen:
             G.host.app.show_head_widget(False, animation=False)
--- a/tests/unit/test_plugin_calls.py	Wed Oct 25 15:29:33 2023 +0200
+++ b/tests/unit/test_plugin_calls.py	Wed Nov 01 13:41:07 2023 +0100
@@ -18,8 +18,6 @@
 
 from unittest.mock import AsyncMock, MagicMock
 
-from gi.repository import Gst
-from libervia.backend.core import exceptions
 from libervia.backend.tools.common import data_format
 import pytest
 
@@ -36,39 +34,12 @@
     return host
 
 
-@pytest.fixture(scope="function")
-def webrtc():
-    """Fixture for WebRTC instantiation."""
-    host_mock = MagicMock()
-    profile = "test_profile"
-    instance = plugin_wid_calls.WebRTC(host_mock, profile)
-
-    instance._set_media_types = MagicMock()
-    instance.start_pipeline = MagicMock()
-    instance.webrtc = MagicMock()
-    instance.webrtc.emit = MagicMock()
-
-    instance.GstSdp_SDPMessage_new_from_text = MagicMock()
-    instance.GstWebRTC_WebRTCSessionDescription_new = MagicMock()
-    instance.Gst_Promise_new_with_change_func = MagicMock()
-
-    return instance
-
-
 @pytest.fixture
 def calls(monkeypatch, host):
     """Fixture for Call UI instantiation."""
     for attr in ("header_box", "local_video", "remote_video", "screen_manager"):
-        monkeypatch.setattr(
-            plugin_wid_calls.Calls,
-            attr,
-            MagicMock()
-        )
-    calls = plugin_wid_calls.Calls(
-        host,
-        "test_peer@example.org",
-        ["test_profile"]
-    )
+        monkeypatch.setattr(plugin_wid_calls.Calls, attr, MagicMock())
+    calls = plugin_wid_calls.Calls(host, "test_peer@example.org", ["test_profile"])
     calls.jid_selector = MagicMock()
     calls.header_input = MagicMock()
     calls.header_input.text = "fake_jid@domain"
@@ -80,197 +51,7 @@
     return calls
 
 
-class TestWebRtc:
-    def test_get_payload_types(self, webrtc):
-        """The method can identify the correct payload types for video and audio."""
-        fake_sdpmsg = MagicMock()
-        fake_media = MagicMock()
-        fake_caps = MagicMock()
-        fake_structure = MagicMock()
-
-        # This side effect will return 'fake_video_encoding' first, then
-        # 'fake_audio_encoding'.
-        fake_structure.__getitem__.side_effect = [
-            "fake_video_encoding",
-            "fake_audio_encoding",
-        ]
-        fake_caps.get_structure.return_value = fake_structure
-        fake_media.get_format.side_effect = ["webrtc-datachannel", "10", "20"]
-        fake_media.get_caps_from_media.return_value = fake_caps
-        fake_sdpmsg.get_media.return_value = fake_media
-        fake_sdpmsg.medias_len.return_value = 1
-        fake_media.formats_len.return_value = 3
-
-        result = webrtc.get_payload_types(
-            fake_sdpmsg, "fake_video_encoding", "fake_audio_encoding"
-        )
-        expected_result = {"fake_video_encoding": 10, "fake_audio_encoding": 20}
-
-        assert result == expected_result
-
-    def test_on_accepted_call(self, webrtc):
-        """The method correctly sets the remote SDP upon acceptance of an outgoing call."""
-        sdp_str = "mock_sdp_string"
-        profile_str = "test_profile"
-
-        webrtc.on_accepted_call(sdp_str, profile_str)
-
-        # remote description must be set
-        assert webrtc.webrtc.emit.call_count == 1
-        assert webrtc.webrtc.emit.call_args[0][0] == "set-remote-description"
-
-    @pytest.mark.asyncio
-    async def test_answer_call(self, webrtc, monkeypatch):
-        """The method correctly answers an incoming call."""
-        mock_setup_call = AsyncMock()
-
-        def mock_get_payload_types(sdpmsg, video_encoding, audio_encoding):
-            return {"VP8": 96, "OPUS": 97}
-
-        monkeypatch.setattr(webrtc, "setup_call", mock_setup_call)
-        monkeypatch.setattr(webrtc, "get_payload_types", mock_get_payload_types)
-
-        sdp_str = "mock_sdp_string"
-        profile_str = "mock_profile"
-
-        await webrtc.answer_call(sdp_str, profile_str)
-
-        mock_setup_call.assert_called_once_with("responder", audio_pt=97, video_pt=96)
-
-        # remote description must be set
-        assert webrtc.webrtc.emit.call_count == 1
-        assert webrtc.webrtc.emit.call_args[0][0] == "set-remote-description"
-
-    def test_on_remote_decodebin_stream_video(self, webrtc, monkeypatch):
-        """The method correctly handles video streams from the remote decodebin."""
-        mock_pipeline = MagicMock()
-        monkeypatch.setattr(webrtc, "pipeline", mock_pipeline)
-
-        mock_pad = MagicMock()
-        mock_caps = MagicMock()
-        mock_structure = MagicMock()
-
-        mock_pad.has_current_caps.return_value = True
-        mock_pad.get_current_caps.return_value = mock_caps
-        mock_caps.__len__.return_value = 1
-        mock_caps.__getitem__.return_value = mock_structure
-        mock_structure.get_name.return_value = "video/x-h264"
-        # We use non-standard resolution as example to trigger the workaround
-        mock_structure.get_int.side_effect = lambda x: MagicMock(
-            value=990 if x == "width" else 557
-        )
-
-        webrtc.on_remote_decodebin_stream(None, mock_pad)
-
-        assert webrtc._remote_video_pad == mock_pad
-        mock_pipeline.add.assert_called()
-        mock_pipeline.set_state.assert_called()
-        mock_pad.link.assert_called()
-
-    def test_on_remote_decodebin_stream_audio(self, webrtc, monkeypatch):
-        """The method correctly handles audio streams from the remote decodebin."""
-        mock_pipeline = MagicMock()
-        monkeypatch.setattr(webrtc, "pipeline", mock_pipeline)
-
-        mock_pad = MagicMock()
-        mock_caps = MagicMock()
-        mock_structure = MagicMock()
-
-        mock_pad.has_current_caps.return_value = True
-        mock_pad.get_current_caps.return_value = mock_caps
-        mock_caps.__len__.return_value = 1
-        mock_caps.__getitem__.return_value = mock_structure
-        mock_structure.get_name.return_value = "audio/x-raw"
-
-        webrtc.on_remote_decodebin_stream(None, mock_pad)
-
-        mock_pipeline.add.assert_called()
-        mock_pipeline.set_state.assert_called()
-        mock_pad.link.assert_called()
-
-    @pytest.mark.asyncio
-    async def test_setup_call_correct_role(self, host, webrtc, monkeypatch):
-        """Roles are set in setup_call."""
-        monkeypatch.setattr(Gst, "parse_launch", MagicMock())
-        monkeypatch.setattr(data_format, "deserialise", MagicMock(return_value=[]))
-
-        await webrtc.setup_call("initiator")
-        assert webrtc.role == "initiator"
-
-        await webrtc.setup_call("responder")
-        assert webrtc.role == "responder"
-
-        with pytest.raises(AssertionError):
-            await webrtc.setup_call("invalid_role")
-
-    @pytest.mark.asyncio
-    async def test_setup_call_test_mode(self, host, webrtc, monkeypatch):
-        """Test mode use fake video and audio in setup_call."""
-        monkeypatch.setattr(data_format, "deserialise", MagicMock(return_value=[]))
-        monkeypatch.setattr(webrtc, "test_mode", True)
-        await webrtc.setup_call("initiator")
-        assert "videotestsrc" in webrtc.gst_pipe_desc
-        assert "audiotestsrc" in webrtc.gst_pipe_desc
-
-    @pytest.mark.asyncio
-    async def test_setup_call_normal_mode(self, host, webrtc, monkeypatch):
-        """Normal mode use real video and audio in setup_call."""
-        monkeypatch.setattr(data_format, "deserialise", MagicMock(return_value=[]))
-        monkeypatch.setattr(webrtc, "test_mode", False)
-        await webrtc.setup_call("initiator")
-        assert "v4l2src" in webrtc.gst_pipe_desc
-        assert "pulsesrc" in webrtc.gst_pipe_desc
-
-    @pytest.mark.asyncio
-    async def test_setup_call_with_stun_and_turn(self, host, webrtc, monkeypatch):
-        """STUN and TURN server configurations are done in setup_call."""
-        mock_pipeline = MagicMock()
-        mock_parse_launch = MagicMock()
-        mock_parse_launch.return_value = mock_pipeline
-        monkeypatch.setattr(Gst, "parse_launch", mock_parse_launch)
-
-        mock_pipeline.get_by_name.return_value = webrtc.webrtc
-
-        mock_external_disco = [
-            {"type": "stun", "transport": "udp", "host": "stun.host", "port": "3478"},
-            {
-                "type": "turn",
-                "transport": "udp",
-                "host": "turn.host",
-                "port": "3478",
-                "username": "user",
-                "password": "pass",
-            },
-        ]
-
-        monkeypatch.setattr(
-            data_format, "deserialise", MagicMock(return_value=mock_external_disco)
-        )
-
-        mock_emit = AsyncMock()
-        monkeypatch.setattr(webrtc.webrtc, "emit", mock_emit)
-
-        mock_set_property = AsyncMock()
-        monkeypatch.setattr(webrtc.webrtc, "set_property", mock_set_property)
-
-        await webrtc.setup_call("initiator")
-
-        G.host.a_bridge.external_disco_get.assert_called_once_with("", webrtc.profile)
-        mock_set_property.assert_any_call("stun-server", "stun://stun.host:3478")
-        mock_emit.assert_called_once_with(
-            "add-turn-server", "turn://user:pass@turn.host:3478"
-        )
-
-    @pytest.mark.asyncio
-    async def test_setup_call_gstreamer_pipeline_failure(self, webrtc, monkeypatch):
-        """Test setup_call method handling Gstreamer pipeline failure."""
-        monkeypatch.setattr(Gst, "parse_launch", lambda _: None)
-        with pytest.raises(exceptions.InternalError):
-            await webrtc.setup_call("initiator")
-
-
 class TestCalls:
-
     @pytest.mark.asyncio
     async def test_toggle_call_sid_none(self, monkeypatch, calls):
         """Call is started when there is not sid set."""
@@ -293,7 +74,6 @@
         host.a_bridge.call_end.assert_called_once_with("test_sid", "", calls.profile)
         assert calls.in_call == False
 
-
     @pytest.mark.asyncio
     async def test_on_incoming_call_sid_none(self, monkeypatch, host, calls):
         """Incoming call is accepted if no ongoing call."""
@@ -307,9 +87,7 @@
         assert calls.in_call == True
         assert calls.webrtc.sid == "test_sid"
         host.a_bridge.action_launch.assert_called_once_with(
-            fake_action_id,
-            data_format.serialise({"cancelled": False}),
-            fake_profile
+            fake_action_id, data_format.serialise({"cancelled": False}), fake_profile
         )
 
     @pytest.mark.asyncio
@@ -329,10 +107,7 @@
     @pytest.mark.asyncio
     async def test_on_call_setup_initiator(self, calls):
         """Correct method called if role is 'initiator'."""
-        setup_data = {
-            "role": "initiator",
-            "sdp": "fake_sdp"
-        }
+        setup_data = {"role": "initiator", "sdp": "fake_sdp"}
         profile = "fake_profile"
 
         await calls.on_call_setup(setup_data, profile)
@@ -342,15 +117,8 @@
     @pytest.mark.asyncio
     async def test_on_call_setup_responder(self, monkeypatch, calls):
         """Correct method called if role is 'responder'."""
-        monkeypatch.setattr(
-            calls.webrtc,
-            "answer_call",
-            AsyncMock()
-        )
-        setup_data = {
-            "role": "responder",
-            "sdp": "fake_sdp"
-        }
+        monkeypatch.setattr(calls.webrtc, "answer_call", AsyncMock())
+        setup_data = {"role": "responder", "sdp": "fake_sdp"}
         profile = "fake_profile"
 
         await calls.on_call_setup(setup_data, profile)
@@ -361,10 +129,7 @@
     @pytest.mark.asyncio
     async def test_on_call_setup_invalid_role(self, calls):
         """Nothing is called if role is neither 'initiator' nor 'responder'."""
-        setup_data = {
-            "role": "invalid_role",
-            "sdp": "fake_sdp"
-        }
+        setup_data = {"role": "invalid_role", "sdp": "fake_sdp"}
         profile = "fake_profile"
 
         await calls.on_call_setup(setup_data, profile)