view libervia/frontends/tools/portal_desktop.py @ 4242:8acf46ed7f36

frontends: remote control implementation: This is the frontends common part of the remote control implementation. It handles the creation of the WebRTC session, and the management of inputs. For now the reception uses the freedesktop.org Desktop portal, and works mostly with Wayland-based Desktop Environments. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:43 +0200
parents 79c8a70e1813
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia freedesktop portal management module
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Callable, Literal, overload
from libervia.backend.core import exceptions

import asyncio
import logging
from random import randint
import dbus
from dbus.mainloop.glib import DBusGMainLoop


log = logging.getLogger(__name__)


class PortalError(Exception):
    """Raised when a desktop portal request fails or returns unusable data."""


class DesktopPortal:

    def __init__(self, on_session_closed_cb: Callable | None = None):
        """Connect to the session bus and prepare the portal interfaces.

        @param on_session_closed_cb: optional callable, invoked without argument
            when the running session is closed.
        """
        self.dbus = dbus
        self.on_session_closed_cb = on_session_closed_cb

        # no session is running yet
        self.session_handle = None
        self.session_interface = None
        self.session_signal = None
        # used to generate a new handle token for each request
        self.handle_counter = 0

        # source types requested when selecting sources, see
        # https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
        # NOTE(review): the original comment says "monitors + windows", but 7 sets
        # three bits of the bitmask; confirm which source types are wanted.
        self.sources_type = dbus.UInt32(7)

        # the GLib main loop must be the default one before the bus is created
        DBusGMainLoop(set_as_default=True)
        bus = dbus.SessionBus()
        self.session_bus = bus
        portal = bus.get_object(
            "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop"
        )
        self.screencast_interface = dbus.Interface(
            portal, "org.freedesktop.portal.ScreenCast"
        )
        self.remote_desktop_interface = dbus.Interface(
            portal, "org.freedesktop.portal.RemoteDesktop"
        )

    @property
    def handle_token(self):
        self.handle_counter += 1
        return f"libervia{self.handle_counter}"

    def on_session_closed(self, details: dict) -> None:
        if self.session_interface is not None:
            self.session_interface = None
            if self.on_session_closed_cb is not None:
                self.on_session_closed_cb()
            if self.session_signal is not None:
                self.session_signal.remove()
                self.session_signal = None

    # Typing-only overload: when ``response`` is False, no ``Response`` signal is
    # awaited and the call returns None.
    @overload
    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: Literal[False],
        **kwargs,
    ) -> None:
       ...

    # Typing-only overload: when ``response`` is True, the results of the
    # ``Response`` signal are returned.
    @overload
    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: Literal[True],
        **kwargs,
    ) -> dict:
       ...


    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: bool,
        **kwargs,
    ) -> dict|None:
        """Call a portal method

        This method handle the signal response.
        @param method_name: method to call
        @param response: True if the method expect a response.
            If True, the method will await responde from
            ``org.freedesktop.portal.Request``'s ``Response`` signal.
        @param kwargs: method args.
            ``handle_token`` will be automatically added to ``options`` dict.
        @return: method result
        """
        method = getattr(interface, method_name)
        try:
            options = kwargs["options"]
        except KeyError:
            raise exceptions.InternalError('"options" key must be present.')
        reply_fut = asyncio.Future()
        signal_fut = asyncio.Future()
        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
        handle_token = self.handle_token
        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
        signal_match = None

        def on_signal(response, results):
            print(f"on_signal responde {response=}")
            assert signal_match is not None
            signal_match.remove()
            if response == 0:
                signal_fut.set_result(results)
            elif response == 1:
                signal_fut.set_exception(exceptions.CancelError("Cancelled by user."))
            else:
                signal_fut.set_exception(PortalError("Can't get signal result"))

        if response:
            signal_match = self.session_bus.add_signal_receiver(
                on_signal,
                signal_name="Response",
                dbus_interface="org.freedesktop.portal.Request",
                path=path,
            )

        options["handle_token"] = handle_token

        method(
            *kwargs.values(),
            reply_handler=lambda ret=None: reply_fut.set_result(ret),
            error_handler=reply_fut.set_exception,
        )
        try:
            await reply_fut
        except Exception as e:
            raise PortalError(f"Can't ask portal permission: {e}")
        if response:
            return await signal_fut

    async def create_session(
        self,
        interface: dbus.Interface,
    ) -> dict:
        """Create a new session and store its handle.

        This method creates a new session using the freedesktop portal's CreateSession
        dbus call. It then registers the session handle in this object for further use.

        @param None
        @return: A dictionary containing the session data, including the session handle.
        @raise PortalError: If there is an error getting the session handle.
        """
        if self.session_handle is not None:
            self.end_session()
        session_data = await self.dbus_call(
            interface,
            "CreateSession",
            response=True,
            options={
                "session_handle_token": str(randint(1, 2**32)),
            },
        )
        try:
            session_handle = session_data["session_handle"]
        except KeyError:
            raise PortalError("Can't get session handle")
        self.session_handle = session_handle
        return session_data

    def parse_streams(self, session_handle, screenshare_data: dict) -> dict:
        """Fill and returns stream_data from screenshare_data"""
        try:
            node_id, shared_stream_data = screenshare_data["streams"][0]
            source_type = int(shared_stream_data["source_type"])
        except (IndexError, KeyError):
            raise exceptions.NotFound("No stream data found.")
        stream_data = {
            "session_handle": session_handle,
            "node_id": node_id,
            "source_type": source_type,
        }
        try:
            height = int(shared_stream_data["size"][0])
            weight = int(shared_stream_data["size"][1])
        except (IndexError, KeyError):
            pass
        else:
            stream_data["size"] = (height, weight)
        return stream_data

    async def request_screenshare(self) -> dict:
        await self.create_session(self.screencast_interface)
        session_handle = self.session_handle

        await self.dbus_call(
            self.screencast_interface,
            "SelectSources",
            response=True,
            session_handle=session_handle,
            options={"multiple": True, "types": self.sources_type},
        )
        screenshare_data = await self.dbus_call(
            self.screencast_interface,
            "Start",
            response=True,
            session_handle=session_handle,
            parent_window="",
            options={},
        )

        session_object = self.session_bus.get_object(
            "org.freedesktop.portal.Desktop", session_handle
        )
        self.session_interface = self.dbus.Interface(
            session_object, "org.freedesktop.portal.Session"
        )

        self.session_signal = self.session_bus.add_signal_receiver(
            self.on_session_closed,
            signal_name="Closed",
            dbus_interface="org.freedesktop.portal.Session",
            path=session_handle,
        )

        try:
            return self.parse_streams(session_handle, screenshare_data)
        except exceptions.NotFound:
            raise PortalError("Can't parse stream data")

    async def request_remote_desktop(self, with_screen_sharing: bool = True) -> dict:
        """Request authorisation to remote control the desktop.

        A session is created with the RemoteDesktop portal, sources (if requested)
        and devices are selected, then the session is started. The session interface
        and a receiver for its ``Closed`` signal are then registered on this object.

        @param with_screen_sharing: True if screen must be shared.
        @return: data returned by the ``Start`` request. If stream data can be parsed
            from it, it is added with the ``stream_data`` key.
        """
        remote_desktop = self.remote_desktop_interface
        await self.create_session(remote_desktop)
        session_handle = self.session_handle

        if with_screen_sharing:
            sources_options = {
                "multiple": True,
                "types": self.sources_type,
                # hidden cursor (should be the default, but cursor appears during
                # tests))
                "cursor_mode": dbus.UInt32(1),
            }
            await self.dbus_call(
                self.screencast_interface,
                "SelectSources",
                response=False,
                session_handle=session_handle,
                options=sources_options,
            )

        devices_options = {
            "types": dbus.UInt32(3),
            # "persist_mode": dbus.UInt32(1),
        }
        await self.dbus_call(
            remote_desktop,
            "SelectDevices",
            response=False,
            session_handle=session_handle,
            options=devices_options,
        )

        remote_desktop_data = await self.dbus_call(
            remote_desktop,
            "Start",
            response=True,
            session_handle=session_handle,
            parent_window="",
            options={},
        )
        try:
            remote_desktop_data["stream_data"] = self.parse_streams(
                session_handle, remote_desktop_data
            )
        except exceptions.NotFound:
            # stream data is optional here: the result is returned without it
            pass

        bus = self.session_bus
        self.session_interface = self.dbus.Interface(
            bus.get_object("org.freedesktop.portal.Desktop", session_handle),
            "org.freedesktop.portal.Session",
        )
        # we need to know when the session is closed to clean up our state
        self.session_signal = bus.add_signal_receiver(
            self.on_session_closed,
            signal_name="Closed",
            dbus_interface="org.freedesktop.portal.Session",
            path=session_handle,
        )

        return remote_desktop_data

    def end_session(self) -> None:
        """Close a running screenshare session, if any."""
        if self.session_interface is None:
            return
        self.session_interface.Close()
        self.on_session_closed({})

    async def notify_pointer_motion(self, dx: int, dy: int) -> None:
        """
        Notify about a new relative pointer motion event.

        @param dx: Relative movement on the x axis
        @param dy: Relative movement on the y axis
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerMotion",
            response=False,
            session_handle=self.session_handle,
            options={},
            dx=dx,
            dy=dy,
        )

    async def notify_pointer_motion_absolute(
        self, stream: int, x: float, y: float
    ) -> None:
        """
        Notify about a new absolute pointer motion event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param x: Pointer motion x coordinate
        @param y: Pointer motion y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerMotionAbsolute",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            x=x,
            y=y,
        )

    async def notify_pointer_button(self, button: int, state: int) -> None:
        """
        Notify about a new pointer button event.

        @param button: The pointer button was pressed or released
        @param state: The new state of the button
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerButton",
            response=False,
            session_handle=self.session_handle,
            options={},
            button=button,
            state=state,
        )

    async def notify_pointer_axis(self, dx: float, dy: float) -> None:
        """
        Notify about a new pointer axis event.

        @param dx: Relative axis movement on the x axis
        @param dy: Relative axis movement on the y axis
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerAxis",
            response=False,
            session_handle=self.session_handle,
            options={},
            dx=dx,
            dy=dy,
        )

    async def notify_pointer_axis_discrete(self, axis: int, steps: int) -> None:
        """
        Notify about a new pointer axis discrete event.

        @param axis: The axis that was scrolled
            0 for vertical
            1 for horizontal
        @param steps: The number of steps scrolled
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerAxisDiscrete",
            response=False,
            session_handle=self.session_handle,
            options={},
            axis=axis,
            steps=steps,
        )

    async def notify_keyboard_keycode(self, keycode: int, state: int) -> None:
        """
        Notify about a new keyboard keycode event.

        @param keycode: Keyboard code that was pressed or released
        @param state: New state of keyboard keycode
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyKeyboardKeycode",
            response=False,
            session_handle=self.session_handle,
            options={},
            keycode=keycode,
            state=state,
        )

    async def notify_keyboard_keysym(self, keysym: int, state: int) -> None:
        """
        Notify about a new keyboard keysym event.

        @param keysym: Keyboard symbol that was pressed or released
        @param state: New state of keyboard keysym
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyKeyboardKeysym",
            response=False,
            session_handle=self.session_handle,
            options={},
            keysym=keysym,
            state=state,
        )

    async def notify_touch_down(self, stream: int, slot: int, x: int, y: int) -> None:
        """
        Notify about a new touch down event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param slot: Touch slot where touch point appeared
        @param x: Touch down x coordinate
        @param y: Touch down y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchDown",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            slot=slot,
            x=x,
            y=y,
        )

    async def notify_touch_motion(self, stream: int, slot: int, x: int, y: int) -> None:
        """
        Notify about a new touch motion event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param slot: Touch slot where touch point appeared
        @param x: Touch motion x coordinate
        @param y: Touch motion y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchMotion",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            slot=slot,
            x=x,
            y=y,
        )

    async def notify_touch_up(self, slot: int) -> None:
        """
        Notify about a new touch up event.

        @param slot: Touch slot where touch point appeared
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchUp",
            response=False,
            session_handle=self.session_handle,
            options={},
            slot=slot,
        )