changeset 4205:17a8168966f9

frontends (tools/webrtc): implement screensharing for Wayland + bug fixes: - Freedesktop Desktop Screenshare port is now used when Wayland is detected (needs `xdg-desktop-portal` with the implementation corresponding to desktop environment). - Add a binding feature to feedback state to application (e.g. if desktop sharing is cancelled from desktop environment, or at portal's permission request level). - fix misnaming of video source (was wrongly named `camera` instead of `video`). - fix desktop sharing pad selection in `input-selector` when it has been added once, then removed, then added again. rel 434
author Goffi <goffi@goffi.org>
date Thu, 18 Jan 2024 23:29:25 +0100
parents 879bad48cc2d
children 0f8ea0768a3b
files libervia/frontends/tools/webrtc.py
diffstat 1 files changed, 279 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/frontends/tools/webrtc.py	Tue Jan 16 10:42:00 2024 +0100
+++ b/libervia/frontends/tools/webrtc.py	Thu Jan 18 23:29:25 2024 +0100
@@ -36,6 +36,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 import logging
+from random import randint
 import re
 from typing import Callable
 from urllib.parse import quote_plus
@@ -61,12 +62,184 @@
 SINKS_TEST = "test"
 
 
+class ScreenshareError(Exception):
+    pass
+
+
 @dataclass
 class AppSinkData:
     local_video_cb: Callable
     remote_video_cb: Callable
 
 
+class DesktopPortal:
+
+    def __init__(self, webrtc: "WebRTC"):
+        import dbus
+        from dbus.mainloop.glib import DBusGMainLoop
+        # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
+        self.dbus = dbus
+        self.webrtc = webrtc
+        self.sources_type = dbus.UInt32(7)
+        DBusGMainLoop(set_as_default=True)
+        self.session_bus = dbus.SessionBus()
+        portal_object = self.session_bus.get_object(
+            'org.freedesktop.portal.Desktop',
+            '/org/freedesktop/portal/desktop'
+        )
+        self.screencast_interface = dbus.Interface(
+            portal_object,
+            'org.freedesktop.portal.ScreenCast'
+        )
+        self.session_interface = None
+        self.session_signal = None
+        self.handle_counter = 0
+        self.session_handle = None
+        self.stream_data: dict|None = None
+
+    @property
+    def handle_token(self):
+        self.handle_counter += 1
+        return f"libervia{self.handle_counter}"
+
+    def on_session_closed(self, details: dict) -> None:
+        if self.session_interface is not None:
+            self.session_interface = None
+            self.webrtc.desktop_sharing = False
+            if self.session_signal is not None:
+                self.session_signal.remove()
+                self.session_signal = None
+
+
+    async def dbus_call(self, method_name: str, *args) -> dict:
+        """Call a screenshare portal method
+
+        This method handle the signal response.
+        @param method_name: method to call
+        @param args: extra args
+            `handle_token` will be automatically added to the last arg (option dict)
+        @return: method result
+        """
+        if self.session_handle is not None:
+            self.end_screenshare()
+        method = getattr(self.screencast_interface, method_name)
+        options = args[-1]
+        reply_fut = asyncio.Future()
+        signal_fut = asyncio.Future()
+        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
+        handle_token = self.handle_token
+        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
+        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
+        signal_match = None
+
+        def on_signal(response, results):
+            assert signal_match is not None
+            signal_match.remove()
+            if response == 0:
+                signal_fut.set_result(results)
+            elif response == 1:
+                signal_fut.set_exception(
+                    exceptions.CancelError("Cancelled by user.")
+                )
+            else:
+                signal_fut.set_exception(ScreenshareError(
+                    f"Can't get signal result"
+                ))
+
+        signal_match = self.session_bus.add_signal_receiver(
+            on_signal,
+            signal_name="Response",
+            dbus_interface="org.freedesktop.portal.Request",
+            path=path
+        )
+
+        options["handle_token"] = handle_token
+
+        method(
+            *args,
+            reply_handler=reply_fut.set_result,
+            error_handler=reply_fut.set_exception
+        )
+        try:
+            await reply_fut
+        except Exception as e:
+            raise ScreenshareError(f"Can't ask screenshare permission: {e}")
+        return await signal_fut
+
+    async def request_screenshare(self) -> dict:
+        session_data = await self.dbus_call(
+            "CreateSession",
+            {
+                "session_handle_token": str(randint(1, 2**32)),
+            }
+        )
+        try:
+            session_handle = session_data["session_handle"]
+        except KeyError:
+            raise ScreenshareError("Can't get session handle")
+        self.session_handle = session_handle
+
+
+        await self.dbus_call(
+            "SelectSources",
+            session_handle,
+            {
+                "multiple": True,
+                "types": self.sources_type,
+                "modal": True
+            }
+        )
+        screenshare_data = await self.dbus_call(
+            "Start",
+            session_handle,
+            "",
+            {}
+        )
+
+        session_object = self.session_bus.get_object(
+            'org.freedesktop.portal.Desktop',
+            session_handle
+        )
+        self.session_interface = self.dbus.Interface(
+            session_object,
+            'org.freedesktop.portal.Session'
+        )
+
+        self.session_signal = self.session_bus.add_signal_receiver(
+            self.on_session_closed,
+            signal_name="Closed",
+            dbus_interface="org.freedesktop.portal.Session",
+            path=session_handle
+        )
+
+        try:
+            node_id, stream_data = screenshare_data["streams"][0]
+            source_type = int(stream_data["source_type"])
+        except (IndexError, KeyError):
+            raise ScreenshareError("Can't parse stream data")
+        self.stream_data = stream_data = {
+            "session_handle": session_handle,
+            "node_id": node_id,
+            "source_type": source_type
+        }
+        try:
+            height = int(stream_data["size"][0])
+            weight = int(stream_data["size"][1])
+        except (IndexError, KeyError):
+            pass
+        else:
+            stream_data["size"] = (height, weight)
+
+        return self.stream_data
+
+    def end_screenshare(self) -> None:
+        """Close a running screenshare session, if any."""
+        if self.session_interface is None:
+            return
+        self.session_interface.Close()
+        self.on_session_closed({})
+
+
 class WebRTC:
     """GSTreamer based WebRTC implementation for audio and video communication.
 
@@ -100,6 +273,10 @@
                 "appsink_data can only be used for SINKS_APP sinks"
             )
         self.reset_cb = reset_cb
+        if current_server == display_servers.WAYLAND:
+            self.desktop_portal = DesktopPortal(self)
+        else:
+            self.desktop_portal = None
         self.reset_instance()
 
     @property
@@ -131,6 +308,12 @@
         if active != self._desktop_sharing:
             self._desktop_sharing = active
             self.on_desktop_switch(active)
+            try:
+                cb = self.bindings["desktop_sharing"]
+            except KeyError:
+                pass
+            else:
+                cb(active)
 
     @property
     def sdp_set(self):
@@ -161,6 +344,16 @@
             raise Exception("self._media_types_inv should not be None!")
         return self._media_types_inv
 
+    def bind(self, **kwargs: Callable) -> None:
+        self.bindings.clear()
+        for key, cb in kwargs.items():
+            if key not in ("desktop_sharing",):
+                raise ValueError(
+                    'Only "desktop_sharing" is currently allowed for binding'
+                )
+            self.bindings[key] = cb
+
+
     def generate_dot_file(
         self,
         filename: str = "pipeline",
@@ -341,6 +534,11 @@
         self._media_types_inv = None
         self.audio_valve = None
         self.video_valve = None
+        if self.desktop_portal is not None:
+            self.desktop_portal.end_screenshare()
+        self.desktop_sharing = False
+        self.desktop_sink_pad = None
+        self.bindings = {}
         if self.reset_cb is not None:
             self.reset_cb()
 
@@ -922,71 +1120,102 @@
         @param muted: True if video is muted.
         """
         if self.video_selector is not None:
-            current_source = None if muted else "desktop" if self.desktop_sharing else "camera"
+            current_source = None if muted else "desktop" if self.desktop_sharing else "video"
             self.switch_video_source(current_source)
             state = "muted" if muted else "unmuted"
             log.info(f"Video is now {state}")
 
     def on_desktop_switch(self, desktop_active: bool) -> None:
-        """Switches the video source between desktop and camera.
+        """Switches the video source between desktop and video.
+
+        @param desktop_active: True if desktop must be active. False for video.
+        """
+        if desktop_active and self.desktop_portal is not None:
+            aio.run_async(self.on_desktop_switch_portal(desktop_active))
+        else:
+            self.do_desktop_switch(desktop_active)
 
-        @param desktop_active: True if desktop must be active. False for camera.
-        """
+    async def on_desktop_switch_portal(self, desktop_active: bool) -> None:
+        """Call freedesktop screenshare portal and the activate the shared stream"""
+        assert self.desktop_portal is not None
+        try:
+            screenshare_data = await self.desktop_portal.request_screenshare()
+        except exceptions.CancelError:
+            self.desktop_sharing = False
+            return
+        self.desktop_sharing_data = {
+            "path": str(screenshare_data["node_id"])
+        }
+        self.do_desktop_switch(desktop_active)
+
+    def do_desktop_switch(self, desktop_active: bool) -> None:
         if self.video_muted:
             # Update the active source state but do not switch
             self.desktop_sharing = desktop_active
             return
 
-        source = "desktop" if desktop_active else "camera"
+        source = "desktop" if desktop_active else "video"
         self.switch_video_source(source)
         self.desktop_sharing = desktop_active
 
     def switch_video_source(self, source: str|None) -> None:
-         """Activates the specified source while deactivating the others.
+        """Activates the specified source while deactivating the others.
 
-         @param source: 'desktop', 'camera', 'muted' or None for muted source.
-         """
-         if source is None:
-             source = "muted"
-         if source not in ["camera", "muted", "desktop"]:
-             raise ValueError(
-                 f"Invalid source: {source!r}, use one of {'camera', 'muted', 'desktop'}"
-             )
+        @param source: 'desktop', 'video', 'muted' or None for muted source.
+        """
+        if source is None:
+            source = "muted"
+        if source not in ["video", "muted", "desktop"]:
+            raise ValueError(
+                f"Invalid source: {source!r}, use one of {'video', 'muted', 'desktop'}"
+            )
 
-         self.pipeline.set_state(Gst.State.PAUSED)
+        self.pipeline.set_state(Gst.State.PAUSED)
 
-         # Create a new desktop source if necessary
-         if source == "desktop":
-             self._setup_desktop_source(self.desktop_sharing_data)
+        # Create a new desktop source if necessary
+        if source == "desktop":
+            self._setup_desktop_source(self.desktop_sharing_data)
 
-         # Activate the chosen source and deactivate the others
-         for src_name in ["camera", "muted", "desktop"]:
-             src_element = self.pipeline.get_by_name(f"{src_name}_src")
-             if src_name == source:
-                 if src_element:
-                     src_element.set_state(Gst.State.PLAYING)
-             else:
-                 if src_element:
-                     src_element.set_state(Gst.State.NULL)
-                     if src_name == "desktop":
-                         self._remove_desktop_source(src_element)
+        # Activate the chosen source and deactivate the others
+        for src_name in ["video", "muted", "desktop"]:
+            src_element = self.pipeline.get_by_name(f"{src_name}_src")
+            if src_name == source:
+                if src_element:
+                    src_element.set_state(Gst.State.PLAYING)
+            else:
+                if src_element:
+                    if src_name == "desktop":
+                        self._remove_desktop_source(src_element)
+                    else:
+                        src_element.set_state(Gst.State.NULL)
 
-         # Set the video_selector active pad
-         pad_name = f"sink_{['camera', 'muted', 'desktop'].index(source)}"
-         pad = self.video_selector.get_static_pad(pad_name)
-         self.video_selector.props.active_pad = pad
-         self.pipeline.set_state(Gst.State.PLAYING)
+        # Set the video_selector active pad
+        if source == "desktop":
+            if self.desktop_sink_pad:
+                pad = self.desktop_sink_pad
+            else:
+               log.error(f"No desktop pad available")
+               pad = None
+        else:
+            pad_name = f"sink_{['video', 'muted'].index(source)}"
+            pad = self.video_selector.get_static_pad(pad_name)
+
+        if pad is not None:
+           self.video_selector.props.active_pad = pad
+
+        self.pipeline.set_state(Gst.State.PLAYING)
 
     def _setup_desktop_source(self, properties: dict[str, object]|None) -> None:
         """Set up a new desktop source.
 
         @param properties: The properties to set on the desktop source.
         """
-        desktop_src = Gst.ElementFactory.make("ximagesrc", "desktop_src")
+        source_elt = "ximagesrc" if self.desktop_portal is None else "pipewiresrc"
+        desktop_src = Gst.ElementFactory.make(source_elt, "desktop_src")
         if properties is None:
             properties = {}
         for key, value in properties.items():
-            log.debug(f"setting ximagesrc property: {key!r}={value!r}")
+            log.debug(f"setting {source_elt} property: {key!r}={value!r}")
             desktop_src.set_property(key, value)
         video_convert = Gst.ElementFactory.make("videoconvert", "desktop_videoconvert")
         queue = Gst.ElementFactory.make("queue", "desktop_queue")
@@ -1000,9 +1229,9 @@
         video_convert.link(queue)
 
         sink_pad_template = self.video_selector.get_pad_template("sink_%u")
-        sink_pad = self.video_selector.request_pad(sink_pad_template, None, None)
+        self.desktop_sink_pad = self.video_selector.request_pad(sink_pad_template, None, None)
         queue_src_pad = queue.get_static_pad("src")
-        queue_src_pad.link(sink_pad)
+        queue_src_pad.link(self.desktop_sink_pad)
 
         desktop_src.sync_state_with_parent()
         video_convert.sync_state_with_parent()
@@ -1016,15 +1245,27 @@
         # Remove elements for the desktop source
         video_convert = self.pipeline.get_by_name("desktop_videoconvert")
         queue = self.pipeline.get_by_name("desktop_queue")
+
         if video_convert:
             video_convert.set_state(Gst.State.NULL)
             desktop_src.unlink(video_convert)
             self.pipeline.remove(video_convert)
+
         if queue:
             queue.set_state(Gst.State.NULL)
             self.pipeline.remove(queue)
+
+        desktop_src.set_state(Gst.State.NULL)
         self.pipeline.remove(desktop_src)
 
+        # Release the pad associated with the desktop source
+        if self.desktop_sink_pad:
+            self.video_selector.release_request_pad(self.desktop_sink_pad)
+            self.desktop_sink_pad = None
+
+        if self.desktop_portal is not None:
+            self.desktop_portal.end_screenshare()
+
     async def end_call(self) -> None:
         """Stop streaming and clean instance"""
         self.reset_instance()