diff libervia/cli/call_tui.py @ 4210:9218d4331bb2

cli (call): `tui` output implementation: - Moved original UI to a separated class, and use if with the `simple` output - By default, best output is automatically selected. For now `gui` is selected if possible, and `simple` is used as fallback. - The new `tui` output can be used to have the videos directly embedded in the terminal, either with real videos for compatible terminal emulators, or with Unicode blocks. - Text contrôls are used for both `simple` and `tui` outputs - several options can be used with `--oo` (will be documented in next commit). rel 428
author Goffi <goffi@goffi.org>
date Fri, 16 Feb 2024 18:46:06 +0100
parents
children d01b8d002619
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/cli/call_tui.py	Fri Feb 16 18:46:06 2024 +0100
@@ -0,0 +1,194 @@
+#!/usr/bin/env python3
+
+# Libervia CLI
+# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+import asyncio
+from functools import partial
+import sys
+
+from PIL import Image
+import gi
+from rich.padding import Padding
+from term_image import image as t_image
+
+from libervia.cli.constants import Const as C
+from libervia.frontends.tools import webrtc
+
+from .call_simple import BaseAVTUI
+from .call_webrtc import CallData, WebRTCCall
+
+gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
+
+from gi.repository import Gst
+
+
+class AVCallUI(BaseAVTUI):
+    def __init__(self, parent):
+        super().__init__(parent.host, align="center")
+        self.parent = parent
+        self.image_rows = 0
+        self.buffer = None
+        self._processing = False
+        self.render_class: t_image.ImageMeta | None = None
+        self.target_size
+        self.fps = 25
+        for oo in parent.args.output_opts:
+            if oo.startswith("renderer="):
+                renderer = oo[9:].lower().strip()
+                match renderer:
+                    case "auto":
+                        # we let None as it will auto-detect best option later
+                        pass
+                    case "block":
+                        self.render_class = t_image.BlockImage
+                    case "iterm2":
+                        self.render_class = t_image.ITerm2Image
+                    case "kitty":
+                        self.render_class = t_image.KittyImage
+                    case _:
+                        parent.parser.error(f"Invalid renderer: {renderer!r}")
+            elif oo.startswith("fps="):
+                try:
+                    self.fps = int(oo[4:])
+                except ValueError:
+                    parent.parser.error(f"Invalid FPS: {oo[4:]!r}")
+
+    async def init_call(self, call_data):
+        kwargs = self.parse_output_opts(self.parent)
+        if "target_size" not in kwargs:
+            # we use low res by default for performance reason
+            kwargs["target_size"] = (640, 380)
+        webrtc_call = await WebRTCCall.make_webrtc_call(
+            self.parent.host,
+            self.parent.profile,
+            call_data,
+            sinks=webrtc.SINKS_APP,
+            appsink_data=webrtc.AppSinkData(
+                local_video_cb=partial(self.on_new_sample, video_stream="local"),
+                remote_video_cb=None,
+            ),
+            merge_pip=True,
+            **kwargs,
+        )
+        self.webrtc = webrtc_call.webrtc
+
+    async def start(self, call_data):
+        term_rows = self.host.console.size.height
+        if term_rows < 20:
+            self.host.disp(
+                "Your terminal must have a height of a least 20 rows.", error=True
+            )
+            self.host.a_quit(C.EXIT_ERROR)
+        self.image_rows = term_rows - 6
+        self.image_cols = self.host.console.size.width
+        await self.init_call(call_data)
+        assert self.webrtc is not None
+
+        idx = 0
+        self.buffer = ""
+
+        # we detect render
+        if self.render_class is None:
+            self.render_class = t_image.auto_image_class()
+
+        loop_sleep = 1/self.fps
+
+        with self.input.raw_mode():
+            # for whatever reason, using self.input.attach is breaking KittyImage and uses
+            # a BlockImage style rendering instead. So we don't use it and we call
+            # ``self.keys_ready()`` ourself in the loop, below.
+            # cursor is not restored despite the ``screen`` context if peer is hanging up,
+            # so we reactivate cursor here
+            self.host.add_on_quit_callback(self.host.console.show_cursor, True)
+            with self.host.console.screen():
+                while True:
+                    idx += 1
+                    if self.buffer is not None:
+                        sys.stdout.write(self.buffer)
+                        sys.stdout.write("\n")
+                        self.parent.console.print(
+                            # the padding avoid artifact when toggling buttons
+                            Padding(self.generate_control_bar(), (0, 2, 0, 0))
+                        )
+                        sys.stdout.flush()
+                        rendered = True
+                    else:
+                        rendered = False
+                    await asyncio.sleep(loop_sleep)
+                    self.keys_ready()
+                    if self.done.is_set():
+                        break
+                    if rendered:
+                        # we put cursor back at the top of image to print the next frame
+                        # FIXME: we use +4 for the controls because we know the height of the
+                        #   renderable, but it would be better to measure it dynamically.
+                        sys.stdout.write(f"\033[{self.image_rows+4}A")
+
+        await self.webrtc.end_call()
+        await self.host.a_quit()
+
+    @classmethod
+    async def run(cls, parent, call_data: CallData) -> None:
+        ui = cls(parent)
+        await ui.start(call_data)
+
+    def on_new_sample(self, video_sink, video_stream: str) -> bool:
+        if self._processing:
+            # image encoding for terminal is slow, if one is already processing, we don't
+            # bother going further
+            return False
+        sample = video_sink.emit("pull-sample")
+        if sample is None:
+            return False
+
+        video_pad = video_sink.get_static_pad("sink")
+        assert video_pad is not None
+        s = video_pad.get_current_caps().get_structure(0)
+        stream_size = (s.get_value("width"), s.get_value("height"))
+        buf = sample.get_buffer()
+        result, mapinfo = buf.map(Gst.MapFlags.READ)
+        if result and self.render_class is not None:
+            self._processing = True
+            image_data = mapinfo.data
+            image = Image.frombuffer("RGB", stream_size, image_data, "raw", "RGB", 0, 1)
+            img_renderer = self.render_class(image, height=self.image_rows)
+            img_fmt = f"<{self.image_cols}.^1"
+            if self.render_class == t_image.KittyImage:
+                # we don't do compression to speed up things
+                img_fmt += "+Wc0"
+            self.host.loop.loop.call_soon_threadsafe(
+                self.update_sample,
+                sample,
+                stream_size,
+                video_stream,
+                format(img_renderer, img_fmt),
+            )
+            self._processing = False
+
+        buf.unmap(mapinfo)
+
+        return False
+
+    def update_sample(self, sample, stream_size, video_stream: str, buffer) -> None:
+        if sample is None:
+            return
+
+        if video_stream == "remote":
+            return
+
+        self.buffer = buffer