view libervia/frontends/tools/portal_desktop.py @ 4294:a0ed5c976bf8

component conferences, plugin XEP-0167, XEP-0298: add stream user metadata: A/V conference now adds user metadata about the stream it is forwarding through XEP-0298. This is parsed and added to metadata during confirmation on client side. rel 448
author Goffi <goffi@goffi.org>
date Tue, 06 Aug 2024 23:43:11 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia freedesktop portal management module
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Callable, Literal, overload
from libervia.backend.core import exceptions

import asyncio
import logging
from random import randint
import dbus
from dbus.mainloop.glib import DBusGMainLoop


log = logging.getLogger(__name__)


class PortalError(Exception):
    pass


class DesktopPortal:

    def __init__(self, on_session_closed_cb: Callable | None = None):
        # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
        self.dbus = dbus
        self.on_session_closed_cb = on_session_closed_cb
        self.sources_type = dbus.UInt32(7)
        DBusGMainLoop(set_as_default=True)
        self.session_bus = dbus.SessionBus()
        portal_object = self.session_bus.get_object(
            "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop"
        )
        self.screencast_interface = dbus.Interface(
            portal_object, "org.freedesktop.portal.ScreenCast"
        )
        self.remote_desktop_interface = dbus.Interface(
            portal_object, "org.freedesktop.portal.RemoteDesktop"
        )
        self.session_interface = None
        self.session_signal = None
        self.handle_counter = 0
        self.session_handle = None

    @property
    def handle_token(self):
        self.handle_counter += 1
        return f"libervia{self.handle_counter}"

    def on_session_closed(self, details: dict) -> None:
        if self.session_interface is not None:
            self.session_interface = None
            if self.on_session_closed_cb is not None:
                self.on_session_closed_cb()
            if self.session_signal is not None:
                self.session_signal.remove()
                self.session_signal = None

    @overload
    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: Literal[False],
        **kwargs,
    ) -> None: ...

    @overload
    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: Literal[True],
        **kwargs,
    ) -> dict: ...

    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: bool,
        **kwargs,
    ) -> dict | None:
        """Call a portal method

        This method handle the signal response.
        @param method_name: method to call
        @param response: True if the method expect a response.
            If True, the method will await responde from
            ``org.freedesktop.portal.Request``'s ``Response`` signal.
        @param kwargs: method args.
            ``handle_token`` will be automatically added to ``options`` dict.
        @return: method result
        """
        method = getattr(interface, method_name)
        try:
            options = kwargs["options"]
        except KeyError:
            raise exceptions.InternalError('"options" key must be present.')
        reply_fut = asyncio.Future()
        signal_fut = asyncio.Future()
        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
        handle_token = self.handle_token
        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
        signal_match = None

        def on_signal(response, results):
            print(f"on_signal responde {response=}")
            assert signal_match is not None
            signal_match.remove()
            if response == 0:
                signal_fut.set_result(results)
            elif response == 1:
                signal_fut.set_exception(exceptions.CancelError("Cancelled by user."))
            else:
                signal_fut.set_exception(PortalError("Can't get signal result"))

        if response:
            signal_match = self.session_bus.add_signal_receiver(
                on_signal,
                signal_name="Response",
                dbus_interface="org.freedesktop.portal.Request",
                path=path,
            )

        options["handle_token"] = handle_token

        method(
            *kwargs.values(),
            reply_handler=lambda ret=None: reply_fut.set_result(ret),
            error_handler=reply_fut.set_exception,
        )
        try:
            await reply_fut
        except Exception as e:
            raise PortalError(f"Can't ask portal permission: {e}")
        if response:
            return await signal_fut

    async def create_session(
        self,
        interface: dbus.Interface,
    ) -> dict:
        """Create a new session and store its handle.

        This method creates a new session using the freedesktop portal's CreateSession
        dbus call. It then registers the session handle in this object for further use.

        @param None
        @return: A dictionary containing the session data, including the session handle.
        @raise PortalError: If there is an error getting the session handle.
        """
        if self.session_handle is not None:
            self.end_session()
        session_data = await self.dbus_call(
            interface,
            "CreateSession",
            response=True,
            options={
                "session_handle_token": str(randint(1, 2**32)),
            },
        )
        try:
            session_handle = session_data["session_handle"]
        except KeyError:
            raise PortalError("Can't get session handle")
        self.session_handle = session_handle
        return session_data

    def parse_streams(self, session_handle, screenshare_data: dict) -> dict:
        """Fill and returns stream_data from screenshare_data"""
        try:
            node_id, shared_stream_data = screenshare_data["streams"][0]
            source_type = int(shared_stream_data["source_type"])
        except (IndexError, KeyError):
            raise exceptions.NotFound("No stream data found.")
        stream_data = {
            "session_handle": session_handle,
            "node_id": node_id,
            "source_type": source_type,
        }
        try:
            height = int(shared_stream_data["size"][0])
            weight = int(shared_stream_data["size"][1])
        except (IndexError, KeyError):
            pass
        else:
            stream_data["size"] = (height, weight)
        return stream_data

    async def request_screenshare(self) -> dict:
        await self.create_session(self.screencast_interface)
        session_handle = self.session_handle

        await self.dbus_call(
            self.screencast_interface,
            "SelectSources",
            response=True,
            session_handle=session_handle,
            options={"multiple": True, "types": self.sources_type},
        )
        screenshare_data = await self.dbus_call(
            self.screencast_interface,
            "Start",
            response=True,
            session_handle=session_handle,
            parent_window="",
            options={},
        )

        session_object = self.session_bus.get_object(
            "org.freedesktop.portal.Desktop", session_handle
        )
        self.session_interface = self.dbus.Interface(
            session_object, "org.freedesktop.portal.Session"
        )

        self.session_signal = self.session_bus.add_signal_receiver(
            self.on_session_closed,
            signal_name="Closed",
            dbus_interface="org.freedesktop.portal.Session",
            path=session_handle,
        )

        try:
            return self.parse_streams(session_handle, screenshare_data)
        except exceptions.NotFound:
            raise PortalError("Can't parse stream data")

    async def request_remote_desktop(self, with_screen_sharing: bool = True) -> dict:
        """Request autorisation to remote control desktop.

        @param with_screen_sharing: True if screen must be shared.
        """
        await self.create_session(self.remote_desktop_interface)
        session_handle = self.session_handle

        if with_screen_sharing:
            await self.dbus_call(
                self.screencast_interface,
                "SelectSources",
                response=False,
                session_handle=session_handle,
                options={
                    "multiple": True,
                    "types": self.sources_type,
                    # hidden cursor (should be the default, but cursor appears during
                    # tests))
                    "cursor_mode": dbus.UInt32(1),
                },
            )

        await self.dbus_call(
            self.remote_desktop_interface,
            "SelectDevices",
            response=False,
            session_handle=session_handle,
            options={
                "types": dbus.UInt32(3),
                # "persist_mode": dbus.UInt32(1),
            },
        )

        remote_desktop_data = await self.dbus_call(
            self.remote_desktop_interface,
            "Start",
            response=True,
            session_handle=session_handle,
            parent_window="",
            options={},
        )
        try:
            stream_data = self.parse_streams(session_handle, remote_desktop_data)
        except exceptions.NotFound:
            pass
        else:
            remote_desktop_data["stream_data"] = stream_data

        session_object = self.session_bus.get_object(
            "org.freedesktop.portal.Desktop", session_handle
        )
        self.session_interface = self.dbus.Interface(
            session_object, "org.freedesktop.portal.Session"
        )

        self.session_signal = self.session_bus.add_signal_receiver(
            self.on_session_closed,
            signal_name="Closed",
            dbus_interface="org.freedesktop.portal.Session",
            path=session_handle,
        )

        return remote_desktop_data

    def end_session(self) -> None:
        """Close a running screenshare session, if any."""
        if self.session_interface is None:
            return
        self.session_interface.Close()
        self.on_session_closed({})

    async def notify_pointer_motion(self, dx: int, dy: int) -> None:
        """
        Notify about a new relative pointer motion event.

        @param dx: Relative movement on the x axis
        @param dy: Relative movement on the y axis
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerMotion",
            response=False,
            session_handle=self.session_handle,
            options={},
            dx=dx,
            dy=dy,
        )

    async def notify_pointer_motion_absolute(
        self, stream: int, x: float, y: float
    ) -> None:
        """
        Notify about a new absolute pointer motion event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param x: Pointer motion x coordinate
        @param y: Pointer motion y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerMotionAbsolute",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            x=x,
            y=y,
        )

    async def notify_pointer_button(self, button: int, state: int) -> None:
        """
        Notify about a new pointer button event.

        @param button: The pointer button was pressed or released
        @param state: The new state of the button
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerButton",
            response=False,
            session_handle=self.session_handle,
            options={},
            button=button,
            state=state,
        )

    async def notify_pointer_axis(self, dx: float, dy: float) -> None:
        """
        Notify about a new pointer axis event.

        @param dx: Relative axis movement on the x axis
        @param dy: Relative axis movement on the y axis
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerAxis",
            response=False,
            session_handle=self.session_handle,
            options={},
            dx=dx,
            dy=dy,
        )

    async def notify_pointer_axis_discrete(self, axis: int, steps: int) -> None:
        """
        Notify about a new pointer axis discrete event.

        @param axis: The axis that was scrolled
            0 for vertical
            1 for horizontal
        @param steps: The number of steps scrolled
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerAxisDiscrete",
            response=False,
            session_handle=self.session_handle,
            options={},
            axis=axis,
            steps=steps,
        )

    async def notify_keyboard_keycode(self, keycode: int, state: int) -> None:
        """
        Notify about a new keyboard keycode event.

        @param keycode: Keyboard code that was pressed or released
        @param state: New state of keyboard keycode
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyKeyboardKeycode",
            response=False,
            session_handle=self.session_handle,
            options={},
            keycode=keycode,
            state=state,
        )

    async def notify_keyboard_keysym(self, keysym: int, state: int) -> None:
        """
        Notify about a new keyboard keysym event.

        @param keysym: Keyboard symbol that was pressed or released
        @param state: New state of keyboard keysym
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyKeyboardKeysym",
            response=False,
            session_handle=self.session_handle,
            options={},
            keysym=keysym,
            state=state,
        )

    async def notify_touch_down(self, stream: int, slot: int, x: int, y: int) -> None:
        """
        Notify about a new touch down event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param slot: Touch slot where touch point appeared
        @param x: Touch down x coordinate
        @param y: Touch down y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchDown",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            slot=slot,
            x=x,
            y=y,
        )

    async def notify_touch_motion(self, stream: int, slot: int, x: int, y: int) -> None:
        """
        Notify about a new touch motion event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param slot: Touch slot where touch point appeared
        @param x: Touch motion x coordinate
        @param y: Touch motion y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchMotion",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            slot=slot,
            x=x,
            y=y,
        )

    async def notify_touch_up(self, slot: int) -> None:
        """
        Notify about a new touch up event.

        @param slot: Touch slot where touch point appeared
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchUp",
            response=False,
            session_handle=self.session_handle,
            options={},
            slot=slot,
        )