Mercurial > libervia-backend
view libervia/frontends/tools/portal_desktop.py @ 4242:8acf46ed7f36
frontends: remote control implementation:
This is the frontends common part of remote control implementation. It handle the creation
of WebRTC session, and management of inputs. For now the reception use freedesktop.org
Desktop portal, and works mostly with Wayland based Desktop Environments.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:43 +0200 |
parents | 79c8a70e1813 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia freedesktop portal management module # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Callable, Literal, overload from libervia.backend.core import exceptions import asyncio import logging from random import randint import dbus from dbus.mainloop.glib import DBusGMainLoop log = logging.getLogger(__name__) class PortalError(Exception): pass class DesktopPortal: def __init__(self, on_session_closed_cb: Callable | None = None): # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes self.dbus = dbus self.on_session_closed_cb = on_session_closed_cb self.sources_type = dbus.UInt32(7) DBusGMainLoop(set_as_default=True) self.session_bus = dbus.SessionBus() portal_object = self.session_bus.get_object( "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop" ) self.screencast_interface = dbus.Interface( portal_object, "org.freedesktop.portal.ScreenCast" ) self.remote_desktop_interface = dbus.Interface( portal_object, "org.freedesktop.portal.RemoteDesktop" ) self.session_interface = None self.session_signal = None self.handle_counter = 0 self.session_handle = None @property def handle_token(self): self.handle_counter += 1 return f"libervia{self.handle_counter}" def on_session_closed(self, details: dict) -> None: if self.session_interface is not None: self.session_interface = None if self.on_session_closed_cb is not None: self.on_session_closed_cb() if self.session_signal is not None: self.session_signal.remove() self.session_signal = None @overload async def dbus_call( self, interface: dbus.Interface, method_name: str, response: Literal[False], **kwargs, ) -> None: ... @overload async def dbus_call( self, interface: dbus.Interface, method_name: str, response: Literal[True], **kwargs, ) -> dict: ... async def dbus_call( self, interface: dbus.Interface, method_name: str, response: bool, **kwargs, ) -> dict|None: """Call a portal method This method handle the signal response. @param method_name: method to call @param response: True if the method expect a response. If True, the method will await responde from ``org.freedesktop.portal.Request``'s ``Response`` signal. @param kwargs: method args. ``handle_token`` will be automatically added to ``options`` dict. @return: method result """ method = getattr(interface, method_name) try: options = kwargs["options"] except KeyError: raise exceptions.InternalError('"options" key must be present.') reply_fut = asyncio.Future() signal_fut = asyncio.Future() # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html handle_token = self.handle_token sender = self.session_bus.get_unique_name().replace(".", "_")[1:] path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}" signal_match = None def on_signal(response, results): print(f"on_signal responde {response=}") assert signal_match is not None signal_match.remove() if response == 0: signal_fut.set_result(results) elif response == 1: signal_fut.set_exception(exceptions.CancelError("Cancelled by user.")) else: signal_fut.set_exception(PortalError("Can't get signal result")) if response: signal_match = self.session_bus.add_signal_receiver( on_signal, signal_name="Response", dbus_interface="org.freedesktop.portal.Request", path=path, ) options["handle_token"] = handle_token method( *kwargs.values(), reply_handler=lambda ret=None: reply_fut.set_result(ret), error_handler=reply_fut.set_exception, ) try: await reply_fut except Exception as e: raise PortalError(f"Can't ask portal permission: {e}") if response: return await signal_fut async def create_session( self, interface: dbus.Interface, ) -> dict: """Create a new session and store its handle. This method creates a new session using the freedesktop portal's CreateSession dbus call. It then registers the session handle in this object for further use. @param None @return: A dictionary containing the session data, including the session handle. @raise PortalError: If there is an error getting the session handle. """ if self.session_handle is not None: self.end_session() session_data = await self.dbus_call( interface, "CreateSession", response=True, options={ "session_handle_token": str(randint(1, 2**32)), }, ) try: session_handle = session_data["session_handle"] except KeyError: raise PortalError("Can't get session handle") self.session_handle = session_handle return session_data def parse_streams(self, session_handle, screenshare_data: dict) -> dict: """Fill and returns stream_data from screenshare_data""" try: node_id, shared_stream_data = screenshare_data["streams"][0] source_type = int(shared_stream_data["source_type"]) except (IndexError, KeyError): raise exceptions.NotFound("No stream data found.") stream_data = { "session_handle": session_handle, "node_id": node_id, "source_type": source_type, } try: height = int(shared_stream_data["size"][0]) weight = int(shared_stream_data["size"][1]) except (IndexError, KeyError): pass else: stream_data["size"] = (height, weight) return stream_data async def request_screenshare(self) -> dict: await self.create_session(self.screencast_interface) session_handle = self.session_handle await self.dbus_call( self.screencast_interface, "SelectSources", response=True, session_handle=session_handle, options={"multiple": True, "types": self.sources_type}, ) screenshare_data = await self.dbus_call( self.screencast_interface, "Start", response=True, session_handle=session_handle, parent_window="", options={}, ) session_object = self.session_bus.get_object( "org.freedesktop.portal.Desktop", session_handle ) self.session_interface = self.dbus.Interface( session_object, "org.freedesktop.portal.Session" ) self.session_signal = self.session_bus.add_signal_receiver( self.on_session_closed, signal_name="Closed", dbus_interface="org.freedesktop.portal.Session", path=session_handle, ) try: return self.parse_streams(session_handle, screenshare_data) except exceptions.NotFound: raise PortalError("Can't parse stream data") async def request_remote_desktop(self, with_screen_sharing: bool = True) -> dict: """Request autorisation to remote control desktop. @param with_screen_sharing: True if screen must be shared. """ await self.create_session(self.remote_desktop_interface) session_handle = self.session_handle if with_screen_sharing: await self.dbus_call( self.screencast_interface, "SelectSources", response=False, session_handle=session_handle, options={ "multiple": True, "types": self.sources_type, # hidden cursor (should be the default, but cursor appears during # tests)) "cursor_mode": dbus.UInt32(1) }, ) await self.dbus_call( self.remote_desktop_interface, "SelectDevices", response=False, session_handle=session_handle, options={ "types": dbus.UInt32(3), # "persist_mode": dbus.UInt32(1), }, ) remote_desktop_data = await self.dbus_call( self.remote_desktop_interface, "Start", response=True, session_handle=session_handle, parent_window="", options={}, ) try: stream_data = self.parse_streams(session_handle, remote_desktop_data) except exceptions.NotFound: pass else: remote_desktop_data["stream_data"] = stream_data session_object = self.session_bus.get_object( "org.freedesktop.portal.Desktop", session_handle ) self.session_interface = self.dbus.Interface( session_object, "org.freedesktop.portal.Session" ) self.session_signal = self.session_bus.add_signal_receiver( self.on_session_closed, signal_name="Closed", dbus_interface="org.freedesktop.portal.Session", path=session_handle, ) return remote_desktop_data def end_session(self) -> None: """Close a running screenshare session, if any.""" if self.session_interface is None: return self.session_interface.Close() self.on_session_closed({}) async def notify_pointer_motion(self, dx: int, dy: int) -> None: """ Notify about a new relative pointer motion event. @param dx: Relative movement on the x axis @param dy: Relative movement on the y axis """ await self.dbus_call( self.remote_desktop_interface, "NotifyPointerMotion", response=False, session_handle=self.session_handle, options={}, dx=dx, dy=dy, ) async def notify_pointer_motion_absolute( self, stream: int, x: float, y: float ) -> None: """ Notify about a new absolute pointer motion event. @param stream: The PipeWire stream node the coordinate is relative to @param x: Pointer motion x coordinate @param y: Pointer motion y coordinate """ await self.dbus_call( self.remote_desktop_interface, "NotifyPointerMotionAbsolute", response=False, session_handle=self.session_handle, options={}, stream=stream, x=x, y=y, ) async def notify_pointer_button(self, button: int, state: int) -> None: """ Notify about a new pointer button event. @param button: The pointer button was pressed or released @param state: The new state of the button """ await self.dbus_call( self.remote_desktop_interface, "NotifyPointerButton", response=False, session_handle=self.session_handle, options={}, button=button, state=state, ) async def notify_pointer_axis(self, dx: float, dy: float) -> None: """ Notify about a new pointer axis event. @param dx: Relative axis movement on the x axis @param dy: Relative axis movement on the y axis """ await self.dbus_call( self.remote_desktop_interface, "NotifyPointerAxis", response=False, session_handle=self.session_handle, options={}, dx=dx, dy=dy, ) async def notify_pointer_axis_discrete(self, axis: int, steps: int) -> None: """ Notify about a new pointer axis discrete event. @param axis: The axis that was scrolled 0 for vertical 1 for horizontal @param steps: The number of steps scrolled """ await self.dbus_call( self.remote_desktop_interface, "NotifyPointerAxisDiscrete", response=False, session_handle=self.session_handle, options={}, axis=axis, steps=steps, ) async def notify_keyboard_keycode(self, keycode: int, state: int) -> None: """ Notify about a new keyboard keycode event. @param keycode: Keyboard code that was pressed or released @param state: New state of keyboard keycode """ await self.dbus_call( self.remote_desktop_interface, "NotifyKeyboardKeycode", response=False, session_handle=self.session_handle, options={}, keycode=keycode, state=state, ) async def notify_keyboard_keysym(self, keysym: int, state: int) -> None: """ Notify about a new keyboard keysym event. @param keysym: Keyboard symbol that was pressed or released @param state: New state of keyboard keysym """ await self.dbus_call( self.remote_desktop_interface, "NotifyKeyboardKeysym", response=False, session_handle=self.session_handle, options={}, keysym=keysym, state=state, ) async def notify_touch_down(self, stream: int, slot: int, x: int, y: int) -> None: """ Notify about a new touch down event. @param stream: The PipeWire stream node the coordinate is relative to @param slot: Touch slot where touch point appeared @param x: Touch down x coordinate @param y: Touch down y coordinate """ await self.dbus_call( self.remote_desktop_interface, "NotifyTouchDown", response=False, session_handle=self.session_handle, options={}, stream=stream, slot=slot, x=x, y=y, ) async def notify_touch_motion(self, stream: int, slot: int, x: int, y: int) -> None: """ Notify about a new touch motion event. @param stream: The PipeWire stream node the coordinate is relative to @param slot: Touch slot where touch point appeared @param x: Touch motion x coordinate @param y: Touch motion y coordinate """ await self.dbus_call( self.remote_desktop_interface, "NotifyTouchMotion", response=False, session_handle=self.session_handle, options={}, stream=stream, slot=slot, x=x, y=y, ) async def notify_touch_up(self, slot: int) -> None: """ Notify about a new touch up event. @param slot: Touch slot where touch point appeared """ await self.dbus_call( self.remote_desktop_interface, "NotifyTouchUp", response=False, session_handle=self.session_handle, options={}, slot=slot, )