view libervia/cli/call_simple.py @ 4219:1b5cf2ee1d86

plugin XEP-0384, XEP-0391: download missing devices list: when a peer jid was not in our roster, devices list was not retrieved, resulting in failed en/decryption. This patch does check it and download missing devices list in necessary. There is no subscription managed yet, so the list won't be updated in case of new devices, this should be addressed at some point.
author Goffi <goffi@goffi.org>
date Tue, 05 Mar 2024 17:31:36 +0100
parents 9218d4331bb2
children d01b8d002619
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 JΓ©rΓ΄me Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
import os

from prompt_toolkit.input import create_input
from prompt_toolkit.keys import Keys
from rich.align import Align
from rich.columns import Columns
from rich.console import group
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from .call_webrtc import CallData, WebRTCCall


class BaseAVTUI:
    def __init__(self, host, webrtc=None, align: str = "left"):
        self.host = host
        self.webrtc = webrtc
        self.align = align
        self.input = create_input()
        self.done = asyncio.Event()
        self.target_size: tuple[int, int] | None = None

    @staticmethod
    def parse_output_opts(parent) -> dict:
        """Parse output options.

        This method should be called in a loop checking all output options.
        It will set the relevant attributes.
        @return: keyword argument to use to instanciate WebRTCCall
        """
        kwargs = {}
        for oo in parent.args.output_opts:
            if oo.startswith("size="):
                try:
                    width_s, height_s = oo[5:].lower().strip().split("x", 1)
                    width, height = int(width_s), int(height_s)
                except ValueError:
                    parent.parser.error(
                        "Invalid size; it must be in the form widthxheight "
                        "(e.g., 640x360)."
                    )
                else:
                    kwargs["target_size"] = (width, height)
        return kwargs

    def styled_shortcut_key(self, word: str, key: str | None = None) -> Text:
        """Return a word with the specified key or the first letter underlined."""
        if key is None:
            key = word[0]
        index = word.find(key)
        before, keyword, after = word[:index], word[index], word[index + 1 :]
        return Text(before) + Text(keyword, style="shortcut") + Text(after)

    def get_micro_display(self):
        assert self.webrtc is not None
        if self.webrtc.audio_muted:
            return Panel(Text("πŸ”‡ ") + self.styled_shortcut_key("Muted"), expand=False)
        else:
            return Panel(
                Text("🎀 ") + self.styled_shortcut_key("Unmuted", "m"), expand=False
            )

    def get_video_display(self):
        assert self.webrtc is not None
        if self.webrtc.video_muted:
            return Panel(Text("❌ ") + self.styled_shortcut_key("Video Off"), expand=False)
        else:
            return Panel(Text("πŸŽ₯ ") + self.styled_shortcut_key("Video On"), expand=False)

    def get_phone_display(self):
        return Panel(Text("πŸ“ž ") + self.styled_shortcut_key("Hang up"), expand=False)

    @group()
    def generate_control_bar(self):
        """Return the full interface display."""
        assert self.webrtc is not None
        yield Align(
            Columns(
                [
                    Text(),
                    self.get_micro_display(),
                    self.get_video_display(),
                    self.get_phone_display(),
                ],
                expand=False,
                title="Calling [bold center]{}[/]".format(self.webrtc.callee),
            ),
            align=self.align,
        )

    def keys_ready(self, live=None):
        assert self.webrtc is not None
        for key_press in self.input.read_keys():
            char = key_press.key.lower()
            if char == "m":
                # audio mute
                self.webrtc.audio_muted = not self.webrtc.audio_muted
                if live is not None:
                    live.update(self.generate_control_bar(), refresh=True)
            elif char == "v":
                # video mute
                self.webrtc.video_muted = not self.webrtc.video_muted
                if live is not None:
                    live.update(self.generate_control_bar(), refresh=True)
            elif char == "h" or key_press.key == Keys.ControlC:
                # Hang up
                self.done.set()
            elif char == "d":
                # generate dot file for debugging. Only available if
                # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot" with a
                # timestamp.
                if os.getenv("GST_DEBUG_DUMP_DOT_DIR"):
                    self.webrtc.generate_dot_file()
                    self.host.disp("Dot file generated.")


class AVCallUI(BaseAVTUI):
    def __init__(self, host, webrtc):
        super().__init__(host, webrtc)

    async def start(self):
        assert self.webrtc is not None
        with Live(
            self.generate_control_bar(), console=self.host.console, auto_refresh=False
        ) as live:
            with self.input.raw_mode():
                with self.input.attach(partial(self.keys_ready, live)):
                    await self.done.wait()

        await self.webrtc.end_call()
        await self.host.a_quit()

    @classmethod
    async def run(cls, parent, call_data: CallData) -> None:
        kwargs = cls.parse_output_opts(parent)
        merge_pip = False if "split" in parent.args.output_opts else None

        webrtc_call = await WebRTCCall.make_webrtc_call(
            parent.host,
            parent.profile,
            call_data,
            merge_pip=merge_pip,
            **kwargs,
        )
        if not parent.args.no_ui:
            ui = cls(parent.host, webrtc_call.webrtc)
            await ui.start()