view libervia/frontends/tools/webrtc_remote_control.py @ 4242:8acf46ed7f36

frontends: remote control implementation: This is the frontends common part of remote control implementation. It handle the creation of WebRTC session, and management of inputs. For now the reception use freedesktop.org Desktop portal, and works mostly with Wayland based Desktop Environments. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:43 +0200
parents
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Remote-Control via WebRTC implementation.
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import asyncio
from functools import partial
import logging
from typing import Any, Awaitable, Callable

import gi
from gi.overrides.Gst import GLib
from gi.repository import GstWebRTC

import cbor2
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.frontends.tools import aio, jid, webrtc
from libervia.frontends.tools.webrtc_models import (
    CallData,
    SourcesNone,
    SourcesPipeline,
)

gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
OnOpenCbType = Callable[["WebRTCRemoteController"], None|Awaitable[None]]
MOUSE_BTN_LEFT = 0x110
MOUSE_BTN_RIGHT = 0x111
MOUSE_BTN_MIDDLE = 0x112
MOUSE_BTN_FORWARD = 0x115
MOUSE_BTN_BACK = 0x116


log = logging.getLogger(__name__)


class WebRTCRemoteController:

    def __init__(
        self,
        bridge,
        profile: str,
        on_call_start_cb: Callable[[dict], Any] | None = None,
        end_call_cb: Callable[[], Any] | None = None,
    ) -> None:
        """Initializes the File Sender.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_call_start_cb: A blocking or async callable that accepts a dict as its
            only argument.
        @param end_call_cb: A callable to be invoked at the end of a call.
        """

        self.bridge = bridge
        self.profile = profile
        self.on_call_start_cb = on_call_start_cb
        self.end_call_cb = end_call_cb
        self.loop = asyncio.get_event_loop()
        self.data_channel: GstWebRTC.WebRTCDataChannel|None = None

    def send_input(self, input_data: dict) -> None:
        """Send an input data to controlled device

        @param input_data: data of the input event.
        """
        assert self.data_channel is not None
        self.data_channel.send_data(GLib.Bytes(cbor2.dumps(input_data)))

    async def _on_webrtc_call_start(
        self,
        options: dict,
        callee: str,
        call_data: dict,
        profile: str,
    ) -> str:
        rc_data = {
            "webrtc": True,
            "call_data": call_data,
        }
        rc_data.update(options)
        remote_control_data_s = await self.bridge.remote_control_start(
            str(callee),
            data_format.serialise(
               rc_data
            ),
            profile,
        )
        remote_control_data = data_format.deserialise(remote_control_data_s)

        if self.on_call_start_cb is not None:
            await aio.maybe_async(self.on_call_start_cb(remote_control_data))
        return remote_control_data["session_id"]

    def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel):
        if webrtc_call is not None:
            aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop)

    async def _end_call_and_quit(self, webrtc_call):
        await webrtc_call.webrtc.end_call()
        if self.end_call_cb is not None:
            await aio.maybe_async(self.end_call_cb())

    def _on_dc_open(
        self, on_open_cb: OnOpenCbType, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Called when datachannel is open"""
        self.data_channel = data_channel
        aio.run_from_thread(self.on_dc_opened, on_open_cb, loop=self.loop)

    async def on_dc_opened(self, on_open_cb: OnOpenCbType) -> None:
        await aio.maybe_async(on_open_cb(self))

    async def start(
        self,
        callee: jid.JID,
        options: dict,
        on_open_cb: OnOpenCbType
    ) -> None:
        """Start a remote control session with ``callee``

        @param callee: The JID of the recipient to send the file to.
        @param options: Options such as which devices to control.
        @param on_open_cb: Method to call when the Data Channel is open.
            The WebRTCRemoteController instance used as argument can then be used to send
            input events to received.
        """
        call_data = CallData(callee=callee)
        self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources_data=webrtc.SourcesDataChannel(
                dc_open_cb=partial(self._on_dc_open, on_open_cb),
            ),
            call_start_cb=partial(self._on_webrtc_call_start, options),
        )


class WebRTCRemoteControlReceiver:

    def __init__(
        self, bridge, profile: str, on_close_cb: Callable[[], Any] | None = None,
        verbose: bool = False
    ) -> None:
        """Initializes the File Receiver.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_close_cb: Called when the Data Channel is closed.
        @param verbose: if True, print input events.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_close_cb = on_close_cb
        self.loop = asyncio.get_event_loop()
        self.desktop_portal = None
        self.remote_desktop_data: dict|None = None
        self.stream_node_id: int|None = None
        self.verbose = verbose

    async def do_input(self, data: dict) -> None:
        assert self.desktop_portal is not None
        try:
            type_ = data["type"]
            if type_.startswith("mouse"):
                try:
                    try:
                        x, y = data["x"], data["y"]
                    except KeyError:
                        dx, dy = data["movementX"], data["movementY"]
                        await self.desktop_portal.notify_pointer_motion(
                            dx, dy
                        )
                    else:
                        assert self.stream_node_id is not None
                        await self.desktop_portal.notify_pointer_motion_absolute(
                            self.stream_node_id, x, y
                        )
                except Exception:
                    log.exception("Can't send input")

                if type_ in ("mouseup", "mousedown"):
                    buttons = data["buttons"]
                    state = 1 if type_ == "mousedown" else 0
                    # see https://developer.mozilla.org/en-US/docs/Web/API/MouseEvent/buttons#value
                    if buttons & 1:
                        await self.desktop_portal.notify_pointer_button(
                            MOUSE_BTN_LEFT,
                            state
                        )
                    if buttons & 2:
                        await self.desktop_portal.notify_pointer_button(
                            MOUSE_BTN_RIGHT,
                            state
                        )
                    if buttons & 4:
                        await self.desktop_portal.notify_pointer_button(
                            MOUSE_BTN_MIDDLE,
                            state
                        )
                    if buttons & 8:
                        await self.desktop_portal.notify_pointer_button(
                            MOUSE_BTN_BACK,
                            state
                        )
                    if buttons & 16:
                        await self.desktop_portal.notify_pointer_button(
                            MOUSE_BTN_FORWARD,
                            state
                        )
            elif type_ == "wheel":
                dx = data.get("deltaX", 0)
                dy = data.get("deltaY", 0)
                delta_mode = data["deltaMode"]
                if delta_mode == 0:
                    # deltas are in pixels
                    await self.desktop_portal.notify_pointer_axis(
                        dx,
                        dy
                    )
                else:
                    # deltas are in steps (see
                    # https://developer.mozilla.org/en-US/docs/Web/API/Element/wheel_event#event_properties)
                    if dx:
                        await self.desktop_portal.notify_pointer_axis(
                            1,
                            dx
                        )
                    if dy:
                        await self.desktop_portal.notify_pointer_axis(
                            0,
                            dy
                        )
            elif type_.startswith("key"):
                # FIXME: this is a really naive implementation, it needs tot be improved.
                key = data["key"]
                if data.get("shiftKey", False):
                    key = key.upper()
                await self.desktop_portal.notify_keyboard_keysym(
                    ord(key), 1 if type_ == "keydown" else 0
                )

        except Exception:
            log.exception(f"Can't handle input {data}")


    def _on_dc_message_data(self, data_channel, glib_data) -> None:
        """A data chunk of the file has been received."""
        raw = glib_data.get_data()
        data = cbor2.loads(raw)
        if self.verbose:
            print(data)
        aio.run_from_thread(
            self.do_input,
            data,
            loop=self.loop
        )

    def _on_dc_close(self, data_channel) -> None:
        """Data channel is closed

        The file download should be complete, we close it.
        """
        aio.run_from_thread(self._on_close, loop=self.loop)

    async def _on_close(self) -> None:
        if self.on_close_cb is not None:
            await aio.maybe_async(self.on_close_cb())

    def _on_data_channel(self, webrtcbin, data_channel) -> None:
        """The data channel has been opened."""
        data_channel.connect(
            "on-message-data", self._on_dc_message_data
        )
        data_channel.connect("on-close", self._on_dc_close)

    async def request_remote_desktop(self, with_screen_sharing: bool) -> None:
        """Request autorisation to remote control desktop.

        @param with_screen_sharing: True if screen must be shared.
        """
        from .portal_desktop import DesktopPortal
        self.desktop_portal = DesktopPortal()
        self.remote_desktop_data = await self.desktop_portal.request_remote_desktop(
            with_screen_sharing
        )
        print(self.remote_desktop_data)

    async def start_receiving(
        self,
        from_jid: jid.JID,
        session_id: str,
        screenshare: dict
    ) -> None:
        """Receives a file via WebRTC and saves it to the specified path.

        @param from_jid: The JID of the entity sending the file.
        @param session_id: The Jingle FT Session ID.
        @param file_path: The local path where the received file will be saved.
            If a file already exists at this path, it will be overwritten.
        @param file_data: Additional data about the file being transferred.
        """
        call_data = CallData(callee=from_jid, sid=session_id)
        if "video" in screenshare and self.remote_desktop_data:
            try:
                stream_data = self.remote_desktop_data["stream_data"]
                log.debug(f"{stream_data=}")
                self.stream_node_id = stream_data["node_id"]

                sources_data = SourcesPipeline(
                    video_pipeline="pipewiresrc",
                    audio_pipeline="",
                    video_properties={
                        "path": str(self.stream_node_id),
                        "do-timestamp": 1,
                    }
                )
            except KeyError:
                sources_data = SourcesNone()
        else:
            sources_data = SourcesNone()

        await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources_data = sources_data,
            sinks_data=webrtc.SinksNone(),
            dc_data_list=[webrtc.SinksDataChannel(
                dc_on_data_channel=self._on_data_channel,
            )],
        )