Mercurial > libervia-backend
view libervia/frontends/tools/webrtc_screenshare.py @ 4239:a38559e6d6e2
core: remove legacy VERSION file.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:25:45 +0200 |
parents | d01b8d002619 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia WebRTC implementation # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from libervia.backend.core import exceptions import asyncio import logging from random import randint log = logging.getLogger(__name__) SOURCES_AUTO = "auto" SOURCES_TEST = "test" SOURCES_DATACHANNEL = "datachannel" SINKS_APP = "app" SINKS_AUTO = "auto" SINKS_TEST = "test" SINKS_DATACHANNEL = "datachannel" class ScreenshareError(Exception): pass class DesktopPortal: def __init__(self, webrtc): import dbus from dbus.mainloop.glib import DBusGMainLoop # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes self.dbus = dbus self.webrtc = webrtc self.sources_type = dbus.UInt32(7) DBusGMainLoop(set_as_default=True) self.session_bus = dbus.SessionBus() portal_object = self.session_bus.get_object( 'org.freedesktop.portal.Desktop', '/org/freedesktop/portal/desktop' ) self.screencast_interface = dbus.Interface( portal_object, 'org.freedesktop.portal.ScreenCast' ) self.session_interface = None self.session_signal = None self.handle_counter = 0 self.session_handle = None self.stream_data: dict|None = None @property def handle_token(self): self.handle_counter += 1 return f"libervia{self.handle_counter}" def on_session_closed(self, details: dict) -> None: if self.session_interface is not None: self.session_interface = None self.webrtc.desktop_sharing = False if self.session_signal is not None: self.session_signal.remove() self.session_signal = None async def dbus_call(self, method_name: str, *args) -> dict: """Call a screenshare portal method This method handle the signal response. @param method_name: method to call @param args: extra args `handle_token` will be automatically added to the last arg (option dict) @return: method result """ if self.session_handle is not None: self.end_screenshare() method = getattr(self.screencast_interface, method_name) options = args[-1] reply_fut = asyncio.Future() signal_fut = asyncio.Future() # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html handle_token = self.handle_token sender = self.session_bus.get_unique_name().replace(".", "_")[1:] path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}" signal_match = None def on_signal(response, results): assert signal_match is not None signal_match.remove() if response == 0: signal_fut.set_result(results) elif response == 1: signal_fut.set_exception( exceptions.CancelError("Cancelled by user.") ) else: signal_fut.set_exception(ScreenshareError( f"Can't get signal result" )) signal_match = self.session_bus.add_signal_receiver( on_signal, signal_name="Response", dbus_interface="org.freedesktop.portal.Request", path=path ) options["handle_token"] = handle_token method( *args, reply_handler=reply_fut.set_result, error_handler=reply_fut.set_exception ) try: await reply_fut except Exception as e: raise ScreenshareError(f"Can't ask screenshare permission: {e}") return await signal_fut async def request_screenshare(self) -> dict: session_data = await self.dbus_call( "CreateSession", { "session_handle_token": str(randint(1, 2**32)), } ) try: session_handle = session_data["session_handle"] except KeyError: raise ScreenshareError("Can't get session handle") self.session_handle = session_handle await self.dbus_call( "SelectSources", session_handle, { "multiple": True, "types": self.sources_type, "modal": True } ) screenshare_data = await self.dbus_call( "Start", session_handle, "", {} ) session_object = self.session_bus.get_object( 'org.freedesktop.portal.Desktop', session_handle ) self.session_interface = self.dbus.Interface( session_object, 'org.freedesktop.portal.Session' ) self.session_signal = self.session_bus.add_signal_receiver( self.on_session_closed, signal_name="Closed", dbus_interface="org.freedesktop.portal.Session", path=session_handle ) try: node_id, stream_data = screenshare_data["streams"][0] source_type = int(stream_data["source_type"]) except (IndexError, KeyError): raise ScreenshareError("Can't parse stream data") self.stream_data = stream_data = { "session_handle": session_handle, "node_id": node_id, "source_type": source_type } try: height = int(stream_data["size"][0]) weight = int(stream_data["size"][1]) except (IndexError, KeyError): pass else: stream_data["size"] = (height, weight) return self.stream_data def end_screenshare(self) -> None: """Close a running screenshare session, if any.""" if self.session_interface is None: return self.session_interface.Close() self.on_session_closed({})