comparison libervia/frontends/tools/webrtc.py @ 4233:d01b8d002619

cli (call, file), frontends: implement webRTC data channel transfer: - file send/receive commands now supports webRTC transfer. In `send` command, the `--webrtc` flags is currenty used to activate it. - WebRTC related code have been factorized and moved to `libervia.frontends.tools.webrtc*` modules. rel 442
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 13:43:09 +0200
parents fe29fbdabce6
children 79c8a70e1813
comparison
equal deleted inserted replaced
4232:0fbe5c605eb6 4233:d01b8d002619
14 # GNU Affero General Public License for more details. 14 # GNU Affero General Public License for more details.
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 from collections.abc import Awaitable
19 import gi 20 import gi
20 gi.require_versions({ 21
21 "Gst": "1.0", 22 gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
22 "GstWebRTC": "1.0"
23 })
24 from gi.repository import Gst, GstWebRTC, GstSdp 23 from gi.repository import Gst, GstWebRTC, GstSdp
25 24
26 from libervia.backend.core import exceptions 25 from libervia.backend.core import exceptions
27 26
28 try: 27 try:
31 raise exceptions.MissingModule( 30 raise exceptions.MissingModule(
32 "No GStreamer Python overrides available. Please install relevant packages on " 31 "No GStreamer Python overrides available. Please install relevant packages on "
33 "your system (e.g., `python3-gst-1.0` on Debian and derivatives)." 32 "your system (e.g., `python3-gst-1.0` on Debian and derivatives)."
34 ) 33 )
35 import asyncio 34 import asyncio
36 from dataclasses import dataclass
37 from datetime import datetime 35 from datetime import datetime
38 import logging 36 import logging
39 from random import randint
40 import re 37 import re
41 from typing import Callable 38 from typing import Callable
42 from urllib.parse import quote_plus 39 from urllib.parse import quote_plus
43 40
44 from libervia.backend.tools.common import data_format 41 from libervia.backend.tools.common import data_format
45 from libervia.frontends.tools import aio, display_servers 42 from libervia.frontends.tools import aio, display_servers, jid
43 from .webrtc_models import AppSinkData, CallData
44 from .webrtc_screenshare import DesktopPortal
46 45
47 current_server = display_servers.detect() 46 current_server = display_servers.detect()
48 if current_server == display_servers.X11: 47 if current_server == display_servers.X11:
49 # GSTreamer's ximagesrc documentation asks to run this function 48 # GSTreamer's ximagesrc documentation asks to run this function
50 import ctypes 49 import ctypes
51 ctypes.CDLL('libX11.so.6').XInitThreads() 50
51 ctypes.CDLL("libX11.so.6").XInitThreads()
52 52
53 53
54 log = logging.getLogger(__name__) 54 log = logging.getLogger(__name__)
55 55
56 Gst.init(None) 56 Gst.init(None)
57 57
58 SOURCES_AUTO = "auto" 58 SOURCES_AUTO = "auto"
59 SOURCES_TEST = "test" 59 SOURCES_TEST = "test"
60 SOURCES_DATACHANNEL = "datachannel"
60 SINKS_APP = "app" 61 SINKS_APP = "app"
61 SINKS_AUTO = "auto" 62 SINKS_AUTO = "auto"
62 SINKS_TEST = "test" 63 SINKS_TEST = "test"
63 64 SINKS_DATACHANNEL = "datachannel"
64
65 class ScreenshareError(Exception):
66 pass
67
68
69 @dataclass
70 class AppSinkData:
71 local_video_cb: Callable
72 remote_video_cb: Callable|None
73
74
75 class DesktopPortal:
76
77 def __init__(self, webrtc: "WebRTC"):
78 import dbus
79 from dbus.mainloop.glib import DBusGMainLoop
80 # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
81 self.dbus = dbus
82 self.webrtc = webrtc
83 self.sources_type = dbus.UInt32(7)
84 DBusGMainLoop(set_as_default=True)
85 self.session_bus = dbus.SessionBus()
86 portal_object = self.session_bus.get_object(
87 'org.freedesktop.portal.Desktop',
88 '/org/freedesktop/portal/desktop'
89 )
90 self.screencast_interface = dbus.Interface(
91 portal_object,
92 'org.freedesktop.portal.ScreenCast'
93 )
94 self.session_interface = None
95 self.session_signal = None
96 self.handle_counter = 0
97 self.session_handle = None
98 self.stream_data: dict|None = None
99
100 @property
101 def handle_token(self):
102 self.handle_counter += 1
103 return f"libervia{self.handle_counter}"
104
105 def on_session_closed(self, details: dict) -> None:
106 if self.session_interface is not None:
107 self.session_interface = None
108 self.webrtc.desktop_sharing = False
109 if self.session_signal is not None:
110 self.session_signal.remove()
111 self.session_signal = None
112
113
114 async def dbus_call(self, method_name: str, *args) -> dict:
115 """Call a screenshare portal method
116
117 This method handle the signal response.
118 @param method_name: method to call
119 @param args: extra args
120 `handle_token` will be automatically added to the last arg (option dict)
121 @return: method result
122 """
123 if self.session_handle is not None:
124 self.end_screenshare()
125 method = getattr(self.screencast_interface, method_name)
126 options = args[-1]
127 reply_fut = asyncio.Future()
128 signal_fut = asyncio.Future()
129 # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
130 handle_token = self.handle_token
131 sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
132 path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
133 signal_match = None
134
135 def on_signal(response, results):
136 assert signal_match is not None
137 signal_match.remove()
138 if response == 0:
139 signal_fut.set_result(results)
140 elif response == 1:
141 signal_fut.set_exception(
142 exceptions.CancelError("Cancelled by user.")
143 )
144 else:
145 signal_fut.set_exception(ScreenshareError(
146 f"Can't get signal result"
147 ))
148
149 signal_match = self.session_bus.add_signal_receiver(
150 on_signal,
151 signal_name="Response",
152 dbus_interface="org.freedesktop.portal.Request",
153 path=path
154 )
155
156 options["handle_token"] = handle_token
157
158 method(
159 *args,
160 reply_handler=reply_fut.set_result,
161 error_handler=reply_fut.set_exception
162 )
163 try:
164 await reply_fut
165 except Exception as e:
166 raise ScreenshareError(f"Can't ask screenshare permission: {e}")
167 return await signal_fut
168
169 async def request_screenshare(self) -> dict:
170 session_data = await self.dbus_call(
171 "CreateSession",
172 {
173 "session_handle_token": str(randint(1, 2**32)),
174 }
175 )
176 try:
177 session_handle = session_data["session_handle"]
178 except KeyError:
179 raise ScreenshareError("Can't get session handle")
180 self.session_handle = session_handle
181
182
183 await self.dbus_call(
184 "SelectSources",
185 session_handle,
186 {
187 "multiple": True,
188 "types": self.sources_type,
189 "modal": True
190 }
191 )
192 screenshare_data = await self.dbus_call(
193 "Start",
194 session_handle,
195 "",
196 {}
197 )
198
199 session_object = self.session_bus.get_object(
200 'org.freedesktop.portal.Desktop',
201 session_handle
202 )
203 self.session_interface = self.dbus.Interface(
204 session_object,
205 'org.freedesktop.portal.Session'
206 )
207
208 self.session_signal = self.session_bus.add_signal_receiver(
209 self.on_session_closed,
210 signal_name="Closed",
211 dbus_interface="org.freedesktop.portal.Session",
212 path=session_handle
213 )
214
215 try:
216 node_id, stream_data = screenshare_data["streams"][0]
217 source_type = int(stream_data["source_type"])
218 except (IndexError, KeyError):
219 raise ScreenshareError("Can't parse stream data")
220 self.stream_data = stream_data = {
221 "session_handle": session_handle,
222 "node_id": node_id,
223 "source_type": source_type
224 }
225 try:
226 height = int(stream_data["size"][0])
227 weight = int(stream_data["size"][1])
228 except (IndexError, KeyError):
229 pass
230 else:
231 stream_data["size"] = (height, weight)
232
233 return self.stream_data
234
235 def end_screenshare(self) -> None:
236 """Close a running screenshare session, if any."""
237 if self.session_interface is None:
238 return
239 self.session_interface.Close()
240 self.on_session_closed({})
241 65
242 66
243 class WebRTC: 67 class WebRTC:
244 """GSTreamer based WebRTC implementation for audio and video communication. 68 """GSTreamer based WebRTC implementation for audio and video communication.
245 69
246 This class encapsulates the WebRTC functionalities required for initiating and 70 This class encapsulates the WebRTC functionalities required for initiating and
247 handling audio and video calls. 71 handling audio and video calls, and data channels.
248 """ 72 """
249 73
250 def __init__( 74 def __init__(
251 self, 75 self,
252 bridge, 76 bridge,
253 profile: str, 77 profile: str,
254 sources: str = SOURCES_AUTO, 78 sources: str = SOURCES_AUTO,
255 sinks: str = SINKS_AUTO, 79 sinks: str = SINKS_AUTO,
256 appsink_data: AppSinkData | None = None, 80 appsink_data: AppSinkData | None = None,
257 reset_cb: Callable | None = None, 81 reset_cb: Callable | None = None,
258 merge_pip: bool|None = None, 82 merge_pip: bool | None = None,
259 target_size: tuple[int, int]|None = None, 83 target_size: tuple[int, int] | None = None,
84 call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None,
85 dc_open_cb: (
86 Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
87 ) = None,
88 dc_on_data_channel: (
89 Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
90 ) = None,
260 ) -> None: 91 ) -> None:
261 """Initializes a new WebRTC instance. 92 """Initializes a new WebRTC instance.
262 93
263 @param bridge: An instance of backend bridge. 94 @param bridge: An instance of backend bridge.
264 @param profile: Libervia profile. 95 @param profile: Libervia profile.
275 used for SINKS_AUTO only for now). 106 used for SINKS_AUTO only for now).
276 @param target_size: Expected size of the final sink stream. Mainly use by composer 107 @param target_size: Expected size of the final sink stream. Mainly use by composer
277 when ``merge_pip`` is set. 108 when ``merge_pip`` is set.
278 None to autodetect (not real autodetection implemeted yet, default to 109 None to autodetect (not real autodetection implemeted yet, default to
279 (1280,720)). 110 (1280,720)).
111 @param call_start_cb: Called when call is started.
112 @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL).
113 This callback will be run in a GStreamer thread.
114 @param dc_open_cb: Called when Data Channel is created (for SINKS_DATACHANNEL).
115 This callback will be run in a GStreamer thread.
280 """ 116 """
281 self.main_loop = asyncio.get_event_loop() 117 self.main_loop = asyncio.get_event_loop()
282 self.bridge = bridge 118 self.bridge = bridge
283 self.profile = profile 119 self.profile = profile
284 self.pipeline = None 120 self.pipeline = None
287 self._desktop_sharing = False 123 self._desktop_sharing = False
288 self.desktop_sharing_data = None 124 self.desktop_sharing_data = None
289 self.sources = sources 125 self.sources = sources
290 self.sinks = sinks 126 self.sinks = sinks
291 if target_size is None: 127 if target_size is None:
292 target_size=(1280, 720) 128 target_size = (1280, 720)
293 self.target_width, self.target_height = target_size 129 self.target_width, self.target_height = target_size
294 if merge_pip is None: 130 if merge_pip is None:
295 merge_pip = sinks == SINKS_AUTO 131 merge_pip = sinks == SINKS_AUTO
296 self.merge_pip = merge_pip 132 self.merge_pip = merge_pip
133 if call_start_cb is None:
134 call_start_cb = self._call_start
135 self.call_start_cb = call_start_cb
136 if sources == SOURCES_DATACHANNEL:
137 assert dc_open_cb is not None
138 self.dc_open_cb = dc_open_cb
139 if sinks == SINKS_DATACHANNEL:
140 assert dc_on_data_channel is not None
141 self.dc_on_data_channel = dc_on_data_channel
297 if sinks == SINKS_APP: 142 if sinks == SINKS_APP:
298 if ( 143 if (
299 merge_pip 144 merge_pip
300 and appsink_data is not None 145 and appsink_data is not None
301 and appsink_data.remote_video_cb is not None 146 and appsink_data.remote_video_cb is not None
385 raise ValueError( 230 raise ValueError(
386 'Only "desktop_sharing" is currently allowed for binding' 231 'Only "desktop_sharing" is currently allowed for binding'
387 ) 232 )
388 self.bindings[key] = cb 233 self.bindings[key] = cb
389 234
390
391 def generate_dot_file( 235 def generate_dot_file(
392 self, 236 self,
393 filename: str = "pipeline", 237 filename: str = "pipeline",
394 details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL, 238 details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL,
395 with_timestamp: bool = True, 239 with_timestamp: bool = True,
396 bin_: Gst.Bin|None = None, 240 bin_: Gst.Bin | None = None,
397 ) -> None: 241 ) -> None:
398 """Generate Dot File for debugging 242 """Generate Dot File for debugging
399 243
400 ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir. 244 ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir.
401 ``dot -Tpng -o <filename>.png <filename>.dot`` can be use to convert to a PNG file. 245 ``dot -Tpng -o <filename>.png <filename>.dot`` can be use to convert to a PNG file.
410 (``self.pipeline``) will be used. 254 (``self.pipeline``) will be used.
411 """ 255 """
412 if bin_ is None: 256 if bin_ is None:
413 bin_ = self.pipeline 257 bin_ = self.pipeline
414 if with_timestamp: 258 if with_timestamp:
415 timestamp = datetime.now().isoformat(timespec='milliseconds') 259 timestamp = datetime.now().isoformat(timespec="milliseconds")
416 filename = f"{timestamp}_filename" 260 filename = f"{timestamp}_filename"
417 261
418 Gst.debug_bin_to_dot_file(bin_, details, filename) 262 Gst.debug_bin_to_dot_file(bin_, details, filename)
419 263
420 def get_sdp_mline_index(self, media_type: str) -> int: 264 def get_sdp_mline_index(self, media_type: str) -> int:
554 self.sid: str | None = None 398 self.sid: str | None = None
555 self.offer: str | None = None 399 self.offer: str | None = None
556 self.local_candidates_buffer = {} 400 self.local_candidates_buffer = {}
557 self.ufrag: str | None = None 401 self.ufrag: str | None = None
558 self.pwd: str | None = None 402 self.pwd: str | None = None
559 self.callee: str | None = None 403 self.callee: jid.JID | None = None
560 self._media_types = None 404 self._media_types = None
561 self._media_types_inv = None 405 self._media_types_inv = None
562 self._sdp_set: bool = False 406 self._sdp_set: bool = False
563 self.remote_candidates_buffer: dict[str, dict[str, list]] = { 407 self.remote_candidates_buffer: dict[str, dict[str, list]] = {
564 "audio": {"candidates": []}, 408 "audio": {"candidates": []},
574 self.desktop_sink_pad = None 418 self.desktop_sink_pad = None
575 self.bindings = {} 419 self.bindings = {}
576 if self.reset_cb is not None: 420 if self.reset_cb is not None:
577 self.reset_cb() 421 self.reset_cb()
578 422
579
580 async def setup_call( 423 async def setup_call(
581 self, 424 self,
582 role: str, 425 role: str,
583 audio_pt: int | None = 96, 426 audio_pt: int | None = 96,
584 video_pt: int | None = 97, 427 video_pt: int | None = 97,
596 @raises NotImplementedError: If audio_pt or video_pt is set to None. 439 @raises NotImplementedError: If audio_pt or video_pt is set to None.
597 @raises AssertionError: If the role is not 'initiator' or 'responder'. 440 @raises AssertionError: If the role is not 'initiator' or 'responder'.
598 """ 441 """
599 assert role in ("initiator", "responder") 442 assert role in ("initiator", "responder")
600 self.role = role 443 self.role = role
601 if audio_pt is None or video_pt is None: 444
602 raise NotImplementedError("None value is not handled yet") 445 if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL:
603 446 # Setup pipeline for datachannel only, no media streams.
604 if self.sources == SOURCES_AUTO: 447 self.gst_pipe_desc = f"""
605 video_source_elt = "v4l2src" 448 webrtcbin name=sendrecv bundle-policy=max-bundle
606 audio_source_elt = "pulsesrc" 449 """
607 elif self.sources == SOURCES_TEST:
608 video_source_elt = "videotestsrc is-live=true pattern=ball"
609 audio_source_elt = "audiotestsrc"
610 else: 450 else:
611 raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}') 451 if audio_pt is None or video_pt is None:
612 452 raise NotImplementedError("None value is not handled yet")
613 453
614 if self.sinks == SINKS_APP: 454 if self.sources == SOURCES_AUTO:
615 local_video_sink_elt = ( 455 video_source_elt = "v4l2src"
616 "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " 456 audio_source_elt = "pulsesrc"
617 "sync=True" 457 elif self.sources == SOURCES_TEST:
618 ) 458 video_source_elt = "videotestsrc is-live=true pattern=ball"
619 elif self.sinks == SINKS_AUTO: 459 audio_source_elt = "audiotestsrc"
620 local_video_sink_elt = "autovideosink" 460 else:
621 else: 461 raise exceptions.InternalError(
622 raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") 462 f'Unknown "sources" value: {self.sources!r}'
623 463 )
624 if self.merge_pip: 464
625 extra_elt = ( 465 if self.sinks == SINKS_APP:
626 "compositor name=compositor background=black " 466 local_video_sink_elt = (
627 f"! video/x-raw,width={self.target_width},height={self.target_height}," 467 "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
628 "framerate=30/1 " 468 "sync=True"
629 f"! {local_video_sink_elt}" 469 )
630 ) 470 elif self.sinks == SINKS_AUTO:
631 local_video_sink_elt = "compositor.sink_1" 471 local_video_sink_elt = "autovideosink"
632 else: 472 else:
633 extra_elt = "" 473 raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")
634 474
635 self.gst_pipe_desc = f""" 475 if self.merge_pip:
636 webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle 476 extra_elt = (
637 477 "compositor name=compositor background=black "
638 input-selector name=video_selector 478 f"! video/x-raw,width={self.target_width},height={self.target_height},"
639 ! videorate 479 "framerate=30/1 "
640 ! video/x-raw,framerate=30/1 480 f"! {local_video_sink_elt}"
641 ! tee name=t 481 )
642 482 local_video_sink_elt = "compositor.sink_1"
643 {extra_elt} 483 else:
644 484 extra_elt = ""
645 {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. 485
646 videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. 486 self.gst_pipe_desc = f"""
647 487 webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle
648 t. 488
649 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream 489 input-selector name=video_selector
650 ! videoconvert 490 ! videorate
651 ! vp8enc deadline=1 keyframe-max-dist=60 491 ! video/x-raw,framerate=30/1
652 ! rtpvp8pay picture-id-mode=15-bit 492 ! tee name=t
653 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} 493
654 ! sendrecv. 494 {extra_elt}
655 495
656 t. 496 {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
657 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream 497 videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector.
658 ! videoconvert 498
659 ! {local_video_sink_elt} 499 t.
660 500 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
661 {audio_source_elt} name=audio_src 501 ! videoconvert
662 ! valve 502 ! vp8enc deadline=1 keyframe-max-dist=60
663 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream 503 ! rtpvp8pay picture-id-mode=15-bit
664 ! audioconvert 504 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
665 ! audioresample 505 ! sendrecv.
666 ! opusenc audio-type=voice 506
667 ! rtpopuspay 507 t.
668 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} 508 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
669 ! sendrecv. 509 ! videoconvert
670 """ 510 ! {local_video_sink_elt}
511
512 {audio_source_elt} name=audio_src
513 ! valve
514 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
515 ! audioconvert
516 ! audioresample
517 ! opusenc audio-type=voice
518 ! rtpopuspay
519 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
520 ! sendrecv.
521 """
671 522
672 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") 523 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
673 524
674 # Create the pipeline 525 # Create the pipeline
675 try: 526 try:
678 log.exception("Can't parse pipeline") 529 log.exception("Can't parse pipeline")
679 self.pipeline = None 530 self.pipeline = None
680 if not self.pipeline: 531 if not self.pipeline:
681 raise exceptions.InternalError("Failed to create Gstreamer pipeline.") 532 raise exceptions.InternalError("Failed to create Gstreamer pipeline.")
682 533
534 if not isinstance(self.pipeline, Gst.Pipeline):
535 # in the case of Data Channel there is a single element, and Gst.parse_launch
536 # doesn't create a Pipeline in this case, so we do it manually.
537 pipeline = Gst.Pipeline()
538 pipeline.add(self.pipeline)
539 self.pipeline = pipeline
540
683 self.webrtcbin = self.pipeline.get_by_name("sendrecv") 541 self.webrtcbin = self.pipeline.get_by_name("sendrecv")
684 self.video_src = self.pipeline.get_by_name("video_src") 542 if self.webrtcbin is None:
685 self.muted_src = self.pipeline.get_by_name("muted_src") 543 raise exceptions.InternalError("Can't get the pipeline.")
686 self.video_selector = self.pipeline.get_by_name("video_selector") 544
687 self.audio_valve = self.pipeline.get_by_name("audio_valve") 545 # For datachannel setups, media source, selector, and sink elements are not
688 546 # created
689 if self.video_muted: 547 if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL:
690 self.on_video_mute(True) 548 self.video_src = self.pipeline.get_by_name("video_src")
691 if self.audio_muted: 549 self.muted_src = self.pipeline.get_by_name("muted_src")
692 self.on_audio_mute(True) 550 self.video_selector = self.pipeline.get_by_name("video_selector")
551 self.audio_valve = self.pipeline.get_by_name("audio_valve")
552
553 if self.video_muted:
554 self.on_video_mute(True)
555 if self.audio_muted:
556 self.on_audio_mute(True)
693 557
694 # set STUN and TURN servers 558 # set STUN and TURN servers
695 external_disco = data_format.deserialise( 559 external_disco = data_format.deserialise(
696 await self.bridge.external_disco_get("", self.profile), type_check=list 560 await self.bridge.external_disco_get("", self.profile), type_check=list
697 ) 561 )
717 581
718 if not self.webrtcbin.emit("add-turn-server", url): 582 if not self.webrtcbin.emit("add-turn-server", url):
719 log.warning(f"Erreur while adding TURN server {url}") 583 log.warning(f"Erreur while adding TURN server {url}")
720 584
721 # local video feedback 585 # local video feedback
722 if self.sinks == SINKS_APP: 586 if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL:
723 assert self.appsink_data is not None 587 assert self.appsink_data is not None
724 local_video_sink = self.pipeline.get_by_name("local_video_sink") 588 local_video_sink = self.pipeline.get_by_name("local_video_sink")
725 local_video_sink.set_property("emit-signals", True) 589 local_video_sink.set_property("emit-signals", True)
726 local_video_sink.connect("new-sample", self.appsink_data.local_video_cb) 590 local_video_sink.connect("new-sample", self.appsink_data.local_video_cb)
727 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") 591 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
743 "notify::ice-gathering-state", self.on_ice_gathering_state_change 607 "notify::ice-gathering-state", self.on_ice_gathering_state_change
744 ) 608 )
745 self.webrtcbin.connect( 609 self.webrtcbin.connect(
746 "notify::ice-connection-state", self.on_ice_connection_state 610 "notify::ice-connection-state", self.on_ice_connection_state
747 ) 611 )
612
613 if self.sources == SOURCES_DATACHANNEL:
614 # Data channel configuration for compatibility with browser defaults
615 data_channel_options = Gst.Structure.new_empty("data-channel-options")
616 data_channel_options.set_value("ordered", True)
617 data_channel_options.set_value("protocol", "")
618
619 # Create the data channel
620 self.pipeline.set_state(Gst.State.READY)
621 self.data_channel = self.webrtcbin.emit(
622 "create-data-channel", "file", data_channel_options
623 )
624 if self.data_channel is None:
625 log.error("Failed to create data channel")
626 return
627 self.data_channel.connect("on-open", self.dc_open_cb)
628 if self.sinks == SINKS_DATACHANNEL:
629 self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel)
748 630
749 def start_pipeline(self) -> None: 631 def start_pipeline(self) -> None:
750 """Starts the GStreamer pipeline.""" 632 """Starts the GStreamer pipeline."""
751 log.debug("starting the pipeline") 633 log.debug("starting the pipeline")
752 self.pipeline.set_state(Gst.State.PLAYING) 634 self.pipeline.set_state(Gst.State.PLAYING)
811 ) 693 )
812 return False 694 return False
813 elif isinstance(dest, Gst.Element): 695 elif isinstance(dest, Gst.Element):
814 return source.link(dest) 696 return source.link(dest)
815 else: 697 else:
816 log.error(f"Unexpected type for dest: {type(sink)}") 698 log.error(f"Unexpected type for dest: {type(dest)}")
817 return False 699 return False
818 700
819 return True 701 return True
820 702
821 def scaled_dimensions( 703 def scaled_dimensions(
939 capsfilter = Gst.ElementFactory.make("capsfilter") 821 capsfilter = Gst.ElementFactory.make("capsfilter")
940 capsfilter.set_property("caps", adjusted_caps) 822 capsfilter.set_property("caps", adjusted_caps)
941 823
942 self.pipeline.add(q, conv, videoscale, capsfilter) 824 self.pipeline.add(q, conv, videoscale, capsfilter)
943 825
944
945 self.pipeline.sync_children_states() 826 self.pipeline.sync_children_states()
946 ret = pad.link(q.get_static_pad("sink")) 827 ret = pad.link(q.get_static_pad("sink"))
947 if ret != Gst.PadLinkReturn.OK: 828 if ret != Gst.PadLinkReturn.OK:
948 log.error(f"Error linking pad: {ret}") 829 log.error(f"Error linking pad: {ret}")
949 q.link(conv) 830 q.link(conv)
995 decodebin.connect("pad-added", self.on_remote_decodebin_stream) 876 decodebin.connect("pad-added", self.on_remote_decodebin_stream)
996 self.pipeline.add(decodebin) 877 self.pipeline.add(decodebin)
997 decodebin.sync_state_with_parent() 878 decodebin.sync_state_with_parent()
998 pad.link(decodebin.get_static_pad("sink")) 879 pad.link(decodebin.get_static_pad("sink"))
999 880
881 async def _call_start(self, callee: jid.JID, call_data: dict, profile: str) -> str:
882 return await self.bridge.call_start(
883 str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
884 )
885
1000 async def _start_call(self) -> None: 886 async def _start_call(self) -> None:
1001 """Initiate the call. 887 """Initiate the call.
1002 888
1003 Initiates a call with the callee using the stored offer. If there are any buffered 889 Initiates a call with the callee using the stored offer. If there are any buffered
1004 local ICE candidates, they are sent as part of the initiation. 890 local ICE candidates, they are sent as part of the initiation.
1005 """ 891 """
1006 assert self.callee 892 assert self.callee
1007 self.sid = await self.bridge.call_start( 893 assert self.call_start_cb is not None
1008 str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile 894 self.sid = await self.call_start_cb(
895 self.callee, {"sdp": self.offer}, self.profile
1009 ) 896 )
1010 if self.local_candidates_buffer: 897 if self.local_candidates_buffer:
1011 log.debug( 898 log.debug(
1012 f"sending buffered local ICE candidates: {self.local_candidates_buffer}" 899 f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
1013 ) 900 )
1081 """ 968 """
1082 log.debug( 969 log.debug(
1083 f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}" 970 f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
1084 ) 971 )
1085 parsed_candidate = self.parse_ice_candidate(candidate_sdp) 972 parsed_candidate = self.parse_ice_candidate(candidate_sdp)
973 if parsed_candidate is None:
974 log.warning(f"Can't parse candidate: {candidate_sdp}")
975 return
1086 try: 976 try:
1087 media_type = self.media_types[mline_index] 977 media_type = self.media_types[mline_index]
1088 except KeyError: 978 except KeyError:
1089 raise exceptions.InternalError("can't find media type") 979 raise exceptions.InternalError("can't find media type")
1090 980
1127 try: 1017 try:
1128 mline_index = self.get_sdp_mline_index(media_type) 1018 mline_index = self.get_sdp_mline_index(media_type)
1129 except Exception as e: 1019 except Exception as e:
1130 raise exceptions.InternalError(f"Can't find sdp mline index: {e}") 1020 raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
1131 self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp) 1021 self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp)
1132 log.debug( 1022 log.warning(
1133 f"Remote ICE candidate added. MLine Index: {mline_index}, " 1023 f"Remote ICE candidate added. MLine Index: {mline_index}, "
1134 f"{candidate_sdp}" 1024 f"{candidate_sdp}"
1135 ) 1025 )
1136 1026
1137 def on_ice_gathering_state_change(self, pspec, __): 1027 def on_ice_gathering_state_change(self, pspec, __):
1176 """Handles (un)muting of video. 1066 """Handles (un)muting of video.
1177 1067
1178 @param muted: True if video is muted. 1068 @param muted: True if video is muted.
1179 """ 1069 """
1180 if self.video_selector is not None: 1070 if self.video_selector is not None:
1181 current_source = None if muted else "desktop" if self.desktop_sharing else "video" 1071 current_source = (
1072 None if muted else "desktop" if self.desktop_sharing else "video"
1073 )
1182 self.switch_video_source(current_source) 1074 self.switch_video_source(current_source)
1183 state = "muted" if muted else "unmuted" 1075 state = "muted" if muted else "unmuted"
1184 log.info(f"Video is now {state}") 1076 log.info(f"Video is now {state}")
1185 1077
1186 def on_desktop_switch(self, desktop_active: bool) -> None: 1078 def on_desktop_switch(self, desktop_active: bool) -> None:
1199 try: 1091 try:
1200 screenshare_data = await self.desktop_portal.request_screenshare() 1092 screenshare_data = await self.desktop_portal.request_screenshare()
1201 except exceptions.CancelError: 1093 except exceptions.CancelError:
1202 self.desktop_sharing = False 1094 self.desktop_sharing = False
1203 return 1095 return
1204 self.desktop_sharing_data = { 1096 self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])}
1205 "path": str(screenshare_data["node_id"])
1206 }
1207 self.do_desktop_switch(desktop_active) 1097 self.do_desktop_switch(desktop_active)
1208 1098
1209 def do_desktop_switch(self, desktop_active: bool) -> None: 1099 def do_desktop_switch(self, desktop_active: bool) -> None:
1210 if self.video_muted: 1100 if self.video_muted:
1211 # Update the active source state but do not switch 1101 # Update the active source state but do not switch
1214 1104
1215 source = "desktop" if desktop_active else "video" 1105 source = "desktop" if desktop_active else "video"
1216 self.switch_video_source(source) 1106 self.switch_video_source(source)
1217 self.desktop_sharing = desktop_active 1107 self.desktop_sharing = desktop_active
1218 1108
1219 def switch_video_source(self, source: str|None) -> None: 1109 def switch_video_source(self, source: str | None) -> None:
1220 """Activates the specified source while deactivating the others. 1110 """Activates the specified source while deactivating the others.
1221 1111
1222 @param source: 'desktop', 'video', 'muted' or None for muted source. 1112 @param source: 'desktop', 'video', 'muted' or None for muted source.
1223 """ 1113 """
1224 if source is None: 1114 if source is None:
1250 # Set the video_selector active pad 1140 # Set the video_selector active pad
1251 if source == "desktop": 1141 if source == "desktop":
1252 if self.desktop_sink_pad: 1142 if self.desktop_sink_pad:
1253 pad = self.desktop_sink_pad 1143 pad = self.desktop_sink_pad
1254 else: 1144 else:
1255 log.error(f"No desktop pad available") 1145 log.error(f"No desktop pad available")
1256 pad = None 1146 pad = None
1257 else: 1147 else:
1258 pad_name = f"sink_{['video', 'muted'].index(source)}" 1148 pad_name = f"sink_{['video', 'muted'].index(source)}"
1259 pad = self.video_selector.get_static_pad(pad_name) 1149 pad = self.video_selector.get_static_pad(pad_name)
1260 1150
1261 if pad is not None: 1151 if pad is not None:
1262 self.video_selector.props.active_pad = pad 1152 self.video_selector.props.active_pad = pad
1263 1153
1264 self.pipeline.set_state(Gst.State.PLAYING) 1154 self.pipeline.set_state(Gst.State.PLAYING)
1265 1155
1266 def _setup_desktop_source(self, properties: dict[str, object]|None) -> None: 1156 def _setup_desktop_source(self, properties: dict[str, object] | None) -> None:
1267 """Set up a new desktop source. 1157 """Set up a new desktop source.
1268 1158
1269 @param properties: The properties to set on the desktop source. 1159 @param properties: The properties to set on the desktop source.
1270 """ 1160 """
1271 source_elt = "ximagesrc" if self.desktop_portal is None else "pipewiresrc" 1161 source_elt = "ximagesrc" if self.desktop_portal is None else "pipewiresrc"
1285 1175
1286 desktop_src.link(video_convert) 1176 desktop_src.link(video_convert)
1287 video_convert.link(queue) 1177 video_convert.link(queue)
1288 1178
1289 sink_pad_template = self.video_selector.get_pad_template("sink_%u") 1179 sink_pad_template = self.video_selector.get_pad_template("sink_%u")
1290 self.desktop_sink_pad = self.video_selector.request_pad(sink_pad_template, None, None) 1180 self.desktop_sink_pad = self.video_selector.request_pad(
1181 sink_pad_template, None, None
1182 )
1291 queue_src_pad = queue.get_static_pad("src") 1183 queue_src_pad = queue.get_static_pad("src")
1292 queue_src_pad.link(self.desktop_sink_pad) 1184 queue_src_pad.link(self.desktop_sink_pad)
1293 1185
1294 desktop_src.sync_state_with_parent() 1186 desktop_src.sync_state_with_parent()
1295 video_convert.sync_state_with_parent() 1187 video_convert.sync_state_with_parent()
1325 self.desktop_portal.end_screenshare() 1217 self.desktop_portal.end_screenshare()
1326 1218
1327 async def end_call(self) -> None: 1219 async def end_call(self) -> None:
1328 """Stop streaming and clean instance""" 1220 """Stop streaming and clean instance"""
1329 self.reset_instance() 1221 self.reset_instance()
1222
1223
1224 class WebRTCCall:
1225 """Helper class to create and handle WebRTC.
1226
1227 This class handles signals and communication of connection data with backend.
1228
1229 """
1230
1231 def __init__(
1232 self,
1233 bridge,
1234 profile: str,
1235 callee: jid.JID,
1236 on_call_setup_cb: Callable | None = None,
1237 on_call_ended_cb: Callable | None = None,
1238 **kwargs,
1239 ):
1240 """Create and setup a webRTC instance
1241
1242 @param bridge: async Bridge.
1243 @param profile: profile making or receiving the call
1244 @param callee: peer jid
1245 @param kwargs: extra kw args to use when instantiating WebRTC
1246 """
1247 self.profile = profile
1248 self.webrtc = WebRTC(bridge, profile, **kwargs)
1249 self.webrtc.callee = callee
1250 self.on_call_setup_cb = on_call_setup_cb
1251 self.on_call_ended_cb = on_call_ended_cb
1252 bridge.register_signal(
1253 "ice_candidates_new", self.on_ice_candidates_new, "plugin"
1254 )
1255 bridge.register_signal("call_setup", self.on_call_setup, "plugin")
1256 bridge.register_signal("call_ended", self.on_call_ended, "plugin")
1257
1258 @classmethod
1259 async def make_webrtc_call(
1260 cls, bridge, profile: str, call_data: CallData, **kwargs
1261 ) -> "WebRTCCall":
1262 """Create the webrtc_call instance
1263
1264 @param call_data: Call data of the command
1265 @param kwargs: extra args used to instanciate WebRTCCall
1266
1267 """
1268 webrtc_call = cls(bridge, profile, call_data.callee, **call_data.kwargs, **kwargs)
1269 if call_data.sid is None:
1270 # we are making the call
1271 await webrtc_call.start()
1272 else:
1273 # we are receiving the call
1274 webrtc_call.sid = call_data.sid
1275 if call_data.action_id is not None:
1276 await bridge.action_launch(
1277 call_data.action_id,
1278 data_format.serialise({"cancelled": False}),
1279 profile,
1280 )
1281 return webrtc_call
1282
1283 @property
1284 def sid(self) -> str | None:
1285 return self.webrtc.sid
1286
1287 @sid.setter
1288 def sid(self, new_sid: str | None) -> None:
1289 self.webrtc.sid = new_sid
1290
1291 async def on_ice_candidates_new(
1292 self, sid: str, candidates_s: str, profile: str
1293 ) -> None:
1294 if sid != self.webrtc.sid or profile != self.profile:
1295 return
1296 self.webrtc.on_ice_candidates_new(
1297 data_format.deserialise(candidates_s),
1298 )
1299
1300 async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
1301 if sid != self.webrtc.sid or profile != self.profile:
1302 return
1303 setup_data = data_format.deserialise(setup_data_s)
1304 try:
1305 role = setup_data["role"]
1306 sdp = setup_data["sdp"]
1307 except KeyError:
1308 log.error(f"Invalid setup data received: {setup_data}")
1309 return
1310 if role == "initiator":
1311 self.webrtc.on_accepted_call(sdp, profile)
1312 elif role == "responder":
1313 await self.webrtc.answer_call(sdp, profile)
1314 else:
1315 log.error(f"Invalid role received during setup: {setup_data}")
1316 if self.on_call_setup_cb is not None:
1317 await aio.maybe_async(self.on_call_setup_cb(sid, profile))
1318
1319 async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
1320 if sid != self.webrtc.sid or profile != self.profile:
1321 return
1322 await self.webrtc.end_call()
1323 if self.on_call_ended_cb is not None:
1324 await aio.maybe_async(self.on_call_ended_cb(sid, profile))
1325
1326 async def start(self):
1327 """Start a call.
1328
1329 To be used only if we are initiator
1330 """
1331 await self.webrtc.setup_call("initiator")
1332 self.webrtc.start_pipeline()