Mercurial > libervia-desktop-kivy
comparison libervia/desktop_kivy/plugins/plugin_wid_calls.py @ 509:f0ce49b360c8
calls: move webrtc code to core:
WebRTC code which can be used in several frontends has been factorized and moved to
common `frontends.tools`. Test have been updated consequently.
rel 426
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Nov 2023 13:41:07 +0100 |
parents | 0480f883f0a6 |
children | 97ab236e8f20 |
comparison
equal
deleted
inserted
replaced
508:d87b9a6b0b69 | 509:f0ce49b360c8 |
---|---|
1 from dataclasses import dataclass | 1 from dataclasses import dataclass |
2 from pathlib import Path | 2 from pathlib import Path |
3 import re | |
4 from typing import Optional, Callable | 3 from typing import Optional, Callable |
5 from urllib.parse import quote_plus | |
6 from functools import partial | 4 from functools import partial |
7 | 5 |
8 # from gi.repository import GLib | 6 # from gi.repository import GLib |
9 from gi.repository import GObject, Gst, GstWebRTC, GstSdp | 7 from gi.repository import GObject, Gst, GstWebRTC, GstSdp |
10 | 8 |
35 from libervia.backend.core import log as logging | 33 from libervia.backend.core import log as logging |
36 from libervia.backend.core.i18n import _ | 34 from libervia.backend.core.i18n import _ |
37 from libervia.backend.core import exceptions | 35 from libervia.backend.core import exceptions |
38 from libervia.backend.tools.common import data_format | 36 from libervia.backend.tools.common import data_format |
39 from libervia.frontends.quick_frontend import quick_widgets | 37 from libervia.frontends.quick_frontend import quick_widgets |
40 from libervia.frontends.tools import jid, aio | 38 from libervia.frontends.tools import aio, jid, webrtc |
41 | 39 |
42 from libervia.desktop_kivy import G | 40 from libervia.desktop_kivy import G |
43 | 41 |
44 from ..core import cagou_widget | 42 from ..core import cagou_widget |
45 from ..core import common | 43 from ..core import common |
101 microphone available will be used. | 99 microphone available will be used. |
102 """ | 100 """ |
103 | 101 |
104 test_mode: bool = False | 102 test_mode: bool = False |
105 | 103 |
104 PROXIED_PROPERTIES = {'audio_muted', 'callee', 'sid', 'video_muted'} | |
105 PROXIED_METHODS = {'answer_call', 'end_call', 'on_accepted_call', 'on_ice_candidates_new', 'setup_call', 'start_pipeline'} | |
106 | |
106 def __init__(self, parent_calls: "Calls", profile: str) -> None: | 107 def __init__(self, parent_calls: "Calls", profile: str) -> None: |
107 self.parent_calls = parent_calls | 108 self.parent_calls = parent_calls |
108 self.profile = profile | 109 self.profile = profile |
110 self.webrtc = webrtc.WebRTC( | |
111 G.host.a_bridge, | |
112 profile, | |
113 sinks=webrtc.SINKS_TEST if self.test_mode else webrtc.SINKS_APP, | |
114 appsink_data=webrtc.AppSinkData( | |
115 local_video_cb=partial( | |
116 self.on_new_sample, | |
117 update_sample_method=self.update_sample, | |
118 video_widget=self.parent_calls.local_video | |
119 ), | |
120 remote_video_cb=partial( | |
121 self.on_new_sample, | |
122 update_sample_method=self.update_sample, | |
123 video_widget=self.parent_calls.remote_video | |
124 ) | |
125 ), | |
126 reset_cb=self.on_reset | |
127 | |
128 ) | |
109 self.pipeline = None | 129 self.pipeline = None |
110 self.reset_instance() | 130 |
111 | 131 def __getattr__(self, name): |
112 @property | 132 if name in self.PROXIED_PROPERTIES or name in self.PROXIED_METHODS: |
113 def sdp_set(self): | 133 return getattr(self.webrtc, name) |
114 return self._sdp_set | 134 raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") |
115 | 135 |
116 @sdp_set.setter | 136 def __setattr__(self, name, value): |
117 def sdp_set(self, is_set: bool): | 137 if name in self.PROXIED_PROPERTIES: |
118 self._sdp_set = is_set | 138 setattr(self.webrtc, name, value) |
119 if is_set: | |
120 self.on_ice_candidates_new(self.remote_candidates_buffer) | |
121 for data in self.remote_candidates_buffer.values(): | |
122 data["candidates"].clear() | |
123 | |
124 @property | |
125 def media_types(self): | |
126 if self._media_types is None: | |
127 raise Exception("self._media_types should not be None!") | |
128 return self._media_types | |
129 | |
130 @media_types.setter | |
131 def media_types(self, new_media_types: dict) -> None: | |
132 self._media_types = new_media_types | |
133 self._media_types_inv = {v: k for k, v in new_media_types.items()} | |
134 | |
135 @property | |
136 def media_types_inv(self) -> dict: | |
137 if self._media_types_inv is None: | |
138 raise Exception("self._media_types_inv should not be None!") | |
139 return self._media_types_inv | |
140 | |
141 def get_sdp_mline_index(self, media_type: str) -> int: | |
142 """Gets the sdpMLineIndex for a given media type. | |
143 | |
144 @param media_type: The type of the media. | |
145 """ | |
146 for index, m_type in self.media_types.items(): | |
147 if m_type == media_type: | |
148 return index | |
149 raise ValueError(f"Media type '{media_type}' not found") | |
150 | |
151 def _set_media_types(self, offer_sdp: str) -> None: | |
152 """Sets media types from offer SDP | |
153 | |
154 @param offer: RTC session description containing the offer | |
155 """ | |
156 sdp_lines = offer_sdp.splitlines() | |
157 media_types = {} | |
158 mline_index = 0 | |
159 | |
160 for line in sdp_lines: | |
161 if line.startswith("m="): | |
162 media_types[mline_index] = line[2 : line.find(" ")] | |
163 mline_index += 1 | |
164 | |
165 self.media_types = media_types | |
166 | |
167 def _a_call(self, method, *args, **kwargs): | |
168 """Call an async method in main thread""" | |
169 | |
170 def wrapper(__): | |
171 aio.run_async(method(*args, **kwargs)) | |
172 return False | |
173 | |
174 Clock.schedule_once(wrapper) | |
175 | |
176 def get_payload_types( | |
177 self, sdpmsg, video_encoding: str, audio_encoding: str | |
178 ) -> dict[str, int | None]: | |
179 """Find the payload types for the specified video and audio encoding. | |
180 | |
181 Very simplistically finds the first payload type matching the encoding | |
182 name. More complex applications will want to match caps on | |
183 profile-level-id, packetization-mode, etc. | |
184 """ | |
185 # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at | |
186 # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py | |
187 video_pt = None | |
188 audio_pt = None | |
189 for i in range(0, sdpmsg.medias_len()): | |
190 media = sdpmsg.get_media(i) | |
191 for j in range(0, media.formats_len()): | |
192 fmt = media.get_format(j) | |
193 if fmt == "webrtc-datachannel": | |
194 continue | |
195 pt = int(fmt) | |
196 caps = media.get_caps_from_media(pt) | |
197 s = caps.get_structure(0) | |
198 encoding_name = s["encoding-name"] | |
199 if video_pt is None and encoding_name == video_encoding: | |
200 video_pt = pt | |
201 elif audio_pt is None and encoding_name == audio_encoding: | |
202 audio_pt = pt | |
203 return {video_encoding: video_pt, audio_encoding: audio_pt} | |
204 | |
205 def parse_ice_candidate(self, candidate_string): | |
206 """Parses the ice candidate string. | |
207 | |
208 @param candidate_string: The ice candidate string to be parsed. | |
209 """ | |
210 pattern = re.compile( | |
211 r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) " | |
212 r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ " | |
213 r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport " | |
214 r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?" | |
215 ) | |
216 match = pattern.match(candidate_string) | |
217 if match: | |
218 candidate_dict = match.groupdict() | |
219 | |
220 # Apply the correct types to the dictionary values | |
221 candidate_dict["component_id"] = int(candidate_dict["component_id"]) | |
222 candidate_dict["priority"] = int(candidate_dict["priority"]) | |
223 candidate_dict["port"] = int(candidate_dict["port"]) | |
224 | |
225 if candidate_dict["rel_port"]: | |
226 candidate_dict["rel_port"] = int(candidate_dict["rel_port"]) | |
227 | |
228 if candidate_dict["generation"]: | |
229 candidate_dict["generation"] = candidate_dict["generation"] | |
230 | |
231 # Remove None values | |
232 return {k: v for k, v in candidate_dict.items() if v is not None} | |
233 else: | 139 else: |
234 log.warning(f"can't parse candidate: {candidate_string!r}") | 140 super().__setattr__(name, value) |
235 return None | 141 |
236 | 142 def on_reset(self): |
237 def build_ice_candidate(self, parsed_candidate): | |
238 """Builds ICE candidate | |
239 | |
240 @param parsed_candidate: Dictionary containing parsed ICE candidate | |
241 """ | |
242 base_format = ( | |
243 "candidate:{foundation} {component_id} {transport} {priority} " | |
244 "{address} {port} typ {type}" | |
245 ) | |
246 | |
247 if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"): | |
248 base_format += " raddr {rel_addr} rport {rel_port}" | |
249 | |
250 if parsed_candidate.get("generation"): | |
251 base_format += " generation {generation}" | |
252 | |
253 return base_format.format(**parsed_candidate) | |
254 | |
255 def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]: | |
256 """Retrieves ICE password and user fragment for SDP offer. | |
257 | |
258 @param sdp: The Session Description Protocol offer string. | |
259 @return: ufrag and pwd | |
260 @raise ValueError: Can't extract ufrag and password | |
261 """ | |
262 ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) | |
263 pwd_line = re.search(r"ice-pwd:(\S+)", sdp) | |
264 | |
265 if ufrag_line and pwd_line: | |
266 ufrag = self.ufrag = ufrag_line.group(1) | |
267 pwd = self.pwd = pwd_line.group(1) | |
268 return ufrag, pwd | |
269 else: | |
270 log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") | |
271 raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") | |
272 | |
273 def reset_instance(self): | |
274 """Inits or resets the instance variables to their default state.""" | |
275 self.role: str | None = None | |
276 if self.pipeline is not None: | |
277 self.pipeline.set_state(Gst.State.NULL) | |
278 self.pipeline = None | |
279 self.texture_map: dict[VideoStreamWidget, TextureData] = {} | 143 self.texture_map: dict[VideoStreamWidget, TextureData] = {} |
280 self._remote_video_pad = None | |
281 self.sid: str | None = None | |
282 self.offer: str | None = None | |
283 self.local_candidates_buffer = {} | |
284 self.ufrag: str | None = None | |
285 self.pwd: str | None = None | |
286 self.callee: str | None = None | |
287 self._media_types = None | |
288 self._media_types_inv = None | |
289 self._sdp_set: bool = False | |
290 self.remote_candidates_buffer: dict[str, dict[str, list]] = { | |
291 "audio": {"candidates": []}, | |
292 "video": {"candidates": []}, | |
293 } | |
294 self._media_types = None | |
295 self._media_types_inv = None | |
296 self.audio_valve = None | |
297 self.video_valve = None | |
298 | |
299 async def setup_call( | |
300 self, | |
301 role: str, | |
302 audio_pt: int | None = 96, | |
303 video_pt: int | None = 97, | |
304 ) -> None: | |
305 """Sets up the call. | |
306 | |
307 This method establishes the Gstreamer pipeline for audio and video communication. | |
308 The method also manages STUN and TURN server configurations, signal watchers, and | |
309 various connection handlers for the webrtcbin. | |
310 | |
311 @param role: The role for the call, either 'initiator' or 'responder'. | |
312 @param audio_pt: The payload type for the audio stream. | |
313 @param video_pt: The payload type for the video stream | |
314 | |
315 @raises NotImplementedError: If audio_pt or video_pt is set to None. | |
316 @raises AssertionError: If the role is not 'initiator' or 'responder'. | |
317 """ | |
318 assert role in ("initiator", "responder") | |
319 self.role = role | |
320 if audio_pt is None or video_pt is None: | |
321 raise NotImplementedError("None value is not handled yet") | |
322 | |
323 if self.test_mode: | |
324 video_source_elt = "videotestsrc is-live=true pattern=ball" | |
325 audio_source_elt = "audiotestsrc" | |
326 else: | |
327 video_source_elt = "v4l2src" | |
328 audio_source_elt = "pulsesrc" | |
329 | |
330 self.gst_pipe_desc = f""" | |
331 webrtcbin latency=100 name=sendrecv bundle-policy=max-compat | |
332 | |
333 input-selector name=video_selector | |
334 ! videorate | |
335 ! video/x-raw,framerate=30/1 | |
336 ! tee name=t | |
337 | |
338 {video_source_elt} name=video_src ! queue ! video_selector. | |
339 videotestsrc is-live=true pattern=black ! queue ! video_selector. | |
340 | |
341 t. | |
342 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream | |
343 ! videoconvert | |
344 ! vp8enc deadline=1 keyframe-max-dist=60 | |
345 ! rtpvp8pay picture-id-mode=15-bit | |
346 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} | |
347 ! sendrecv. | |
348 | |
349 t. | |
350 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 | |
351 ! videoconvert | |
352 ! appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 sync=True | |
353 | |
354 {audio_source_elt} name=audio_src | |
355 ! valve name=audio_valve | |
356 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 | |
357 ! audioconvert | |
358 ! audioresample | |
359 ! opusenc audio-type=voice | |
360 ! rtpopuspay | |
361 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} | |
362 ! sendrecv. | |
363 """ | |
364 | |
365 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") | |
366 | |
367 # Create the pipeline | |
368 self.pipeline = Gst.parse_launch(self.gst_pipe_desc) | |
369 if not self.pipeline: | |
370 raise exceptions.InternalError("Failed to create Gstreamer pipeline.") | |
371 | |
372 self.webrtc = self.pipeline.get_by_name("sendrecv") | |
373 self.video_src = self.pipeline.get_by_name("video_src") | |
374 self.video_selector = self.pipeline.get_by_name("video_selector") | |
375 self.audio_valve = self.pipeline.get_by_name("audio_valve") | |
376 | |
377 if self.parent_calls.video_muted: | |
378 self.on_video_mute(True) | |
379 if self.parent_calls.audio_muted: | |
380 self.on_audio_mute(True) | |
381 | |
382 # set STUN and TURN servers | |
383 external_disco = data_format.deserialise( | |
384 await G.host.a_bridge.external_disco_get("", self.profile), type_check=list | |
385 ) | |
386 | |
387 for server in external_disco: | |
388 if server["type"] == "stun": | |
389 if server["transport"] == "tcp": | |
390 log.info( | |
391 "ignoring TCP STUN server, GStreamer only support one STUN server" | |
392 ) | |
393 url = f"stun://{server['host']}:{server['port']}" | |
394 log.debug(f"adding stun server: {url}") | |
395 self.webrtc.set_property("stun-server", url) | |
396 elif server["type"] == "turn": | |
397 url = "{scheme}://{username}:{password}@{host}:{port}".format( | |
398 scheme="turns" if server["transport"] == "tcp" else "turn", | |
399 username=quote_plus(server["username"]), | |
400 password=quote_plus(server["password"]), | |
401 host=server["host"], | |
402 port=server["port"], | |
403 ) | |
404 log.debug(f"adding turn server: {url}") | |
405 | |
406 if not self.webrtc.emit("add-turn-server", url): | |
407 log.warning(f"Erreur while adding TURN server {url}") | |
408 | |
409 # local video feedback | |
410 local_video_sink = self.pipeline.get_by_name("local_video_sink") | |
411 local_video_sink.set_property("emit-signals", True) | |
412 local_video_sink.connect( | |
413 "new-sample", | |
414 self.on_new_sample, | |
415 self.update_sample, | |
416 self.parent_calls.local_video, | |
417 ) | |
418 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") | |
419 local_video_sink.set_property("caps", local_video_sink_caps) | |
420 | |
421 # Create bus and associate signal watchers | |
422 self.bus = self.pipeline.get_bus() | |
423 if not self.bus: | |
424 log.error("Failed to get bus from pipeline.") | |
425 return | |
426 | |
427 self.bus.add_signal_watch() | |
428 self.webrtc.connect("pad-added", self.on_pad_added) | |
429 self.bus.connect("message::error", self.on_bus_error) | |
430 self.bus.connect("message::eos", self.on_bus_eos) | |
431 self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed) | |
432 self.webrtc.connect("on-ice-candidate", self.on_ice_candidate) | |
433 self.webrtc.connect( | |
434 "notify::ice-gathering-state", self.on_ice_gathering_state_change | |
435 ) | |
436 self.webrtc.connect("notify::ice-connection-state", self.on_ice_connection_state) | |
437 | |
438 def start_pipeline(self) -> None: | |
439 """Starts the GStreamer pipeline.""" | |
440 log.debug("starting the pipeline") | |
441 self.pipeline.set_state(Gst.State.PLAYING) | |
442 | |
443 def on_negotiation_needed(self, webrtc): | |
444 """Initiate SDP offer when negotiation is needed.""" | |
445 log.debug("Negotiation needed.") | |
446 if self.role == "initiator": | |
447 log.debug("Creating offer…") | |
448 promise = Gst.Promise.new_with_change_func(self.on_offer_created) | |
449 self.webrtc.emit("create-offer", None, promise) | |
450 | |
451 def on_offer_created(self, promise): | |
452 """Callback for when SDP offer is created.""" | |
453 log.info("on_offer_created called") | |
454 assert promise.wait() == Gst.PromiseResult.REPLIED | |
455 reply = promise.get_reply() | |
456 if reply is None: | |
457 log.error("Promise reply is None. Offer creation might have failed.") | |
458 return | |
459 offer = reply["offer"] | |
460 self.offer = offer.sdp.as_text() | |
461 log.info(f"SDP offer created: \n{self.offer}") | |
462 self._set_media_types(self.offer) | |
463 promise = Gst.Promise.new() | |
464 self.webrtc.emit("set-local-description", offer, promise) | |
465 promise.interrupt() | |
466 self._a_call(self._start_call) | |
467 | |
468 def on_answer_set(self, promise): | |
469 assert promise.wait() == Gst.PromiseResult.REPLIED | |
470 | |
471 def on_answer_created(self, promise, _, __): | |
472 """Callback for when SDP answer is created.""" | |
473 assert promise.wait() == Gst.PromiseResult.REPLIED | |
474 reply = promise.get_reply() | |
475 answer = reply["answer"] | |
476 promise = Gst.Promise.new() | |
477 self.webrtc.emit("set-local-description", answer, promise) | |
478 promise.interrupt() | |
479 answer_sdp = answer.sdp.as_text() | |
480 log.info(f"SDP answer set: \n{answer_sdp}") | |
481 self.sdp_set = True | |
482 self._a_call(G.host.a_bridge.call_answer_sdp, self.sid, answer_sdp, self.profile) | |
483 | |
484 def on_offer_set(self, promise): | |
485 assert promise.wait() == Gst.PromiseResult.REPLIED | |
486 promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None) | |
487 self.webrtc.emit("create-answer", None, promise) | |
488 | 144 |
489 def on_new_sample( | 145 def on_new_sample( |
490 self, | 146 self, |
491 video_sink: Gst.Element, | 147 video_sink: Gst.Element, |
492 update_sample_method: Callable, | 148 update_sample_method: Callable, |
580 video_widget.canvas.ask_update() | 236 video_widget.canvas.ask_update() |
581 buf.unmap(mapinfo) | 237 buf.unmap(mapinfo) |
582 finally: | 238 finally: |
583 if buf is not None and mapinfo is not None: | 239 if buf is not None and mapinfo is not None: |
584 buf.unmap(mapinfo) | 240 buf.unmap(mapinfo) |
585 | |
586 def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None: | |
587 """Handle the stream from the remote decodebin. | |
588 | |
589 This method processes the incoming stream from the remote decodebin, determining | |
590 whether it's video or audio. It then sets up the appropriate GStreamer elements | |
591 for video/audio processing and adds them to the pipeline. | |
592 | |
593 @param pad: The Gst.Pad from the remote decodebin producing the stream. | |
594 """ | |
595 assert self.pipeline is not None | |
596 if not pad.has_current_caps(): | |
597 log.error(f"{pad} has no caps, ignoring") | |
598 return | |
599 | |
600 self.pipeline.set_state(Gst.State.PAUSED) | |
601 caps = pad.get_current_caps() | |
602 assert len(caps) | |
603 s = caps[0] | |
604 name = s.get_name() | |
605 log.debug(f"====> NAME START: {name}") | |
606 | |
607 q = Gst.ElementFactory.make("queue") | |
608 q.set_property("max-size-time", 0) | |
609 q.set_property("max-size-bytes", 0) | |
610 q.set_property("max-size-buffers", 5) | |
611 | |
612 if name.startswith("video"): | |
613 log.debug("===> VIDEO OK") | |
614 | |
615 self._remote_video_pad = pad | |
616 | |
617 # Check and log the original size of the video | |
618 width = s.get_int("width").value | |
619 height = s.get_int("height").value | |
620 log.info(f"Original video size: {width}x{height}") | |
621 | |
622 # This is a fix for an issue found with Movim on desktop: a non standard | |
623 # resolution is used (990x557) resulting in bad alignement and no color in | |
624 # rendered image | |
625 adjust_resolution = width % 4 != 0 or height % 4 != 0 | |
626 if adjust_resolution: | |
627 log.info("non standard resolution, we need to adjust size") | |
628 width = (width + 3) // 4 * 4 | |
629 height = (height + 3) // 4 * 4 | |
630 log.info(f"Adjusted video size: {width}x{height}") | |
631 | |
632 conv = Gst.ElementFactory.make("videoconvert") | |
633 remote_video_sink = Gst.ElementFactory.make("appsink") | |
634 | |
635 appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB") | |
636 remote_video_sink.set_property("caps", appsink_caps) | |
637 | |
638 remote_video_sink.set_property("emit-signals", True) | |
639 remote_video_sink.set_property("drop", True) | |
640 remote_video_sink.set_property("max-buffers", 1) | |
641 remote_video_sink.set_property("sync", True) | |
642 remote_video_sink.connect( | |
643 "new-sample", | |
644 self.on_new_sample, | |
645 self.update_sample, | |
646 self.parent_calls.remote_video, | |
647 ) | |
648 | |
649 if adjust_resolution: | |
650 videoscale = Gst.ElementFactory.make("videoscale") | |
651 adjusted_caps = Gst.Caps.from_string( | |
652 f"video/x-raw,width={width},height={height}" | |
653 ) | |
654 capsfilter = Gst.ElementFactory.make("capsfilter") | |
655 capsfilter.set_property("caps", adjusted_caps) | |
656 | |
657 self.pipeline.add(q, conv, videoscale, capsfilter, remote_video_sink) | |
658 self.pipeline.sync_children_states() | |
659 pad.link(q.get_static_pad("sink")) | |
660 q.link(conv) | |
661 conv.link(videoscale) | |
662 videoscale.link(capsfilter) | |
663 capsfilter.link(remote_video_sink) | |
664 else: | |
665 self.pipeline.add(q, conv, remote_video_sink) | |
666 self.pipeline.sync_children_states() | |
667 pad.link(q.get_static_pad("sink")) | |
668 q.link(conv) | |
669 conv.link(remote_video_sink) | |
670 | |
671 elif name.startswith("audio"): | |
672 log.debug("===> Audio OK") | |
673 conv = Gst.ElementFactory.make("audioconvert") | |
674 resample = Gst.ElementFactory.make("audioresample") | |
675 remote_audio_sink = Gst.ElementFactory.make("autoaudiosink") | |
676 self.pipeline.add(q, conv, resample, remote_audio_sink) | |
677 self.pipeline.sync_children_states() | |
678 pad.link(q.get_static_pad("sink")) | |
679 q.link(conv) | |
680 conv.link(resample) | |
681 resample.link(remote_audio_sink) | |
682 self.pipeline.set_state(Gst.State.PLAYING) | |
683 | |
684 def on_pad_added(self, __, pad: Gst.Pad) -> None: | |
685 """Handle the addition of a new pad to the element. | |
686 | |
687 When a new source pad is added to the element, this method creates a decodebin, | |
688 connects it to handle the stream, and links the pad to the decodebin. | |
689 | |
690 @param __: Placeholder for the signal source. Not used in this method. | |
691 @param pad: The newly added pad. | |
692 """ | |
693 | |
694 log.debug("on_pad_added") | |
695 if pad.direction != Gst.PadDirection.SRC: | |
696 return | |
697 | |
698 decodebin = Gst.ElementFactory.make("decodebin") | |
699 decodebin.connect("pad-added", self.on_remote_decodebin_stream) | |
700 self.pipeline.add(decodebin) | |
701 decodebin.sync_state_with_parent() | |
702 pad.link(decodebin.get_static_pad("sink")) | |
703 | |
704 async def _start_call(self) -> None: | |
705 """Initiate the call. | |
706 | |
707 Initiates a call with the callee using the stored offer. If there are any buffered | |
708 local ICE candidates, they are sent as part of the initiation. | |
709 """ | |
710 assert self.callee is not None | |
711 self.sid = await G.host.a_bridge.call_start( | |
712 str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile | |
713 ) | |
714 if self.local_candidates_buffer: | |
715 log.debug( | |
716 f"sending buffered local ICE candidates: {self.local_candidates_buffer}" | |
717 ) | |
718 if self.pwd is None: | |
719 sdp = self.webrtc.props.local_description.sdp.as_text() | |
720 self.extract_ufrag_pwd(sdp) | |
721 ice_data = {} | |
722 for media_type, candidates in self.local_candidates_buffer.items(): | |
723 ice_data[media_type] = { | |
724 "ufrag": self.ufrag, | |
725 "pwd": self.pwd, | |
726 "candidates": candidates, | |
727 } | |
728 await G.host.a_bridge.ice_candidates_add( | |
729 self.sid, data_format.serialise(ice_data), self.profile | |
730 ) | |
731 self.local_candidates_buffer.clear() | |
732 | |
733 def _remote_sdp_set(self, promise) -> None: | |
734 assert promise.wait() == Gst.PromiseResult.REPLIED | |
735 self.sdp_set = True | |
736 | |
737 def on_accepted_call(self, sdp: str, profile: str) -> None: | |
738 """Outgoing call has been accepted. | |
739 | |
740 @param sdp: The SDP answer string received from the other party. | |
741 @param profile: Profile used for the call. | |
742 """ | |
743 log.debug(f"SDP answer received: \n{sdp}") | |
744 | |
745 __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp) | |
746 answer = GstWebRTC.WebRTCSessionDescription.new( | |
747 GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg | |
748 ) | |
749 promise = Gst.Promise.new_with_change_func(self._remote_sdp_set) | |
750 self.webrtc.emit("set-remote-description", answer, promise) | |
751 | |
752 async def answer_call(self, sdp: str, profile: str) -> None: | |
753 """Answer an incoming call | |
754 | |
755 @param sdp: The SDP offer string received from the initiator. | |
756 @param profile: Profile used for the call. | |
757 | |
758 @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in | |
759 payload types. | |
760 """ | |
761 log.debug(f"SDP offer received: \n{sdp}") | |
762 self._set_media_types(sdp) | |
763 __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp) | |
764 payload_types = self.get_payload_types( | |
765 offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS" | |
766 ) | |
767 assert "VP8" in payload_types | |
768 assert "OPUS" in payload_types | |
769 await self.setup_call( | |
770 "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] | |
771 ) | |
772 self.start_pipeline() | |
773 offer = GstWebRTC.WebRTCSessionDescription.new( | |
774 GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg | |
775 ) | |
776 promise = Gst.Promise.new_with_change_func(self.on_offer_set) | |
777 self.webrtc.emit("set-remote-description", offer, promise) | |
778 | |
779 def on_ice_candidate(self, webrtc, mline_index, candidate_sdp): | |
780 """Handles the on-ice-candidate signal of webrtcbin. | |
781 | |
782 @param webrtc: The webrtcbin element. | |
783 @param mlineindex: The mline index. | |
784 @param candidate: The ICE candidate. | |
785 """ | |
786 log.debug( | |
787 f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}" | |
788 ) | |
789 parsed_candidate = self.parse_ice_candidate(candidate_sdp) | |
790 try: | |
791 media_type = self.media_types[mline_index] | |
792 except KeyError: | |
793 raise exceptions.InternalError("can't find media type") | |
794 | |
795 if self.sid is None: | |
796 log.debug("buffering local ICE candidate") | |
797 self.local_candidates_buffer.setdefault(media_type, []).append( | |
798 parsed_candidate | |
799 ) | |
800 else: | |
801 sdp = self.webrtc.props.local_description.sdp.as_text() | |
802 assert sdp is not None | |
803 ufrag, pwd = self.extract_ufrag_pwd(sdp) | |
804 ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]} | |
805 self._a_call( | |
806 G.host.a_bridge.ice_candidates_add, | |
807 self.sid, | |
808 data_format.serialise({media_type: ice_data}), | |
809 self.profile, | |
810 ) | |
811 | |
812 def on_ice_candidates_new(self, candidates: dict) -> None: | |
813 """Handle new ICE candidates. | |
814 | |
815 @param candidates: A dictionary containing media types ("audio" or "video") as | |
816 keys and corresponding ICE data as values. | |
817 | |
818 @raise exceptions.InternalError: Raised when sdp mline index is not found. | |
819 """ | |
820 if not self.sdp_set: | |
821 log.debug("buffering remote ICE candidate") | |
822 for media_type in ("audio", "video"): | |
823 media_candidates = candidates.get(media_type) | |
824 if media_candidates: | |
825 buffer = self.remote_candidates_buffer[media_type] | |
826 buffer["candidates"].extend(media_candidates["candidates"]) | |
827 return | |
828 for media_type, ice_data in candidates.items(): | |
829 for candidate in ice_data["candidates"]: | |
830 candidate_sdp = self.build_ice_candidate(candidate) | |
831 try: | |
832 mline_index = self.get_sdp_mline_index(media_type) | |
833 except Exception as e: | |
834 raise exceptions.InternalError(f"Can't find sdp mline index: {e}") | |
835 self.webrtc.emit("add-ice-candidate", mline_index, candidate_sdp) | |
836 log.debug( | |
837 f"Remote ICE candidate added. MLine Index: {mline_index}, " | |
838 f"{candidate_sdp}" | |
839 ) | |
840 | |
841 def on_ice_gathering_state_change(self, pspec, __): | |
842 state = self.webrtc.get_property("ice-gathering-state") | |
843 log.debug(f"ICE gathering state changed to {state}") | |
844 | |
845 def on_ice_connection_state(self, pspec, __): | |
846 state = self.webrtc.props.ice_connection_state | |
847 if state == GstWebRTC.WebRTCICEConnectionState.FAILED: | |
848 log.error("ICE connection failed") | |
849 log.info(f"ICE connection state changed to {state}") | |
850 | |
851 def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None: | |
852 """Handles the GStreamer bus error messages. | |
853 | |
854 @param bus: The GStreamer bus. | |
855 @param message: The error message. | |
856 """ | |
857 err, debug = message.parse_error() | |
858 log.error(f"Error from {message.src.get_name()}: {err.message}") | |
859 log.error(f"Debugging info: {debug}") | |
860 | |
861 def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None: | |
862 """Handles the GStreamer bus eos messages. | |
863 | |
864 @param bus: The GStreamer bus. | |
865 @param message: The eos message. | |
866 """ | |
867 log.info("End of stream") | |
868 | |
869 def on_audio_mute(self, muted: bool) -> None: | |
870 if self.audio_valve is not None: | |
871 self.audio_valve.set_property("drop", muted) | |
872 state = "muted" if muted else "unmuted" | |
873 log.info(f"audio is now {state}") | |
874 | |
875 def on_video_mute(self, muted: bool) -> None: | |
876 if self.video_selector is not None: | |
877 # when muted, we switch to a black image and deactivate the camera | |
878 if not muted: | |
879 self.video_src.set_state(Gst.State.PLAYING) | |
880 pad = self.video_selector.get_static_pad("sink_1" if muted else "sink_0") | |
881 self.video_selector.props.active_pad = pad | |
882 if muted: | |
883 self.video_src.set_state(Gst.State.NULL) | |
884 state = "muted" if muted else "unmuted" | |
885 log.info(f"video is now {state}") | |
886 | |
887 async def end_call(self) -> None: | |
888 """Stop streaming and clean instance""" | |
889 self.reset_instance() | |
890 | 241 |
891 | 242 |
892 class Calls( | 243 class Calls( |
893 quick_widgets.QuickWidget, | 244 quick_widgets.QuickWidget, |
894 cagou_widget.LiberviaDesktopKivyWidget, | 245 cagou_widget.LiberviaDesktopKivyWidget, |
916 parent_widget=self, on_press=lambda *__: aio.run_async(self.toggle_call()) | 267 parent_widget=self, on_press=lambda *__: aio.run_async(self.toggle_call()) |
917 ) | 268 ) |
918 self.header_input_add_extra(call_btn) | 269 self.header_input_add_extra(call_btn) |
919 self.webrtc = WebRTC(self, self.profile) | 270 self.webrtc = WebRTC(self, self.profile) |
920 self.previous_fullscreen = None | 271 self.previous_fullscreen = None |
921 self.bind( | |
922 audio_muted=lambda __, value: self.webrtc.on_audio_mute(value), | |
923 video_muted=lambda __, value: self.webrtc.on_video_mute(value), | |
924 ) | |
925 self.reset_instance() | 272 self.reset_instance() |
926 | 273 |
927 @property | 274 @property |
928 def sid(self): | 275 def sid(self): |
929 return self.webrtc.sid | 276 return self.webrtc.sid |
1026 self.screen_manager.current = "call" | 373 self.screen_manager.current = "call" |
1027 else: | 374 else: |
1028 self.fullscreen = False | 375 self.fullscreen = False |
1029 self.screen_manager.transition.direction = "down" | 376 self.screen_manager.transition.direction = "down" |
1030 self.screen_manager.current = "search" | 377 self.screen_manager.current = "search" |
378 | |
379 def on_audio_muted(self, instance, muted: bool) -> None: | |
380 self.webrtc.audio_muted = muted | |
381 | |
382 def on_video_muted(self, instance, muted: bool) -> None: | |
383 self.webrtc.video_muted = muted | |
1031 | 384 |
1032 def on_fullscreen(self, instance, fullscreen: bool) -> None: | 385 def on_fullscreen(self, instance, fullscreen: bool) -> None: |
1033 if fullscreen: | 386 if fullscreen: |
1034 G.host.app.show_head_widget(False, animation=False) | 387 G.host.app.show_head_widget(False, animation=False) |
1035 self.call_layout.parent.remove_widget(self.call_layout) | 388 self.call_layout.parent.remove_widget(self.call_layout) |