diff libervia/frontends/tools/portal_desktop.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in a same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit to have methods called in parallels is very low, and it cause a lot of trouble as we can't predict order. Methods are now called sequentially so workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some method to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have has many Data Channel as necessary, to have them in addition to A/V streams, to specify manually GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents libervia/frontends/tools/webrtc_screenshare.py@d01b8d002619
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/frontends/tools/portal_desktop.py	Sat May 11 13:52:41 2024 +0200
@@ -0,0 +1,500 @@
+#!/usr/bin/env python3
+
+# Libervia freedesktop portal management module
+# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Callable, Literal, overload
+from libervia.backend.core import exceptions
+
+import asyncio
+import logging
+from random import randint
+import dbus
+from dbus.mainloop.glib import DBusGMainLoop
+
+
+log = logging.getLogger(__name__)
+
+
+class PortalError(Exception):
+    pass
+
+
+class DesktopPortal:
+
+    def __init__(self, on_session_closed_cb: Callable | None = None):
+        # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
+        self.dbus = dbus
+        self.on_session_closed_cb = on_session_closed_cb
+        self.sources_type = dbus.UInt32(7)
+        DBusGMainLoop(set_as_default=True)
+        self.session_bus = dbus.SessionBus()
+        portal_object = self.session_bus.get_object(
+            "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop"
+        )
+        self.screencast_interface = dbus.Interface(
+            portal_object, "org.freedesktop.portal.ScreenCast"
+        )
+        self.remote_desktop_interface = dbus.Interface(
+            portal_object, "org.freedesktop.portal.RemoteDesktop"
+        )
+        self.session_interface = None
+        self.session_signal = None
+        self.handle_counter = 0
+        self.session_handle = None
+
+    @property
+    def handle_token(self):
+        self.handle_counter += 1
+        return f"libervia{self.handle_counter}"
+
+    def on_session_closed(self, details: dict) -> None:
+        if self.session_interface is not None:
+            self.session_interface = None
+            if self.on_session_closed_cb is not None:
+                self.on_session_closed_cb()
+            if self.session_signal is not None:
+                self.session_signal.remove()
+                self.session_signal = None
+
+    @overload
+    async def dbus_call(
+        self,
+        interface: dbus.Interface,
+        method_name: str,
+        response: Literal[False],
+        **kwargs,
+    ) -> None:
+       ...
+
+    @overload
+    async def dbus_call(
+        self,
+        interface: dbus.Interface,
+        method_name: str,
+        response: Literal[True],
+        **kwargs,
+    ) -> dict:
+       ...
+
+
+    async def dbus_call(
+        self,
+        interface: dbus.Interface,
+        method_name: str,
+        response: bool,
+        **kwargs,
+    ) -> dict|None:
+        """Call a portal method
+
+        This method handle the signal response.
+        @param method_name: method to call
+        @param response: True if the method expect a response.
+            If True, the method will await responde from
+            ``org.freedesktop.portal.Request``'s ``Response`` signal.
+        @param kwargs: method args.
+            ``handle_token`` will be automatically added to ``options`` dict.
+        @return: method result
+        """
+        method = getattr(interface, method_name)
+        try:
+            options = kwargs["options"]
+        except KeyError:
+            raise exceptions.InternalError('"options" key must be present.')
+        reply_fut = asyncio.Future()
+        signal_fut = asyncio.Future()
+        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
+        handle_token = self.handle_token
+        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
+        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
+        signal_match = None
+
+        def on_signal(response, results):
+            print(f"on_signal responde {response=}")
+            assert signal_match is not None
+            signal_match.remove()
+            if response == 0:
+                signal_fut.set_result(results)
+            elif response == 1:
+                signal_fut.set_exception(exceptions.CancelError("Cancelled by user."))
+            else:
+                signal_fut.set_exception(PortalError("Can't get signal result"))
+
+        if response:
+            signal_match = self.session_bus.add_signal_receiver(
+                on_signal,
+                signal_name="Response",
+                dbus_interface="org.freedesktop.portal.Request",
+                path=path,
+            )
+
+        options["handle_token"] = handle_token
+
+        method(
+            *kwargs.values(),
+            reply_handler=lambda ret=None: reply_fut.set_result(ret),
+            error_handler=reply_fut.set_exception,
+        )
+        try:
+            await reply_fut
+        except Exception as e:
+            raise PortalError(f"Can't ask portal permission: {e}")
+        if response:
+            return await signal_fut
+
+    async def create_session(
+        self,
+        interface: dbus.Interface,
+    ) -> dict:
+        """Create a new session and store its handle.
+
+        This method creates a new session using the freedesktop portal's CreateSession
+        dbus call. It then registers the session handle in this object for further use.
+
+        @param None
+        @return: A dictionary containing the session data, including the session handle.
+        @raise PortalError: If there is an error getting the session handle.
+        """
+        if self.session_handle is not None:
+            self.end_session()
+        session_data = await self.dbus_call(
+            interface,
+            "CreateSession",
+            response=True,
+            options={
+                "session_handle_token": str(randint(1, 2**32)),
+            },
+        )
+        try:
+            session_handle = session_data["session_handle"]
+        except KeyError:
+            raise PortalError("Can't get session handle")
+        self.session_handle = session_handle
+        return session_data
+
+    def parse_streams(self, session_handle, screenshare_data: dict) -> dict:
+        """Fill and returns stream_data from screenshare_data"""
+        try:
+            node_id, shared_stream_data = screenshare_data["streams"][0]
+            source_type = int(shared_stream_data["source_type"])
+        except (IndexError, KeyError):
+            raise exceptions.NotFound("No stream data found.")
+        stream_data = {
+            "session_handle": session_handle,
+            "node_id": node_id,
+            "source_type": source_type,
+        }
+        try:
+            height = int(shared_stream_data["size"][0])
+            weight = int(shared_stream_data["size"][1])
+        except (IndexError, KeyError):
+            pass
+        else:
+            stream_data["size"] = (height, weight)
+        return stream_data
+
+    async def request_screenshare(self) -> dict:
+        await self.create_session(self.screencast_interface)
+        session_handle = self.session_handle
+
+        await self.dbus_call(
+            self.screencast_interface,
+            "SelectSources",
+            response=True,
+            session_handle=session_handle,
+            options={"multiple": True, "types": self.sources_type},
+        )
+        screenshare_data = await self.dbus_call(
+            self.screencast_interface,
+            "Start",
+            response=True,
+            session_handle=session_handle,
+            parent_window="",
+            options={},
+        )
+
+        session_object = self.session_bus.get_object(
+            "org.freedesktop.portal.Desktop", session_handle
+        )
+        self.session_interface = self.dbus.Interface(
+            session_object, "org.freedesktop.portal.Session"
+        )
+
+        self.session_signal = self.session_bus.add_signal_receiver(
+            self.on_session_closed,
+            signal_name="Closed",
+            dbus_interface="org.freedesktop.portal.Session",
+            path=session_handle,
+        )
+
+        try:
+            return self.parse_streams(session_handle, screenshare_data)
+        except exceptions.NotFound:
+            raise PortalError("Can't parse stream data")
+
+    async def request_remote_desktop(self, with_screen_sharing: bool = True) -> dict:
+        """Request autorisation to remote control desktop.
+
+        @param with_screen_sharing: True if screen must be shared.
+        """
+        await self.create_session(self.remote_desktop_interface)
+        session_handle = self.session_handle
+
+        if with_screen_sharing:
+            await self.dbus_call(
+                self.screencast_interface,
+                "SelectSources",
+                response=False,
+                session_handle=session_handle,
+                options={
+                    "multiple": True,
+                    "types": self.sources_type,
+                    # hidden cursor (should be the default, but cursor appears during
+                    # tests))
+                    "cursor_mode": dbus.UInt32(1)
+                },
+            )
+
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "SelectDevices",
+            response=False,
+            session_handle=session_handle,
+            options={
+                "types": dbus.UInt32(3),
+                # "persist_mode": dbus.UInt32(1),
+            },
+        )
+
+        remote_desktop_data = await self.dbus_call(
+            self.remote_desktop_interface,
+            "Start",
+            response=True,
+            session_handle=session_handle,
+            parent_window="",
+            options={},
+        )
+        try:
+            stream_data = self.parse_streams(session_handle, remote_desktop_data)
+        except exceptions.NotFound:
+            pass
+        else:
+            remote_desktop_data["stream_data"] = stream_data
+
+        session_object = self.session_bus.get_object(
+            "org.freedesktop.portal.Desktop", session_handle
+        )
+        self.session_interface = self.dbus.Interface(
+            session_object, "org.freedesktop.portal.Session"
+        )
+
+        self.session_signal = self.session_bus.add_signal_receiver(
+            self.on_session_closed,
+            signal_name="Closed",
+            dbus_interface="org.freedesktop.portal.Session",
+            path=session_handle,
+        )
+
+        return remote_desktop_data
+
+    def end_session(self) -> None:
+        """Close a running screenshare session, if any."""
+        if self.session_interface is None:
+            return
+        self.session_interface.Close()
+        self.on_session_closed({})
+
+    async def notify_pointer_motion(self, dx: int, dy: int) -> None:
+        """
+        Notify about a new relative pointer motion event.
+
+        @param dx: Relative movement on the x axis
+        @param dy: Relative movement on the y axis
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyPointerMotion",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            dx=dx,
+            dy=dy,
+        )
+
+    async def notify_pointer_motion_absolute(
+        self, stream: int, x: float, y: float
+    ) -> None:
+        """
+        Notify about a new absolute pointer motion event.
+
+        @param stream: The PipeWire stream node the coordinate is relative to
+        @param x: Pointer motion x coordinate
+        @param y: Pointer motion y coordinate
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyPointerMotionAbsolute",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            stream=stream,
+            x=x,
+            y=y,
+        )
+
+    async def notify_pointer_button(self, button: int, state: int) -> None:
+        """
+        Notify about a new pointer button event.
+
+        @param button: The pointer button was pressed or released
+        @param state: The new state of the button
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyPointerButton",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            button=button,
+            state=state,
+        )
+
+    async def notify_pointer_axis(self, dx: float, dy: float) -> None:
+        """
+        Notify about a new pointer axis event.
+
+        @param dx: Relative axis movement on the x axis
+        @param dy: Relative axis movement on the y axis
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyPointerAxis",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            dx=dx,
+            dy=dy,
+        )
+
+    async def notify_pointer_axis_discrete(self, axis: int, steps: int) -> None:
+        """
+        Notify about a new pointer axis discrete event.
+
+        @param axis: The axis that was scrolled
+            0 for vertical
+            1 for horizontal
+        @param steps: The number of steps scrolled
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyPointerAxisDiscrete",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            axis=axis,
+            steps=steps,
+        )
+
+    async def notify_keyboard_keycode(self, keycode: int, state: int) -> None:
+        """
+        Notify about a new keyboard keycode event.
+
+        @param keycode: Keyboard code that was pressed or released
+        @param state: New state of keyboard keycode
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyKeyboardKeycode",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            keycode=keycode,
+            state=state,
+        )
+
+    async def notify_keyboard_keysym(self, keysym: int, state: int) -> None:
+        """
+        Notify about a new keyboard keysym event.
+
+        @param keysym: Keyboard symbol that was pressed or released
+        @param state: New state of keyboard keysym
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyKeyboardKeysym",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            keysym=keysym,
+            state=state,
+        )
+
+    async def notify_touch_down(self, stream: int, slot: int, x: int, y: int) -> None:
+        """
+        Notify about a new touch down event.
+
+        @param stream: The PipeWire stream node the coordinate is relative to
+        @param slot: Touch slot where touch point appeared
+        @param x: Touch down x coordinate
+        @param y: Touch down y coordinate
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyTouchDown",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            stream=stream,
+            slot=slot,
+            x=x,
+            y=y,
+        )
+
+    async def notify_touch_motion(self, stream: int, slot: int, x: int, y: int) -> None:
+        """
+        Notify about a new touch motion event.
+
+        @param stream: The PipeWire stream node the coordinate is relative to
+        @param slot: Touch slot where touch point appeared
+        @param x: Touch motion x coordinate
+        @param y: Touch motion y coordinate
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyTouchMotion",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            stream=stream,
+            slot=slot,
+            x=x,
+            y=y,
+        )
+
+    async def notify_touch_up(self, slot: int) -> None:
+        """
+        Notify about a new touch up event.
+
+        @param slot: Touch slot where touch point appeared
+        """
+        await self.dbus_call(
+            self.remote_desktop_interface,
+            "NotifyTouchUp",
+            response=False,
+            session_handle=self.session_handle,
+            options={},
+            slot=slot,
+        )