comparison libervia/cli/call_tui.py @ 4210:9218d4331bb2

cli (call): `tui` output implementation:
- Moved the original UI to a separate class, and use it with the `simple` output.
- By default, the best output is automatically selected. For now `gui` is selected if possible, and `simple` is used as fallback.
- The new `tui` output can be used to have the videos directly embedded in the terminal, either with real videos for compatible terminal emulators, or with Unicode blocks.
- Text controls are used for both `simple` and `tui` outputs.
- Several options can be used with `--oo` (they will be documented in the next commit).

rel 428
author Goffi <goffi@goffi.org>
date Fri, 16 Feb 2024 18:46:06 +0100
parents 4209:fe29fbdabce6
children d01b8d002619
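
The new `tui` output relies on term_image to turn PIL images into terminal output. As a minimal sketch of that technique (using a hypothetical sample frame file; this snippet is not part of the commit, it only reuses the API calls found in the file below):

    from PIL import Image
    from term_image import image as t_image

    frame = Image.open("frame.png")        # hypothetical sample frame
    Render = t_image.auto_image_class()    # pick the best renderer for this terminal
    # height is in text rows; the format spec has the same shape as in the file below
    print(format(Render(frame, height=20), "<80.^1"))
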
#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


import asyncio
from functools import partial
import sys

from PIL import Image
import gi
from rich.padding import Padding
from term_image import image as t_image

from libervia.cli.constants import Const as C
from libervia.frontends.tools import webrtc

from .call_simple import BaseAVTUI
from .call_webrtc import CallData, WebRTCCall

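# GStreamer's GObject introspection versions must be selected before importing
# from gi.repository.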
gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})

from gi.repository import Gst


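# Terminal UI for A/V calls: video frames are rendered in the terminal with
# term_image, and the text control bar comes from BaseAVTUI.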
class AVCallUI(BaseAVTUI):
    def __init__(self, parent):
        super().__init__(parent.host, align="center")
        self.parent = parent
        self.image_rows = 0
        self.buffer = None
        self._processing = False
        self.render_class: t_image.ImageMeta | None = None
        self.target_size: tuple[int, int] | None = None
        self.fps = 25
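        # `--oo` output options handled here: renderer=<auto|block|iterm2|kitty>
        # and fps=<integer>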
        for oo in parent.args.output_opts:
            if oo.startswith("renderer="):
                renderer = oo[9:].lower().strip()
                match renderer:
                    case "auto":
                        # leave None: the best option will be auto-detected later
                        pass
                    case "block":
                        self.render_class = t_image.BlockImage
                    case "iterm2":
                        self.render_class = t_image.ITerm2Image
                    case "kitty":
                        self.render_class = t_image.KittyImage
                    case _:
                        parent.parser.error(f"Invalid renderer: {renderer!r}")
            elif oo.startswith("fps="):
                try:
                    self.fps = int(oo[4:])
                except ValueError:
                    parent.parser.error(f"Invalid FPS: {oo[4:]!r}")

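    # Set up the WebRTC call with application sinks, so that decoded video samples
    # are fed to on_new_sample() below.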
    async def init_call(self, call_data):
        kwargs = self.parse_output_opts(self.parent)
        if "target_size" not in kwargs:
            # we use a low resolution by default for performance reasons
            kwargs["target_size"] = (640, 380)
        webrtc_call = await WebRTCCall.make_webrtc_call(
            self.parent.host,
            self.parent.profile,
            call_data,
            sinks=webrtc.SINKS_APP,
            appsink_data=webrtc.AppSinkData(
                local_video_cb=partial(self.on_new_sample, video_stream="local"),
                remote_video_cb=None,
            ),
            merge_pip=True,
            **kwargs,
        )
        self.webrtc = webrtc_call.webrtc

    async def start(self, call_data):
        term_rows = self.host.console.size.height
        if term_rows < 20:
            self.host.disp(
                "Your terminal must have a height of at least 20 rows.", error=True
            )
            await self.host.a_quit(C.EXIT_ERROR)
            return
        self.image_rows = term_rows - 6
        self.image_cols = self.host.console.size.width
        await self.init_call(call_data)
        assert self.webrtc is not None

        idx = 0
        self.buffer = ""

        # detect the best available renderer
        if self.render_class is None:
            self.render_class = t_image.auto_image_class()

        loop_sleep = 1 / self.fps

        with self.input.raw_mode():
            # For whatever reason, using self.input.attach breaks KittyImage and falls
            # back to a BlockImage-style rendering. So we don't use it, and we call
            # ``self.keys_ready()`` ourselves in the loop below.
            # The cursor is not restored despite the ``screen`` context if the peer
            # hangs up, so we re-enable it on quit here.
            self.host.add_on_quit_callback(self.host.console.show_cursor, True)
            with self.host.console.screen():
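                # Render loop: print the latest encoded frame and the control bar,
                # poll the keyboard, then move the cursor back up for the next frame.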
                while True:
                    idx += 1
                    if self.buffer is not None:
                        sys.stdout.write(self.buffer)
                        sys.stdout.write("\n")
                        self.parent.console.print(
                            # the padding avoids artifacts when toggling buttons
                            Padding(self.generate_control_bar(), (0, 2, 0, 0))
                        )
                        sys.stdout.flush()
                        rendered = True
                    else:
                        rendered = False
                    await asyncio.sleep(loop_sleep)
                    self.keys_ready()
                    if self.done.is_set():
                        break
                    if rendered:
                        # put the cursor back at the top of the image to print the next frame
                        # FIXME: we use +4 for the controls because we know the height of the
                        # renderable, but it would be better to measure it dynamically.
                        sys.stdout.write(f"\033[{self.image_rows+4}A")

        await self.webrtc.end_call()
        await self.host.a_quit()

    @classmethod
    async def run(cls, parent, call_data: CallData) -> None:
        ui = cls(parent)
        await ui.start(call_data)

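    # Called from GStreamer's streaming thread for each new video sample: the frame is
    # encoded for the terminal here, then handed over to the asyncio loop thread-safely.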
    def on_new_sample(self, video_sink, video_stream: str) -> bool:
        if self._processing:
            # image encoding for the terminal is slow: if a frame is already being
            # processed, we don't bother going further with this one.
            return False
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        video_pad = video_sink.get_static_pad("sink")
        assert video_pad is not None
        s = video_pad.get_current_caps().get_structure(0)
        stream_size = (s.get_value("width"), s.get_value("height"))
        buf = sample.get_buffer()
        result, mapinfo = buf.map(Gst.MapFlags.READ)
        if result and self.render_class is not None:
            self._processing = True
            image_data = mapinfo.data
            image = Image.frombuffer("RGB", stream_size, image_data, "raw", "RGB", 0, 1)
            img_renderer = self.render_class(image, height=self.image_rows)
            img_fmt = f"<{self.image_cols}.^1"
            if self.render_class == t_image.KittyImage:
                # we skip compression to speed things up
                img_fmt += "+Wc0"
            self.host.loop.loop.call_soon_threadsafe(
                self.update_sample,
                sample,
                stream_size,
                video_stream,
                format(img_renderer, img_fmt),
            )
            self._processing = False

        buf.unmap(mapinfo)

        return False

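    # Runs in the asyncio loop: store the rendered frame for the display loop. Only the
    # "local" stream is used, as the remote video is presumably merged into it
    # (merge_pip=True above).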
    def update_sample(self, sample, stream_size, video_stream: str, buffer) -> None:
        if sample is None:
            return

        if video_stream == "remote":
            return

        self.buffer = buffer