Mercurial > libervia-backend
diff libervia/cli/cmd_call.py @ 4143:849721e1563b
cli: `call` command:
This command has 2 subcommands: `make` and `receive` to make a new call or wait for one.
When call is in progress, a window will be created to show incoming stream and local
feedback, and a text UI is available to (un)mute audio or video, and hang up.
rel 426
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Nov 2023 14:10:00 +0100 |
parents | |
children | 0f8ea0768a3b |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/cli/cmd_call.py Wed Nov 01 14:10:00 2023 +0100 @@ -0,0 +1,316 @@ +#!/usr/bin/env python3 + + +# Libervia CLI +# Copyright (C) 2009-2021 JΓ©rΓ΄me Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +from argparse import ArgumentParser +import asyncio +from functools import partial +import logging +import os + +from prompt_toolkit.input import create_input +from prompt_toolkit.keys import Keys + +from libervia.backend.core.i18n import _ +from libervia.backend.tools.common import data_format +from libervia.cli.constants import Const as C +from libervia.frontends.tools import aio, jid +from rich.columns import Columns +from rich.console import group +from rich.live import Live +from rich.panel import Panel +from rich.text import Text + +from . import base + +__commands__ = ["Call"] + + +class WebRTCCall: + def __init__(self, host, profile: str, callee: jid.JID): + from libervia.frontends.tools import webrtc + + aio.install_glib_asyncio_iteration() + self.host = host + self.profile = profile + self.webrtc = webrtc.WebRTC(host.bridge, profile) + self.webrtc.callee = callee + host.bridge.register_signal( + "ice_candidates_new", self.on_ice_candidates_new, "plugin" + ) + host.bridge.register_signal("call_setup", self.on_call_setup, "plugin") + host.bridge.register_signal("call_ended", self.on_call_ended, "plugin") + + @property + def sid(self) -> str | None: + return self.webrtc.sid + + @sid.setter + def sid(self, new_sid: str | None) -> None: + self.webrtc.sid = new_sid + + async def on_ice_candidates_new( + self, sid: str, candidates_s: str, profile: str + ) -> None: + if sid != self.webrtc.sid or profile != self.profile: + return + self.webrtc.on_ice_candidates_new( + data_format.deserialise(candidates_s), + ) + + async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None: + if sid != self.webrtc.sid or profile != self.profile: + return + setup_data = data_format.deserialise(setup_data_s) + try: + role = setup_data["role"] + sdp = setup_data["sdp"] + except KeyError: + self.host.disp(f"Invalid setup data received: {setup_data}", error=True) + return + if role == "initiator": + self.webrtc.on_accepted_call(sdp, profile) + elif role == "responder": + await self.webrtc.answer_call(sdp, profile) + else: + self.host.disp( + f"Invalid role received during setup: {setup_data}", error=True + ) + # we want to be sure that call is ended if user presses `Ctrl + c` or anything + # else stops the session. + self.host.add_on_quit_callback( + lambda: self.host.bridge.call_end(sid, "", profile) + ) + + async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None: + if sid != self.webrtc.sid or profile != self.profile: + return + await self.webrtc.end_call() + await self.host.a_quit() + + async def start(self): + """Start a call. + + To be used only if we are initiator + """ + await self.webrtc.setup_call("initiator") + self.webrtc.start_pipeline() + + +class UI: + def __init__(self, host, webrtc): + self.host = host + self.webrtc = webrtc + + def styled_shortcut_key(self, word: str, key: str | None = None) -> Text: + """Return a word with the specified key or the first letter underlined.""" + if key is None: + key = word[0] + index = word.find(key) + before, keyword, after = word[:index], word[index], word[index + 1 :] + return Text(before) + Text(keyword, style="shortcut") + Text(after) + + def get_micro_display(self): + if self.webrtc.audio_muted: + return Panel(Text("π ") + self.styled_shortcut_key("Muted"), expand=False) + else: + return Panel( + Text("π€ ") + self.styled_shortcut_key("Unmuted", "m"), expand=False + ) + + def get_video_display(self): + if self.webrtc.video_muted: + return Panel(Text("β ") + self.styled_shortcut_key("Video Off"), expand=False) + else: + return Panel(Text("π₯ ") + self.styled_shortcut_key("Video On"), expand=False) + + def get_phone_display(self): + return Panel(Text("π ") + self.styled_shortcut_key("Hang up"), expand=False) + + @group() + def generate_control_bar(self): + """Return the full interface display.""" + yield Columns( + [ + self.get_micro_display(), + self.get_video_display(), + self.get_phone_display(), + ], + expand=False, + title="Calling [bold center]{}[/]".format(self.webrtc.callee), + ) + + async def start(self): + done = asyncio.Event() + input = create_input() + + def keys_ready(live): + for key_press in input.read_keys(): + char = key_press.key.lower() + if char == "m": + # audio mute + self.webrtc.audio_muted = not self.webrtc.audio_muted + live.update(self.generate_control_bar(), refresh=True) + elif char == "v": + # video mute + self.webrtc.video_muted = not self.webrtc.video_muted + live.update(self.generate_control_bar(), refresh=True) + elif char == "h" or key_press.key == Keys.ControlC: + # Hang up + done.set() + elif char == "d": + # generate dot file for debugging. Only available if + # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot" with a + # timestamp. + if os.getenv("GST_DEBUG_DUMP_DOT_DIR"): + self.webrtc.generate_dot_file() + self.host.disp("Dot file generated.") + + with Live( + self.generate_control_bar(), + console=self.host.console, + auto_refresh=False + ) as live: + with input.raw_mode(): + with input.attach(partial(keys_ready, live)): + await done.wait() + + await self.webrtc.end_call() + await self.host.a_quit() + + +class Common(base.CommandBase): + + def add_parser_options(self): + self.parser.add_argument( + "--no-ui", action="store_true", help=_("disable user interface") + ) + + async def start(self): + root_logger = logging.getLogger() + # we don't want any formatting for messages from webrtc + for handler in root_logger.handlers: + handler.setFormatter(None) + if self.verbosity == 0: + root_logger.setLevel(logging.ERROR) + if self.verbosity >= 1: + root_logger.setLevel(logging.WARNING) + if self.verbosity >= 2: + root_logger.setLevel(logging.DEBUG) + + async def start_ui(self, webrtc_call): + if not self.args.no_ui: + ui = UI(self.host, webrtc_call.webrtc) + await ui.start() + + +class Make(Common): + def __init__(self, host): + super().__init__( + host, + "make", + use_verbose=True, + help=_("start a call"), + ) + + def add_parser_options(self): + super().add_parser_options() + self.parser.add_argument( + "entity", + metavar="JID", + help=_("JIDs of entity to call"), + ) + + async def start(self): + await super().start() + callee = jid.JID(self.args.entity) + webrtc_call = WebRTCCall(self.host, self.profile, callee) + await webrtc_call.start() + await super().start_ui(webrtc_call) + + +class Receive(Common): + def __init__(self, host): + super().__init__( + host, + "receive", + use_verbose=True, + help=_("wait for a call"), + ) + + def add_parser_options(self): + super().add_parser_options() + auto_accept_group = self.parser.add_mutually_exclusive_group() + auto_accept_group.add_argument( + "-a", + "--auto-accept", + action="append", + metavar="JID", + default=[], + help=_("automatically accept call from this jid (can be used multiple times)") + ) + auto_accept_group.add_argument( + "--auto-accept-all", + action="store_true", + help=_("automatically accept call from anybody") + ) + + async def on_action_new( + self, action_data_s: str, action_id: str, security_limit: int, profile: str + ) -> None: + if profile != self.profile: + return + action_data = data_format.deserialise(action_data_s) + if action_data.get("type") != C.META_TYPE_CALL: + return + peer_jid = jid.JID(action_data["from_jid"]).bare + caller = peer_jid.bare + if ( + not self.args.auto_accept_all + and caller not in self.args.auto_accept + and not await self.host.confirm( + _("π Incoming call from {caller}, do you accept?").format( + caller=caller + ) + ) + ): + await self.host.bridge.action_launch( + action_id, data_format.serialise({"cancelled": True}), profile + ) + return + + self.disp(_("β Incoming call from {caller} accepted.").format(caller=caller)) + + webrtc_call = WebRTCCall(self.host, self.profile, peer_jid) + webrtc_call.sid = action_data["session_id"] + await self.host.bridge.action_launch( + action_id, data_format.serialise({"cancelled": False}), profile + ) + await super().start_ui(webrtc_call) + + async def start(self): + await super().start() + self.host.bridge.register_signal("action_new", self.on_action_new, "core") + + +class Call(base.CommandBase): + subcommands = (Make, Receive) + + def __init__(self, host): + super().__init__(host, "call", use_profile=False, help=_("A/V calls and related"))