diff libervia/frontends/tools/webrtc.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in the same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit to have methods called in parallel is very low, and it causes a lot of trouble as we can't predict order. Methods are now called sequentially so workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some methods to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have as many Data Channels as necessary, to have them in addition to A/V streams, to manually specify GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents d01b8d002619
children 0d7bb4df2343
line wrap: on
line diff
--- a/libervia/frontends/tools/webrtc.py	Sat May 11 13:25:45 2024 +0200
+++ b/libervia/frontends/tools/webrtc.py	Sat May 11 13:52:41 2024 +0200
@@ -35,13 +35,25 @@
 from datetime import datetime
 import logging
 import re
-from typing import Callable
+from typing import Callable, Final
 from urllib.parse import quote_plus
 
 from libervia.backend.tools.common import data_format
 from libervia.frontends.tools import aio, display_servers, jid
-from .webrtc_models import AppSinkData, CallData
-from .webrtc_screenshare import DesktopPortal
+from .webrtc_models import (
+    CallData,
+    SinksApp,
+    SinksAuto,
+    SinksData,
+    SinksDataChannel,
+    SinksNone,
+    SourcesAuto,
+    SourcesData,
+    SourcesDataChannel,
+    SourcesNone,
+    SourcesPipeline,
+    SourcesTest,
+)
 
 current_server = display_servers.detect()
 if current_server == display_servers.X11:
@@ -52,17 +64,12 @@
 
 
 log = logging.getLogger(__name__)
+VIDEO_SOURCE_AUTO: Final = "v4l2src"
+AUDIO_SOURCE_AUTO: Final = "pulsesrc"
+NONE_NOT_IMPLEMENTED_MSG: Final = "None value is not handled yet."
 
 Gst.init(None)
 
-SOURCES_AUTO = "auto"
-SOURCES_TEST = "test"
-SOURCES_DATACHANNEL = "datachannel"
-SINKS_APP = "app"
-SINKS_AUTO = "auto"
-SINKS_TEST = "test"
-SINKS_DATACHANNEL = "datachannel"
-
 
 class WebRTC:
     """GSTreamer based WebRTC implementation for audio and video communication.
@@ -75,44 +82,45 @@
         self,
         bridge,
         profile: str,
-        sources: str = SOURCES_AUTO,
-        sinks: str = SINKS_AUTO,
-        appsink_data: AppSinkData | None = None,
+        sources_data: SourcesData|None = None,
+        sinks_data: SinksData|None = None,
         reset_cb: Callable | None = None,
         merge_pip: bool | None = None,
         target_size: tuple[int, int] | None = None,
         call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None,
-        dc_open_cb: (
-            Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
-        ) = None,
-        dc_on_data_channel: (
-            Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
-        ) = None,
+        dc_data_list: list[SourcesDataChannel|SinksDataChannel]|None = None
     ) -> None:
         """Initializes a new WebRTC instance.
 
         @param bridge: An instance of backend bridge.
         @param profile: Libervia profile.
-        @param sources: Which kind of source to use.
-        @param sinks: Which kind of sinks to use.
-        @param appsink_data: configuration data for appsink (when SINKS_APP is used). Must
-            not be used for other sinks.
+        @param sources_data: Data of the sources.
+            The model used will determine which sources to use.
+            SourcesDataChannel can be used here as a convenience. It will then be moved
+            to ``data_channels`` and ``SourcesNone`` will be used instead for
+            ``sources_data``.
+            If None, SourcesAuto will be used.
+        @param sinks_data: Data of the sinks.
+            The model used will determine which sinks to use.
+            SinksDataChannel can be used here as a convenience. It will then be moved
+            to ``data_channels`` and ``SinksNone`` will be used instead for
+            ``sinks_data``.
+            If None, SinksAuto will be used.
         @param reset_cb: An optional Callable that is triggered on reset events. Can be
             used to reset UI data on new calls.
         @param merge_pip: A boolean flag indicating whether Picture-in-Picture mode is
             enabled. When PiP is used, local feedback is merged to remote video stream.
             Only one video stream is then produced (the local one).
             If None, PiP mode is selected automatically according to selected sink (it's
-            used for SINKS_AUTO only for now).
+            used for SinksAuto only for now).
         @param target_size: Expected size of the final sink stream. Mainly use by composer
             when ``merge_pip`` is set.
-            None to autodetect (not real autodetection implemeted yet, default to
+            None to autodetect (no real autodetection implemented yet, default to
             (1280,720)).
         @param call_start_cb: Called when call is started.
-        @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL).
-            This callback will be run in a GStreamer thread.
-        @param dc_open_cb: Called when Data Channel is created (for SINKS_DATACHANNEL).
-            This callback will be run in a GStreamer thread.
+        @param dc_data_list: Data Channels to create.
+            If a SourcesDataChannel is used as ``sources_data``, or a SinksDataChannel is
+            used as ``sinks_data``, they will be automatically added to this list.
         """
         self.main_loop = asyncio.get_event_loop()
         self.bridge = bridge
@@ -122,38 +130,40 @@
         self._video_muted = False
         self._desktop_sharing = False
         self.desktop_sharing_data = None
-        self.sources = sources
-        self.sinks = sinks
+        if dc_data_list is None:
+            dc_data_list = []
+        self.dc_data_list = dc_data_list
+        if sources_data is None:
+            sources_data = SourcesAuto()
+        elif isinstance(sources_data, SourcesDataChannel):
+            dc_data_list.append(sources_data)
+            sources_data = SourcesNone()
+        self.sources_data = sources_data
+        if sinks_data is None:
+            sinks_data = SinksAuto()
+        elif isinstance(sinks_data, SinksDataChannel):
+            dc_data_list.append(sinks_data)
+            sinks_data = SinksNone()
+        self.sinks_data = sinks_data
         if target_size is None:
             target_size = (1280, 720)
         self.target_width, self.target_height = target_size
         if merge_pip is None:
-            merge_pip = sinks == SINKS_AUTO
+            merge_pip = isinstance(sinks_data, SinksAuto)
         self.merge_pip = merge_pip
         if call_start_cb is None:
             call_start_cb = self._call_start
         self.call_start_cb = call_start_cb
-        if sources == SOURCES_DATACHANNEL:
-            assert dc_open_cb is not None
-            self.dc_open_cb = dc_open_cb
-        if sinks == SINKS_DATACHANNEL:
-            assert dc_on_data_channel is not None
-            self.dc_on_data_channel = dc_on_data_channel
-        if sinks == SINKS_APP:
-            if (
-                merge_pip
-                and appsink_data is not None
-                and appsink_data.remote_video_cb is not None
-            ):
+        if isinstance(sinks_data, SinksApp):
+            if merge_pip and sinks_data.remote_video_cb is not None:
                 raise ValueError("Remote_video_cb can't be used when merge_pip is used!")
-            self.appsink_data = appsink_data
-        elif appsink_data is not None:
-            raise exceptions.InternalError(
-                "appsink_data can only be used for SINKS_APP sinks"
-            )
         self.reset_cb = reset_cb
         if current_server == display_servers.WAYLAND:
-            self.desktop_portal = DesktopPortal(self)
+            from .portal_desktop import DesktopPortal
+
+            self.desktop_portal = DesktopPortal(
+                on_session_closed_cb=self.on_portal_session_closed
+            )
         else:
             self.desktop_portal = None
         self.reset_instance()
@@ -370,23 +380,46 @@
 
         return base_format.format(**parsed_candidate)
 
-    def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
+    def extract_ufrag_pwd(self, sdp: str) -> None:
         """Retrieves ICE password and user fragment for SDP offer.
 
         @param sdp: The Session Description Protocol offer string.
-        @return: ufrag and pwd
-        @raise ValueError: Can't extract ufrag and password
         """
-        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
-        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
+        lines = sdp.splitlines()
+        media = ''
+        mid_media_map = {}
+        bundle_media = set()
+        bundle_ufrag = ''
+        bundle_pwd = ''
+        in_bundle = False
 
-        if ufrag_line and pwd_line:
-            ufrag = self.ufrag = ufrag_line.group(1)
-            pwd = self.pwd = pwd_line.group(1)
-            return ufrag, pwd
-        else:
-            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
-            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
+        for line in lines:
+            if line.startswith('m='):
+                media = line.split('=')[1].split()[0]
+            elif line.startswith('a=mid:'):
+                mid = line.split(':')[1].strip()
+                mid_media_map[mid] = media
+            elif line.startswith('a=group:BUNDLE'):
+                in_bundle = True
+                bundle_media = set(line.split(':')[1].strip().split())
+            elif line.startswith('a=ice-ufrag:'):
+                if in_bundle:
+                    bundle_ufrag = line.split(':')[1].strip()
+                else:
+                    self.ufrag[media] = line.split(':')[1].strip()
+            elif line.startswith('a=ice-pwd:'):
+                if in_bundle:
+                    bundle_pwd = line.split(':')[1].strip()
+                else:
+                    self.pwd[media] = line.split(':')[1].strip()
+            else:
+                in_bundle = False
+
+        if bundle_ufrag and bundle_pwd:
+            for mid in bundle_media:
+                media = mid_media_map[mid]
+                self.ufrag[media] = bundle_ufrag
+                self.pwd[media] = bundle_pwd
 
     def reset_instance(self):
         """Inits or resets the instance variables to their default state."""
@@ -398,8 +431,8 @@
         self.sid: str | None = None
         self.offer: str | None = None
         self.local_candidates_buffer = {}
-        self.ufrag: str | None = None
-        self.pwd: str | None = None
+        self.ufrag: dict[str, str] = {}
+        self.pwd: dict[str, str] = {}
         self.callee: jid.JID | None = None
         self._media_types = None
         self._media_types_inv = None
@@ -412,13 +445,25 @@
         self._media_types_inv = None
         self.audio_valve = None
         self.video_valve = None
+        self.video_selector = None
         if self.desktop_portal is not None:
-            self.desktop_portal.end_screenshare()
+            self.desktop_portal.end_session()
         self.desktop_sharing = False
         self.desktop_sink_pad = None
         self.bindings = {}
         if self.reset_cb is not None:
             self.reset_cb()
+        self.data_channels: dict[str, GstWebRTC.WebRTCDataChannel] = {}
+
+    @property
+    def data_channel(self) -> GstWebRTC.WebRTCDataChannel:
+        """Convenience method to get WebRTCDataChannel instance when there is only one."""
+        if len(self.data_channels) != 1:
+            raise exceptions.InternalError(
+                "self.data_channel can only be used in a single Data Channel scenario. "
+                "Use self.data_channels dict instead."
+            )
+        return next(iter(self.data_channels.values()))
 
     async def setup_call(
         self,
@@ -442,83 +487,126 @@
         assert role in ("initiator", "responder")
         self.role = role
 
-        if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL:
-            # Setup pipeline for datachannel only, no media streams.
-            self.gst_pipe_desc = f"""
-            webrtcbin name=sendrecv bundle-policy=max-bundle
-            """
+
+        if isinstance(self.sources_data, SourcesPipeline):
+            if self.sources_data.video_pipeline != "" and video_pt is None:
+                raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG)
+            if self.sources_data.audio_pipeline != "" and audio_pt is None:
+                raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG)
+        elif isinstance(self.sources_data, SourcesNone):
+            pass
         else:
             if audio_pt is None or video_pt is None:
-                raise NotImplementedError("None value is not handled yet")
+                raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG)
 
-            if self.sources == SOURCES_AUTO:
-                video_source_elt = "v4l2src"
-                audio_source_elt = "pulsesrc"
-            elif self.sources == SOURCES_TEST:
+        match self.sources_data:
+            case SourcesAuto():
+                video_source_elt = VIDEO_SOURCE_AUTO
+                audio_source_elt = AUDIO_SOURCE_AUTO
+            case SourcesNone():
+                video_source_elt = ""
+                audio_source_elt = ""
+            case SourcesPipeline() as source:
+                if source.video_pipeline is None:
+                    video_source_elt = VIDEO_SOURCE_AUTO
+                else:
+                    video_source_elt = source.video_pipeline
+                if source.audio_pipeline is None:
+                    audio_source_elt = AUDIO_SOURCE_AUTO
+                else:
+                    audio_source_elt = source.audio_pipeline
+            case SourcesTest():
                 video_source_elt = "videotestsrc is-live=true pattern=ball"
                 audio_source_elt = "audiotestsrc"
-            else:
+            case _:
                 raise exceptions.InternalError(
-                    f'Unknown "sources" value: {self.sources!r}'
+                    f'Unexpected "sources_data" value: {self.sources_data!r}'
+                )
+
+        match self.sinks_data:
+            case SinksApp():
+                local_video_sink_elt = (
+                    "appsink name=local_video_sink emit-signals=true drop=true "
+                    "max-buffers=1 sync=True"
+                )
+            case SinksAuto():
+                if isinstance(self.sources_data, SourcesNone):
+                    local_video_sink_elt = ""
+                else:
+                    local_video_sink_elt = "autovideosink"
+            case SinksNone():
+                local_video_sink_elt = ""
+            case _:
+                raise exceptions.InternalError(
+                    f'Unexpected "sinks_data" value {self.sinks_data!r}'
                 )
 
-            if self.sinks == SINKS_APP:
-                local_video_sink_elt = (
-                    "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
-                    "sync=True"
-                )
-            elif self.sinks == SINKS_AUTO:
-                local_video_sink_elt = "autovideosink"
-            else:
-                raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")
+        gst_pipe_elements = [
+            "webrtcbin latency=30 name=sendrecv bundle-policy=max-bundle"
+        ]
+
+        if self.merge_pip and local_video_sink_elt:
+            # Compositor is used to merge local video feedback in video sink, useful when
+            # we have only a single video sink.
+            gst_pipe_elements.append(
+                "compositor name=compositor background=black "
+                f"! video/x-raw,width={self.target_width},"
+                f"height={self.target_height},framerate=30/1 "
+                f"! {local_video_sink_elt}"
+            )
+            local_video_sink_elt = "compositor.sink_1"
 
-            if self.merge_pip:
-                extra_elt = (
-                    "compositor name=compositor background=black "
-                    f"! video/x-raw,width={self.target_width},height={self.target_height},"
-                    "framerate=30/1 "
-                    f"! {local_video_sink_elt}"
-                )
-                local_video_sink_elt = "compositor.sink_1"
-            else:
-                extra_elt = ""
+        if video_source_elt:
+            # Video source with an input-selector to switch between normal and video mute
+            # (or desktop sharing).
+            gst_pipe_elements.append(f"""
+        input-selector name=video_selector
+        ! videorate drop-only=1 max-rate=30
+        ! video/x-raw,framerate=30/1
+        ! tee name=t
 
-            self.gst_pipe_desc = f"""
-            webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle
+        {video_source_elt} name=video_src
+        ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream
+        ! video_selector.
 
-            input-selector name=video_selector
-            ! videorate
-            ! video/x-raw,framerate=30/1
-            ! tee name=t
+        videotestsrc name=muted_src is-live=true pattern=black
+        ! queue leaky=downstream
+        ! video_selector.
 
-            {extra_elt}
-
-            {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
-            videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector.
-
-            t.
-            ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
-            ! videoconvert
-            ! vp8enc deadline=1 keyframe-max-dist=60
-            ! rtpvp8pay picture-id-mode=15-bit
-            ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
-            ! sendrecv.
+        t.
+        ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream
+        ! videoscale
+        ! videoconvert
+        ! vp8enc deadline=1 keyframe-max-dist=30
+        ! rtpvp8pay picture-id-mode=15-bit
+        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
+        ! sendrecv.
+        """)
 
-            t.
-            ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
-            ! videoconvert
-            ! {local_video_sink_elt}
+        if local_video_sink_elt:
+            # Local video feedback.
+            gst_pipe_elements.append(f"""
+        t.
+        ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream
+        ! videoconvert
+        ! {local_video_sink_elt}
+        """)
 
-            {audio_source_elt} name=audio_src
-            ! valve
-            ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
-            ! audioconvert
-            ! audioresample
-            ! opusenc audio-type=voice
-            ! rtpopuspay
-            ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
-            ! sendrecv.
-            """
+        if audio_source_elt:
+            # Audio with a valve for muting.
+            gst_pipe_elements.append(r"""
+        {audio_source_elt} name=audio_src
+        ! valve
+        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
+        ! audioconvert
+        ! audioresample
+        ! opusenc audio-type=voice
+        ! rtpopuspay
+        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
+        ! sendrecv.
+        """)
+
+        self.gst_pipe_desc = "\n\n".join(gst_pipe_elements)
 
         log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
 
@@ -542,18 +630,25 @@
         if self.webrtcbin is None:
             raise exceptions.InternalError("Can't get the pipeline.")
 
-        # For datachannel setups, media source, selector, and sink elements are not
-        # created
-        if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL:
-            self.video_src = self.pipeline.get_by_name("video_src")
-            self.muted_src = self.pipeline.get_by_name("muted_src")
-            self.video_selector = self.pipeline.get_by_name("video_selector")
-            self.audio_valve = self.pipeline.get_by_name("audio_valve")
+        # If video or audio sources are not created, ``get_by_name`` will return None.
+        self.video_src = self.pipeline.get_by_name("video_src")
+        self.muted_src = self.pipeline.get_by_name("muted_src")
+        self.video_selector = self.pipeline.get_by_name("video_selector")
+        if self.video_src and isinstance(self.sources_data, SourcesPipeline):
+            for name, value in self.sources_data.video_properties.items():
+                self.video_src.set_property(name, value)
 
-            if self.video_muted:
-                self.on_video_mute(True)
-            if self.audio_muted:
-                self.on_audio_mute(True)
+        self.audio_src = self.pipeline.get_by_name("audio_src")
+        if self.audio_src and isinstance(self.sources_data, SourcesPipeline):
+            for name, value in self.sources_data.audio_properties.items():
+                self.audio_src.set_property(name, value)
+
+        self.audio_valve = self.pipeline.get_by_name("audio_valve")
+
+        if self.video_muted:
+            self.on_video_mute(True)
+        if self.audio_muted:
+            self.on_audio_mute(True)
 
         # set STUN and TURN servers
         external_disco = data_format.deserialise(
@@ -583,13 +678,13 @@
                     log.warning(f"Erreur while adding TURN server {url}")
 
         # local video feedback
-        if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL:
-            assert self.appsink_data is not None
+        if isinstance(self.sinks_data, SinksApp):
             local_video_sink = self.pipeline.get_by_name("local_video_sink")
-            local_video_sink.set_property("emit-signals", True)
-            local_video_sink.connect("new-sample", self.appsink_data.local_video_cb)
-            local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
-            local_video_sink.set_property("caps", local_video_sink_caps)
+            if local_video_sink is not None:
+                local_video_sink.set_property("emit-signals", True)
+                local_video_sink.connect("new-sample", self.sinks_data.local_video_cb)
+                local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
+                local_video_sink.set_property("caps", local_video_sink_caps)
 
         # Create bus and associate signal watchers
         self.bus = self.pipeline.get_bus()
@@ -610,7 +705,13 @@
             "notify::ice-connection-state", self.on_ice_connection_state
         )
 
-        if self.sources == SOURCES_DATACHANNEL:
+        for dc_data in self.dc_data_list:
+            self.create_data_channel(dc_data)
+
+    def create_data_channel(self, dc_data: SourcesDataChannel|SinksDataChannel) -> None:
+        """Create a Data Channel and connect relevant callbacks."""
+        assert self.pipeline is not None
+        if isinstance(dc_data, SourcesDataChannel):
             # Data channel configuration for compatibility with browser defaults
             data_channel_options = Gst.Structure.new_empty("data-channel-options")
             data_channel_options.set_value("ordered", True)
@@ -618,15 +719,19 @@
 
             # Create the data channel
             self.pipeline.set_state(Gst.State.READY)
-            self.data_channel = self.webrtcbin.emit(
-                "create-data-channel", "file", data_channel_options
+            self.data_channels[dc_data.name] = data_channel = self.webrtcbin.emit(
+                "create-data-channel", dc_data.name, data_channel_options
             )
-            if self.data_channel is None:
+            if data_channel is None:
                 log.error("Failed to create data channel")
                 return
-            self.data_channel.connect("on-open", self.dc_open_cb)
-        if self.sinks == SINKS_DATACHANNEL:
-            self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel)
+            data_channel.connect("on-open", dc_data.dc_open_cb)
+        elif isinstance(dc_data, SinksDataChannel):
+            self.webrtcbin.connect("on-data-channel", dc_data.dc_on_data_channel)
+        else:
+            raise ValueError(
+                "Only SourcesDataChannel or SinksDataChannel are allowed."
+            )
 
     def start_pipeline(self) -> None:
         """Starts the GStreamer pipeline."""
@@ -790,10 +895,9 @@
                 remote_video_sink.set_property("width", width)
                 remote_video_sink.set_property("height", height)
                 remote_video_sink.set_property("sizing-policy", 1)
-            elif self.sinks == SINKS_APP:
+            elif isinstance(self.sinks_data, SinksApp):
                 # ``app`` sink without ``self.merge_pip`` set, be create the sink and
                 # connect it to the ``remote_video_cb``.
-                assert self.appsink_data is not None
                 remote_video_sink = Gst.ElementFactory.make("appsink")
 
                 remote_video_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
@@ -803,15 +907,17 @@
                 remote_video_sink.set_property("drop", True)
                 remote_video_sink.set_property("max-buffers", 1)
                 remote_video_sink.set_property("sync", True)
-                remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb)
+                remote_video_sink.connect("new-sample", self.sinks_data.remote_video_cb)
                 self.pipeline.add(remote_video_sink)
-            elif self.sinks == SINKS_AUTO:
+            elif isinstance(self.sinks_data, SinksAuto):
                 # if ``self.merge_pip`` is not set, we create a dedicated
                 # ``autovideosink`` for remote stream.
                 remote_video_sink = Gst.ElementFactory.make("autovideosink")
                 self.pipeline.add(remote_video_sink)
             else:
-                raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}')
+                raise exceptions.InternalError(
+                    f'Unhandled "sinks_data" value: {self.sinks_data!r}'
+                )
 
             if adjust_resolution:
                 videoscale = Gst.ElementFactory.make("videoscale")
@@ -898,14 +1004,14 @@
             log.debug(
                 f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
             )
-            if self.pwd is None:
+            if not self.pwd:
                 sdp = self.webrtcbin.props.local_description.sdp.as_text()
                 self.extract_ufrag_pwd(sdp)
             ice_data = {}
             for media_type, candidates in self.local_candidates_buffer.items():
                 ice_data[media_type] = {
-                    "ufrag": self.ufrag,
-                    "pwd": self.pwd,
+                    "ufrag": self.ufrag[media_type],
+                    "pwd": self.pwd[media_type],
                     "candidates": candidates,
                 }
             await self.bridge.ice_candidates_add(
@@ -949,9 +1055,13 @@
         )
         assert "VP8" in payload_types
         assert "OPUS" in payload_types
-        await self.setup_call(
-            "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
-        )
+        try:
+            await self.setup_call(
+                "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
+            )
+        except Exception:
+            log.exception("Can't setup call")
+            raise
         self.start_pipeline()
         offer = GstWebRTC.WebRTCSessionDescription.new(
             GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
@@ -986,8 +1096,12 @@
         else:
             sdp = self.webrtcbin.props.local_description.sdp.as_text()
             assert sdp is not None
-            ufrag, pwd = self.extract_ufrag_pwd(sdp)
-            ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
+            self.extract_ufrag_pwd(sdp)
+            ice_data = {
+                "ufrag": self.ufrag[media_type],
+                "pwd": self.pwd[media_type],
+                "candidates": [parsed_candidate]
+            }
             self._a_call(
                 self.bridge.ice_candidates_add,
                 self.sid,
@@ -1096,6 +1210,9 @@
         self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])}
         self.do_desktop_switch(desktop_active)
 
+    def on_portal_session_closed(self) -> None:
+        self.desktop_sharing = False
+
     def do_desktop_switch(self, desktop_active: bool) -> None:
         if self.video_muted:
             # Update the active source state but do not switch
@@ -1214,7 +1331,7 @@
             self.desktop_sink_pad = None
 
         if self.desktop_portal is not None:
-            self.desktop_portal.end_screenshare()
+            self.desktop_portal.end_session()
 
     async def end_call(self) -> None:
         """Stop streaming and clean instance"""
@@ -1249,9 +1366,7 @@
         self.webrtc.callee = callee
         self.on_call_setup_cb = on_call_setup_cb
         self.on_call_ended_cb = on_call_ended_cb
-        bridge.register_signal(
-            "ice_candidates_new", self.on_ice_candidates_new, "plugin"
-        )
+        bridge.register_signal("ice_candidates_new", self.on_ice_candidates_new, "plugin")
         bridge.register_signal("call_setup", self.on_call_setup, "plugin")
         bridge.register_signal("call_ended", self.on_call_ended, "plugin")
 
@@ -1328,5 +1443,9 @@
 
         To be used only if we are initiator
         """
-        await self.webrtc.setup_call("initiator")
+        try:
+            await self.webrtc.setup_call("initiator")
+        except Exception:
+            log.exception("Can't setup call")
+            raise
         self.webrtc.start_pipeline()