comparison libervia/frontends/tools/webrtc.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of the remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in the same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit of having methods called in parallel is very low, and it causes a lot of trouble as we can't predict the order. Methods are now called sequentially so the workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some methods to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sinks handling with a more flexible mechanism based on Pydantic models. It is now possible to have as many Data Channels as necessary, to have them in addition to A/V streams, to manually specify GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` module. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents d01b8d002619
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4239:a38559e6d6e2 4240:79c8a70e1813
33 ) 33 )
34 import asyncio 34 import asyncio
35 from datetime import datetime 35 from datetime import datetime
36 import logging 36 import logging
37 import re 37 import re
38 from typing import Callable 38 from typing import Callable, Final
39 from urllib.parse import quote_plus 39 from urllib.parse import quote_plus
40 40
41 from libervia.backend.tools.common import data_format 41 from libervia.backend.tools.common import data_format
42 from libervia.frontends.tools import aio, display_servers, jid 42 from libervia.frontends.tools import aio, display_servers, jid
43 from .webrtc_models import AppSinkData, CallData 43 from .webrtc_models import (
44 from .webrtc_screenshare import DesktopPortal 44 CallData,
45 SinksApp,
46 SinksAuto,
47 SinksData,
48 SinksDataChannel,
49 SinksNone,
50 SourcesAuto,
51 SourcesData,
52 SourcesDataChannel,
53 SourcesNone,
54 SourcesPipeline,
55 SourcesTest,
56 )
45 57
46 current_server = display_servers.detect() 58 current_server = display_servers.detect()
47 if current_server == display_servers.X11: 59 if current_server == display_servers.X11:
48 # GSTreamer's ximagesrc documentation asks to run this function 60 # GSTreamer's ximagesrc documentation asks to run this function
49 import ctypes 61 import ctypes
50 62
51 ctypes.CDLL("libX11.so.6").XInitThreads() 63 ctypes.CDLL("libX11.so.6").XInitThreads()
52 64
53 65
54 log = logging.getLogger(__name__) 66 log = logging.getLogger(__name__)
67 VIDEO_SOURCE_AUTO: Final = "v4l2src"
68 AUDIO_SOURCE_AUTO: Final = "pulsesrc"
69 NONE_NOT_IMPLEMENTED_MSG: Final = "None value is not handled yet."
55 70
56 Gst.init(None) 71 Gst.init(None)
57
58 SOURCES_AUTO = "auto"
59 SOURCES_TEST = "test"
60 SOURCES_DATACHANNEL = "datachannel"
61 SINKS_APP = "app"
62 SINKS_AUTO = "auto"
63 SINKS_TEST = "test"
64 SINKS_DATACHANNEL = "datachannel"
65 72
66 73
67 class WebRTC: 74 class WebRTC:
68 """GSTreamer based WebRTC implementation for audio and video communication. 75 """GSTreamer based WebRTC implementation for audio and video communication.
69 76
73 80
74 def __init__( 81 def __init__(
75 self, 82 self,
76 bridge, 83 bridge,
77 profile: str, 84 profile: str,
78 sources: str = SOURCES_AUTO, 85 sources_data: SourcesData|None = None,
79 sinks: str = SINKS_AUTO, 86 sinks_data: SinksData|None = None,
80 appsink_data: AppSinkData | None = None,
81 reset_cb: Callable | None = None, 87 reset_cb: Callable | None = None,
82 merge_pip: bool | None = None, 88 merge_pip: bool | None = None,
83 target_size: tuple[int, int] | None = None, 89 target_size: tuple[int, int] | None = None,
84 call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None, 90 call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None,
85 dc_open_cb: ( 91 dc_data_list: list[SourcesDataChannel|SinksDataChannel]|None = None
86 Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
87 ) = None,
88 dc_on_data_channel: (
89 Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
90 ) = None,
91 ) -> None: 92 ) -> None:
92 """Initializes a new WebRTC instance. 93 """Initializes a new WebRTC instance.
93 94
94 @param bridge: An instance of backend bridge. 95 @param bridge: An instance of backend bridge.
95 @param profile: Libervia profile. 96 @param profile: Libervia profile.
96 @param sources: Which kind of source to use. 97 @param sources_data: Data of the sources.
97 @param sinks: Which kind of sinks to use. 98 The model used will determine which sources to use.
98 @param appsink_data: configuration data for appsink (when SINKS_APP is used). Must 99 SourcesDataChannel can be used here as a convenience. It will then be moved
99 not be used for other sinks. 100 to ``data_channels`` and ``SourcesNone`` will be used instead for
101 ``sources_data``.
102 If None, SourcesAuto will be used.
103 @param sinks_data: Data of the sinks.
104 The model used will determine which sinks to use.
105 SinksDataChannel can be used here as a convenience. It will then be moved
106 to ``data_channels`` and ``SinksNone`` will be used instead for
107 ``sinks_data``.
108 If None, SinksAuto will be used.
100 @param reset_cb: An optional Callable that is triggered on reset events. Can be 109 @param reset_cb: An optional Callable that is triggered on reset events. Can be
101 used to reset UI data on new calls. 110 used to reset UI data on new calls.
102 @param merge_pip: A boolean flag indicating whether Picture-in-Picture mode is 111 @param merge_pip: A boolean flag indicating whether Picture-in-Picture mode is
103 enabled. When PiP is used, local feedback is merged to remote video stream. 112 enabled. When PiP is used, local feedback is merged to remote video stream.
104 Only one video stream is then produced (the local one). 113 Only one video stream is then produced (the local one).
105 If None, PiP mode is selected automatically according to selected sink (it's 114 If None, PiP mode is selected automatically according to selected sink (it's
106 used for SINKS_AUTO only for now). 115 used for SinksAuto only for now).
107 @param target_size: Expected size of the final sink stream. Mainly use by composer 116 @param target_size: Expected size of the final sink stream. Mainly use by composer
108 when ``merge_pip`` is set. 117 when ``merge_pip`` is set.
109 None to autodetect (not real autodetection implemeted yet, default to 118 None to autodetect (no real autodetection implemeted yet, default to
110 (1280,720)). 119 (1280,720)).
111 @param call_start_cb: Called when call is started. 120 @param call_start_cb: Called when call is started.
112 @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL). 121 @param dc_data_list: Data Channels to create.
113 This callback will be run in a GStreamer thread. 122 If a SourcesDataChannel is used as ``sources_data``, or a SinksDataChannel is
114 @param dc_open_cb: Called when Data Channel is created (for SINKS_DATACHANNEL). 123 used as ``sinks_data``, they will be automatically added to this list.
115 This callback will be run in a GStreamer thread.
116 """ 124 """
117 self.main_loop = asyncio.get_event_loop() 125 self.main_loop = asyncio.get_event_loop()
118 self.bridge = bridge 126 self.bridge = bridge
119 self.profile = profile 127 self.profile = profile
120 self.pipeline = None 128 self.pipeline = None
121 self._audio_muted = False 129 self._audio_muted = False
122 self._video_muted = False 130 self._video_muted = False
123 self._desktop_sharing = False 131 self._desktop_sharing = False
124 self.desktop_sharing_data = None 132 self.desktop_sharing_data = None
125 self.sources = sources 133 if dc_data_list is None:
126 self.sinks = sinks 134 dc_data_list = []
135 self.dc_data_list = dc_data_list
136 if sources_data is None:
137 sources_data = SourcesAuto()
138 elif isinstance(sources_data, SourcesDataChannel):
139 dc_data_list.append(sources_data)
140 sources_data = SourcesNone()
141 self.sources_data = sources_data
142 if sinks_data is None:
143 sinks_data = SinksAuto()
144 elif isinstance(sinks_data, SinksDataChannel):
145 dc_data_list.append(sinks_data)
146 sinks_data = SinksNone()
147 self.sinks_data = sinks_data
127 if target_size is None: 148 if target_size is None:
128 target_size = (1280, 720) 149 target_size = (1280, 720)
129 self.target_width, self.target_height = target_size 150 self.target_width, self.target_height = target_size
130 if merge_pip is None: 151 if merge_pip is None:
131 merge_pip = sinks == SINKS_AUTO 152 merge_pip = isinstance(sinks_data, SinksAuto)
132 self.merge_pip = merge_pip 153 self.merge_pip = merge_pip
133 if call_start_cb is None: 154 if call_start_cb is None:
134 call_start_cb = self._call_start 155 call_start_cb = self._call_start
135 self.call_start_cb = call_start_cb 156 self.call_start_cb = call_start_cb
136 if sources == SOURCES_DATACHANNEL: 157 if isinstance(sinks_data, SinksApp):
137 assert dc_open_cb is not None 158 if merge_pip and sinks_data.remote_video_cb is not None:
138 self.dc_open_cb = dc_open_cb
139 if sinks == SINKS_DATACHANNEL:
140 assert dc_on_data_channel is not None
141 self.dc_on_data_channel = dc_on_data_channel
142 if sinks == SINKS_APP:
143 if (
144 merge_pip
145 and appsink_data is not None
146 and appsink_data.remote_video_cb is not None
147 ):
148 raise ValueError("Remote_video_cb can't be used when merge_pip is used!") 159 raise ValueError("Remote_video_cb can't be used when merge_pip is used!")
149 self.appsink_data = appsink_data
150 elif appsink_data is not None:
151 raise exceptions.InternalError(
152 "appsink_data can only be used for SINKS_APP sinks"
153 )
154 self.reset_cb = reset_cb 160 self.reset_cb = reset_cb
155 if current_server == display_servers.WAYLAND: 161 if current_server == display_servers.WAYLAND:
156 self.desktop_portal = DesktopPortal(self) 162 from .portal_desktop import DesktopPortal
163
164 self.desktop_portal = DesktopPortal(
165 on_session_closed_cb=self.on_portal_session_closed
166 )
157 else: 167 else:
158 self.desktop_portal = None 168 self.desktop_portal = None
159 self.reset_instance() 169 self.reset_instance()
160 170
161 @property 171 @property
368 if parsed_candidate.get("generation"): 378 if parsed_candidate.get("generation"):
369 base_format += " generation {generation}" 379 base_format += " generation {generation}"
370 380
371 return base_format.format(**parsed_candidate) 381 return base_format.format(**parsed_candidate)
372 382
373 def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]: 383 def extract_ufrag_pwd(self, sdp: str) -> None:
374 """Retrieves ICE password and user fragment for SDP offer. 384 """Retrieves ICE password and user fragment for SDP offer.
375 385
376 @param sdp: The Session Description Protocol offer string. 386 @param sdp: The Session Description Protocol offer string.
377 @return: ufrag and pwd 387 """
378 @raise ValueError: Can't extract ufrag and password 388 lines = sdp.splitlines()
379 """ 389 media = ''
380 ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) 390 mid_media_map = {}
381 pwd_line = re.search(r"ice-pwd:(\S+)", sdp) 391 bundle_media = set()
382 392 bundle_ufrag = ''
383 if ufrag_line and pwd_line: 393 bundle_pwd = ''
384 ufrag = self.ufrag = ufrag_line.group(1) 394 in_bundle = False
385 pwd = self.pwd = pwd_line.group(1) 395
386 return ufrag, pwd 396 for line in lines:
387 else: 397 if line.startswith('m='):
388 log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") 398 media = line.split('=')[1].split()[0]
389 raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") 399 elif line.startswith('a=mid:'):
400 mid = line.split(':')[1].strip()
401 mid_media_map[mid] = media
402 elif line.startswith('a=group:BUNDLE'):
403 in_bundle = True
404 bundle_media = set(line.split(':')[1].strip().split())
405 elif line.startswith('a=ice-ufrag:'):
406 if in_bundle:
407 bundle_ufrag = line.split(':')[1].strip()
408 else:
409 self.ufrag[media] = line.split(':')[1].strip()
410 elif line.startswith('a=ice-pwd:'):
411 if in_bundle:
412 bundle_pwd = line.split(':')[1].strip()
413 else:
414 self.pwd[media] = line.split(':')[1].strip()
415 else:
416 in_bundle = False
417
418 if bundle_ufrag and bundle_pwd:
419 for mid in bundle_media:
420 media = mid_media_map[mid]
421 self.ufrag[media] = bundle_ufrag
422 self.pwd[media] = bundle_pwd
390 423
391 def reset_instance(self): 424 def reset_instance(self):
392 """Inits or resets the instance variables to their default state.""" 425 """Inits or resets the instance variables to their default state."""
393 self.role: str | None = None 426 self.role: str | None = None
394 if self.pipeline is not None: 427 if self.pipeline is not None:
396 self.pipeline = None 429 self.pipeline = None
397 self._remote_video_pad = None 430 self._remote_video_pad = None
398 self.sid: str | None = None 431 self.sid: str | None = None
399 self.offer: str | None = None 432 self.offer: str | None = None
400 self.local_candidates_buffer = {} 433 self.local_candidates_buffer = {}
401 self.ufrag: str | None = None 434 self.ufrag: dict[str, str] = {}
402 self.pwd: str | None = None 435 self.pwd: dict[str, str] = {}
403 self.callee: jid.JID | None = None 436 self.callee: jid.JID | None = None
404 self._media_types = None 437 self._media_types = None
405 self._media_types_inv = None 438 self._media_types_inv = None
406 self._sdp_set: bool = False 439 self._sdp_set: bool = False
407 self.remote_candidates_buffer: dict[str, dict[str, list]] = { 440 self.remote_candidates_buffer: dict[str, dict[str, list]] = {
410 } 443 }
411 self._media_types = None 444 self._media_types = None
412 self._media_types_inv = None 445 self._media_types_inv = None
413 self.audio_valve = None 446 self.audio_valve = None
414 self.video_valve = None 447 self.video_valve = None
448 self.video_selector = None
415 if self.desktop_portal is not None: 449 if self.desktop_portal is not None:
416 self.desktop_portal.end_screenshare() 450 self.desktop_portal.end_session()
417 self.desktop_sharing = False 451 self.desktop_sharing = False
418 self.desktop_sink_pad = None 452 self.desktop_sink_pad = None
419 self.bindings = {} 453 self.bindings = {}
420 if self.reset_cb is not None: 454 if self.reset_cb is not None:
421 self.reset_cb() 455 self.reset_cb()
456 self.data_channels: dict[str, GstWebRTC.WebRTCDataChannel] = {}
457
458 @property
459 def data_channel(self) -> GstWebRTC.WebRTCDataChannel:
460 """Convenience method to get WebRTCDataChannel instance when there is only one."""
461 if len(self.data_channels) != 1:
462 raise exceptions.InternalError(
463 "self.data_channel can only be used in a single Data Channel scenario. "
464 "Use self.data_channels dict instead."
465 )
466 return next(iter(self.data_channels.values()))
422 467
423 async def setup_call( 468 async def setup_call(
424 self, 469 self,
425 role: str, 470 role: str,
426 audio_pt: int | None = 96, 471 audio_pt: int | None = 96,
440 @raises AssertionError: If the role is not 'initiator' or 'responder'. 485 @raises AssertionError: If the role is not 'initiator' or 'responder'.
441 """ 486 """
442 assert role in ("initiator", "responder") 487 assert role in ("initiator", "responder")
443 self.role = role 488 self.role = role
444 489
445 if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL: 490
446 # Setup pipeline for datachannel only, no media streams. 491 if isinstance(self.sources_data, SourcesPipeline):
447 self.gst_pipe_desc = f""" 492 if self.sources_data.video_pipeline!= "" and video_pt is None:
448 webrtcbin name=sendrecv bundle-policy=max-bundle 493 raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG)
449 """ 494 if self.sources_data.audio_pipeline!= "" and audio_pt is None:
495 raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG)
496 elif isinstance(self.sources_data, SourcesNone):
497 pass
450 else: 498 else:
451 if audio_pt is None or video_pt is None: 499 if audio_pt is None or video_pt is None:
452 raise NotImplementedError("None value is not handled yet") 500 raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG)
453 501
454 if self.sources == SOURCES_AUTO: 502 match self.sources_data:
455 video_source_elt = "v4l2src" 503 case SourcesAuto():
456 audio_source_elt = "pulsesrc" 504 video_source_elt = VIDEO_SOURCE_AUTO
457 elif self.sources == SOURCES_TEST: 505 audio_source_elt = AUDIO_SOURCE_AUTO
506 case SourcesNone():
507 video_source_elt = ""
508 audio_source_elt = ""
509 case SourcesPipeline() as source:
510 if source.video_pipeline is None:
511 video_source_elt = VIDEO_SOURCE_AUTO
512 else:
513 video_source_elt = source.video_pipeline
514 if source.audio_pipeline is None:
515 audio_source_elt = AUDIO_SOURCE_AUTO
516 else:
517 audio_source_elt = source.audio_pipeline
518 case SourcesTest():
458 video_source_elt = "videotestsrc is-live=true pattern=ball" 519 video_source_elt = "videotestsrc is-live=true pattern=ball"
459 audio_source_elt = "audiotestsrc" 520 audio_source_elt = "audiotestsrc"
460 else: 521 case _:
461 raise exceptions.InternalError( 522 raise exceptions.InternalError(
462 f'Unknown "sources" value: {self.sources!r}' 523 f'Unexpected "sources_data" value: {self.sources_data!r}'
463 ) 524 )
464 525
465 if self.sinks == SINKS_APP: 526 match self.sinks_data:
527 case SinksApp():
466 local_video_sink_elt = ( 528 local_video_sink_elt = (
467 "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " 529 "appsink name=local_video_sink emit-signals=true drop=true "
468 "sync=True" 530 "max-buffers=1 sync=True"
469 ) 531 )
470 elif self.sinks == SINKS_AUTO: 532 case SinksAuto():
471 local_video_sink_elt = "autovideosink" 533 if isinstance(self.sources_data, SourcesNone):
472 else: 534 local_video_sink_elt = ""
473 raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") 535 else:
474 536 local_video_sink_elt = "autovideosink"
475 if self.merge_pip: 537 case SinksNone():
476 extra_elt = ( 538 local_video_sink_elt = ""
477 "compositor name=compositor background=black " 539 case _:
478 f"! video/x-raw,width={self.target_width},height={self.target_height}," 540 raise exceptions.InternalError(
479 "framerate=30/1 " 541 f'Unexpected "sinks_data" value {self.sinks_data!r}'
480 f"! {local_video_sink_elt}"
481 ) 542 )
482 local_video_sink_elt = "compositor.sink_1" 543
483 else: 544 gst_pipe_elements = [
484 extra_elt = "" 545 "webrtcbin latency=30 name=sendrecv bundle-policy=max-bundle"
485 546 ]
486 self.gst_pipe_desc = f""" 547
487 webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle 548 if self.merge_pip and local_video_sink_elt:
488 549 # Compositor is used to merge local video feedback in video sink, useful when
489 input-selector name=video_selector 550 # we have only a single video sink.
490 ! videorate 551 gst_pipe_elements.append(
491 ! video/x-raw,framerate=30/1 552 "compositor name=compositor background=black "
492 ! tee name=t 553 f"! video/x-raw,width={self.target_width},"
493 554 f"height={self.target_height},framerate=30/1 "
494 {extra_elt} 555 f"! {local_video_sink_elt}"
495 556 )
496 {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. 557 local_video_sink_elt = "compositor.sink_1"
497 videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. 558
498 559 if video_source_elt:
499 t. 560 # Video source with an input-selector to switch between normal and video mute
500 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream 561 # (or desktop sharing).
501 ! videoconvert 562 gst_pipe_elements.append(f"""
502 ! vp8enc deadline=1 keyframe-max-dist=60 563 input-selector name=video_selector
503 ! rtpvp8pay picture-id-mode=15-bit 564 ! videorate drop-only=1 max-rate=30
504 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} 565 ! video/x-raw,framerate=30/1
505 ! sendrecv. 566 ! tee name=t
506 567
507 t. 568 {video_source_elt} name=video_src
508 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream 569 ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream
509 ! videoconvert 570 ! video_selector.
510 ! {local_video_sink_elt} 571
511 572 videotestsrc name=muted_src is-live=true pattern=black
512 {audio_source_elt} name=audio_src 573 ! queue leaky=downstream
513 ! valve 574 ! video_selector.
514 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream 575
515 ! audioconvert 576 t.
516 ! audioresample 577 ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream
517 ! opusenc audio-type=voice 578 ! videoscale
518 ! rtpopuspay 579 ! videoconvert
519 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} 580 ! vp8enc deadline=1 keyframe-max-dist=30
520 ! sendrecv. 581 ! rtpvp8pay picture-id-mode=15-bit
521 """ 582 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
583 ! sendrecv.
584 """)
585
586 if local_video_sink_elt:
587 # Local video feedback.
588 gst_pipe_elements.append(f"""
589 t.
590 ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream
591 ! videoconvert
592 ! {local_video_sink_elt}
593 """)
594
595 if audio_source_elt:
596 # Audio with a valve for muting.
597 gst_pipe_elements.append(r"""
598 {audio_source_elt} name=audio_src
599 ! valve
600 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
601 ! audioconvert
602 ! audioresample
603 ! opusenc audio-type=voice
604 ! rtpopuspay
605 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
606 ! sendrecv.
607 """)
608
609 self.gst_pipe_desc = "\n\n".join(gst_pipe_elements)
522 610
523 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") 611 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
524 612
525 # Create the pipeline 613 # Create the pipeline
526 try: 614 try:
540 628
541 self.webrtcbin = self.pipeline.get_by_name("sendrecv") 629 self.webrtcbin = self.pipeline.get_by_name("sendrecv")
542 if self.webrtcbin is None: 630 if self.webrtcbin is None:
543 raise exceptions.InternalError("Can't get the pipeline.") 631 raise exceptions.InternalError("Can't get the pipeline.")
544 632
545 # For datachannel setups, media source, selector, and sink elements are not 633 # If video or audio sources are not created, ``get_by_name`` will return None.
546 # created 634 self.video_src = self.pipeline.get_by_name("video_src")
547 if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL: 635 self.muted_src = self.pipeline.get_by_name("muted_src")
548 self.video_src = self.pipeline.get_by_name("video_src") 636 self.video_selector = self.pipeline.get_by_name("video_selector")
549 self.muted_src = self.pipeline.get_by_name("muted_src") 637 if self.video_src and isinstance(self.sources_data, SourcesPipeline):
550 self.video_selector = self.pipeline.get_by_name("video_selector") 638 for name, value in self.sources_data.video_properties.items():
551 self.audio_valve = self.pipeline.get_by_name("audio_valve") 639 self.video_src.set_property(name, value)
552 640
553 if self.video_muted: 641 self.audio_src = self.pipeline.get_by_name("audio_src")
554 self.on_video_mute(True) 642 if self.audio_src and isinstance(self.sources_data, SourcesPipeline):
555 if self.audio_muted: 643 for name, value in self.sources_data.audio_properties.items():
556 self.on_audio_mute(True) 644 self.audio_src.set_property(name, value)
645
646 self.audio_valve = self.pipeline.get_by_name("audio_valve")
647
648 if self.video_muted:
649 self.on_video_mute(True)
650 if self.audio_muted:
651 self.on_audio_mute(True)
557 652
558 # set STUN and TURN servers 653 # set STUN and TURN servers
559 external_disco = data_format.deserialise( 654 external_disco = data_format.deserialise(
560 await self.bridge.external_disco_get("", self.profile), type_check=list 655 await self.bridge.external_disco_get("", self.profile), type_check=list
561 ) 656 )
581 676
582 if not self.webrtcbin.emit("add-turn-server", url): 677 if not self.webrtcbin.emit("add-turn-server", url):
583 log.warning(f"Erreur while adding TURN server {url}") 678 log.warning(f"Erreur while adding TURN server {url}")
584 679
585 # local video feedback 680 # local video feedback
586 if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL: 681 if isinstance(self.sinks_data, SinksApp):
587 assert self.appsink_data is not None
588 local_video_sink = self.pipeline.get_by_name("local_video_sink") 682 local_video_sink = self.pipeline.get_by_name("local_video_sink")
589 local_video_sink.set_property("emit-signals", True) 683 if local_video_sink is not None:
590 local_video_sink.connect("new-sample", self.appsink_data.local_video_cb) 684 local_video_sink.set_property("emit-signals", True)
591 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") 685 local_video_sink.connect("new-sample", self.sinks_data.local_video_cb)
592 local_video_sink.set_property("caps", local_video_sink_caps) 686 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
687 local_video_sink.set_property("caps", local_video_sink_caps)
593 688
594 # Create bus and associate signal watchers 689 # Create bus and associate signal watchers
595 self.bus = self.pipeline.get_bus() 690 self.bus = self.pipeline.get_bus()
596 if not self.bus: 691 if not self.bus:
597 log.error("Failed to get bus from pipeline.") 692 log.error("Failed to get bus from pipeline.")
608 ) 703 )
609 self.webrtcbin.connect( 704 self.webrtcbin.connect(
610 "notify::ice-connection-state", self.on_ice_connection_state 705 "notify::ice-connection-state", self.on_ice_connection_state
611 ) 706 )
612 707
613 if self.sources == SOURCES_DATACHANNEL: 708 for dc_data in self.dc_data_list:
709 self.create_data_channel(dc_data)
710
711 def create_data_channel(self, dc_data: SourcesDataChannel|SinksDataChannel) -> None:
712 """Create a Data Channel and connect relevant callbacks."""
713 assert self.pipeline is not None
714 if isinstance(dc_data, SourcesDataChannel):
614 # Data channel configuration for compatibility with browser defaults 715 # Data channel configuration for compatibility with browser defaults
615 data_channel_options = Gst.Structure.new_empty("data-channel-options") 716 data_channel_options = Gst.Structure.new_empty("data-channel-options")
616 data_channel_options.set_value("ordered", True) 717 data_channel_options.set_value("ordered", True)
617 data_channel_options.set_value("protocol", "") 718 data_channel_options.set_value("protocol", "")
618 719
619 # Create the data channel 720 # Create the data channel
620 self.pipeline.set_state(Gst.State.READY) 721 self.pipeline.set_state(Gst.State.READY)
621 self.data_channel = self.webrtcbin.emit( 722 self.data_channels[dc_data.name] = data_channel = self.webrtcbin.emit(
622 "create-data-channel", "file", data_channel_options 723 "create-data-channel", dc_data.name, data_channel_options
623 ) 724 )
624 if self.data_channel is None: 725 if data_channel is None:
625 log.error("Failed to create data channel") 726 log.error("Failed to create data channel")
626 return 727 return
627 self.data_channel.connect("on-open", self.dc_open_cb) 728 data_channel.connect("on-open", dc_data.dc_open_cb)
628 if self.sinks == SINKS_DATACHANNEL: 729 elif isinstance(dc_data, SinksDataChannel):
629 self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel) 730 self.webrtcbin.connect("on-data-channel", dc_data.dc_on_data_channel)
731 else:
732 raise ValueError(
733 "Only SourcesDataChannel or SinksDataChannel are allowed."
734 )
630 735
631 def start_pipeline(self) -> None: 736 def start_pipeline(self) -> None:
632 """Starts the GStreamer pipeline.""" 737 """Starts the GStreamer pipeline."""
633 log.debug("starting the pipeline") 738 log.debug("starting the pipeline")
634 self.pipeline.set_state(Gst.State.PLAYING) 739 self.pipeline.set_state(Gst.State.PLAYING)
788 remote_video_sink = compositor.request_pad(sink_pad_template, None, None) 893 remote_video_sink = compositor.request_pad(sink_pad_template, None, None)
789 remote_video_sink.set_property("zorder", 0) 894 remote_video_sink.set_property("zorder", 0)
790 remote_video_sink.set_property("width", width) 895 remote_video_sink.set_property("width", width)
791 remote_video_sink.set_property("height", height) 896 remote_video_sink.set_property("height", height)
792 remote_video_sink.set_property("sizing-policy", 1) 897 remote_video_sink.set_property("sizing-policy", 1)
793 elif self.sinks == SINKS_APP: 898 elif isinstance(self.sinks_data, SinksApp):
794 # ``app`` sink without ``self.merge_pip`` set, be create the sink and 899 # ``app`` sink without ``self.merge_pip`` set, be create the sink and
795 # connect it to the ``remote_video_cb``. 900 # connect it to the ``remote_video_cb``.
796 assert self.appsink_data is not None
797 remote_video_sink = Gst.ElementFactory.make("appsink") 901 remote_video_sink = Gst.ElementFactory.make("appsink")
798 902
799 remote_video_caps = Gst.Caps.from_string("video/x-raw,format=RGB") 903 remote_video_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
800 remote_video_sink.set_property("caps", remote_video_caps) 904 remote_video_sink.set_property("caps", remote_video_caps)
801 905
802 remote_video_sink.set_property("emit-signals", True) 906 remote_video_sink.set_property("emit-signals", True)
803 remote_video_sink.set_property("drop", True) 907 remote_video_sink.set_property("drop", True)
804 remote_video_sink.set_property("max-buffers", 1) 908 remote_video_sink.set_property("max-buffers", 1)
805 remote_video_sink.set_property("sync", True) 909 remote_video_sink.set_property("sync", True)
806 remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb) 910 remote_video_sink.connect("new-sample", self.sinks_data.remote_video_cb)
807 self.pipeline.add(remote_video_sink) 911 self.pipeline.add(remote_video_sink)
808 elif self.sinks == SINKS_AUTO: 912 elif isinstance(self.sinks_data, SinksAuto):
809 # if ``self.merge_pip`` is not set, we create a dedicated 913 # if ``self.merge_pip`` is not set, we create a dedicated
810 # ``autovideosink`` for remote stream. 914 # ``autovideosink`` for remote stream.
811 remote_video_sink = Gst.ElementFactory.make("autovideosink") 915 remote_video_sink = Gst.ElementFactory.make("autovideosink")
812 self.pipeline.add(remote_video_sink) 916 self.pipeline.add(remote_video_sink)
813 else: 917 else:
814 raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}') 918 raise exceptions.InternalError(
919 f'Unhandled "sinks_data" value: {self.sinks_data!r}'
920 )
815 921
816 if adjust_resolution: 922 if adjust_resolution:
817 videoscale = Gst.ElementFactory.make("videoscale") 923 videoscale = Gst.ElementFactory.make("videoscale")
818 adjusted_caps = Gst.Caps.from_string( 924 adjusted_caps = Gst.Caps.from_string(
819 f"video/x-raw,width={width},height={height}" 925 f"video/x-raw,width={width},height={height}"
896 ) 1002 )
897 if self.local_candidates_buffer: 1003 if self.local_candidates_buffer:
898 log.debug( 1004 log.debug(
899 f"sending buffered local ICE candidates: {self.local_candidates_buffer}" 1005 f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
900 ) 1006 )
901 if self.pwd is None: 1007 if not self.pwd:
902 sdp = self.webrtcbin.props.local_description.sdp.as_text() 1008 sdp = self.webrtcbin.props.local_description.sdp.as_text()
903 self.extract_ufrag_pwd(sdp) 1009 self.extract_ufrag_pwd(sdp)
904 ice_data = {} 1010 ice_data = {}
905 for media_type, candidates in self.local_candidates_buffer.items(): 1011 for media_type, candidates in self.local_candidates_buffer.items():
906 ice_data[media_type] = { 1012 ice_data[media_type] = {
907 "ufrag": self.ufrag, 1013 "ufrag": self.ufrag[media_type],
908 "pwd": self.pwd, 1014 "pwd": self.pwd[media_type],
909 "candidates": candidates, 1015 "candidates": candidates,
910 } 1016 }
911 await self.bridge.ice_candidates_add( 1017 await self.bridge.ice_candidates_add(
912 self.sid, data_format.serialise(ice_data), self.profile 1018 self.sid, data_format.serialise(ice_data), self.profile
913 ) 1019 )
947 payload_types = self.get_payload_types( 1053 payload_types = self.get_payload_types(
948 offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS" 1054 offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
949 ) 1055 )
950 assert "VP8" in payload_types 1056 assert "VP8" in payload_types
951 assert "OPUS" in payload_types 1057 assert "OPUS" in payload_types
952 await self.setup_call( 1058 try:
953 "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] 1059 await self.setup_call(
954 ) 1060 "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
1061 )
1062 except Exception:
1063 log.exception("Can't setup call")
1064 raise
955 self.start_pipeline() 1065 self.start_pipeline()
956 offer = GstWebRTC.WebRTCSessionDescription.new( 1066 offer = GstWebRTC.WebRTCSessionDescription.new(
957 GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg 1067 GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
958 ) 1068 )
959 promise = Gst.Promise.new_with_change_func(self.on_offer_set) 1069 promise = Gst.Promise.new_with_change_func(self.on_offer_set)
984 parsed_candidate 1094 parsed_candidate
985 ) 1095 )
986 else: 1096 else:
987 sdp = self.webrtcbin.props.local_description.sdp.as_text() 1097 sdp = self.webrtcbin.props.local_description.sdp.as_text()
988 assert sdp is not None 1098 assert sdp is not None
989 ufrag, pwd = self.extract_ufrag_pwd(sdp) 1099 self.extract_ufrag_pwd(sdp)
990 ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]} 1100 ice_data = {
1101 "ufrag": self.ufrag[media_type],
1102 "pwd": self.pwd[media_type],
1103 "candidates": [parsed_candidate]
1104 }
991 self._a_call( 1105 self._a_call(
992 self.bridge.ice_candidates_add, 1106 self.bridge.ice_candidates_add,
993 self.sid, 1107 self.sid,
994 data_format.serialise({media_type: ice_data}), 1108 data_format.serialise({media_type: ice_data}),
995 self.profile, 1109 self.profile,
1094 self.desktop_sharing = False 1208 self.desktop_sharing = False
1095 return 1209 return
1096 self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])} 1210 self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])}
1097 self.do_desktop_switch(desktop_active) 1211 self.do_desktop_switch(desktop_active)
1098 1212
1213 def on_portal_session_closed(self) -> None:
1214 self.desktop_sharing = False
1215
1099 def do_desktop_switch(self, desktop_active: bool) -> None: 1216 def do_desktop_switch(self, desktop_active: bool) -> None:
1100 if self.video_muted: 1217 if self.video_muted:
1101 # Update the active source state but do not switch 1218 # Update the active source state but do not switch
1102 self.desktop_sharing = desktop_active 1219 self.desktop_sharing = desktop_active
1103 return 1220 return
1212 if self.desktop_sink_pad: 1329 if self.desktop_sink_pad:
1213 self.video_selector.release_request_pad(self.desktop_sink_pad) 1330 self.video_selector.release_request_pad(self.desktop_sink_pad)
1214 self.desktop_sink_pad = None 1331 self.desktop_sink_pad = None
1215 1332
1216 if self.desktop_portal is not None: 1333 if self.desktop_portal is not None:
1217 self.desktop_portal.end_screenshare() 1334 self.desktop_portal.end_session()
1218 1335
1219 async def end_call(self) -> None: 1336 async def end_call(self) -> None:
1220 """Stop streaming and clean instance""" 1337 """Stop streaming and clean instance"""
1221 self.reset_instance() 1338 self.reset_instance()
1222 1339
1247 self.profile = profile 1364 self.profile = profile
1248 self.webrtc = WebRTC(bridge, profile, **kwargs) 1365 self.webrtc = WebRTC(bridge, profile, **kwargs)
1249 self.webrtc.callee = callee 1366 self.webrtc.callee = callee
1250 self.on_call_setup_cb = on_call_setup_cb 1367 self.on_call_setup_cb = on_call_setup_cb
1251 self.on_call_ended_cb = on_call_ended_cb 1368 self.on_call_ended_cb = on_call_ended_cb
1252 bridge.register_signal( 1369 bridge.register_signal("ice_candidates_new", self.on_ice_candidates_new, "plugin")
1253 "ice_candidates_new", self.on_ice_candidates_new, "plugin"
1254 )
1255 bridge.register_signal("call_setup", self.on_call_setup, "plugin") 1370 bridge.register_signal("call_setup", self.on_call_setup, "plugin")
1256 bridge.register_signal("call_ended", self.on_call_ended, "plugin") 1371 bridge.register_signal("call_ended", self.on_call_ended, "plugin")
1257 1372
1258 @classmethod 1373 @classmethod
1259 async def make_webrtc_call( 1374 async def make_webrtc_call(
1326 async def start(self): 1441 async def start(self):
1327 """Start a call. 1442 """Start a call.
1328 1443
1329 To be used only if we are initiator 1444 To be used only if we are initiator
1330 """ 1445 """
1331 await self.webrtc.setup_call("initiator") 1446 try:
1447 await self.webrtc.setup_call("initiator")
1448 except Exception:
1449 log.exception("Can't setup call")
1450 raise
1332 self.webrtc.start_pipeline() 1451 self.webrtc.start_pipeline()