view libervia/frontends/tools/webrtc_remote_control.py @ 4339:699aa8788d98

tests (unit/email gateway): add tests for pubsub service: rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:52:06 +0100
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Remote-Control via WebRTC implementation.
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import asyncio
from functools import partial
import logging
from typing import Any, Awaitable, Callable

import gi
from gi.overrides.Gst import GLib
from gi.repository import GstWebRTC

import cbor2
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.frontends.tools import aio, jid, webrtc
from libervia.frontends.tools.webrtc_models import (
    CallData,
    SourcesNone,
    SourcesPipeline,
)

# NOTE(review): PyGObject documents that version requirements should be declared
# before the matching ``gi.repository`` modules are imported; here the call runs after
# the ``gi.repository`` import at the top of the file — confirm this order is intended.
gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
# Callback run when the data channel is open: it receives the controller instance and
# may be either blocking or async (its result goes through ``aio.maybe_async``).
OnOpenCbType = Callable[["WebRTCRemoteController"], None | Awaitable[None]]
# Button codes given to ``DesktopPortal.notify_pointer_button``.
# NOTE(review): these values coincide with the Linux evdev ``BTN_*`` codes
# (linux/input-event-codes.h); presumably that is what the portal expects — confirm.
MOUSE_BTN_LEFT = 0x110
MOUSE_BTN_RIGHT = 0x111
MOUSE_BTN_MIDDLE = 0x112
MOUSE_BTN_FORWARD = 0x115
MOUSE_BTN_BACK = 0x116


# Module-level logger.
log = logging.getLogger(__name__)


class WebRTCRemoteController:
    """Controlling side of a remote-control session done over WebRTC.

    A WebRTC call with a data channel is set up with the controlled device, and input
    events are then sent through this data channel, serialised with CBOR.
    """

    def __init__(
        self,
        bridge,
        profile: str,
        on_call_start_cb: Callable[[dict], Any] | None = None,
        end_call_cb: Callable[[], Any] | None = None,
    ) -> None:
        """Initializes the remote controller.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_call_start_cb: A blocking or async callable that accepts a dict as its
            only argument. It is called with the deserialised data returned by the
            bridge once the remote control session has been started.
        @param end_call_cb: A blocking or async callable to be invoked at the end of a
            call.
        """

        self.bridge = bridge
        self.profile = profile
        self.on_call_start_cb = on_call_start_cb
        self.end_call_cb = end_call_cb
        # Loop on which coroutines are scheduled from the data channel callbacks.
        self.loop = asyncio.get_event_loop()
        # Set once the data channel is open (see ``_on_dc_open``).
        self.data_channel: GstWebRTC.WebRTCDataChannel | None = None
        # Set by ``start``; initialised here so that the attribute always exists.
        self.webrtc_call: webrtc.WebRTCCall | None = None

    def send_input(self, input_data: dict) -> None:
        """Send an input data to controlled device

        The data channel must be open (i.e. ``on_open_cb`` of ``start`` has been
        called) before using this method.

        @param input_data: data of the input event. It is serialised with CBOR.
        """
        assert self.data_channel is not None
        self.data_channel.send_data(GLib.Bytes(cbor2.dumps(input_data)))

    async def _on_webrtc_call_start(
        self,
        options: dict,
        callee: str,
        call_data: dict,
        profile: str,
    ) -> str:
        """Start the remote control session through the bridge.

        @param options: Options given to ``start``. They are merged last, so they
            override the ``webrtc`` and ``call_data`` keys if they use the same names.
        @param callee: Entity to control.
        @param call_data: WebRTC call data to transmit to the callee.
        @param profile: Profile starting the session.
        @return: ID of the session, as returned by the bridge.
        """
        rc_data = {
            "webrtc": True,
            "call_data": call_data,
        }
        rc_data.update(options)
        remote_control_data_s = await self.bridge.remote_control_start(
            str(callee),
            data_format.serialise(rc_data),
            profile,
        )
        remote_control_data = data_format.deserialise(remote_control_data_s)

        if self.on_call_start_cb is not None:
            await aio.maybe_async(self.on_call_start_cb(remote_control_data))
        return remote_control_data["session_id"]

    def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel):
        """End the call when the data channel is closed.

        NOTE(review): nothing in this class connects this handler to the data channel,
        so ``end_call_cb`` does not appear to be triggered from here — confirm whether
        it is wired elsewhere.

        @param webrtc_call: Call to terminate, nothing is done if it is None.
        @param data_channel: The data channel which has been closed.
        """
        if webrtc_call is not None:
            aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop)

    async def _end_call_and_quit(self, webrtc_call):
        """Terminate the WebRTC call, then run ``end_call_cb`` if one is set."""
        await webrtc_call.webrtc.end_call()
        if self.end_call_cb is not None:
            await aio.maybe_async(self.end_call_cb())

    def _on_dc_open(
        self, on_open_cb: OnOpenCbType, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Called when datachannel is open

        The data channel is stored for ``send_input``, and ``on_open_cb`` is scheduled
        on ``self.loop``.
        """
        self.data_channel = data_channel
        aio.run_from_thread(self.on_dc_opened, on_open_cb, loop=self.loop)

    async def on_dc_opened(self, on_open_cb: OnOpenCbType) -> None:
        """Run ``on_open_cb`` (blocking or async) with this instance as argument."""
        await aio.maybe_async(on_open_cb(self))

    async def start(
        self, callee: jid.JID, options: dict, on_open_cb: OnOpenCbType
    ) -> None:
        """Start a remote control session with ``callee``

        @param callee: The JID of the entity to control.
        @param options: Options such as which devices to control.
        @param on_open_cb: Method to call when the Data Channel is open.
            The WebRTCRemoteController instance used as argument can then be used to send
            input events to the receiver.
        """
        call_data = CallData(callee=callee)
        self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources_data=webrtc.SourcesDataChannel(
                dc_open_cb=partial(self._on_dc_open, on_open_cb),
            ),
            call_start_cb=partial(self._on_webrtc_call_start, options),
        )


class WebRTCRemoteControlReceiver:
    """Controlled side of a remote-control session done over WebRTC.

    Input events are received CBOR-encoded through a WebRTC data channel, and are
    injected in the local desktop through ``DesktopPortal``.
    """

    # (bit mask in the ``buttons`` field, button code) pairs, in the order they are
    # notified.
    # see https://developer.mozilla.org/en-US/docs/Web/API/MouseEvent/buttons#value
    _MOUSE_BUTTONS = (
        (1, MOUSE_BTN_LEFT),
        (2, MOUSE_BTN_RIGHT),
        (4, MOUSE_BTN_MIDDLE),
        (8, MOUSE_BTN_BACK),
        (16, MOUSE_BTN_FORWARD),
    )

    def __init__(
        self,
        bridge,
        profile: str,
        on_close_cb: Callable[[], Any] | None = None,
        verbose: bool = False,
    ) -> None:
        """Initializes the remote control receiver.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_close_cb: Called when the Data Channel is closed.
        @param verbose: if True, print input events.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_close_cb = on_close_cb
        # Loop on which coroutines are scheduled from the data channel callbacks.
        self.loop = asyncio.get_event_loop()
        # Both are set by ``request_remote_desktop``.
        self.desktop_portal = None
        self.remote_desktop_data: dict | None = None
        # Set by ``start_receiving`` when the screen is shared; needed for absolute
        # pointer positioning.
        self.stream_node_id: int | None = None
        self.verbose = verbose

    async def do_input(self, data: dict) -> None:
        """Inject a received input event in the desktop.

        ``request_remote_desktop`` must have been called before. Events whose ``type``
        starts with ``mouse`` or ``key``, and ``wheel`` events, are handled; other
        types are silently ignored. Errors are logged and never propagated.

        @param data: Input event, using the field names of the DOM events (``type``,
            ``x``, ``y``, ``movementX``, ``movementY``, ``buttons``, ``deltaX``,
            ``deltaY``, ``deltaMode``, ``key``, ``shiftKey``).
        """
        portal = self.desktop_portal
        assert portal is not None
        try:
            type_ = data["type"]
            if type_.startswith("mouse"):
                await self._do_mouse_input(portal, type_, data)
            elif type_ == "wheel":
                await self._do_wheel_input(portal, data)
            elif type_.startswith("key"):
                await self._do_key_input(portal, type_, data)
        except Exception:
            log.exception(f"Can't handle input {data}")

    async def _do_mouse_input(self, portal, type_: str, data: dict) -> None:
        """Notify pointer motion, then button state for ``mouseup``/``mousedown``."""
        try:
            try:
                x, y = data["x"], data["y"]
            except KeyError:
                # No absolute position: relative movement is used instead.
                dx, dy = data["movementX"], data["movementY"]
                await portal.notify_pointer_motion(dx, dy)
            else:
                assert self.stream_node_id is not None
                await portal.notify_pointer_motion_absolute(self.stream_node_id, x, y)
        except Exception:
            # A failed motion must not prevent the button state from being notified.
            log.exception("Can't send input")

        if type_ not in ("mouseup", "mousedown"):
            return

        # NOTE(review): for a DOM ``mouseup`` event, ``buttons`` holds the buttons
        # which are still pressed, not the released one. Unless the sender fills this
        # field differently, releases may never be notified — confirm against sender.
        buttons = data["buttons"]
        state = 1 if type_ == "mousedown" else 0
        for mask, button in self._MOUSE_BUTTONS:
            if buttons & mask:
                await portal.notify_pointer_button(button, state)

    async def _do_wheel_input(self, portal, data: dict) -> None:
        """Notify a scroll event, with deltas either in pixels or in steps."""
        dx = data.get("deltaX", 0)
        dy = data.get("deltaY", 0)
        delta_mode = data["deltaMode"]
        if delta_mode == 0:
            # deltas are in pixels
            await portal.notify_pointer_axis(dx, dy)
            return

        # deltas are in steps (see
        # https://developer.mozilla.org/en-US/docs/Web/API/Element/wheel_event#event_properties)
        # NOTE(review): the arguments below look like ``(axis, steps)`` whereas the
        # pixel branch calls the same method with ``(dx, dy)``. A discrete variant of
        # the portal method is presumably intended — confirm against DesktopPortal.
        if dx:
            await portal.notify_pointer_axis(1, dx)
        if dy:
            await portal.notify_pointer_axis(0, dy)

    async def _do_key_input(self, portal, type_: str, data: dict) -> None:
        """Notify a key press (``keydown``) or a key release (any other key event)."""
        # FIXME: this is a really naive implementation, it needs to be improved.
        raw_key = data["key"]
        key = raw_key.upper() if data.get("shiftKey", False) else raw_key
        if len(key) != 1:
            # Named keys ("Enter", "Shift", …) can't be converted with ``ord``: they
            # are skipped explicitly instead of failing with a traceback on each
            # press.
            log.warning(f"Ignoring unsupported key {raw_key!r}")
            return
        # NOTE(review): the code point is used as keysym as is — confirm that this is
        # what DesktopPortal expects for characters outside of Latin-1.
        await portal.notify_keyboard_keysym(ord(key), 1 if type_ == "keydown" else 0)

    def _on_dc_message_data(self, data_channel, glib_data) -> None:
        """An input event has been received from the data channel.

        The payload is decoded from CBOR then handled by ``do_input`` on ``self.loop``.
        Payloads which can't be decoded, or which are not a dict, are logged and
        dropped: they come from the remote peer and must not break the callback.
        """
        try:
            data = cbor2.loads(glib_data.get_data())
        except Exception:
            log.exception("Can't decode received input data")
            return
        if self.verbose:
            print(data)
        if not isinstance(data, dict):
            log.warning(f"Ignoring unexpected input data: {data!r}")
            return
        aio.run_from_thread(self.do_input, data, loop=self.loop)

    def _on_dc_close(self, data_channel) -> None:
        """Data channel is closed

        ``on_close_cb`` is scheduled on ``self.loop``, if one is set.
        """
        aio.run_from_thread(self._on_close, loop=self.loop)

    async def _on_close(self) -> None:
        """Run ``on_close_cb`` (blocking or async) if one is set."""
        if self.on_close_cb is not None:
            await aio.maybe_async(self.on_close_cb())

    def _on_data_channel(self, webrtcbin, data_channel) -> None:
        """The data channel has been opened.

        Handlers for incoming messages and for closing are connected to it.
        """
        data_channel.connect("on-message-data", self._on_dc_message_data)
        data_channel.connect("on-close", self._on_dc_close)

    async def request_remote_desktop(self, with_screen_sharing: bool) -> None:
        """Request authorisation to remote control desktop.

        The portal instance and the data it returns are kept in ``desktop_portal`` and
        ``remote_desktop_data``.

        @param with_screen_sharing: True if screen must be shared.
        """
        # Imported here and not at module level, as in the original code.
        from .portal_desktop import DesktopPortal

        self.desktop_portal = DesktopPortal()
        self.remote_desktop_data = await self.desktop_portal.request_remote_desktop(
            with_screen_sharing
        )
        log.debug(f"{self.remote_desktop_data=}")

    async def start_receiving(
        self, from_jid: jid.JID, session_id: str, screenshare: dict
    ) -> None:
        """Accept a remote control session and start receiving input events.

        @param from_jid: The JID of the controlling entity.
        @param session_id: The ID of the session, used as ``sid`` of the call.
        @param screenshare: Screen sharing data. If it has a ``video`` key and remote
            desktop data are available (see ``request_remote_desktop``), the screen is
            streamed with a ``pipewiresrc`` video pipeline; otherwise no media source
            is used.
        """
        call_data = CallData(callee=from_jid, sid=session_id)
        sources_data = SourcesNone()
        if "video" in screenshare and self.remote_desktop_data:
            try:
                stream_data = self.remote_desktop_data["stream_data"]
                log.debug(f"{stream_data=}")
                self.stream_node_id = stream_data["node_id"]
            except KeyError:
                log.warning("No stream data available, the screen will not be shared.")
            else:
                sources_data = SourcesPipeline(
                    video_pipeline="pipewiresrc",
                    audio_pipeline="",
                    video_properties={
                        "path": str(self.stream_node_id),
                        "do-timestamp": 1,
                    },
                )

        await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources_data=sources_data,
            sinks_data=webrtc.SinksNone(),
            dc_data_list=[
                webrtc.SinksDataChannel(
                    dc_on_data_channel=self._on_data_channel,
                )
            ],
        )