view libervia/cli/call_simple.py @ 4230:314d3c02bb67

core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:21:04 +0200
parents 9218d4331bb2
children d01b8d002619
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 JΓ©rΓ΄me Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
import os

from prompt_toolkit.input import create_input
from prompt_toolkit.keys import Keys
from rich.align import Align
from rich.columns import Columns
from rich.console import group
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from .call_webrtc import CallData, WebRTCCall


class BaseAVTUI:
    def __init__(self, host, webrtc=None, align: str = "left"):
        self.host = host
        self.webrtc = webrtc
        self.align = align
        self.input = create_input()
        self.done = asyncio.Event()
        self.target_size: tuple[int, int] | None = None

    @staticmethod
    def parse_output_opts(parent) -> dict:
        """Parse output options.

        This method should be called in a loop checking all output options.
        It will set the relevant attributes.
        @return: keyword argument to use to instanciate WebRTCCall
        """
        kwargs = {}
        for oo in parent.args.output_opts:
            if oo.startswith("size="):
                try:
                    width_s, height_s = oo[5:].lower().strip().split("x", 1)
                    width, height = int(width_s), int(height_s)
                except ValueError:
                    parent.parser.error(
                        "Invalid size; it must be in the form widthxheight "
                        "(e.g., 640x360)."
                    )
                else:
                    kwargs["target_size"] = (width, height)
        return kwargs

    def styled_shortcut_key(self, word: str, key: str | None = None) -> Text:
        """Return a word with the specified key or the first letter underlined."""
        if key is None:
            key = word[0]
        index = word.find(key)
        before, keyword, after = word[:index], word[index], word[index + 1 :]
        return Text(before) + Text(keyword, style="shortcut") + Text(after)

    def get_micro_display(self):
        assert self.webrtc is not None
        if self.webrtc.audio_muted:
            return Panel(Text("πŸ”‡ ") + self.styled_shortcut_key("Muted"), expand=False)
        else:
            return Panel(
                Text("🎀 ") + self.styled_shortcut_key("Unmuted", "m"), expand=False
            )

    def get_video_display(self):
        assert self.webrtc is not None
        if self.webrtc.video_muted:
            return Panel(Text("❌ ") + self.styled_shortcut_key("Video Off"), expand=False)
        else:
            return Panel(Text("πŸŽ₯ ") + self.styled_shortcut_key("Video On"), expand=False)

    def get_phone_display(self):
        return Panel(Text("πŸ“ž ") + self.styled_shortcut_key("Hang up"), expand=False)

    @group()
    def generate_control_bar(self):
        """Return the full interface display."""
        assert self.webrtc is not None
        yield Align(
            Columns(
                [
                    Text(),
                    self.get_micro_display(),
                    self.get_video_display(),
                    self.get_phone_display(),
                ],
                expand=False,
                title="Calling [bold center]{}[/]".format(self.webrtc.callee),
            ),
            align=self.align,
        )

    def keys_ready(self, live=None):
        assert self.webrtc is not None
        for key_press in self.input.read_keys():
            char = key_press.key.lower()
            if char == "m":
                # audio mute
                self.webrtc.audio_muted = not self.webrtc.audio_muted
                if live is not None:
                    live.update(self.generate_control_bar(), refresh=True)
            elif char == "v":
                # video mute
                self.webrtc.video_muted = not self.webrtc.video_muted
                if live is not None:
                    live.update(self.generate_control_bar(), refresh=True)
            elif char == "h" or key_press.key == Keys.ControlC:
                # Hang up
                self.done.set()
            elif char == "d":
                # generate dot file for debugging. Only available if
                # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot" with a
                # timestamp.
                if os.getenv("GST_DEBUG_DUMP_DOT_DIR"):
                    self.webrtc.generate_dot_file()
                    self.host.disp("Dot file generated.")


class AVCallUI(BaseAVTUI):
    def __init__(self, host, webrtc):
        super().__init__(host, webrtc)

    async def start(self):
        assert self.webrtc is not None
        with Live(
            self.generate_control_bar(), console=self.host.console, auto_refresh=False
        ) as live:
            with self.input.raw_mode():
                with self.input.attach(partial(self.keys_ready, live)):
                    await self.done.wait()

        await self.webrtc.end_call()
        await self.host.a_quit()

    @classmethod
    async def run(cls, parent, call_data: CallData) -> None:
        kwargs = cls.parse_output_opts(parent)
        merge_pip = False if "split" in parent.args.output_opts else None

        webrtc_call = await WebRTCCall.make_webrtc_call(
            parent.host,
            parent.profile,
            call_data,
            merge_pip=merge_pip,
            **kwargs,
        )
        if not parent.args.no_ui:
            ui = cls(parent.host, webrtc_call.webrtc)
            await ui.start()