diff libervia/frontends/tools/webrtc.py @ 4139:6745c6bd4c7a

frontends (tools): webrtc implementation: this is a factored implementation usable by all non-web frontends. Sources and Sinks can be configured easily to use tests source or local webcam/microphone, autosinks or a `appsink` that the frontend will use. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:03:36 +0100
parents
children 970b6209526a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/frontends/tools/webrtc.py	Wed Nov 01 14:03:36 2023 +0100
@@ -0,0 +1,909 @@
+#!/usr/bin/env python3
+
+# Libervia WebRTC implementation
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import gi
+gi.require_versions({
+    "Gst": "1.0",
+    "GstWebRTC": "1.0"
+})
+from gi.repository import Gst, GstWebRTC, GstSdp
+
+try:
+    from gi.overrides import Gst as _
+except ImportError:
+    print(
+        "no GStreamer python overrides available, please install relevant pacakges on "
+        "your system."
+    )
+import asyncio
+from dataclasses import dataclass
+from datetime import datetime
+import logging
+import re
+from typing import Callable
+from urllib.parse import quote_plus
+
+from libervia.backend.core import exceptions
+from libervia.backend.tools.common import data_format
+from libervia.frontends.tools import aio
+
+
+log = logging.getLogger(__name__)
+
+Gst.init(None)
+
+SOURCES_AUTO = "auto"
+SOURCES_TEST = "test"
+SINKS_APP = "app"
+SINKS_AUTO = "auto"
+SINKS_TEST = "test"
+
+
+@dataclass
+class AppSinkData:
+    local_video_cb: Callable
+    remote_video_cb: Callable
+
+
+class WebRTC:
+    """GSTreamer based WebRTC implementation for audio and video communication.
+
+    This class encapsulates the WebRTC functionalities required for initiating and
+    handling audio and video calls.
+    """
+
+    def __init__(
+        self,
+        bridge,
+        profile: str,
+        sources: str = SOURCES_AUTO,
+        sinks: str = SINKS_AUTO,
+        appsink_data: AppSinkData | None = None,
+        reset_cb: Callable | None = None,
+    ) -> None:
+        self.main_loop = asyncio.get_event_loop()
+        self.bridge = bridge
+        self.profile = profile
+        self.pipeline = None
+        self._audio_muted = False
+        self._video_muted = False
+        self.sources = sources
+        self.sinks = sinks
+        if sinks == SINKS_APP:
+            self.appsink_data = appsink_data
+        elif appsink_data is not None:
+            raise exceptions.InternalError(
+                "appsink_data can only be used for SINKS_APP sinks"
+            )
+        self.reset_cb = reset_cb
+        self.reset_instance()
+
+    @property
+    def audio_muted(self):
+        return self._audio_muted
+
+    @audio_muted.setter
+    def audio_muted(self, muted: bool) -> None:
+        if muted != self._audio_muted:
+            self._audio_muted = muted
+            self.on_audio_mute(muted)
+
+    @property
+    def video_muted(self):
+        return self._video_muted
+
+    @video_muted.setter
+    def video_muted(self, muted: bool) -> None:
+        if muted != self._video_muted:
+            self._video_muted = muted
+            self.on_video_mute(muted)
+
+    @property
+    def sdp_set(self):
+        return self._sdp_set
+
+    @sdp_set.setter
+    def sdp_set(self, is_set: bool):
+        self._sdp_set = is_set
+        if is_set:
+            self.on_ice_candidates_new(self.remote_candidates_buffer)
+            for data in self.remote_candidates_buffer.values():
+                data["candidates"].clear()
+
+    @property
+    def media_types(self):
+        if self._media_types is None:
+            raise Exception("self._media_types should not be None!")
+        return self._media_types
+
+    @media_types.setter
+    def media_types(self, new_media_types: dict) -> None:
+        self._media_types = new_media_types
+        self._media_types_inv = {v: k for k, v in new_media_types.items()}
+
+    @property
+    def media_types_inv(self) -> dict:
+        if self._media_types_inv is None:
+            raise Exception("self._media_types_inv should not be None!")
+        return self._media_types_inv
+
+    def generate_dot_file(
+        self,
+        filename: str = "pipeline",
+        details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL,
+        with_timestamp: bool = True,
+        bin_: Gst.Bin|None = None,
+    ) -> None:
+        """Generate Dot File for debugging
+
+        ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir.
+        ``dot -Tpng -o <filename>.png <filename>.dot`` can be use to convert to a PNG file.
+        See
+        https://gstreamer.freedesktop.org/documentation/gstreamer/debugutils.html?gi-language=python#GstDebugGraphDetails
+        for details.
+
+        @param filename: name of the generated file
+        @param details: which details to print
+        @param with_timestamp: if True, add a timestamp to filename
+        @param bin_: which bin to output. By default, the whole pipeline
+            (``self.pipeline``) will be used.
+        """
+        if bin_ is None:
+            bin_ = self.pipeline
+        if with_timestamp:
+            timestamp = datetime.now().isoformat(timespec='milliseconds')
+            filename = f"{timestamp}_filename"
+
+        Gst.debug_bin_to_dot_file(bin_, details, filename)
+
+    def get_sdp_mline_index(self, media_type: str) -> int:
+        """Gets the sdpMLineIndex for a given media type.
+
+        @param media_type: The type of the media.
+        """
+        for index, m_type in self.media_types.items():
+            if m_type == media_type:
+                return index
+        raise ValueError(f"Media type '{media_type}' not found")
+
+    def _set_media_types(self, offer_sdp: str) -> None:
+        """Sets media types from offer SDP
+
+        @param offer: RTC session description containing the offer
+        """
+        sdp_lines = offer_sdp.splitlines()
+        media_types = {}
+        mline_index = 0
+
+        for line in sdp_lines:
+            if line.startswith("m="):
+                media_types[mline_index] = line[2 : line.find(" ")]
+                mline_index += 1
+
+        self.media_types = media_types
+
+    def _a_call(self, method, *args, **kwargs):
+        """Call an async method in main thread"""
+        aio.run_from_thread(method, *args, **kwargs, loop=self.main_loop)
+
+    def get_payload_types(
+        self, sdpmsg, video_encoding: str, audio_encoding: str
+    ) -> dict[str, int | None]:
+        """Find the payload types for the specified video and audio encoding.
+
+        Very simplistically finds the first payload type matching the encoding
+        name. More complex applications will want to match caps on
+        profile-level-id, packetization-mode, etc.
+        """
+        # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
+        # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
+        video_pt = None
+        audio_pt = None
+        for i in range(0, sdpmsg.medias_len()):
+            media = sdpmsg.get_media(i)
+            for j in range(0, media.formats_len()):
+                fmt = media.get_format(j)
+                if fmt == "webrtc-datachannel":
+                    continue
+                pt = int(fmt)
+                caps = media.get_caps_from_media(pt)
+                s = caps.get_structure(0)
+                encoding_name = s["encoding-name"]
+                if video_pt is None and encoding_name == video_encoding:
+                    video_pt = pt
+                elif audio_pt is None and encoding_name == audio_encoding:
+                    audio_pt = pt
+        return {video_encoding: video_pt, audio_encoding: audio_pt}
+
+    def parse_ice_candidate(self, candidate_string):
+        """Parses the ice candidate string.
+
+        @param candidate_string: The ice candidate string to be parsed.
+        """
+        pattern = re.compile(
+            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
+            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
+            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
+            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
+        )
+        match = pattern.match(candidate_string)
+        if match:
+            candidate_dict = match.groupdict()
+
+            # Apply the correct types to the dictionary values
+            candidate_dict["component_id"] = int(candidate_dict["component_id"])
+            candidate_dict["priority"] = int(candidate_dict["priority"])
+            candidate_dict["port"] = int(candidate_dict["port"])
+
+            if candidate_dict["rel_port"]:
+                candidate_dict["rel_port"] = int(candidate_dict["rel_port"])
+
+            if candidate_dict["generation"]:
+                candidate_dict["generation"] = candidate_dict["generation"]
+
+            # Remove None values
+            return {k: v for k, v in candidate_dict.items() if v is not None}
+        else:
+            log.warning(f"can't parse candidate: {candidate_string!r}")
+            return None
+
+    def build_ice_candidate(self, parsed_candidate):
+        """Builds ICE candidate
+
+        @param parsed_candidate: Dictionary containing parsed ICE candidate
+        """
+        base_format = (
+            "candidate:{foundation} {component_id} {transport} {priority} "
+            "{address} {port} typ {type}"
+        )
+
+        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
+            base_format += " raddr {rel_addr} rport {rel_port}"
+
+        if parsed_candidate.get("generation"):
+            base_format += " generation {generation}"
+
+        return base_format.format(**parsed_candidate)
+
+    def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
+        """Retrieves ICE password and user fragment for SDP offer.
+
+        @param sdp: The Session Description Protocol offer string.
+        @return: ufrag and pwd
+        @raise ValueError: Can't extract ufrag and password
+        """
+        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
+        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
+
+        if ufrag_line and pwd_line:
+            ufrag = self.ufrag = ufrag_line.group(1)
+            pwd = self.pwd = pwd_line.group(1)
+            return ufrag, pwd
+        else:
+            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
+            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
+
+    def reset_instance(self):
+        """Inits or resets the instance variables to their default state."""
+        self.role: str | None = None
+        if self.pipeline is not None:
+            self.pipeline.set_state(Gst.State.NULL)
+        self.pipeline = None
+        self._remote_video_pad = None
+        self.sid: str | None = None
+        self.offer: str | None = None
+        self.local_candidates_buffer = {}
+        self.ufrag: str | None = None
+        self.pwd: str | None = None
+        self.callee: str | None = None
+        self._media_types = None
+        self._media_types_inv = None
+        self._sdp_set: bool = False
+        self.remote_candidates_buffer: dict[str, dict[str, list]] = {
+            "audio": {"candidates": []},
+            "video": {"candidates": []},
+        }
+        self._media_types = None
+        self._media_types_inv = None
+        self.audio_valve = None
+        self.video_valve = None
+        if self.reset_cb is not None:
+            self.reset_cb()
+
+
+    async def setup_call(
+        self,
+        role: str,
+        audio_pt: int | None = 96,
+        video_pt: int | None = 97,
+    ) -> None:
+        """Sets up the call.
+
+        This method establishes the Gstreamer pipeline for audio and video communication.
+        The method also manages STUN and TURN server configurations, signal watchers, and
+        various connection handlers for the webrtcbin.
+
+        @param role: The role for the call, either 'initiator' or 'responder'.
+        @param audio_pt: The payload type for the audio stream.
+        @param video_pt: The payload type for the video stream
+
+        @raises NotImplementedError: If audio_pt or video_pt is set to None.
+        @raises AssertionError: If the role is not 'initiator' or 'responder'.
+        """
+        assert role in ("initiator", "responder")
+        self.role = role
+        if audio_pt is None or video_pt is None:
+            raise NotImplementedError("None value is not handled yet")
+
+        if self.sources == SOURCES_AUTO:
+            video_source_elt = "v4l2src"
+            audio_source_elt = "pulsesrc"
+        elif self.sources == SOURCES_TEST:
+            video_source_elt = "videotestsrc is-live=true pattern=ball"
+            audio_source_elt = "audiotestsrc"
+        else:
+            raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}')
+
+        extra_elt = ""
+
+        if self.sinks == SINKS_APP:
+            local_video_sink_elt = (
+                "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
+                "sync=True"
+            )
+        elif self.sinks == SINKS_AUTO:
+            extra_elt = "compositor name=compositor ! autovideosink"
+            local_video_sink_elt = """compositor.sink_1"""
+        else:
+            raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")
+
+        self.gst_pipe_desc = f"""
+        webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle
+
+        input-selector name=video_selector
+        ! videorate
+        ! video/x-raw,framerate=30/1
+        ! tee name=t
+
+        {extra_elt}
+
+        {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
+        videotestsrc is-live=true pattern=black ! queue leaky=downstream ! video_selector.
+
+        t.
+        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
+        ! videoconvert
+        ! vp8enc deadline=1 keyframe-max-dist=60
+        ! rtpvp8pay picture-id-mode=15-bit
+        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
+        ! sendrecv.
+
+        t.
+        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
+        ! videoconvert
+        ! {local_video_sink_elt}
+
+        {audio_source_elt} name=audio_src
+        ! valve
+        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
+        ! audioconvert
+        ! audioresample
+        ! opusenc audio-type=voice
+        ! rtpopuspay
+        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
+        ! sendrecv.
+        """
+
+        log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
+
+        # Create the pipeline
+        self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
+        if not self.pipeline:
+            raise exceptions.InternalError("Failed to create Gstreamer pipeline.")
+
+        self.webrtcbin = self.pipeline.get_by_name("sendrecv")
+        self.video_src = self.pipeline.get_by_name("video_src")
+        self.video_selector = self.pipeline.get_by_name("video_selector")
+        self.audio_valve = self.pipeline.get_by_name("audio_valve")
+
+        if self.video_muted:
+            self.on_video_mute(True)
+        if self.audio_muted:
+            self.on_audio_mute(True)
+
+        # set STUN and TURN servers
+        external_disco = data_format.deserialise(
+            await self.bridge.external_disco_get("", self.profile), type_check=list
+        )
+
+        for server in external_disco:
+            if server["type"] == "stun":
+                if server["transport"] == "tcp":
+                    log.info(
+                        "ignoring TCP STUN server, GStreamer only support one STUN server"
+                    )
+                url = f"stun://{server['host']}:{server['port']}"
+                log.debug(f"adding stun server: {url}")
+                self.webrtcbin.set_property("stun-server", url)
+            elif server["type"] == "turn":
+                url = "{scheme}://{username}:{password}@{host}:{port}".format(
+                    scheme="turns" if server["transport"] == "tcp" else "turn",
+                    username=quote_plus(server["username"]),
+                    password=quote_plus(server["password"]),
+                    host=server["host"],
+                    port=server["port"],
+                )
+                log.debug(f"adding turn server: {url}")
+
+                if not self.webrtcbin.emit("add-turn-server", url):
+                    log.warning(f"Erreur while adding TURN server {url}")
+
+        # local video feedback
+        if self.sinks == SINKS_APP:
+            assert self.appsink_data is not None
+            local_video_sink = self.pipeline.get_by_name("local_video_sink")
+            local_video_sink.set_property("emit-signals", True)
+            local_video_sink.connect("new-sample", self.appsink_data.local_video_cb)
+            local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
+            local_video_sink.set_property("caps", local_video_sink_caps)
+
+        # Create bus and associate signal watchers
+        self.bus = self.pipeline.get_bus()
+        if not self.bus:
+            log.error("Failed to get bus from pipeline.")
+            return
+
+        self.bus.add_signal_watch()
+        self.webrtcbin.connect("pad-added", self.on_pad_added)
+        self.bus.connect("message::error", self.on_bus_error)
+        self.bus.connect("message::eos", self.on_bus_eos)
+        self.webrtcbin.connect("on-negotiation-needed", self.on_negotiation_needed)
+        self.webrtcbin.connect("on-ice-candidate", self.on_ice_candidate)
+        self.webrtcbin.connect(
+            "notify::ice-gathering-state", self.on_ice_gathering_state_change
+        )
+        self.webrtcbin.connect(
+            "notify::ice-connection-state", self.on_ice_connection_state
+        )
+
+    def start_pipeline(self) -> None:
+        """Starts the GStreamer pipeline."""
+        log.debug("starting the pipeline")
+        self.pipeline.set_state(Gst.State.PLAYING)
+
+    def on_negotiation_needed(self, webrtc):
+        """Initiate SDP offer when negotiation is needed."""
+        log.debug("Negotiation needed.")
+        if self.role == "initiator":
+            log.debug("Creating offer…")
+            promise = Gst.Promise.new_with_change_func(self.on_offer_created)
+            self.webrtcbin.emit("create-offer", None, promise)
+
+    def on_offer_created(self, promise):
+        """Callback for when SDP offer is created."""
+        log.info("on_offer_created called")
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+        reply = promise.get_reply()
+        if reply is None:
+            log.error("Promise reply is None. Offer creation might have failed.")
+            return
+        offer = reply["offer"]
+        self.offer = offer.sdp.as_text()
+        log.info(f"SDP offer created: \n{self.offer}")
+        self._set_media_types(self.offer)
+        promise = Gst.Promise.new()
+        self.webrtcbin.emit("set-local-description", offer, promise)
+        promise.interrupt()
+        self._a_call(self._start_call)
+
+    def on_answer_set(self, promise):
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+
+    def on_answer_created(self, promise, _, __):
+        """Callback for when SDP answer is created."""
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+        reply = promise.get_reply()
+        answer = reply["answer"]
+        promise = Gst.Promise.new()
+        self.webrtcbin.emit("set-local-description", answer, promise)
+        promise.interrupt()
+        answer_sdp = answer.sdp.as_text()
+        log.info(f"SDP answer set: \n{answer_sdp}")
+        self.sdp_set = True
+        self._a_call(self.bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)
+
+    def on_offer_set(self, promise):
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
+        self.webrtcbin.emit("create-answer", None, promise)
+
+    def link_element_or_pad(
+        self, source: Gst.Element, dest: Gst.Element | Gst.Pad
+    ) -> bool:
+        """Check if dest is a pad or an element, and link appropriately"""
+        src_pad = source.get_static_pad("src")
+
+        if isinstance(dest, Gst.Pad):
+            # If the dest is a pad, link directly
+            if not src_pad.link(dest) == Gst.PadLinkReturn.OK:
+                log.error(
+                    "Failed to link 'conv' to the compositor's newly requested pad!"
+                )
+                return False
+        elif isinstance(dest, Gst.Element):
+            return source.link(dest)
+        else:
+            log.error(f"Unexpected type for dest: {type(sink)}")
+            return False
+
+        return True
+
+    def scaled_dimensions(
+        self, original_width: int, original_height: int, max_width: int, max_height: int
+    ) -> tuple[int, int]:
+        """Calculates the scaled dimensions preserving aspect ratio.
+
+        @param original_width: Original width of the video stream.
+        @param original_height: Original height of the video stream.
+        @param max_width: Maximum desired width for the scaled video.
+        @param max_height: Maximum desired height for the scaled video.
+        @return: The width and height of the scaled video.
+        """
+        aspect_ratio = original_width / original_height
+        new_width = int(max_height * aspect_ratio)
+
+        if new_width <= max_width:
+            return new_width, max_height
+
+        new_height = int(max_width / aspect_ratio)
+        return max_width, new_height
+
+    def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
+        """Handle the stream from the remote decodebin.
+
+        This method processes the incoming stream from the remote decodebin, determining
+        whether it's video or audio. It then sets up the appropriate GStreamer elements
+        for video/audio processing and adds them to the pipeline.
+
+        @param pad: The Gst.Pad from the remote decodebin producing the stream.
+        """
+        assert self.pipeline is not None
+        if not pad.has_current_caps():
+            log.error(f"{pad} has no caps, ignoring")
+            return
+
+        caps = pad.get_current_caps()
+        assert len(caps)
+        s = caps[0]
+        name = s.get_name()
+        log.debug(f"====> NAME START: {name}")
+
+        q = Gst.ElementFactory.make("queue")
+
+        if name.startswith("video"):
+            log.debug("===> VIDEO OK")
+
+            self._remote_video_pad = pad
+
+            # Check and log the original size of the video
+            width = s.get_int("width").value
+            height = s.get_int("height").value
+            log.info(f"Original video size: {width}x{height}")
+
+            # This is a fix for an issue found with Movim on desktop: a non standard
+            # resolution is used (990x557) resulting in bad alignement and no color in
+            # rendered image
+            adjust_resolution = width % 4 != 0 or height % 4 != 0
+            if adjust_resolution:
+                log.info("non standard resolution, we need to adjust size")
+                width = (width + 3) // 4 * 4
+                height = (height + 3) // 4 * 4
+                log.info(f"Adjusted video size: {width}x{height}")
+
+            conv = Gst.ElementFactory.make("videoconvert")
+            if self.sinks == SINKS_APP:
+                assert self.appsink_data is not None
+                remote_video_sink = Gst.ElementFactory.make("appsink")
+
+                appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
+                remote_video_sink.set_property("caps", appsink_caps)
+
+                remote_video_sink.set_property("emit-signals", True)
+                remote_video_sink.set_property("drop", True)
+                remote_video_sink.set_property("max-buffers", 1)
+                remote_video_sink.set_property("sync", True)
+                remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb)
+                self.pipeline.add(remote_video_sink)
+            if self.sinks == SINKS_AUTO:
+                compositor = self.pipeline.get_by_name("compositor")
+
+                sink1_pad = compositor.get_static_pad("sink_1")
+
+                local_width, local_height = self.scaled_dimensions(
+                    sink1_pad.get_property("width"),
+                    sink1_pad.get_property("height"),
+                    width // 3,
+                    height // 3,
+                )
+
+                sink1_pad.set_property("xpos", width - local_width)
+                sink1_pad.set_property("ypos", height - local_height)
+                sink1_pad.set_property("width", local_width)
+                sink1_pad.set_property("height", local_height)
+                sink1_pad.set_property("zorder", 1)
+
+                # Request a new pad for the remote stream
+                sink_pad_template = compositor.get_pad_template("sink_%u")
+                remote_video_sink = compositor.request_pad(sink_pad_template, None, None)
+                remote_video_sink.set_property("zorder", 0)
+
+            else:
+                raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}')
+
+            if adjust_resolution:
+                videoscale = Gst.ElementFactory.make("videoscale")
+                adjusted_caps = Gst.Caps.from_string(
+                    f"video/x-raw,width={width},height={height}"
+                )
+                capsfilter = Gst.ElementFactory.make("capsfilter")
+                capsfilter.set_property("caps", adjusted_caps)
+
+                self.pipeline.add(q, conv, videoscale, capsfilter)
+
+
+                self.pipeline.sync_children_states()
+                ret = pad.link(q.get_static_pad("sink"))
+                if ret != Gst.PadLinkReturn.OK:
+                    log.error(f"Error linking pad: {ret}")
+                q.link(conv)
+                conv.link(videoscale)
+                videoscale.link(capsfilter)
+                self.link_element_or_pad(capsfilter.link, remote_video_sink)
+
+            else:
+                self.pipeline.add(q, conv)
+
+                self.pipeline.sync_children_states()
+                ret = pad.link(q.get_static_pad("sink"))
+                if ret != Gst.PadLinkReturn.OK:
+                    log.error(f"Error linking pad: {ret}")
+                q.link(conv)
+                self.link_element_or_pad(conv, remote_video_sink)
+
+        elif name.startswith("audio"):
+            log.debug("===> Audio OK")
+            conv = Gst.ElementFactory.make("audioconvert")
+            resample = Gst.ElementFactory.make("audioresample")
+            remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
+            self.pipeline.add(q, conv, resample, remote_audio_sink)
+            self.pipeline.sync_children_states()
+            ret = pad.link(q.get_static_pad("sink"))
+            if ret != Gst.PadLinkReturn.OK:
+                log.error(f"Error linking pad: {ret}")
+            q.link(conv)
+            conv.link(resample)
+            resample.link(remote_audio_sink)
+
+        else:
+            log.warning(f"unmanaged name: {name!r}")
+
+    def on_pad_added(self, __, pad: Gst.Pad) -> None:
+        """Handle the addition of a new pad to the element.
+
+        When a new source pad is added to the element, this method creates a decodebin,
+        connects it to handle the stream, and links the pad to the decodebin.
+
+        @param __: Placeholder for the signal source. Not used in this method.
+        @param pad: The newly added pad.
+        """
+        log.debug("on_pad_added")
+        if pad.direction != Gst.PadDirection.SRC:
+            return
+
+        decodebin = Gst.ElementFactory.make("decodebin")
+        decodebin.connect("pad-added", self.on_remote_decodebin_stream)
+        self.pipeline.add(decodebin)
+        decodebin.sync_state_with_parent()
+        pad.link(decodebin.get_static_pad("sink"))
+
+    async def _start_call(self) -> None:
+        """Initiate the call.
+
+        Initiates a call with the callee using the stored offer. If there are any buffered
+        local ICE candidates, they are sent as part of the initiation.
+        """
+        assert self.callee
+        self.sid = await self.bridge.call_start(
+            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
+        )
+        if self.local_candidates_buffer:
+            log.debug(
+                f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
+            )
+            if self.pwd is None:
+                sdp = self.webrtcbin.props.local_description.sdp.as_text()
+                self.extract_ufrag_pwd(sdp)
+            ice_data = {}
+            for media_type, candidates in self.local_candidates_buffer.items():
+                ice_data[media_type] = {
+                    "ufrag": self.ufrag,
+                    "pwd": self.pwd,
+                    "candidates": candidates,
+                }
+            await self.bridge.ice_candidates_add(
+                self.sid, data_format.serialise(ice_data), self.profile
+            )
+            self.local_candidates_buffer.clear()
+
+    def _remote_sdp_set(self, promise) -> None:
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+        self.sdp_set = True
+
+    def on_accepted_call(self, sdp: str, profile: str) -> None:
+        """Outgoing call has been accepted.
+
+        @param sdp: The SDP answer string received from the other party.
+        @param profile: Profile used for the call.
+        """
+        log.debug(f"SDP answer received: \n{sdp}")
+
+        __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
+        answer = GstWebRTC.WebRTCSessionDescription.new(
+            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
+        )
+        promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
+        self.webrtcbin.emit("set-remote-description", answer, promise)
+
+    async def answer_call(self, sdp: str, profile: str) -> None:
+        """Answer an incoming call
+
+        @param sdp: The SDP offer string received from the initiator.
+        @param profile: Profile used for the call.
+
+        @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in
+            payload types.
+        """
+        log.debug(f"SDP offer received: \n{sdp}")
+        self._set_media_types(sdp)
+        __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
+        payload_types = self.get_payload_types(
+            offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
+        )
+        assert "VP8" in payload_types
+        assert "OPUS" in payload_types
+        await self.setup_call(
+            "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
+        )
+        self.start_pipeline()
+        offer = GstWebRTC.WebRTCSessionDescription.new(
+            GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
+        )
+        promise = Gst.Promise.new_with_change_func(self.on_offer_set)
+        self.webrtcbin.emit("set-remote-description", offer, promise)
+
+    def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
+        """Handles the on-ice-candidate signal of webrtcbin.
+
+        @param webrtc: The webrtcbin element.
+        @param mlineindex: The mline index.
+        @param candidate: The ICE candidate.
+        """
+        log.debug(
+            f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
+        )
+        parsed_candidate = self.parse_ice_candidate(candidate_sdp)
+        try:
+            media_type = self.media_types[mline_index]
+        except KeyError:
+            raise exceptions.InternalError("can't find media type")
+
+        if self.sid is None:
+            log.debug("buffering local ICE candidate")
+            self.local_candidates_buffer.setdefault(media_type, []).append(
+                parsed_candidate
+            )
+        else:
+            sdp = self.webrtcbin.props.local_description.sdp.as_text()
+            assert sdp is not None
+            ufrag, pwd = self.extract_ufrag_pwd(sdp)
+            ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
+            self._a_call(
+                self.bridge.ice_candidates_add,
+                self.sid,
+                data_format.serialise({media_type: ice_data}),
+                self.profile,
+            )
+
+    def on_ice_candidates_new(self, candidates: dict) -> None:
+        """Handle new ICE candidates.
+
+        @param candidates: A dictionary containing media types ("audio" or "video") as
+            keys and corresponding ICE data as values.
+
+        @raise exceptions.InternalError: Raised when sdp mline index is not found.
+        """
+        if not self.sdp_set:
+            log.debug("buffering remote ICE candidate")
+            for media_type in ("audio", "video"):
+                media_candidates = candidates.get(media_type)
+                if media_candidates:
+                    buffer = self.remote_candidates_buffer[media_type]
+                    buffer["candidates"].extend(media_candidates["candidates"])
+            return
+        for media_type, ice_data in candidates.items():
+            for candidate in ice_data["candidates"]:
+                candidate_sdp = self.build_ice_candidate(candidate)
+                try:
+                    mline_index = self.get_sdp_mline_index(media_type)
+                except Exception as e:
+                    raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
+                self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp)
+                log.debug(
+                    f"Remote ICE candidate added. MLine Index: {mline_index}, "
+                    f"{candidate_sdp}"
+                )
+
+    def on_ice_gathering_state_change(self, pspec, __):
+        state = self.webrtcbin.get_property("ice-gathering-state")
+        log.debug(f"ICE gathering state changed to {state}")
+
+    def on_ice_connection_state(self, pspec, __):
+        state = self.webrtcbin.props.ice_connection_state
+        if state == GstWebRTC.WebRTCICEConnectionState.FAILED:
+            log.error("ICE connection failed")
+        log.info(f"ICE connection state changed to {state}")
+
+    def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
+        """Handles the GStreamer bus error messages.
+
+        @param bus: The GStreamer bus.
+        @param message: The error message.
+        """
+        err, debug = message.parse_error()
+        log.error(f"Error from {message.src.get_name()}: {err.message}")
+        log.error(f"Debugging info: {debug}")
+
+    def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
+        """Handles the GStreamer bus eos messages.
+
+        @param bus: The GStreamer bus.
+        @param message: The eos message.
+        """
+        log.info("End of stream")
+
+    def on_audio_mute(self, muted: bool) -> None:
+        if self.audio_valve is not None:
+            self.audio_valve.set_property("drop", muted)
+            state = "muted" if muted else "unmuted"
+            log.info(f"audio is now {state}")
+
+    def on_video_mute(self, muted: bool) -> None:
+        if self.video_selector is not None:
+            # when muted, we switch to a black image and deactivate the camera
+            if not muted:
+                self.video_src.set_state(Gst.State.PLAYING)
+            pad = self.video_selector.get_static_pad("sink_1" if muted else "sink_0")
+            self.video_selector.props.active_pad = pad
+            if muted:
+                self.video_src.set_state(Gst.State.NULL)
+            state = "muted" if muted else "unmuted"
+            log.info(f"video is now {state}")
+
+    async def end_call(self) -> None:
+        """Stop streaming and clean instance"""
+        self.reset_instance()