comparison libervia/frontends/tools/webrtc.py @ 4139:6745c6bd4c7a

frontends (tools): webrtc implementation: this is a factored implementation usable by all non-web frontends. Sources and sinks can easily be configured to use test sources or the local webcam/microphone, autosinks, or an `appsink` that the frontend will use. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:03:36 +0100
parents
children 970b6209526a
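The configuration described in the commit message can be sketched as follows; this is a hypothetical usage example, not part of the changeset, and the bridge object, profile name and callee JID are placeholders:

    # inside an asyncio coroutine, with a connected bridge and an existing profile
    webrtc = WebRTC(bridge, "some_profile", sources=SOURCES_TEST, sinks=SINKS_AUTO)
    webrtc.callee = "someone@example.org"  # illustrative JID
    await webrtc.setup_call("initiator")
    webrtc.start_pipeline()
    # … once the call is over …
    await webrtc.end_call()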
1 #!/usr/bin/env python3
2
3 # Libervia WebRTC implementation
4 # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import gi
20 gi.require_versions({
21 "Gst": "1.0",
22 "GstWebRTC": "1.0"
23 })
24 from gi.repository import Gst, GstWebRTC, GstSdp
25
26 try:
27 from gi.overrides import Gst as _
28 except ImportError:
29 print(
30 "no GStreamer python overrides available, please install relevant pacakges on "
31 "your system."
32 )
33 import asyncio
34 from dataclasses import dataclass
35 from datetime import datetime
36 import logging
37 import re
38 from typing import Callable
39 from urllib.parse import quote_plus
40
41 from libervia.backend.core import exceptions
42 from libervia.backend.tools.common import data_format
43 from libervia.frontends.tools import aio
44
45
46 log = logging.getLogger(__name__)
47
48 Gst.init(None)
49
50 SOURCES_AUTO = "auto"
51 SOURCES_TEST = "test"
52 SINKS_APP = "app"
53 SINKS_AUTO = "auto"
54 SINKS_TEST = "test"
55
56
57 @dataclass
58 class AppSinkData:
59 local_video_cb: Callable
60 remote_video_cb: Callable
61
62
63 class WebRTC:
64 """GSTreamer based WebRTC implementation for audio and video communication.
65
66 This class encapsulates the WebRTC functionalities required for initiating and
67 handling audio and video calls.
68 """
69
70 def __init__(
71 self,
72 bridge,
73 profile: str,
74 sources: str = SOURCES_AUTO,
75 sinks: str = SINKS_AUTO,
76 appsink_data: AppSinkData | None = None,
77 reset_cb: Callable | None = None,
78 ) -> None:
79 self.main_loop = asyncio.get_event_loop()
80 self.bridge = bridge
81 self.profile = profile
82 self.pipeline = None
83 self._audio_muted = False
84 self._video_muted = False
85 self.sources = sources
86 self.sinks = sinks
87 if sinks == SINKS_APP:
88 self.appsink_data = appsink_data
89 elif appsink_data is not None:
90 raise exceptions.InternalError(
91 "appsink_data can only be used for SINKS_APP sinks"
92 )
93 self.reset_cb = reset_cb
94 self.reset_instance()
95
96 @property
97 def audio_muted(self):
98 return self._audio_muted
99
100 @audio_muted.setter
101 def audio_muted(self, muted: bool) -> None:
102 if muted != self._audio_muted:
103 self._audio_muted = muted
104 self.on_audio_mute(muted)
105
106 @property
107 def video_muted(self):
108 return self._video_muted
109
110 @video_muted.setter
111 def video_muted(self, muted: bool) -> None:
112 if muted != self._video_muted:
113 self._video_muted = muted
114 self.on_video_mute(muted)
115
116 @property
117 def sdp_set(self):
118 return self._sdp_set
119
120 @sdp_set.setter
121 def sdp_set(self, is_set: bool):
122 self._sdp_set = is_set
123 if is_set:
124 self.on_ice_candidates_new(self.remote_candidates_buffer)
125 for data in self.remote_candidates_buffer.values():
126 data["candidates"].clear()
127
128 @property
129 def media_types(self):
130 if self._media_types is None:
131 raise Exception("self._media_types should not be None!")
132 return self._media_types
133
134 @media_types.setter
135 def media_types(self, new_media_types: dict) -> None:
136 self._media_types = new_media_types
137 self._media_types_inv = {v: k for k, v in new_media_types.items()}
138
139 @property
140 def media_types_inv(self) -> dict:
141 if self._media_types_inv is None:
142 raise Exception("self._media_types_inv should not be None!")
143 return self._media_types_inv
144
145 def generate_dot_file(
146 self,
147 filename: str = "pipeline",
148 details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL,
149 with_timestamp: bool = True,
150 bin_: Gst.Bin|None = None,
151 ) -> None:
152 """Generate Dot File for debugging
153
154 ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir.
155 ``dot -Tpng -o <filename>.png <filename>.dot`` can be used to convert to a PNG file.
156 See
157 https://gstreamer.freedesktop.org/documentation/gstreamer/debugutils.html?gi-language=python#GstDebugGraphDetails
158 for details.
159
160 @param filename: name of the generated file
161 @param details: which details to print
162 @param with_timestamp: if True, add a timestamp to filename
163 @param bin_: which bin to output. By default, the whole pipeline
164 (``self.pipeline``) will be used.
165 """
166 if bin_ is None:
167 bin_ = self.pipeline
168 if with_timestamp:
169 timestamp = datetime.now().isoformat(timespec='milliseconds')
170 filename = f"{timestamp}_filename"
171
172 Gst.debug_bin_to_dot_file(bin_, details, filename)
173
174 def get_sdp_mline_index(self, media_type: str) -> int:
175 """Gets the sdpMLineIndex for a given media type.
176
177 @param media_type: The type of the media.
178 """
179 for index, m_type in self.media_types.items():
180 if m_type == media_type:
181 return index
182 raise ValueError(f"Media type '{media_type}' not found")
183
184 def _set_media_types(self, offer_sdp: str) -> None:
185 """Sets media types from offer SDP
186
187 @param offer_sdp: SDP text of the session description containing the offer
188 """
189 sdp_lines = offer_sdp.splitlines()
190 media_types = {}
191 mline_index = 0
192
193 for line in sdp_lines:
194 if line.startswith("m="):
195 media_types[mline_index] = line[2 : line.find(" ")]
196 mline_index += 1
197
198 self.media_types = media_types
199
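As a hypothetical illustration of the mapping built by _set_media_types (the SDP fragment is invented):

    offer = "v=0\r\nm=audio 9 UDP/TLS/RTP/SAVPF 96\r\nm=video 9 UDP/TLS/RTP/SAVPF 97\r\n"
    webrtc._set_media_types(offer)
    # webrtc.media_types == {0: "audio", 1: "video"}
    # webrtc.get_sdp_mline_index("video") == 1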
200 def _a_call(self, method, *args, **kwargs):
201 """Call an async method in main thread"""
202 aio.run_from_thread(method, *args, **kwargs, loop=self.main_loop)
203
204 def get_payload_types(
205 self, sdpmsg, video_encoding: str, audio_encoding: str
206 ) -> dict[str, int | None]:
207 """Find the payload types for the specified video and audio encoding.
208
209 Very simplistically finds the first payload type matching the encoding
210 name. More complex applications will want to match caps on
211 profile-level-id, packetization-mode, etc.
212 """
213 # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
214 # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
215 video_pt = None
216 audio_pt = None
217 for i in range(0, sdpmsg.medias_len()):
218 media = sdpmsg.get_media(i)
219 for j in range(0, media.formats_len()):
220 fmt = media.get_format(j)
221 if fmt == "webrtc-datachannel":
222 continue
223 pt = int(fmt)
224 caps = media.get_caps_from_media(pt)
225 s = caps.get_structure(0)
226 encoding_name = s["encoding-name"]
227 if video_pt is None and encoding_name == video_encoding:
228 video_pt = pt
229 elif audio_pt is None and encoding_name == audio_encoding:
230 audio_pt = pt
231 return {video_encoding: video_pt, audio_encoding: audio_pt}
232
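For instance, assuming an offer advertising OPUS on payload type 96 and VP8 on payload type 97 (illustrative values), the lookup above would give:

    # get_payload_types(sdpmsg, video_encoding="VP8", audio_encoding="OPUS")
    # == {"VP8": 97, "OPUS": 96}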
233 def parse_ice_candidate(self, candidate_string):
234 """Parses the ice candidate string.
235
236 @param candidate_string: The ice candidate string to be parsed.
237 """
238 pattern = re.compile(
239 r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
240 r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
241 r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
242 r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
243 )
244 match = pattern.match(candidate_string)
245 if match:
246 candidate_dict = match.groupdict()
247
248 # Apply the correct types to the dictionary values
249 candidate_dict["component_id"] = int(candidate_dict["component_id"])
250 candidate_dict["priority"] = int(candidate_dict["priority"])
251 candidate_dict["port"] = int(candidate_dict["port"])
252
253 if candidate_dict["rel_port"]:
254 candidate_dict["rel_port"] = int(candidate_dict["rel_port"])
255
256 if candidate_dict["generation"]:
257 candidate_dict["generation"] = candidate_dict["generation"]
258
259 # Remove None values
260 return {k: v for k, v in candidate_dict.items() if v is not None}
261 else:
262 log.warning(f"can't parse candidate: {candidate_string!r}")
263 return None
264
265 def build_ice_candidate(self, parsed_candidate):
266 """Builds ICE candidate
267
268 @param parsed_candidate: Dictionary containing parsed ICE candidate
269 """
270 base_format = (
271 "candidate:{foundation} {component_id} {transport} {priority} "
272 "{address} {port} typ {type}"
273 )
274
275 if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
276 base_format += " raddr {rel_addr} rport {rel_port}"
277
278 if parsed_candidate.get("generation"):
279 base_format += " generation {generation}"
280
281 return base_format.format(**parsed_candidate)
282
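A hypothetical round trip between parse_ice_candidate and build_ice_candidate (the candidate string is invented):

    cand = "candidate:1 1 udp 2013266431 192.0.2.10 40326 typ host generation 0"
    parsed = webrtc.parse_ice_candidate(cand)
    # parsed["component_id"] == 1, parsed["port"] == 40326, parsed["type"] == "host"
    assert webrtc.build_ice_candidate(parsed) == cand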
283 def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
284 """Retrieves ICE password and user fragment for SDP offer.
285
286 @param sdp: The Session Description Protocol offer string.
287 @return: ufrag and pwd
288 @raise ValueError: Can't extract ufrag and password
289 """
290 ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
291 pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
292
293 if ufrag_line and pwd_line:
294 ufrag = self.ufrag = ufrag_line.group(1)
295 pwd = self.pwd = pwd_line.group(1)
296 return ufrag, pwd
297 else:
298 log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
299 raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
300
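Illustrative only (the values are made up):

    sdp = "a=ice-ufrag:h2Fa\r\na=ice-pwd:dxw3VY1YKFXAYT6IRjlTYC\r\n"
    ufrag, pwd = webrtc.extract_ufrag_pwd(sdp)
    # ufrag == "h2Fa", pwd == "dxw3VY1YKFXAYT6IRjlTYC"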
301 def reset_instance(self):
302 """Inits or resets the instance variables to their default state."""
303 self.role: str | None = None
304 if self.pipeline is not None:
305 self.pipeline.set_state(Gst.State.NULL)
306 self.pipeline = None
307 self._remote_video_pad = None
308 self.sid: str | None = None
309 self.offer: str | None = None
310 self.local_candidates_buffer = {}
311 self.ufrag: str | None = None
312 self.pwd: str | None = None
313 self.callee: str | None = None
314 self._media_types = None
315 self._media_types_inv = None
316 self._sdp_set: bool = False
317 self.remote_candidates_buffer: dict[str, dict[str, list]] = {
318 "audio": {"candidates": []},
319 "video": {"candidates": []},
320 }
321 self._media_types = None
322 self._media_types_inv = None
323 self.audio_valve = None
324 self.video_valve = None
325 if self.reset_cb is not None:
326 self.reset_cb()
327
328
329 async def setup_call(
330 self,
331 role: str,
332 audio_pt: int | None = 96,
333 video_pt: int | None = 97,
334 ) -> None:
335 """Sets up the call.
336
337 This method establishes the GStreamer pipeline for audio and video communication.
338 The method also manages STUN and TURN server configurations, signal watchers, and
339 various connection handlers for the webrtcbin.
340
341 @param role: The role for the call, either 'initiator' or 'responder'.
342 @param audio_pt: The payload type for the audio stream.
343 @param video_pt: The payload type for the video stream
344
345 @raises NotImplementedError: If audio_pt or video_pt is set to None.
346 @raises AssertionError: If the role is not 'initiator' or 'responder'.
347 """
348 assert role in ("initiator", "responder")
349 self.role = role
350 if audio_pt is None or video_pt is None:
351 raise NotImplementedError("None value is not handled yet")
352
353 if self.sources == SOURCES_AUTO:
354 video_source_elt = "v4l2src"
355 audio_source_elt = "pulsesrc"
356 elif self.sources == SOURCES_TEST:
357 video_source_elt = "videotestsrc is-live=true pattern=ball"
358 audio_source_elt = "audiotestsrc"
359 else:
360 raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}')
361
362 extra_elt = ""
363
364 if self.sinks == SINKS_APP:
365 local_video_sink_elt = (
366 "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 "
367 "sync=True"
368 )
369 elif self.sinks == SINKS_AUTO:
370 extra_elt = "compositor name=compositor ! autovideosink"
371 local_video_sink_elt = """compositor.sink_1"""
372 else:
373 raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}")
374
375 self.gst_pipe_desc = f"""
376 webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle
377
378 input-selector name=video_selector
379 ! videorate
380 ! video/x-raw,framerate=30/1
381 ! tee name=t
382
383 {extra_elt}
384
385 {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector.
386 videotestsrc is-live=true pattern=black ! queue leaky=downstream ! video_selector.
387
388 t.
389 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
390 ! videoconvert
391 ! vp8enc deadline=1 keyframe-max-dist=60
392 ! rtpvp8pay picture-id-mode=15-bit
393 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
394 ! sendrecv.
395
396 t.
397 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
398 ! videoconvert
399 ! {local_video_sink_elt}
400
401 {audio_source_elt} name=audio_src
402 ! valve name=audio_valve
403 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream
404 ! audioconvert
405 ! audioresample
406 ! opusenc audio-type=voice
407 ! rtpopuspay
408 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
409 ! sendrecv.
410 """
411
412 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
413
414 # Create the pipeline
415 self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
416 if not self.pipeline:
417 raise exceptions.InternalError("Failed to create GStreamer pipeline.")
418
419 self.webrtcbin = self.pipeline.get_by_name("sendrecv")
420 self.video_src = self.pipeline.get_by_name("video_src")
421 self.video_selector = self.pipeline.get_by_name("video_selector")
422 self.audio_valve = self.pipeline.get_by_name("audio_valve")
423
424 if self.video_muted:
425 self.on_video_mute(True)
426 if self.audio_muted:
427 self.on_audio_mute(True)
428
429 # set STUN and TURN servers
430 external_disco = data_format.deserialise(
431 await self.bridge.external_disco_get("", self.profile), type_check=list
432 )
433
434 for server in external_disco:
435 if server["type"] == "stun":
436 if server["transport"] == "tcp":
437 log.info("ignoring TCP STUN server, "
438 "GStreamer only supports one STUN server")
439 continue
440 url = f"stun://{server['host']}:{server['port']}"
441 log.debug(f"adding stun server: {url}")
442 self.webrtcbin.set_property("stun-server", url)
443 elif server["type"] == "turn":
444 url = "{scheme}://{username}:{password}@{host}:{port}".format(
445 scheme="turns" if server["transport"] == "tcp" else "turn",
446 username=quote_plus(server["username"]),
447 password=quote_plus(server["password"]),
448 host=server["host"],
449 port=server["port"],
450 )
451 log.debug(f"adding turn server: {url}")
452
453 if not self.webrtcbin.emit("add-turn-server", url):
454 log.warning(f"Erreur while adding TURN server {url}")
455
456 # local video feedback
457 if self.sinks == SINKS_APP:
458 assert self.appsink_data is not None
459 local_video_sink = self.pipeline.get_by_name("local_video_sink")
460 local_video_sink.set_property("emit-signals", True)
461 local_video_sink.connect("new-sample", self.appsink_data.local_video_cb)
462 local_video_sink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
463 local_video_sink.set_property("caps", local_video_sink_caps)
464
465 # Create bus and associate signal watchers
466 self.bus = self.pipeline.get_bus()
467 if not self.bus:
468 log.error("Failed to get bus from pipeline.")
469 return
470
471 self.bus.add_signal_watch()
472 self.webrtcbin.connect("pad-added", self.on_pad_added)
473 self.bus.connect("message::error", self.on_bus_error)
474 self.bus.connect("message::eos", self.on_bus_eos)
475 self.webrtcbin.connect("on-negotiation-needed", self.on_negotiation_needed)
476 self.webrtcbin.connect("on-ice-candidate", self.on_ice_candidate)
477 self.webrtcbin.connect(
478 "notify::ice-gathering-state", self.on_ice_gathering_state_change
479 )
480 self.webrtcbin.connect(
481 "notify::ice-connection-state", self.on_ice_connection_state
482 )
483
484 def start_pipeline(self) -> None:
485 """Starts the GStreamer pipeline."""
486 log.debug("starting the pipeline")
487 self.pipeline.set_state(Gst.State.PLAYING)
488
489 def on_negotiation_needed(self, webrtc):
490 """Initiate SDP offer when negotiation is needed."""
491 log.debug("Negotiation needed.")
492 if self.role == "initiator":
493 log.debug("Creating offer…")
494 promise = Gst.Promise.new_with_change_func(self.on_offer_created)
495 self.webrtcbin.emit("create-offer", None, promise)
496
497 def on_offer_created(self, promise):
498 """Callback for when SDP offer is created."""
499 log.info("on_offer_created called")
500 assert promise.wait() == Gst.PromiseResult.REPLIED
501 reply = promise.get_reply()
502 if reply is None:
503 log.error("Promise reply is None. Offer creation might have failed.")
504 return
505 offer = reply["offer"]
506 self.offer = offer.sdp.as_text()
507 log.info(f"SDP offer created: \n{self.offer}")
508 self._set_media_types(self.offer)
509 promise = Gst.Promise.new()
510 self.webrtcbin.emit("set-local-description", offer, promise)
511 promise.interrupt()
512 self._a_call(self._start_call)
513
514 def on_answer_set(self, promise):
515 assert promise.wait() == Gst.PromiseResult.REPLIED
516
517 def on_answer_created(self, promise, _, __):
518 """Callback for when SDP answer is created."""
519 assert promise.wait() == Gst.PromiseResult.REPLIED
520 reply = promise.get_reply()
521 answer = reply["answer"]
522 promise = Gst.Promise.new()
523 self.webrtcbin.emit("set-local-description", answer, promise)
524 promise.interrupt()
525 answer_sdp = answer.sdp.as_text()
526 log.info(f"SDP answer set: \n{answer_sdp}")
527 self.sdp_set = True
528 self._a_call(self.bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)
529
530 def on_offer_set(self, promise):
531 assert promise.wait() == Gst.PromiseResult.REPLIED
532 promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
533 self.webrtcbin.emit("create-answer", None, promise)
534
535 def link_element_or_pad(
536 self, source: Gst.Element, dest: Gst.Element | Gst.Pad
537 ) -> bool:
538 """Check if dest is a pad or an element, and link appropriately"""
539 src_pad = source.get_static_pad("src")
540
541 if isinstance(dest, Gst.Pad):
542 # If the dest is a pad, link directly
543 if src_pad.link(dest) != Gst.PadLinkReturn.OK:
544 log.error(
545 "Failed to link 'conv' to the compositor's newly requested pad!"
546 )
547 return False
548 elif isinstance(dest, Gst.Element):
549 return source.link(dest)
550 else:
551 log.error(f"Unexpected type for dest: {type(sink)}")
552 return False
553
554 return True
555
556 def scaled_dimensions(
557 self, original_width: int, original_height: int, max_width: int, max_height: int
558 ) -> tuple[int, int]:
559 """Calculates the scaled dimensions preserving aspect ratio.
560
561 @param original_width: Original width of the video stream.
562 @param original_height: Original height of the video stream.
563 @param max_width: Maximum desired width for the scaled video.
564 @param max_height: Maximum desired height for the scaled video.
565 @return: The width and height of the scaled video.
566 """
567 aspect_ratio = original_width / original_height
568 new_width = int(max_height * aspect_ratio)
569
570 if new_width <= max_width:
571 return new_width, max_height
572
573 new_height = int(max_width / aspect_ratio)
574 return max_width, new_height
575
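Worked example: a 1280x720 remote stream fitted into a 400x400 box keeps its 16:9 ratio:

    # 1280/720 ≈ 1.78; int(400 * 1.78) = 711 > 400, so the width is capped instead:
    # scaled_dimensions(1280, 720, 400, 400) == (400, 225)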
576 def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
577 """Handle the stream from the remote decodebin.
578
579 This method processes the incoming stream from the remote decodebin, determining
580 whether it's video or audio. It then sets up the appropriate GStreamer elements
581 for video/audio processing and adds them to the pipeline.
582
583 @param pad: The Gst.Pad from the remote decodebin producing the stream.
584 """
585 assert self.pipeline is not None
586 if not pad.has_current_caps():
587 log.error(f"{pad} has no caps, ignoring")
588 return
589
590 caps = pad.get_current_caps()
591 assert len(caps)
592 s = caps[0]
593 name = s.get_name()
594 log.debug(f"====> NAME START: {name}")
595
596 q = Gst.ElementFactory.make("queue")
597
598 if name.startswith("video"):
599 log.debug("===> VIDEO OK")
600
601 self._remote_video_pad = pad
602
603 # Check and log the original size of the video
604 width = s.get_int("width").value
605 height = s.get_int("height").value
606 log.info(f"Original video size: {width}x{height}")
607
608 # This is a fix for an issue found with Movim on desktop: a non-standard
609 # resolution is used (990x557), resulting in bad alignment and no color in
610 # the rendered image.
611 adjust_resolution = width % 4 != 0 or height % 4 != 0
612 if adjust_resolution:
613 log.info("non standard resolution, we need to adjust size")
614 width = (width + 3) // 4 * 4
615 height = (height + 3) // 4 * 4
616 log.info(f"Adjusted video size: {width}x{height}")
617
618 conv = Gst.ElementFactory.make("videoconvert")
619 if self.sinks == SINKS_APP:
620 assert self.appsink_data is not None
621 remote_video_sink = Gst.ElementFactory.make("appsink")
622
623 appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
624 remote_video_sink.set_property("caps", appsink_caps)
625
626 remote_video_sink.set_property("emit-signals", True)
627 remote_video_sink.set_property("drop", True)
628 remote_video_sink.set_property("max-buffers", 1)
629 remote_video_sink.set_property("sync", True)
630 remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb)
631 self.pipeline.add(remote_video_sink)
632 elif self.sinks == SINKS_AUTO:
633 compositor = self.pipeline.get_by_name("compositor")
634
635 sink1_pad = compositor.get_static_pad("sink_1")
636
637 local_width, local_height = self.scaled_dimensions(
638 sink1_pad.get_property("width"),
639 sink1_pad.get_property("height"),
640 width // 3,
641 height // 3,
642 )
643
644 sink1_pad.set_property("xpos", width - local_width)
645 sink1_pad.set_property("ypos", height - local_height)
646 sink1_pad.set_property("width", local_width)
647 sink1_pad.set_property("height", local_height)
648 sink1_pad.set_property("zorder", 1)
649
650 # Request a new pad for the remote stream
651 sink_pad_template = compositor.get_pad_template("sink_%u")
652 remote_video_sink = compositor.request_pad(sink_pad_template, None, None)
653 remote_video_sink.set_property("zorder", 0)
654
655 else:
656 raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}')
657
658 if adjust_resolution:
659 videoscale = Gst.ElementFactory.make("videoscale")
660 adjusted_caps = Gst.Caps.from_string(
661 f"video/x-raw,width={width},height={height}"
662 )
663 capsfilter = Gst.ElementFactory.make("capsfilter")
664 capsfilter.set_property("caps", adjusted_caps)
665
666 self.pipeline.add(q, conv, videoscale, capsfilter)
667
668
669 self.pipeline.sync_children_states()
670 ret = pad.link(q.get_static_pad("sink"))
671 if ret != Gst.PadLinkReturn.OK:
672 log.error(f"Error linking pad: {ret}")
673 q.link(conv)
674 conv.link(videoscale)
675 videoscale.link(capsfilter)
676 self.link_element_or_pad(capsfilter, remote_video_sink)
677
678 else:
679 self.pipeline.add(q, conv)
680
681 self.pipeline.sync_children_states()
682 ret = pad.link(q.get_static_pad("sink"))
683 if ret != Gst.PadLinkReturn.OK:
684 log.error(f"Error linking pad: {ret}")
685 q.link(conv)
686 self.link_element_or_pad(conv, remote_video_sink)
687
688 elif name.startswith("audio"):
689 log.debug("===> Audio OK")
690 conv = Gst.ElementFactory.make("audioconvert")
691 resample = Gst.ElementFactory.make("audioresample")
692 remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
693 self.pipeline.add(q, conv, resample, remote_audio_sink)
694 self.pipeline.sync_children_states()
695 ret = pad.link(q.get_static_pad("sink"))
696 if ret != Gst.PadLinkReturn.OK:
697 log.error(f"Error linking pad: {ret}")
698 q.link(conv)
699 conv.link(resample)
700 resample.link(remote_audio_sink)
701
702 else:
703 log.warning(f"unmanaged name: {name!r}")
704
705 def on_pad_added(self, __, pad: Gst.Pad) -> None:
706 """Handle the addition of a new pad to the element.
707
708 When a new source pad is added to the element, this method creates a decodebin,
709 connects it to handle the stream, and links the pad to the decodebin.
710
711 @param __: Placeholder for the signal source. Not used in this method.
712 @param pad: The newly added pad.
713 """
714 log.debug("on_pad_added")
715 if pad.direction != Gst.PadDirection.SRC:
716 return
717
718 decodebin = Gst.ElementFactory.make("decodebin")
719 decodebin.connect("pad-added", self.on_remote_decodebin_stream)
720 self.pipeline.add(decodebin)
721 decodebin.sync_state_with_parent()
722 pad.link(decodebin.get_static_pad("sink"))
723
724 async def _start_call(self) -> None:
725 """Initiate the call.
726
727 Initiates a call with the callee using the stored offer. If there are any buffered
728 local ICE candidates, they are sent as part of the initiation.
729 """
730 assert self.callee
731 self.sid = await self.bridge.call_start(
732 str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
733 )
734 if self.local_candidates_buffer:
735 log.debug(
736 f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
737 )
738 if self.pwd is None:
739 sdp = self.webrtcbin.props.local_description.sdp.as_text()
740 self.extract_ufrag_pwd(sdp)
741 ice_data = {}
742 for media_type, candidates in self.local_candidates_buffer.items():
743 ice_data[media_type] = {
744 "ufrag": self.ufrag,
745 "pwd": self.pwd,
746 "candidates": candidates,
747 }
748 await self.bridge.ice_candidates_add(
749 self.sid, data_format.serialise(ice_data), self.profile
750 )
751 self.local_candidates_buffer.clear()
752
753 def _remote_sdp_set(self, promise) -> None:
754 assert promise.wait() == Gst.PromiseResult.REPLIED
755 self.sdp_set = True
756
757 def on_accepted_call(self, sdp: str, profile: str) -> None:
758 """Outgoing call has been accepted.
759
760 @param sdp: The SDP answer string received from the other party.
761 @param profile: Profile used for the call.
762 """
763 log.debug(f"SDP answer received: \n{sdp}")
764
765 __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
766 answer = GstWebRTC.WebRTCSessionDescription.new(
767 GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
768 )
769 promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
770 self.webrtcbin.emit("set-remote-description", answer, promise)
771
772 async def answer_call(self, sdp: str, profile: str) -> None:
773 """Answer an incoming call
774
775 @param sdp: The SDP offer string received from the initiator.
776 @param profile: Profile used for the call.
777
778 @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in
779 payload types.
780 """
781 log.debug(f"SDP offer received: \n{sdp}")
782 self._set_media_types(sdp)
783 __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
784 payload_types = self.get_payload_types(
785 offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
786 )
787 assert "VP8" in payload_types
788 assert "OPUS" in payload_types
789 await self.setup_call(
790 "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
791 )
792 self.start_pipeline()
793 offer = GstWebRTC.WebRTCSessionDescription.new(
794 GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
795 )
796 promise = Gst.Promise.new_with_change_func(self.on_offer_set)
797 self.webrtcbin.emit("set-remote-description", offer, promise)
798
799 def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
800 """Handles the on-ice-candidate signal of webrtcbin.
801
802 @param webrtc: The webrtcbin element.
803 @param mline_index: The mline index.
804 @param candidate_sdp: The ICE candidate.
805 """
806 log.debug(
807 f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
808 )
809 parsed_candidate = self.parse_ice_candidate(candidate_sdp)
810 try:
811 media_type = self.media_types[mline_index]
812 except KeyError:
813 raise exceptions.InternalError("can't find media type")
814
815 if self.sid is None:
816 log.debug("buffering local ICE candidate")
817 self.local_candidates_buffer.setdefault(media_type, []).append(
818 parsed_candidate
819 )
820 else:
821 sdp = self.webrtcbin.props.local_description.sdp.as_text()
822 assert sdp is not None
823 ufrag, pwd = self.extract_ufrag_pwd(sdp)
824 ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
825 self._a_call(
826 self.bridge.ice_candidates_add,
827 self.sid,
828 data_format.serialise({media_type: ice_data}),
829 self.profile,
830 )
831
832 def on_ice_candidates_new(self, candidates: dict) -> None:
833 """Handle new ICE candidates.
834
835 @param candidates: A dictionary containing media types ("audio" or "video") as
836 keys and corresponding ICE data as values.
837
838 @raise exceptions.InternalError: Raised when sdp mline index is not found.
839 """
840 if not self.sdp_set:
841 log.debug("buffering remote ICE candidate")
842 for media_type in ("audio", "video"):
843 media_candidates = candidates.get(media_type)
844 if media_candidates:
845 buffer = self.remote_candidates_buffer[media_type]
846 buffer["candidates"].extend(media_candidates["candidates"])
847 return
848 for media_type, ice_data in candidates.items():
849 for candidate in ice_data["candidates"]:
850 candidate_sdp = self.build_ice_candidate(candidate)
851 try:
852 mline_index = self.get_sdp_mline_index(media_type)
853 except Exception as e:
854 raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
855 self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp)
856 log.debug(
857 f"Remote ICE candidate added. MLine Index: {mline_index}, "
858 f"{candidate_sdp}"
859 )
860
861 def on_ice_gathering_state_change(self, pspec, __):
862 state = self.webrtcbin.get_property("ice-gathering-state")
863 log.debug(f"ICE gathering state changed to {state}")
864
865 def on_ice_connection_state(self, pspec, __):
866 state = self.webrtcbin.props.ice_connection_state
867 if state == GstWebRTC.WebRTCICEConnectionState.FAILED:
868 log.error("ICE connection failed")
869 log.info(f"ICE connection state changed to {state}")
870
871 def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
872 """Handles the GStreamer bus error messages.
873
874 @param bus: The GStreamer bus.
875 @param message: The error message.
876 """
877 err, debug = message.parse_error()
878 log.error(f"Error from {message.src.get_name()}: {err.message}")
879 log.error(f"Debugging info: {debug}")
880
881 def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
882 """Handles the GStreamer bus eos messages.
883
884 @param bus: The GStreamer bus.
885 @param message: The eos message.
886 """
887 log.info("End of stream")
888
889 def on_audio_mute(self, muted: bool) -> None:
890 if self.audio_valve is not None:
891 self.audio_valve.set_property("drop", muted)
892 state = "muted" if muted else "unmuted"
893 log.info(f"audio is now {state}")
894
895 def on_video_mute(self, muted: bool) -> None:
896 if self.video_selector is not None:
897 # when muted, we switch to a black image and deactivate the camera
898 if not muted:
899 self.video_src.set_state(Gst.State.PLAYING)
900 pad = self.video_selector.get_static_pad("sink_1" if muted else "sink_0")
901 self.video_selector.props.active_pad = pad
902 if muted:
903 self.video_src.set_state(Gst.State.NULL)
904 state = "muted" if muted else "unmuted"
905 log.info(f"video is now {state}")
906
907 async def end_call(self) -> None:
908 """Stop streaming and clean instance"""
909 self.reset_instance()