Mercurial > libervia-backend
comparison libervia/frontends/tools/webrtc.py @ 4240:79c8a70e1813
backend, frontend: prepare remote control:
This is a series of changes necessary to prepare the implementation of remote control
feature:
- XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several
applications are working in the same session, to know which one must be handled first.
Will be used to make Remote Control have precedence over Call content.
- XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the
benefit of having methods called in parallel is very low, and it causes a lot of trouble
as we can't predict order. Methods are now called sequentially so the workflow can be
predicted.
- XEP-0167: fix `senders` XMPP attribute <=> SDP mapping
- XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so
the same key can be used with other jingle applications.
- XEP-0167, XEP-0343: move some methods to XEP-0167
- XEP-0353: use the new `priority` feature to call the preflight methods of applications
in priority order.
- frontend (webrtc): refactor the sources/sinks handling with a more flexible mechanism
based on Pydantic models. It is now possible to have as many Data Channels as necessary,
to have them in addition to A/V streams, to manually specify GStreamer sources and
sinks, etc.
- frontend (webrtc): rework of the pipeline to reduce latency.
- frontend: new `portal_desktop` method. Screenshare portal handling has been moved there,
and RemoteDesktop portal has been added.
- frontend (webrtc): fix `extract_ufrag_pwd` method.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:41 +0200 |
parents | d01b8d002619 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4239:a38559e6d6e2 | 4240:79c8a70e1813 |
---|---|
33 ) | 33 ) |
34 import asyncio | 34 import asyncio |
35 from datetime import datetime | 35 from datetime import datetime |
36 import logging | 36 import logging |
37 import re | 37 import re |
38 from typing import Callable | 38 from typing import Callable, Final |
39 from urllib.parse import quote_plus | 39 from urllib.parse import quote_plus |
40 | 40 |
41 from libervia.backend.tools.common import data_format | 41 from libervia.backend.tools.common import data_format |
42 from libervia.frontends.tools import aio, display_servers, jid | 42 from libervia.frontends.tools import aio, display_servers, jid |
43 from .webrtc_models import AppSinkData, CallData | 43 from .webrtc_models import ( |
44 from .webrtc_screenshare import DesktopPortal | 44 CallData, |
45 SinksApp, | |
46 SinksAuto, | |
47 SinksData, | |
48 SinksDataChannel, | |
49 SinksNone, | |
50 SourcesAuto, | |
51 SourcesData, | |
52 SourcesDataChannel, | |
53 SourcesNone, | |
54 SourcesPipeline, | |
55 SourcesTest, | |
56 ) | |
45 | 57 |
46 current_server = display_servers.detect() | 58 current_server = display_servers.detect() |
47 if current_server == display_servers.X11: | 59 if current_server == display_servers.X11: |
48 # GSTreamer's ximagesrc documentation asks to run this function | 60 # GSTreamer's ximagesrc documentation asks to run this function |
49 import ctypes | 61 import ctypes |
50 | 62 |
51 ctypes.CDLL("libX11.so.6").XInitThreads() | 63 ctypes.CDLL("libX11.so.6").XInitThreads() |
52 | 64 |
53 | 65 |
54 log = logging.getLogger(__name__) | 66 log = logging.getLogger(__name__) |
67 VIDEO_SOURCE_AUTO: Final = "v4l2src" | |
68 AUDIO_SOURCE_AUTO: Final = "pulsesrc" | |
69 NONE_NOT_IMPLEMENTED_MSG: Final = "None value is not handled yet." | |
55 | 70 |
56 Gst.init(None) | 71 Gst.init(None) |
57 | |
58 SOURCES_AUTO = "auto" | |
59 SOURCES_TEST = "test" | |
60 SOURCES_DATACHANNEL = "datachannel" | |
61 SINKS_APP = "app" | |
62 SINKS_AUTO = "auto" | |
63 SINKS_TEST = "test" | |
64 SINKS_DATACHANNEL = "datachannel" | |
65 | 72 |
66 | 73 |
67 class WebRTC: | 74 class WebRTC: |
68 """GSTreamer based WebRTC implementation for audio and video communication. | 75 """GSTreamer based WebRTC implementation for audio and video communication. |
69 | 76 |
73 | 80 |
74 def __init__( | 81 def __init__( |
75 self, | 82 self, |
76 bridge, | 83 bridge, |
77 profile: str, | 84 profile: str, |
78 sources: str = SOURCES_AUTO, | 85 sources_data: SourcesData|None = None, |
79 sinks: str = SINKS_AUTO, | 86 sinks_data: SinksData|None = None, |
80 appsink_data: AppSinkData | None = None, | |
81 reset_cb: Callable | None = None, | 87 reset_cb: Callable | None = None, |
82 merge_pip: bool | None = None, | 88 merge_pip: bool | None = None, |
83 target_size: tuple[int, int] | None = None, | 89 target_size: tuple[int, int] | None = None, |
84 call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None, | 90 call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None, |
85 dc_open_cb: ( | 91 dc_data_list: list[SourcesDataChannel|SinksDataChannel]|None = None |
86 Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None | |
87 ) = None, | |
88 dc_on_data_channel: ( | |
89 Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None | |
90 ) = None, | |
91 ) -> None: | 92 ) -> None: |
92 """Initializes a new WebRTC instance. | 93 """Initializes a new WebRTC instance. |
93 | 94 |
94 @param bridge: An instance of backend bridge. | 95 @param bridge: An instance of backend bridge. |
95 @param profile: Libervia profile. | 96 @param profile: Libervia profile. |
96 @param sources: Which kind of source to use. | 97 @param sources_data: Data of the sources. |
97 @param sinks: Which kind of sinks to use. | 98 The model used will determine which sources to use. |
98 @param appsink_data: configuration data for appsink (when SINKS_APP is used). Must | 99 SourcesDataChannel can be used here as a convenience. It will then be moved |
99 not be used for other sinks. | 100 to ``data_channels`` and ``SourcesNone`` will be used instead for |
101 ``sources_data``. | |
102 If None, SourcesAuto will be used. | |
103 @param sinks_data: Data of the sinks. | |
104 The model used will determine which sinks to use. | |
105 SinksDataChannel can be used here as a convenience. It will then be moved | |
106 to ``data_channels`` and ``SinksNone`` will be used instead for | |
107 ``sinks_data``. | |
108 If None, SinksAuto will be used. | |
100 @param reset_cb: An optional Callable that is triggered on reset events. Can be | 109 @param reset_cb: An optional Callable that is triggered on reset events. Can be |
101 used to reset UI data on new calls. | 110 used to reset UI data on new calls. |
102 @param merge_pip: A boolean flag indicating whether Picture-in-Picture mode is | 111 @param merge_pip: A boolean flag indicating whether Picture-in-Picture mode is |
103 enabled. When PiP is used, local feedback is merged to remote video stream. | 112 enabled. When PiP is used, local feedback is merged to remote video stream. |
104 Only one video stream is then produced (the local one). | 113 Only one video stream is then produced (the local one). |
105 If None, PiP mode is selected automatically according to selected sink (it's | 114 If None, PiP mode is selected automatically according to selected sink (it's |
106 used for SINKS_AUTO only for now). | 115 used for SinksAuto only for now). |
107 @param target_size: Expected size of the final sink stream. Mainly use by composer | 116 @param target_size: Expected size of the final sink stream. Mainly use by composer |
108 when ``merge_pip`` is set. | 117 when ``merge_pip`` is set. |
109 None to autodetect (not real autodetection implemeted yet, default to | 118 None to autodetect (no real autodetection implemeted yet, default to |
110 (1280,720)). | 119 (1280,720)). |
111 @param call_start_cb: Called when call is started. | 120 @param call_start_cb: Called when call is started. |
112 @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL). | 121 @param dc_data_list: Data Channels to create. |
113 This callback will be run in a GStreamer thread. | 122 If a SourcesDataChannel is used as ``sources_data``, or a SinksDataChannel is |
114 @param dc_open_cb: Called when Data Channel is created (for SINKS_DATACHANNEL). | 123 used as ``sinks_data``, they will be automatically added to this list. |
115 This callback will be run in a GStreamer thread. | |
116 """ | 124 """ |
117 self.main_loop = asyncio.get_event_loop() | 125 self.main_loop = asyncio.get_event_loop() |
118 self.bridge = bridge | 126 self.bridge = bridge |
119 self.profile = profile | 127 self.profile = profile |
120 self.pipeline = None | 128 self.pipeline = None |
121 self._audio_muted = False | 129 self._audio_muted = False |
122 self._video_muted = False | 130 self._video_muted = False |
123 self._desktop_sharing = False | 131 self._desktop_sharing = False |
124 self.desktop_sharing_data = None | 132 self.desktop_sharing_data = None |
125 self.sources = sources | 133 if dc_data_list is None: |
126 self.sinks = sinks | 134 dc_data_list = [] |
135 self.dc_data_list = dc_data_list | |
136 if sources_data is None: | |
137 sources_data = SourcesAuto() | |
138 elif isinstance(sources_data, SourcesDataChannel): | |
139 dc_data_list.append(sources_data) | |
140 sources_data = SourcesNone() | |
141 self.sources_data = sources_data | |
142 if sinks_data is None: | |
143 sinks_data = SinksAuto() | |
144 elif isinstance(sinks_data, SinksDataChannel): | |
145 dc_data_list.append(sinks_data) | |
146 sinks_data = SinksNone() | |
147 self.sinks_data = sinks_data | |
127 if target_size is None: | 148 if target_size is None: |
128 target_size = (1280, 720) | 149 target_size = (1280, 720) |
129 self.target_width, self.target_height = target_size | 150 self.target_width, self.target_height = target_size |
130 if merge_pip is None: | 151 if merge_pip is None: |
131 merge_pip = sinks == SINKS_AUTO | 152 merge_pip = isinstance(sinks_data, SinksAuto) |
132 self.merge_pip = merge_pip | 153 self.merge_pip = merge_pip |
133 if call_start_cb is None: | 154 if call_start_cb is None: |
134 call_start_cb = self._call_start | 155 call_start_cb = self._call_start |
135 self.call_start_cb = call_start_cb | 156 self.call_start_cb = call_start_cb |
136 if sources == SOURCES_DATACHANNEL: | 157 if isinstance(sinks_data, SinksApp): |
137 assert dc_open_cb is not None | 158 if merge_pip and sinks_data.remote_video_cb is not None: |
138 self.dc_open_cb = dc_open_cb | |
139 if sinks == SINKS_DATACHANNEL: | |
140 assert dc_on_data_channel is not None | |
141 self.dc_on_data_channel = dc_on_data_channel | |
142 if sinks == SINKS_APP: | |
143 if ( | |
144 merge_pip | |
145 and appsink_data is not None | |
146 and appsink_data.remote_video_cb is not None | |
147 ): | |
148 raise ValueError("Remote_video_cb can't be used when merge_pip is used!") | 159 raise ValueError("Remote_video_cb can't be used when merge_pip is used!") |
149 self.appsink_data = appsink_data | |
150 elif appsink_data is not None: | |
151 raise exceptions.InternalError( | |
152 "appsink_data can only be used for SINKS_APP sinks" | |
153 ) | |
154 self.reset_cb = reset_cb | 160 self.reset_cb = reset_cb |
155 if current_server == display_servers.WAYLAND: | 161 if current_server == display_servers.WAYLAND: |
156 self.desktop_portal = DesktopPortal(self) | 162 from .portal_desktop import DesktopPortal |
163 | |
164 self.desktop_portal = DesktopPortal( | |
165 on_session_closed_cb=self.on_portal_session_closed | |
166 ) | |
157 else: | 167 else: |
158 self.desktop_portal = None | 168 self.desktop_portal = None |
159 self.reset_instance() | 169 self.reset_instance() |
160 | 170 |
161 @property | 171 @property |
368 if parsed_candidate.get("generation"): | 378 if parsed_candidate.get("generation"): |
369 base_format += " generation {generation}" | 379 base_format += " generation {generation}" |
370 | 380 |
371 return base_format.format(**parsed_candidate) | 381 return base_format.format(**parsed_candidate) |
372 | 382 |
373 def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]: | 383 def extract_ufrag_pwd(self, sdp: str) -> None: |
374 """Retrieves ICE password and user fragment for SDP offer. | 384 """Retrieves ICE password and user fragment for SDP offer. |
375 | 385 |
376 @param sdp: The Session Description Protocol offer string. | 386 @param sdp: The Session Description Protocol offer string. |
377 @return: ufrag and pwd | 387 """ |
378 @raise ValueError: Can't extract ufrag and password | 388 lines = sdp.splitlines() |
379 """ | 389 media = '' |
380 ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) | 390 mid_media_map = {} |
381 pwd_line = re.search(r"ice-pwd:(\S+)", sdp) | 391 bundle_media = set() |
382 | 392 bundle_ufrag = '' |
383 if ufrag_line and pwd_line: | 393 bundle_pwd = '' |
384 ufrag = self.ufrag = ufrag_line.group(1) | 394 in_bundle = False |
385 pwd = self.pwd = pwd_line.group(1) | 395 |
386 return ufrag, pwd | 396 for line in lines: |
387 else: | 397 if line.startswith('m='): |
388 log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") | 398 media = line.split('=')[1].split()[0] |
389 raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") | 399 elif line.startswith('a=mid:'): |
400 mid = line.split(':')[1].strip() | |
401 mid_media_map[mid] = media | |
402 elif line.startswith('a=group:BUNDLE'): | |
403 in_bundle = True | |
404 bundle_media = set(line.split(':')[1].strip().split()) | |
405 elif line.startswith('a=ice-ufrag:'): | |
406 if in_bundle: | |
407 bundle_ufrag = line.split(':')[1].strip() | |
408 else: | |
409 self.ufrag[media] = line.split(':')[1].strip() | |
410 elif line.startswith('a=ice-pwd:'): | |
411 if in_bundle: | |
412 bundle_pwd = line.split(':')[1].strip() | |
413 else: | |
414 self.pwd[media] = line.split(':')[1].strip() | |
415 else: | |
416 in_bundle = False | |
417 | |
418 if bundle_ufrag and bundle_pwd: | |
419 for mid in bundle_media: | |
420 media = mid_media_map[mid] | |
421 self.ufrag[media] = bundle_ufrag | |
422 self.pwd[media] = bundle_pwd | |
390 | 423 |
391 def reset_instance(self): | 424 def reset_instance(self): |
392 """Inits or resets the instance variables to their default state.""" | 425 """Inits or resets the instance variables to their default state.""" |
393 self.role: str | None = None | 426 self.role: str | None = None |
394 if self.pipeline is not None: | 427 if self.pipeline is not None: |
396 self.pipeline = None | 429 self.pipeline = None |
397 self._remote_video_pad = None | 430 self._remote_video_pad = None |
398 self.sid: str | None = None | 431 self.sid: str | None = None |
399 self.offer: str | None = None | 432 self.offer: str | None = None |
400 self.local_candidates_buffer = {} | 433 self.local_candidates_buffer = {} |
401 self.ufrag: str | None = None | 434 self.ufrag: dict[str, str] = {} |
402 self.pwd: str | None = None | 435 self.pwd: dict[str, str] = {} |
403 self.callee: jid.JID | None = None | 436 self.callee: jid.JID | None = None |
404 self._media_types = None | 437 self._media_types = None |
405 self._media_types_inv = None | 438 self._media_types_inv = None |
406 self._sdp_set: bool = False | 439 self._sdp_set: bool = False |
407 self.remote_candidates_buffer: dict[str, dict[str, list]] = { | 440 self.remote_candidates_buffer: dict[str, dict[str, list]] = { |
410 } | 443 } |
411 self._media_types = None | 444 self._media_types = None |
412 self._media_types_inv = None | 445 self._media_types_inv = None |
413 self.audio_valve = None | 446 self.audio_valve = None |
414 self.video_valve = None | 447 self.video_valve = None |
448 self.video_selector = None | |
415 if self.desktop_portal is not None: | 449 if self.desktop_portal is not None: |
416 self.desktop_portal.end_screenshare() | 450 self.desktop_portal.end_session() |
417 self.desktop_sharing = False | 451 self.desktop_sharing = False |
418 self.desktop_sink_pad = None | 452 self.desktop_sink_pad = None |
419 self.bindings = {} | 453 self.bindings = {} |
420 if self.reset_cb is not None: | 454 if self.reset_cb is not None: |
421 self.reset_cb() | 455 self.reset_cb() |
456 self.data_channels: dict[str, GstWebRTC.WebRTCDataChannel] = {} | |
457 | |
458 @property | |
459 def data_channel(self) -> GstWebRTC.WebRTCDataChannel: | |
460 """Convenience method to get WebRTCDataChannel instance when there is only one.""" | |
461 if len(self.data_channels) != 1: | |
462 raise exceptions.InternalError( | |
463 "self.data_channel can only be used in a single Data Channel scenario. " | |
464 "Use self.data_channels dict instead." | |
465 ) | |
466 return next(iter(self.data_channels.values())) | |
422 | 467 |
423 async def setup_call( | 468 async def setup_call( |
424 self, | 469 self, |
425 role: str, | 470 role: str, |
426 audio_pt: int | None = 96, | 471 audio_pt: int | None = 96, |
440 @raises AssertionError: If the role is not 'initiator' or 'responder'. | 485 @raises AssertionError: If the role is not 'initiator' or 'responder'. |
441 """ | 486 """ |
442 assert role in ("initiator", "responder") | 487 assert role in ("initiator", "responder") |
443 self.role = role | 488 self.role = role |
444 | 489 |
445 if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL: | 490 |
446 # Setup pipeline for datachannel only, no media streams. | 491 if isinstance(self.sources_data, SourcesPipeline): |
447 self.gst_pipe_desc = f""" | 492 if self.sources_data.video_pipeline!= "" and video_pt is None: |
448 webrtcbin name=sendrecv bundle-policy=max-bundle | 493 raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) |
449 """ | 494 if self.sources_data.audio_pipeline!= "" and audio_pt is None: |
495 raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) | |
496 elif isinstance(self.sources_data, SourcesNone): | |
497 pass | |
450 else: | 498 else: |
451 if audio_pt is None or video_pt is None: | 499 if audio_pt is None or video_pt is None: |
452 raise NotImplementedError("None value is not handled yet") | 500 raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) |
453 | 501 |
454 if self.sources == SOURCES_AUTO: | 502 match self.sources_data: |
455 video_source_elt = "v4l2src" | 503 case SourcesAuto(): |
456 audio_source_elt = "pulsesrc" | 504 video_source_elt = VIDEO_SOURCE_AUTO |
457 elif self.sources == SOURCES_TEST: | 505 audio_source_elt = AUDIO_SOURCE_AUTO |
506 case SourcesNone(): | |
507 video_source_elt = "" | |
508 audio_source_elt = "" | |
509 case SourcesPipeline() as source: | |
510 if source.video_pipeline is None: | |
511 video_source_elt = VIDEO_SOURCE_AUTO | |
512 else: | |
513 video_source_elt = source.video_pipeline | |
514 if source.audio_pipeline is None: | |
515 audio_source_elt = AUDIO_SOURCE_AUTO | |
516 else: | |
517 audio_source_elt = source.audio_pipeline | |
518 case SourcesTest(): | |
458 video_source_elt = "videotestsrc is-live=true pattern=ball" | 519 video_source_elt = "videotestsrc is-live=true pattern=ball" |
459 audio_source_elt = "audiotestsrc" | 520 audio_source_elt = "audiotestsrc" |
460 else: | 521 case _: |
461 raise exceptions.InternalError( | 522 raise exceptions.InternalError( |
462 f'Unknown "sources" value: {self.sources!r}' | 523 f'Unexpected "sources_data" value: {self.sources_data!r}' |
463 ) | 524 ) |
464 | 525 |
465 if self.sinks == SINKS_APP: | 526 match self.sinks_data: |
527 case SinksApp(): | |
466 local_video_sink_elt = ( | 528 local_video_sink_elt = ( |
467 "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " | 529 "appsink name=local_video_sink emit-signals=true drop=true " |
468 "sync=True" | 530 "max-buffers=1 sync=True" |
469 ) | 531 ) |
470 elif self.sinks == SINKS_AUTO: | 532 case SinksAuto(): |
471 local_video_sink_elt = "autovideosink" | 533 if isinstance(self.sources_data, SourcesNone): |
472 else: | 534 local_video_sink_elt = "" |
473 raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") | 535 else: |
474 | 536 local_video_sink_elt = "autovideosink" |
475 if self.merge_pip: | 537 case SinksNone(): |
476 extra_elt = ( | 538 local_video_sink_elt = "" |
477 "compositor name=compositor background=black " | 539 case _: |
478 f"! video/x-raw,width={self.target_width},height={self.target_height}," | 540 raise exceptions.InternalError( |
479 "framerate=30/1 " | 541 f'Unexpected "sinks_data" value {self.sinks_data!r}' |
480 f"! {local_video_sink_elt}" | |
481 ) | 542 ) |
482 local_video_sink_elt = "compositor.sink_1" | 543 |
483 else: | 544 gst_pipe_elements = [ |
484 extra_elt = "" | 545 "webrtcbin latency=30 name=sendrecv bundle-policy=max-bundle" |
485 | 546 ] |
486 self.gst_pipe_desc = f""" | 547 |
487 webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle | 548 if self.merge_pip and local_video_sink_elt: |
488 | 549 # Compositor is used to merge local video feedback in video sink, useful when |
489 input-selector name=video_selector | 550 # we have only a single video sink. |
490 ! videorate | 551 gst_pipe_elements.append( |
491 ! video/x-raw,framerate=30/1 | 552 "compositor name=compositor background=black " |
492 ! tee name=t | 553 f"! video/x-raw,width={self.target_width}," |
493 | 554 f"height={self.target_height},framerate=30/1 " |
494 {extra_elt} | 555 f"! {local_video_sink_elt}" |
495 | 556 ) |
496 {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. | 557 local_video_sink_elt = "compositor.sink_1" |
497 videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. | 558 |
498 | 559 if video_source_elt: |
499 t. | 560 # Video source with an input-selector to switch between normal and video mute |
500 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream | 561 # (or desktop sharing). |
501 ! videoconvert | 562 gst_pipe_elements.append(f""" |
502 ! vp8enc deadline=1 keyframe-max-dist=60 | 563 input-selector name=video_selector |
503 ! rtpvp8pay picture-id-mode=15-bit | 564 ! videorate drop-only=1 max-rate=30 |
504 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} | 565 ! video/x-raw,framerate=30/1 |
505 ! sendrecv. | 566 ! tee name=t |
506 | 567 |
507 t. | 568 {video_source_elt} name=video_src |
508 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream | 569 ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream |
509 ! videoconvert | 570 ! video_selector. |
510 ! {local_video_sink_elt} | 571 |
511 | 572 videotestsrc name=muted_src is-live=true pattern=black |
512 {audio_source_elt} name=audio_src | 573 ! queue leaky=downstream |
513 ! valve | 574 ! video_selector. |
514 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream | 575 |
515 ! audioconvert | 576 t. |
516 ! audioresample | 577 ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream |
517 ! opusenc audio-type=voice | 578 ! videoscale |
518 ! rtpopuspay | 579 ! videoconvert |
519 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} | 580 ! vp8enc deadline=1 keyframe-max-dist=30 |
520 ! sendrecv. | 581 ! rtpvp8pay picture-id-mode=15-bit |
521 """ | 582 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} |
583 ! sendrecv. | |
584 """) | |
585 | |
586 if local_video_sink_elt: | |
587 # Local video feedback. | |
588 gst_pipe_elements.append(f""" | |
589 t. | |
590 ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream | |
591 ! videoconvert | |
592 ! {local_video_sink_elt} | |
593 """) | |
594 | |
595 if audio_source_elt: | |
596 # Audio with a valve for muting. | |
597 gst_pipe_elements.append(r""" | |
598 {audio_source_elt} name=audio_src | |
599 ! valve | |
600 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream | |
601 ! audioconvert | |
602 ! audioresample | |
603 ! opusenc audio-type=voice | |
604 ! rtpopuspay | |
605 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} | |
606 ! sendrecv. | |
607 """) | |
608 | |
609 self.gst_pipe_desc = "\n\n".join(gst_pipe_elements) | |
522 | 610 |
523 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") | 611 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") |
524 | 612 |
525 # Create the pipeline | 613 # Create the pipeline |
526 try: | 614 try: |
540 | 628 |
541 self.webrtcbin = self.pipeline.get_by_name("sendrecv") | 629 self.webrtcbin = self.pipeline.get_by_name("sendrecv") |
542 if self.webrtcbin is None: | 630 if self.webrtcbin is None: |
543 raise exceptions.InternalError("Can't get the pipeline.") | 631 raise exceptions.InternalError("Can't get the pipeline.") |
544 | 632 |
545 # For datachannel setups, media source, selector, and sink elements are not | 633 # If video or audio sources are not created, ``get_by_name`` will return None. |
546 # created | 634 self.video_src = self.pipeline.get_by_name("video_src") |
547 if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL: | 635 self.muted_src = self.pipeline.get_by_name("muted_src") |
548 self.video_src = self.pipeline.get_by_name("video_src") | 636 self.video_selector = self.pipeline.get_by_name("video_selector") |
549 self.muted_src = self.pipeline.get_by_name("muted_src") | 637 if self.video_src and isinstance(self.sources_data, SourcesPipeline): |
550 self.video_selector = self.pipeline.get_by_name("video_selector") | 638 for name, value in self.sources_data.video_properties.items(): |
551 self.audio_valve = self.pipeline.get_by_name("audio_valve") | 639 self.video_src.set_property(name, value) |
552 | 640 |
553 if self.video_muted: | 641 self.audio_src = self.pipeline.get_by_name("audio_src") |
554 self.on_video_mute(True) | 642 if self.audio_src and isinstance(self.sources_data, SourcesPipeline): |
555 if self.audio_muted: | 643 for name, value in self.sources_data.audio_properties.items(): |
556 self.on_audio_mute(True) | 644 self.audio_src.set_property(name, value) |
645 | |
646 self.audio_valve = self.pipeline.get_by_name("audio_valve") | |
647 | |
648 if self.video_muted: | |
649 self.on_video_mute(True) | |
650 if self.audio_muted: | |
651 self.on_audio_mute(True) | |
557 | 652 |
558 # set STUN and TURN servers | 653 # set STUN and TURN servers |
559 external_disco = data_format.deserialise( | 654 external_disco = data_format.deserialise( |
560 await self.bridge.external_disco_get("", self.profile), type_check=list | 655 await self.bridge.external_disco_get("", self.profile), type_check=list |
561 ) | 656 ) |
581 | 676 |
582 if not self.webrtcbin.emit("add-turn-server", url): | 677 if not self.webrtcbin.emit("add-turn-server", url): |
583 log.warning(f"Erreur while adding TURN server {url}") | 678 log.warning(f"Erreur while adding TURN server {url}") |
584 | 679 |
585 # local video feedback | 680 # local video feedback |
586 if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL: | 681 if isinstance(self.sinks_data, SinksApp): |
587 assert self.appsink_data is not None | |
588 local_video_sink = self.pipeline.get_by_name("local_video_sink") | 682 local_video_sink = self.pipeline.get_by_name("local_video_sink") |
589 local_video_sink.set_property("emit-signals", True) | 683 if local_video_sink is not None: |
590 local_video_sink.connect("new-sample", self.appsink_data.local_video_cb) | 684 local_video_sink.set_property("emit-signals", True) |
591 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") | 685 local_video_sink.connect("new-sample", self.sinks_data.local_video_cb) |
592 local_video_sink.set_property("caps", local_video_sink_caps) | 686 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") |
687 local_video_sink.set_property("caps", local_video_sink_caps) | |
593 | 688 |
594 # Create bus and associate signal watchers | 689 # Create bus and associate signal watchers |
595 self.bus = self.pipeline.get_bus() | 690 self.bus = self.pipeline.get_bus() |
596 if not self.bus: | 691 if not self.bus: |
597 log.error("Failed to get bus from pipeline.") | 692 log.error("Failed to get bus from pipeline.") |
608 ) | 703 ) |
609 self.webrtcbin.connect( | 704 self.webrtcbin.connect( |
610 "notify::ice-connection-state", self.on_ice_connection_state | 705 "notify::ice-connection-state", self.on_ice_connection_state |
611 ) | 706 ) |
612 | 707 |
613 if self.sources == SOURCES_DATACHANNEL: | 708 for dc_data in self.dc_data_list: |
709 self.create_data_channel(dc_data) | |
710 | |
711 def create_data_channel(self, dc_data: SourcesDataChannel|SinksDataChannel) -> None: | |
712 """Create a Data Channel and connect relevant callbacks.""" | |
713 assert self.pipeline is not None | |
714 if isinstance(dc_data, SourcesDataChannel): | |
614 # Data channel configuration for compatibility with browser defaults | 715 # Data channel configuration for compatibility with browser defaults |
615 data_channel_options = Gst.Structure.new_empty("data-channel-options") | 716 data_channel_options = Gst.Structure.new_empty("data-channel-options") |
616 data_channel_options.set_value("ordered", True) | 717 data_channel_options.set_value("ordered", True) |
617 data_channel_options.set_value("protocol", "") | 718 data_channel_options.set_value("protocol", "") |
618 | 719 |
619 # Create the data channel | 720 # Create the data channel |
620 self.pipeline.set_state(Gst.State.READY) | 721 self.pipeline.set_state(Gst.State.READY) |
621 self.data_channel = self.webrtcbin.emit( | 722 self.data_channels[dc_data.name] = data_channel = self.webrtcbin.emit( |
622 "create-data-channel", "file", data_channel_options | 723 "create-data-channel", dc_data.name, data_channel_options |
623 ) | 724 ) |
624 if self.data_channel is None: | 725 if data_channel is None: |
625 log.error("Failed to create data channel") | 726 log.error("Failed to create data channel") |
626 return | 727 return |
627 self.data_channel.connect("on-open", self.dc_open_cb) | 728 data_channel.connect("on-open", dc_data.dc_open_cb) |
628 if self.sinks == SINKS_DATACHANNEL: | 729 elif isinstance(dc_data, SinksDataChannel): |
629 self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel) | 730 self.webrtcbin.connect("on-data-channel", dc_data.dc_on_data_channel) |
731 else: | |
732 raise ValueError( | |
733 "Only SourcesDataChannel or SinksDataChannel are allowed." | |
734 ) | |
630 | 735 |
631 def start_pipeline(self) -> None: | 736 def start_pipeline(self) -> None: |
632 """Starts the GStreamer pipeline.""" | 737 """Starts the GStreamer pipeline.""" |
633 log.debug("starting the pipeline") | 738 log.debug("starting the pipeline") |
634 self.pipeline.set_state(Gst.State.PLAYING) | 739 self.pipeline.set_state(Gst.State.PLAYING) |
788 remote_video_sink = compositor.request_pad(sink_pad_template, None, None) | 893 remote_video_sink = compositor.request_pad(sink_pad_template, None, None) |
789 remote_video_sink.set_property("zorder", 0) | 894 remote_video_sink.set_property("zorder", 0) |
790 remote_video_sink.set_property("width", width) | 895 remote_video_sink.set_property("width", width) |
791 remote_video_sink.set_property("height", height) | 896 remote_video_sink.set_property("height", height) |
792 remote_video_sink.set_property("sizing-policy", 1) | 897 remote_video_sink.set_property("sizing-policy", 1) |
793 elif self.sinks == SINKS_APP: | 898 elif isinstance(self.sinks_data, SinksApp): |
794 # ``app`` sink without ``self.merge_pip`` set, be create the sink and | 899 # ``app`` sink without ``self.merge_pip`` set, be create the sink and |
795 # connect it to the ``remote_video_cb``. | 900 # connect it to the ``remote_video_cb``. |
796 assert self.appsink_data is not None | |
797 remote_video_sink = Gst.ElementFactory.make("appsink") | 901 remote_video_sink = Gst.ElementFactory.make("appsink") |
798 | 902 |
799 remote_video_caps = Gst.Caps.from_string("video/x-raw,format=RGB") | 903 remote_video_caps = Gst.Caps.from_string("video/x-raw,format=RGB") |
800 remote_video_sink.set_property("caps", remote_video_caps) | 904 remote_video_sink.set_property("caps", remote_video_caps) |
801 | 905 |
802 remote_video_sink.set_property("emit-signals", True) | 906 remote_video_sink.set_property("emit-signals", True) |
803 remote_video_sink.set_property("drop", True) | 907 remote_video_sink.set_property("drop", True) |
804 remote_video_sink.set_property("max-buffers", 1) | 908 remote_video_sink.set_property("max-buffers", 1) |
805 remote_video_sink.set_property("sync", True) | 909 remote_video_sink.set_property("sync", True) |
806 remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb) | 910 remote_video_sink.connect("new-sample", self.sinks_data.remote_video_cb) |
807 self.pipeline.add(remote_video_sink) | 911 self.pipeline.add(remote_video_sink) |
808 elif self.sinks == SINKS_AUTO: | 912 elif isinstance(self.sinks_data, SinksAuto): |
809 # if ``self.merge_pip`` is not set, we create a dedicated | 913 # if ``self.merge_pip`` is not set, we create a dedicated |
810 # ``autovideosink`` for remote stream. | 914 # ``autovideosink`` for remote stream. |
811 remote_video_sink = Gst.ElementFactory.make("autovideosink") | 915 remote_video_sink = Gst.ElementFactory.make("autovideosink") |
812 self.pipeline.add(remote_video_sink) | 916 self.pipeline.add(remote_video_sink) |
813 else: | 917 else: |
814 raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}') | 918 raise exceptions.InternalError( |
919 f'Unhandled "sinks_data" value: {self.sinks_data!r}' | |
920 ) | |
815 | 921 |
816 if adjust_resolution: | 922 if adjust_resolution: |
817 videoscale = Gst.ElementFactory.make("videoscale") | 923 videoscale = Gst.ElementFactory.make("videoscale") |
818 adjusted_caps = Gst.Caps.from_string( | 924 adjusted_caps = Gst.Caps.from_string( |
819 f"video/x-raw,width={width},height={height}" | 925 f"video/x-raw,width={width},height={height}" |
896 ) | 1002 ) |
897 if self.local_candidates_buffer: | 1003 if self.local_candidates_buffer: |
898 log.debug( | 1004 log.debug( |
899 f"sending buffered local ICE candidates: {self.local_candidates_buffer}" | 1005 f"sending buffered local ICE candidates: {self.local_candidates_buffer}" |
900 ) | 1006 ) |
901 if self.pwd is None: | 1007 if not self.pwd: |
902 sdp = self.webrtcbin.props.local_description.sdp.as_text() | 1008 sdp = self.webrtcbin.props.local_description.sdp.as_text() |
903 self.extract_ufrag_pwd(sdp) | 1009 self.extract_ufrag_pwd(sdp) |
904 ice_data = {} | 1010 ice_data = {} |
905 for media_type, candidates in self.local_candidates_buffer.items(): | 1011 for media_type, candidates in self.local_candidates_buffer.items(): |
906 ice_data[media_type] = { | 1012 ice_data[media_type] = { |
907 "ufrag": self.ufrag, | 1013 "ufrag": self.ufrag[media_type], |
908 "pwd": self.pwd, | 1014 "pwd": self.pwd[media_type], |
909 "candidates": candidates, | 1015 "candidates": candidates, |
910 } | 1016 } |
911 await self.bridge.ice_candidates_add( | 1017 await self.bridge.ice_candidates_add( |
912 self.sid, data_format.serialise(ice_data), self.profile | 1018 self.sid, data_format.serialise(ice_data), self.profile |
913 ) | 1019 ) |
947 payload_types = self.get_payload_types( | 1053 payload_types = self.get_payload_types( |
948 offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS" | 1054 offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS" |
949 ) | 1055 ) |
950 assert "VP8" in payload_types | 1056 assert "VP8" in payload_types |
951 assert "OPUS" in payload_types | 1057 assert "OPUS" in payload_types |
952 await self.setup_call( | 1058 try: |
953 "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] | 1059 await self.setup_call( |
954 ) | 1060 "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] |
1061 ) | |
1062 except Exception: | |
1063 log.exception("Can't setup call") | |
1064 raise | |
955 self.start_pipeline() | 1065 self.start_pipeline() |
956 offer = GstWebRTC.WebRTCSessionDescription.new( | 1066 offer = GstWebRTC.WebRTCSessionDescription.new( |
957 GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg | 1067 GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg |
958 ) | 1068 ) |
959 promise = Gst.Promise.new_with_change_func(self.on_offer_set) | 1069 promise = Gst.Promise.new_with_change_func(self.on_offer_set) |
984 parsed_candidate | 1094 parsed_candidate |
985 ) | 1095 ) |
986 else: | 1096 else: |
987 sdp = self.webrtcbin.props.local_description.sdp.as_text() | 1097 sdp = self.webrtcbin.props.local_description.sdp.as_text() |
988 assert sdp is not None | 1098 assert sdp is not None |
989 ufrag, pwd = self.extract_ufrag_pwd(sdp) | 1099 self.extract_ufrag_pwd(sdp) |
990 ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]} | 1100 ice_data = { |
1101 "ufrag": self.ufrag[media_type], | |
1102 "pwd": self.pwd[media_type], | |
1103 "candidates": [parsed_candidate] | |
1104 } | |
991 self._a_call( | 1105 self._a_call( |
992 self.bridge.ice_candidates_add, | 1106 self.bridge.ice_candidates_add, |
993 self.sid, | 1107 self.sid, |
994 data_format.serialise({media_type: ice_data}), | 1108 data_format.serialise({media_type: ice_data}), |
995 self.profile, | 1109 self.profile, |
1094 self.desktop_sharing = False | 1208 self.desktop_sharing = False |
1095 return | 1209 return |
1096 self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])} | 1210 self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])} |
1097 self.do_desktop_switch(desktop_active) | 1211 self.do_desktop_switch(desktop_active) |
1098 | 1212 |
1213 def on_portal_session_closed(self) -> None: | |
1214 self.desktop_sharing = False | |
1215 | |
1099 def do_desktop_switch(self, desktop_active: bool) -> None: | 1216 def do_desktop_switch(self, desktop_active: bool) -> None: |
1100 if self.video_muted: | 1217 if self.video_muted: |
1101 # Update the active source state but do not switch | 1218 # Update the active source state but do not switch |
1102 self.desktop_sharing = desktop_active | 1219 self.desktop_sharing = desktop_active |
1103 return | 1220 return |
1212 if self.desktop_sink_pad: | 1329 if self.desktop_sink_pad: |
1213 self.video_selector.release_request_pad(self.desktop_sink_pad) | 1330 self.video_selector.release_request_pad(self.desktop_sink_pad) |
1214 self.desktop_sink_pad = None | 1331 self.desktop_sink_pad = None |
1215 | 1332 |
1216 if self.desktop_portal is not None: | 1333 if self.desktop_portal is not None: |
1217 self.desktop_portal.end_screenshare() | 1334 self.desktop_portal.end_session() |
1218 | 1335 |
1219 async def end_call(self) -> None: | 1336 async def end_call(self) -> None: |
1220 """Stop streaming and clean instance""" | 1337 """Stop streaming and clean instance""" |
1221 self.reset_instance() | 1338 self.reset_instance() |
1222 | 1339 |
1247 self.profile = profile | 1364 self.profile = profile |
1248 self.webrtc = WebRTC(bridge, profile, **kwargs) | 1365 self.webrtc = WebRTC(bridge, profile, **kwargs) |
1249 self.webrtc.callee = callee | 1366 self.webrtc.callee = callee |
1250 self.on_call_setup_cb = on_call_setup_cb | 1367 self.on_call_setup_cb = on_call_setup_cb |
1251 self.on_call_ended_cb = on_call_ended_cb | 1368 self.on_call_ended_cb = on_call_ended_cb |
1252 bridge.register_signal( | 1369 bridge.register_signal("ice_candidates_new", self.on_ice_candidates_new, "plugin") |
1253 "ice_candidates_new", self.on_ice_candidates_new, "plugin" | |
1254 ) | |
1255 bridge.register_signal("call_setup", self.on_call_setup, "plugin") | 1370 bridge.register_signal("call_setup", self.on_call_setup, "plugin") |
1256 bridge.register_signal("call_ended", self.on_call_ended, "plugin") | 1371 bridge.register_signal("call_ended", self.on_call_ended, "plugin") |
1257 | 1372 |
1258 @classmethod | 1373 @classmethod |
1259 async def make_webrtc_call( | 1374 async def make_webrtc_call( |
1326 async def start(self): | 1441 async def start(self): |
1327 """Start a call. | 1442 """Start a call. |
1328 | 1443 |
1329 To be used only if we are initiator | 1444 To be used only if we are initiator |
1330 """ | 1445 """ |
1331 await self.webrtc.setup_call("initiator") | 1446 try: |
1447 await self.webrtc.setup_call("initiator") | |
1448 except Exception: | |
1449 log.exception("Can't setup call") | |
1450 raise | |
1332 self.webrtc.start_pipeline() | 1451 self.webrtc.start_pipeline() |