Mercurial > libervia-backend
view libervia/frontends/tools/portal_desktop.py @ 4294:a0ed5c976bf8
component conferences, plugin XEP-0167, XEP-0298: add stream user metadata:
A/V conference now adds user metadata about the stream it is forwarding through XEP-0298.
This is parsed and added to metadata during confirmation on client side.
rel 448
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 06 Aug 2024 23:43:11 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia freedesktop portal management module
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
import logging
from random import randint
from typing import Callable, Literal, overload

import dbus
from dbus.mainloop.glib import DBusGMainLoop

from libervia.backend.core import exceptions

log = logging.getLogger(__name__)


class PortalError(Exception):
    """Raised when a freedesktop portal request fails or returns unusable data."""


class DesktopPortal:
    """Helper around the ``org.freedesktop.portal.Desktop`` D-Bus service.

    It wraps the ``ScreenCast`` and ``RemoteDesktop`` portal interfaces: session
    creation, screen sharing/remote control requests, and input event notifications.
    """

    def __init__(self, on_session_closed_cb: Callable | None = None):
        """
        @param on_session_closed_cb: optional callable invoked without argument when
            the portal session is closed.
        """
        self.dbus = dbus
        self.on_session_closed_cb = on_session_closed_cb
        # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
        # NOTE: per the documentation linked above this is a bitmask, and 7 also
        #   includes the "virtual" source type (1: monitor, 2: window, 4: virtual).
        self.sources_type = dbus.UInt32(7)
        DBusGMainLoop(set_as_default=True)
        self.session_bus = dbus.SessionBus()
        portal_object = self.session_bus.get_object(
            "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop"
        )
        self.screencast_interface = dbus.Interface(
            portal_object, "org.freedesktop.portal.ScreenCast"
        )
        self.remote_desktop_interface = dbus.Interface(
            portal_object, "org.freedesktop.portal.RemoteDesktop"
        )
        self.session_interface = None
        self.session_signal = None
        self.handle_counter = 0
        self.session_handle = None

    @property
    def handle_token(self):
        """Return a new unique handle token.

        Each access increments the internal counter, so two reads never return the
        same value.
        """
        self.handle_counter += 1
        return f"libervia{self.handle_counter}"

    def on_session_closed(self, details: dict) -> None:
        """Reset session state and notify the registered callback, if any.

        @param details: data received with the ``Closed`` signal (unused).
        """
        if self.session_interface is not None:
            self.session_interface = None
            if self.on_session_closed_cb is not None:
                self.on_session_closed_cb()
            if self.session_signal is not None:
                self.session_signal.remove()
                self.session_signal = None

    @overload
    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: Literal[False],
        **kwargs,
    ) -> None: ...

    @overload
    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: Literal[True],
        **kwargs,
    ) -> dict: ...

    async def dbus_call(
        self,
        interface: dbus.Interface,
        method_name: str,
        response: bool,
        **kwargs,
    ) -> dict | None:
        """Call a portal method

        This method handles the signal response.
        @param interface: D-Bus interface holding the method.
        @param method_name: method to call
        @param response: True if the method expects a response.
            If True, the method will await the response from
            ``org.freedesktop.portal.Request``'s ``Response`` signal.
        @param kwargs: method args. They are sent positionally, in the order they are
            given, so they must follow the D-Bus method signature.
            ``handle_token`` will be automatically added to ``options`` dict.
        @return: results of the ``Response`` signal if ``response`` is True, None
            otherwise.
        @raise exceptions.InternalError: ``options`` is missing from ``kwargs``.
        @raise exceptions.CancelError: the request has been cancelled by the user.
        @raise PortalError: the method call or the request failed.
        """
        method = getattr(interface, method_name)
        try:
            options = kwargs["options"]
        except KeyError:
            raise exceptions.InternalError('"options" key must be present.') from None

        loop = asyncio.get_running_loop()
        reply_fut = loop.create_future()
        signal_fut = loop.create_future()

        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
        handle_token = self.handle_token
        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
        signal_match = None

        def remove_signal_match() -> None:
            # Idempotent: may be called both from the signal handler and from the
            # final cleanup below.
            nonlocal signal_match
            if signal_match is not None:
                signal_match.remove()
                signal_match = None

        def on_signal(response_code, results):
            log.debug("portal Response signal received: response_code=%r", response_code)
            remove_signal_match()
            if response_code == 0:
                signal_fut.set_result(results)
            elif response_code == 1:
                signal_fut.set_exception(exceptions.CancelError("Cancelled by user."))
            else:
                signal_fut.set_exception(PortalError("Can't get signal result"))

        if response:
            # The receiver must be registered before the call is made, otherwise the
            # signal could be missed.
            signal_match = self.session_bus.add_signal_receiver(
                on_signal,
                signal_name="Response",
                dbus_interface="org.freedesktop.portal.Request",
                path=path,
            )

        options["handle_token"] = handle_token

        try:
            method(
                *kwargs.values(),
                reply_handler=lambda ret=None: reply_fut.set_result(ret),
                error_handler=reply_fut.set_exception,
            )
            try:
                await reply_fut
            except Exception as e:
                raise PortalError(f"Can't ask portal permission: {e}") from e
            if response:
                return await signal_fut
            return None
        finally:
            # Don't leak the signal receiver if the call failed or was cancelled
            # before the Response signal arrived.
            remove_signal_match()

    async def create_session(
        self,
        interface: dbus.Interface,
    ) -> dict:
        """Create a new session and store its handle.

        This method creates a new session using the freedesktop portal's CreateSession
        dbus call. It then registers the session handle in this object for further use.
        If a session handle is already stored, ``end_session`` is called first.

        @param interface: portal interface to use to create the session.
        @return: A dictionary containing the session data, including the session handle.
        @raise PortalError: If there is an error getting the session handle.
        """
        if self.session_handle is not None:
            self.end_session()
        session_data = await self.dbus_call(
            interface,
            "CreateSession",
            response=True,
            options={
                "session_handle_token": str(randint(1, 2**32)),
            },
        )
        try:
            session_handle = session_data["session_handle"]
        except KeyError as e:
            raise PortalError("Can't get session handle") from e
        self.session_handle = session_handle
        return session_data

    def parse_streams(self, session_handle, screenshare_data: dict) -> dict:
        """Fill and returns stream_data from screenshare_data

        Only the first stream of ``screenshare_data["streams"]`` is used.

        @param session_handle: handle of the session, stored as is in the result.
        @param screenshare_data: results of a portal ``Start`` request.
        @return: dict with ``session_handle``, ``node_id`` and ``source_type`` keys,
            and a ``size`` key when the stream provides one.
        @raise exceptions.NotFound: no usable stream data is available.
        """
        try:
            node_id, shared_stream_data = screenshare_data["streams"][0]
            source_type = int(shared_stream_data["source_type"])
        except (IndexError, KeyError) as e:
            raise exceptions.NotFound("No stream data found.") from e

        stream_data = {
            "session_handle": session_handle,
            "node_id": node_id,
            "source_type": source_type,
        }
        try:
            # the portal documents ``size`` as (width, height); values are kept in
            # the order they are received.
            width = int(shared_stream_data["size"][0])
            height = int(shared_stream_data["size"][1])
        except (IndexError, KeyError):
            pass
        else:
            stream_data["size"] = (width, height)

        return stream_data

    def _watch_session(self, session_handle) -> None:
        """Keep an interface on the session and listen to its ``Closed`` signal."""
        session_object = self.session_bus.get_object(
            "org.freedesktop.portal.Desktop", session_handle
        )
        self.session_interface = self.dbus.Interface(
            session_object, "org.freedesktop.portal.Session"
        )
        self.session_signal = self.session_bus.add_signal_receiver(
            self.on_session_closed,
            signal_name="Closed",
            dbus_interface="org.freedesktop.portal.Session",
            path=session_handle,
        )

    async def request_screenshare(self) -> dict:
        """Request authorization to share the screen.

        @return: stream data, see ``parse_streams``.
        @raise PortalError: a portal request failed or stream data can't be parsed.
        """
        await self.create_session(self.screencast_interface)
        session_handle = self.session_handle

        await self.dbus_call(
            self.screencast_interface,
            "SelectSources",
            response=True,
            session_handle=session_handle,
            options={"multiple": True, "types": self.sources_type},
        )
        screenshare_data = await self.dbus_call(
            self.screencast_interface,
            "Start",
            response=True,
            session_handle=session_handle,
            parent_window="",
            options={},
        )

        self._watch_session(session_handle)

        try:
            return self.parse_streams(session_handle, screenshare_data)
        except exceptions.NotFound as e:
            raise PortalError("Can't parse stream data") from e

    async def request_remote_desktop(self, with_screen_sharing: bool = True) -> dict:
        """Request authorization to remote control desktop.

        @param with_screen_sharing: True if screen must be shared.
        @return: results of the ``Start`` request, with an additional ``stream_data``
            key (see ``parse_streams``) when stream data is available.
        @raise PortalError: a portal request failed.
        """
        await self.create_session(self.remote_desktop_interface)
        session_handle = self.session_handle

        if with_screen_sharing:
            await self.dbus_call(
                self.screencast_interface,
                "SelectSources",
                response=False,
                session_handle=session_handle,
                options={
                    "multiple": True,
                    "types": self.sources_type,
                    # hidden cursor (should be the default, but cursor appears during
                    # tests)
                    "cursor_mode": dbus.UInt32(1),
                },
            )

        await self.dbus_call(
            self.remote_desktop_interface,
            "SelectDevices",
            response=False,
            session_handle=session_handle,
            options={
                "types": dbus.UInt32(3),
                # "persist_mode": dbus.UInt32(1),
            },
        )

        remote_desktop_data = await self.dbus_call(
            self.remote_desktop_interface,
            "Start",
            response=True,
            session_handle=session_handle,
            parent_window="",
            options={},
        )
        try:
            stream_data = self.parse_streams(session_handle, remote_desktop_data)
        except exceptions.NotFound:
            # no stream: the desktop is controlled without screen sharing data
            pass
        else:
            remote_desktop_data["stream_data"] = stream_data

        self._watch_session(session_handle)

        return remote_desktop_data

    def end_session(self) -> None:
        """Close a running screenshare session, if any."""
        if self.session_interface is None:
            return
        self.session_interface.Close()
        self.on_session_closed({})

    async def notify_pointer_motion(self, dx: int, dy: int) -> None:
        """
        Notify about a new relative pointer motion event.

        @param dx: Relative movement on the x axis
        @param dy: Relative movement on the y axis
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerMotion",
            response=False,
            session_handle=self.session_handle,
            options={},
            dx=dx,
            dy=dy,
        )

    async def notify_pointer_motion_absolute(
        self, stream: int, x: float, y: float
    ) -> None:
        """
        Notify about a new absolute pointer motion event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param x: Pointer motion x coordinate
        @param y: Pointer motion y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerMotionAbsolute",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            x=x,
            y=y,
        )

    async def notify_pointer_button(self, button: int, state: int) -> None:
        """
        Notify about a new pointer button event.

        @param button: The pointer button was pressed or released
        @param state: The new state of the button
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerButton",
            response=False,
            session_handle=self.session_handle,
            options={},
            button=button,
            state=state,
        )

    async def notify_pointer_axis(self, dx: float, dy: float) -> None:
        """
        Notify about a new pointer axis event.

        @param dx: Relative axis movement on the x axis
        @param dy: Relative axis movement on the y axis
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerAxis",
            response=False,
            session_handle=self.session_handle,
            options={},
            dx=dx,
            dy=dy,
        )

    async def notify_pointer_axis_discrete(self, axis: int, steps: int) -> None:
        """
        Notify about a new pointer axis discrete event.

        @param axis: The axis that was scrolled
            0 for vertical
            1 for horizontal
        @param steps: The number of steps scrolled
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyPointerAxisDiscrete",
            response=False,
            session_handle=self.session_handle,
            options={},
            axis=axis,
            steps=steps,
        )

    async def notify_keyboard_keycode(self, keycode: int, state: int) -> None:
        """
        Notify about a new keyboard keycode event.

        @param keycode: Keyboard code that was pressed or released
        @param state: New state of keyboard keycode
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyKeyboardKeycode",
            response=False,
            session_handle=self.session_handle,
            options={},
            keycode=keycode,
            state=state,
        )

    async def notify_keyboard_keysym(self, keysym: int, state: int) -> None:
        """
        Notify about a new keyboard keysym event.

        @param keysym: Keyboard symbol that was pressed or released
        @param state: New state of keyboard keysym
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyKeyboardKeysym",
            response=False,
            session_handle=self.session_handle,
            options={},
            keysym=keysym,
            state=state,
        )

    async def notify_touch_down(self, stream: int, slot: int, x: int, y: int) -> None:
        """
        Notify about a new touch down event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param slot: Touch slot where touch point appeared
        @param x: Touch down x coordinate
        @param y: Touch down y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchDown",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            slot=slot,
            x=x,
            y=y,
        )

    async def notify_touch_motion(self, stream: int, slot: int, x: int, y: int) -> None:
        """
        Notify about a new touch motion event.

        @param stream: The PipeWire stream node the coordinate is relative to
        @param slot: Touch slot where touch point appeared
        @param x: Touch motion x coordinate
        @param y: Touch motion y coordinate
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchMotion",
            response=False,
            session_handle=self.session_handle,
            options={},
            stream=stream,
            slot=slot,
            x=x,
            y=y,
        )

    async def notify_touch_up(self, slot: int) -> None:
        """
        Notify about a new touch up event.

        @param slot: Touch slot where touch point appeared
        """
        await self.dbus_call(
            self.remote_desktop_interface,
            "NotifyTouchUp",
            response=False,
            session_handle=self.session_handle,
            options={},
            slot=slot,
        )