view libervia/cli/cmd_remote_control.py @ 4266:9fc3d28bc3f6

core (main): add a mechanism to have a shared temp directory: this directory may be used to share files between backend and frontends. Normally, an OS-dependent temporary directory is created for that, but if this option is set, the directory will be created in <local_dir>/<cache_dir>, which may be useful in some use cases (e.g. containerized frontends and backend).
author Goffi <goffi@goffi.org>
date Wed, 12 Jun 2024 22:47:34 +0200
parents e47e29511d57
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3


# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import asyncio
from functools import partial
import logging
import xml.etree.ElementTree as ET

from prompt_toolkit.input import Input, create_input
from prompt_toolkit.key_binding import KeyPress
from prompt_toolkit.keys import Keys
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit import PromptSession

from libervia.backend.core import exceptions
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.cli.constants import Const as C
from libervia.frontends.tools import aio, jid
from libervia.frontends.tools.webrtc_remote_control import WebRTCRemoteController

from . import base
from . import xmlui_manager

__commands__ = ["RemoteControl"]


class Send(base.CommandBase):
    """``remote-control send`` command: forward local key presses to a peer.

    Key presses are read from the terminal in raw mode and pushed to the
    ``WebRTCRemoteController`` created in :meth:`start`.
    """

    def __init__(self, host):
        super(Send, self).__init__(
            host,
            "send",
            use_progress=True,
            use_verbose=True,
            help=_("remote control another device"),
        )
        # Created in start(); stays None until then.
        self.remote_controller: WebRTCRemoteController | None = None

    def add_parser_options(self):
        """Declare the command line arguments of this command."""
        self.parser.add_argument("jid", help=_("the destination jid"))

    def send_key(
        self,
        key: str,
        code: str,
        ctrl_key: bool = False,
        shift_key: bool = False,
        alt_key: bool = False,
        meta_key: bool = False,
    ) -> None:
        """
        Send the key press input.

        A "keydown" event immediately followed by a "keyup" event is sent for
        each call.

        @param key: The key pressed.
        @param code: The code of the key pressed.
        @param ctrl_key: Whether the Ctrl key was pressed.
        @param shift_key: Whether the Shift key was pressed.
        @param alt_key: Whether the Alt key was pressed.
        @param meta_key: Whether the Meta key was pressed.
        """
        assert self.remote_controller is not None
        event_data = {
            "key": key,
            "code": code,
            "ctrlKey": ctrl_key,
            "shiftKey": shift_key,
            "altKey": alt_key,
            "metaKey": meta_key,
        }
        # we send both events as we don't distinguish them.
        for evt_type in ["keydown", "keyup"]:
            key_data = {"type": evt_type, **event_data}
            self.remote_controller.send_input(key_data)
            print(f"Sending {key_data}")

    def handle_key_press(self, key_press: KeyPress) -> None:
        """Translate a prompt_toolkit key press and send it to the peer.

        Modifiers are encoded in the key name as "c-" (Control) and "s-" (Shift)
        prefixes. All leading prefixes are consumed, so a combined name such as
        "c-s-left" is reported as key "left" with both modifiers set.

        @param key_press: Key press as returned by ``Input.read_keys``.
        """
        key = key_press.key
        ctrl_key = False
        shift_key = False
        # Consume every leading modifier prefix, not only the first one.
        while key.startswith(("c-", "s-")):
            if key.startswith("c-"):
                ctrl_key = True
            else:
                shift_key = True
            key = key[2:]
        self.send_key(
            key=key.lower(),
            # FIXME: handle properly code.
            code=key.lower(),
            ctrl_key=ctrl_key,
            # An upper case character implies that Shift was held.
            shift_key=shift_key or key.isupper(),
            # FIXME: alt-key is translated to escape + key, see
            #    https://python-prompt-toolkit.readthedocs.io/en/master/pages/advanced_topics/key_bindings.html
            alt_key=False,
            # TODO
            meta_key=False,
        )

    def on_keys_ready(self, input_: Input, handle_key_fut: asyncio.Future) -> None:
        """Process the key presses currently available on the input.

        Ctrl-C is not forwarded directly: it resolves ``handle_key_fut`` with a
        ``KeyboardInterrupt`` so that the coroutine awaiting it can ask the user
        what to do. Every other key is sent to the peer.

        @param input_: Input to read the key presses from.
        @param handle_key_fut: Future awaited by the input loop of
            :meth:`_on_open`.
        """
        for key_press in input_.read_keys():
            if key_press.key == Keys.ControlC:
                # The future may already be resolved (e.g. several Ctrl-C in the
                # same batch); setting it twice would raise InvalidStateError.
                if not handle_key_fut.done():
                    handle_key_fut.set_exception(KeyboardInterrupt())
            else:
                self.handle_key_press(key_press)

    async def confirm_ctrl_c(self) -> bool:
        """Ask user if they want to send Ctrl-C event or quit.

        The question is repeated until a valid answer is given.

        @return: True to send the Ctrl-C event, False to quit.
        """
        session = PromptSession()
        with patch_stdout():
            while True:
                response = await session.prompt_async(
                    "Ctrl-C pressed. Send event (e) or quit (q)? (e/q): "
                )
                answer = response.lower()
                if answer == "e":
                    return True
                if answer == "q":
                    return False

    async def handle_ctrl_c(self) -> None:
        """Handle Ctrl-C key press: forward it to the peer or quit."""
        if await self.confirm_ctrl_c():
            self.send_key(key="c", code="c", ctrl_key=True)
        else:
            await self.host.a_quit()

    async def _on_open(self, remote_controller: WebRTCRemoteController) -> None:
        """Read the keyboard and forward key presses once the peer is connected.

        Passed as ``on_open_cb`` to the remote controller in :meth:`start`.

        @param remote_controller: Unused here, ``self.remote_controller`` is used
            to send inputs.
        """
        input_ = create_input()
        self.disp(
            "Connection with peer established. Your keyboard input will be sent to "
            "controlled device."
        )

        loop = asyncio.get_running_loop()
        while True:
            # A fresh future is needed for each round: it is resolved (with a
            # KeyboardInterrupt) each time Ctrl-C is pressed.
            handle_key_fut = loop.create_future()
            try:
                with input_.raw_mode():
                    with input_.attach(
                        partial(self.on_keys_ready, input_, handle_key_fut)
                    ):
                        await handle_key_fut
            except KeyboardInterrupt:
                # Raw mode and the input callback are released at this point, the
                # user can be prompted normally.
                await self.handle_ctrl_c()

    async def start(self):
        """Set up logging then start the remote control session."""
        root_logger = logging.getLogger()
        # we don't want any formatting for messages from webrtc
        for handler in root_logger.handlers:
            handler.setFormatter(None)
        # Map the verbosity to a log level: 0 → ERROR, 1 → WARNING, 2+ → DEBUG.
        if self.verbosity >= 2:
            root_logger.setLevel(logging.DEBUG)
        elif self.verbosity >= 1:
            root_logger.setLevel(logging.WARNING)
        elif self.verbosity == 0:
            root_logger.setLevel(logging.ERROR)

        aio.install_glib_asyncio_iteration()
        self.remote_controller = WebRTCRemoteController(
            self.host.bridge, self.profile, end_call_cb=self.host.a_quit
        )
        await self.remote_controller.start(
            self.args.jid, {"devices": {"keyboard": {}}}, on_open_cb=self._on_open
        )


class Receive(base.CommandAnswering):
    """``remote-control receive`` command: let another device control this one.

    Incoming actions are dispatched through ``self.action_callbacks``: confirm
    dialogs go to :meth:`on_confirm_action` and remote control requests to
    :meth:`on_remote_control_action`.
    """

    def __init__(self, host):
        super().__init__(
            host,
            "receive",
            use_verbose=True,
            help=_("be remote controlled by another device"),
        )
        self.action_callbacks = {
            C.META_TYPE_CONFIRM: self.on_confirm_action,
            C.META_TYPE_NOT_IN_ROSTER_LEAK: self.on_confirm_action,
            C.META_TYPE_REMOTE_CONTROL: self.on_remote_control_action,
        }
        # Created in start(); stays None until then.
        self.receiver = None

    def add_parser_options(self):
        """Declare the command line arguments of this command."""
        self.parser.add_argument(
            "-S",
            "--share-screen",
            choices=["yes", "no", "auto"],
            default="auto",
            help=_("share the screen (default: auto)"),
        )
        self.parser.add_argument(
            "jids",
            nargs="+",
            help=_("jids accepted automatically"),
        )

    async def start_webrtc(
        self, from_jid: jid.JID, session_id: str, screenshare: dict, devices: dict
    ) -> None:
        """Start the WebRTC workflow.

        @param from_jid: JID of the controlling entity.
        @param session_id: ID of the remote control session.
        @param screenshare: Screen sharing data, forwarded to the receiver.
        @param devices: Requested devices. Currently unused by this method.
        """
        assert self.receiver is not None
        root_logger = logging.getLogger()
        # we don't want any formatting for messages from webrtc
        for handler in root_logger.handlers:
            handler.setFormatter(None)
        # Map the verbosity to a log level: 0 → ERROR, 1 → WARNING, 2+ → DEBUG.
        if self.verbosity >= 2:
            root_logger.setLevel(logging.DEBUG)
        elif self.verbosity >= 1:
            root_logger.setLevel(logging.WARNING)
        elif self.verbosity == 0:
            root_logger.setLevel(logging.ERROR)
        await self.receiver.start_receiving(from_jid, session_id, screenshare)

    def get_xmlui_id(self, action_data):
        """Extract the ID to use to answer the XMLUI of an action.

        @param action_data: Data of the action, the XMLUI is expected under the
            "xmlui" key.
        @return: Value of the "submit" attribute of the XMLUI root element, or
            None if the action has no XMLUI or if the attribute is missing.
        """
        # FIXME: we temporarily use ElementTree, but a real XMLUI managing module
        #        should be available in the future
        # TODO: XMLUI module
        try:
            xml_ui = action_data["xmlui"]
        except KeyError:
            self.disp(_("Action has no XMLUI"), 1)
            return None
        ui = ET.fromstring(xml_ui.encode("utf-8"))
        xmlui_id = ui.get("submit")
        if not xmlui_id:
            self.disp(_("Invalid XMLUI received"), error=True)
        return xmlui_id

    async def get_confirmation(
        self, action_data: dict, pre_accepted: bool = False
    ) -> tuple[str, jid.JID, bool]:
        """Check if action is confirmed, and ask user otherwise

        @param action_data: Data as used in on_action_new.
        @param pre_accepted: True if the session has already been accepted during
            pre-flight; the request is then confirmed without further check.
        @return: a tuple with:
            - XMLUI ID
            - sender JID
            - confirmation boolean
        @raise exceptions.InternalError: The XMLUI ID can't be retrieved.
        @raise exceptions.CancelError: The action is not related to remote control.
        @raise exceptions.DataError: "from_jid" is missing or invalid.
        """
        xmlui_id = self.get_xmlui_id(action_data)
        if xmlui_id is None:
            self.disp("Internal ERROR: xmlui_id missing", error=True)
            raise exceptions.InternalError()
        if (
            action_data["type"] != C.META_TYPE_REMOTE_CONTROL
            and action_data.get("subtype") != C.META_TYPE_REMOTE_CONTROL
        ):
            self.disp(_("Ignoring confirm dialog unrelated to remote control."), 1)
            raise exceptions.CancelError
        try:
            from_jid = jid.JID(action_data["from_jid"])
        except ValueError:
            self.disp(
                _('invalid "from_jid" value received, ignoring: {value}').format(
                    value=action_data["from_jid"]
                ),
                error=True,
            )
            raise exceptions.DataError
        except KeyError:
            self.disp(_('ignoring action without "from_jid" value'), error=True)
            raise exceptions.DataError

        self.disp(_("Confirmation needed for request from an entity not in roster"), 1)

        if pre_accepted:
            # Session has already been accepted during pre-flight.
            confirmed = True
        elif action_data["type"] == C.META_TYPE_CONFIRM and not self.bare_jids:
            # Sender is in roster, and we have an "accept all" policy.
            confirmed = True
            # The template (not the interpolated string) must be given to
            # gettext, otherwise the translation can never be found.
            self.disp(
                _(
                    "{from_jid} automatically confirmed due to entity being in roster"
                    " and accept all policy."
                ).format(from_jid=from_jid)
            )
        elif from_jid.bare in self.bare_jids:
            # If the sender is expected, we can confirm the session.
            confirmed = True
            self.disp(_("Sender confirmed because they are explicitly expected"), 1)
        else:
            # Not automatically accepted, we ask authorisation to the user.
            xmlui = xmlui_manager.create(self.host, action_data["xmlui"])
            confirmed = await self.host.confirm(xmlui.dlg.message)
        return xmlui_id, from_jid, confirmed

    async def on_confirm_action(self, action_data, action_id, security_limit, profile):
        """Handle pre-flight remote control request

        The XMLUI is answered with the confirmation result. If the session is
        refused, the command quits.
        """
        try:
            xmlui_id, from_jid, confirmed = await self.get_confirmation(action_data)
        except exceptions.InternalError:
            self.host.quit_from_signal(1)
            return
        except (exceptions.CancelError, exceptions.DataError):
            # Unrelated or malformed action: get_confirmation has already
            # reported it, we just ignore it.
            return
        xmlui_data = {"answer": C.bool_const(confirmed)}
        await self.host.bridge.action_launch(
            xmlui_id, data_format.serialise(xmlui_data), profile_key=profile
        )
        if not confirmed:
            self.disp(_("Session refused for {from_jid}").format(from_jid=from_jid))
            self.host.quit_from_signal(0)

    async def on_remote_control_action(
        self, action_data, action_id, security_limit, profile
    ):
        """Handles actual remote control request

        If the request is confirmed, the WebRTC workflow is started before the
        XMLUI is answered.
        """
        try:
            session_id = action_data["session_id"]
        except KeyError:
            self.disp(
                f"Internal Error: Session ID is missing in action data: {action_data=}",
                error=True,
            )
            return
        pre_accepted = action_data.get("pre_accepted", False)
        try:
            xmlui_id, from_jid, confirmed = await self.get_confirmation(
                action_data, pre_accepted
            )
        except exceptions.InternalError:
            self.host.quit_from_signal(1)
            return
        except (exceptions.CancelError, exceptions.DataError):
            # Unrelated or malformed action: get_confirmation has already
            # reported it, we just ignore it.
            return
        if confirmed:
            await self.start_webrtc(
                from_jid,
                session_id,
                action_data.get("screenshare", {}),
                action_data.get("devices", {}),
            )
        xmlui_data = {"answer": C.bool_const(confirmed)}
        await self.host.bridge.action_launch(
            xmlui_id, data_format.serialise(xmlui_data), profile_key=profile
        )

    async def start(self):
        """Create the receiver, request remote desktop, then wait for actions."""
        # Bare JIDs of the entities which are accepted without asking the user.
        self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]
        from libervia.frontends.tools.webrtc_remote_control import (
            WebRTCRemoteControlReceiver,
        )

        self.receiver = WebRTCRemoteControlReceiver(
            self.host.bridge,
            self.profile,
            on_close_cb=self.host.a_quit,
            verbose=self.verbosity >= 1,
        )
        aio.install_glib_asyncio_iteration()
        # FIXME: for now AUTO always do the screen sharing, but it should be disabled
        #   and appropriate method should be used when no desktop environment is
        #   detected.
        with_screen_sharing = self.args.share_screen in ("yes", "auto")
        await self.receiver.request_remote_desktop(with_screen_sharing)

        self.disp(_("Waiting for controlling device…"))
        await self.start_answering()


class RemoteControl(base.CommandBase):
    """``remote-control`` command, grouping the ``send`` and ``receive`` subcommands."""

    subcommands = (Send, Receive)

    def __init__(self, host):
        # This command only groups its subcommands, it is registered without
        # profile handling.
        options = {
            "use_profile": False,
            "help": _("Control or be controlled by another device."),
        }
        super(RemoteControl, self).__init__(host, "remote-control", **options)