view libervia/frontends/tools/webrtc.py @ 4231:e11b13418ba6

plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation: Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@, Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to work on another version and provided me with this link: https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my implementation. This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167 plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it normally does. This is because WebRTC has several implementations (browser for web interface, GStreamer for others), and file/data must be handled directly by the frontend. This is particularly important for web frontends, as the file is not sent from the backend but from the end-user's browser device. Among the changes, there are: - XEP-0343 implementation. - `file_send` bridge method now uses serialised dict as output. - New `BaseTransportHandler.is_usable` method which gets content data and returns a boolean (defaults to `True`) to tell if this transport can actually be used in this context (when we are initiator). Used in WebRTC case to see if call data are available. - Support of `application` media type, and everything necessary to handle data channels. - Better confirmation message, with file name, size and description when available. - When file is accepted in preflight, it is specified in following `action_new` signal for actual file transfer. This way, frontend can avoid the display of 2 confirmation messages. - XEP-0166: when not specified, default `content` name is now its index number instead of a UUID. This follows the behaviour of browsers. - XEP-0353: better handling of events such as call taken by another device. - various other updates. rel 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:57:23 +0200
parents fe29fbdabce6
children d01b8d002619
line wrap: on
line source

#!/usr/bin/env python3

# Libervia WebRTC implementation
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import gi
gi.require_versions({
    "Gst": "1.0",
    "GstWebRTC": "1.0"
})
from gi.repository import Gst, GstWebRTC, GstSdp

from libervia.backend.core import exceptions

try:
    from gi.overrides import Gst as _
except ImportError:
    raise exceptions.MissingModule(
        "No GStreamer Python overrides available. Please install relevant packages on "
        "your system (e.g., `python3-gst-1.0` on Debian and derivatives)."
    )
import asyncio
from dataclasses import dataclass
from datetime import datetime
import logging
from random import randint
import re
from typing import Callable
from urllib.parse import quote_plus

from libervia.backend.tools.common import data_format
from libervia.frontends.tools import aio, display_servers

# Display server in use; it is checked again in ``WebRTC.__init__`` to decide
# whether a ``DesktopPortal`` must be created (Wayland case).
current_server = display_servers.detect()
if current_server == display_servers.X11:
    # GStreamer's ximagesrc documentation asks to run this function
    import ctypes
    ctypes.CDLL('libX11.so.6').XInitThreads()


log = logging.getLogger(__name__)

# GStreamer is initialised once, when this module is imported.
Gst.init(None)

# Accepted values for the ``sources`` argument of ``WebRTC``.
SOURCES_AUTO = "auto"
SOURCES_TEST = "test"
# Values for the ``sinks`` argument of ``WebRTC``.
# NOTE(review): only SINKS_APP and SINKS_AUTO are handled by ``setup_call`` in
# the code visible here; SINKS_TEST support should be confirmed.
SINKS_APP = "app"
SINKS_AUTO = "auto"
SINKS_TEST = "test"


class ScreenshareError(Exception):
    """Raised when a screenshare session can't be requested or set up."""


@dataclass
class AppSinkData:
    local_video_cb: Callable
    remote_video_cb: Callable|None


class DesktopPortal:

    def __init__(self, webrtc: "WebRTC"):
        import dbus
        from dbus.mainloop.glib import DBusGMainLoop
        # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
        self.dbus = dbus
        self.webrtc = webrtc
        self.sources_type = dbus.UInt32(7)
        DBusGMainLoop(set_as_default=True)
        self.session_bus = dbus.SessionBus()
        portal_object = self.session_bus.get_object(
            'org.freedesktop.portal.Desktop',
            '/org/freedesktop/portal/desktop'
        )
        self.screencast_interface = dbus.Interface(
            portal_object,
            'org.freedesktop.portal.ScreenCast'
        )
        self.session_interface = None
        self.session_signal = None
        self.handle_counter = 0
        self.session_handle = None
        self.stream_data: dict|None = None

    @property
    def handle_token(self):
        self.handle_counter += 1
        return f"libervia{self.handle_counter}"

    def on_session_closed(self, details: dict) -> None:
        if self.session_interface is not None:
            self.session_interface = None
            self.webrtc.desktop_sharing = False
            if self.session_signal is not None:
                self.session_signal.remove()
                self.session_signal = None


    async def dbus_call(self, method_name: str, *args) -> dict:
        """Call a screenshare portal method

        This method handle the signal response.
        @param method_name: method to call
        @param args: extra args
            `handle_token` will be automatically added to the last arg (option dict)
        @return: method result
        """
        if self.session_handle is not None:
            self.end_screenshare()
        method = getattr(self.screencast_interface, method_name)
        options = args[-1]
        reply_fut = asyncio.Future()
        signal_fut = asyncio.Future()
        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
        handle_token = self.handle_token
        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
        signal_match = None

        def on_signal(response, results):
            assert signal_match is not None
            signal_match.remove()
            if response == 0:
                signal_fut.set_result(results)
            elif response == 1:
                signal_fut.set_exception(
                    exceptions.CancelError("Cancelled by user.")
                )
            else:
                signal_fut.set_exception(ScreenshareError(
                    f"Can't get signal result"
                ))

        signal_match = self.session_bus.add_signal_receiver(
            on_signal,
            signal_name="Response",
            dbus_interface="org.freedesktop.portal.Request",
            path=path
        )

        options["handle_token"] = handle_token

        method(
            *args,
            reply_handler=reply_fut.set_result,
            error_handler=reply_fut.set_exception
        )
        try:
            await reply_fut
        except Exception as e:
            raise ScreenshareError(f"Can't ask screenshare permission: {e}")
        return await signal_fut

    async def request_screenshare(self) -> dict:
        session_data = await self.dbus_call(
            "CreateSession",
            {
                "session_handle_token": str(randint(1, 2**32)),
            }
        )
        try:
            session_handle = session_data["session_handle"]
        except KeyError:
            raise ScreenshareError("Can't get session handle")
        self.session_handle = session_handle


        await self.dbus_call(
            "SelectSources",
            session_handle,
            {
                "multiple": True,
                "types": self.sources_type,
                "modal": True
            }
        )
        screenshare_data = await self.dbus_call(
            "Start",
            session_handle,
            "",
            {}
        )

        session_object = self.session_bus.get_object(
            'org.freedesktop.portal.Desktop',
            session_handle
        )
        self.session_interface = self.dbus.Interface(
            session_object,
            'org.freedesktop.portal.Session'
        )

        self.session_signal = self.session_bus.add_signal_receiver(
            self.on_session_closed,
            signal_name="Closed",
            dbus_interface="org.freedesktop.portal.Session",
            path=session_handle
        )

        try:
            node_id, stream_data = screenshare_data["streams"][0]
            source_type = int(stream_data["source_type"])
        except (IndexError, KeyError):
            raise ScreenshareError("Can't parse stream data")
        self.stream_data = stream_data = {
            "session_handle": session_handle,
            "node_id": node_id,
            "source_type": source_type
        }
        try:
            height = int(stream_data["size"][0])
            weight = int(stream_data["size"][1])
        except (IndexError, KeyError):
            pass
        else:
            stream_data["size"] = (height, weight)

        return self.stream_data

    def end_screenshare(self) -> None:
        """Close a running screenshare session, if any."""
        if self.session_interface is None:
            return
        self.session_interface.Close()
        self.on_session_closed({})


class WebRTC:
    """GSTreamer based WebRTC implementation for audio and video communication.

    This class encapsulates the WebRTC functionalities required for initiating and
    handling audio and video calls.
    """

    def __init__(
        self,
        bridge,
        profile: str,
        sources: str = SOURCES_AUTO,
        sinks: str = SINKS_AUTO,
        appsink_data: AppSinkData | None = None,
        reset_cb: Callable | None = None,
        merge_pip: bool|None = None,
        target_size: tuple[int, int]|None = None,
    ) -> None:
        """Initializes a new WebRTC instance.

        @param bridge: An instance of backend bridge.
        @param profile: Libervia profile.
        @param sources: Which kind of source to use.
        @param sinks: Which kind of sinks to use.
        @param appsink_data: configuration data for appsink (when SINKS_APP is used). Must
            not be used for other sinks.
        @param reset_cb: An optional Callable that is triggered on reset events. Can be
            used to reset UI data on new calls.
        @param merge_pip: A boolean flag indicating whether Picture-in-Picture mode is
            enabled. When PiP is used, local feedback is merged to remote video stream.
            Only one video stream is then produced (the local one).
            If None, PiP mode is selected automatically according to selected sink (it's
            used for SINKS_AUTO only for now).
        @param target_size: Expected size of the final sink stream. Mainly used by
            composer when ``merge_pip`` is set.
            None to autodetect (no real autodetection implemented yet, defaults to
            (1280,720)).
        @raise ValueError: ``appsink_data.remote_video_cb`` is set while ``merge_pip``
            is used.
        @raise exceptions.InternalError: ``appsink_data`` is used with sinks other
            than SINKS_APP.
        """
        self.main_loop = asyncio.get_event_loop()
        self.bridge = bridge
        self.profile = profile
        self.pipeline = None
        self._audio_muted = False
        self._video_muted = False
        self._desktop_sharing = False
        self.desktop_sharing_data = None
        self.sources = sources
        self.sinks = sinks
        if target_size is None:
            target_size = (1280, 720)
        self.target_width, self.target_height = target_size
        if merge_pip is None:
            merge_pip = sinks == SINKS_AUTO
        self.merge_pip = merge_pip
        # the attribute is always defined, so it can be safely read whatever the
        # kind of sinks in use
        self.appsink_data: AppSinkData | None = None
        if sinks == SINKS_APP:
            if (
                merge_pip
                and appsink_data is not None
                and appsink_data.remote_video_cb is not None
            ):
                raise ValueError("Remote_video_cb can't be used when merge_pip is used!")
            self.appsink_data = appsink_data
        elif appsink_data is not None:
            raise exceptions.InternalError(
                "appsink_data can only be used for SINKS_APP sinks"
            )
        self.reset_cb = reset_cb
        # the desktop portal is only used on Wayland
        if current_server == display_servers.WAYLAND:
            self.desktop_portal = DesktopPortal(self)
        else:
            self.desktop_portal = None
        # must be called last: it relies on the attributes set above
        self.reset_instance()

    @property
    def audio_muted(self):
        """Current audio mute flag."""
        return self._audio_muted

    @audio_muted.setter
    def audio_muted(self, muted: bool) -> None:
        """Store the audio mute flag; ``on_audio_mute`` is called only on change."""
        if muted == self._audio_muted:
            return
        self._audio_muted = muted
        self.on_audio_mute(muted)

    @property
    def video_muted(self):
        """Current video mute flag."""
        return self._video_muted

    @video_muted.setter
    def video_muted(self, muted: bool) -> None:
        """Store the video mute flag; ``on_video_mute`` is called only on change."""
        if muted == self._video_muted:
            return
        self._video_muted = muted
        self.on_video_mute(muted)

    @property
    def desktop_sharing(self):
        """Current desktop sharing flag."""
        return self._desktop_sharing

    @desktop_sharing.setter
    def desktop_sharing(self, active: bool) -> None:
        """Store the desktop sharing flag and notify on change.

        When the value actually changes, ``on_desktop_switch`` is called first, then
        the "desktop_sharing" callback registered with [bind], if any.
        """
        if active == self._desktop_sharing:
            return
        self._desktop_sharing = active
        self.on_desktop_switch(active)
        if "desktop_sharing" in self.bindings:
            notify = self.bindings["desktop_sharing"]
            notify(active)

    @property
    def sdp_set(self):
        """Flag telling if the SDP has been set."""
        return self._sdp_set

    @sdp_set.setter
    def sdp_set(self, is_set: bool):
        """Store the flag, and flush buffered remote candidates when it is set.

        When ``is_set`` is true, the buffered remote candidates are given to
        ``on_ice_candidates_new``, then each candidates list is emptied.
        """
        self._sdp_set = is_set
        if not is_set:
            return
        self.on_ice_candidates_new(self.remote_candidates_buffer)
        for media_data in self.remote_candidates_buffer.values():
            media_data["candidates"].clear()

    @property
    def media_types(self):
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    @media_types.setter
    def media_types(self, new_media_types: dict) -> None:
        self._media_types = new_media_types
        self._media_types_inv = {v: k for k, v in new_media_types.items()}

    @property
    def media_types_inv(self) -> dict:
        if self._media_types_inv is None:
            raise Exception("self._media_types_inv should not be None!")
        return self._media_types_inv

    def bind(self, **kwargs: Callable) -> None:
        self.bindings.clear()
        for key, cb in kwargs.items():
            if key not in ("desktop_sharing",):
                raise ValueError(
                    'Only "desktop_sharing" is currently allowed for binding'
                )
            self.bindings[key] = cb


    def generate_dot_file(
        self,
        filename: str = "pipeline",
        details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL,
        with_timestamp: bool = True,
        bin_: Gst.Bin|None = None,
    ) -> None:
        """Generate Dot File for debugging

        ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir.
        ``dot -Tpng -o <filename>.png <filename>.dot`` can be used to convert to a PNG
        file.
        See
        https://gstreamer.freedesktop.org/documentation/gstreamer/debugutils.html?gi-language=python#GstDebugGraphDetails
        for details.

        @param filename: name of the generated file
        @param details: which details to print
        @param with_timestamp: if True, prefix the filename with a timestamp
        @param bin_: which bin to output. By default, the whole pipeline
            (``self.pipeline``) will be used.
        """
        if bin_ is None:
            bin_ = self.pipeline
        if with_timestamp:
            timestamp = datetime.now().isoformat(timespec='milliseconds')
            # the requested name must be kept, it was previously replaced by the
            # literal "filename"
            filename = f"{timestamp}_{filename}"

        Gst.debug_bin_to_dot_file(bin_, details, filename)

    def get_sdp_mline_index(self, media_type: str) -> int:
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def _set_media_types(self, offer_sdp: str) -> None:
        """Sets media types from offer SDP

        @param offer: RTC session description containing the offer
        """
        sdp_lines = offer_sdp.splitlines()
        media_types = {}
        mline_index = 0

        for line in sdp_lines:
            if line.startswith("m="):
                media_types[mline_index] = line[2 : line.find(" ")]
                mline_index += 1

        self.media_types = media_types

    def _a_call(self, method, *args, **kwargs):
        """Run ``method`` through ``aio.run_from_thread``, using ``self.main_loop``.

        @param method: method to call
        @param args: positional arguments of the method
        @param kwargs: keyword arguments of the method
        """
        loop = self.main_loop
        aio.run_from_thread(method, *args, loop=loop, **kwargs)

    def get_payload_types(
        self, sdpmsg, video_encoding: str, audio_encoding: str
    ) -> dict[str, int | None]:
        """Look for the payload types of the given video and audio encodings.

        The first payload type whose encoding name matches is used. This is a
        simplistic approach: more complex applications will want to match caps on
        profile-level-id, packetization-mode, etc.

        @param sdpmsg: SDP message to inspect
        @param video_encoding: encoding name of the video
        @param audio_encoding: encoding name of the audio
        @return: encoding names mapped to their payload type (None if not found)
        """
        # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
        # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
        video_pt = audio_pt = None
        medias = (sdpmsg.get_media(idx) for idx in range(sdpmsg.medias_len()))
        for media in medias:
            for fmt_idx in range(media.formats_len()):
                fmt = media.get_format(fmt_idx)
                if fmt == "webrtc-datachannel":
                    # data channels have no numeric payload type
                    continue
                payload_type = int(fmt)
                structure = media.get_caps_from_media(payload_type).get_structure(0)
                encoding_name = structure["encoding-name"]
                if video_pt is None and encoding_name == video_encoding:
                    video_pt = payload_type
                elif audio_pt is None and encoding_name == audio_encoding:
                    audio_pt = payload_type
        return {video_encoding: video_pt, audio_encoding: audio_pt}

    def parse_ice_candidate(self, candidate_string):
        """Convert an ICE candidate string to a dictionary.

        @param candidate_string: The ice candidate string to be parsed.
        @return: parsed candidate, or None if the string can't be parsed.
            ``component_id``, ``priority``, ``port`` and ``rel_port`` are integers,
            other values are strings. Fields missing from the string are not set.
        """
        candidate_re = re.compile(
            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
        )
        parsed = candidate_re.match(candidate_string)
        if parsed is None:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

        fields = parsed.groupdict()

        # mandatory numeric fields
        for int_field in ("component_id", "priority", "port"):
            fields[int_field] = int(fields[int_field])

        # optional numeric field ("generation" is kept as a string)
        if fields["rel_port"]:
            fields["rel_port"] = int(fields["rel_port"])

        # optional groups which didn't match are dropped
        return {name: value for name, value in fields.items() if value is not None}

    def build_ice_candidate(self, parsed_candidate):
        """Serialise a parsed ICE candidate to a candidate string.

        @param parsed_candidate: Dictionary containing parsed ICE candidate
        @return: candidate string. Related address/port and generation are only
            added when they are set in ``parsed_candidate``.
        """
        template_parts = [
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        ]

        has_related = (
            parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port")
        )
        if has_related:
            template_parts.append(" raddr {rel_addr} rport {rel_port}")

        if parsed_candidate.get("generation"):
            template_parts.append(" generation {generation}")

        return "".join(template_parts).format(**parsed_candidate)

    def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
        """Retrieves ICE password and user fragment for SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: ufrag and pwd
        @raise ValueError: Can't extract ufrag and password
        """
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)

        if ufrag_line and pwd_line:
            ufrag = self.ufrag = ufrag_line.group(1)
            pwd = self.pwd = pwd_line.group(1)
            return ufrag, pwd
        else:
            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    def reset_instance(self):
        """Inits or resets the instance variables to their default state."""
        self.role: str | None = None
        if self.pipeline is not None:
            self.pipeline.set_state(Gst.State.NULL)
        self.pipeline = None
        self._remote_video_pad = None
        self.sid: str | None = None
        self.offer: str | None = None
        self.local_candidates_buffer = {}
        self.ufrag: str | None = None
        self.pwd: str | None = None
        self.callee: str | None = None
        self._media_types = None
        self._media_types_inv = None
        self._sdp_set: bool = False
        self.remote_candidates_buffer: dict[str, dict[str, list]] = {
            "audio": {"candidates": []},
            "video": {"candidates": []},
        }
        self._media_types = None
        self._media_types_inv = None
        self.audio_valve = None
        self.video_valve = None
        if self.desktop_portal is not None:
            self.desktop_portal.end_screenshare()
        self.desktop_sharing = False
        self.desktop_sink_pad = None
        self.bindings = {}
        if self.reset_cb is not None:
            self.reset_cb()


    async def setup_call(
        self,
        role: str,
        audio_pt: int | None = 96,
        video_pt: int | None = 97,
    ) -> None:
        """Sets up the call.

        This method establishes the Gstreamer pipeline for audio and video communication.
        The method also manages STUN and TURN server configurations, signal watchers, and
        various connection handlers for the webrtcbin.

        @param role: The role for the call, either 'initiator' or 'responder'.
        @param audio_pt: The payload type for the audio stream.
        @param video_pt: The payload type for the video stream

        @raises NotImplementedError: If audio_pt or video_pt is set to None.
        @raises AssertionError: If the role is not 'initiator' or 'responder'.
        @raises exceptions.InternalError: ``sources`` or ``sinks`` value is unknown, or
            the pipeline can't be created.
        """
        assert role in ("initiator", "responder")
        self.role = role
        if audio_pt is None or video_pt is None:
            raise NotImplementedError("None value is not handled yet")

        if self.sources == SOURCES_AUTO:
            video_source_elt = "v4l2src"
            audio_source_elt = "pulsesrc"
        elif self.sources == SOURCES_TEST:
            video_source_elt = "videotestsrc is-live=true pattern=ball"
            audio_source_elt = "audiotestsrc"
        else:
            raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}')

        if self.sinks == SINKS_APP:
            local_video_sink_elt = (
                "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
                "sync=True"
            )
        elif self.sinks == SINKS_AUTO:
            local_video_sink_elt = "autovideosink"
        else:
            raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")

        if self.merge_pip:
            # with PiP, the local feedback is sent to a compositor pad, and the
            # compositor output goes to the actual sink
            extra_elt = (
                "compositor name=compositor background=black "
                f"! video/x-raw,width={self.target_width},height={self.target_height},"
                "framerate=30/1 "
                f"! {local_video_sink_elt}"
            )
            local_video_sink_elt = "compositor.sink_1"
        else:
            extra_elt = ""

        # the valve must be named "audio_valve", otherwise it can't be retrieved
        # with ``get_by_name`` below
        self.gst_pipe_desc = f"""
        webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle

        input-selector name=video_selector
        ! videorate
        ! video/x-raw,framerate=30/1
        ! tee name=t

        {extra_elt}

        {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
        videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector.

        t.
        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
        ! videoconvert
        ! vp8enc deadline=1 keyframe-max-dist=60
        ! rtpvp8pay picture-id-mode=15-bit
        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
        ! sendrecv.

        t.
        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
        ! videoconvert
        ! {local_video_sink_elt}

        {audio_source_elt} name=audio_src
        ! valve name=audio_valve
        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
        ! audioconvert
        ! audioresample
        ! opusenc audio-type=voice
        ! rtpopuspay
        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
        ! sendrecv.
        """

        log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")

        # Create the pipeline
        try:
            self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
        except Exception:
            log.exception("Can't parse pipeline")
            self.pipeline = None
        if not self.pipeline:
            raise exceptions.InternalError("Failed to create Gstreamer pipeline.")

        self.webrtcbin = self.pipeline.get_by_name("sendrecv")
        self.video_src = self.pipeline.get_by_name("video_src")
        self.muted_src = self.pipeline.get_by_name("muted_src")
        self.video_selector = self.pipeline.get_by_name("video_selector")
        self.audio_valve = self.pipeline.get_by_name("audio_valve")

        # mute states set before the call must be applied to the new pipeline
        if self.video_muted:
            self.on_video_mute(True)
        if self.audio_muted:
            self.on_audio_mute(True)

        # set STUN and TURN servers
        external_disco = data_format.deserialise(
            await self.bridge.external_disco_get("", self.profile), type_check=list
        )

        for server in external_disco:
            if server["type"] == "stun":
                if server["transport"] == "tcp":
                    log.info(
                        "ignoring TCP STUN server, GStreamer only support one STUN server"
                    )
                    # the server is really ignored, it must not replace a
                    # previously set STUN server
                    continue
                url = f"stun://{server['host']}:{server['port']}"
                log.debug(f"adding stun server: {url}")
                self.webrtcbin.set_property("stun-server", url)
            elif server["type"] == "turn":
                url = "{scheme}://{username}:{password}@{host}:{port}".format(
                    scheme="turns" if server["transport"] == "tcp" else "turn",
                    username=quote_plus(server["username"]),
                    password=quote_plus(server["password"]),
                    host=server["host"],
                    port=server["port"],
                )
                # NOTE(review): the logged URL contains the TURN credentials
                log.debug(f"adding turn server: {url}")

                if not self.webrtcbin.emit("add-turn-server", url):
                    log.warning(f"Erreur while adding TURN server {url}")

        # local video feedback
        if self.sinks == SINKS_APP:
            assert self.appsink_data is not None
            local_video_sink = self.pipeline.get_by_name("local_video_sink")
            local_video_sink.set_property("emit-signals", True)
            local_video_sink.connect("new-sample", self.appsink_data.local_video_cb)
            local_video_sink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
            local_video_sink.set_property("caps", local_video_sink_caps)

        # Create bus and associate signal watchers
        self.bus = self.pipeline.get_bus()
        if not self.bus:
            log.error("Failed to get bus from pipeline.")
            return

        self.bus.add_signal_watch()
        self.webrtcbin.connect("pad-added", self.on_pad_added)
        self.bus.connect("message::error", self.on_bus_error)
        self.bus.connect("message::eos", self.on_bus_eos)
        self.webrtcbin.connect("on-negotiation-needed", self.on_negotiation_needed)
        self.webrtcbin.connect("on-ice-candidate", self.on_ice_candidate)
        self.webrtcbin.connect(
            "notify::ice-gathering-state", self.on_ice_gathering_state_change
        )
        self.webrtcbin.connect(
            "notify::ice-connection-state", self.on_ice_connection_state
        )

    def start_pipeline(self) -> None:
        """Set the GStreamer pipeline to PLAYING state."""
        log.debug("starting the pipeline")
        pipeline = self.pipeline
        pipeline.set_state(Gst.State.PLAYING)

    def on_negotiation_needed(self, webrtc):
        """Ask for the creation of a SDP offer when we are the initiator.

        Nothing is done (except logging) for other roles.
        @param webrtc: element which emitted the signal (unused)
        """
        log.debug("Negotiation needed.")
        if self.role != "initiator":
            return
        log.debug("Creating offer…")
        offer_promise = Gst.Promise.new_with_change_func(self.on_offer_created)
        self.webrtcbin.emit("create-offer", None, offer_promise)

    def on_offer_created(self, promise):
        """Handle a newly created SDP offer.

        The offer is stored, media types are extracted from it, it is set as local
        description, then ``_start_call`` is scheduled with [_a_call].
        @param promise: promise holding the created offer
        """
        log.info("on_offer_created called")
        assert promise.wait() == Gst.PromiseResult.REPLIED
        reply = promise.get_reply()
        if reply is None:
            log.error("Promise reply is None. Offer creation might have failed.")
            return

        offer = reply["offer"]
        offer_sdp = offer.sdp.as_text()
        self.offer = offer_sdp
        log.info(f"SDP offer created: \n{offer_sdp}")
        self._set_media_types(offer_sdp)

        local_desc_promise = Gst.Promise.new()
        self.webrtcbin.emit("set-local-description", offer, local_desc_promise)
        local_desc_promise.interrupt()

        self._a_call(self._start_call)

    def on_answer_set(self, promise):
        """Check that the given promise has been replied.

        NOTE(review): presumably used as promise callback once the answer is set;
        the place where it is connected is not visible here.
        @param promise: GStreamer promise to check
        @raise AssertionError: the promise result is not REPLIED
        """
        # ``promise.wait()`` is evaluated inside the ``assert`` statement, it is
        # thus skipped when Python runs with assertions disabled (``-O``)
        assert promise.wait() == Gst.PromiseResult.REPLIED

    def on_answer_created(self, promise, _, __):
        """Callback for when SDP answer is created.

        The answer is set as local description, ``sdp_set`` is set to True, then the
        answer SDP is sent with ``bridge.call_answer_sdp`` through [_a_call].
        @param promise: promise holding the created answer
        """
        assert promise.wait() == Gst.PromiseResult.REPLIED
        reply = promise.get_reply()
        if reply is None:
            # same guard as in ``on_offer_created``: without it, a failed answer
            # creation ends in a TypeError raised from the promise callback
            log.error("Promise reply is None. Answer creation might have failed.")
            return
        answer = reply["answer"]
        promise = Gst.Promise.new()
        self.webrtcbin.emit("set-local-description", answer, promise)
        promise.interrupt()
        answer_sdp = answer.sdp.as_text()
        log.info(f"SDP answer set: \n{answer_sdp}")
        self.sdp_set = True
        self._a_call(self.bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)

    def on_offer_set(self, promise):
        """Ask for the creation of the SDP answer once the promise is replied.

        @param promise: promise to check; the answer is requested with a new
            promise, whose callback is [on_answer_created]
        """
        assert promise.wait() == Gst.PromiseResult.REPLIED
        answer_promise = Gst.Promise.new_with_change_func(
            self.on_answer_created, None, None
        )
        self.webrtcbin.emit("create-answer", None, answer_promise)

    def link_element_or_pad(
        self, source: Gst.Element, dest: Gst.Element | Gst.Pad
    ) -> bool:
        """Check if dest is a pad or an element, and link appropriately

        @param source: element to link from. Its static "src" pad is used when
            ``dest`` is a pad.
        @param dest: element or pad to link to
        @return: True if the link has been done, False otherwise
        """
        if isinstance(dest, Gst.Pad):
            # If the dest is a pad, link directly
            src_pad = source.get_static_pad("src")
            if src_pad.link(dest) != Gst.PadLinkReturn.OK:
                log.error(
                    "Failed to link 'conv' to the compositor's newly requested pad!"
                )
                return False
            return True

        if isinstance(dest, Gst.Element):
            return source.link(dest)

        # the undefined name ``sink`` was used here, raising a NameError instead of
        # reporting the problem
        log.error(f"Unexpected type for dest: {type(dest)}")
        return False

    def scaled_dimensions(
        self, original_width: int, original_height: int, max_width: int, max_height: int
    ) -> tuple[int, int]:
        """Fit a video size into a bounding box while keeping its aspect ratio.

        The full allowed height is tried first; if the resulting width doesn't fit,
        the full allowed width is used instead. Computed values are truncated to
        integers.

        @param original_width: Original width of the video stream.
        @param original_height: Original height of the video stream.
        @param max_width: Maximum desired width for the scaled video.
        @param max_height: Maximum desired height for the scaled video.
        @return: The width and height of the scaled video.
        """
        ratio = original_width / original_height

        width_at_max_height = int(max_height * ratio)
        if width_at_max_height > max_width:
            # too wide when using the whole height: the width is the limiting side
            return max_width, int(max_width / ratio)

        return width_at_max_height, max_height

    def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
        """Handle the stream from the remote decodebin.

        This method processes the incoming stream from the remote decodebin, determining
        whether it's video or audio. It then sets up the appropriate GStreamer elements
        for video/audio processing and adds them to the pipeline.

        For video, the chain is ``queue ! videoconvert`` (followed by
        ``videoscale ! capsfilter`` when the size must be adjusted), linked to a
        compositor pad, an ``appsink`` or an ``autovideosink`` depending on
        ``self.merge_pip`` and ``self.sinks``. For audio, the chain is
        ``queue ! audioconvert ! audioresample ! autoaudiosink``.

        @param _: Signal emitter. Not used in this method.
        @param pad: The Gst.Pad from the remote decodebin producing the stream.
        @raise exceptions.InternalError: ``self.sinks`` has an unhandled value.
        """
        assert self.pipeline is not None
        if not pad.has_current_caps():
            log.error(f"{pad} has no caps, ignoring")
            return

        caps = pad.get_current_caps()
        assert len(caps)
        s = caps[0]
        name = s.get_name()
        log.debug(f"====> NAME START: {name}")

        q = Gst.ElementFactory.make("queue")

        if name.startswith("video"):
            log.debug("===> VIDEO OK")

            self._remote_video_pad = pad

            # The size used here comes from ``self.target_width`` and
            # ``self.target_height``
            width = self.target_width
            height = self.target_height
            log.info(f"Original video size: {width}x{height}")

            # This is a fix for an issue found with Movim on desktop: a non standard
            # resolution is used (990x557) resulting in bad alignment and no color in
            # rendered image
            adjust_resolution = width % 4 != 0 or height % 4 != 0
            if adjust_resolution:
                log.info("non standard resolution, we need to adjust size")
                # round up to the next multiple of 4
                width = (width + 3) // 4 * 4
                height = (height + 3) // 4 * 4
                log.info(f"Adjusted video size: {width}x{height}")

            conv = Gst.ElementFactory.make("videoconvert")
            if self.merge_pip:
                # with ``merge_pip`` set, we plug the remote stream to the composer
                compositor = self.pipeline.get_by_name("compositor")

                sink1_pad = compositor.get_static_pad("sink_1")

                # "sink_1" is resized to fit in a third of the remote size, and is
                # moved to the bottom right corner
                local_width, local_height = self.scaled_dimensions(
                    sink1_pad.get_property("width"),
                    sink1_pad.get_property("height"),
                    width // 3,
                    height // 3,
                )

                sink1_pad.set_property("xpos", width - local_width)
                sink1_pad.set_property("ypos", height - local_height)
                sink1_pad.set_property("width", local_width)
                sink1_pad.set_property("height", local_height)
                # NOTE(review): sizing-policy 1 is presumably "keep-aspect-ratio"
                #   — TODO confirm
                sink1_pad.set_property("sizing-policy", 1)
                sink1_pad.set_property("zorder", 1)

                # Request a new pad for the remote stream
                sink_pad_template = compositor.get_pad_template("sink_%u")
                remote_video_sink = compositor.request_pad(sink_pad_template, None, None)
                remote_video_sink.set_property("zorder", 0)
                remote_video_sink.set_property("width", width)
                remote_video_sink.set_property("height", height)
                remote_video_sink.set_property("sizing-policy", 1)
            elif self.sinks == SINKS_APP:
                # ``app`` sink without ``self.merge_pip`` set, we create the sink and
                # connect it to the ``remote_video_cb``.
                assert self.appsink_data is not None
                remote_video_sink = Gst.ElementFactory.make("appsink")

                remote_video_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
                remote_video_sink.set_property("caps", remote_video_caps)

                remote_video_sink.set_property("emit-signals", True)
                remote_video_sink.set_property("drop", True)
                remote_video_sink.set_property("max-buffers", 1)
                remote_video_sink.set_property("sync", True)
                remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb)
                self.pipeline.add(remote_video_sink)
            elif self.sinks == SINKS_AUTO:
                # if ``self.merge_pip`` is not set, we create a dedicated
                # ``autovideosink`` for remote stream.
                remote_video_sink = Gst.ElementFactory.make("autovideosink")
                self.pipeline.add(remote_video_sink)
            else:
                raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}')

            # elements between the decodebin pad and the sink, in linking order
            chain = [q, conv]
            if adjust_resolution:
                videoscale = Gst.ElementFactory.make("videoscale")
                adjusted_caps = Gst.Caps.from_string(
                    f"video/x-raw,width={width},height={height}"
                )
                capsfilter = Gst.ElementFactory.make("capsfilter")
                capsfilter.set_property("caps", adjusted_caps)
                chain += [videoscale, capsfilter]

            self.pipeline.add(*chain)
            self.pipeline.sync_children_states()
            ret = pad.link(q.get_static_pad("sink"))
            if ret != Gst.PadLinkReturn.OK:
                log.error(f"Error linking pad: {ret}")
            for upstream, downstream in zip(chain, chain[1:]):
                upstream.link(downstream)
            # the last element of the chain (and not one of its methods) must be
            # given here; the sink may be an element or a compositor pad
            if not self.link_element_or_pad(chain[-1], remote_video_sink):
                log.error("Failed to link remote video stream to its sink")

        elif name.startswith("audio"):
            log.debug("===> Audio OK")
            conv = Gst.ElementFactory.make("audioconvert")
            resample = Gst.ElementFactory.make("audioresample")
            remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
            self.pipeline.add(q, conv, resample, remote_audio_sink)
            self.pipeline.sync_children_states()
            ret = pad.link(q.get_static_pad("sink"))
            if ret != Gst.PadLinkReturn.OK:
                log.error(f"Error linking pad: {ret}")
            q.link(conv)
            conv.link(resample)
            resample.link(remote_audio_sink)

        else:
            log.warning(f"unmanaged name: {name!r}")

    def on_pad_added(self, __, pad: Gst.Pad) -> None:
        """Plug a new ``decodebin`` on a freshly added source pad.

        Pads whose direction is not ``SRC`` are ignored. Otherwise a ``decodebin`` is
        created with ``on_remote_decodebin_stream`` connected to its "pad-added"
        signal, it is added to the pipeline, and the pad is linked to its sink.

        @param __: Placeholder for the signal source. Not used in this method.
        @param pad: The newly added pad.
        """
        log.debug("on_pad_added")
        if pad.direction != Gst.PadDirection.SRC:
            # only source pads are decoded
            return

        decoder = Gst.ElementFactory.make("decodebin")
        decoder.connect("pad-added", self.on_remote_decodebin_stream)
        self.pipeline.add(decoder)
        decoder.sync_state_with_parent()
        decoder_sink_pad = decoder.get_static_pad("sink")
        pad.link(decoder_sink_pad)

    async def _start_call(self) -> None:
        """Start the call with the stored offer, then flush buffered candidates.

        ``bridge.call_start`` is called with the serialised SDP offer and its result
        is stored in ``self.sid``. Local ICE candidates buffered in the meantime, if
        any, are then sent in a single ``bridge.ice_candidates_add`` call and the
        buffer is emptied.
        """
        assert self.callee
        self.sid = await self.bridge.call_start(
            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
        )

        pending = self.local_candidates_buffer
        if not pending:
            return

        log.debug(
            f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
        )
        if self.pwd is None:
            # presumably ``extract_ufrag_pwd`` fills ``self.ufrag`` and ``self.pwd``
            # — verify against its implementation
            local_sdp = self.webrtcbin.props.local_description.sdp.as_text()
            self.extract_ufrag_pwd(local_sdp)

        ice_data = {
            media_type: {
                "ufrag": self.ufrag,
                "pwd": self.pwd,
                "candidates": media_candidates,
            }
            for media_type, media_candidates in pending.items()
        }
        await self.bridge.ice_candidates_add(
            self.sid, data_format.serialise(ice_data), self.profile
        )
        self.local_candidates_buffer.clear()

    def _remote_sdp_set(self, promise) -> None:
        """Flag the SDP as set once the promise has been replied.

        @param promise: promise to wait on.
        @raise AssertionError: the promise result is not ``REPLIED``.
        """
        assert promise.wait() == Gst.PromiseResult.REPLIED
        # ``on_ice_candidates_new`` buffers remote candidates while this flag is
        # not set
        self.sdp_set = True

    def on_accepted_call(self, sdp: str, profile: str) -> None:
        """Outgoing call has been accepted.

        The received answer is parsed and set as remote description of ``webrtcbin``;
        ``_remote_sdp_set`` is registered as change function of the promise.

        @param sdp: The SDP answer string received from the other party.
        @param profile: Profile used for the call. Not used in this method.
        """
        log.debug(f"SDP answer received: \n{sdp}")

        # the first returned value (parsing result) is ignored
        _parse_result, answer_msg = GstSdp.SDPMessage.new_from_text(sdp)
        remote_answer = GstWebRTC.WebRTCSessionDescription.new(
            GstWebRTC.WebRTCSDPType.ANSWER, answer_msg
        )
        remote_set_promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
        self.webrtcbin.emit("set-remote-description", remote_answer, remote_set_promise)

    async def answer_call(self, sdp: str, profile: str) -> None:
        """Answer an incoming call

        The call is set up as "responder" with the payload types found for "VP8" and
        "OPUS", the pipeline is started, and the offer is set as remote description of
        ``webrtcbin`` (``on_offer_set`` being the change function of the promise).

        @param sdp: The SDP offer string received from the initiator.
        @param profile: Profile used for the call. Not used in this method.

        @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in
            payload types.
        """
        log.debug(f"SDP offer received: \n{sdp}")
        self._set_media_types(sdp)
        _parse_result, remote_offer_msg = GstSdp.SDPMessage.new_from_text(sdp)
        pt_by_encoding = self.get_payload_types(
            remote_offer_msg, video_encoding="VP8", audio_encoding="OPUS"
        )
        for encoding in ("VP8", "OPUS"):
            assert encoding in pt_by_encoding
        await self.setup_call(
            "responder",
            audio_pt=pt_by_encoding["OPUS"],
            video_pt=pt_by_encoding["VP8"],
        )
        self.start_pipeline()
        remote_offer = GstWebRTC.WebRTCSessionDescription.new(
            GstWebRTC.WebRTCSDPType.OFFER, remote_offer_msg
        )
        offer_set_promise = Gst.Promise.new_with_change_func(self.on_offer_set)
        self.webrtcbin.emit("set-remote-description", remote_offer, offer_set_promise)

    def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
        """Handles the on-ice-candidate signal of webrtcbin.

        While ``self.sid`` is None, the parsed candidate is buffered in
        ``self.local_candidates_buffer``. Otherwise it is given straight away to
        ``bridge.ice_candidates_add``, with the ufrag and pwd extracted from the local
        description.

        @param webrtc: The webrtcbin element. Not used in this method.
        @param mline_index: The mline index.
        @param candidate_sdp: The ICE candidate.
        @raise exceptions.InternalError: no media type is registered for
            ``mline_index``.
        """
        log.debug(
            f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
        )
        local_candidate = self.parse_ice_candidate(candidate_sdp)
        try:
            media_type = self.media_types[mline_index]
        except KeyError:
            raise exceptions.InternalError("can't find media type")

        if self.sid is None:
            # no session ID yet, the candidate is kept for later
            log.debug("buffering local ICE candidate")
            media_buffer = self.local_candidates_buffer.setdefault(media_type, [])
            media_buffer.append(local_candidate)
            return

        local_sdp = self.webrtcbin.props.local_description.sdp.as_text()
        assert local_sdp is not None
        ufrag, pwd = self.extract_ufrag_pwd(local_sdp)
        payload = {
            media_type: {"ufrag": ufrag, "pwd": pwd, "candidates": [local_candidate]}
        }
        self._a_call(
            self.bridge.ice_candidates_add,
            self.sid,
            data_format.serialise(payload),
            self.profile,
        )

    def on_ice_candidates_new(self, candidates: dict) -> None:
        """Handle new ICE candidates.

        While ``self.sdp_set`` is not set, candidates are appended to
        ``self.remote_candidates_buffer`` (only the "audio" and "video" keys are
        looked at in that case). Otherwise each candidate is added to ``webrtcbin``.

        @param candidates: A dictionary containing media types ("audio" or "video") as
            keys and corresponding ICE data as values.

        @raise exceptions.InternalError: Raised when sdp mline index is not found.
        """
        if not self.sdp_set:
            log.debug("buffering remote ICE candidate")
            for buffered_type in ("audio", "video"):
                incoming = candidates.get(buffered_type)
                if not incoming:
                    continue
                type_buffer = self.remote_candidates_buffer[buffered_type]
                type_buffer["candidates"].extend(incoming["candidates"])
            return

        for media_type, ice_data in candidates.items():
            for remote_candidate in ice_data["candidates"]:
                sdp_line = self.build_ice_candidate(remote_candidate)
                try:
                    mline_idx = self.get_sdp_mline_index(media_type)
                except Exception as e:
                    raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
                self.webrtcbin.emit("add-ice-candidate", mline_idx, sdp_line)
                log.debug(
                    f"Remote ICE candidate added. MLine Index: {mline_idx}, "
                    f"{sdp_line}"
                )

    def on_ice_gathering_state_change(self, pspec, __):
        """Log the new ICE gathering state of ``webrtcbin``.

        @param pspec: unused.
        @param __: unused.
        """
        gathering_state = self.webrtcbin.get_property("ice-gathering-state")
        log.debug(f"ICE gathering state changed to {gathering_state}")

    def on_ice_connection_state(self, pspec, __):
        """Log the new ICE connection state of ``webrtcbin``.

        An error is logged in addition when the state is ``FAILED``.

        @param pspec: unused.
        @param __: unused.
        """
        connection_state = self.webrtcbin.props.ice_connection_state
        has_failed = connection_state == GstWebRTC.WebRTCICEConnectionState.FAILED
        if has_failed:
            log.error("ICE connection failed")
        log.info(f"ICE connection state changed to {connection_state}")

    def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
        """Handles the GStreamer bus error messages.

        The error and its debugging info are parsed from the message and logged.

        @param bus: The GStreamer bus. Not used in this method.
        @param message: The error message.
        """
        gst_error, debug_info = message.parse_error()
        emitter_name = message.src.get_name()
        log.error(f"Error from {emitter_name}: {gst_error.message}")
        log.error(f"Debugging info: {debug_info}")

    def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
        """Handles the GStreamer bus eos messages.

        The end of stream is only logged, nothing else is done here.

        @param bus: The GStreamer bus. Not used in this method.
        @param message: The eos message. Not used in this method.
        """
        log.info("End of stream")

    def on_audio_mute(self, muted: bool) -> None:
        """Handles (un)muting of audio.

        The "drop" property of ``self.audio_valve`` is set to ``muted``. Nothing is
        done when there is no audio valve.

        @param muted: True if audio is muted.
        """
        if self.audio_valve is None:
            return
        self.audio_valve.set_property("drop", muted)
        log.info(f"audio is now {'muted' if muted else 'unmuted'}")

    def on_video_mute(self, muted: bool) -> None:
        """Handles (un)muting of video.

        When unmuting, the source switched to is "desktop" if ``self.desktop_sharing``
        is set, "video" otherwise. Nothing is done when there is no video selector.

        @param muted: True if video is muted.
        """
        if self.video_selector is None:
            return

        if muted:
            # None is the muted source for ``switch_video_source``
            new_source = None
        elif self.desktop_sharing:
            new_source = "desktop"
        else:
            new_source = "video"

        self.switch_video_source(new_source)
        log.info(f"Video is now {'muted' if muted else 'unmuted'}")

    def on_desktop_switch(self, desktop_active: bool) -> None:
        """Switches the video source between desktop and video.

        When desktop is activated and a desktop portal is set, the switch goes through
        ``on_desktop_switch_portal``; in all other cases ``do_desktop_switch`` is
        called directly.

        @param desktop_active: True if desktop must be active. False for video.
        """
        if not desktop_active or self.desktop_portal is None:
            self.do_desktop_switch(desktop_active)
            return
        aio.run_async(self.on_desktop_switch_portal(desktop_active))

    async def on_desktop_switch_portal(self, desktop_active: bool) -> None:
        """Request a screenshare from the desktop portal, then do the switch.

        If the request is cancelled, ``self.desktop_sharing`` is reset to False and no
        switch is done. Otherwise the "node_id" of the result is stored (as a string)
        under the "path" key of ``self.desktop_sharing_data``.

        @param desktop_active: value transmitted to ``do_desktop_switch``.
        """
        assert self.desktop_portal is not None
        try:
            portal_result = await self.desktop_portal.request_screenshare()
        except exceptions.CancelError:
            self.desktop_sharing = False
        else:
            node_path = str(portal_result["node_id"])
            self.desktop_sharing_data = {"path": node_path}
            self.do_desktop_switch(desktop_active)

    def do_desktop_switch(self, desktop_active: bool) -> None:
        """Switch to desktop or video source, and remember which one is active.

        While video is muted, only ``self.desktop_sharing`` is updated: the source is
        not switched.

        @param desktop_active: True if desktop must be active. False for video.
        """
        if not self.video_muted:
            self.switch_video_source("desktop" if desktop_active else "video")
        self.desktop_sharing = desktop_active

    def switch_video_source(self, source: str|None) -> None:
        """Activates the specified source while deactivating the others.

        The pipeline is paused during the switch and set back to playing at the end.

        @param source: 'desktop', 'video', 'muted' or None for muted source.
        @raise ValueError: ``source`` is not one of the values above.
        """
        if source is None:
            source = "muted"
        if source not in ("video", "muted", "desktop"):
            raise ValueError(
                f"Invalid source: {source!r}, use one of {'video', 'muted', 'desktop'}"
            )

        self.pipeline.set_state(Gst.State.PAUSED)

        wants_desktop = source == "desktop"

        # The desktop source only exists while it is in use
        if wants_desktop:
            self._setup_desktop_source(self.desktop_sharing_data)

        # Start the requested source, stop or remove the other ones
        for candidate in ("video", "muted", "desktop"):
            element = self.pipeline.get_by_name(f"{candidate}_src")
            if not element:
                continue
            if candidate == source:
                element.set_state(Gst.State.PLAYING)
            elif candidate == "desktop":
                self._remove_desktop_source(element)
            else:
                element.set_state(Gst.State.NULL)

        # Find the selector pad matching the requested source
        if wants_desktop:
            if self.desktop_sink_pad:
                selected_pad = self.desktop_sink_pad
            else:
                log.error("No desktop pad available")
                selected_pad = None
        else:
            static_pad_names = {"video": "sink_0", "muted": "sink_1"}
            selected_pad = self.video_selector.get_static_pad(static_pad_names[source])

        if selected_pad is not None:
            self.video_selector.props.active_pad = selected_pad

        self.pipeline.set_state(Gst.State.PLAYING)

    def _setup_desktop_source(self, properties: dict[str, object]|None) -> None:
        """Set up a new desktop source.

        A ``desktop_src ! desktop_videoconvert ! desktop_queue`` chain is added to the
        pipeline and linked to a newly requested pad of ``self.video_selector``, which
        is stored in ``self.desktop_sink_pad``. The source element is ``ximagesrc``
        when no desktop portal is set, ``pipewiresrc`` otherwise.

        @param properties: The properties to set on the desktop source.
        """
        if self.desktop_portal is None:
            factory_name = "ximagesrc"
        else:
            factory_name = "pipewiresrc"
        capture_src = Gst.ElementFactory.make(factory_name, "desktop_src")
        for prop_name, prop_value in (properties or {}).items():
            log.debug(f"setting {factory_name} property: {prop_name!r}={prop_value!r}")
            capture_src.set_property(prop_name, prop_value)
        converter = Gst.ElementFactory.make("videoconvert", "desktop_videoconvert")
        leaky_queue = Gst.ElementFactory.make("queue", "desktop_queue")
        leaky_queue.set_property("leaky", "downstream")

        chain = (capture_src, converter, leaky_queue)
        for element in chain:
            self.pipeline.add(element)

        capture_src.link(converter)
        converter.link(leaky_queue)

        # the selector pad is requested on demand, and kept to be released later
        selector_template = self.video_selector.get_pad_template("sink_%u")
        self.desktop_sink_pad = self.video_selector.request_pad(
            selector_template, None, None
        )
        leaky_queue.get_static_pad("src").link(self.desktop_sink_pad)

        for element in chain:
            element.sync_state_with_parent()

    def _remove_desktop_source(self, desktop_src: Gst.Element) -> None:
        """Remove the desktop source from the pipeline.

        The "desktop_videoconvert" and "desktop_queue" elements are removed too when
        found, the selector pad stored in ``self.desktop_sink_pad`` is released, and
        the screenshare is ended on the desktop portal when there is one.

        @param desktop_src: The desktop source to remove.
        """
        # Elements which have been created along with the desktop source
        converter = self.pipeline.get_by_name("desktop_videoconvert")
        desktop_queue = self.pipeline.get_by_name("desktop_queue")

        if converter:
            converter.set_state(Gst.State.NULL)
            desktop_src.unlink(converter)
            self.pipeline.remove(converter)

        if desktop_queue:
            desktop_queue.set_state(Gst.State.NULL)
            self.pipeline.remove(desktop_queue)

        # The source itself is stopped and removed last
        desktop_src.set_state(Gst.State.NULL)
        self.pipeline.remove(desktop_src)

        # Give back the selector pad requested for the desktop source
        if self.desktop_sink_pad:
            self.video_selector.release_request_pad(self.desktop_sink_pad)
            self.desktop_sink_pad = None

        portal = self.desktop_portal
        if portal is not None:
            portal.end_screenshare()

    async def end_call(self) -> None:
        """Stop streaming and clean instance"""
        self.reset_instance()