Mercurial > libervia-backend
view libervia/cli/cmd_call.py @ 4161:2074b2bbe616
core (memory/sqla_mapping): `delete-orphan` in History:
add `cascade=delete-orphan` to History's `messages`, `subjects` and `thread`, to make
modification of those attributes easier.
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 28 Nov 2023 17:35:20 +0100 |
parents | 849721e1563b |
children | 0f8ea0768a3b |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from argparse import ArgumentParser
import asyncio
from functools import partial
import logging
import os

from prompt_toolkit.input import create_input
from prompt_toolkit.keys import Keys
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.cli.constants import Const as C
from libervia.frontends.tools import aio, jid
from rich.columns import Columns
from rich.console import group
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from . import base

__commands__ = ["Call"]


class WebRTCCall:
    """Glue between the bridge call signals and a ``webrtc.WebRTC`` instance."""

    def __init__(self, host, profile: str, callee: jid.JID):
        """Create the WebRTC wrapper and subscribe to call-related signals.

        @param host: CLI host; its ``bridge`` is used to register signals
        @param profile: profile used for the call; signals received for another
            profile are ignored
        @param callee: entity stored as ``callee`` on the WebRTC instance
        """
        # imported here rather than at module level, as in the original code
        # NOTE(review): presumably to avoid loading the webrtc dependencies when
        # the command is not used -- confirm
        from libervia.frontends.tools import webrtc

        aio.install_glib_asyncio_iteration()
        self.host = host
        self.profile = profile
        self.webrtc = webrtc.WebRTC(host.bridge, profile)
        self.webrtc.callee = callee
        host.bridge.register_signal(
            "ice_candidates_new", self.on_ice_candidates_new, "plugin"
        )
        host.bridge.register_signal("call_setup", self.on_call_setup, "plugin")
        host.bridge.register_signal("call_ended", self.on_call_ended, "plugin")

    @property
    def sid(self) -> str | None:
        """Session ID, stored on the wrapped WebRTC instance."""
        return self.webrtc.sid

    @sid.setter
    def sid(self, new_sid: str | None) -> None:
        self.webrtc.sid = new_sid

    async def on_ice_candidates_new(
        self, sid: str, candidates_s: str, profile: str
    ) -> None:
        """Forward new ICE candidates to the WebRTC instance.

        The signal is ignored if it is not for our session and profile.

        @param sid: session ID the candidates belong to
        @param candidates_s: serialised candidates
        @param profile: profile the signal was emitted for
        """
        if sid != self.webrtc.sid or profile != self.profile:
            return
        self.webrtc.on_ice_candidates_new(
            data_format.deserialise(candidates_s),
        )

    async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
        """Handle call setup data according to the role it declares.

        The signal is ignored if it is not for our session and profile.

        @param sid: session ID of the call being set up
        @param setup_data_s: serialised setup data, must have "role" and "sdp" keys
        @param profile: profile the signal was emitted for
        """
        if sid != self.webrtc.sid or profile != self.profile:
            return
        setup_data = data_format.deserialise(setup_data_s)
        try:
            role = setup_data["role"]
            sdp = setup_data["sdp"]
        except KeyError:
            self.host.disp(f"Invalid setup data received: {setup_data}", error=True)
            return
        if role == "initiator":
            self.webrtc.on_accepted_call(sdp, profile)
        elif role == "responder":
            await self.webrtc.answer_call(sdp, profile)
        else:
            self.host.disp(
                f"Invalid role received during setup: {setup_data}", error=True
            )
        # we want to be sure that call is ended if user presses `Ctrl + c` or anything
        # else stops the session.
        self.host.add_on_quit_callback(
            lambda: self.host.bridge.call_end(sid, "", profile)
        )

    async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
        """End the local call and quit when our session is terminated.

        The signal is ignored if it is not for our session and profile.

        @param sid: session ID of the ended call
        @param data_s: serialised extra data (unused here)
        @param profile: profile the signal was emitted for
        """
        if sid != self.webrtc.sid or profile != self.profile:
            return
        await self.webrtc.end_call()
        await self.host.a_quit()

    async def start(self):
        """Start a call.

        To be used only if we are initiator
        """
        await self.webrtc.setup_call("initiator")
        self.webrtc.start_pipeline()


class UI:
    """Terminal control bar (mute, video, hang up) driven by keyboard shortcuts."""

    def __init__(self, host, webrtc):
        """
        @param host: CLI host, used for its console, ``disp`` and ``a_quit``
        @param webrtc: WebRTC instance whose state is displayed and toggled
        """
        self.host = host
        self.webrtc = webrtc

    def styled_shortcut_key(self, word: str, key: str | None = None) -> Text:
        """Return a word with the specified key or the first letter underlined.

        @param word: label to display
        @param key: character to highlight with the "shortcut" style.
            The first occurrence in ``word`` is used (the search is case sensitive).
            If None, the first character of ``word`` is used.
        @return: styled text. If ``word`` is empty or ``key`` can't be found in it,
            the word is returned without any highlighting.
        """
        if not word:
            return Text(word)
        if key is None:
            key = word[0]
        index = word.find(key)
        if index == -1:
            # without this guard, the negative index would be used in the slices
            # below and the label would be duplicated
            return Text(word)
        before, keyword, after = word[:index], word[index], word[index + 1 :]
        return Text(before) + Text(keyword, style="shortcut") + Text(after)

    def get_micro_display(self):
        """Return the panel showing the microphone state."""
        if self.webrtc.audio_muted:
            return Panel(Text("🔇 ") + self.styled_shortcut_key("Muted"), expand=False)
        else:
            return Panel(
                Text("🎤 ") + self.styled_shortcut_key("Unmuted", "m"), expand=False
            )

    def get_video_display(self):
        """Return the panel showing the video state."""
        if self.webrtc.video_muted:
            return Panel(
                Text("❌ ") + self.styled_shortcut_key("Video Off"), expand=False
            )
        else:
            return Panel(
                Text("🎥 ") + self.styled_shortcut_key("Video On"), expand=False
            )

    def get_phone_display(self):
        """Return the panel showing the hang up shortcut."""
        return Panel(Text("📞 ") + self.styled_shortcut_key("Hang up"), expand=False)

    @group()
    def generate_control_bar(self):
        """Return the full interface display."""
        yield Columns(
            [
                self.get_micro_display(),
                self.get_video_display(),
                self.get_phone_display(),
            ],
            expand=False,
            title="Calling [bold center]{}[/]".format(self.webrtc.callee),
        )

    async def start(self):
        """Display the control bar and handle key presses until hang up.

        Once hang up is requested (``h`` or ``Ctrl + c``), the call is ended and
        the host is asked to quit.
        """
        done = asyncio.Event()
        # not named "input" to avoid shadowing the builtin
        key_input = create_input()

        def keys_ready(live):
            for key_press in key_input.read_keys():
                char = key_press.key.lower()
                if char == "m":
                    # audio mute
                    self.webrtc.audio_muted = not self.webrtc.audio_muted
                    live.update(self.generate_control_bar(), refresh=True)
                elif char == "v":
                    # video mute
                    self.webrtc.video_muted = not self.webrtc.video_muted
                    live.update(self.generate_control_bar(), refresh=True)
                elif char == "h" or key_press.key == Keys.ControlC:
                    # Hang up
                    done.set()
                elif char == "d":
                    # generate dot file for debugging. Only available if
                    # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot"
                    # with a timestamp.
                    if os.getenv("GST_DEBUG_DUMP_DOT_DIR"):
                        self.webrtc.generate_dot_file()
                        self.host.disp("Dot file generated.")

        with Live(
            self.generate_control_bar(), console=self.host.console, auto_refresh=False
        ) as live:
            with key_input.raw_mode():
                with key_input.attach(partial(keys_ready, live)):
                    await done.wait()

        await self.webrtc.end_call()
        await self.host.a_quit()


class Common(base.CommandBase):
    """Options and helpers shared by the ``make`` and ``receive`` subcommands."""

    def add_parser_options(self):
        self.parser.add_argument(
            "--no-ui", action="store_true", help=_("disable user interface")
        )

    async def start(self):
        """Configure the root logger according to the requested verbosity."""
        root_logger = logging.getLogger()
        # we don't want any formatting for messages from webrtc
        for handler in root_logger.handlers:
            handler.setFormatter(None)
        # levels are mutually exclusive: 0 => ERROR, 1 => WARNING, 2+ => DEBUG
        if self.verbosity >= 2:
            root_logger.setLevel(logging.DEBUG)
        elif self.verbosity >= 1:
            root_logger.setLevel(logging.WARNING)
        elif self.verbosity == 0:
            root_logger.setLevel(logging.ERROR)

    async def start_ui(self, webrtc_call):
        """Run the control bar for the given call, unless ``--no-ui`` is set.

        @param webrtc_call: WebRTCCall instance to control
        """
        if not self.args.no_ui:
            ui = UI(self.host, webrtc_call.webrtc)
            await ui.start()


class Make(Common):
    """``call make``: call an entity."""

    def __init__(self, host):
        super().__init__(
            host,
            "make",
            use_verbose=True,
            help=_("start a call"),
        )

    def add_parser_options(self):
        super().add_parser_options()
        self.parser.add_argument(
            "entity",
            metavar="JID",
            help=_("JIDs of entity to call"),
        )

    async def start(self):
        """Start a call as initiator, then run the UI."""
        await super().start()
        callee = jid.JID(self.args.entity)
        webrtc_call = WebRTCCall(self.host, self.profile, callee)
        await webrtc_call.start()
        await self.start_ui(webrtc_call)


class Receive(Common):
    """``call receive``: wait for an incoming call."""

    def __init__(self, host):
        super().__init__(
            host,
            "receive",
            use_verbose=True,
            help=_("wait for a call"),
        )

    def add_parser_options(self):
        super().add_parser_options()
        auto_accept_group = self.parser.add_mutually_exclusive_group()
        auto_accept_group.add_argument(
            "-a",
            "--auto-accept",
            action="append",
            metavar="JID",
            default=[],
            help=_(
                "automatically accept call from this jid (can be used multiple times)"
            ),
        )
        auto_accept_group.add_argument(
            "--auto-accept-all",
            action="store_true",
            help=_("automatically accept call from anybody"),
        )

    async def on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Accept or reject an incoming call action.

        Actions for another profile, or which are not of call type, are ignored.
        The call is accepted if ``--auto-accept-all`` is set, if the caller is in
        the ``--auto-accept`` list, or if the user confirms it. Otherwise the
        action is cancelled.

        @param action_data_s: serialised action data
        @param action_id: ID used to answer the action
        @param security_limit: security limit of the action (unused here)
        @param profile: profile the signal was emitted for
        """
        if profile != self.profile:
            return
        action_data = data_format.deserialise(action_data_s)
        if action_data.get("type") != C.META_TYPE_CALL:
            return
        peer_jid = jid.JID(action_data["from_jid"]).bare
        # peer_jid is already a bare JID, it is used directly as caller
        caller = peer_jid
        if (
            not self.args.auto_accept_all
            and caller not in self.args.auto_accept
            and not await self.host.confirm(
                _("📞 Incoming call from {caller}, do you accept?").format(
                    caller=caller
                )
            )
        ):
            await self.host.bridge.action_launch(
                action_id, data_format.serialise({"cancelled": True}), profile
            )
            return

        self.disp(_("✅ Incoming call from {caller} accepted.").format(caller=caller))

        webrtc_call = WebRTCCall(self.host, self.profile, peer_jid)
        webrtc_call.sid = action_data["session_id"]
        await self.host.bridge.action_launch(
            action_id, data_format.serialise({"cancelled": False}), profile
        )
        await self.start_ui(webrtc_call)

    async def start(self):
        """Configure logging, then listen for incoming actions."""
        await super().start()
        self.host.bridge.register_signal("action_new", self.on_action_new, "core")


class Call(base.CommandBase):
    """``call`` command, parent of the ``make`` and ``receive`` subcommands."""

    subcommands = (Make, Receive)

    def __init__(self, host):
        super().__init__(
            host, "call", use_profile=False, help=_("A/V calls and related")
        )