view libervia/cli/cmd_call.py @ 4202:b26339343076

core: use a user specific directory for PID file: default location of pid file is now specific to logged user, this allow to run several instances of Libervia by different users on the same machine without PID conflicts.
author Goffi <goffi@goffi.org>
date Sun, 14 Jan 2024 17:48:02 +0100
parents 849721e1563b
children 0f8ea0768a3b
line wrap: on
line source

#!/usr/bin/env python3


# Libervia CLI
# Copyright (C) 2009-2021 JΓ©rΓ΄me Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from argparse import ArgumentParser
import asyncio
from functools import partial
import logging
import os

from prompt_toolkit.input import create_input
from prompt_toolkit.keys import Keys

from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.cli.constants import Const as C
from libervia.frontends.tools import aio, jid
from rich.columns import Columns
from rich.console import group
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from . import base

__commands__ = ["Call"]


class WebRTCCall:
    def __init__(self, host, profile: str, callee: jid.JID):
        from libervia.frontends.tools import webrtc

        aio.install_glib_asyncio_iteration()
        self.host = host
        self.profile = profile
        self.webrtc = webrtc.WebRTC(host.bridge, profile)
        self.webrtc.callee = callee
        host.bridge.register_signal(
            "ice_candidates_new", self.on_ice_candidates_new, "plugin"
        )
        host.bridge.register_signal("call_setup", self.on_call_setup, "plugin")
        host.bridge.register_signal("call_ended", self.on_call_ended, "plugin")

    @property
    def sid(self) -> str | None:
        return self.webrtc.sid

    @sid.setter
    def sid(self, new_sid: str | None) -> None:
        self.webrtc.sid = new_sid

    async def on_ice_candidates_new(
        self, sid: str, candidates_s: str, profile: str
    ) -> None:
        if sid != self.webrtc.sid or profile != self.profile:
            return
        self.webrtc.on_ice_candidates_new(
            data_format.deserialise(candidates_s),
        )

    async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
        if sid != self.webrtc.sid or profile != self.profile:
            return
        setup_data = data_format.deserialise(setup_data_s)
        try:
            role = setup_data["role"]
            sdp = setup_data["sdp"]
        except KeyError:
            self.host.disp(f"Invalid setup data received: {setup_data}", error=True)
            return
        if role == "initiator":
            self.webrtc.on_accepted_call(sdp, profile)
        elif role == "responder":
            await self.webrtc.answer_call(sdp, profile)
        else:
            self.host.disp(
                f"Invalid role received during setup: {setup_data}", error=True
            )
        # we want to be sure that call is ended if user presses `Ctrl + c` or anything
        # else stops the session.
        self.host.add_on_quit_callback(
            lambda: self.host.bridge.call_end(sid, "", profile)
        )

    async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
        if sid != self.webrtc.sid or profile != self.profile:
            return
        await self.webrtc.end_call()
        await self.host.a_quit()

    async def start(self):
        """Start a call.

        To be used only if we are initiator
        """
        await self.webrtc.setup_call("initiator")
        self.webrtc.start_pipeline()


class UI:
    def __init__(self, host, webrtc):
        self.host = host
        self.webrtc = webrtc

    def styled_shortcut_key(self, word: str, key: str | None = None) -> Text:
        """Return a word with the specified key or the first letter underlined."""
        if key is None:
            key = word[0]
        index = word.find(key)
        before, keyword, after = word[:index], word[index], word[index + 1 :]
        return Text(before) + Text(keyword, style="shortcut") + Text(after)

    def get_micro_display(self):
        if self.webrtc.audio_muted:
            return Panel(Text("πŸ”‡ ") + self.styled_shortcut_key("Muted"), expand=False)
        else:
            return Panel(
                Text("🎀 ") + self.styled_shortcut_key("Unmuted", "m"), expand=False
            )

    def get_video_display(self):
        if self.webrtc.video_muted:
            return Panel(Text("❌ ") + self.styled_shortcut_key("Video Off"), expand=False)
        else:
            return Panel(Text("πŸŽ₯ ") + self.styled_shortcut_key("Video On"), expand=False)

    def get_phone_display(self):
        return Panel(Text("πŸ“ž ") + self.styled_shortcut_key("Hang up"), expand=False)

    @group()
    def generate_control_bar(self):
        """Return the full interface display."""
        yield Columns(
            [
                self.get_micro_display(),
                self.get_video_display(),
                self.get_phone_display(),
            ],
            expand=False,
            title="Calling [bold center]{}[/]".format(self.webrtc.callee),
        )

    async def start(self):
        done = asyncio.Event()
        input = create_input()

        def keys_ready(live):
            for key_press in input.read_keys():
                char = key_press.key.lower()
                if char == "m":
                    # audio mute
                    self.webrtc.audio_muted = not self.webrtc.audio_muted
                    live.update(self.generate_control_bar(), refresh=True)
                elif char == "v":
                    # video mute
                    self.webrtc.video_muted = not self.webrtc.video_muted
                    live.update(self.generate_control_bar(), refresh=True)
                elif char == "h" or key_press.key == Keys.ControlC:
                    # Hang up
                    done.set()
                elif char == "d":
                    # generate dot file for debugging. Only available if
                    # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot" with a
                    # timestamp.
                    if os.getenv("GST_DEBUG_DUMP_DOT_DIR"):
                        self.webrtc.generate_dot_file()
                        self.host.disp("Dot file generated.")

        with Live(
            self.generate_control_bar(),
            console=self.host.console,
            auto_refresh=False
        ) as live:
            with input.raw_mode():
                with input.attach(partial(keys_ready, live)):
                    await done.wait()

        await self.webrtc.end_call()
        await self.host.a_quit()


class Common(base.CommandBase):

    def add_parser_options(self):
        self.parser.add_argument(
            "--no-ui", action="store_true", help=_("disable user interface")
        )

    async def start(self):
        root_logger = logging.getLogger()
        # we don't want any formatting for messages from webrtc
        for handler in root_logger.handlers:
            handler.setFormatter(None)
        if self.verbosity == 0:
            root_logger.setLevel(logging.ERROR)
        if self.verbosity >= 1:
            root_logger.setLevel(logging.WARNING)
        if self.verbosity >= 2:
            root_logger.setLevel(logging.DEBUG)

    async def start_ui(self, webrtc_call):
        if not self.args.no_ui:
            ui = UI(self.host, webrtc_call.webrtc)
            await ui.start()


class Make(Common):
    def __init__(self, host):
        super().__init__(
            host,
            "make",
            use_verbose=True,
            help=_("start a call"),
        )

    def add_parser_options(self):
        super().add_parser_options()
        self.parser.add_argument(
            "entity",
            metavar="JID",
            help=_("JIDs of entity to call"),
        )

    async def start(self):
        await super().start()
        callee = jid.JID(self.args.entity)
        webrtc_call = WebRTCCall(self.host, self.profile, callee)
        await webrtc_call.start()
        await super().start_ui(webrtc_call)


class Receive(Common):
    def __init__(self, host):
        super().__init__(
            host,
            "receive",
            use_verbose=True,
            help=_("wait for a call"),
        )

    def add_parser_options(self):
        super().add_parser_options()
        auto_accept_group = self.parser.add_mutually_exclusive_group()
        auto_accept_group.add_argument(
            "-a",
            "--auto-accept",
            action="append",
            metavar="JID",
            default=[],
            help=_("automatically accept call from this jid (can be used multiple times)")
        )
        auto_accept_group.add_argument(
            "--auto-accept-all",
            action="store_true",
            help=_("automatically accept call from anybody")
        )

    async def on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        if profile != self.profile:
            return
        action_data = data_format.deserialise(action_data_s)
        if action_data.get("type") != C.META_TYPE_CALL:
            return
        peer_jid = jid.JID(action_data["from_jid"]).bare
        caller = peer_jid.bare
        if (
            not self.args.auto_accept_all
            and caller not in self.args.auto_accept
            and not await self.host.confirm(
                _("πŸ“ž Incoming call from {caller}, do you accept?").format(
                    caller=caller
                )
            )
        ):
            await self.host.bridge.action_launch(
                action_id, data_format.serialise({"cancelled": True}), profile
            )
            return

        self.disp(_("βœ… Incoming call from {caller} accepted.").format(caller=caller))

        webrtc_call = WebRTCCall(self.host, self.profile, peer_jid)
        webrtc_call.sid = action_data["session_id"]
        await self.host.bridge.action_launch(
            action_id, data_format.serialise({"cancelled": False}), profile
        )
        await super().start_ui(webrtc_call)

    async def start(self):
        await super().start()
        self.host.bridge.register_signal("action_new", self.on_action_new, "core")


class Call(base.CommandBase):
    subcommands = (Make, Receive)

    def __init__(self, host):
        super().__init__(host, "call", use_profile=False, help=_("A/V calls and related"))