Mercurial > libervia-backend
diff libervia/cli/call_tui.py @ 4210:9218d4331bb2
cli (call): `tui` output implementation:
- Moved original UI to a separate class, and use it with the `simple` output
- By default, best output is automatically selected. For now `gui` is selected if possible,
and `simple` is used as fallback.
- The new `tui` output can be used to have the videos directly embedded in the terminal,
either with real videos for compatible terminal emulators, or with Unicode blocks.
- Text controls are used for both `simple` and `tui` outputs
- several options can be used with `--oo` (will be documented in next commit).
rel 428
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 16 Feb 2024 18:46:06 +0100 |
parents | |
children | d01b8d002619 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/cli/call_tui.py Fri Feb 16 18:46:06 2024 +0100 @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 + +# Libervia CLI +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import asyncio +from functools import partial +import sys + +from PIL import Image +import gi +from rich.padding import Padding +from term_image import image as t_image + +from libervia.cli.constants import Const as C +from libervia.frontends.tools import webrtc + +from .call_simple import BaseAVTUI +from .call_webrtc import CallData, WebRTCCall + +gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) + +from gi.repository import Gst + + +class AVCallUI(BaseAVTUI): + def __init__(self, parent): + super().__init__(parent.host, align="center") + self.parent = parent + self.image_rows = 0 + self.buffer = None + self._processing = False + self.render_class: t_image.ImageMeta | None = None + self.target_size + self.fps = 25 + for oo in parent.args.output_opts: + if oo.startswith("renderer="): + renderer = oo[9:].lower().strip() + match renderer: + case "auto": + # we let None as it will auto-detect best option later + pass + case "block": + self.render_class = t_image.BlockImage + case "iterm2": + self.render_class = t_image.ITerm2Image + case "kitty": + self.render_class = 
t_image.KittyImage + case _: + parent.parser.error(f"Invalid renderer: {renderer!r}") + elif oo.startswith("fps="): + try: + self.fps = int(oo[4:]) + except ValueError: + parent.parser.error(f"Invalid FPS: {oo[4:]!r}") + + async def init_call(self, call_data): + kwargs = self.parse_output_opts(self.parent) + if "target_size" not in kwargs: + # we use low res by default for performance reason + kwargs["target_size"] = (640, 380) + webrtc_call = await WebRTCCall.make_webrtc_call( + self.parent.host, + self.parent.profile, + call_data, + sinks=webrtc.SINKS_APP, + appsink_data=webrtc.AppSinkData( + local_video_cb=partial(self.on_new_sample, video_stream="local"), + remote_video_cb=None, + ), + merge_pip=True, + **kwargs, + ) + self.webrtc = webrtc_call.webrtc + + async def start(self, call_data): + term_rows = self.host.console.size.height + if term_rows < 20: + self.host.disp( + "Your terminal must have a height of a least 20 rows.", error=True + ) + self.host.a_quit(C.EXIT_ERROR) + self.image_rows = term_rows - 6 + self.image_cols = self.host.console.size.width + await self.init_call(call_data) + assert self.webrtc is not None + + idx = 0 + self.buffer = "" + + # we detect render + if self.render_class is None: + self.render_class = t_image.auto_image_class() + + loop_sleep = 1/self.fps + + with self.input.raw_mode(): + # for whatever reason, using self.input.attach is breaking KittyImage and uses + # a BlockImage style rendering instead. So we don't use it and we call + # ``self.keys_ready()`` ourself in the loop, below. 
+ # cursor is not restored despite the ``screen`` context if peer is hanging up, + # so we reactivate cursor here + self.host.add_on_quit_callback(self.host.console.show_cursor, True) + with self.host.console.screen(): + while True: + idx += 1 + if self.buffer is not None: + sys.stdout.write(self.buffer) + sys.stdout.write("\n") + self.parent.console.print( + # the padding avoid artifact when toggling buttons + Padding(self.generate_control_bar(), (0, 2, 0, 0)) + ) + sys.stdout.flush() + rendered = True + else: + rendered = False + await asyncio.sleep(loop_sleep) + self.keys_ready() + if self.done.is_set(): + break + if rendered: + # we put cursor back at the top of image to print the next frame + # FIXME: we use +4 for the controls because we know the height of the + # renderable, but it would be better to measure it dynamically. + sys.stdout.write(f"\033[{self.image_rows+4}A") + + await self.webrtc.end_call() + await self.host.a_quit() + + @classmethod + async def run(cls, parent, call_data: CallData) -> None: + ui = cls(parent) + await ui.start(call_data) + + def on_new_sample(self, video_sink, video_stream: str) -> bool: + if self._processing: + # image encoding for terminal is slow, if one is already processing, we don't + # bother going further + return False + sample = video_sink.emit("pull-sample") + if sample is None: + return False + + video_pad = video_sink.get_static_pad("sink") + assert video_pad is not None + s = video_pad.get_current_caps().get_structure(0) + stream_size = (s.get_value("width"), s.get_value("height")) + buf = sample.get_buffer() + result, mapinfo = buf.map(Gst.MapFlags.READ) + if result and self.render_class is not None: + self._processing = True + image_data = mapinfo.data + image = Image.frombuffer("RGB", stream_size, image_data, "raw", "RGB", 0, 1) + img_renderer = self.render_class(image, height=self.image_rows) + img_fmt = f"<{self.image_cols}.^1" + if self.render_class == t_image.KittyImage: + # we don't do compression to speed 
up things + img_fmt += "+Wc0" + self.host.loop.loop.call_soon_threadsafe( + self.update_sample, + sample, + stream_size, + video_stream, + format(img_renderer, img_fmt), + ) + self._processing = False + + buf.unmap(mapinfo) + + return False + + def update_sample(self, sample, stream_size, video_stream: str, buffer) -> None: + if sample is None: + return + + if video_stream == "remote": + return + + self.buffer = buffer