view libervia/cli/cmd_remote_control.py @ 4309:b56b1eae7994

component email gateway: add multicasting: XEP-0033 multicasting is now supported both for incoming and outgoing messages. XEP-0033 metadata are converted to suitable Email headers and vice versa. Email address and JID are both supported, and delivery is done by the gateway when suitable on incoming messages. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3


# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import asyncio
from functools import partial
import logging
import xml.etree.ElementTree as ET

from prompt_toolkit.input import Input, create_input
from prompt_toolkit.key_binding import KeyPress
from prompt_toolkit.keys import Keys
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit import PromptSession

from libervia.backend.core import exceptions
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.cli.constants import Const as C
from libervia.frontends.tools import aio, jid
from libervia.frontends.tools.webrtc_remote_control import WebRTCRemoteController

from . import base
from . import xmlui_manager

__commands__ = ["RemoteControl"]


class Send(base.CommandBase):
    def __init__(self, host):
        super(Send, self).__init__(
            host,
            "send",
            use_progress=True,
            use_verbose=True,
            help=_("remote control another device"),
        )
        self.remote_controller: WebRTCRemoteController | None = None

    def add_parser_options(self):
        self.parser.add_argument("jid", help=_("the destination jid"))

    def send_key(
        self,
        key: str,
        code: str,
        ctrl_key: bool = False,
        shift_key: bool = False,
        alt_key: bool = False,
        meta_key: bool = False,
    ) -> None:
        """
        Send the key press input.

        @param key: The key pressed.
        @param code: The code of the key pressed.
        @param ctrl_key: Whether the Ctrl key was pressed.
        @param shift_key: Whether the Shift key was pressed.
        @param alt_key: Whether the Alt key was pressed.
        @param meta_key: Whether the Meta key was pressed.
        """
        assert self.remote_controller is not None
        event_data = {
            "key": key,
            "code": code,
            "ctrlKey": ctrl_key,
            "shiftKey": shift_key,
            "altKey": alt_key,
            "metaKey": meta_key,
        }
        # we send both events as we don't distinguish them.
        for evt_type in ["keydown", "keyup"]:
            key_data = {"type": evt_type, **event_data}
            self.remote_controller.send_input(key_data)
            print(f"Sending {key_data}")

    def handle_key_press(self, key_press: KeyPress) -> None:
        """Handle key press event."""
        key = key_press.key
        if key.startswith("c-"):
            key = key[2:]  # remove "c-" prefix
        elif key.startswith("s-"):
            key = key[2:]  # remove "s-" prefix
        self.send_key(
            key=key.lower(),
            # FIXME: handle properly code.
            code=key.lower(),
            ctrl_key=key_press.key.startswith("c-"),
            shift_key=key_press.key.startswith("s-") or key.isupper(),
            # FIXME: alt-key is translated to escape + key, see
            #    https://python-prompt-toolkit.readthedocs.io/en/master/pages/advanced_topics/key_bindings.html
            alt_key=False,
            # TODO
            meta_key=False,
        )

    def on_keys_ready(self, input_: Input, handle_key_fut: asyncio.Future) -> None:
        for key_press in input_.read_keys():
            if key_press.key == Keys.ControlC:
                handle_key_fut.set_exception(KeyboardInterrupt())
            else:
                self.handle_key_press(key_press)

    async def confirm_ctrl_c(self) -> bool:
        """Ask user if they want to send Ctrl-C event or quit."""
        session = PromptSession()
        with patch_stdout():
            while True:
                response = await session.prompt_async(
                    "Ctrl-C pressed. Send event (e) or quit (q)? (e/q): "
                )
                if response.lower() == "e":
                    return True
                elif response.lower() == "q":
                    return False

    async def handle_ctrl_c(self) -> None:
        """Handle Ctrl-C key press."""
        if await self.confirm_ctrl_c():
            self.send_key(key="c", code="c", ctrl_key=True)
        else:
            await self.host.a_quit()

    async def _on_open(self, remote_controller: WebRTCRemoteController) -> None:
        input_ = create_input()
        self.disp(
            "Connection with peer established. Your keyboard input will be sent to "
            "controlled device."
        )

        while True:
            handle_key_fut = asyncio.Future()
            try:
                with input_.raw_mode():
                    with input_.attach(
                        partial(self.on_keys_ready, input_, handle_key_fut)
                    ):
                        await handle_key_fut
            except KeyboardInterrupt:
                await self.handle_ctrl_c()

    async def start(self):
        root_logger = logging.getLogger()
        # we don't want any formatting for messages from webrtc
        for handler in root_logger.handlers:
            handler.setFormatter(None)
        if self.verbosity == 0:
            root_logger.setLevel(logging.ERROR)
        if self.verbosity >= 1:
            root_logger.setLevel(logging.WARNING)
        if self.verbosity >= 2:
            root_logger.setLevel(logging.DEBUG)

        aio.install_glib_asyncio_iteration()
        self.remote_controller = WebRTCRemoteController(
            self.host.bridge, self.profile, end_call_cb=self.host.a_quit
        )
        await self.remote_controller.start(
            self.args.jid, {"devices": {"keyboard": {}}}, on_open_cb=self._on_open
        )


class Receive(base.CommandAnswering):
    def __init__(self, host):
        super().__init__(
            host,
            "receive",
            use_verbose=True,
            help=_("be remote controlled by another device"),
        )
        self.action_callbacks = {
            C.META_TYPE_CONFIRM: self.on_confirm_action,
            C.META_TYPE_NOT_IN_ROSTER_LEAK: self.on_confirm_action,
            C.META_TYPE_REMOTE_CONTROL: self.on_remote_control_action,
        }
        self.receiver = None

    def add_parser_options(self):
        self.parser.add_argument(
            "-S",
            "--share-screen",
            choices=["yes", "no", "auto"],
            default="auto",
            help=_("share the screen (default: auto)"),
        )
        self.parser.add_argument(
            "jids",
            nargs="+",
            help=_("jids accepted automatically"),
        )

    async def start_webrtc(
        self, from_jid: jid.JID, session_id: str, screenshare: dict, devices: dict
    ) -> None:
        """Start the WebRTC workflown"""
        assert self.receiver is not None
        root_logger = logging.getLogger()
        # we don't want any formatting for messages from webrtc
        for handler in root_logger.handlers:
            handler.setFormatter(None)
        if self.verbosity == 0:
            root_logger.setLevel(logging.ERROR)
        if self.verbosity >= 1:
            root_logger.setLevel(logging.WARNING)
        if self.verbosity >= 2:
            root_logger.setLevel(logging.DEBUG)
        await self.receiver.start_receiving(from_jid, session_id, screenshare)

    def get_xmlui_id(self, action_data):
        # FIXME: we temporarily use ElementTree, but a real XMLUI managing module
        #        should be available in the futur
        # TODO: XMLUI module
        try:
            xml_ui = action_data["xmlui"]
        except KeyError:
            self.disp(_("Action has no XMLUI"), 1)
        else:
            ui = ET.fromstring(xml_ui.encode("utf-8"))
            xmlui_id = ui.get("submit")
            if not xmlui_id:
                self.disp(_("Invalid XMLUI received"), error=True)
            return xmlui_id

    async def get_confirmation(
        self, action_data: dict, pre_accepted: bool = False
    ) -> tuple[str, jid.JID, bool]:
        """Check if action is confirmed, and ask user otherwise

        @param action_data: Data as used in on_action_new.
        @return: a tuple with:
            - XMLUI ID
            - sender JID
            - confirmation boolean
        """
        xmlui_id = self.get_xmlui_id(action_data)
        if xmlui_id is None:
            self.disp("Internal ERROR: xmlui_id missing", error=True)
            raise exceptions.InternalError()
        if (
            action_data["type"] != C.META_TYPE_REMOTE_CONTROL
            and action_data.get("subtype") != C.META_TYPE_REMOTE_CONTROL
        ):
            self.disp(_("Ignoring confirm dialog unrelated to remote control."), 1)
            raise exceptions.CancelError
        try:
            from_jid = jid.JID(action_data["from_jid"])
        except ValueError:
            self.disp(
                _('invalid "from_jid" value received, ignoring: {value}').format(
                    value=action_data["from_jid"]
                ),
                error=True,
            )
            raise exceptions.DataError
        except KeyError:
            self.disp(_('ignoring action without "from_jid" value'), error=True)
            raise exceptions.DataError

        self.disp(_("Confirmation needed for request from an entity not in roster"), 1)

        if pre_accepted:
            # Session has already been accepted during pre-flight.
            confirmed = True
        elif action_data["type"] == C.META_TYPE_CONFIRM and not self.bare_jids:
            # Sender is in roster, and we have an "accept all" policy.
            confirmed = True
            self.disp(
                _(
                    f"{from_jid} automatically confirmed due to entity being in roster"
                    " and accept all policy."
                )
            )
        elif from_jid.bare in self.bare_jids:
            # If the sender is expected, we can confirm the session.
            confirmed = True
            self.disp(_("Sender confirmed because they are explicitly expected"), 1)
        else:
            # Not automatically accepted, we ask authorisation to the user.
            xmlui = xmlui_manager.create(self.host, action_data["xmlui"])
            confirmed = await self.host.confirm(xmlui.dlg.message)
        return xmlui_id, from_jid, confirmed

    async def on_confirm_action(self, action_data, action_id, security_limit, profile):
        """Handle pre-flight remote control request"""
        try:
            xmlui_id, from_jid, confirmed = await self.get_confirmation(action_data)
        except exceptions.InternalError:
            self.host.quit_from_signal(1)
            return
        except (exceptions.InternalError, exceptions.DataError):
            return
        xmlui_data = {"answer": C.bool_const(confirmed)}
        await self.host.bridge.action_launch(
            xmlui_id, data_format.serialise(xmlui_data), profile_key=profile
        )
        if not confirmed:
            self.disp(_("Session refused for {from_jid}").format(from_jid=from_jid))
            self.host.quit_from_signal(0)

    async def on_remote_control_action(
        self, action_data, action_id, security_limit, profile
    ):
        """Handles actual remote control request"""
        try:
            session_id = action_data["session_id"]
        except KeyError:
            self.disp(
                f"Internal Error: Session ID is missing in action data: {action_data=}",
                error=True,
            )
            return
        pre_accepted = action_data.get("pre_accepted", False)
        try:
            xmlui_id, from_jid, confirmed = await self.get_confirmation(
                action_data, pre_accepted
            )
        except exceptions.InternalError:
            self.host.quit_from_signal(1)
            return
        except (exceptions.InternalError, exceptions.DataError):
            return
        if confirmed:
            await self.start_webrtc(
                from_jid,
                session_id,
                action_data.get("screenshare", {}),
                action_data.get("devices", {}),
            )
        xmlui_data = {"answer": C.bool_const(confirmed)}
        await self.host.bridge.action_launch(
            xmlui_id, data_format.serialise(xmlui_data), profile_key=profile
        )

    async def start(self):
        self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]
        from libervia.frontends.tools.webrtc_remote_control import (
            WebRTCRemoteControlReceiver,
        )

        self.receiver = WebRTCRemoteControlReceiver(
            self.host.bridge,
            self.profile,
            on_close_cb=self.host.a_quit,
            verbose=self.verbosity >= 1,
        )
        aio.install_glib_asyncio_iteration()
        # FIXME: for now AUTO always do the screen sharing, but is should be disabled and
        #   appropriate method should be used when no desktop environment is detected.
        with_screen_sharing = self.args.share_screen in ("yes", "auto")
        await self.receiver.request_remote_desktop(with_screen_sharing)

        self.disp(_("Waiting for controlling device…"))
        await self.start_answering()


class RemoteControl(base.CommandBase):
    subcommands = (Send, Receive)

    def __init__(self, host):
        super().__init__(
            host,
            "remote-control",
            use_profile=False,
            help=_("Control or be controlled by another device."),
        )