diff libervia/cli/cmd_call.py @ 4143:849721e1563b

cli: `call` command: This command has 2 subcommands: `make` and `receive` to make a new call or wait for one. When call is in progress, a window will be created to show incoming stream and local feedback, and a text UI is available to (un)mute audio or video, and hang up. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:10:00 +0100
parents
children 0f8ea0768a3b
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/cli/cmd_call.py	Wed Nov 01 14:10:00 2023 +0100
@@ -0,0 +1,316 @@
+#!/usr/bin/env python3
+
+
+# Libervia CLI
+# Copyright (C) 2009-2021 JΓ©rΓ΄me Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+from argparse import ArgumentParser
+import asyncio
+from functools import partial
+import logging
+import os
+
+from prompt_toolkit.input import create_input
+from prompt_toolkit.keys import Keys
+
+from libervia.backend.core.i18n import _
+from libervia.backend.tools.common import data_format
+from libervia.cli.constants import Const as C
+from libervia.frontends.tools import aio, jid
+from rich.columns import Columns
+from rich.console import group
+from rich.live import Live
+from rich.panel import Panel
+from rich.text import Text
+
+from . import base
+
+__commands__ = ["Call"]
+
+
+class WebRTCCall:
+    def __init__(self, host, profile: str, callee: jid.JID):
+        from libervia.frontends.tools import webrtc
+
+        aio.install_glib_asyncio_iteration()
+        self.host = host
+        self.profile = profile
+        self.webrtc = webrtc.WebRTC(host.bridge, profile)
+        self.webrtc.callee = callee
+        host.bridge.register_signal(
+            "ice_candidates_new", self.on_ice_candidates_new, "plugin"
+        )
+        host.bridge.register_signal("call_setup", self.on_call_setup, "plugin")
+        host.bridge.register_signal("call_ended", self.on_call_ended, "plugin")
+
+    @property
+    def sid(self) -> str | None:
+        return self.webrtc.sid
+
+    @sid.setter
+    def sid(self, new_sid: str | None) -> None:
+        self.webrtc.sid = new_sid
+
+    async def on_ice_candidates_new(
+        self, sid: str, candidates_s: str, profile: str
+    ) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        self.webrtc.on_ice_candidates_new(
+            data_format.deserialise(candidates_s),
+        )
+
+    async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        setup_data = data_format.deserialise(setup_data_s)
+        try:
+            role = setup_data["role"]
+            sdp = setup_data["sdp"]
+        except KeyError:
+            self.host.disp(f"Invalid setup data received: {setup_data}", error=True)
+            return
+        if role == "initiator":
+            self.webrtc.on_accepted_call(sdp, profile)
+        elif role == "responder":
+            await self.webrtc.answer_call(sdp, profile)
+        else:
+            self.host.disp(
+                f"Invalid role received during setup: {setup_data}", error=True
+            )
+        # we want to be sure that call is ended if user presses `Ctrl + c` or anything
+        # else stops the session.
+        self.host.add_on_quit_callback(
+            lambda: self.host.bridge.call_end(sid, "", profile)
+        )
+
+    async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
+        if sid != self.webrtc.sid or profile != self.profile:
+            return
+        await self.webrtc.end_call()
+        await self.host.a_quit()
+
+    async def start(self):
+        """Start a call.
+
+        To be used only if we are initiator
+        """
+        await self.webrtc.setup_call("initiator")
+        self.webrtc.start_pipeline()
+
+
+class UI:
+    def __init__(self, host, webrtc):
+        self.host = host
+        self.webrtc = webrtc
+
+    def styled_shortcut_key(self, word: str, key: str | None = None) -> Text:
+        """Return a word with the specified key or the first letter underlined."""
+        if key is None:
+            key = word[0]
+        index = word.find(key)
+        before, keyword, after = word[:index], word[index], word[index + 1 :]
+        return Text(before) + Text(keyword, style="shortcut") + Text(after)
+
+    def get_micro_display(self):
+        if self.webrtc.audio_muted:
+            return Panel(Text("πŸ”‡ ") + self.styled_shortcut_key("Muted"), expand=False)
+        else:
+            return Panel(
+                Text("🎀 ") + self.styled_shortcut_key("Unmuted", "m"), expand=False
+            )
+
+    def get_video_display(self):
+        if self.webrtc.video_muted:
+            return Panel(Text("❌ ") + self.styled_shortcut_key("Video Off"), expand=False)
+        else:
+            return Panel(Text("πŸŽ₯ ") + self.styled_shortcut_key("Video On"), expand=False)
+
+    def get_phone_display(self):
+        return Panel(Text("πŸ“ž ") + self.styled_shortcut_key("Hang up"), expand=False)
+
+    @group()
+    def generate_control_bar(self):
+        """Return the full interface display."""
+        yield Columns(
+            [
+                self.get_micro_display(),
+                self.get_video_display(),
+                self.get_phone_display(),
+            ],
+            expand=False,
+            title="Calling [bold center]{}[/]".format(self.webrtc.callee),
+        )
+
+    async def start(self):
+        done = asyncio.Event()
+        input = create_input()
+
+        def keys_ready(live):
+            for key_press in input.read_keys():
+                char = key_press.key.lower()
+                if char == "m":
+                    # audio mute
+                    self.webrtc.audio_muted = not self.webrtc.audio_muted
+                    live.update(self.generate_control_bar(), refresh=True)
+                elif char == "v":
+                    # video mute
+                    self.webrtc.video_muted = not self.webrtc.video_muted
+                    live.update(self.generate_control_bar(), refresh=True)
+                elif char == "h" or key_press.key == Keys.ControlC:
+                    # Hang up
+                    done.set()
+                elif char == "d":
+                    # generate dot file for debugging. Only available if
+                    # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot" with a
+                    # timestamp.
+                    if os.getenv("GST_DEBUG_DUMP_DOT_DIR"):
+                        self.webrtc.generate_dot_file()
+                        self.host.disp("Dot file generated.")
+
+        with Live(
+            self.generate_control_bar(),
+            console=self.host.console,
+            auto_refresh=False
+        ) as live:
+            with input.raw_mode():
+                with input.attach(partial(keys_ready, live)):
+                    await done.wait()
+
+        await self.webrtc.end_call()
+        await self.host.a_quit()
+
+
+class Common(base.CommandBase):
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--no-ui", action="store_true", help=_("disable user interface")
+        )
+
+    async def start(self):
+        root_logger = logging.getLogger()
+        # we don't want any formatting for messages from webrtc
+        for handler in root_logger.handlers:
+            handler.setFormatter(None)
+        if self.verbosity == 0:
+            root_logger.setLevel(logging.ERROR)
+        if self.verbosity >= 1:
+            root_logger.setLevel(logging.WARNING)
+        if self.verbosity >= 2:
+            root_logger.setLevel(logging.DEBUG)
+
+    async def start_ui(self, webrtc_call):
+        if not self.args.no_ui:
+            ui = UI(self.host, webrtc_call.webrtc)
+            await ui.start()
+
+
+class Make(Common):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "make",
+            use_verbose=True,
+            help=_("start a call"),
+        )
+
+    def add_parser_options(self):
+        super().add_parser_options()
+        self.parser.add_argument(
+            "entity",
+            metavar="JID",
+            help=_("JIDs of entity to call"),
+        )
+
+    async def start(self):
+        await super().start()
+        callee = jid.JID(self.args.entity)
+        webrtc_call = WebRTCCall(self.host, self.profile, callee)
+        await webrtc_call.start()
+        await super().start_ui(webrtc_call)
+
+
+class Receive(Common):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "receive",
+            use_verbose=True,
+            help=_("wait for a call"),
+        )
+
+    def add_parser_options(self):
+        super().add_parser_options()
+        auto_accept_group = self.parser.add_mutually_exclusive_group()
+        auto_accept_group.add_argument(
+            "-a",
+            "--auto-accept",
+            action="append",
+            metavar="JID",
+            default=[],
+            help=_("automatically accept call from this jid (can be used multiple times)")
+        )
+        auto_accept_group.add_argument(
+            "--auto-accept-all",
+            action="store_true",
+            help=_("automatically accept call from anybody")
+        )
+
+    async def on_action_new(
+        self, action_data_s: str, action_id: str, security_limit: int, profile: str
+    ) -> None:
+        if profile != self.profile:
+            return
+        action_data = data_format.deserialise(action_data_s)
+        if action_data.get("type") != C.META_TYPE_CALL:
+            return
+        peer_jid = jid.JID(action_data["from_jid"]).bare
+        caller = peer_jid.bare
+        if (
+            not self.args.auto_accept_all
+            and caller not in self.args.auto_accept
+            and not await self.host.confirm(
+                _("πŸ“ž Incoming call from {caller}, do you accept?").format(
+                    caller=caller
+                )
+            )
+        ):
+            await self.host.bridge.action_launch(
+                action_id, data_format.serialise({"cancelled": True}), profile
+            )
+            return
+
+        self.disp(_("βœ… Incoming call from {caller} accepted.").format(caller=caller))
+
+        webrtc_call = WebRTCCall(self.host, self.profile, peer_jid)
+        webrtc_call.sid = action_data["session_id"]
+        await self.host.bridge.action_launch(
+            action_id, data_format.serialise({"cancelled": False}), profile
+        )
+        await super().start_ui(webrtc_call)
+
+    async def start(self):
+        await super().start()
+        self.host.bridge.register_signal("action_new", self.on_action_new, "core")
+
+
+class Call(base.CommandBase):
+    subcommands = (Make, Receive)
+
+    def __init__(self, host):
+        super().__init__(host, "call", use_profile=False, help=_("A/V calls and related"))