view libervia/cli/call_simple.py @ 4338:7c0b7ecb816f

component email gateway: Add a pubsub service: a pubsub service is implemented to retrieve and manage attachments using XEP-0498. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 JΓ©rΓ΄me Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
import os

from prompt_toolkit.input import create_input
from prompt_toolkit.keys import Keys
from rich.align import Align
from rich.columns import Columns
from rich.console import group
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from libervia.frontends.tools import aio
from libervia.frontends.tools.webrtc import CallData, WebRTCCall


aio.install_glib_asyncio_iteration()


class BaseAVTUI:
    def __init__(self, host, webrtc=None, align: str = "left"):
        self.host = host
        self.webrtc = webrtc
        self.align = align
        self.input = create_input()
        self.done = asyncio.Event()
        self.target_size: tuple[int, int] | None = None

    @staticmethod
    def parse_output_opts(parent) -> dict:
        """Parse output options.

        This method should be called in a loop checking all output options.
        It will set the relevant attributes.
        @return: keyword argument to use to instanciate WebRTCCall
        """
        kwargs = {}
        for oo in parent.args.output_opts:
            if oo.startswith("size="):
                try:
                    width_s, height_s = oo[5:].lower().strip().split("x", 1)
                    width, height = int(width_s), int(height_s)
                except ValueError:
                    parent.parser.error(
                        "Invalid size; it must be in the form widthxheight "
                        "(e.g., 640x360)."
                    )
                else:
                    kwargs["target_size"] = (width, height)
        return kwargs

    def styled_shortcut_key(self, word: str, key: str | None = None) -> Text:
        """Return a word with the specified key or the first letter underlined."""
        if key is None:
            key = word[0]
        index = word.find(key)
        before, keyword, after = word[:index], word[index], word[index + 1 :]
        return Text(before) + Text(keyword, style="shortcut") + Text(after)

    def get_micro_display(self):
        assert self.webrtc is not None
        if self.webrtc.audio_muted:
            return Panel(Text("πŸ”‡ ") + self.styled_shortcut_key("Muted"), expand=False)
        else:
            return Panel(
                Text("🎀 ") + self.styled_shortcut_key("Unmuted", "m"), expand=False
            )

    def get_video_display(self):
        assert self.webrtc is not None
        if self.webrtc.video_muted:
            return Panel(
                Text("❌ ") + self.styled_shortcut_key("Video Off"), expand=False
            )
        else:
            return Panel(Text("πŸŽ₯ ") + self.styled_shortcut_key("Video On"), expand=False)

    def get_phone_display(self):
        return Panel(Text("πŸ“ž ") + self.styled_shortcut_key("Hang up"), expand=False)

    @group()
    def generate_control_bar(self):
        """Return the full interface display."""
        assert self.webrtc is not None
        yield Align(
            Columns(
                [
                    Text(),
                    self.get_micro_display(),
                    self.get_video_display(),
                    self.get_phone_display(),
                ],
                expand=False,
                title="Calling [bold center]{}[/]".format(self.webrtc.callee),
            ),
            align=self.align,
        )

    def keys_ready(self, live=None):
        assert self.webrtc is not None
        for key_press in self.input.read_keys():
            char = key_press.key.lower()
            if char == "m":
                # audio mute
                self.webrtc.audio_muted = not self.webrtc.audio_muted
                if live is not None:
                    live.update(self.generate_control_bar(), refresh=True)
            elif char == "v":
                # video mute
                self.webrtc.video_muted = not self.webrtc.video_muted
                if live is not None:
                    live.update(self.generate_control_bar(), refresh=True)
            elif char == "h" or key_press.key == Keys.ControlC:
                # Hang up
                self.done.set()
            elif char == "d":
                # generate dot file for debugging. Only available if
                # ``GST_DEBUG_DUMP_DOT_DIR`` is set. Filename is "pipeline.dot" with a
                # timestamp.
                if os.getenv("GST_DEBUG_DUMP_DOT_DIR"):
                    self.webrtc.generate_dot_file()
                    self.host.disp("Dot file generated.")


class AVCallUI(BaseAVTUI):
    def __init__(self, host, webrtc):
        super().__init__(host, webrtc)

    async def start(self):
        assert self.webrtc is not None
        with Live(
            self.generate_control_bar(), console=self.host.console, auto_refresh=False
        ) as live:
            with self.input.raw_mode():
                with self.input.attach(partial(self.keys_ready, live)):
                    await self.done.wait()

        await self.webrtc.end_call()
        await self.host.a_quit()

    @classmethod
    async def run(cls, parent, call_data: CallData) -> None:
        kwargs = cls.parse_output_opts(parent)
        merge_pip = False if "split" in parent.args.output_opts else None

        webrtc_call = await WebRTCCall.make_webrtc_call(
            parent.host.bridge,
            parent.profile,
            call_data,
            merge_pip=merge_pip,
            # we want to be sure that call is ended if user presses `Ctrl + c` or anything
            # else stops the session.
            on_call_setup_cb=lambda sid, profile: parent.host.add_on_quit_callback(
                parent.host.bridge.call_end, sid, "", profile
            ),
            on_call_ended_cb=lambda sid, profile: parent.host.a_quit(),
            **kwargs,
        )
        if not parent.args.no_ui:
            ui = cls(parent.host, webrtc_call.webrtc)
            await ui.start()