# HG changeset patch # User Goffi # Date 1705616965 -3600 # Node ID 17a8168966f99b43875a41a34557f642090d98a4 # Parent 879bad48cc2dc115b4d49342dcd641d60d34d13d frontends (tools/webrtc): implement screensharing for Wayland + bug fixes: - Freedesktop Desktop Screenshare port is now used when Wayland is detected (needs `xdg-desktop-portal` with the implementation corresponding to desktop environment). - Add a binding feature to feedback state to application (e.g. if desktop sharing is cancelled from desktop environment, or at portal's permission request level). - fix misnaming of video source (was wrongly named `camera` instead of `video`). - fix desktop sharing pad selection in `input-selector` when it has been added once, then removed, then added again. rel 434 diff -r 879bad48cc2d -r 17a8168966f9 libervia/frontends/tools/webrtc.py --- a/libervia/frontends/tools/webrtc.py Tue Jan 16 10:42:00 2024 +0100 +++ b/libervia/frontends/tools/webrtc.py Thu Jan 18 23:29:25 2024 +0100 @@ -36,6 +36,7 @@ from dataclasses import dataclass from datetime import datetime import logging +from random import randint import re from typing import Callable from urllib.parse import quote_plus @@ -61,12 +62,184 @@ SINKS_TEST = "test" +class ScreenshareError(Exception): + pass + + @dataclass class AppSinkData: local_video_cb: Callable remote_video_cb: Callable +class DesktopPortal: + + def __init__(self, webrtc: "WebRTC"): + import dbus + from dbus.mainloop.glib import DBusGMainLoop + # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes + self.dbus = dbus + self.webrtc = webrtc + self.sources_type = dbus.UInt32(7) + DBusGMainLoop(set_as_default=True) + self.session_bus = dbus.SessionBus() + portal_object = self.session_bus.get_object( + 'org.freedesktop.portal.Desktop', + '/org/freedesktop/portal/desktop' + ) + self.screencast_interface = dbus.Interface( + portal_object, + 'org.freedesktop.portal.ScreenCast' + ) + self.session_interface = None + self.session_signal = None + self.handle_counter = 0 + self.session_handle = None + self.stream_data: dict|None = None + + @property + def handle_token(self): + self.handle_counter += 1 + return f"libervia{self.handle_counter}" + + def on_session_closed(self, details: dict) -> None: + if self.session_interface is not None: + self.session_interface = None + self.webrtc.desktop_sharing = False + if self.session_signal is not None: + self.session_signal.remove() + self.session_signal = None + + + async def dbus_call(self, method_name: str, *args) -> dict: + """Call a screenshare portal method + + This method handle the signal response. + @param method_name: method to call + @param args: extra args + `handle_token` will be automatically added to the last arg (option dict) + @return: method result + """ + if self.session_handle is not None: + self.end_screenshare() + method = getattr(self.screencast_interface, method_name) + options = args[-1] + reply_fut = asyncio.Future() + signal_fut = asyncio.Future() + # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html + handle_token = self.handle_token + sender = self.session_bus.get_unique_name().replace(".", "_")[1:] + path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}" + signal_match = None + + def on_signal(response, results): + assert signal_match is not None + signal_match.remove() + if response == 0: + signal_fut.set_result(results) + elif response == 1: + signal_fut.set_exception( + exceptions.CancelError("Cancelled by user.") + ) + else: + signal_fut.set_exception(ScreenshareError( + f"Can't get signal result" + )) + + signal_match = self.session_bus.add_signal_receiver( + on_signal, + signal_name="Response", + dbus_interface="org.freedesktop.portal.Request", + path=path + ) + + options["handle_token"] = handle_token + + method( + *args, + reply_handler=reply_fut.set_result, + error_handler=reply_fut.set_exception + ) + try: + await reply_fut + except Exception as e: + raise ScreenshareError(f"Can't ask screenshare permission: {e}") + return await signal_fut + + async def request_screenshare(self) -> dict: + session_data = await self.dbus_call( + "CreateSession", + { + "session_handle_token": str(randint(1, 2**32)), + } + ) + try: + session_handle = session_data["session_handle"] + except KeyError: + raise ScreenshareError("Can't get session handle") + self.session_handle = session_handle + + + await self.dbus_call( + "SelectSources", + session_handle, + { + "multiple": True, + "types": self.sources_type, + "modal": True + } + ) + screenshare_data = await self.dbus_call( + "Start", + session_handle, + "", + {} + ) + + session_object = self.session_bus.get_object( + 'org.freedesktop.portal.Desktop', + session_handle + ) + self.session_interface = self.dbus.Interface( + session_object, + 'org.freedesktop.portal.Session' + ) + + self.session_signal = self.session_bus.add_signal_receiver( + self.on_session_closed, + signal_name="Closed", + dbus_interface="org.freedesktop.portal.Session", + path=session_handle + ) + + try: + node_id, stream_data = screenshare_data["streams"][0] + source_type = int(stream_data["source_type"]) + except (IndexError, KeyError): + raise ScreenshareError("Can't parse stream data") + self.stream_data = stream_data = { + "session_handle": session_handle, + "node_id": node_id, + "source_type": source_type + } + try: + height = int(stream_data["size"][0]) + weight = int(stream_data["size"][1]) + except (IndexError, KeyError): + pass + else: + stream_data["size"] = (height, weight) + + return self.stream_data + + def end_screenshare(self) -> None: + """Close a running screenshare session, if any.""" + if self.session_interface is None: + return + self.session_interface.Close() + self.on_session_closed({}) + + class WebRTC: """GSTreamer based WebRTC implementation for audio and video communication. @@ -100,6 +273,10 @@ "appsink_data can only be used for SINKS_APP sinks" ) self.reset_cb = reset_cb + if current_server == display_servers.WAYLAND: + self.desktop_portal = DesktopPortal(self) + else: + self.desktop_portal = None self.reset_instance() @property @@ -131,6 +308,12 @@ if active != self._desktop_sharing: self._desktop_sharing = active self.on_desktop_switch(active) + try: + cb = self.bindings["desktop_sharing"] + except KeyError: + pass + else: + cb(active) @property def sdp_set(self): @@ -161,6 +344,16 @@ raise Exception("self._media_types_inv should not be None!") return self._media_types_inv + def bind(self, **kwargs: Callable) -> None: + self.bindings.clear() + for key, cb in kwargs.items(): + if key not in ("desktop_sharing",): + raise ValueError( + 'Only "desktop_sharing" is currently allowed for binding' + ) + self.bindings[key] = cb + + def generate_dot_file( self, filename: str = "pipeline", @@ -341,6 +534,11 @@ self._media_types_inv = None self.audio_valve = None self.video_valve = None + if self.desktop_portal is not None: + self.desktop_portal.end_screenshare() + self.desktop_sharing = False + self.desktop_sink_pad = None + self.bindings = {} if self.reset_cb is not None: self.reset_cb() @@ -922,71 +1120,102 @@ @param muted: True if video is muted. """ if self.video_selector is not None: - current_source = None if muted else "desktop" if self.desktop_sharing else "camera" + current_source = None if muted else "desktop" if self.desktop_sharing else "video" self.switch_video_source(current_source) state = "muted" if muted else "unmuted" log.info(f"Video is now {state}") def on_desktop_switch(self, desktop_active: bool) -> None: - """Switches the video source between desktop and camera. + """Switches the video source between desktop and video. + + @param desktop_active: True if desktop must be active. False for video. + """ + if desktop_active and self.desktop_portal is not None: + aio.run_async(self.on_desktop_switch_portal(desktop_active)) + else: + self.do_desktop_switch(desktop_active) - @param desktop_active: True if desktop must be active. False for camera. - """ + async def on_desktop_switch_portal(self, desktop_active: bool) -> None: + """Call freedesktop screenshare portal and the activate the shared stream""" + assert self.desktop_portal is not None + try: + screenshare_data = await self.desktop_portal.request_screenshare() + except exceptions.CancelError: + self.desktop_sharing = False + return + self.desktop_sharing_data = { + "path": str(screenshare_data["node_id"]) + } + self.do_desktop_switch(desktop_active) + + def do_desktop_switch(self, desktop_active: bool) -> None: if self.video_muted: # Update the active source state but do not switch self.desktop_sharing = desktop_active return - source = "desktop" if desktop_active else "camera" + source = "desktop" if desktop_active else "video" self.switch_video_source(source) self.desktop_sharing = desktop_active def switch_video_source(self, source: str|None) -> None: - """Activates the specified source while deactivating the others. + """Activates the specified source while deactivating the others. - @param source: 'desktop', 'camera', 'muted' or None for muted source. - """ - if source is None: - source = "muted" - if source not in ["camera", "muted", "desktop"]: - raise ValueError( - f"Invalid source: {source!r}, use one of {'camera', 'muted', 'desktop'}" - ) + @param source: 'desktop', 'video', 'muted' or None for muted source. + """ + if source is None: + source = "muted" + if source not in ["video", "muted", "desktop"]: + raise ValueError( + f"Invalid source: {source!r}, use one of {'video', 'muted', 'desktop'}" + ) - self.pipeline.set_state(Gst.State.PAUSED) + self.pipeline.set_state(Gst.State.PAUSED) - # Create a new desktop source if necessary - if source == "desktop": - self._setup_desktop_source(self.desktop_sharing_data) + # Create a new desktop source if necessary + if source == "desktop": + self._setup_desktop_source(self.desktop_sharing_data) - # Activate the chosen source and deactivate the others - for src_name in ["camera", "muted", "desktop"]: - src_element = self.pipeline.get_by_name(f"{src_name}_src") - if src_name == source: - if src_element: - src_element.set_state(Gst.State.PLAYING) - else: - if src_element: - src_element.set_state(Gst.State.NULL) - if src_name == "desktop": - self._remove_desktop_source(src_element) + # Activate the chosen source and deactivate the others + for src_name in ["video", "muted", "desktop"]: + src_element = self.pipeline.get_by_name(f"{src_name}_src") + if src_name == source: + if src_element: + src_element.set_state(Gst.State.PLAYING) + else: + if src_element: + if src_name == "desktop": + self._remove_desktop_source(src_element) + else: + src_element.set_state(Gst.State.NULL) - # Set the video_selector active pad - pad_name = f"sink_{['camera', 'muted', 'desktop'].index(source)}" - pad = self.video_selector.get_static_pad(pad_name) - self.video_selector.props.active_pad = pad - self.pipeline.set_state(Gst.State.PLAYING) + # Set the video_selector active pad + if source == "desktop": + if self.desktop_sink_pad: + pad = self.desktop_sink_pad + else: + log.error(f"No desktop pad available") + pad = None + else: + pad_name = f"sink_{['video', 'muted'].index(source)}" + pad = self.video_selector.get_static_pad(pad_name) + + if pad is not None: + self.video_selector.props.active_pad = pad + + self.pipeline.set_state(Gst.State.PLAYING) def _setup_desktop_source(self, properties: dict[str, object]|None) -> None: """Set up a new desktop source. @param properties: The properties to set on the desktop source. """ - desktop_src = Gst.ElementFactory.make("ximagesrc", "desktop_src") + source_elt = "ximagesrc" if self.desktop_portal is None else "pipewiresrc" + desktop_src = Gst.ElementFactory.make(source_elt, "desktop_src") if properties is None: properties = {} for key, value in properties.items(): - log.debug(f"setting ximagesrc property: {key!r}={value!r}") + log.debug(f"setting {source_elt} property: {key!r}={value!r}") desktop_src.set_property(key, value) video_convert = Gst.ElementFactory.make("videoconvert", "desktop_videoconvert") queue = Gst.ElementFactory.make("queue", "desktop_queue") @@ -1000,9 +1229,9 @@ video_convert.link(queue) sink_pad_template = self.video_selector.get_pad_template("sink_%u") - sink_pad = self.video_selector.request_pad(sink_pad_template, None, None) + self.desktop_sink_pad = self.video_selector.request_pad(sink_pad_template, None, None) queue_src_pad = queue.get_static_pad("src") - queue_src_pad.link(sink_pad) + queue_src_pad.link(self.desktop_sink_pad) desktop_src.sync_state_with_parent() video_convert.sync_state_with_parent() @@ -1016,15 +1245,27 @@ # Remove elements for the desktop source video_convert = self.pipeline.get_by_name("desktop_videoconvert") queue = self.pipeline.get_by_name("desktop_queue") + if video_convert: video_convert.set_state(Gst.State.NULL) desktop_src.unlink(video_convert) self.pipeline.remove(video_convert) + if queue: queue.set_state(Gst.State.NULL) self.pipeline.remove(queue) + + desktop_src.set_state(Gst.State.NULL) self.pipeline.remove(desktop_src) + # Release the pad associated with the desktop source + if self.desktop_sink_pad: + self.video_selector.release_request_pad(self.desktop_sink_pad) + self.desktop_sink_pad = None + + if self.desktop_portal is not None: + self.desktop_portal.end_screenshare() + async def end_call(self) -> None: """Stop streaming and clean instance""" self.reset_instance()