comparison libervia/desktop_kivy/plugins/plugin_wid_calls.py @ 509:f0ce49b360c8

calls: move webrtc code to core: WebRTC code that can be used by several frontends has been factorized and moved to the common `frontends.tools`. Tests have been updated accordingly. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 13:41:07 +0100
parents 0480f883f0a6
children 97ab236e8f20
--- 508:d87b9a6b0b69
+++ 509:f0ce49b360c8
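As context for the diff below: the frontend now instantiates the factorized helper instead of carrying its own GStreamer code. A minimal sketch of that wiring, using the `webrtc.WebRTC` constructor exactly as it appears in this changeset (the `bridge`, widget and callback arguments are placeholder stand-ins, not part of the changeset):

from libervia.frontends.tools import webrtc

def make_webrtc(bridge, profile, local_widget, remote_widget, on_sample, on_reset):
    # The frontend keeps only UI concerns: it hands its video widgets and
    # callbacks over, and delegates pipeline/signalling work to the shared code.
    return webrtc.WebRTC(
        bridge,
        profile,
        sinks=webrtc.SINKS_APP,  # or webrtc.SINKS_TEST for test sources
        appsink_data=webrtc.AppSinkData(
            local_video_cb=lambda sink: on_sample(sink, local_widget),
            remote_video_cb=lambda sink: on_sample(sink, remote_widget),
        ),
        reset_cb=on_reset,
    )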
 from dataclasses import dataclass
 from pathlib import Path
-import re
 from typing import Optional, Callable
-from urllib.parse import quote_plus
 from functools import partial

 # from gi.repository import GLib
 from gi.repository import GObject, Gst, GstWebRTC, GstSdp

...
 from libervia.backend.core import log as logging
 from libervia.backend.core.i18n import _
 from libervia.backend.core import exceptions
 from libervia.backend.tools.common import data_format
 from libervia.frontends.quick_frontend import quick_widgets
-from libervia.frontends.tools import jid, aio
+from libervia.frontends.tools import aio, jid, webrtc

 from libervia.desktop_kivy import G

 from ..core import cagou_widget
 from ..core import common
...
     microphone available will be used.
     """

     test_mode: bool = False

+    PROXIED_PROPERTIES = {'audio_muted', 'callee', 'sid', 'video_muted'}
+    PROXIED_METHODS = {'answer_call', 'end_call', 'on_accepted_call', 'on_ice_candidates_new', 'setup_call', 'start_pipeline'}
+
     def __init__(self, parent_calls: "Calls", profile: str) -> None:
         self.parent_calls = parent_calls
         self.profile = profile
+        self.webrtc = webrtc.WebRTC(
+            G.host.a_bridge,
+            profile,
+            sinks=webrtc.SINKS_TEST if self.test_mode else webrtc.SINKS_APP,
+            appsink_data=webrtc.AppSinkData(
+                local_video_cb=partial(
+                    self.on_new_sample,
+                    update_sample_method=self.update_sample,
+                    video_widget=self.parent_calls.local_video
+                ),
+                remote_video_cb=partial(
+                    self.on_new_sample,
+                    update_sample_method=self.update_sample,
+                    video_widget=self.parent_calls.remote_video
+                )
+            ),
+            reset_cb=self.on_reset
+
+        )
         self.pipeline = None
-        self.reset_instance()
-
-    @property
-    def sdp_set(self):
-        return self._sdp_set
-
-    @sdp_set.setter
-    def sdp_set(self, is_set: bool):
-        self._sdp_set = is_set
-        if is_set:
-            self.on_ice_candidates_new(self.remote_candidates_buffer)
-            for data in self.remote_candidates_buffer.values():
-                data["candidates"].clear()
-
+
+    def __getattr__(self, name):
+        if name in self.PROXIED_PROPERTIES or name in self.PROXIED_METHODS:
+            return getattr(self.webrtc, name)
+        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
+
+    def __setattr__(self, name, value):
+        if name in self.PROXIED_PROPERTIES:
+            setattr(self.webrtc, name, value)
+        else:
+            super().__setattr__(name, value)
+
+    def on_reset(self):
+        self.texture_map: dict[VideoStreamWidget, TextureData] = {}
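Note: the new `__getattr__`/`__setattr__` pair forwards a fixed allow-list of names to the wrapped `webrtc.WebRTC` instance. A minimal standalone sketch of the same pattern (the `Facade` and `Engine` names are illustrative, not part of the changeset):

class Engine:
    def __init__(self):
        self.sid = None

    def start_pipeline(self):
        print("pipeline started")


class Facade:
    PROXIED_PROPERTIES = {"sid"}
    PROXIED_METHODS = {"start_pipeline"}

    def __init__(self):
        # "engine" is not proxied, so this assignment falls through to the
        # default behaviour in __setattr__ and is stored on the Facade itself.
        self.engine = Engine()

    def __getattr__(self, name):
        # only called when normal attribute lookup fails
        if name in self.PROXIED_PROPERTIES or name in self.PROXIED_METHODS:
            return getattr(self.engine, name)
        raise AttributeError(name)

    def __setattr__(self, name, value):
        if name in self.PROXIED_PROPERTIES:
            setattr(self.engine, name, value)
        else:
            super().__setattr__(name, value)


facade = Facade()
facade.sid = "abc"        # stored on the wrapped Engine
assert facade.engine.sid == "abc"
facade.start_pipeline()   # prints: pipeline started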
-    @property
-    def media_types(self):
-        if self._media_types is None:
-            raise Exception("self._media_types should not be None!")
-        return self._media_types
-
-    @media_types.setter
-    def media_types(self, new_media_types: dict) -> None:
-        self._media_types = new_media_types
-        self._media_types_inv = {v: k for k, v in new_media_types.items()}
-
-    @property
-    def media_types_inv(self) -> dict:
-        if self._media_types_inv is None:
-            raise Exception("self._media_types_inv should not be None!")
-        return self._media_types_inv
-
-    def get_sdp_mline_index(self, media_type: str) -> int:
-        """Gets the sdpMLineIndex for a given media type.
-
-        @param media_type: The type of the media.
-        """
-        for index, m_type in self.media_types.items():
-            if m_type == media_type:
-                return index
-        raise ValueError(f"Media type '{media_type}' not found")
-
-    def _set_media_types(self, offer_sdp: str) -> None:
-        """Sets media types from offer SDP
-
-        @param offer: RTC session description containing the offer
-        """
-        sdp_lines = offer_sdp.splitlines()
-        media_types = {}
-        mline_index = 0
-
-        for line in sdp_lines:
-            if line.startswith("m="):
-                media_types[mline_index] = line[2 : line.find(" ")]
-                mline_index += 1
-
-        self.media_types = media_types
-
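For reference, the m-line scan deleted above maps each `m=` section of the SDP to its sdpMLineIndex, since WebRTC signalling identifies media streams by that index. A standalone sketch of the same logic (the sample SDP is illustrative):

offer_sdp = "\n".join([
    "v=0",
    "m=audio 9 UDP/TLS/RTP/SAVPF 96",
    "a=rtpmap:96 OPUS/48000/2",
    "m=video 9 UDP/TLS/RTP/SAVPF 97",
    "a=rtpmap:97 VP8/90000",
])

media_types = {}
mline_index = 0
for line in offer_sdp.splitlines():
    if line.startswith("m="):
        # keep the media name only, i.e. everything up to the first space
        media_types[mline_index] = line[2:line.find(" ")]
        mline_index += 1

assert media_types == {0: "audio", 1: "video"}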
-    def _a_call(self, method, *args, **kwargs):
-        """Call an async method in main thread"""
-
-        def wrapper(__):
-            aio.run_async(method(*args, **kwargs))
-            return False
-
-        Clock.schedule_once(wrapper)
-
-    def get_payload_types(
-        self, sdpmsg, video_encoding: str, audio_encoding: str
-    ) -> dict[str, int | None]:
-        """Find the payload types for the specified video and audio encoding.
-
-        Very simplistically finds the first payload type matching the encoding
-        name. More complex applications will want to match caps on
-        profile-level-id, packetization-mode, etc.
-        """
-        # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
-        # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
-        video_pt = None
-        audio_pt = None
-        for i in range(0, sdpmsg.medias_len()):
-            media = sdpmsg.get_media(i)
-            for j in range(0, media.formats_len()):
-                fmt = media.get_format(j)
-                if fmt == "webrtc-datachannel":
-                    continue
-                pt = int(fmt)
-                caps = media.get_caps_from_media(pt)
-                s = caps.get_structure(0)
-                encoding_name = s["encoding-name"]
-                if video_pt is None and encoding_name == video_encoding:
-                    video_pt = pt
-                elif audio_pt is None and encoding_name == audio_encoding:
-                    audio_pt = pt
-        return {video_encoding: video_pt, audio_encoding: audio_pt}
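The deleted method above walks GstSdp structures; the same first-match policy can be sketched without GStreamer by scanning `a=rtpmap:` lines directly. A simplified text-based analogue (not the GstSdp API used above; sample SDP illustrative):

import re

def get_payload_types_text(sdp: str, video_encoding: str, audio_encoding: str) -> dict:
    """First payload type whose a=rtpmap encoding matches (text-based analogue)."""
    found = {video_encoding: None, audio_encoding: None}
    for pt, encoding in re.findall(r"^a=rtpmap:(\d+) ([^/]+)/", sdp, re.MULTILINE):
        if encoding in found and found[encoding] is None:
            found[encoding] = int(pt)
    return found

sdp = "a=rtpmap:96 OPUS/48000/2\na=rtpmap:97 VP8/90000\n"
assert get_payload_types_text(sdp, "VP8", "OPUS") == {"VP8": 97, "OPUS": 96}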
-
-    def parse_ice_candidate(self, candidate_string):
-        """Parses the ice candidate string.
-
-        @param candidate_string: The ice candidate string to be parsed.
-        """
-        pattern = re.compile(
-            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
-            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
-            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
-            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
-        )
-        match = pattern.match(candidate_string)
-        if match:
-            candidate_dict = match.groupdict()
-
-            # Apply the correct types to the dictionary values
-            candidate_dict["component_id"] = int(candidate_dict["component_id"])
-            candidate_dict["priority"] = int(candidate_dict["priority"])
-            candidate_dict["port"] = int(candidate_dict["port"])
-
-            if candidate_dict["rel_port"]:
-                candidate_dict["rel_port"] = int(candidate_dict["rel_port"])
-
-            if candidate_dict["generation"]:
-                candidate_dict["generation"] = candidate_dict["generation"]
-
-            # Remove None values
-            return {k: v for k, v in candidate_dict.items() if v is not None}
-        else:
-            log.warning(f"can't parse candidate: {candidate_string!r}")
-            return None
-
-    def build_ice_candidate(self, parsed_candidate):
-        """Builds ICE candidate
-
-        @param parsed_candidate: Dictionary containing parsed ICE candidate
-        """
-        base_format = (
-            "candidate:{foundation} {component_id} {transport} {priority} "
-            "{address} {port} typ {type}"
-        )
-
-        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
-            base_format += " raddr {rel_addr} rport {rel_port}"
-
-        if parsed_candidate.get("generation"):
-            base_format += " generation {generation}"
-
-        return base_format.format(**parsed_candidate)
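The regex and the format string above are inverses of one another. A standalone round trip (the sample candidate is illustrative, and values are kept as strings for brevity, whereas the deleted code converts some fields to int):

import re

# Exactly the pattern and format string used by the deleted methods.
ICE_PATTERN = re.compile(
    r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
    r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
    r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
    r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
)

candidate = "candidate:1 1 udp 2015363327 192.0.2.1 44323 typ host generation 0"
parsed = {
    k: v
    for k, v in ICE_PATTERN.match(candidate).groupdict().items()
    if v is not None
}

# The sample has no raddr/rport, but does carry a generation field.
rebuilt = (
    "candidate:{foundation} {component_id} {transport} {priority} "
    "{address} {port} typ {type} generation {generation}"
).format(**parsed)
assert rebuilt == candidate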
-
-    def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
-        """Retrieves ICE password and user fragment for SDP offer.
-
-        @param sdp: The Session Description Protocol offer string.
-        @return: ufrag and pwd
-        @raise ValueError: Can't extract ufrag and password
-        """
-        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
-        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
-
-        if ufrag_line and pwd_line:
-            ufrag = self.ufrag = ufrag_line.group(1)
-            pwd = self.pwd = pwd_line.group(1)
-            return ufrag, pwd
-        else:
-            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
-            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
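A standalone sketch of the extraction above, using the same regexes (sample values illustrative):

import re

sdp = "a=ice-ufrag:f5Xw\na=ice-pwd:2q6mW8mjXMSmzdLz5e7HL9PG\n"
ufrag = re.search(r"ice-ufrag:(\S+)", sdp).group(1)
pwd = re.search(r"ice-pwd:(\S+)", sdp).group(1)
assert (ufrag, pwd) == ("f5Xw", "2q6mW8mjXMSmzdLz5e7HL9PG")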
-
-    def reset_instance(self):
-        """Inits or resets the instance variables to their default state."""
-        self.role: str | None = None
-        if self.pipeline is not None:
-            self.pipeline.set_state(Gst.State.NULL)
-            self.pipeline = None
-        self.texture_map: dict[VideoStreamWidget, TextureData] = {}
-        self._remote_video_pad = None
-        self.sid: str | None = None
-        self.offer: str | None = None
-        self.local_candidates_buffer = {}
-        self.ufrag: str | None = None
-        self.pwd: str | None = None
-        self.callee: str | None = None
-        self._media_types = None
-        self._media_types_inv = None
-        self._sdp_set: bool = False
-        self.remote_candidates_buffer: dict[str, dict[str, list]] = {
-            "audio": {"candidates": []},
-            "video": {"candidates": []},
-        }
-        self._media_types = None
-        self._media_types_inv = None
-        self.audio_valve = None
-        self.video_valve = None
-
-    async def setup_call(
-        self,
-        role: str,
-        audio_pt: int | None = 96,
-        video_pt: int | None = 97,
-    ) -> None:
-        """Sets up the call.
-
-        This method establishes the Gstreamer pipeline for audio and video communication.
-        The method also manages STUN and TURN server configurations, signal watchers, and
-        various connection handlers for the webrtcbin.
-
-        @param role: The role for the call, either 'initiator' or 'responder'.
-        @param audio_pt: The payload type for the audio stream.
-        @param video_pt: The payload type for the video stream
-
-        @raises NotImplementedError: If audio_pt or video_pt is set to None.
-        @raises AssertionError: If the role is not 'initiator' or 'responder'.
-        """
-        assert role in ("initiator", "responder")
-        self.role = role
-        if audio_pt is None or video_pt is None:
-            raise NotImplementedError("None value is not handled yet")
-
-        if self.test_mode:
-            video_source_elt = "videotestsrc is-live=true pattern=ball"
-            audio_source_elt = "audiotestsrc"
-        else:
-            video_source_elt = "v4l2src"
-            audio_source_elt = "pulsesrc"
-
-        self.gst_pipe_desc = f"""
-        webrtcbin latency=100 name=sendrecv bundle-policy=max-compat
-
-        input-selector name=video_selector
-        ! videorate
-        ! video/x-raw,framerate=30/1
-        ! tee name=t
-
-        {video_source_elt} name=video_src ! queue ! video_selector.
-        videotestsrc is-live=true pattern=black ! queue ! video_selector.
-
-        t.
-        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
-        ! videoconvert
-        ! vp8enc deadline=1 keyframe-max-dist=60
-        ! rtpvp8pay picture-id-mode=15-bit
-        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
-        ! sendrecv.
-
-        t.
-        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0
-        ! videoconvert
-        ! appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 sync=True
-
-        {audio_source_elt} name=audio_src
-        ! valve name=audio_valve
-        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0
-        ! audioconvert
-        ! audioresample
-        ! opusenc audio-type=voice
-        ! rtpopuspay
-        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
-        ! sendrecv.
-        """
-
-        log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
-
-        # Create the pipeline
-        self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
-        if not self.pipeline:
-            raise exceptions.InternalError("Failed to create Gstreamer pipeline.")
-
-        self.webrtc = self.pipeline.get_by_name("sendrecv")
-        self.video_src = self.pipeline.get_by_name("video_src")
-        self.video_selector = self.pipeline.get_by_name("video_selector")
-        self.audio_valve = self.pipeline.get_by_name("audio_valve")
-
-        if self.parent_calls.video_muted:
-            self.on_video_mute(True)
-        if self.parent_calls.audio_muted:
-            self.on_audio_mute(True)
-
-        # set STUN and TURN servers
-        external_disco = data_format.deserialise(
-            await G.host.a_bridge.external_disco_get("", self.profile), type_check=list
-        )
-
-        for server in external_disco:
-            if server["type"] == "stun":
-                if server["transport"] == "tcp":
-                    log.info(
-                        "ignoring TCP STUN server, GStreamer only support one STUN server"
-                    )
-                url = f"stun://{server['host']}:{server['port']}"
-                log.debug(f"adding stun server: {url}")
-                self.webrtc.set_property("stun-server", url)
-            elif server["type"] == "turn":
-                url = "{scheme}://{username}:{password}@{host}:{port}".format(
-                    scheme="turns" if server["transport"] == "tcp" else "turn",
-                    username=quote_plus(server["username"]),
-                    password=quote_plus(server["password"]),
-                    host=server["host"],
-                    port=server["port"],
-                )
-                log.debug(f"adding turn server: {url}")
-
-                if not self.webrtc.emit("add-turn-server", url):
-                    log.warning(f"Erreur while adding TURN server {url}")
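The `quote_plus` calls above matter because TURN credentials may contain characters that are meaningful in URLs. A standalone illustration (sample credentials illustrative):

from urllib.parse import quote_plus

# Credentials may contain ':', '@' or '/', which would break the URL syntax,
# hence the percent-encoding.
username, password = "user@example.org", "p@ss:word/1"
url = f"turn://{quote_plus(username)}:{quote_plus(password)}@turn.example.org:3478"
assert url == "turn://user%40example.org:p%40ss%3Aword%2F1@turn.example.org:3478"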
-
-        # local video feedback
-        local_video_sink = self.pipeline.get_by_name("local_video_sink")
-        local_video_sink.set_property("emit-signals", True)
-        local_video_sink.connect(
-            "new-sample",
-            self.on_new_sample,
-            self.update_sample,
-            self.parent_calls.local_video,
-        )
-        local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
-        local_video_sink.set_property("caps", local_video_sink_caps)
-
-        # Create bus and associate signal watchers
-        self.bus = self.pipeline.get_bus()
-        if not self.bus:
-            log.error("Failed to get bus from pipeline.")
-            return
-
-        self.bus.add_signal_watch()
-        self.webrtc.connect("pad-added", self.on_pad_added)
-        self.bus.connect("message::error", self.on_bus_error)
-        self.bus.connect("message::eos", self.on_bus_eos)
-        self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed)
-        self.webrtc.connect("on-ice-candidate", self.on_ice_candidate)
-        self.webrtc.connect(
-            "notify::ice-gathering-state", self.on_ice_gathering_state_change
-        )
-        self.webrtc.connect("notify::ice-connection-state", self.on_ice_connection_state)
-
-    def start_pipeline(self) -> None:
-        """Starts the GStreamer pipeline."""
-        log.debug("starting the pipeline")
-        self.pipeline.set_state(Gst.State.PLAYING)
-
-    def on_negotiation_needed(self, webrtc):
-        """Initiate SDP offer when negotiation is needed."""
-        log.debug("Negotiation needed.")
-        if self.role == "initiator":
-            log.debug("Creating offer…")
-            promise = Gst.Promise.new_with_change_func(self.on_offer_created)
-            self.webrtc.emit("create-offer", None, promise)
-
-    def on_offer_created(self, promise):
-        """Callback for when SDP offer is created."""
-        log.info("on_offer_created called")
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-        reply = promise.get_reply()
-        if reply is None:
-            log.error("Promise reply is None. Offer creation might have failed.")
-            return
-        offer = reply["offer"]
-        self.offer = offer.sdp.as_text()
-        log.info(f"SDP offer created: \n{self.offer}")
-        self._set_media_types(self.offer)
-        promise = Gst.Promise.new()
-        self.webrtc.emit("set-local-description", offer, promise)
-        promise.interrupt()
-        self._a_call(self._start_call)
-
-    def on_answer_set(self, promise):
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-
-    def on_answer_created(self, promise, _, __):
-        """Callback for when SDP answer is created."""
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-        reply = promise.get_reply()
-        answer = reply["answer"]
-        promise = Gst.Promise.new()
-        self.webrtc.emit("set-local-description", answer, promise)
-        promise.interrupt()
-        answer_sdp = answer.sdp.as_text()
-        log.info(f"SDP answer set: \n{answer_sdp}")
-        self.sdp_set = True
-        self._a_call(G.host.a_bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)
-
-    def on_offer_set(self, promise):
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
-        self.webrtc.emit("create-answer", None, promise)

     def on_new_sample(
         self,
         video_sink: Gst.Element,
         update_sample_method: Callable,
...
             video_widget.canvas.ask_update()
             buf.unmap(mapinfo)
         finally:
             if buf is not None and mapinfo is not None:
                 buf.unmap(mapinfo)
-
-    def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
-        """Handle the stream from the remote decodebin.
-
-        This method processes the incoming stream from the remote decodebin, determining
-        whether it's video or audio. It then sets up the appropriate GStreamer elements
-        for video/audio processing and adds them to the pipeline.
-
-        @param pad: The Gst.Pad from the remote decodebin producing the stream.
-        """
-        assert self.pipeline is not None
-        if not pad.has_current_caps():
-            log.error(f"{pad} has no caps, ignoring")
-            return
-
-        self.pipeline.set_state(Gst.State.PAUSED)
-        caps = pad.get_current_caps()
-        assert len(caps)
-        s = caps[0]
-        name = s.get_name()
-        log.debug(f"====> NAME START: {name}")
-
-        q = Gst.ElementFactory.make("queue")
-        q.set_property("max-size-time", 0)
-        q.set_property("max-size-bytes", 0)
-        q.set_property("max-size-buffers", 5)
-
-        if name.startswith("video"):
-            log.debug("===> VIDEO OK")
-
-            self._remote_video_pad = pad
-
-            # Check and log the original size of the video
-            width = s.get_int("width").value
-            height = s.get_int("height").value
-            log.info(f"Original video size: {width}x{height}")
-
-            # This is a fix for an issue found with Movim on desktop: a non standard
-            # resolution is used (990x557) resulting in bad alignement and no color in
-            # rendered image
-            adjust_resolution = width % 4 != 0 or height % 4 != 0
-            if adjust_resolution:
-                log.info("non standard resolution, we need to adjust size")
-                width = (width + 3) // 4 * 4
-                height = (height + 3) // 4 * 4
-                log.info(f"Adjusted video size: {width}x{height}")
-
-            conv = Gst.ElementFactory.make("videoconvert")
-            remote_video_sink = Gst.ElementFactory.make("appsink")
-
-            appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
-            remote_video_sink.set_property("caps", appsink_caps)
-
-            remote_video_sink.set_property("emit-signals", True)
-            remote_video_sink.set_property("drop", True)
-            remote_video_sink.set_property("max-buffers", 1)
-            remote_video_sink.set_property("sync", True)
-            remote_video_sink.connect(
-                "new-sample",
-                self.on_new_sample,
-                self.update_sample,
-                self.parent_calls.remote_video,
-            )
-
-            if adjust_resolution:
-                videoscale = Gst.ElementFactory.make("videoscale")
-                adjusted_caps = Gst.Caps.from_string(
-                    f"video/x-raw,width={width},height={height}"
-                )
-                capsfilter = Gst.ElementFactory.make("capsfilter")
-                capsfilter.set_property("caps", adjusted_caps)
-
-                self.pipeline.add(q, conv, videoscale, capsfilter, remote_video_sink)
-                self.pipeline.sync_children_states()
-                pad.link(q.get_static_pad("sink"))
-                q.link(conv)
-                conv.link(videoscale)
-                videoscale.link(capsfilter)
-                capsfilter.link(remote_video_sink)
-            else:
-                self.pipeline.add(q, conv, remote_video_sink)
-                self.pipeline.sync_children_states()
-                pad.link(q.get_static_pad("sink"))
-                q.link(conv)
-                conv.link(remote_video_sink)
-
-        elif name.startswith("audio"):
-            log.debug("===> Audio OK")
-            conv = Gst.ElementFactory.make("audioconvert")
-            resample = Gst.ElementFactory.make("audioresample")
-            remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
-            self.pipeline.add(q, conv, resample, remote_audio_sink)
-            self.pipeline.sync_children_states()
-            pad.link(q.get_static_pad("sink"))
-            q.link(conv)
-            conv.link(resample)
-            resample.link(remote_audio_sink)
-        self.pipeline.set_state(Gst.State.PLAYING)
-
-    def on_pad_added(self, __, pad: Gst.Pad) -> None:
-        """Handle the addition of a new pad to the element.
-
-        When a new source pad is added to the element, this method creates a decodebin,
-        connects it to handle the stream, and links the pad to the decodebin.
-
-        @param __: Placeholder for the signal source. Not used in this method.
-        @param pad: The newly added pad.
-        """
-
-        log.debug("on_pad_added")
-        if pad.direction != Gst.PadDirection.SRC:
-            return
-
-        decodebin = Gst.ElementFactory.make("decodebin")
-        decodebin.connect("pad-added", self.on_remote_decodebin_stream)
-        self.pipeline.add(decodebin)
-        decodebin.sync_state_with_parent()
-        pad.link(decodebin.get_static_pad("sink"))
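For context on the deleted handlers above: `decodebin` only creates its source pads once the stream type is known, so downstream elements are created and linked from the pad-added callback. A minimal standalone sketch of that pattern, assuming GStreamer and PyGObject are installed (the file path is a placeholder):

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

Gst.init(None)
pipeline = Gst.parse_launch("filesrc location=/tmp/sample.webm ! decodebin name=dec")
dec = pipeline.get_by_name("dec")

def on_decoded_pad(decodebin, pad):
    # Pick a sink based on the negotiated caps, add it to the pipeline,
    # sync its state, then link: the same order used by the deleted handler.
    caps_name = pad.get_current_caps()[0].get_name()
    sink_name = "autovideosink" if caps_name.startswith("video") else "autoaudiosink"
    sink = Gst.ElementFactory.make(sink_name)
    pipeline.add(sink)
    sink.sync_state_with_parent()
    pad.link(sink.get_static_pad("sink"))

dec.connect("pad-added", on_decoded_pad)
pipeline.set_state(Gst.State.PLAYING)
GLib.MainLoop().run()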
-
-    async def _start_call(self) -> None:
-        """Initiate the call.
-
-        Initiates a call with the callee using the stored offer. If there are any buffered
-        local ICE candidates, they are sent as part of the initiation.
-        """
-        assert self.callee is not None
-        self.sid = await G.host.a_bridge.call_start(
-            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
-        )
-        if self.local_candidates_buffer:
-            log.debug(
-                f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
-            )
-            if self.pwd is None:
-                sdp = self.webrtc.props.local_description.sdp.as_text()
-                self.extract_ufrag_pwd(sdp)
-            ice_data = {}
-            for media_type, candidates in self.local_candidates_buffer.items():
-                ice_data[media_type] = {
-                    "ufrag": self.ufrag,
-                    "pwd": self.pwd,
-                    "candidates": candidates,
-                }
-            await G.host.a_bridge.ice_candidates_add(
-                self.sid, data_format.serialise(ice_data), self.profile
-            )
-            self.local_candidates_buffer.clear()
-
-    def _remote_sdp_set(self, promise) -> None:
-        assert promise.wait() == Gst.PromiseResult.REPLIED
-        self.sdp_set = True
-
-    def on_accepted_call(self, sdp: str, profile: str) -> None:
-        """Outgoing call has been accepted.
-
-        @param sdp: The SDP answer string received from the other party.
-        @param profile: Profile used for the call.
-        """
-        log.debug(f"SDP answer received: \n{sdp}")
-
-        __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
-        answer = GstWebRTC.WebRTCSessionDescription.new(
-            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
-        )
-        promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
-        self.webrtc.emit("set-remote-description", answer, promise)
-
-    async def answer_call(self, sdp: str, profile: str) -> None:
-        """Answer an incoming call
-
-        @param sdp: The SDP offer string received from the initiator.
-        @param profile: Profile used for the call.
-
-        @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in
-            payload types.
-        """
-        log.debug(f"SDP offer received: \n{sdp}")
-        self._set_media_types(sdp)
-        __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
-        payload_types = self.get_payload_types(
-            offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
-        )
-        assert "VP8" in payload_types
-        assert "OPUS" in payload_types
-        await self.setup_call(
-            "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
-        )
-        self.start_pipeline()
-        offer = GstWebRTC.WebRTCSessionDescription.new(
-            GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
-        )
-        promise = Gst.Promise.new_with_change_func(self.on_offer_set)
-        self.webrtc.emit("set-remote-description", offer, promise)
-
-    def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
-        """Handles the on-ice-candidate signal of webrtcbin.
-
-        @param webrtc: The webrtcbin element.
-        @param mlineindex: The mline index.
-        @param candidate: The ICE candidate.
-        """
-        log.debug(
-            f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
-        )
-        parsed_candidate = self.parse_ice_candidate(candidate_sdp)
-        try:
-            media_type = self.media_types[mline_index]
-        except KeyError:
-            raise exceptions.InternalError("can't find media type")
-
-        if self.sid is None:
-            log.debug("buffering local ICE candidate")
-            self.local_candidates_buffer.setdefault(media_type, []).append(
-                parsed_candidate
-            )
-        else:
-            sdp = self.webrtc.props.local_description.sdp.as_text()
-            assert sdp is not None
-            ufrag, pwd = self.extract_ufrag_pwd(sdp)
-            ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
-            self._a_call(
-                G.host.a_bridge.ice_candidates_add,
-                self.sid,
-                data_format.serialise({media_type: ice_data}),
-                self.profile,
-            )
-
-    def on_ice_candidates_new(self, candidates: dict) -> None:
-        """Handle new ICE candidates.
-
-        @param candidates: A dictionary containing media types ("audio" or "video") as
-            keys and corresponding ICE data as values.
-
-        @raise exceptions.InternalError: Raised when sdp mline index is not found.
-        """
-        if not self.sdp_set:
-            log.debug("buffering remote ICE candidate")
-            for media_type in ("audio", "video"):
-                media_candidates = candidates.get(media_type)
-                if media_candidates:
-                    buffer = self.remote_candidates_buffer[media_type]
-                    buffer["candidates"].extend(media_candidates["candidates"])
-            return
-        for media_type, ice_data in candidates.items():
-            for candidate in ice_data["candidates"]:
-                candidate_sdp = self.build_ice_candidate(candidate)
-                try:
-                    mline_index = self.get_sdp_mline_index(media_type)
-                except Exception as e:
-                    raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
-                self.webrtc.emit("add-ice-candidate", mline_index, candidate_sdp)
-                log.debug(
-                    f"Remote ICE candidate added. MLine Index: {mline_index}, "
-                    f"{candidate_sdp}"
-                )
-
-    def on_ice_gathering_state_change(self, pspec, __):
-        state = self.webrtc.get_property("ice-gathering-state")
-        log.debug(f"ICE gathering state changed to {state}")
-
-    def on_ice_connection_state(self, pspec, __):
-        state = self.webrtc.props.ice_connection_state
-        if state == GstWebRTC.WebRTCICEConnectionState.FAILED:
-            log.error("ICE connection failed")
-        log.info(f"ICE connection state changed to {state}")
-
-    def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
-        """Handles the GStreamer bus error messages.
-
-        @param bus: The GStreamer bus.
-        @param message: The error message.
-        """
-        err, debug = message.parse_error()
-        log.error(f"Error from {message.src.get_name()}: {err.message}")
-        log.error(f"Debugging info: {debug}")
-
-    def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
-        """Handles the GStreamer bus eos messages.
-
-        @param bus: The GStreamer bus.
-        @param message: The eos message.
-        """
-        log.info("End of stream")
-
-    def on_audio_mute(self, muted: bool) -> None:
-        if self.audio_valve is not None:
-            self.audio_valve.set_property("drop", muted)
-            state = "muted" if muted else "unmuted"
-            log.info(f"audio is now {state}")
-
-    def on_video_mute(self, muted: bool) -> None:
-        if self.video_selector is not None:
-            # when muted, we switch to a black image and deactivate the camera
-            if not muted:
-                self.video_src.set_state(Gst.State.PLAYING)
-            pad = self.video_selector.get_static_pad("sink_1" if muted else "sink_0")
-            self.video_selector.props.active_pad = pad
-            if muted:
-                self.video_src.set_state(Gst.State.NULL)
-            state = "muted" if muted else "unmuted"
-            log.info(f"video is now {state}")
-
-    async def end_call(self) -> None:
-        """Stop streaming and clean instance"""
-        self.reset_instance()


 class Calls(
     quick_widgets.QuickWidget,
     cagou_widget.LiberviaDesktopKivyWidget,
...
             parent_widget=self, on_press=lambda *__: aio.run_async(self.toggle_call())
         )
         self.header_input_add_extra(call_btn)
         self.webrtc = WebRTC(self, self.profile)
         self.previous_fullscreen = None
-        self.bind(
-            audio_muted=lambda __, value: self.webrtc.on_audio_mute(value),
-            video_muted=lambda __, value: self.webrtc.on_video_mute(value),
-        )
         self.reset_instance()

     @property
     def sid(self):
         return self.webrtc.sid
...
             self.screen_manager.current = "call"
         else:
             self.fullscreen = False
             self.screen_manager.transition.direction = "down"
             self.screen_manager.current = "search"
+
+    def on_audio_muted(self, instance, muted: bool) -> None:
+        self.webrtc.audio_muted = muted
+
+    def on_video_muted(self, instance, muted: bool) -> None:
+        self.webrtc.video_muted = muted
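The two new handlers above rely on Kivy auto-dispatch: for any Kivy property `foo`, `EventDispatcher` calls `on_foo(instance, value)` when the value changes, which is why the explicit `bind(...)` block could be deleted. A minimal standalone sketch (the `MuteState` class is illustrative):

from kivy.event import EventDispatcher
from kivy.properties import BooleanProperty

class MuteState(EventDispatcher):
    audio_muted = BooleanProperty(False)

    # Called automatically by Kivy whenever audio_muted changes,
    # no explicit self.bind(...) needed.
    def on_audio_muted(self, instance, muted):
        print(f"audio is now {'muted' if muted else 'unmuted'}")

state = MuteState()
state.audio_muted = True  # prints: audio is now muted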

     def on_fullscreen(self, instance, fullscreen: bool) -> None:
         if fullscreen:
             G.host.app.show_head_widget(False, animation=False)
             self.call_layout.parent.remove_widget(self.call_layout)