Mercurial > libervia-backend
view libervia/cli/call_tui.py @ 4294:a0ed5c976bf8
component conferences, plugin XEP-0167, XEP-0298: add stream user metadata:
A/V conference now adds user metadata about the stream it is forwarding through XEP-0298.
This is parsed and added to metadata during confirmation on client side.
rel 448
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 06 Aug 2024 23:43:11 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia CLI # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import asyncio from functools import partial import sys from PIL import Image import gi from rich.padding import Padding from term_image import image as t_image from libervia.cli.constants import Const as C from libervia.frontends.tools import aio, webrtc from libervia.frontends.tools.webrtc import CallData, WebRTCCall from .call_simple import BaseAVTUI gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) from gi.repository import Gst aio.install_glib_asyncio_iteration() class AVCallUI(BaseAVTUI): def __init__(self, parent): super().__init__(parent.host, align="center") self.parent = parent self.image_rows = 0 self.buffer = None self._processing = False self.render_class: t_image.ImageMeta | None = None self.target_size self.fps = 25 for oo in parent.args.output_opts: if oo.startswith("renderer="): renderer = oo[9:].lower().strip() match renderer: case "auto": # we let None as it will auto-detect best option later pass case "block": self.render_class = t_image.BlockImage case "iterm2": self.render_class = t_image.ITerm2Image case "kitty": self.render_class = t_image.KittyImage case _: parent.parser.error(f"Invalid renderer: {renderer!r}") elif oo.startswith("fps="): try: self.fps = int(oo[4:]) except ValueError: parent.parser.error(f"Invalid FPS: {oo[4:]!r}") async def init_call(self, call_data): kwargs = self.parse_output_opts(self.parent) if "target_size" not in kwargs: # we use low res by default for performance reason kwargs["target_size"] = (640, 380) webrtc_call = await WebRTCCall.make_webrtc_call( self.parent.host.bridge, self.parent.profile, call_data, sinks_data=webrtc.SinksApp( local_video_cb=partial(self.on_new_sample, video_stream="local"), remote_video_cb=None, ), merge_pip=True, # we want to be sure that call is ended if user presses `Ctrl + c` or anything # else stops the session. on_call_setup_cb=lambda sid, profile: self.parent.host.add_on_quit_callback( self.parent.host.bridge.call_end, sid, "", profile ), on_call_ended_cb=lambda sid, profile: self.parent.host.a_quit(), **kwargs, ) self.webrtc = webrtc_call.webrtc async def start(self, call_data): term_rows = self.host.console.size.height if term_rows < 20: self.host.disp( "Your terminal must have a height of a least 20 rows.", error=True ) self.host.a_quit(C.EXIT_ERROR) self.image_rows = term_rows - 6 self.image_cols = self.host.console.size.width await self.init_call(call_data) assert self.webrtc is not None idx = 0 self.buffer = "" # we detect render if self.render_class is None: self.render_class = t_image.auto_image_class() loop_sleep = 1 / self.fps with self.input.raw_mode(): # for whatever reason, using self.input.attach is breaking KittyImage and uses # a BlockImage style rendering instead. So we don't use it and we call # ``self.keys_ready()`` ourself in the loop, below. # cursor is not restored despite the ``screen`` context if peer is hanging up, # so we reactivate cursor here self.host.add_on_quit_callback(self.host.console.show_cursor, True) with self.host.console.screen(): while True: idx += 1 if self.buffer is not None: sys.stdout.write(self.buffer) sys.stdout.write("\n") self.parent.console.print( # the padding avoid artifact when toggling buttons Padding(self.generate_control_bar(), (0, 2, 0, 0)) ) sys.stdout.flush() rendered = True else: rendered = False await asyncio.sleep(loop_sleep) self.keys_ready() if self.done.is_set(): break if rendered: # we put cursor back at the top of image to print the next frame # FIXME: we use +4 for the controls because we know the height of the # renderable, but it would be better to measure it dynamically. sys.stdout.write(f"\033[{self.image_rows+4}A") await self.webrtc.end_call() await self.host.a_quit() @classmethod async def run(cls, parent, call_data: CallData) -> None: ui = cls(parent) await ui.start(call_data) def on_new_sample(self, video_sink, video_stream: str) -> bool: if self._processing: # image encoding for terminal is slow, if one is already processing, we don't # bother going further return False sample = video_sink.emit("pull-sample") if sample is None: return False video_pad = video_sink.get_static_pad("sink") assert video_pad is not None s = video_pad.get_current_caps().get_structure(0) stream_size = (s.get_value("width"), s.get_value("height")) buf = sample.get_buffer() result, mapinfo = buf.map(Gst.MapFlags.READ) if result and self.render_class is not None: self._processing = True image_data = mapinfo.data image = Image.frombuffer("RGB", stream_size, image_data, "raw", "RGB", 0, 1) img_renderer = self.render_class(image, height=self.image_rows) img_fmt = f"<{self.image_cols}.^1" if self.render_class == t_image.KittyImage: # we don't do compression to speed up things img_fmt += "+Wc0" self.host.loop.loop.call_soon_threadsafe( self.update_sample, sample, stream_size, video_stream, format(img_renderer, img_fmt), ) self._processing = False buf.unmap(mapinfo) return False def update_sample(self, sample, stream_size, video_stream: str, buffer) -> None: if sample is None: return if video_stream == "remote": return self.buffer = buffer