view libervia/cli/call_tui.py @ 4306:94e0968987cd

plugin XEP-0033: code modernisation, improve delivery, data validation: - Code has been rewritten using Pydantic models and `async` coroutines for data validation and cleaner element parsing/generation. - Delivery has been completely rewritten. It now works even if server doesn't support multicast, and send to local multicast service first. Delivering to local multicast service first is due to bad support of XEP-0033 in server (notably Prosody which has an incomplete implementation), and the current impossibility to detect if a sub-domain service handles fully multicast or only for local domains. This is a workaround to have a good balance between backward compatilibity and use of bandwith, and to make it work with the incoming email gateway implementation (the gateway will only deliver to entities of its own domain). - disco feature checking now uses `async` corountines. `host` implementation still use Deferred return values for compatibility with legacy code. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import asyncio
from functools import partial
import sys

from PIL import Image
import gi
from rich.padding import Padding
from term_image import image as t_image

from libervia.cli.constants import Const as C
from libervia.frontends.tools import aio, webrtc
from libervia.frontends.tools.webrtc import CallData, WebRTCCall

from .call_simple import BaseAVTUI

gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})

from gi.repository import Gst


aio.install_glib_asyncio_iteration()


class AVCallUI(BaseAVTUI):
    def __init__(self, parent):
        super().__init__(parent.host, align="center")
        self.parent = parent
        self.image_rows = 0
        self.buffer = None
        self._processing = False
        self.render_class: t_image.ImageMeta | None = None
        self.target_size
        self.fps = 25
        for oo in parent.args.output_opts:
            if oo.startswith("renderer="):
                renderer = oo[9:].lower().strip()
                match renderer:
                    case "auto":
                        # we let None as it will auto-detect best option later
                        pass
                    case "block":
                        self.render_class = t_image.BlockImage
                    case "iterm2":
                        self.render_class = t_image.ITerm2Image
                    case "kitty":
                        self.render_class = t_image.KittyImage
                    case _:
                        parent.parser.error(f"Invalid renderer: {renderer!r}")
            elif oo.startswith("fps="):
                try:
                    self.fps = int(oo[4:])
                except ValueError:
                    parent.parser.error(f"Invalid FPS: {oo[4:]!r}")

    async def init_call(self, call_data):
        kwargs = self.parse_output_opts(self.parent)
        if "target_size" not in kwargs:
            # we use low res by default for performance reason
            kwargs["target_size"] = (640, 380)
        webrtc_call = await WebRTCCall.make_webrtc_call(
            self.parent.host.bridge,
            self.parent.profile,
            call_data,
            sinks_data=webrtc.SinksApp(
                local_video_cb=partial(self.on_new_sample, video_stream="local"),
                remote_video_cb=None,
            ),
            merge_pip=True,
            # we want to be sure that call is ended if user presses `Ctrl + c` or anything
            # else stops the session.
            on_call_setup_cb=lambda sid, profile: self.parent.host.add_on_quit_callback(
                self.parent.host.bridge.call_end, sid, "", profile
            ),
            on_call_ended_cb=lambda sid, profile: self.parent.host.a_quit(),
            **kwargs,
        )
        self.webrtc = webrtc_call.webrtc

    async def start(self, call_data):
        term_rows = self.host.console.size.height
        if term_rows < 20:
            self.host.disp(
                "Your terminal must have a height of a least 20 rows.", error=True
            )
            self.host.a_quit(C.EXIT_ERROR)
        self.image_rows = term_rows - 6
        self.image_cols = self.host.console.size.width
        await self.init_call(call_data)
        assert self.webrtc is not None

        idx = 0
        self.buffer = ""

        # we detect render
        if self.render_class is None:
            self.render_class = t_image.auto_image_class()

        loop_sleep = 1 / self.fps

        with self.input.raw_mode():
            # for whatever reason, using self.input.attach is breaking KittyImage and uses
            # a BlockImage style rendering instead. So we don't use it and we call
            # ``self.keys_ready()`` ourself in the loop, below.
            # cursor is not restored despite the ``screen`` context if peer is hanging up,
            # so we reactivate cursor here
            self.host.add_on_quit_callback(self.host.console.show_cursor, True)
            with self.host.console.screen():
                while True:
                    idx += 1
                    if self.buffer is not None:
                        sys.stdout.write(self.buffer)
                        sys.stdout.write("\n")
                        self.parent.console.print(
                            # the padding avoid artifact when toggling buttons
                            Padding(self.generate_control_bar(), (0, 2, 0, 0))
                        )
                        sys.stdout.flush()
                        rendered = True
                    else:
                        rendered = False
                    await asyncio.sleep(loop_sleep)
                    self.keys_ready()
                    if self.done.is_set():
                        break
                    if rendered:
                        # we put cursor back at the top of image to print the next frame
                        # FIXME: we use +4 for the controls because we know the height of the
                        #   renderable, but it would be better to measure it dynamically.
                        sys.stdout.write(f"\033[{self.image_rows+4}A")

        await self.webrtc.end_call()
        await self.host.a_quit()

    @classmethod
    async def run(cls, parent, call_data: CallData) -> None:
        ui = cls(parent)
        await ui.start(call_data)

    def on_new_sample(self, video_sink, video_stream: str) -> bool:
        if self._processing:
            # image encoding for terminal is slow, if one is already processing, we don't
            # bother going further
            return False
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        video_pad = video_sink.get_static_pad("sink")
        assert video_pad is not None
        s = video_pad.get_current_caps().get_structure(0)
        stream_size = (s.get_value("width"), s.get_value("height"))
        buf = sample.get_buffer()
        result, mapinfo = buf.map(Gst.MapFlags.READ)
        if result and self.render_class is not None:
            self._processing = True
            image_data = mapinfo.data
            image = Image.frombuffer("RGB", stream_size, image_data, "raw", "RGB", 0, 1)
            img_renderer = self.render_class(image, height=self.image_rows)
            img_fmt = f"<{self.image_cols}.^1"
            if self.render_class == t_image.KittyImage:
                # we don't do compression to speed up things
                img_fmt += "+Wc0"
            self.host.loop.loop.call_soon_threadsafe(
                self.update_sample,
                sample,
                stream_size,
                video_stream,
                format(img_renderer, img_fmt),
            )
            self._processing = False

        buf.unmap(mapinfo)

        return False

    def update_sample(self, sample, stream_size, video_stream: str, buffer) -> None:
        if sample is None:
            return

        if video_stream == "remote":
            return

        self.buffer = buffer