Mercurial > libervia-backend
comparison libervia/frontends/tools/webrtc.py @ 4139:6745c6bd4c7a
frontends (tools): webrtc implementation:
This is a factored implementation usable by all non-web frontends. Sources and sinks can
be configured easily to use test sources or the local webcam/microphone, and autosinks or
an `appsink` that the frontend will use.
rel 426
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Nov 2023 14:03:36 +0100 |
parents | |
children | 970b6209526a |
comparison
equal
deleted
inserted
replaced
4138:5de6f3595380 | 4139:6745c6bd4c7a |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia WebRTC implementation | |
4 # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import gi | |
20 gi.require_versions({ | |
21 "Gst": "1.0", | |
22 "GstWebRTC": "1.0" | |
23 }) | |
24 from gi.repository import Gst, GstWebRTC, GstSdp | |
25 | |
try:
    # this import is only a probe: the imported name is discarded (bound to ``_``)
    from gi.overrides import Gst as _
except ImportError:
    # missing overrides are not fatal here: a hint is printed and the module keeps
    # loading
    print(
        "no GStreamer python overrides available, please install relevant pacakges on "
        "your system."
    )
33 import asyncio | |
34 from dataclasses import dataclass | |
35 from datetime import datetime | |
36 import logging | |
37 import re | |
38 from typing import Callable | |
39 from urllib.parse import quote_plus | |
40 | |
41 from libervia.backend.core import exceptions | |
42 from libervia.backend.tools.common import data_format | |
43 from libervia.frontends.tools import aio | |
44 | |
45 | |
# module-level logger
log = logging.getLogger(__name__)

# GStreamer is initialised once, when this module is imported
Gst.init(None)

# accepted values for the ``sources`` argument of ``WebRTC``
SOURCES_AUTO = "auto"
SOURCES_TEST = "test"
# accepted values for the ``sinks`` argument of ``WebRTC``
# NOTE(review): SINKS_TEST is declared, but ``WebRTC.setup_call`` only handles
# SINKS_APP and SINKS_AUTO — confirm whether it is meant to be supported
SINKS_APP = "app"
SINKS_AUTO = "auto"
SINKS_TEST = "test"
55 | |
56 | |
@dataclass
class AppSinkData:
    """Callbacks used by ``WebRTC`` when it is created with ``SINKS_APP`` sinks.

    Each callback is connected to the ``new-sample`` signal of an ``appsink``.
    """

    # connected to the appsink receiving the local video feedback
    local_video_cb: Callable
    # connected to the appsink receiving the remote video stream
    remote_video_cb: Callable
61 | |
62 | |
63 class WebRTC: | |
64 """GSTreamer based WebRTC implementation for audio and video communication. | |
65 | |
66 This class encapsulates the WebRTC functionalities required for initiating and | |
67 handling audio and video calls. | |
68 """ | |
69 | |
def __init__(
    self,
    bridge,
    profile: str,
    sources: str = SOURCES_AUTO,
    sinks: str = SINKS_AUTO,
    appsink_data: AppSinkData | None = None,
    reset_cb: Callable | None = None,
) -> None:
    """Initialise the instance and reset its call state.

    @param bridge: bridge used to reach the backend (external discovery, call
        start/answer, ICE candidates)
    @param profile: profile given to every bridge call
    @param sources: where audio/video come from (``SOURCES_AUTO`` or
        ``SOURCES_TEST``)
    @param sinks: where video is rendered (``SINKS_AUTO`` or ``SINKS_APP``)
    @param appsink_data: callbacks receiving video samples, only valid with
        ``SINKS_APP``
    @param reset_cb: if set, called without argument at the end of each
        ``reset_instance`` (including the one done here)
    @raise exceptions.InternalError: ``appsink_data`` is used with sinks other than
        ``SINKS_APP``
    """
    if sinks != SINKS_APP and appsink_data is not None:
        raise exceptions.InternalError(
            "appsink_data can only be used for SINKS_APP sinks"
        )
    self.main_loop = asyncio.get_event_loop()
    self.bridge = bridge
    self.profile = profile
    self.pipeline = None
    self._audio_muted = False
    self._video_muted = False
    self.sources = sources
    self.sinks = sinks
    # the attribute is always defined (``None`` when sinks are not SINKS_APP), so
    # reading it can never raise an AttributeError
    self.appsink_data = appsink_data
    self.reset_cb = reset_cb
    self.reset_instance()
95 | |
@property
def audio_muted(self):
    """Current audio mute flag."""
    return self._audio_muted


@audio_muted.setter
def audio_muted(self, muted: bool) -> None:
    # nothing to do when the flag doesn't actually change
    if muted == self._audio_muted:
        return
    self._audio_muted = muted
    self.on_audio_mute(muted)
105 | |
@property
def video_muted(self):
    """Current video mute flag."""
    return self._video_muted


@video_muted.setter
def video_muted(self, muted: bool) -> None:
    # nothing to do when the flag doesn't actually change
    if muted == self._video_muted:
        return
    self._video_muted = muted
    self.on_video_mute(muted)
115 | |
@property
def sdp_set(self):
    """Flag telling whether the session description has been set."""
    return self._sdp_set


@sdp_set.setter
def sdp_set(self, is_set: bool):
    self._sdp_set = is_set
    if not is_set:
        return
    # remote candidates received before the SDP was set are handled now, then the
    # per-media buffers are emptied
    buffered = self.remote_candidates_buffer
    self.on_ice_candidates_new(buffered)
    for media_data in buffered.values():
        media_data["candidates"].clear()
127 | |
@property
def media_types(self):
    """Mapping of media line index to media type; it must have been set before."""
    current = self._media_types
    if current is None:
        raise Exception("self._media_types should not be None!")
    return current


@media_types.setter
def media_types(self, new_media_types: dict) -> None:
    self._media_types = new_media_types
    # the reverse mapping (media type to index) is kept in sync
    self._media_types_inv = dict(
        (media_type, index) for index, media_type in new_media_types.items()
    )
138 | |
@property
def media_types_inv(self) -> dict:
    """Mapping of media type to media line index; it must have been set before."""
    inverted = self._media_types_inv
    if inverted is None:
        raise Exception("self._media_types_inv should not be None!")
    return inverted
144 | |
def generate_dot_file(
    self,
    filename: str = "pipeline",
    details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL,
    with_timestamp: bool = True,
    bin_: Gst.Bin | None = None,
) -> None:
    """Generate Dot File for debugging

    ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir.
    ``dot -Tpng -o <filename>.png <filename>.dot`` can be used to convert to a PNG
    file.
    See
    https://gstreamer.freedesktop.org/documentation/gstreamer/debugutils.html?gi-language=python#GstDebugGraphDetails
    for details.

    @param filename: name of the generated file
    @param details: which details to print
    @param with_timestamp: if True, prefix ``filename`` with a timestamp
    @param bin_: which bin to output. By default, the whole pipeline
        (``self.pipeline``) will be used.
    """
    if bin_ is None:
        bin_ = self.pipeline
    if with_timestamp:
        timestamp = datetime.now().isoformat(timespec="milliseconds")
        # the requested name is kept after the timestamp (it used to be replaced by
        # the literal text "filename")
        filename = f"{timestamp}_{filename}"

    Gst.debug_bin_to_dot_file(bin_, details, filename)
173 | |
def get_sdp_mline_index(self, media_type: str) -> int:
    """Return the sdpMLineIndex of the first media line of the given type.

    @param media_type: the type of the media to look for
    @return: index of the first matching entry of ``self.media_types``
    @raise ValueError: no entry has this media type
    """
    matching_indexes = (
        index for index, m_type in self.media_types.items() if m_type == media_type
    )
    found = next(matching_indexes, None)
    if found is None:
        raise ValueError(f"Media type '{media_type}' not found")
    return found
183 | |
184 def _set_media_types(self, offer_sdp: str) -> None: | |
185 """Sets media types from offer SDP | |
186 | |
187 @param offer: RTC session description containing the offer | |
188 """ | |
189 sdp_lines = offer_sdp.splitlines() | |
190 media_types = {} | |
191 mline_index = 0 | |
192 | |
193 for line in sdp_lines: | |
194 if line.startswith("m="): | |
195 media_types[mline_index] = line[2 : line.find(" ")] | |
196 mline_index += 1 | |
197 | |
198 self.media_types = media_types | |
199 | |
def _a_call(self, method, *args, **kwargs):
    """Call an async method in main thread

    ``method`` and its arguments are handed to ``aio.run_from_thread``, together
    with ``self.main_loop``.
    """
    main_loop = self.main_loop
    aio.run_from_thread(method, *args, loop=main_loop, **kwargs)
203 | |
def get_payload_types(
    self, sdpmsg, video_encoding: str, audio_encoding: str
) -> dict[str, int | None]:
    """Find the payload types for the specified video and audio encoding.

    Very simplistically finds the first payload type matching the encoding
    name. More complex applications will want to match caps on
    profile-level-id, packetization-mode, etc.

    @param sdpmsg: SDP message whose medias and formats are inspected
    @param video_encoding: encoding name to look for, for video
    @param audio_encoding: encoding name to look for, for audio
    @return: mapping of each requested encoding name to the first matching payload
        type, or None if there is no match
    """
    # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
    # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
    first_video_pt = None
    first_audio_pt = None
    medias = (sdpmsg.get_media(idx) for idx in range(sdpmsg.medias_len()))
    for media in medias:
        formats = (media.get_format(idx) for idx in range(media.formats_len()))
        for fmt in formats:
            if fmt == "webrtc-datachannel":
                # not a numeric payload type
                continue
            pt = int(fmt)
            structure = media.get_caps_from_media(pt).get_structure(0)
            encoding_name = structure["encoding-name"]
            # only the first match of each kind is kept
            if first_video_pt is None and encoding_name == video_encoding:
                first_video_pt = pt
            elif first_audio_pt is None and encoding_name == audio_encoding:
                first_audio_pt = pt
    return {video_encoding: first_video_pt, audio_encoding: first_audio_pt}
232 | |
def parse_ice_candidate(self, candidate_string):
    """Parses the ice candidate string.

    @param candidate_string: The ice candidate string to be parsed.
    @return: parsed fields (``component_id``, ``priority``, ``port`` and
        ``rel_port`` are converted to int, missing optional fields are removed), or
        None if the string can't be parsed
    """
    pattern = re.compile(
        r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
        r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
        r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
        r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
    )
    match = pattern.match(candidate_string)
    if match is None:
        log.warning(f"can't parse candidate: {candidate_string!r}")
        return None

    parsed = match.groupdict()

    # Apply the correct types to the dictionary values
    for numeric_key in ("component_id", "priority", "port"):
        parsed[numeric_key] = int(parsed[numeric_key])
    if parsed["rel_port"]:
        parsed["rel_port"] = int(parsed["rel_port"])
    # "generation", when present, is left as the matched string

    # Remove None values
    return {key: value for key, value in parsed.items() if value is not None}
264 | |
def build_ice_candidate(self, parsed_candidate):
    """Builds ICE candidate

    @param parsed_candidate: Dictionary containing parsed ICE candidate
    @return: the candidate line built from the dictionary; the ``raddr``/``rport``
        and ``generation`` parts are only added when their values are set
    """
    template_parts = [
        "candidate:{foundation} {component_id} {transport} {priority} "
        "{address} {port} typ {type}"
    ]

    has_related = parsed_candidate.get("rel_addr") and parsed_candidate.get(
        "rel_port"
    )
    if has_related:
        template_parts.append(" raddr {rel_addr} rport {rel_port}")

    if parsed_candidate.get("generation"):
        template_parts.append(" generation {generation}")

    return "".join(template_parts).format(**parsed_candidate)
282 | |
def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
    """Retrieves ICE password and user fragment for SDP offer.

    The values found are also stored in ``self.ufrag`` and ``self.pwd``.

    @param sdp: The Session Description Protocol offer string.
    @return: ufrag and pwd
    @raise ValueError: Can't extract ufrag and password
    """
    ufrag_match = re.search(r"ice-ufrag:(\S+)", sdp)
    pwd_match = re.search(r"ice-pwd:(\S+)", sdp)

    if ufrag_match is None or pwd_match is None:
        log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
        raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    ufrag = ufrag_match.group(1)
    pwd = pwd_match.group(1)
    self.ufrag = ufrag
    self.pwd = pwd
    return ufrag, pwd
300 | |
def reset_instance(self):
    """Inits or resets the instance variables to their default state.

    A running pipeline is set to the NULL state before being dropped, and
    ``reset_cb`` (if any) is called once everything has been reset.
    """
    self.role: str | None = None
    if self.pipeline is not None:
        self.pipeline.set_state(Gst.State.NULL)
    self.pipeline = None
    self._remote_video_pad = None
    self.sid: str | None = None
    self.offer: str | None = None
    self.local_candidates_buffer = {}
    self.ufrag: str | None = None
    self.pwd: str | None = None
    self.callee: str | None = None
    self._media_types = None
    self._media_types_inv = None
    self._sdp_set: bool = False
    self.remote_candidates_buffer: dict[str, dict[str, list]] = {
        "audio": {"candidates": []},
        "video": {"candidates": []},
    }
    self.audio_valve = None
    self.video_valve = None
    # these elements are looked up in the pipeline by ``setup_call``. Resetting
    # them makes ``on_video_mute`` a no-op when there is no call, instead of
    # failing on a missing attribute or acting on elements of a stopped pipeline
    self.video_src = None
    self.video_selector = None
    if self.reset_cb is not None:
        self.reset_cb()
327 | |
328 | |
async def setup_call(
    self,
    role: str,
    audio_pt: int | None = 96,
    video_pt: int | None = 97,
) -> None:
    """Sets up the call.

    This method establishes the Gstreamer pipeline for audio and video communication.
    The method also manages STUN and TURN server configurations, signal watchers, and
    various connection handlers for the webrtcbin.

    @param role: The role for the call, either 'initiator' or 'responder'.
    @param audio_pt: The payload type for the audio stream.
    @param video_pt: The payload type for the video stream

    @raises NotImplementedError: If audio_pt or video_pt is set to None.
    @raises AssertionError: If the role is not 'initiator' or 'responder'.
    @raises exceptions.InternalError: ``self.sources`` or ``self.sinks`` has an
        unknown value, or the pipeline can't be created.
    """
    assert role in ("initiator", "responder")
    self.role = role
    if audio_pt is None or video_pt is None:
        raise NotImplementedError("None value is not handled yet")

    if self.sources == SOURCES_AUTO:
        video_source_elt = "v4l2src"
        audio_source_elt = "pulsesrc"
    elif self.sources == SOURCES_TEST:
        video_source_elt = "videotestsrc is-live=true pattern=ball"
        audio_source_elt = "audiotestsrc"
    else:
        raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}')

    extra_elt = ""

    if self.sinks == SINKS_APP:
        local_video_sink_elt = (
            "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
            "sync=True"
        )
    elif self.sinks == SINKS_AUTO:
        extra_elt = "compositor name=compositor ! autovideosink"
        local_video_sink_elt = """compositor.sink_1"""
    else:
        raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")

    # the valve must be named: it is retrieved below with
    # ``get_by_name("audio_valve")`` and is what ``on_audio_mute`` acts on
    self.gst_pipe_desc = f"""
    webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle

    input-selector name=video_selector
    ! videorate
    ! video/x-raw,framerate=30/1
    ! tee name=t

    {extra_elt}

    {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
    videotestsrc is-live=true pattern=black ! queue leaky=downstream ! video_selector.

    t.
    ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
    ! videoconvert
    ! vp8enc deadline=1 keyframe-max-dist=60
    ! rtpvp8pay picture-id-mode=15-bit
    ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
    ! sendrecv.

    t.
    ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
    ! videoconvert
    ! {local_video_sink_elt}

    {audio_source_elt} name=audio_src
    ! valve name=audio_valve
    ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
    ! audioconvert
    ! audioresample
    ! opusenc audio-type=voice
    ! rtpopuspay
    ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
    ! sendrecv.
    """

    log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")

    # Create the pipeline
    self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
    if not self.pipeline:
        raise exceptions.InternalError("Failed to create Gstreamer pipeline.")

    self.webrtcbin = self.pipeline.get_by_name("sendrecv")
    self.video_src = self.pipeline.get_by_name("video_src")
    self.video_selector = self.pipeline.get_by_name("video_selector")
    self.audio_valve = self.pipeline.get_by_name("audio_valve")

    # mute flags may have been set before the pipeline existed: apply them now
    if self.video_muted:
        self.on_video_mute(True)
    if self.audio_muted:
        self.on_audio_mute(True)

    # set STUN and TURN servers
    external_disco = data_format.deserialise(
        await self.bridge.external_disco_get("", self.profile), type_check=list
    )

    for server in external_disco:
        if server["type"] == "stun":
            if server["transport"] == "tcp":
                log.info(
                    "ignoring TCP STUN server, GStreamer only support one STUN server"
                )
                # the server is really skipped, as announced by the log above
                continue
            url = f"stun://{server['host']}:{server['port']}"
            log.debug(f"adding stun server: {url}")
            self.webrtcbin.set_property("stun-server", url)
        elif server["type"] == "turn":
            # NOTE(review): "turns" is selected for the "tcp" transport — confirm
            # this is the intended scheme
            url = "{scheme}://{username}:{password}@{host}:{port}".format(
                scheme="turns" if server["transport"] == "tcp" else "turn",
                username=quote_plus(server["username"]),
                password=quote_plus(server["password"]),
                host=server["host"],
                port=server["port"],
            )
            # NOTE(review): this URL contains the TURN credentials
            log.debug(f"adding turn server: {url}")

            if not self.webrtcbin.emit("add-turn-server", url):
                log.warning(f"Erreur while adding TURN server {url}")

    # local video feedback
    if self.sinks == SINKS_APP:
        assert self.appsink_data is not None
        local_video_sink = self.pipeline.get_by_name("local_video_sink")
        local_video_sink.set_property("emit-signals", True)
        local_video_sink.connect("new-sample", self.appsink_data.local_video_cb)
        local_video_sink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
        local_video_sink.set_property("caps", local_video_sink_caps)

    # Create bus and associate signal watchers
    self.bus = self.pipeline.get_bus()
    if not self.bus:
        log.error("Failed to get bus from pipeline.")
        return

    self.bus.add_signal_watch()
    self.webrtcbin.connect("pad-added", self.on_pad_added)
    self.bus.connect("message::error", self.on_bus_error)
    self.bus.connect("message::eos", self.on_bus_eos)
    self.webrtcbin.connect("on-negotiation-needed", self.on_negotiation_needed)
    self.webrtcbin.connect("on-ice-candidate", self.on_ice_candidate)
    self.webrtcbin.connect(
        "notify::ice-gathering-state", self.on_ice_gathering_state_change
    )
    self.webrtcbin.connect(
        "notify::ice-connection-state", self.on_ice_connection_state
    )
483 | |
def start_pipeline(self) -> None:
    """Set the GStreamer pipeline to the PLAYING state."""
    log.debug("starting the pipeline")
    pipeline = self.pipeline
    pipeline.set_state(Gst.State.PLAYING)
488 | |
def on_negotiation_needed(self, webrtc):
    """Request the creation of an SDP offer when negotiation is needed.

    Nothing is requested unless the role is "initiator".

    @param webrtc: element which emitted the signal (unused)
    """
    log.debug("Negotiation needed.")
    if self.role != "initiator":
        return
    log.debug("Creating offer…")
    offer_promise = Gst.Promise.new_with_change_func(self.on_offer_created)
    self.webrtcbin.emit("create-offer", None, offer_promise)
496 | |
def on_offer_created(self, promise):
    """Callback for when SDP offer is created.

    The offer is stored as text in ``self.offer``, media types are extracted from
    it, it is set as local description, then the call is started.

    @param promise: promise given to the ``create-offer`` action
    """
    log.info("on_offer_created called")
    assert promise.wait() == Gst.PromiseResult.REPLIED
    reply = promise.get_reply()
    if reply is None:
        log.error("Promise reply is None. Offer creation might have failed.")
        return
    offer = reply["offer"]
    self.offer = offer.sdp.as_text()
    log.info(f"SDP offer created: \n{self.offer}")
    self._set_media_types(self.offer)
    # a new promise is used for "set-local-description"; it is interrupted right
    # away, its result is not waited for
    promise = Gst.Promise.new()
    self.webrtcbin.emit("set-local-description", offer, promise)
    promise.interrupt()
    # _start_call is a coroutine: it is scheduled through _a_call
    self._a_call(self._start_call)
513 | |
def on_answer_set(self, promise):
    """Check that the given promise has been replied.

    @param promise: promise to check
    """
    # NOTE(review): no caller is visible in this file — confirm it is still used
    assert promise.wait() == Gst.PromiseResult.REPLIED
516 | |
def on_answer_created(self, promise, _, __):
    """Callback for when SDP answer is created.

    The answer is set as local description, the SDP is flagged as set, then the
    answer is sent through the bridge.

    @param promise: promise given to the ``create-answer`` action
    @param _: unused
    @param __: unused
    """
    assert promise.wait() == Gst.PromiseResult.REPLIED
    reply = promise.get_reply()
    if reply is None:
        # same guard as in on_offer_created: without reply there is no answer to use
        log.error("Promise reply is None. Answer creation might have failed.")
        return
    answer = reply["answer"]
    # a new promise is used for "set-local-description"; it is interrupted right
    # away, its result is not waited for
    promise = Gst.Promise.new()
    self.webrtcbin.emit("set-local-description", answer, promise)
    promise.interrupt()
    answer_sdp = answer.sdp.as_text()
    log.info(f"SDP answer set: \n{answer_sdp}")
    self.sdp_set = True
    self._a_call(self.bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)
529 | |
def on_offer_set(self, promise):
    """Request the creation of the SDP answer.

    In this file, it is used as change function of the promise given to
    ``set-remote-description`` by ``answer_call``.

    @param promise: promise which must have been replied
    """
    assert promise.wait() == Gst.PromiseResult.REPLIED
    # on_answer_created receives the two extra ``None`` values as last arguments
    promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
    self.webrtcbin.emit("create-answer", None, promise)
534 | |
def link_element_or_pad(
    self, source: Gst.Element, dest: Gst.Element | Gst.Pad
) -> bool:
    """Check if dest is a pad or an element, and link appropriately

    @param source: element to link from
    @param dest: pad or element to link to
    @return: False if ``dest`` has an unexpected type or if linking to a pad
        failed, True if linking to a pad succeeded; for an element, the result of
        ``source.link``
    """
    if isinstance(dest, Gst.Pad):
        # If the dest is a pad, link directly from the "src" pad of the source
        src_pad = source.get_static_pad("src")
        if src_pad.link(dest) != Gst.PadLinkReturn.OK:
            log.error(
                "Failed to link 'conv' to the compositor's newly requested pad!"
            )
            return False
        return True

    if isinstance(dest, Gst.Element):
        return source.link(dest)

    # ``dest`` is reported here (an undefined name used to be referenced, which
    # raised a NameError instead of logging the problem)
    log.error(f"Unexpected type for dest: {type(dest)}")
    return False
555 | |
def scaled_dimensions(
    self, original_width: int, original_height: int, max_width: int, max_height: int
) -> tuple[int, int]:
    """Calculates the scaled dimensions preserving aspect ratio.

    The result fits in a ``max_width`` x ``max_height`` box: the height is
    maximised first, and the width is maximised instead if the resulting width
    would not fit.

    @param original_width: Original width of the video stream.
    @param original_height: Original height of the video stream.
    @param max_width: Maximum desired width for the scaled video.
    @param max_height: Maximum desired height for the scaled video.
    @return: The width and height of the scaled video. If an original dimension is
        not strictly positive, no aspect ratio can be computed and the maximum
        dimensions are returned.
    """
    if original_width <= 0 or original_height <= 0:
        # avoid a division by zero when the original size is unknown
        return max_width, max_height

    # the ratio is applied by multiplying first, then using floor division: this
    # avoids losing a pixel to float rounding (e.g. with a 16/9 ratio)
    new_width = int(max_height * original_width // original_height)
    if new_width <= max_width:
        return new_width, max_height

    new_height = int(max_width * original_height // original_width)
    return max_width, new_height
575 | |
def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
    """Handle the stream from the remote decodebin.

    This method processes the incoming stream from the remote decodebin, determining
    whether it's video or audio. It then sets up the appropriate GStreamer elements
    for video/audio processing and adds them to the pipeline.

    @param _: signal source (unused)
    @param pad: The Gst.Pad from the remote decodebin producing the stream.
    @raise exceptions.InternalError: ``self.sinks`` has a value which is not handled
        for video.
    """
    assert self.pipeline is not None
    if not pad.has_current_caps():
        log.error(f"{pad} has no caps, ignoring")
        return

    caps = pad.get_current_caps()
    assert len(caps)
    s = caps[0]
    name = s.get_name()
    log.debug(f"====> NAME START: {name}")

    q = Gst.ElementFactory.make("queue")

    if name.startswith("video"):
        log.debug("===> VIDEO OK")

        self._remote_video_pad = pad

        # Check and log the original size of the video
        width = s.get_int("width").value
        height = s.get_int("height").value
        log.info(f"Original video size: {width}x{height}")

        # This is a fix for an issue found with Movim on desktop: a non standard
        # resolution is used (990x557) resulting in bad alignement and no color in
        # rendered image
        adjust_resolution = width % 4 != 0 or height % 4 != 0
        if adjust_resolution:
            log.info("non standard resolution, we need to adjust size")
            # round up to the next multiple of 4
            width = (width + 3) // 4 * 4
            height = (height + 3) // 4 * 4
            log.info(f"Adjusted video size: {width}x{height}")

        conv = Gst.ElementFactory.make("videoconvert")
        # the sink kinds are exclusive: with two independent ``if``, SINKS_APP used
        # to fall in the final ``else`` and always raise
        if self.sinks == SINKS_APP:
            assert self.appsink_data is not None
            remote_video_sink = Gst.ElementFactory.make("appsink")

            appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
            remote_video_sink.set_property("caps", appsink_caps)

            remote_video_sink.set_property("emit-signals", True)
            remote_video_sink.set_property("drop", True)
            remote_video_sink.set_property("max-buffers", 1)
            remote_video_sink.set_property("sync", True)
            remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb)
            self.pipeline.add(remote_video_sink)
        elif self.sinks == SINKS_AUTO:
            compositor = self.pipeline.get_by_name("compositor")

            # "sink_1" is the pad used for the local video feedback: it is resized
            # to at most a third of the remote video and moved to the bottom right
            # corner, above the remote video
            sink1_pad = compositor.get_static_pad("sink_1")

            local_width, local_height = self.scaled_dimensions(
                sink1_pad.get_property("width"),
                sink1_pad.get_property("height"),
                width // 3,
                height // 3,
            )

            sink1_pad.set_property("xpos", width - local_width)
            sink1_pad.set_property("ypos", height - local_height)
            sink1_pad.set_property("width", local_width)
            sink1_pad.set_property("height", local_height)
            sink1_pad.set_property("zorder", 1)

            # Request a new pad for the remote stream
            sink_pad_template = compositor.get_pad_template("sink_%u")
            remote_video_sink = compositor.request_pad(sink_pad_template, None, None)
            remote_video_sink.set_property("zorder", 0)

        else:
            raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}')

        # elements going from the decodebin pad to the sink, in link order
        chain = [q, conv]
        if adjust_resolution:
            videoscale = Gst.ElementFactory.make("videoscale")
            adjusted_caps = Gst.Caps.from_string(
                f"video/x-raw,width={width},height={height}"
            )
            capsfilter = Gst.ElementFactory.make("capsfilter")
            capsfilter.set_property("caps", adjusted_caps)
            chain += [videoscale, capsfilter]

        self.pipeline.add(*chain)
        self.pipeline.sync_children_states()
        ret = pad.link(q.get_static_pad("sink"))
        if ret != Gst.PadLinkReturn.OK:
            log.error(f"Error linking pad: {ret}")
        for upstream, downstream in zip(chain, chain[1:]):
            upstream.link(downstream)
        # the last element itself must be given (the bound method
        # ``capsfilter.link`` used to be passed when the resolution was adjusted)
        self.link_element_or_pad(chain[-1], remote_video_sink)

    elif name.startswith("audio"):
        log.debug("===> Audio OK")
        conv = Gst.ElementFactory.make("audioconvert")
        resample = Gst.ElementFactory.make("audioresample")
        remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
        self.pipeline.add(q, conv, resample, remote_audio_sink)
        self.pipeline.sync_children_states()
        ret = pad.link(q.get_static_pad("sink"))
        if ret != Gst.PadLinkReturn.OK:
            log.error(f"Error linking pad: {ret}")
        q.link(conv)
        conv.link(resample)
        resample.link(remote_audio_sink)

    else:
        log.warning(f"unmanaged name: {name!r}")
704 | |
def on_pad_added(self, __, pad: Gst.Pad) -> None:
    """Handle the addition of a new pad to the element.

    When a new source pad is added to the element, this method creates a decodebin,
    connects it to handle the stream, and links the pad to the decodebin.

    @param __: Placeholder for the signal source. Not used in this method.
    @param pad: The newly added pad.
    """
    log.debug("on_pad_added")
    if pad.direction != Gst.PadDirection.SRC:
        return

    decodebin = Gst.ElementFactory.make("decodebin")
    decodebin.connect("pad-added", self.on_remote_decodebin_stream)
    self.pipeline.add(decodebin)
    decodebin.sync_state_with_parent()
    # a link failure is logged, as done in on_remote_decodebin_stream
    ret = pad.link(decodebin.get_static_pad("sink"))
    if ret != Gst.PadLinkReturn.OK:
        log.error(f"Error linking pad: {ret}")
723 | |
async def _start_call(self) -> None:
    """Initiate the call.

    Initiates a call with the callee using the stored offer. If there are any buffered
    local ICE candidates, they are sent as part of the initiation.
    """
    assert self.callee
    self.sid = await self.bridge.call_start(
        str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
    )
    if not self.local_candidates_buffer:
        return

    log.debug(
        f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
    )
    if self.pwd is None:
        # ufrag and pwd are not known yet: they are read from the local description
        local_sdp = self.webrtcbin.props.local_description.sdp.as_text()
        self.extract_ufrag_pwd(local_sdp)
    ice_data = {
        media_type: {
            "ufrag": self.ufrag,
            "pwd": self.pwd,
            "candidates": candidates,
        }
        for media_type, candidates in self.local_candidates_buffer.items()
    }
    await self.bridge.ice_candidates_add(
        self.sid, data_format.serialise(ice_data), self.profile
    )
    self.local_candidates_buffer.clear()
752 | |
def _remote_sdp_set(self, promise) -> None:
    """Flag the SDP as set once the remote description has been applied.

    In this file, it is used as change function of the promise given to
    ``set-remote-description`` by ``on_accepted_call``.

    @param promise: promise which must have been replied
    """
    assert promise.wait() == Gst.PromiseResult.REPLIED
    # the ``sdp_set`` setter also handles the buffered remote ICE candidates
    self.sdp_set = True
756 | |
def on_accepted_call(self, sdp: str, profile: str) -> None:
    """Outgoing call has been accepted.

    The received answer is set as remote description.

    @param sdp: The SDP answer string received from the other party.
    @param profile: Profile used for the call.
    """
    log.debug(f"SDP answer received: \n{sdp}")

    # the first returned value (parsing result) is not checked
    _result, answer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
    answer_type = GstWebRTC.WebRTCSDPType.ANSWER
    answer = GstWebRTC.WebRTCSessionDescription.new(answer_type, answer_sdp_msg)
    remote_set_promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
    self.webrtcbin.emit("set-remote-description", answer, remote_set_promise)
771 | |
async def answer_call(self, sdp: str, profile: str) -> None:
    """Answer an incoming call

    The pipeline is set up and started as "responder", using the payload types found
    in the offer, then the offer is set as remote description.

    @param sdp: The SDP offer string received from the initiator.
    @param profile: Profile used for the call.

    @raise AssertionError: Raised when no payload type is found for "VP8" or
        "OPUS" in the offer.
    """
    log.debug(f"SDP offer received: \n{sdp}")
    self._set_media_types(sdp)
    __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
    payload_types = self.get_payload_types(
        offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
    )
    # get_payload_types always returns both keys, a missing encoding is reported
    # with a None value: the values are checked, not the presence of the keys
    assert payload_types["VP8"] is not None, "no VP8 payload type found in offer"
    assert payload_types["OPUS"] is not None, "no OPUS payload type found in offer"
    await self.setup_call(
        "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
    )
    self.start_pipeline()
    offer = GstWebRTC.WebRTCSessionDescription.new(
        GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
    )
    promise = Gst.Promise.new_with_change_func(self.on_offer_set)
    self.webrtcbin.emit("set-remote-description", offer, promise)
798 | |
def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
    """Handles the on-ice-candidate signal of webrtcbin.

    The candidate is buffered while there is no session id yet, otherwise it is
    sent through the bridge with the ufrag and pwd of the local description.

    @param webrtc: The webrtcbin element.
    @param mline_index: The mline index.
    @param candidate_sdp: The ICE candidate.
    @raise exceptions.InternalError: no media type is known for ``mline_index``.
    """
    log.debug(
        f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
    )
    parsed_candidate = self.parse_ice_candidate(candidate_sdp)
    if parsed_candidate is None:
        # parse_ice_candidate has already logged a warning; a None value must
        # neither be buffered nor sent
        return
    try:
        media_type = self.media_types[mline_index]
    except KeyError:
        raise exceptions.InternalError("can't find media type")

    if self.sid is None:
        log.debug("buffering local ICE candidate")
        self.local_candidates_buffer.setdefault(media_type, []).append(
            parsed_candidate
        )
    else:
        sdp = self.webrtcbin.props.local_description.sdp.as_text()
        assert sdp is not None
        ufrag, pwd = self.extract_ufrag_pwd(sdp)
        ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
        self._a_call(
            self.bridge.ice_candidates_add,
            self.sid,
            data_format.serialise({media_type: ice_data}),
            self.profile,
        )
831 | |
def on_ice_candidates_new(self, candidates: dict) -> None:
    """Handle new ICE candidates.

    While the SDP is not set, "audio" and "video" candidates are buffered;
    otherwise every candidate is added to the webrtcbin.

    @param candidates: A dictionary containing media types ("audio" or "video") as
        keys and corresponding ICE data as values.

    @raise exceptions.InternalError: Raised when sdp mline index is not found.
    """
    if not self.sdp_set:
        log.debug("buffering remote ICE candidate")
        for media_type in ("audio", "video"):
            media_candidates = candidates.get(media_type)
            if not media_candidates:
                continue
            buffered = self.remote_candidates_buffer[media_type]["candidates"]
            buffered.extend(media_candidates["candidates"])
        return

    for media_type, ice_data in candidates.items():
        for candidate in ice_data["candidates"]:
            candidate_sdp = self.build_ice_candidate(candidate)
            try:
                mline_index = self.get_sdp_mline_index(media_type)
            except Exception as e:
                raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
            self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp)
            log.debug(
                f"Remote ICE candidate added. MLine Index: {mline_index}, "
                f"{candidate_sdp}"
            )
860 | |
def on_ice_gathering_state_change(self, pspec, __):
    """Log the new ICE gathering state of the webrtcbin.

    Both arguments come from the ``notify::ice-gathering-state`` signal and are
    unused.
    """
    gathering_state = self.webrtcbin.get_property("ice-gathering-state")
    log.debug(f"ICE gathering state changed to {gathering_state}")
864 | |
def on_ice_connection_state(self, pspec, __):
    """Log the new ICE connection state of the webrtcbin.

    An error is logged as well when the connection has failed. Both arguments come
    from the ``notify::ice-connection-state`` signal and are unused.
    """
    state = self.webrtcbin.props.ice_connection_state
    has_failed = state == GstWebRTC.WebRTCICEConnectionState.FAILED
    if has_failed:
        log.error("ICE connection failed")
    log.info(f"ICE connection state changed to {state}")
870 | |
def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
    """Log an error message received on the GStreamer bus.

    @param bus: The GStreamer bus.
    @param message: The error message.
    """
    error, debug_info = message.parse_error()
    source_name = message.src.get_name()
    log.error(f"Error from {source_name}: {error.message}")
    log.error(f"Debugging info: {debug_info}")
880 | |
def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
    """Log the end of stream reported on the GStreamer bus.

    @param bus: The GStreamer bus.
    @param message: The eos message.
    """
    log.info("End of stream")
888 | |
def on_audio_mute(self, muted: bool) -> None:
    """Apply the audio mute state to the audio valve, if there is one.

    @param muted: True to drop audio, False to let it pass
    """
    valve = self.audio_valve
    if valve is None:
        return
    valve.set_property("drop", muted)
    state = "muted" if muted else "unmuted"
    log.info(f"audio is now {state}")
894 | |
def on_video_mute(self, muted: bool) -> None:
    """Apply the video mute state to the pipeline, if there is one.

    When muted, the input selector is switched to its "sink_1" pad and the video
    source is stopped; when unmuted, the video source is restarted and the selector
    is switched back to "sink_0".

    @param muted: True to mute the video, False to unmute it
    """
    # ``video_selector`` is only set by ``setup_call``: the mute state may be
    # changed before any call is set up, and there is then nothing to do
    video_selector = getattr(self, "video_selector", None)
    if video_selector is None:
        return

    # when muted, we switch to a black image and deactivate the camera
    if not muted:
        # the source must be running before it is selected again
        self.video_src.set_state(Gst.State.PLAYING)
    pad = video_selector.get_static_pad("sink_1" if muted else "sink_0")
    video_selector.props.active_pad = pad
    if muted:
        self.video_src.set_state(Gst.State.NULL)
    state = "muted" if muted else "unmuted"
    log.info(f"video is now {state}")
906 | |
async def end_call(self) -> None:
    """Stop streaming and clean instance

    Everything is done by ``reset_instance``.
    """
    reset = self.reset_instance
    reset()