comparison libervia/desktop_kivy/plugins/plugin_wid_calls.py @ 499:f387992d8e37

plugins: new "call" plugin for A/V calls: this is the base implementation for calls plugin, handling one2one calls. For now, the interface is very basic, call is done by specifying the bare jid of the destinee, then press the "call" button. Incoming calls are automatically accepted. rel 424
author Goffi <goffi@goffi.org>
date Wed, 04 Oct 2023 22:54:36 +0200
parents
children 0480f883f0a6
comparison
equal deleted inserted replaced
498:3b627382e681 499:f387992d8e37
1 from dataclasses import dataclass
2 import re
3 from typing import Optional, Callable
4 from urllib.parse import quote_plus
5 from functools import partial
6
7 # from gi.repository import GLib
8 from gi.repository import GObject, Gst, GstWebRTC, GstSdp
9
10 try:
11 from gi.overrides import Gst as _
12 except ImportError:
13 print(
14 "no GStreamer python overrides available, please install relevant pacakges on "
15 "your system."
16 )
17 from kivy.clock import Clock
18 from kivy.graphics.texture import Texture
19 from kivy.properties import BooleanProperty, ObjectProperty
20 from kivy.support import install_gobject_iteration
21 from kivy.uix.button import Button
22 from kivy.uix.image import Image
23 from libervia.backend.core.constants import Const as C
24 from libervia.backend.core import log as logging
25 from libervia.backend.core.i18n import _
26 from libervia.backend.core import exceptions
27 from libervia.backend.tools.common import data_format
28 from libervia.frontends.quick_frontend import quick_widgets
29 from libervia.frontends.tools import jid, aio
30
31 from libervia.desktop_kivy import G
32
33 from ..core import cagou_widget
34
35 log = logging.getLogger(__name__)
36
37 install_gobject_iteration()
38
39 Gst.init(None)
40
41
42 PLUGIN_INFO = {
43 "name": _("calls"),
44 "main": "Calls",
45 "description": _("Audio/Video calls"),
46 "icon_symbol": "phone",
47 }
48
49
50 @dataclass
51 class TextureData:
52 texture: Optional[Texture] = None
53 size: Optional[tuple[int, int]] = None
54
55
56 class CallButton(Button):
57 parent_widget = ObjectProperty(None)
58
59
60 class VideoStreamWidget(Image):
61 pass
62
63
64 class WebRTC:
65 """WebRTC implementation for audio and video communication.
66
67 This class encapsulates the WebRTC functionalities required for initiating and
68 handling audio and video calls.
69
70 @attribute test_mode: A flag to indicate whether the WebRTC instance is in test mode.
71 If true, test video and audio sources will be used. Otherwise first webcam and
72 microphone available will be used.
73 """
74 test_mode: bool = False
75
76
77 def __init__(self, parent_calls: "Calls", profile: str) -> None:
78 self.parent_calls = parent_calls
79 self.profile = profile
80 self.pipeline = None
81 self.reset_instance()
82
83 @property
84 def sdp_set(self):
85 return self._sdp_set
86
87 @sdp_set.setter
88 def sdp_set(self, is_set: bool):
89 self._sdp_set = is_set
90 if is_set:
91 self.on_ice_candidates_new(self.remote_candidates_buffer)
92 for data in self.remote_candidates_buffer.values():
93 data["candidates"].clear()
94
95 @property
96 def media_types(self):
97 if self._media_types is None:
98 raise Exception("self._media_types should not be None!")
99 return self._media_types
100
101 @media_types.setter
102 def media_types(self, new_media_types: dict) -> None:
103 self._media_types = new_media_types
104 self._media_types_inv = {v: k for k, v in new_media_types.items()}
105
106 @property
107 def media_types_inv(self) -> dict:
108 if self._media_types_inv is None:
109 raise Exception("self._media_types_inv should not be None!")
110 return self._media_types_inv
111
112 def get_sdp_mline_index(self, media_type: str) -> int:
113 """Gets the sdpMLineIndex for a given media type.
114
115 @param media_type: The type of the media.
116 """
117 for index, m_type in self.media_types.items():
118 if m_type == media_type:
119 return index
120 raise ValueError(f"Media type '{media_type}' not found")
121
122 def _set_media_types(self, offer_sdp: str) -> None:
123 """Sets media types from offer SDP
124
125 @param offer: RTC session description containing the offer
126 """
127 sdp_lines = offer_sdp.splitlines()
128 media_types = {}
129 mline_index = 0
130
131 for line in sdp_lines:
132 if line.startswith("m="):
133 media_types[mline_index] = line[2 : line.find(" ")]
134 mline_index += 1
135
136 self.media_types = media_types
137
138 def _a_call(self, method, *args, **kwargs):
139 """Call an async method in main thread"""
140
141 def wrapper(__):
142 aio.run_async(method(*args, **kwargs))
143 return False
144
145 Clock.schedule_once(wrapper)
146
147 def get_payload_types(
148 self, sdpmsg, video_encoding: str, audio_encoding: str
149 ) -> dict[str, int | None]:
150 """Find the payload types for the specified video and audio encoding.
151
152 Very simplistically finds the first payload type matching the encoding
153 name. More complex applications will want to match caps on
154 profile-level-id, packetization-mode, etc.
155 """
156 # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
157 # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
158 video_pt = None
159 audio_pt = None
160 for i in range(0, sdpmsg.medias_len()):
161 media = sdpmsg.get_media(i)
162 for j in range(0, media.formats_len()):
163 fmt = media.get_format(j)
164 if fmt == "webrtc-datachannel":
165 continue
166 pt = int(fmt)
167 caps = media.get_caps_from_media(pt)
168 s = caps.get_structure(0)
169 encoding_name = s["encoding-name"]
170 if video_pt is None and encoding_name == video_encoding:
171 video_pt = pt
172 elif audio_pt is None and encoding_name == audio_encoding:
173 audio_pt = pt
174 return {video_encoding: video_pt, audio_encoding: audio_pt}
175
176 def parse_ice_candidate(self, candidate_string):
177 """Parses the ice candidate string.
178
179 @param candidate_string: The ice candidate string to be parsed.
180 """
181 pattern = re.compile(
182 r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
183 r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
184 r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
185 r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
186 )
187 match = pattern.match(candidate_string)
188 if match:
189 candidate_dict = match.groupdict()
190
191 # Apply the correct types to the dictionary values
192 candidate_dict["component_id"] = int(candidate_dict["component_id"])
193 candidate_dict["priority"] = int(candidate_dict["priority"])
194 candidate_dict["port"] = int(candidate_dict["port"])
195
196 if candidate_dict["rel_port"]:
197 candidate_dict["rel_port"] = int(candidate_dict["rel_port"])
198
199 if candidate_dict["generation"]:
200 candidate_dict["generation"] = candidate_dict["generation"]
201
202 # Remove None values
203 return {k: v for k, v in candidate_dict.items() if v is not None}
204 else:
205 log.warning(f"can't parse candidate: {candidate_string!r}")
206 return None
207
208 def build_ice_candidate(self, parsed_candidate):
209 """Builds ICE candidate
210
211 @param parsed_candidate: Dictionary containing parsed ICE candidate
212 """
213 base_format = (
214 "candidate:{foundation} {component_id} {transport} {priority} "
215 "{address} {port} typ {type}"
216 )
217
218 if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
219 base_format += " raddr {rel_addr} rport {rel_port}"
220
221 if parsed_candidate.get("generation"):
222 base_format += " generation {generation}"
223
224 return base_format.format(**parsed_candidate)
225
226 def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
227 """Retrieves ICE password and user fragment for SDP offer.
228
229 @param sdp: The Session Description Protocol offer string.
230 @return: ufrag and pwd
231 @raise ValueError: Can't extract ufrag and password
232 """
233 ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
234 pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
235
236 if ufrag_line and pwd_line:
237 ufrag = self.ufrag = ufrag_line.group(1)
238 pwd = self.pwd = pwd_line.group(1)
239 return ufrag, pwd
240 else:
241 log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
242 raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
243
244 def reset_instance(self):
245 """Inits or resets the instance variables to their default state."""
246 self.role: str | None = None
247 if self.pipeline is not None:
248 self.pipeline.set_state(Gst.State.NULL)
249 self.pipeline = None
250 self.texture_map: dict[VideoStreamWidget, TextureData] = {}
251 self._remote_video_pad = None
252 self.sid: str | None = None
253 self.offer: str | None = None
254 self.local_candidates_buffer = {}
255 self.ufrag: str | None = None
256 self.pwd: str | None = None
257 self.callee: str | None = None
258 self._media_types = None
259 self._media_types_inv = None
260 self._sdp_set: bool = False
261 self.remote_candidates_buffer: dict[str, dict[str, list]] = {
262 "audio": {"candidates": []},
263 "video": {"candidates": []},
264 }
265 self._media_types = None
266 self._media_types_inv = None
267
268 async def setup_call(
269 self,
270 role: str,
271 audio_pt: int | None = 96,
272 video_pt: int | None = 97,
273 ) -> None:
274 """Sets up the call.
275
276 This method establishes the Gstreamer pipeline for audio and video communication.
277 The method also manages STUN and TURN server configurations, signal watchers, and
278 various connection handlers for the webrtcbin.
279
280 @param role: The role for the call, either 'initiator' or 'responder'.
281 @param audio_pt: The payload type for the audio stream.
282 @param video_pt: The payload type for the video stream
283
284 @raises NotImplementedError: If audio_pt or video_pt is set to None.
285 @raises AssertionError: If the role is not 'initiator' or 'responder'.
286 """
287 assert role in ("initiator", "responder")
288 self.role = role
289 if audio_pt is None or video_pt is None:
290 raise NotImplementedError("None value is not handled yet")
291
292 if self.test_mode:
293 video_source_elt = "videotestsrc is-live=true pattern=ball"
294 audio_source_elt = "audiotestsrc"
295 else:
296 video_source_elt = "v4l2src"
297 audio_source_elt = "pulsesrc"
298
299 self.gst_pipe_desc = f"""
300 webrtcbin latency=100 name=sendrecv bundle-policy=max-compat
301
302 {video_source_elt} name=video_src
303 ! videorate
304 ! video/x-raw,framerate=30/1
305 ! tee name=t
306
307 t.
308 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
309 ! videoconvert
310 ! vp8enc deadline=1 keyframe-max-dist=60
311 ! rtpvp8pay picture-id-mode=15-bit
312 ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
313 ! sendrecv.
314
315 t.
316 ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0
317 ! videoconvert
318 ! appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 sync=True
319
320 {audio_source_elt} name=audio_src
321 ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0
322 ! audioconvert
323 ! audioresample
324 ! opusenc audio-type=voice
325 ! rtpopuspay
326 ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
327 ! sendrecv.
328 """
329
330 log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
331
332 # Create the pipeline
333 self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
334 if not self.pipeline:
335 raise exceptions.InternalError("Failed to create Gstreamer pipeline.")
336
337 self.webrtc = self.pipeline.get_by_name("sendrecv")
338
339 self.video_src = self.pipeline.get_by_name("video_src")
340 self.audio_src = self.pipeline.get_by_name("audio_src")
341
342 # set STUN and TURN servers
343 external_disco = data_format.deserialise(
344 await G.host.a_bridge.external_disco_get("", self.profile), type_check=list
345 )
346
347 for server in external_disco:
348 if server["type"] == "stun":
349 if server["transport"] == "tcp":
350 log.info(
351 "ignoring TCP STUN server, GStreamer only support one STUN server"
352 )
353 url = f"stun://{server['host']}:{server['port']}"
354 log.debug(f"adding stun server: {url}")
355 self.webrtc.set_property("stun-server", url)
356 elif server["type"] == "turn":
357 url = "{scheme}://{username}:{password}@{host}:{port}".format(
358 scheme = "turns" if server["transport"] == "tcp" else "turn",
359 username=quote_plus(server["username"]),
360 password=quote_plus(server["password"]),
361 host=server["host"],
362 port=server["port"],
363 )
364 log.debug(f"adding turn server: {url}")
365
366 if not self.webrtc.emit("add-turn-server", url):
367 log.warning(f"Erreur while adding TURN server {url}")
368
369 # local video feedback
370 local_video_sink = self.pipeline.get_by_name("local_video_sink")
371 local_video_sink.set_property("emit-signals", True)
372 local_video_sink.connect(
373 "new-sample",
374 self.on_new_sample,
375 self.update_sample,
376 self.parent_calls.local_video,
377 )
378 local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
379 local_video_sink.set_property("caps", local_video_sink_caps)
380
381 # Create bus and associate signal watchers
382 self.bus = self.pipeline.get_bus()
383 if not self.bus:
384 log.error("Failed to get bus from pipeline.")
385 return
386
387 self.bus.add_signal_watch()
388 self.webrtc.connect("pad-added", self.on_pad_added)
389 self.bus.connect("message::error", self.on_bus_error)
390 self.bus.connect("message::eos", self.on_bus_eos)
391 self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed)
392 self.webrtc.connect("on-ice-candidate", self.on_ice_candidate)
393 self.webrtc.connect(
394 "notify::ice-gathering-state", self.on_ice_gathering_state_change
395 )
396 self.webrtc.connect("notify::ice-connection-state", self.on_ice_connection_state)
397
398 def start_pipeline(self) -> None:
399 """Starts the GStreamer pipeline."""
400 log.debug("starting the pipeline")
401 self.pipeline.set_state(Gst.State.PLAYING)
402
403 def on_negotiation_needed(self, webrtc):
404 """Initiate SDP offer when negotiation is needed."""
405 log.debug("Negotiation needed.")
406 if self.role == "initiator":
407 log.debug("Creating offer…")
408 promise = Gst.Promise.new_with_change_func(self.on_offer_created)
409 self.webrtc.emit("create-offer", None, promise)
410
411 def on_offer_created(self, promise):
412 """Callback for when SDP offer is created."""
413 log.info("on_offer_created called")
414 assert promise.wait() == Gst.PromiseResult.REPLIED
415 reply = promise.get_reply()
416 if reply is None:
417 log.error("Promise reply is None. Offer creation might have failed.")
418 return
419 offer = reply["offer"]
420 self.offer = offer.sdp.as_text()
421 log.info(f"SDP offer created: \n{self.offer}")
422 self._set_media_types(self.offer)
423 promise = Gst.Promise.new()
424 self.webrtc.emit("set-local-description", offer, promise)
425 promise.interrupt()
426 self._a_call(self._start_call)
427
428 def on_answer_set(self, promise):
429 assert promise.wait() == Gst.PromiseResult.REPLIED
430
431 def on_answer_created(self, promise, _, __):
432 """Callback for when SDP answer is created."""
433 assert promise.wait() == Gst.PromiseResult.REPLIED
434 reply = promise.get_reply()
435 answer = reply["answer"]
436 promise = Gst.Promise.new()
437 self.webrtc.emit("set-local-description", answer, promise)
438 promise.interrupt()
439 answer_sdp = answer.sdp.as_text()
440 log.info(f"SDP answer set: \n{answer_sdp}")
441 self.sdp_set = True
442 self._a_call(G.host.a_bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)
443
444 def on_offer_set(self, promise):
445 assert promise.wait() == Gst.PromiseResult.REPLIED
446 promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
447 self.webrtc.emit("create-answer", None, promise)
448
449 def on_new_sample(
450 self,
451 video_sink: Gst.Element,
452 update_sample_method: Callable,
453 video_widget: VideoStreamWidget,
454 ) -> bool:
455 """Handles a new sample from the video sink.
456
457 @param video_sink: The video sink to pull samples from.
458 @param update_sample_method: The method to call for updating the sample.
459 @param video_widget: The video widget (either local_video or remote_video).
460 @return: Always False
461 """
462 sample = video_sink.emit("pull-sample")
463 if sample is None:
464 return False
465
466 try:
467 texture_data = self.texture_map[video_widget]
468 except KeyError:
469 texture_data = self.texture_map[video_widget] = TextureData()
470
471 # Get the current video size
472 video_pad = video_sink.get_static_pad("sink")
473 assert video_pad is not None
474 s = video_pad.get_current_caps().get_structure(0)
475 stream_size = (s.get_value("width"), s.get_value("height"))
476
477 # Compare with the texture size
478 texture_size = texture_data.size
479 if texture_size != stream_size:
480 log.debug(f"sample size update: {texture_size} => {stream_size}")
481 texture_data.size = stream_size
482 # texture needs to be recreated, so we reset the one in texture_data
483 texture_data.texture = None
484
485 Clock.schedule_once(
486 partial(
487 update_sample_method,
488 sample=sample,
489 video_widget=video_widget,
490 )
491 )
492 return False
493
494 def update_sample(
495 self,
496 dt: float,
497 sample: Optional[Gst.Sample],
498 video_widget: VideoStreamWidget,
499 ) -> None:
500 """Updates the video sample.
501
502 This method runs in the main thread.
503
504 @param dt: Time delta since the last call. This is passed by Clock.schedule_once.
505 @param sample: The video sample to update.
506 @param texture_id: identifier of the texture data (e.g. "local" or "remote")
507 @param texture: The texture object.
508 @param texture_size: The texture size.
509 @param video_widget: The video widget.
510 """
511 if sample is None:
512 return
513 try:
514 texture_data = self.texture_map[video_widget]
515 except KeyError:
516 log.warning(f"no texture data found for {video_widget}")
517 return
518
519 if texture_data.texture is None and texture_data.size is not None:
520 log.debug(f"===> creating texture for {video_widget}: {texture_data.size=}")
521 texture = Texture.create(size=texture_data.size, colorfmt="rgb")
522 assert texture is not None
523 texture_data.texture = texture
524 texture.flip_vertical()
525 video_widget.texture = texture
526
527 mapinfo = None
528 buf = None
529 try:
530 buf = sample.get_buffer()
531 _, mapinfo = buf.map(Gst.MapFlags.READ)
532
533 buffer = mapinfo.data.tobytes()
534
535 if texture_data.texture is None:
536 log.debug("can't copy the buffer, texture is None")
537 return
538 texture_data.texture.blit_buffer(buffer, colorfmt="rgb")
539 assert video_widget.canvas is not None
540 video_widget.canvas.ask_update()
541 buf.unmap(mapinfo)
542 finally:
543 if buf is not None and mapinfo is not None:
544 buf.unmap(mapinfo)
545
546 def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
547 """Handle the stream from the remote decodebin.
548
549 This method processes the incoming stream from the remote decodebin, determining
550 whether it's video or audio. It then sets up the appropriate GStreamer elements
551 for video/audio processing and adds them to the pipeline.
552
553 @param pad: The Gst.Pad from the remote decodebin producing the stream.
554 """
555 assert self.pipeline is not None
556 if not pad.has_current_caps():
557 log.error(f"{pad} has no caps, ignoring")
558 return
559
560 self.pipeline.set_state(Gst.State.PAUSED)
561 caps = pad.get_current_caps()
562 assert len(caps)
563 s = caps[0]
564 name = s.get_name()
565 log.debug(f"====> NAME START: {name}")
566
567 q = Gst.ElementFactory.make("queue")
568 q.set_property("max-size-time", 0)
569 q.set_property("max-size-bytes", 0)
570 q.set_property("max-size-buffers", 5)
571
572 if name.startswith("video"):
573 log.debug("===> VIDEO OK")
574
575 self._remote_video_pad = pad
576
577 # Check and log the original size of the video
578 width = s.get_int("width").value
579 height = s.get_int("height").value
580 log.info(f"Original video size: {width}x{height}")
581
582 # This is a fix for an issue found with Movim on desktop: a non standard
583 # resolution is used (990x557) resulting in bad alignement and no color in
584 # rendered image
585 adjust_resolution = width % 4 != 0 or height % 4 != 0
586 if adjust_resolution:
587 log.info("non standard resolution, we need to adjust size")
588 width = (width + 3) // 4 * 4
589 height = (height + 3) // 4 * 4
590 log.info(f"Adjusted video size: {width}x{height}")
591
592 conv = Gst.ElementFactory.make("videoconvert")
593 remote_video_sink = Gst.ElementFactory.make("appsink")
594
595 appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
596 remote_video_sink.set_property("caps", appsink_caps)
597
598 remote_video_sink.set_property("emit-signals", True)
599 remote_video_sink.set_property("drop", True)
600 remote_video_sink.set_property("max-buffers", 1)
601 remote_video_sink.set_property("sync", True)
602 remote_video_sink.connect(
603 "new-sample",
604 self.on_new_sample,
605 self.update_sample,
606 self.parent_calls.remote_video,
607 )
608
609 if adjust_resolution:
610 videoscale = Gst.ElementFactory.make("videoscale")
611 adjusted_caps = Gst.Caps.from_string(f"video/x-raw,width={width},height={height}")
612 capsfilter = Gst.ElementFactory.make("capsfilter")
613 capsfilter.set_property("caps", adjusted_caps)
614
615 self.pipeline.add(q, conv, videoscale, capsfilter, remote_video_sink)
616 self.pipeline.sync_children_states()
617 pad.link(q.get_static_pad("sink"))
618 q.link(conv)
619 conv.link(videoscale)
620 videoscale.link(capsfilter)
621 capsfilter.link(remote_video_sink)
622 else:
623 self.pipeline.add(q, conv, remote_video_sink)
624 self.pipeline.sync_children_states()
625 pad.link(q.get_static_pad("sink"))
626 q.link(conv)
627 conv.link(remote_video_sink)
628
629 elif name.startswith("audio"):
630 log.debug("===> Audio OK")
631 conv = Gst.ElementFactory.make("audioconvert")
632 resample = Gst.ElementFactory.make("audioresample")
633 remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
634 self.pipeline.add(q, conv, resample, remote_audio_sink)
635 self.pipeline.sync_children_states()
636 pad.link(q.get_static_pad("sink"))
637 q.link(conv)
638 conv.link(resample)
639 resample.link(remote_audio_sink)
640 self.pipeline.set_state(Gst.State.PLAYING)
641
642 def on_pad_added(self, __, pad: Gst.Pad) -> None:
643 """Handle the addition of a new pad to the element.
644
645 When a new source pad is added to the element, this method creates a decodebin,
646 connects it to handle the stream, and links the pad to the decodebin.
647
648 @param __: Placeholder for the signal source. Not used in this method.
649 @param pad: The newly added pad.
650 """
651
652 log.debug("on_pad_added")
653 if pad.direction != Gst.PadDirection.SRC:
654 return
655
656 decodebin = Gst.ElementFactory.make("decodebin")
657 decodebin.connect("pad-added", self.on_remote_decodebin_stream)
658 self.pipeline.add(decodebin)
659 decodebin.sync_state_with_parent()
660 pad.link(decodebin.get_static_pad("sink"))
661
662 async def _start_call(self) -> None:
663 """Initiate the call.
664
665 Initiates a call with the callee using the stored offer. If there are any buffered
666 local ICE candidates, they are sent as part of the initiation.
667 """
668 assert self.callee is not None
669 self.sid = await G.host.a_bridge.call_start(
670 str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
671 )
672 if self.local_candidates_buffer:
673 log.debug(
674 f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
675 )
676 if self.pwd is None:
677 sdp = self.webrtc.props.local_description.sdp.as_text()
678 self.extract_ufrag_pwd(sdp)
679 ice_data = {}
680 for media_type, candidates in self.local_candidates_buffer.items():
681 ice_data[media_type] = {
682 "ufrag": self.ufrag,
683 "pwd": self.pwd,
684 "candidates": candidates,
685 }
686 await G.host.a_bridge.ice_candidates_add(
687 self.sid, data_format.serialise(ice_data), self.profile
688 )
689 self.local_candidates_buffer.clear()
690
691 def _remote_sdp_set(self, promise) -> None:
692 assert promise.wait() == Gst.PromiseResult.REPLIED
693 self.sdp_set = True
694
695 def on_accepted_call(self, sdp: str, profile: str) -> None:
696 """Outgoing call has been accepted.
697
698 @param sdp: The SDP answer string received from the other party.
699 @param profile: Profile used for the call.
700 """
701 log.debug(f"SDP answer received: \n{sdp}")
702
703 __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
704 answer = GstWebRTC.WebRTCSessionDescription.new(
705 GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
706 )
707 promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
708 self.webrtc.emit("set-remote-description", answer, promise)
709
710 async def answer_call(self, sdp: str, profile: str) -> None:
711 """Answer an incoming call
712
713 @param sdp: The SDP offer string received from the initiator.
714 @param profile: Profile used for the call.
715
716 @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in
717 payload types.
718 """
719 log.debug(f"SDP offer received: \n{sdp}")
720 self._set_media_types(sdp)
721 __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
722 payload_types = self.get_payload_types(
723 offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
724 )
725 assert "VP8" in payload_types
726 assert "OPUS" in payload_types
727 await self.setup_call(
728 "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
729 )
730 self.start_pipeline()
731 offer = GstWebRTC.WebRTCSessionDescription.new(
732 GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
733 )
734 promise = Gst.Promise.new_with_change_func(self.on_offer_set)
735 self.webrtc.emit("set-remote-description", offer, promise)
736
737 def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
738 """Handles the on-ice-candidate signal of webrtcbin.
739
740 @param webrtc: The webrtcbin element.
741 @param mlineindex: The mline index.
742 @param candidate: The ICE candidate.
743 """
744 log.debug(
745 f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
746 )
747 parsed_candidate = self.parse_ice_candidate(candidate_sdp)
748 try:
749 media_type = self.media_types[mline_index]
750 except KeyError:
751 raise exceptions.InternalError("can't find media type")
752
753 if self.sid is None:
754 log.debug("buffering local ICE candidate")
755 self.local_candidates_buffer.setdefault(media_type, []).append(
756 parsed_candidate
757 )
758 else:
759 sdp = self.webrtc.props.local_description.sdp.as_text()
760 assert sdp is not None
761 ufrag, pwd = self.extract_ufrag_pwd(sdp)
762 ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
763 self._a_call(
764 G.host.a_bridge.ice_candidates_add,
765 self.sid,
766 data_format.serialise({media_type: ice_data}),
767 self.profile,
768 )
769
770 def on_ice_candidates_new(self, candidates: dict) -> None:
771 """Handle new ICE candidates.
772
773 @param candidates: A dictionary containing media types ("audio" or "video") as
774 keys and corresponding ICE data as values.
775
776 @raise exceptions.InternalError: Raised when sdp mline index is not found.
777 """
778 if not self.sdp_set:
779 log.debug("buffering remote ICE candidate")
780 for media_type in ("audio", "video"):
781 media_candidates = candidates.get(media_type)
782 if media_candidates:
783 buffer = self.remote_candidates_buffer[media_type]
784 buffer["candidates"].extend(media_candidates["candidates"])
785 return
786 for media_type, ice_data in candidates.items():
787 for candidate in ice_data["candidates"]:
788 candidate_sdp = self.build_ice_candidate(candidate)
789 try:
790 mline_index = self.get_sdp_mline_index(media_type)
791 except Exception as e:
792 raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
793 self.webrtc.emit("add-ice-candidate", mline_index, candidate_sdp)
794 log.debug(
795 f"Remote ICE candidate added. MLine Index: {mline_index}, "
796 f"{candidate_sdp}"
797 )
798
799 def on_ice_gathering_state_change(self, pspec, __):
800 state = self.webrtc.get_property("ice-gathering-state")
801 log.debug(f"ICE gathering state changed to {state}")
802
803 def on_ice_connection_state(self, pspec, __):
804 state = self.webrtc.props.ice_connection_state
805 if state == GstWebRTC.WebRTCICEConnectionState.FAILED:
806 log.error("ICE connection failed")
807 log.info(f"ICE connection state changed to {state}")
808
809 def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
810 """Handles the GStreamer bus error messages.
811
812 @param bus: The GStreamer bus.
813 @param message: The error message.
814 """
815 err, debug = message.parse_error()
816 log.error(f"Error from {message.src.get_name()}: {err.message}")
817 log.error(f"Debugging info: {debug}")
818
819 def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
820 """Handles the GStreamer bus eos messages.
821
822 @param bus: The GStreamer bus.
823 @param message: The eos message.
824 """
825 log.info("End of stream")
826
827 async def end_call(self) -> None:
828 """Stop streaming and clean instance"""
829 self.reset_instance()
830
831
832 class Calls(quick_widgets.QuickWidget, cagou_widget.LiberviaDesktopKivyWidget):
833 remote_video = ObjectProperty()
834 local_video = ObjectProperty()
835 use_header_input = True
836 signals_registered = False
837 in_call = BooleanProperty(False)
838
839 def __init__(self, host, target, profiles):
840 quick_widgets.QuickWidget.__init__(self, G.host, target, profiles)
841 cagou_widget.LiberviaDesktopKivyWidget.__init__(self)
842 call_btn = CallButton(
843 parent_widget=self, on_press=lambda *__: aio.run_async(self.toggle_call())
844 )
845 self.header_input_add_extra(call_btn)
846 self.webrtc = WebRTC(self, self.profile)
847
848 if not self.__class__.signals_registered:
849 log.debug("registering signals")
850 G.host.register_signal(
851 "ice_candidates_new",
852 handler=self.__class__.ice_candidates_new_handler,
853 iface="plugin",
854 )
855 G.host.register_signal(
856 "call_setup", handler=self.__class__.call_setup_handler, iface="plugin"
857 )
858 G.host.register_signal(
859 "call_ended", handler=self.__class__.call_ended_handler, iface="plugin"
860 )
861 G.host.register_action_handler(
862 C.META_TYPE_CALL, self.__class__.action_new_handler
863 )
864 self.__class__.signals_registered = True
865
866 self.reset_instance()
867
868 @property
869 def sid(self):
870 return self.webrtc.sid
871
872 async def toggle_call(self):
873 """Toggle between making a call and hanging up.
874
875 This function will initiate or terminate a call based on the call state.
876 """
877 if self.sid is None:
878 # Initiate the call
879 log.info("initiating call")
880 callee = jid.JID(self.header_input.text.strip())
881 self.webrtc.callee = callee
882 await self.webrtc.setup_call("initiator")
883 self.webrtc.start_pipeline()
884 self.in_call = True
885 else:
886 # Terminate the call
887 sid = self.sid
888 await self.end_call({"reason": "terminated"}, self.profile)
889 await G.host.a_bridge.call_end(sid, "", self.profile)
890 self.in_call = False
891
892 async def on_incoming_call(
893 self, action_data: dict, action_id: str, profile: str
894 ) -> None:
895 """Handle incoming call notifications.
896
897 @param action_data: A dictionary containing data related to the incoming call
898 @param action_id: The ID corresponding to the incoming call action.
899 @param profile: The profile associated with the incoming call.
900 """
901 if self.sid is not None:
902 # FIXME: show a double remote call notification
903 log.warning("Ignoring new remote call as we are already in a call.")
904 return
905 sid = action_data["session_id"]
906 self.in_call = True
907 self.webrtc.sid = sid
908 # FIXME: we accept all remote calls for now, will be changed when proper UI/UX
909 # will be implemented
910 log.warning("auto-accepting remote call")
911 await G.host.a_bridge.action_launch(
912 action_id, data_format.serialise({"cancelled": False}), profile
913 )
914
915 async def on_call_setup(self, setup_data: dict, profile: str) -> None:
916 """Call has been accepted, connection can be established
917
918 @param session_id: Session identifier
919 @param setup_data: Data with following keys:
920 role: initiator or responser
921 sdp: Session Description Protocol data
922 @param profile: Profile associated
923 """
924 try:
925 role = setup_data["role"]
926 sdp = setup_data["sdp"]
927 except KeyError:
928 log.error(f"Invalid setup data received: {setup_data}")
929 return
930 if role == "initiator":
931 self.webrtc.on_accepted_call(sdp, profile)
932 elif role == "responder":
933 await self.webrtc.answer_call(sdp, profile)
934 else:
935 log.error(f"Invalid role received during setup: {setup_data}")
936
937 def reset_instance(self) -> None:
938 self.in_call = False
939 self.local_video.texture = None
940 self.remote_video.texture = None
941
942 async def end_call(self, data: dict, profile: str) -> None:
943 """End current call if any and reset the instance
944
945 @param data: end call data, often includes the reason of the call ending.
946 """
947 await self.webrtc.end_call()
948 self.reset_instance()
949
950 @classmethod
951 def ice_candidates_new_handler(
952 cls, sid: str, candidates_s: str, profile: str
953 ) -> None:
954 for wid in G.host.get_visible_list(cls):
955 if profile not in wid.profiles or sid != wid.sid:
956 continue
957 wid.webrtc.on_ice_candidates_new(
958 data_format.deserialise(candidates_s),
959 )
960
961 @classmethod
962 def call_setup_handler(cls, sid: str, setup_data_s: str, profile: str) -> None:
963 for wid in G.host.get_visible_list(cls):
964 if profile not in wid.profiles or sid != wid.sid:
965 continue
966 aio.run_async(
967 wid.on_call_setup(data_format.deserialise(setup_data_s), profile)
968 )
969
970 @classmethod
971 def call_ended_handler(cls, sid: str, data_s: str, profile: str) -> None:
972 for wid in G.host.get_visible_list(cls):
973 if profile not in wid.profiles or sid != wid.sid:
974 continue
975 aio.run_async(wid.end_call(data_format.deserialise(data_s), profile))
976
977 @classmethod
978 def action_new_handler(
979 cls, action_data: dict, action_id: str, security_limit: int, profile: str
980 ) -> None:
981 for wid in G.host.get_visible_list(cls):
982 if profile not in wid.profiles:
983 continue
984 aio.run_async(wid.on_remote_call(action_data, action_id, profile))