Mercurial > libervia-backend
view libervia/frontends/tools/webrtc_remote_control.py @ 4339:699aa8788d98
tests (unit/email gateway): add tests for pubsub service:
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:52:06 +0100 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia Remote-Control via WebRTC implementation.
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
import logging
from typing import Any, Awaitable, Callable

import gi

# Versions must be pinned before anything is imported from ``gi.repository``,
# otherwise PyGObject picks a version by itself and emits a warning.
gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})

from gi.overrides.Gst import GLib  # noqa: E402
from gi.repository import GstWebRTC  # noqa: E402

import cbor2  # noqa: E402

from libervia.backend.core.i18n import _  # noqa: E402
from libervia.backend.tools.common import data_format  # noqa: E402
from libervia.frontends.tools import aio, jid, webrtc  # noqa: E402
from libervia.frontends.tools.webrtc_models import (  # noqa: E402
    CallData,
    SourcesNone,
    SourcesPipeline,
)

OnOpenCbType = Callable[["WebRTCRemoteController"], None | Awaitable[None]]
MOUSE_BTN_LEFT = 0x110
MOUSE_BTN_RIGHT = 0x111
MOUSE_BTN_MIDDLE = 0x112
MOUSE_BTN_FORWARD = 0x115
MOUSE_BTN_BACK = 0x116

# Bit masks of the ``buttons`` field of a mouse event, associated with the button
# code to notify.
# see https://developer.mozilla.org/en-US/docs/Web/API/MouseEvent/buttons#value
MOUSE_BUTTONS_MASKS = (
    (1, MOUSE_BTN_LEFT),
    (2, MOUSE_BTN_RIGHT),
    (4, MOUSE_BTN_MIDDLE),
    (8, MOUSE_BTN_BACK),
    (16, MOUSE_BTN_FORWARD),
)


log = logging.getLogger(__name__)


class WebRTCRemoteController:
    """Controlling side of a remote control session.

    Input events are serialised with CBOR and sent through a WebRTC Data Channel.
    """

    def __init__(
        self,
        bridge,
        profile: str,
        on_call_start_cb: Callable[[dict], Any] | None = None,
        end_call_cb: Callable[[], Any] | None = None,
    ) -> None:
        """Initializes the Remote Controller.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_call_start_cb: A blocking or async callable that accepts a dict as its
            only argument. It is called with the deserialised data returned by the
            ``remote_control_start`` bridge method.
        @param end_call_cb: A callable to be invoked at the end of a call.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_call_start_cb = on_call_start_cb
        self.end_call_cb = end_call_cb
        self.loop = asyncio.get_event_loop()
        # Set by ``_on_dc_open`` once the Data Channel is open.
        self.data_channel: GstWebRTC.WebRTCDataChannel | None = None

    def send_input(self, input_data: dict) -> None:
        """Send an input data to controlled device

        The Data Channel must be open (i.e. ``on_open_cb`` given to [start] has been
        called) before using this method.

        @param input_data: data of the input event. It is serialised with CBOR.
        """
        assert self.data_channel is not None
        self.data_channel.send_data(GLib.Bytes(cbor2.dumps(input_data)))

    async def _on_webrtc_call_start(
        self,
        options: dict,
        callee: str,
        call_data: dict,
        profile: str,
    ) -> str:
        """Start the remote control session through the bridge.

        @param options: Extra options, merged into the data sent to the bridge.
        @param callee: Entity to control.
        @param call_data: WebRTC call data.
        @param profile: The profile name to be used.
        @return: ID of the started session.
        """
        rc_data = {
            "webrtc": True,
            "call_data": call_data,
        }
        # options may override the default keys above
        rc_data.update(options)
        remote_control_data_s = await self.bridge.remote_control_start(
            str(callee),
            data_format.serialise(rc_data),
            profile,
        )
        remote_control_data = data_format.deserialise(remote_control_data_s)

        if self.on_call_start_cb is not None:
            await aio.maybe_async(self.on_call_start_cb(remote_control_data))
        return remote_control_data["session_id"]

    def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel):
        """Called when the Data Channel is closed.

        If a call is given, it is terminated in ``self.loop``.

        @param webrtc_call: Call to terminate, or None to do nothing.
        @param data_channel: The Data Channel which has been closed.
        """
        if webrtc_call is not None:
            aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop)

    async def _end_call_and_quit(self, webrtc_call):
        """End the WebRTC call, then invoke ``end_call_cb`` if it is set.

        @param webrtc_call: Call to terminate.
        """
        await webrtc_call.webrtc.end_call()
        if self.end_call_cb is not None:
            await aio.maybe_async(self.end_call_cb())

    def _on_dc_open(
        self, on_open_cb: OnOpenCbType, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Called when datachannel is open

        The Data Channel is stored, then ``on_open_cb`` is scheduled in ``self.loop``.

        @param on_open_cb: Method to call with this instance as argument.
        @param data_channel: The Data Channel which has been opened.
        """
        self.data_channel = data_channel
        aio.run_from_thread(self.on_dc_opened, on_open_cb, loop=self.loop)

    async def on_dc_opened(self, on_open_cb: OnOpenCbType) -> None:
        """Call ``on_open_cb`` with this instance, awaiting the result if needed.

        @param on_open_cb: Blocking or async callable to invoke.
        """
        await aio.maybe_async(on_open_cb(self))

    async def start(
        self, callee: jid.JID, options: dict, on_open_cb: OnOpenCbType
    ) -> None:
        """Start a remote control session with ``callee``

        @param callee: The JID of the entity to control.
        @param options: Options such as which devices to control.
        @param on_open_cb: Method to call when the Data Channel is open.
            The WebRTCRemoteController instance used as argument can then be used to
            send input events to the controlled device.
        """
        call_data = CallData(callee=callee)
        self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources_data=webrtc.SourcesDataChannel(
                dc_open_cb=partial(self._on_dc_open, on_open_cb),
            ),
            call_start_cb=partial(self._on_webrtc_call_start, options),
        )


class WebRTCRemoteControlReceiver:
    """Controlled side of a remote control session.

    Input events received from the Data Channel are decoded from CBOR and forwarded
    to the Desktop Portal.
    """

    def __init__(
        self,
        bridge,
        profile: str,
        on_close_cb: Callable[[], Any] | None = None,
        verbose: bool = False,
    ) -> None:
        """Initializes the Remote Control Receiver.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_close_cb: Called when the Data Channel is closed.
        @param verbose: if True, print input events.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_close_cb = on_close_cb
        self.loop = asyncio.get_event_loop()
        # Set by ``request_remote_desktop``.
        self.desktop_portal = None
        self.remote_desktop_data: dict | None = None
        # Set by ``start_receiving`` when the screen is shared.
        self.stream_node_id: int | None = None
        self.verbose = verbose

    async def do_input(self, data: dict) -> None:
        """Forward an input event to the Desktop Portal.

        Any error is logged, and never propagated to the caller.

        @param data: Input event data. The ``type`` key selects the handling:
            - types starting with ``mouse``: pointer motion (absolute if ``x`` and
              ``y`` are set, relative with ``movementX`` and ``movementY`` otherwise),
              plus button state for ``mouseup`` and ``mousedown``.
            - ``wheel``: scrolling.
            - types starting with ``key``: keyboard event. Only single character
              keys are supported for now.
        """
        assert self.desktop_portal is not None
        try:
            type_ = data["type"]
            if type_.startswith("mouse"):
                # A motion failure must not prevent the button handling below.
                try:
                    try:
                        x, y = data["x"], data["y"]
                    except KeyError:
                        # no absolute position, we use relative movement
                        dx, dy = data["movementX"], data["movementY"]
                        await self.desktop_portal.notify_pointer_motion(dx, dy)
                    else:
                        assert self.stream_node_id is not None
                        await self.desktop_portal.notify_pointer_motion_absolute(
                            self.stream_node_id, x, y
                        )
                except Exception:
                    log.exception("Can't send input")

                if type_ in ("mouseup", "mousedown"):
                    # NOTE(review): with a raw DOM ``MouseEvent``, ``buttons`` of a
                    #   ``mouseup`` doesn't include the released button; this
                    #   presumably relies on the sender setting it — to be confirmed.
                    buttons = data["buttons"]
                    state = 1 if type_ == "mousedown" else 0
                    for mask, button in MOUSE_BUTTONS_MASKS:
                        if buttons & mask:
                            await self.desktop_portal.notify_pointer_button(
                                button, state
                            )
            elif type_ == "wheel":
                dx = data.get("deltaX", 0)
                dy = data.get("deltaY", 0)
                delta_mode = data["deltaMode"]
                if delta_mode == 0:
                    # deltas are in pixels
                    await self.desktop_portal.notify_pointer_axis(dx, dy)
                else:
                    # deltas are in steps (see
                    # https://developer.mozilla.org/en-US/docs/Web/API/Element/wheel_event#event_properties)
                    # NOTE(review): arguments here look like ``(axis, steps)``, while
                    #   the same method is called with ``(dx, dy)`` above; a discrete
                    #   axis method may have been intended — to be confirmed against
                    #   DesktopPortal.
                    if dx:
                        await self.desktop_portal.notify_pointer_axis(1, dx)
                    if dy:
                        await self.desktop_portal.notify_pointer_axis(0, dy)
            elif type_.startswith("key"):
                # FIXME: this is a really naive implementation, it needs to be
                #   improved.
                key = data["key"]
                if data.get("shiftKey", False):
                    key = key.upper()
                if len(key) != 1:
                    # Named keys ("Shift", "Enter", etc.) can't be converted with
                    # ``ord``, we skip them instead of logging a traceback for each
                    # key press.
                    log.warning(f"Unsupported key {key!r}, ignoring it.")
                else:
                    await self.desktop_portal.notify_keyboard_keysym(
                        ord(key), 1 if type_ == "keydown" else 0
                    )

        except Exception:
            log.exception(f"Can't handle input {data}")

    def _on_dc_message_data(self, data_channel, glib_data) -> None:
        """An input event has been received from the Data Channel.

        The event is decoded from CBOR, then handled in ``self.loop``.

        @param data_channel: The Data Channel which received the message.
        @param glib_data: Raw message, CBOR encoded.
        """
        raw = glib_data.get_data()
        data = cbor2.loads(raw)
        if self.verbose:
            print(data)
        aio.run_from_thread(self.do_input, data, loop=self.loop)

    def _on_dc_close(self, data_channel) -> None:
        """Data channel is closed

        ``on_close_cb`` is scheduled in ``self.loop``.
        """
        aio.run_from_thread(self._on_close, loop=self.loop)

    async def _on_close(self) -> None:
        """Invoke ``on_close_cb`` if it is set, awaiting the result if needed."""
        if self.on_close_cb is not None:
            await aio.maybe_async(self.on_close_cb())

    def _on_data_channel(self, webrtcbin, data_channel) -> None:
        """The data channel has been opened.

        Message and close handlers are connected to it.
        """
        data_channel.connect("on-message-data", self._on_dc_message_data)
        data_channel.connect("on-close", self._on_dc_close)

    async def request_remote_desktop(self, with_screen_sharing: bool) -> None:
        """Request authorisation to remote control desktop.

        The portal and the data it returns are stored in ``desktop_portal`` and
        ``remote_desktop_data``.

        @param with_screen_sharing: True if screen must be shared.
        """
        from .portal_desktop import DesktopPortal

        self.desktop_portal = DesktopPortal()
        self.remote_desktop_data = await self.desktop_portal.request_remote_desktop(
            with_screen_sharing
        )
        log.debug(f"{self.remote_desktop_data=}")

    async def start_receiving(
        self, from_jid: jid.JID, session_id: str, screenshare: dict
    ) -> None:
        """Accept a remote control session, and handle received input events.

        @param from_jid: The JID of the entity controlling this device.
        @param session_id: The ID of the session.
        @param screenshare: Screen sharing data. If it has a ``video`` key and remote
            desktop data with a stream are available (see [request_remote_desktop]),
            the screen is streamed to the controlling entity. Otherwise nothing is
            streamed.
        """
        call_data = CallData(callee=from_jid, sid=session_id)
        if "video" in screenshare and self.remote_desktop_data:
            try:
                stream_data = self.remote_desktop_data["stream_data"]
                log.debug(f"{stream_data=}")
                self.stream_node_id = stream_data["node_id"]

                sources_data = SourcesPipeline(
                    video_pipeline="pipewiresrc",
                    audio_pipeline="",
                    video_properties={
                        "path": str(self.stream_node_id),
                        "do-timestamp": 1,
                    },
                )
            except KeyError:
                # no usable stream, we fall back to a session without video.
                sources_data = SourcesNone()
        else:
            sources_data = SourcesNone()

        await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources_data=sources_data,
            sinks_data=webrtc.SinksNone(),
            dc_data_list=[
                webrtc.SinksDataChannel(
                    dc_on_data_channel=self._on_data_channel,
                )
            ],
        )