Mercurial > libervia-backend
view libervia/cli/call_tui.py @ 4351:6a0a081485b8
plugin autocrypt: Autocrypt protocol implementation:
Implementation of autocrypt: `autocrypt` header is checked, and if present and no public
key is known for the peer, the key is imported.
`autocrypt` header is also added to outgoing message (only if an email gateway is
detected).
For the moment, the JID is used as identifier, but the real email used by gateway should be
used in the future.
rel 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia CLI # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import asyncio from functools import partial import sys from PIL import Image import gi from rich.padding import Padding from term_image import image as t_image from libervia.cli.constants import Const as C from libervia.frontends.tools import aio, webrtc from libervia.frontends.tools.webrtc import CallData, WebRTCCall from .call_simple import BaseAVTUI gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) from gi.repository import Gst aio.install_glib_asyncio_iteration() class AVCallUI(BaseAVTUI): def __init__(self, parent): super().__init__(parent.host, align="center") self.parent = parent self.image_rows = 0 self.buffer = None self._processing = False self.render_class: t_image.ImageMeta | None = None self.target_size self.fps = 25 for oo in parent.args.output_opts: if oo.startswith("renderer="): renderer = oo[9:].lower().strip() match renderer: case "auto": # we let None as it will auto-detect best option later pass case "block": self.render_class = t_image.BlockImage case "iterm2": self.render_class = t_image.ITerm2Image case "kitty": self.render_class = t_image.KittyImage case _: parent.parser.error(f"Invalid renderer: {renderer!r}") elif oo.startswith("fps="): try: self.fps = int(oo[4:]) except ValueError: 
parent.parser.error(f"Invalid FPS: {oo[4:]!r}") async def init_call(self, call_data): kwargs = self.parse_output_opts(self.parent) if "target_size" not in kwargs: # we use low res by default for performance reason kwargs["target_size"] = (640, 380) webrtc_call = await WebRTCCall.make_webrtc_call( self.parent.host.bridge, self.parent.profile, call_data, sinks_data=webrtc.SinksApp( local_video_cb=partial(self.on_new_sample, video_stream="local"), remote_video_cb=None, ), merge_pip=True, # we want to be sure that call is ended if user presses `Ctrl + c` or anything # else stops the session. on_call_setup_cb=lambda sid, profile: self.parent.host.add_on_quit_callback( self.parent.host.bridge.call_end, sid, "", profile ), on_call_ended_cb=lambda sid, profile: self.parent.host.a_quit(), **kwargs, ) self.webrtc = webrtc_call.webrtc async def start(self, call_data): term_rows = self.host.console.size.height if term_rows < 20: self.host.disp( "Your terminal must have a height of a least 20 rows.", error=True ) self.host.a_quit(C.EXIT_ERROR) self.image_rows = term_rows - 6 self.image_cols = self.host.console.size.width await self.init_call(call_data) assert self.webrtc is not None idx = 0 self.buffer = "" # we detect render if self.render_class is None: self.render_class = t_image.auto_image_class() loop_sleep = 1 / self.fps with self.input.raw_mode(): # for whatever reason, using self.input.attach is breaking KittyImage and uses # a BlockImage style rendering instead. So we don't use it and we call # ``self.keys_ready()`` ourself in the loop, below. 
# cursor is not restored despite the ``screen`` context if peer is hanging up, # so we reactivate cursor here self.host.add_on_quit_callback(self.host.console.show_cursor, True) with self.host.console.screen(): while True: idx += 1 if self.buffer is not None: sys.stdout.write(self.buffer) sys.stdout.write("\n") self.parent.console.print( # the padding avoid artifact when toggling buttons Padding(self.generate_control_bar(), (0, 2, 0, 0)) ) sys.stdout.flush() rendered = True else: rendered = False await asyncio.sleep(loop_sleep) self.keys_ready() if self.done.is_set(): break if rendered: # we put cursor back at the top of image to print the next frame # FIXME: we use +4 for the controls because we know the height of the # renderable, but it would be better to measure it dynamically. sys.stdout.write(f"\033[{self.image_rows+4}A") await self.webrtc.end_call() await self.host.a_quit() @classmethod async def run(cls, parent, call_data: CallData) -> None: ui = cls(parent) await ui.start(call_data) def on_new_sample(self, video_sink, video_stream: str) -> bool: if self._processing: # image encoding for terminal is slow, if one is already processing, we don't # bother going further return False sample = video_sink.emit("pull-sample") if sample is None: return False video_pad = video_sink.get_static_pad("sink") assert video_pad is not None s = video_pad.get_current_caps().get_structure(0) stream_size = (s.get_value("width"), s.get_value("height")) buf = sample.get_buffer() result, mapinfo = buf.map(Gst.MapFlags.READ) if result and self.render_class is not None: self._processing = True image_data = mapinfo.data image = Image.frombuffer("RGB", stream_size, image_data, "raw", "RGB", 0, 1) img_renderer = self.render_class(image, height=self.image_rows) img_fmt = f"<{self.image_cols}.^1" if self.render_class == t_image.KittyImage: # we don't do compression to speed up things img_fmt += "+Wc0" self.host.loop.loop.call_soon_threadsafe( self.update_sample, sample, stream_size, 
video_stream, format(img_renderer, img_fmt), ) self._processing = False buf.unmap(mapinfo) return False def update_sample(self, sample, stream_size, video_stream: str, buffer) -> None: if sample is None: return if video_stream == "remote": return self.buffer = buffer