view libervia/cli/cmd_call.py @ 4161:2074b2bbe616

core (memory/sqla_mapping): `delete-orphan` in History: add `cascade=delete-orphan` to History's `messages`, `subjects` and `thread`, to make modification of those attributes easier.
author Goffi <goffi@goffi.org>
date Tue, 28 Nov 2023 17:35:20 +0100
parents 849721e1563b
children 0f8ea0768a3b
line wrap: on
line source

#!/usr/bin/env python3


# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from argparse import ArgumentParser
import asyncio
from functools import partial
import logging
import os

from prompt_toolkit.input import create_input
from prompt_toolkit.keys import Keys

from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.cli.constants import Const as C
from libervia.frontends.tools import aio, jid
from rich.columns import Columns
from rich.console import group
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from . import base

# Names of the command classes this module exposes; "Call" refers to the
# top-level ``Call`` command class defined in this module.
# NOTE(review): presumably read by the CLI's command loader — confirm.
__commands__ = ["Call"]


class WebRTCCall:
    def __init__(self, host, profile: str, callee: jid.JID):
        from libervia.frontends.tools import webrtc

        aio.install_glib_asyncio_iteration()
        self.host = host
        self.profile = profile
        self.webrtc = webrtc.WebRTC(host.bridge, profile)
        self.webrtc.callee = callee
        host.bridge.register_signal(
            "ice_candidates_new", self.on_ice_candidates_new, "plugin"
        )
        host.bridge.register_signal("call_setup", self.on_call_setup, "plugin")
        host.bridge.register_signal("call_ended", self.on_call_ended, "plugin")

    @property
    def sid(self) -> str | None:
        return self.webrtc.sid

    @sid.setter
    def sid(self, new_sid: str | None) -> None:
        self.webrtc.sid = new_sid

    async def on_ice_candidates_new(
        self, sid: str, candidates_s: str, profile: str
    ) -> None:
        if sid != self.webrtc.sid or profile != self.profile:
            return
        self.webrtc.on_ice_candidates_new(
            data_format.deserialise(candidates_s),
        )

    async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
        if sid != self.webrtc.sid or profile != self.profile:
            return
        setup_data = data_format.deserialise(setup_data_s)
        try:
            role = setup_data["role"]
            sdp = setup_data["sdp"]
        except KeyError:
            self.host.disp(f"Invalid setup data received: {setup_data}", error=True)
            return
        if role == "initiator":
            self.webrtc.on_accepted_call(sdp, profile)
        elif role == "responder":
            await self.webrtc.answer_call(sdp, profile)
        else:
            self.host.disp(
                f"Invalid role received during setup: {setup_data}", error=True
            )
        # we want to be sure that call is ended if user presses `Ctrl + c` or anything
        # else stops the session.
        self.host.add_on_quit_callback(
            lambda: self.host.bridge.call_end(sid, "", profile)
        )

    async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
        if sid != self.webrtc.sid or profile != self.profile:
            return
        await self.webrtc.end_call()
        await self.host.a_quit()

    async def start(self):
        """Start a call.

        To be used only if we are initiator
        """
        await self.webrtc.setup_call("initiator")
        self.webrtc.start_pipeline()


class UI:
    """Terminal control bar displayed while a call is running.

    The bar shows the microphone state, the video state and a hang-up
    entry, and reacts to single key presses (see ``start``).
    """

    def __init__(self, host, webrtc):
        """Store the CLI host and the WebRTC object driven by the UI.

        @param host: CLI host; its ``console``, ``disp`` and ``a_quit`` are used
        @param webrtc: object exposing ``audio_muted``, ``video_muted``,
            ``callee``, ``generate_dot_file`` and ``end_call``
        """
        self.host = host
        self.webrtc = webrtc

    def styled_shortcut_key(self, word: str, key: str | None = None) -> Text:
        """Return a word with the specified key or the first letter underlined.

        @param word: label to render
        @param key: character to highlight with the "shortcut" style; the
            first character of ``word`` is used when None
        @return: the styled label; ``word`` is returned unstyled when it is
            empty or when ``key`` does not appear in it
        """
        if not word:
            return Text(word)
        index = 0 if key is None else word.find(key)
        if index == -1:
            # ``str.find`` returns -1 when the key is missing: slicing with it
            # would highlight the last letter and duplicate the whole label.
            return Text(word)
        before, keyword, after = word[:index], word[index], word[index + 1 :]
        return Text(before) + Text(keyword, style="shortcut") + Text(after)

    def get_micro_display(self):
        """Return the panel describing the microphone state."""
        if self.webrtc.audio_muted:
            return Panel(Text("🔇 ") + self.styled_shortcut_key("Muted"), expand=False)
        else:
            return Panel(
                Text("🎤 ") + self.styled_shortcut_key("Unmuted", "m"), expand=False
            )

    def get_video_display(self):
        """Return the panel describing the video state."""
        if self.webrtc.video_muted:
            return Panel(Text("❌ ") + self.styled_shortcut_key("Video Off"), expand=False)
        else:
            return Panel(Text("🎥 ") + self.styled_shortcut_key("Video On"), expand=False)

    def get_phone_display(self):
        """Return the panel for the hang-up entry."""
        return Panel(Text("📞 ") + self.styled_shortcut_key("Hang up"), expand=False)

    @group()
    def generate_control_bar(self):
        """Return the full interface display."""
        yield Columns(
            [
                self.get_micro_display(),
                self.get_video_display(),
                self.get_phone_display(),
            ],
            expand=False,
            title="Calling [bold center]{}[/]".format(self.webrtc.callee),
        )

    async def start(self):
        """Display the control bar and handle key presses until hang up.

        Keys (case-insensitive): ``m`` toggles audio mute, ``v`` toggles video
        mute, ``h`` or ``Ctrl + c`` hangs up, ``d`` generates a dot file when
        ``GST_DEBUG_DUMP_DOT_DIR`` is set. Once hung up, the call is ended and
        the host is asked to quit.
        """
        done = asyncio.Event()
        # not named ``input`` so that the builtin is not shadowed
        key_input = create_input()

        def keys_ready(live):
            for key_press in key_input.read_keys():
                char = key_press.key.lower()
                if char == "m":
                    # audio mute
                    self.webrtc.audio_muted = not self.webrtc.audio_muted
                    live.update(self.generate_control_bar(), refresh=True)
                elif char == "v":
                    # video mute
                    self.webrtc.video_muted = not self.webrtc.video_muted
                    live.update(self.generate_control_bar(), refresh=True)
                elif char == "h" or key_press.key == Keys.ControlC:
                    # Hang up
                    done.set()
                elif char == "d":
                    # generate dot file for debugging. Only available if
                    # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot" with a
                    # timestamp.
                    if os.getenv("GST_DEBUG_DUMP_DOT_DIR"):
                        self.webrtc.generate_dot_file()
                        self.host.disp("Dot file generated.")

        with Live(
            self.generate_control_bar(),
            console=self.host.console,
            auto_refresh=False
        ) as live:
            with key_input.raw_mode():
                with key_input.attach(partial(keys_ready, live)):
                    await done.wait()

        await self.webrtc.end_call()
        await self.host.a_quit()


class Common(base.CommandBase):
    """Behaviour shared by the call subcommands (options, logging, UI)."""

    def add_parser_options(self):
        """Add the ``--no-ui`` flag to the parser."""
        self.parser.add_argument(
            "--no-ui", action="store_true", help=_("disable user interface")
        )

    async def start(self):
        """Configure the root logger according to the verbosity.

        Verbosity 0 maps to ERROR, 1 to WARNING, 2 and above to DEBUG.
        """
        root_logger = logging.getLogger()
        # messages coming from webrtc must not get any formatting
        for log_handler in root_logger.handlers:
            log_handler.setFormatter(None)

        verbosity = self.verbosity
        if verbosity >= 2:
            level = logging.DEBUG
        elif verbosity >= 1:
            level = logging.WARNING
        elif verbosity == 0:
            level = logging.ERROR
        else:
            # no level is set for any other value
            return
        root_logger.setLevel(level)

    async def start_ui(self, webrtc_call):
        """Run the interactive UI unless ``--no-ui`` was given.

        @param webrtc_call: object whose ``webrtc`` attribute is driven by the UI
        """
        if self.args.no_ui:
            return
        await UI(self.host, webrtc_call.webrtc).start()


class Make(Common):
    """Subcommand registered as "make": start a call to an entity."""

    def __init__(self, host):
        super().__init__(host, "make", use_verbose=True, help=_("start a call"))

    def add_parser_options(self):
        """Add the positional ``entity`` argument on top of the common options."""
        super().add_parser_options()
        self.parser.add_argument(
            "entity", metavar="JID", help=_("JIDs of entity to call")
        )

    async def start(self):
        """Start the call as initiator, then run the UI if it is enabled."""
        await super().start()
        webrtc_call = WebRTCCall(
            self.host, self.profile, jid.JID(self.args.entity)
        )
        await webrtc_call.start()
        await super().start_ui(webrtc_call)


class Receive(Common):
    """Subcommand registered as "receive": wait for an incoming call.

    Incoming calls are accepted automatically when requested with
    ``--auto-accept``/``--auto-accept-all``; otherwise the user is asked.
    """

    def __init__(self, host):
        super().__init__(
            host,
            "receive",
            use_verbose=True,
            help=_("wait for a call"),
        )

    def add_parser_options(self):
        """Add the mutually exclusive auto-accept options."""
        super().add_parser_options()
        auto_accept_group = self.parser.add_mutually_exclusive_group()
        auto_accept_group.add_argument(
            "-a",
            "--auto-accept",
            action="append",
            metavar="JID",
            default=[],
            help=_("automatically accept call from this jid (can be used multiple times)")
        )
        auto_accept_group.add_argument(
            "--auto-accept-all",
            action="store_true",
            help=_("automatically accept call from anybody")
        )

    async def on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Accept or refuse an incoming call action.

        Actions for another profile, or whose type is not
        ``C.META_TYPE_CALL``, are ignored.

        @param action_data_s: serialised action data; ``from_jid`` and
            ``session_id`` are read from it
        @param action_id: ID used to answer the action
        @param security_limit: unused here
        @param profile: profile the action is for
        """
        if profile != self.profile:
            return
        action_data = data_format.deserialise(action_data_s)
        if action_data.get("type") != C.META_TYPE_CALL:
            return
        # ``peer_jid`` is already the bare JID, there is no need to take
        # ``.bare`` a second time to get the caller.
        peer_jid = jid.JID(action_data["from_jid"]).bare
        caller = peer_jid
        if (
            not self.args.auto_accept_all
            and caller not in self.args.auto_accept
            and not await self.host.confirm(
                _("📞 Incoming call from {caller}, do you accept?").format(
                    caller=caller
                )
            )
        ):
            # the call is refused
            await self.host.bridge.action_launch(
                action_id, data_format.serialise({"cancelled": True}), profile
            )
            return

        self.disp(_("✅ Incoming call from {caller} accepted.").format(caller=caller))

        webrtc_call = WebRTCCall(self.host, self.profile, peer_jid)
        # the session ID is set before answering the action
        webrtc_call.sid = action_data["session_id"]
        await self.host.bridge.action_launch(
            action_id, data_format.serialise({"cancelled": False}), profile
        )
        await super().start_ui(webrtc_call)

    async def start(self):
        """Subscribe to ``action_new`` to be notified of incoming calls."""
        await super().start()
        self.host.bridge.register_signal("action_new", self.on_action_new, "core")


class Call(base.CommandBase):
    """Command registered as "call", grouping the ``Make`` and ``Receive`` subcommands."""

    subcommands = (Make, Receive)

    def __init__(self, host):
        super().__init__(
            host,
            "call",
            use_profile=False,
            help=_("A/V calls and related"),
        )