Mercurial > libervia-backend
comparison libervia/frontends/tools/webrtc.py @ 4233:d01b8d002619
cli (call, file), frontends: implement webRTC data channel transfer:
- file send/receive commands now support webRTC transfer. In the `send` command, the
`--webrtc` flag is currently used to activate it.
- WebRTC-related code has been factored out and moved to the `libervia.frontends.tools.webrtc*`
modules.
rel 442
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 13:43:09 +0200 |
parents | fe29fbdabce6 |
children | 79c8a70e1813 |
comparison
equal
deleted
inserted
replaced
4232:0fbe5c605eb6 | 4233:d01b8d002619 |
---|---|
14 # GNU Affero General Public License for more details. | 14 # GNU Affero General Public License for more details. |
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 from collections.abc import Awaitable | |
19 import gi | 20 import gi |
20 gi.require_versions({ | 21 |
21 "Gst": "1.0", | 22 gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) |
22 "GstWebRTC": "1.0" | |
23 }) | |
24 from gi.repository import Gst, GstWebRTC, GstSdp | 23 from gi.repository import Gst, GstWebRTC, GstSdp |
25 | 24 |
26 from libervia.backend.core import exceptions | 25 from libervia.backend.core import exceptions |
27 | 26 |
28 try: | 27 try: |
31 raise exceptions.MissingModule( | 30 raise exceptions.MissingModule( |
32 "No GStreamer Python overrides available. Please install relevant packages on " | 31 "No GStreamer Python overrides available. Please install relevant packages on " |
33 "your system (e.g., `python3-gst-1.0` on Debian and derivatives)." | 32 "your system (e.g., `python3-gst-1.0` on Debian and derivatives)." |
34 ) | 33 ) |
35 import asyncio | 34 import asyncio |
36 from dataclasses import dataclass | |
37 from datetime import datetime | 35 from datetime import datetime |
38 import logging | 36 import logging |
39 from random import randint | |
40 import re | 37 import re |
41 from typing import Callable | 38 from typing import Callable |
42 from urllib.parse import quote_plus | 39 from urllib.parse import quote_plus |
43 | 40 |
44 from libervia.backend.tools.common import data_format | 41 from libervia.backend.tools.common import data_format |
45 from libervia.frontends.tools import aio, display_servers | 42 from libervia.frontends.tools import aio, display_servers, jid |
43 from .webrtc_models import AppSinkData, CallData | |
44 from .webrtc_screenshare import DesktopPortal | |
46 | 45 |
47 current_server = display_servers.detect() | 46 current_server = display_servers.detect() |
48 if current_server == display_servers.X11: | 47 if current_server == display_servers.X11: |
49 # GSTreamer's ximagesrc documentation asks to run this function | 48 # GSTreamer's ximagesrc documentation asks to run this function |
50 import ctypes | 49 import ctypes |
51 ctypes.CDLL('libX11.so.6').XInitThreads() | 50 |
51 ctypes.CDLL("libX11.so.6").XInitThreads() | |
52 | 52 |
53 | 53 |
54 log = logging.getLogger(__name__) | 54 log = logging.getLogger(__name__) |
55 | 55 |
56 Gst.init(None) | 56 Gst.init(None) |
57 | 57 |
58 SOURCES_AUTO = "auto" | 58 SOURCES_AUTO = "auto" |
59 SOURCES_TEST = "test" | 59 SOURCES_TEST = "test" |
60 SOURCES_DATACHANNEL = "datachannel" | |
60 SINKS_APP = "app" | 61 SINKS_APP = "app" |
61 SINKS_AUTO = "auto" | 62 SINKS_AUTO = "auto" |
62 SINKS_TEST = "test" | 63 SINKS_TEST = "test" |
63 | 64 SINKS_DATACHANNEL = "datachannel" |
64 | |
65 class ScreenshareError(Exception): | |
66 pass | |
67 | |
68 | |
69 @dataclass | |
70 class AppSinkData: | |
71 local_video_cb: Callable | |
72 remote_video_cb: Callable|None | |
73 | |
74 | |
75 class DesktopPortal: | |
76 | |
77 def __init__(self, webrtc: "WebRTC"): | |
78 import dbus | |
79 from dbus.mainloop.glib import DBusGMainLoop | |
80 # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes | |
81 self.dbus = dbus | |
82 self.webrtc = webrtc | |
83 self.sources_type = dbus.UInt32(7) | |
84 DBusGMainLoop(set_as_default=True) | |
85 self.session_bus = dbus.SessionBus() | |
86 portal_object = self.session_bus.get_object( | |
87 'org.freedesktop.portal.Desktop', | |
88 '/org/freedesktop/portal/desktop' | |
89 ) | |
90 self.screencast_interface = dbus.Interface( | |
91 portal_object, | |
92 'org.freedesktop.portal.ScreenCast' | |
93 ) | |
94 self.session_interface = None | |
95 self.session_signal = None | |
96 self.handle_counter = 0 | |
97 self.session_handle = None | |
98 self.stream_data: dict|None = None | |
99 | |
100 @property | |
101 def handle_token(self): | |
102 self.handle_counter += 1 | |
103 return f"libervia{self.handle_counter}" | |
104 | |
105 def on_session_closed(self, details: dict) -> None: | |
106 if self.session_interface is not None: | |
107 self.session_interface = None | |
108 self.webrtc.desktop_sharing = False | |
109 if self.session_signal is not None: | |
110 self.session_signal.remove() | |
111 self.session_signal = None | |
112 | |
113 | |
114 async def dbus_call(self, method_name: str, *args) -> dict: | |
115 """Call a screenshare portal method | |
116 | |
117 This method handle the signal response. | |
118 @param method_name: method to call | |
119 @param args: extra args | |
120 `handle_token` will be automatically added to the last arg (option dict) | |
121 @return: method result | |
122 """ | |
123 if self.session_handle is not None: | |
124 self.end_screenshare() | |
125 method = getattr(self.screencast_interface, method_name) | |
126 options = args[-1] | |
127 reply_fut = asyncio.Future() | |
128 signal_fut = asyncio.Future() | |
129 # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html | |
130 handle_token = self.handle_token | |
131 sender = self.session_bus.get_unique_name().replace(".", "_")[1:] | |
132 path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}" | |
133 signal_match = None | |
134 | |
135 def on_signal(response, results): | |
136 assert signal_match is not None | |
137 signal_match.remove() | |
138 if response == 0: | |
139 signal_fut.set_result(results) | |
140 elif response == 1: | |
141 signal_fut.set_exception( | |
142 exceptions.CancelError("Cancelled by user.") | |
143 ) | |
144 else: | |
145 signal_fut.set_exception(ScreenshareError( | |
146 f"Can't get signal result" | |
147 )) | |
148 | |
149 signal_match = self.session_bus.add_signal_receiver( | |
150 on_signal, | |
151 signal_name="Response", | |
152 dbus_interface="org.freedesktop.portal.Request", | |
153 path=path | |
154 ) | |
155 | |
156 options["handle_token"] = handle_token | |
157 | |
158 method( | |
159 *args, | |
160 reply_handler=reply_fut.set_result, | |
161 error_handler=reply_fut.set_exception | |
162 ) | |
163 try: | |
164 await reply_fut | |
165 except Exception as e: | |
166 raise ScreenshareError(f"Can't ask screenshare permission: {e}") | |
167 return await signal_fut | |
168 | |
169 async def request_screenshare(self) -> dict: | |
170 session_data = await self.dbus_call( | |
171 "CreateSession", | |
172 { | |
173 "session_handle_token": str(randint(1, 2**32)), | |
174 } | |
175 ) | |
176 try: | |
177 session_handle = session_data["session_handle"] | |
178 except KeyError: | |
179 raise ScreenshareError("Can't get session handle") | |
180 self.session_handle = session_handle | |
181 | |
182 | |
183 await self.dbus_call( | |
184 "SelectSources", | |
185 session_handle, | |
186 { | |
187 "multiple": True, | |
188 "types": self.sources_type, | |
189 "modal": True | |
190 } | |
191 ) | |
192 screenshare_data = await self.dbus_call( | |
193 "Start", | |
194 session_handle, | |
195 "", | |
196 {} | |
197 ) | |
198 | |
199 session_object = self.session_bus.get_object( | |
200 'org.freedesktop.portal.Desktop', | |
201 session_handle | |
202 ) | |
203 self.session_interface = self.dbus.Interface( | |
204 session_object, | |
205 'org.freedesktop.portal.Session' | |
206 ) | |
207 | |
208 self.session_signal = self.session_bus.add_signal_receiver( | |
209 self.on_session_closed, | |
210 signal_name="Closed", | |
211 dbus_interface="org.freedesktop.portal.Session", | |
212 path=session_handle | |
213 ) | |
214 | |
215 try: | |
216 node_id, stream_data = screenshare_data["streams"][0] | |
217 source_type = int(stream_data["source_type"]) | |
218 except (IndexError, KeyError): | |
219 raise ScreenshareError("Can't parse stream data") | |
220 self.stream_data = stream_data = { | |
221 "session_handle": session_handle, | |
222 "node_id": node_id, | |
223 "source_type": source_type | |
224 } | |
225 try: | |
226 height = int(stream_data["size"][0]) | |
227 weight = int(stream_data["size"][1]) | |
228 except (IndexError, KeyError): | |
229 pass | |
230 else: | |
231 stream_data["size"] = (height, weight) | |
232 | |
233 return self.stream_data | |
234 | |
235 def end_screenshare(self) -> None: | |
236 """Close a running screenshare session, if any.""" | |
237 if self.session_interface is None: | |
238 return | |
239 self.session_interface.Close() | |
240 self.on_session_closed({}) | |
241 | 65 |
242 | 66 |
243 class WebRTC: | 67 class WebRTC: |
244 """GSTreamer based WebRTC implementation for audio and video communication. | 68 """GSTreamer based WebRTC implementation for audio and video communication. |
245 | 69 |
246 This class encapsulates the WebRTC functionalities required for initiating and | 70 This class encapsulates the WebRTC functionalities required for initiating and |
247 handling audio and video calls. | 71 handling audio and video calls, and data channels. |
248 """ | 72 """ |
249 | 73 |
250 def __init__( | 74 def __init__( |
251 self, | 75 self, |
252 bridge, | 76 bridge, |
253 profile: str, | 77 profile: str, |
254 sources: str = SOURCES_AUTO, | 78 sources: str = SOURCES_AUTO, |
255 sinks: str = SINKS_AUTO, | 79 sinks: str = SINKS_AUTO, |
256 appsink_data: AppSinkData | None = None, | 80 appsink_data: AppSinkData | None = None, |
257 reset_cb: Callable | None = None, | 81 reset_cb: Callable | None = None, |
258 merge_pip: bool|None = None, | 82 merge_pip: bool | None = None, |
259 target_size: tuple[int, int]|None = None, | 83 target_size: tuple[int, int] | None = None, |
84 call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None, | |
85 dc_open_cb: ( | |
86 Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None | |
87 ) = None, | |
88 dc_on_data_channel: ( | |
89 Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None | |
90 ) = None, | |
260 ) -> None: | 91 ) -> None: |
261 """Initializes a new WebRTC instance. | 92 """Initializes a new WebRTC instance. |
262 | 93 |
263 @param bridge: An instance of backend bridge. | 94 @param bridge: An instance of backend bridge. |
264 @param profile: Libervia profile. | 95 @param profile: Libervia profile. |
275 used for SINKS_AUTO only for now). | 106 used for SINKS_AUTO only for now). |
276 @param target_size: Expected size of the final sink stream. Mainly use by composer | 107 @param target_size: Expected size of the final sink stream. Mainly use by composer |
277 when ``merge_pip`` is set. | 108 when ``merge_pip`` is set. |
278 None to autodetect (not real autodetection implemeted yet, default to | 109 None to autodetect (not real autodetection implemeted yet, default to |
279 (1280,720)). | 110 (1280,720)). |
111 @param call_start_cb: Called when call is started. | |
112 @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL). | |
113 This callback will be run in a GStreamer thread. | |
114 @param dc_open_cb: Called when Data Channel is created (for SINKS_DATACHANNEL). | |
115 This callback will be run in a GStreamer thread. | |
280 """ | 116 """ |
281 self.main_loop = asyncio.get_event_loop() | 117 self.main_loop = asyncio.get_event_loop() |
282 self.bridge = bridge | 118 self.bridge = bridge |
283 self.profile = profile | 119 self.profile = profile |
284 self.pipeline = None | 120 self.pipeline = None |
287 self._desktop_sharing = False | 123 self._desktop_sharing = False |
288 self.desktop_sharing_data = None | 124 self.desktop_sharing_data = None |
289 self.sources = sources | 125 self.sources = sources |
290 self.sinks = sinks | 126 self.sinks = sinks |
291 if target_size is None: | 127 if target_size is None: |
292 target_size=(1280, 720) | 128 target_size = (1280, 720) |
293 self.target_width, self.target_height = target_size | 129 self.target_width, self.target_height = target_size |
294 if merge_pip is None: | 130 if merge_pip is None: |
295 merge_pip = sinks == SINKS_AUTO | 131 merge_pip = sinks == SINKS_AUTO |
296 self.merge_pip = merge_pip | 132 self.merge_pip = merge_pip |
133 if call_start_cb is None: | |
134 call_start_cb = self._call_start | |
135 self.call_start_cb = call_start_cb | |
136 if sources == SOURCES_DATACHANNEL: | |
137 assert dc_open_cb is not None | |
138 self.dc_open_cb = dc_open_cb | |
139 if sinks == SINKS_DATACHANNEL: | |
140 assert dc_on_data_channel is not None | |
141 self.dc_on_data_channel = dc_on_data_channel | |
297 if sinks == SINKS_APP: | 142 if sinks == SINKS_APP: |
298 if ( | 143 if ( |
299 merge_pip | 144 merge_pip |
300 and appsink_data is not None | 145 and appsink_data is not None |
301 and appsink_data.remote_video_cb is not None | 146 and appsink_data.remote_video_cb is not None |
385 raise ValueError( | 230 raise ValueError( |
386 'Only "desktop_sharing" is currently allowed for binding' | 231 'Only "desktop_sharing" is currently allowed for binding' |
387 ) | 232 ) |
388 self.bindings[key] = cb | 233 self.bindings[key] = cb |
389 | 234 |
390 | |
391 def generate_dot_file( | 235 def generate_dot_file( |
392 self, | 236 self, |
393 filename: str = "pipeline", | 237 filename: str = "pipeline", |
394 details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL, | 238 details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL, |
395 with_timestamp: bool = True, | 239 with_timestamp: bool = True, |
396 bin_: Gst.Bin|None = None, | 240 bin_: Gst.Bin | None = None, |
397 ) -> None: | 241 ) -> None: |
398 """Generate Dot File for debugging | 242 """Generate Dot File for debugging |
399 | 243 |
400 ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir. | 244 ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir. |
401 ``dot -Tpng -o <filename>.png <filename>.dot`` can be use to convert to a PNG file. | 245 ``dot -Tpng -o <filename>.png <filename>.dot`` can be use to convert to a PNG file. |
410 (``self.pipeline``) will be used. | 254 (``self.pipeline``) will be used. |
411 """ | 255 """ |
412 if bin_ is None: | 256 if bin_ is None: |
413 bin_ = self.pipeline | 257 bin_ = self.pipeline |
414 if with_timestamp: | 258 if with_timestamp: |
415 timestamp = datetime.now().isoformat(timespec='milliseconds') | 259 timestamp = datetime.now().isoformat(timespec="milliseconds") |
416 filename = f"{timestamp}_filename" | 260 filename = f"{timestamp}_filename" |
417 | 261 |
418 Gst.debug_bin_to_dot_file(bin_, details, filename) | 262 Gst.debug_bin_to_dot_file(bin_, details, filename) |
419 | 263 |
420 def get_sdp_mline_index(self, media_type: str) -> int: | 264 def get_sdp_mline_index(self, media_type: str) -> int: |
554 self.sid: str | None = None | 398 self.sid: str | None = None |
555 self.offer: str | None = None | 399 self.offer: str | None = None |
556 self.local_candidates_buffer = {} | 400 self.local_candidates_buffer = {} |
557 self.ufrag: str | None = None | 401 self.ufrag: str | None = None |
558 self.pwd: str | None = None | 402 self.pwd: str | None = None |
559 self.callee: str | None = None | 403 self.callee: jid.JID | None = None |
560 self._media_types = None | 404 self._media_types = None |
561 self._media_types_inv = None | 405 self._media_types_inv = None |
562 self._sdp_set: bool = False | 406 self._sdp_set: bool = False |
563 self.remote_candidates_buffer: dict[str, dict[str, list]] = { | 407 self.remote_candidates_buffer: dict[str, dict[str, list]] = { |
564 "audio": {"candidates": []}, | 408 "audio": {"candidates": []}, |
574 self.desktop_sink_pad = None | 418 self.desktop_sink_pad = None |
575 self.bindings = {} | 419 self.bindings = {} |
576 if self.reset_cb is not None: | 420 if self.reset_cb is not None: |
577 self.reset_cb() | 421 self.reset_cb() |
578 | 422 |
579 | |
580 async def setup_call( | 423 async def setup_call( |
581 self, | 424 self, |
582 role: str, | 425 role: str, |
583 audio_pt: int | None = 96, | 426 audio_pt: int | None = 96, |
584 video_pt: int | None = 97, | 427 video_pt: int | None = 97, |
596 @raises NotImplementedError: If audio_pt or video_pt is set to None. | 439 @raises NotImplementedError: If audio_pt or video_pt is set to None. |
597 @raises AssertionError: If the role is not 'initiator' or 'responder'. | 440 @raises AssertionError: If the role is not 'initiator' or 'responder'. |
598 """ | 441 """ |
599 assert role in ("initiator", "responder") | 442 assert role in ("initiator", "responder") |
600 self.role = role | 443 self.role = role |
601 if audio_pt is None or video_pt is None: | 444 |
602 raise NotImplementedError("None value is not handled yet") | 445 if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL: |
603 | 446 # Setup pipeline for datachannel only, no media streams. |
604 if self.sources == SOURCES_AUTO: | 447 self.gst_pipe_desc = f""" |
605 video_source_elt = "v4l2src" | 448 webrtcbin name=sendrecv bundle-policy=max-bundle |
606 audio_source_elt = "pulsesrc" | 449 """ |
607 elif self.sources == SOURCES_TEST: | |
608 video_source_elt = "videotestsrc is-live=true pattern=ball" | |
609 audio_source_elt = "audiotestsrc" | |
610 else: | 450 else: |
611 raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}') | 451 if audio_pt is None or video_pt is None: |
612 | 452 raise NotImplementedError("None value is not handled yet") |
613 | 453 |
614 if self.sinks == SINKS_APP: | 454 if self.sources == SOURCES_AUTO: |
615 local_video_sink_elt = ( | 455 video_source_elt = "v4l2src" |
616 "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " | 456 audio_source_elt = "pulsesrc" |
617 "sync=True" | 457 elif self.sources == SOURCES_TEST: |
618 ) | 458 video_source_elt = "videotestsrc is-live=true pattern=ball" |
619 elif self.sinks == SINKS_AUTO: | 459 audio_source_elt = "audiotestsrc" |
620 local_video_sink_elt = "autovideosink" | 460 else: |
621 else: | 461 raise exceptions.InternalError( |
622 raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") | 462 f'Unknown "sources" value: {self.sources!r}' |
623 | 463 ) |
624 if self.merge_pip: | 464 |
625 extra_elt = ( | 465 if self.sinks == SINKS_APP: |
626 "compositor name=compositor background=black " | 466 local_video_sink_elt = ( |
627 f"! video/x-raw,width={self.target_width},height={self.target_height}," | 467 "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " |
628 "framerate=30/1 " | 468 "sync=True" |
629 f"! {local_video_sink_elt}" | 469 ) |
630 ) | 470 elif self.sinks == SINKS_AUTO: |
631 local_video_sink_elt = "compositor.sink_1" | 471 local_video_sink_elt = "autovideosink" |
632 else: | 472 else: |
633 extra_elt = "" | 473 raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") |
634 | 474 |
635 self.gst_pipe_desc = f""" | 475 if self.merge_pip: |
636 webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle | 476 extra_elt = ( |
637 | 477 "compositor name=compositor background=black " |
638 input-selector name=video_selector | 478 f"! video/x-raw,width={self.target_width},height={self.target_height}," |
639 ! videorate | 479 "framerate=30/1 " |
640 ! video/x-raw,framerate=30/1 | 480 f"! {local_video_sink_elt}" |
641 ! tee name=t | 481 ) |
642 | 482 local_video_sink_elt = "compositor.sink_1" |
643 {extra_elt} | 483 else: |
644 | 484 extra_elt = "" |
645 {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. | 485 |
646 videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. | 486 self.gst_pipe_desc = f""" |
647 | 487 webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle |
648 t. | 488 |
649 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream | 489 input-selector name=video_selector |
650 ! videoconvert | 490 ! videorate |
651 ! vp8enc deadline=1 keyframe-max-dist=60 | 491 ! video/x-raw,framerate=30/1 |
652 ! rtpvp8pay picture-id-mode=15-bit | 492 ! tee name=t |
653 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} | 493 |
654 ! sendrecv. | 494 {extra_elt} |
655 | 495 |
656 t. | 496 {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. |
657 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream | 497 videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. |
658 ! videoconvert | 498 |
659 ! {local_video_sink_elt} | 499 t. |
660 | 500 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream |
661 {audio_source_elt} name=audio_src | 501 ! videoconvert |
662 ! valve | 502 ! vp8enc deadline=1 keyframe-max-dist=60 |
663 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream | 503 ! rtpvp8pay picture-id-mode=15-bit |
664 ! audioconvert | 504 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} |
665 ! audioresample | 505 ! sendrecv. |
666 ! opusenc audio-type=voice | 506 |
667 ! rtpopuspay | 507 t. |
668 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} | 508 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream |
669 ! sendrecv. | 509 ! videoconvert |
670 """ | 510 ! {local_video_sink_elt} |
511 | |
512 {audio_source_elt} name=audio_src | |
513 ! valve | |
514 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream | |
515 ! audioconvert | |
516 ! audioresample | |
517 ! opusenc audio-type=voice | |
518 ! rtpopuspay | |
519 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} | |
520 ! sendrecv. | |
521 """ | |
671 | 522 |
672 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") | 523 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") |
673 | 524 |
674 # Create the pipeline | 525 # Create the pipeline |
675 try: | 526 try: |
678 log.exception("Can't parse pipeline") | 529 log.exception("Can't parse pipeline") |
679 self.pipeline = None | 530 self.pipeline = None |
680 if not self.pipeline: | 531 if not self.pipeline: |
681 raise exceptions.InternalError("Failed to create Gstreamer pipeline.") | 532 raise exceptions.InternalError("Failed to create Gstreamer pipeline.") |
682 | 533 |
534 if not isinstance(self.pipeline, Gst.Pipeline): | |
535 # in the case of Data Channel there is a single element, and Gst.parse_launch | |
536 # doesn't create a Pipeline in this case, so we do it manually. | |
537 pipeline = Gst.Pipeline() | |
538 pipeline.add(self.pipeline) | |
539 self.pipeline = pipeline | |
540 | |
683 self.webrtcbin = self.pipeline.get_by_name("sendrecv") | 541 self.webrtcbin = self.pipeline.get_by_name("sendrecv") |
684 self.video_src = self.pipeline.get_by_name("video_src") | 542 if self.webrtcbin is None: |
685 self.muted_src = self.pipeline.get_by_name("muted_src") | 543 raise exceptions.InternalError("Can't get the pipeline.") |
686 self.video_selector = self.pipeline.get_by_name("video_selector") | 544 |
687 self.audio_valve = self.pipeline.get_by_name("audio_valve") | 545 # For datachannel setups, media source, selector, and sink elements are not |
688 | 546 # created |
689 if self.video_muted: | 547 if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL: |
690 self.on_video_mute(True) | 548 self.video_src = self.pipeline.get_by_name("video_src") |
691 if self.audio_muted: | 549 self.muted_src = self.pipeline.get_by_name("muted_src") |
692 self.on_audio_mute(True) | 550 self.video_selector = self.pipeline.get_by_name("video_selector") |
551 self.audio_valve = self.pipeline.get_by_name("audio_valve") | |
552 | |
553 if self.video_muted: | |
554 self.on_video_mute(True) | |
555 if self.audio_muted: | |
556 self.on_audio_mute(True) | |
693 | 557 |
694 # set STUN and TURN servers | 558 # set STUN and TURN servers |
695 external_disco = data_format.deserialise( | 559 external_disco = data_format.deserialise( |
696 await self.bridge.external_disco_get("", self.profile), type_check=list | 560 await self.bridge.external_disco_get("", self.profile), type_check=list |
697 ) | 561 ) |
717 | 581 |
718 if not self.webrtcbin.emit("add-turn-server", url): | 582 if not self.webrtcbin.emit("add-turn-server", url): |
719 log.warning(f"Erreur while adding TURN server {url}") | 583 log.warning(f"Erreur while adding TURN server {url}") |
720 | 584 |
721 # local video feedback | 585 # local video feedback |
722 if self.sinks == SINKS_APP: | 586 if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL: |
723 assert self.appsink_data is not None | 587 assert self.appsink_data is not None |
724 local_video_sink = self.pipeline.get_by_name("local_video_sink") | 588 local_video_sink = self.pipeline.get_by_name("local_video_sink") |
725 local_video_sink.set_property("emit-signals", True) | 589 local_video_sink.set_property("emit-signals", True) |
726 local_video_sink.connect("new-sample", self.appsink_data.local_video_cb) | 590 local_video_sink.connect("new-sample", self.appsink_data.local_video_cb) |
727 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") | 591 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") |
743 "notify::ice-gathering-state", self.on_ice_gathering_state_change | 607 "notify::ice-gathering-state", self.on_ice_gathering_state_change |
744 ) | 608 ) |
745 self.webrtcbin.connect( | 609 self.webrtcbin.connect( |
746 "notify::ice-connection-state", self.on_ice_connection_state | 610 "notify::ice-connection-state", self.on_ice_connection_state |
747 ) | 611 ) |
612 | |
613 if self.sources == SOURCES_DATACHANNEL: | |
614 # Data channel configuration for compatibility with browser defaults | |
615 data_channel_options = Gst.Structure.new_empty("data-channel-options") | |
616 data_channel_options.set_value("ordered", True) | |
617 data_channel_options.set_value("protocol", "") | |
618 | |
619 # Create the data channel | |
620 self.pipeline.set_state(Gst.State.READY) | |
621 self.data_channel = self.webrtcbin.emit( | |
622 "create-data-channel", "file", data_channel_options | |
623 ) | |
624 if self.data_channel is None: | |
625 log.error("Failed to create data channel") | |
626 return | |
627 self.data_channel.connect("on-open", self.dc_open_cb) | |
628 if self.sinks == SINKS_DATACHANNEL: | |
629 self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel) | |
748 | 630 |
749 def start_pipeline(self) -> None: | 631 def start_pipeline(self) -> None: |
750 """Starts the GStreamer pipeline.""" | 632 """Starts the GStreamer pipeline.""" |
751 log.debug("starting the pipeline") | 633 log.debug("starting the pipeline") |
752 self.pipeline.set_state(Gst.State.PLAYING) | 634 self.pipeline.set_state(Gst.State.PLAYING) |
811 ) | 693 ) |
812 return False | 694 return False |
813 elif isinstance(dest, Gst.Element): | 695 elif isinstance(dest, Gst.Element): |
814 return source.link(dest) | 696 return source.link(dest) |
815 else: | 697 else: |
816 log.error(f"Unexpected type for dest: {type(sink)}") | 698 log.error(f"Unexpected type for dest: {type(dest)}") |
817 return False | 699 return False |
818 | 700 |
819 return True | 701 return True |
820 | 702 |
821 def scaled_dimensions( | 703 def scaled_dimensions( |
939 capsfilter = Gst.ElementFactory.make("capsfilter") | 821 capsfilter = Gst.ElementFactory.make("capsfilter") |
940 capsfilter.set_property("caps", adjusted_caps) | 822 capsfilter.set_property("caps", adjusted_caps) |
941 | 823 |
942 self.pipeline.add(q, conv, videoscale, capsfilter) | 824 self.pipeline.add(q, conv, videoscale, capsfilter) |
943 | 825 |
944 | |
945 self.pipeline.sync_children_states() | 826 self.pipeline.sync_children_states() |
946 ret = pad.link(q.get_static_pad("sink")) | 827 ret = pad.link(q.get_static_pad("sink")) |
947 if ret != Gst.PadLinkReturn.OK: | 828 if ret != Gst.PadLinkReturn.OK: |
948 log.error(f"Error linking pad: {ret}") | 829 log.error(f"Error linking pad: {ret}") |
949 q.link(conv) | 830 q.link(conv) |
995 decodebin.connect("pad-added", self.on_remote_decodebin_stream) | 876 decodebin.connect("pad-added", self.on_remote_decodebin_stream) |
996 self.pipeline.add(decodebin) | 877 self.pipeline.add(decodebin) |
997 decodebin.sync_state_with_parent() | 878 decodebin.sync_state_with_parent() |
998 pad.link(decodebin.get_static_pad("sink")) | 879 pad.link(decodebin.get_static_pad("sink")) |
999 | 880 |
881 async def _call_start(self, callee: jid.JID, call_data: dict, profile: str) -> str: | |
882 return await self.bridge.call_start( | |
883 str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile | |
884 ) | |
885 | |
1000 async def _start_call(self) -> None: | 886 async def _start_call(self) -> None: |
1001 """Initiate the call. | 887 """Initiate the call. |
1002 | 888 |
1003 Initiates a call with the callee using the stored offer. If there are any buffered | 889 Initiates a call with the callee using the stored offer. If there are any buffered |
1004 local ICE candidates, they are sent as part of the initiation. | 890 local ICE candidates, they are sent as part of the initiation. |
1005 """ | 891 """ |
1006 assert self.callee | 892 assert self.callee |
1007 self.sid = await self.bridge.call_start( | 893 assert self.call_start_cb is not None |
1008 str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile | 894 self.sid = await self.call_start_cb( |
895 self.callee, {"sdp": self.offer}, self.profile | |
1009 ) | 896 ) |
1010 if self.local_candidates_buffer: | 897 if self.local_candidates_buffer: |
1011 log.debug( | 898 log.debug( |
1012 f"sending buffered local ICE candidates: {self.local_candidates_buffer}" | 899 f"sending buffered local ICE candidates: {self.local_candidates_buffer}" |
1013 ) | 900 ) |
1081 """ | 968 """ |
1082 log.debug( | 969 log.debug( |
1083 f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}" | 970 f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}" |
1084 ) | 971 ) |
1085 parsed_candidate = self.parse_ice_candidate(candidate_sdp) | 972 parsed_candidate = self.parse_ice_candidate(candidate_sdp) |
973 if parsed_candidate is None: | |
974 log.warning(f"Can't parse candidate: {candidate_sdp}") | |
975 return | |
1086 try: | 976 try: |
1087 media_type = self.media_types[mline_index] | 977 media_type = self.media_types[mline_index] |
1088 except KeyError: | 978 except KeyError: |
1089 raise exceptions.InternalError("can't find media type") | 979 raise exceptions.InternalError("can't find media type") |
1090 | 980 |
1127 try: | 1017 try: |
1128 mline_index = self.get_sdp_mline_index(media_type) | 1018 mline_index = self.get_sdp_mline_index(media_type) |
1129 except Exception as e: | 1019 except Exception as e: |
1130 raise exceptions.InternalError(f"Can't find sdp mline index: {e}") | 1020 raise exceptions.InternalError(f"Can't find sdp mline index: {e}") |
1131 self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp) | 1021 self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp) |
1132 log.debug( | 1022 log.warning( |
1133 f"Remote ICE candidate added. MLine Index: {mline_index}, " | 1023 f"Remote ICE candidate added. MLine Index: {mline_index}, " |
1134 f"{candidate_sdp}" | 1024 f"{candidate_sdp}" |
1135 ) | 1025 ) |
1136 | 1026 |
1137 def on_ice_gathering_state_change(self, pspec, __): | 1027 def on_ice_gathering_state_change(self, pspec, __): |
1176 """Handles (un)muting of video. | 1066 """Handles (un)muting of video. |
1177 | 1067 |
1178 @param muted: True if video is muted. | 1068 @param muted: True if video is muted. |
1179 """ | 1069 """ |
1180 if self.video_selector is not None: | 1070 if self.video_selector is not None: |
1181 current_source = None if muted else "desktop" if self.desktop_sharing else "video" | 1071 current_source = ( |
1072 None if muted else "desktop" if self.desktop_sharing else "video" | |
1073 ) | |
1182 self.switch_video_source(current_source) | 1074 self.switch_video_source(current_source) |
1183 state = "muted" if muted else "unmuted" | 1075 state = "muted" if muted else "unmuted" |
1184 log.info(f"Video is now {state}") | 1076 log.info(f"Video is now {state}") |
1185 | 1077 |
1186 def on_desktop_switch(self, desktop_active: bool) -> None: | 1078 def on_desktop_switch(self, desktop_active: bool) -> None: |
1199 try: | 1091 try: |
1200 screenshare_data = await self.desktop_portal.request_screenshare() | 1092 screenshare_data = await self.desktop_portal.request_screenshare() |
1201 except exceptions.CancelError: | 1093 except exceptions.CancelError: |
1202 self.desktop_sharing = False | 1094 self.desktop_sharing = False |
1203 return | 1095 return |
1204 self.desktop_sharing_data = { | 1096 self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])} |
1205 "path": str(screenshare_data["node_id"]) | |
1206 } | |
1207 self.do_desktop_switch(desktop_active) | 1097 self.do_desktop_switch(desktop_active) |
1208 | 1098 |
1209 def do_desktop_switch(self, desktop_active: bool) -> None: | 1099 def do_desktop_switch(self, desktop_active: bool) -> None: |
1210 if self.video_muted: | 1100 if self.video_muted: |
1211 # Update the active source state but do not switch | 1101 # Update the active source state but do not switch |
1214 | 1104 |
1215 source = "desktop" if desktop_active else "video" | 1105 source = "desktop" if desktop_active else "video" |
1216 self.switch_video_source(source) | 1106 self.switch_video_source(source) |
1217 self.desktop_sharing = desktop_active | 1107 self.desktop_sharing = desktop_active |
1218 | 1108 |
1219 def switch_video_source(self, source: str|None) -> None: | 1109 def switch_video_source(self, source: str | None) -> None: |
1220 """Activates the specified source while deactivating the others. | 1110 """Activates the specified source while deactivating the others. |
1221 | 1111 |
1222 @param source: 'desktop', 'video', 'muted' or None for muted source. | 1112 @param source: 'desktop', 'video', 'muted' or None for muted source. |
1223 """ | 1113 """ |
1224 if source is None: | 1114 if source is None: |
1250 # Set the video_selector active pad | 1140 # Set the video_selector active pad |
1251 if source == "desktop": | 1141 if source == "desktop": |
1252 if self.desktop_sink_pad: | 1142 if self.desktop_sink_pad: |
1253 pad = self.desktop_sink_pad | 1143 pad = self.desktop_sink_pad |
1254 else: | 1144 else: |
1255 log.error(f"No desktop pad available") | 1145 log.error(f"No desktop pad available") |
1256 pad = None | 1146 pad = None |
1257 else: | 1147 else: |
1258 pad_name = f"sink_{['video', 'muted'].index(source)}" | 1148 pad_name = f"sink_{['video', 'muted'].index(source)}" |
1259 pad = self.video_selector.get_static_pad(pad_name) | 1149 pad = self.video_selector.get_static_pad(pad_name) |
1260 | 1150 |
1261 if pad is not None: | 1151 if pad is not None: |
1262 self.video_selector.props.active_pad = pad | 1152 self.video_selector.props.active_pad = pad |
1263 | 1153 |
1264 self.pipeline.set_state(Gst.State.PLAYING) | 1154 self.pipeline.set_state(Gst.State.PLAYING) |
1265 | 1155 |
1266 def _setup_desktop_source(self, properties: dict[str, object]|None) -> None: | 1156 def _setup_desktop_source(self, properties: dict[str, object] | None) -> None: |
1267 """Set up a new desktop source. | 1157 """Set up a new desktop source. |
1268 | 1158 |
1269 @param properties: The properties to set on the desktop source. | 1159 @param properties: The properties to set on the desktop source. |
1270 """ | 1160 """ |
1271 source_elt = "ximagesrc" if self.desktop_portal is None else "pipewiresrc" | 1161 source_elt = "ximagesrc" if self.desktop_portal is None else "pipewiresrc" |
1285 | 1175 |
1286 desktop_src.link(video_convert) | 1176 desktop_src.link(video_convert) |
1287 video_convert.link(queue) | 1177 video_convert.link(queue) |
1288 | 1178 |
1289 sink_pad_template = self.video_selector.get_pad_template("sink_%u") | 1179 sink_pad_template = self.video_selector.get_pad_template("sink_%u") |
1290 self.desktop_sink_pad = self.video_selector.request_pad(sink_pad_template, None, None) | 1180 self.desktop_sink_pad = self.video_selector.request_pad( |
1181 sink_pad_template, None, None | |
1182 ) | |
1291 queue_src_pad = queue.get_static_pad("src") | 1183 queue_src_pad = queue.get_static_pad("src") |
1292 queue_src_pad.link(self.desktop_sink_pad) | 1184 queue_src_pad.link(self.desktop_sink_pad) |
1293 | 1185 |
1294 desktop_src.sync_state_with_parent() | 1186 desktop_src.sync_state_with_parent() |
1295 video_convert.sync_state_with_parent() | 1187 video_convert.sync_state_with_parent() |
1325 self.desktop_portal.end_screenshare() | 1217 self.desktop_portal.end_screenshare() |
1326 | 1218 |
1327 async def end_call(self) -> None: | 1219 async def end_call(self) -> None: |
1328 """Stop streaming and clean instance""" | 1220 """Stop streaming and clean instance""" |
1329 self.reset_instance() | 1221 self.reset_instance() |
1222 | |
1223 | |
1224 class WebRTCCall: | |
1225 """Helper class to create and handle WebRTC. | |
1226 | |
1227 This class handles signals and communication of connection data with backend. | |
1228 | |
1229 """ | |
1230 | |
1231 def __init__( | |
1232 self, | |
1233 bridge, | |
1234 profile: str, | |
1235 callee: jid.JID, | |
1236 on_call_setup_cb: Callable | None = None, | |
1237 on_call_ended_cb: Callable | None = None, | |
1238 **kwargs, | |
1239 ): | |
1240 """Create and set up a webRTC instance | |
1241 | |
1242 @param bridge: async Bridge. | |
1243 @param profile: profile making or receiving the call | |
1244 @param callee: peer jid | |
1245 @param kwargs: extra kw args to use when instantiating WebRTC | |
1246 """ | |
1247 self.profile = profile | |
1248 self.webrtc = WebRTC(bridge, profile, **kwargs) | |
1249 self.webrtc.callee = callee | |
1250 self.on_call_setup_cb = on_call_setup_cb | |
1251 self.on_call_ended_cb = on_call_ended_cb | |
1252 bridge.register_signal( | |
1253 "ice_candidates_new", self.on_ice_candidates_new, "plugin" | |
1254 ) | |
1255 bridge.register_signal("call_setup", self.on_call_setup, "plugin") | |
1256 bridge.register_signal("call_ended", self.on_call_ended, "plugin") | |
1257 | |
1258 @classmethod | |
1259 async def make_webrtc_call( | |
1260 cls, bridge, profile: str, call_data: CallData, **kwargs | |
1261 ) -> "WebRTCCall": | |
1262 """Create the webrtc_call instance | |
1263 | |
1264 @param call_data: Call data of the command | |
1265 @param kwargs: extra args used to instantiate WebRTCCall | |
1266 | |
1267 """ | |
1268 webrtc_call = cls(bridge, profile, call_data.callee, **call_data.kwargs, **kwargs) | |
1269 if call_data.sid is None: | |
1270 # we are making the call | |
1271 await webrtc_call.start() | |
1272 else: | |
1273 # we are receiving the call | |
1274 webrtc_call.sid = call_data.sid | |
1275 if call_data.action_id is not None: | |
1276 await bridge.action_launch( | |
1277 call_data.action_id, | |
1278 data_format.serialise({"cancelled": False}), | |
1279 profile, | |
1280 ) | |
1281 return webrtc_call | |
1282 | |
1283 @property | |
1284 def sid(self) -> str | None: | |
1285 return self.webrtc.sid | |
1286 | |
1287 @sid.setter | |
1288 def sid(self, new_sid: str | None) -> None: | |
1289 self.webrtc.sid = new_sid | |
1290 | |
1291 async def on_ice_candidates_new( | |
1292 self, sid: str, candidates_s: str, profile: str | |
1293 ) -> None: | |
1294 if sid != self.webrtc.sid or profile != self.profile: | |
1295 return | |
1296 self.webrtc.on_ice_candidates_new( | |
1297 data_format.deserialise(candidates_s), | |
1298 ) | |
1299 | |
1300 async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None: | |
1301 if sid != self.webrtc.sid or profile != self.profile: | |
1302 return | |
1303 setup_data = data_format.deserialise(setup_data_s) | |
1304 try: | |
1305 role = setup_data["role"] | |
1306 sdp = setup_data["sdp"] | |
1307 except KeyError: | |
1308 log.error(f"Invalid setup data received: {setup_data}") | |
1309 return | |
1310 if role == "initiator": | |
1311 self.webrtc.on_accepted_call(sdp, profile) | |
1312 elif role == "responder": | |
1313 await self.webrtc.answer_call(sdp, profile) | |
1314 else: | |
1315 log.error(f"Invalid role received during setup: {setup_data}") | |
1316 if self.on_call_setup_cb is not None: | |
1317 await aio.maybe_async(self.on_call_setup_cb(sid, profile)) | |
1318 | |
1319 async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None: | |
1320 if sid != self.webrtc.sid or profile != self.profile: | |
1321 return | |
1322 await self.webrtc.end_call() | |
1323 if self.on_call_ended_cb is not None: | |
1324 await aio.maybe_async(self.on_call_ended_cb(sid, profile)) | |
1325 | |
1326 async def start(self): | |
1327 """Start a call. | |
1328 | |
1329 To be used only if we are initiator | |
1330 """ | |
1331 await self.webrtc.setup_call("initiator") | |
1332 self.webrtc.start_pipeline() |