Mercurial > libervia-backend
view libervia/cli/cmd_call.py @ 4180:b86912d3fd33
plugin IP: fix use of legacy URL + coroutine use:
An https://salut-a-toi.org URL was used to retrieve the external IP, but it is no longer
valid, which resulted in an exception. This feature is currently disabled.
Also moved several methods from legacy inline callbacks to coroutines.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 09 Dec 2023 14:30:54 +0100 |
parents | 849721e1563b |
children | 0f8ea0768a3b |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""CLI commands to make and receive A/V calls."""

from argparse import ArgumentParser
import asyncio
from functools import partial
import logging
import os

from prompt_toolkit.input import create_input
from prompt_toolkit.keys import Keys
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.cli.constants import Const as C
from libervia.frontends.tools import aio, jid
from rich.columns import Columns
from rich.console import group
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from . import base

__commands__ = ["Call"]


class WebRTCCall:
    """Tie a ``webrtc.WebRTC`` instance to the bridge signals of a single call."""

    def __init__(self, host, profile: str, callee: jid.JID):
        """Create the WebRTC wrapper and register the call signal handlers.

        @param host: CLI host, used for its bridge, display and quit helpers
        @param profile: profile used for the call; signals for other profiles are
            ignored
        @param callee: entity stored as ``callee`` on the WebRTC instance
        """
        # imported here rather than at module level, as in the original code
        from libervia.frontends.tools import webrtc

        aio.install_glib_asyncio_iteration()
        self.host = host
        self.profile = profile
        self.webrtc = webrtc.WebRTC(host.bridge, profile)
        self.webrtc.callee = callee
        host.bridge.register_signal(
            "ice_candidates_new", self.on_ice_candidates_new, "plugin"
        )
        host.bridge.register_signal("call_setup", self.on_call_setup, "plugin")
        host.bridge.register_signal("call_ended", self.on_call_ended, "plugin")

    @property
    def sid(self) -> str | None:
        """Session ID of the call, as stored on the WebRTC instance."""
        return self.webrtc.sid

    @sid.setter
    def sid(self, new_sid: str | None) -> None:
        self.webrtc.sid = new_sid

    async def on_ice_candidates_new(
        self, sid: str, candidates_s: str, profile: str
    ) -> None:
        """Forward newly received ICE candidates to the WebRTC instance.

        Signals for another session or another profile are ignored.

        @param sid: session ID the candidates belong to
        @param candidates_s: serialised candidates
        @param profile: profile which received the signal
        """
        if sid != self.webrtc.sid or profile != self.profile:
            return
        self.webrtc.on_ice_candidates_new(
            data_format.deserialise(candidates_s),
        )

    async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
        """Handle the setup data of our call according to the role it declares.

        Signals for another session or another profile are ignored.

        @param sid: session ID of the call being set up
        @param setup_data_s: serialised setup data; ``role`` and ``sdp`` keys are
            required
        @param profile: profile which received the signal
        """
        if sid != self.webrtc.sid or profile != self.profile:
            return
        setup_data = data_format.deserialise(setup_data_s)
        try:
            role = setup_data["role"]
            sdp = setup_data["sdp"]
        except KeyError:
            self.host.disp(f"Invalid setup data received: {setup_data}", error=True)
            return
        if role == "initiator":
            self.webrtc.on_accepted_call(sdp, profile)
        elif role == "responder":
            await self.webrtc.answer_call(sdp, profile)
        else:
            self.host.disp(
                f"Invalid role received during setup: {setup_data}", error=True
            )
        # we want to be sure that call is ended if user presses `Ctrl + c` or anything
        # else stops the session.
        self.host.add_on_quit_callback(
            lambda: self.host.bridge.call_end(sid, "", profile)
        )

    async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
        """End the local call and quit when our call is terminated.

        Signals for another session or another profile are ignored.

        @param sid: session ID of the terminated call
        @param data_s: serialised extra data (unused here)
        @param profile: profile which received the signal
        """
        if sid != self.webrtc.sid or profile != self.profile:
            return
        await self.webrtc.end_call()
        await self.host.a_quit()

    async def start(self):
        """Start a call.

        To be used only if we are initiator
        """
        await self.webrtc.setup_call("initiator")
        self.webrtc.start_pipeline()


class UI:
    """Keyboard driven control bar displayed while a call is running."""

    def __init__(self, host, webrtc):
        """
        @param host: CLI host, used for its console, display and quit helpers
        @param webrtc: WebRTC instance whose mute flags and callee are displayed
        """
        self.host = host
        self.webrtc = webrtc

    def styled_shortcut_key(self, word: str, key: str | None = None) -> Text:
        """Return a word with the specified key or the first letter underlined.

        @param word: label to display
        @param key: character to highlight with the "shortcut" style; the first
            letter of ``word`` is used when None
        @return: the label, with the first occurrence of ``key`` styled.
            If ``key`` is not found with the same case, a case-insensitive match is
            tried. If there is still no match, or if ``word`` is empty, the label is
            returned without any highlight.
        """
        if not word:
            return Text(word)
        if key is None:
            key = word[0]
        index = word.find(key)
        if index == -1:
            # ``str.find`` returns -1 when there is no match, and slicing with it
            # would duplicate the label. Pressed keys are lower-cased before being
            # compared, so a case-insensitive match is a sensible fallback.
            folded = key.lower()
            index = next(
                (i for i, char in enumerate(word) if char.lower() == folded), -1
            )
        if index == -1:
            return Text(word)
        before, keyword, after = word[:index], word[index], word[index + 1 :]
        return Text(before) + Text(keyword, style="shortcut") + Text(after)

    # NOTE(review): the icon glyphs used in the panels below were reconstructed from
    # mis-decoded text; please confirm that they match the intended ones.

    def get_micro_display(self):
        """Return the panel showing the microphone state."""
        if self.webrtc.audio_muted:
            return Panel(Text("🔇 ") + self.styled_shortcut_key("Muted"), expand=False)
        else:
            return Panel(
                Text("🎤 ") + self.styled_shortcut_key("Unmuted", "m"), expand=False
            )

    def get_video_display(self):
        """Return the panel showing the video state."""
        if self.webrtc.video_muted:
            return Panel(Text("❌ ") + self.styled_shortcut_key("Video Off"), expand=False)
        else:
            return Panel(Text("🎥 ") + self.styled_shortcut_key("Video On"), expand=False)

    def get_phone_display(self):
        """Return the panel showing the hang up shortcut."""
        return Panel(Text("📞 ") + self.styled_shortcut_key("Hang up"), expand=False)

    @group()
    def generate_control_bar(self):
        """Return the full interface display."""
        yield Columns(
            [
                self.get_micro_display(),
                self.get_video_display(),
                self.get_phone_display(),
            ],
            expand=False,
            title="Calling [bold center]{}[/]".format(self.webrtc.callee),
        )

    async def start(self):
        """Display the control bar and handle key presses until hang up.

        Keys: ``m`` toggles audio mute, ``v`` toggles video mute, ``h`` or
        ``Ctrl + c`` hangs up, ``d`` generates a dot file of the pipeline.
        Once hang up is requested, the call is ended and the CLI quits.
        """
        done = asyncio.Event()
        # not named ``input``, to avoid shadowing the builtin
        key_input = create_input()

        def keys_ready(live):
            for key_press in key_input.read_keys():
                char = key_press.key.lower()
                if char == "m":
                    # audio mute
                    self.webrtc.audio_muted = not self.webrtc.audio_muted
                    live.update(self.generate_control_bar(), refresh=True)
                elif char == "v":
                    # video mute
                    self.webrtc.video_muted = not self.webrtc.video_muted
                    live.update(self.generate_control_bar(), refresh=True)
                elif char == "h" or key_press.key == Keys.ControlC:
                    # Hang up
                    done.set()
                elif char == "d":
                    # generate dot file for debugging. Only available if
                    # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot"
                    # with a timestamp.
                    if os.getenv("GST_DEBUG_DUMP_DOT_DIR"):
                        self.webrtc.generate_dot_file()
                        self.host.disp("Dot file generated.")

        with Live(
            self.generate_control_bar(), console=self.host.console, auto_refresh=False
        ) as live:
            with key_input.raw_mode():
                with key_input.attach(partial(keys_ready, live)):
                    await done.wait()

        await self.webrtc.end_call()
        await self.host.a_quit()


class Common(base.CommandBase):
    """Options and behaviours shared by the call subcommands."""

    def add_parser_options(self):
        """Add the ``--no-ui`` option."""
        self.parser.add_argument(
            "--no-ui", action="store_true", help=_("disable user interface")
        )

    async def start(self):
        """Configure the root logger according to the requested verbosity."""
        root_logger = logging.getLogger()
        # we don't want any formatting for messages from webrtc
        for handler in root_logger.handlers:
            handler.setFormatter(None)
        # the highest matching verbosity wins
        if self.verbosity >= 2:
            root_logger.setLevel(logging.DEBUG)
        elif self.verbosity >= 1:
            root_logger.setLevel(logging.WARNING)
        elif self.verbosity == 0:
            root_logger.setLevel(logging.ERROR)

    async def start_ui(self, webrtc_call):
        """Run the control bar for the given call, unless ``--no-ui`` is used.

        @param webrtc_call: call wrapper whose ``webrtc`` attribute is controlled by
            the UI
        """
        if not self.args.no_ui:
            ui = UI(self.host, webrtc_call.webrtc)
            await ui.start()


class Make(Common):
    """``call make`` subcommand."""

    def __init__(self, host):
        super().__init__(
            host,
            "make",
            use_verbose=True,
            help=_("start a call"),
        )

    def add_parser_options(self):
        """Add the common options and the JID of the entity to call."""
        super().add_parser_options()
        self.parser.add_argument(
            "entity",
            metavar="JID",
            help=_("JIDs of entity to call"),
        )

    async def start(self):
        """Call the requested entity as initiator, then start the UI."""
        await super().start()
        callee = jid.JID(self.args.entity)
        webrtc_call = WebRTCCall(self.host, self.profile, callee)
        await webrtc_call.start()
        await super().start_ui(webrtc_call)


class Receive(Common):
    """``call receive`` subcommand."""

    def __init__(self, host):
        super().__init__(
            host,
            "receive",
            use_verbose=True,
            help=_("wait for a call"),
        )

    def add_parser_options(self):
        """Add the common options and the mutually exclusive auto-accept options."""
        super().add_parser_options()
        auto_accept_group = self.parser.add_mutually_exclusive_group()
        auto_accept_group.add_argument(
            "-a",
            "--auto-accept",
            action="append",
            metavar="JID",
            default=[],
            help=_(
                "automatically accept call from this jid (can be used multiple times)"
            ),
        )
        auto_accept_group.add_argument(
            "--auto-accept-all",
            action="store_true",
            help=_("automatically accept call from anybody"),
        )

    async def on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Accept or refuse an incoming call action.

        Actions for another profile, or whose type is not ``C.META_TYPE_CALL``, are
        ignored. The call is accepted without confirmation if ``--auto-accept-all``
        is set or if the caller is listed with ``--auto-accept``; otherwise the user
        is asked to confirm.

        @param action_data_s: serialised action data; ``from_jid`` and ``session_id``
            keys are read
        @param action_id: ID used to answer the action
        @param security_limit: unused here
        @param profile: profile which received the action
        """
        if profile != self.profile:
            return
        action_data = data_format.deserialise(action_data_s)
        if action_data.get("type") != C.META_TYPE_CALL:
            return
        peer_jid = jid.JID(action_data["from_jid"]).bare
        caller = peer_jid.bare
        # NOTE(review): the icon glyphs of the two messages below were reconstructed
        # from mis-decoded text; please confirm that they match the intended ones.
        if (
            not self.args.auto_accept_all
            and caller not in self.args.auto_accept
            and not await self.host.confirm(
                _("📞 Incoming call from {caller}, do you accept?").format(
                    caller=caller
                )
            )
        ):
            await self.host.bridge.action_launch(
                action_id, data_format.serialise({"cancelled": True}), profile
            )
            return

        self.disp(_("✅ Incoming call from {caller} accepted.").format(caller=caller))

        webrtc_call = WebRTCCall(self.host, self.profile, peer_jid)
        webrtc_call.sid = action_data["session_id"]
        await self.host.bridge.action_launch(
            action_id, data_format.serialise({"cancelled": False}), profile
        )
        await super().start_ui(webrtc_call)

    async def start(self):
        """Configure logging, then listen for incoming actions."""
        await super().start()
        self.host.bridge.register_signal("action_new", self.on_action_new, "core")


class Call(base.CommandBase):
    """``call`` command, parent of the ``make`` and ``receive`` subcommands."""

    subcommands = (Make, Receive)

    def __init__(self, host):
        super().__init__(host, "call", use_profile=False, help=_("A/V calls and related"))