comparison libervia/desktop_kivy/plugins/plugin_wid_calls.py @ 499:f387992d8e37
plugins: new "call" plugin for A/V calls:
this is the base implementation of the calls plugin, handling one2one calls.
For now, the interface is very basic: a call is made by entering the bare JID of the
callee, then pressing the "call" button. Incoming calls are automatically accepted.
rel 424
author: Goffi <goffi@goffi.org>
date: Wed, 04 Oct 2023 22:54:36 +0200
parents:
children: 0480f883f0a6
comparison: 498:3b627382e681 → 499:f387992d8e37
from dataclasses import dataclass
import re
from typing import Optional, Callable
from urllib.parse import quote_plus
from functools import partial

# from gi.repository import GLib
from gi.repository import GObject, Gst, GstWebRTC, GstSdp

try:
    from gi.overrides import Gst as _
except ImportError:
    print(
        "no GStreamer Python overrides available, please install the relevant "
        "packages on your system."
    )
from kivy.clock import Clock
from kivy.graphics.texture import Texture
from kivy.properties import BooleanProperty, ObjectProperty
from kivy.support import install_gobject_iteration
from kivy.uix.button import Button
from kivy.uix.image import Image
from libervia.backend.core.constants import Const as C
from libervia.backend.core import log as logging
from libervia.backend.core.i18n import _
from libervia.backend.core import exceptions
from libervia.backend.tools.common import data_format
from libervia.frontends.quick_frontend import quick_widgets
from libervia.frontends.tools import jid, aio

from libervia.desktop_kivy import G

from ..core import cagou_widget

log = logging.getLogger(__name__)

install_gobject_iteration()

Gst.init(None)


PLUGIN_INFO = {
    "name": _("calls"),
    "main": "Calls",
    "description": _("Audio/Video calls"),
    "icon_symbol": "phone",
}


@dataclass
class TextureData:
    texture: Optional[Texture] = None
    size: Optional[tuple[int, int]] = None


class CallButton(Button):
    parent_widget = ObjectProperty(None)


class VideoStreamWidget(Image):
    pass


class WebRTC:
    """WebRTC implementation for audio and video communication.

    This class encapsulates the WebRTC functionalities required for initiating and
    handling audio and video calls.

    @attribute test_mode: A flag to indicate whether the WebRTC instance is in test
        mode. If true, test video and audio sources will be used. Otherwise, the
        first available webcam and microphone will be used.
    """
    test_mode: bool = False

    def __init__(self, parent_calls: "Calls", profile: str) -> None:
        self.parent_calls = parent_calls
        self.profile = profile
        self.pipeline = None
        self.reset_instance()

    @property
    def sdp_set(self):
        return self._sdp_set

    @sdp_set.setter
    def sdp_set(self, is_set: bool):
        self._sdp_set = is_set
        if is_set:
            self.on_ice_candidates_new(self.remote_candidates_buffer)
            for data in self.remote_candidates_buffer.values():
                data["candidates"].clear()

    @property
    def media_types(self):
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    @media_types.setter
    def media_types(self, new_media_types: dict) -> None:
        self._media_types = new_media_types
        self._media_types_inv = {v: k for k, v in new_media_types.items()}

    @property
    def media_types_inv(self) -> dict:
        if self._media_types_inv is None:
            raise Exception("self._media_types_inv should not be None!")
        return self._media_types_inv

    def get_sdp_mline_index(self, media_type: str) -> int:
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def _set_media_types(self, offer_sdp: str) -> None:
        """Sets media types from the offer SDP.

        @param offer_sdp: SDP string containing the offer
        """
        sdp_lines = offer_sdp.splitlines()
        media_types = {}
        mline_index = 0

        for line in sdp_lines:
            if line.startswith("m="):
                media_types[mline_index] = line[2 : line.find(" ")]
                mline_index += 1

        self.media_types = media_types

    def _a_call(self, method, *args, **kwargs):
        """Call an async method in main thread"""

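        # Clock.schedule_once executes the wrapper from Kivy's main thread on the
        # next frame, so the coroutine is always started from the main thread even
        # when this helper is called from a GStreamer thread.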
        def wrapper(__):
            aio.run_async(method(*args, **kwargs))
            return False

        Clock.schedule_once(wrapper)

    def get_payload_types(
        self, sdpmsg, video_encoding: str, audio_encoding: str
    ) -> dict[str, int | None]:
        """Find the payload types for the specified video and audio encoding.

        Very simplistically finds the first payload type matching the encoding
        name. More complex applications will want to match caps on
        profile-level-id, packetization-mode, etc.
        """
        # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
        # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
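        # e.g. with video_encoding="VP8" and audio_encoding="OPUS" this may return
        # {"VP8": 96, "OPUS": 97}; a value is None when no matching encoding is
        # found in the SDP (the payload numbers here are illustrative).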
        video_pt = None
        audio_pt = None
        for i in range(0, sdpmsg.medias_len()):
            media = sdpmsg.get_media(i)
            for j in range(0, media.formats_len()):
                fmt = media.get_format(j)
                if fmt == "webrtc-datachannel":
                    continue
                pt = int(fmt)
                caps = media.get_caps_from_media(pt)
                s = caps.get_structure(0)
                encoding_name = s["encoding-name"]
                if video_pt is None and encoding_name == video_encoding:
                    video_pt = pt
                elif audio_pt is None and encoding_name == audio_encoding:
                    audio_pt = pt
        return {video_encoding: video_pt, audio_encoding: audio_pt}

    def parse_ice_candidate(self, candidate_string):
        """Parses the ice candidate string.

        @param candidate_string: The ice candidate string to be parsed.
        """
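        # Example (values are illustrative):
        # "candidate:842163049 1 udp 1677729535 192.0.2.10 42425 typ srflx raddr 10.0.0.17 rport 42425 generation 0"
        # parses to:
        # {"foundation": "842163049", "component_id": 1, "transport": "udp",
        #  "priority": 1677729535, "address": "192.0.2.10", "port": 42425,
        #  "type": "srflx", "rel_addr": "10.0.0.17", "rel_port": 42425,
        #  "generation": 0}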
        pattern = re.compile(
            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
        )
        match = pattern.match(candidate_string)
        if match:
            candidate_dict = match.groupdict()

            # Apply the correct types to the dictionary values
            candidate_dict["component_id"] = int(candidate_dict["component_id"])
            candidate_dict["priority"] = int(candidate_dict["priority"])
            candidate_dict["port"] = int(candidate_dict["port"])

            if candidate_dict["rel_port"]:
                candidate_dict["rel_port"] = int(candidate_dict["rel_port"])

            if candidate_dict["generation"]:
                # convert to int, as for the other numeric fields
                candidate_dict["generation"] = int(candidate_dict["generation"])

            # Remove None values
            return {k: v for k, v in candidate_dict.items() if v is not None}
        else:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

    def build_ice_candidate(self, parsed_candidate):
        """Builds an ICE candidate string.

        @param parsed_candidate: Dictionary containing the parsed ICE candidate
        """
        base_format = (
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        )

        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
            base_format += " raddr {rel_addr} rport {rel_port}"

        if parsed_candidate.get("generation"):
            base_format += " generation {generation}"

        return base_format.format(**parsed_candidate)

    def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
        """Retrieves the ICE password and user fragment from an SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: ufrag and pwd
        @raise ValueError: Can't extract ufrag and password
        """
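        # ufrag and pwd appear in the SDP as "a=ice-ufrag:..." and "a=ice-pwd:..."
        # attribute lines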
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)

        if ufrag_line and pwd_line:
            ufrag = self.ufrag = ufrag_line.group(1)
            pwd = self.pwd = pwd_line.group(1)
            return ufrag, pwd
        else:
            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    def reset_instance(self):
        """Inits or resets the instance variables to their default state."""
        self.role: str | None = None
        if self.pipeline is not None:
            self.pipeline.set_state(Gst.State.NULL)
        self.pipeline = None
        self.texture_map: dict[VideoStreamWidget, TextureData] = {}
        self._remote_video_pad = None
        self.sid: str | None = None
        self.offer: str | None = None
        self.local_candidates_buffer = {}
        self.ufrag: str | None = None
        self.pwd: str | None = None
        self.callee: str | None = None
        self._media_types = None
        self._media_types_inv = None
        self._sdp_set: bool = False
        self.remote_candidates_buffer: dict[str, dict[str, list]] = {
            "audio": {"candidates": []},
            "video": {"candidates": []},
        }

    async def setup_call(
        self,
        role: str,
        audio_pt: int | None = 96,
        video_pt: int | None = 97,
    ) -> None:
        """Sets up the call.

        This method establishes the GStreamer pipeline for audio and video communication.
        The method also manages STUN and TURN server configurations, signal watchers, and
        various connection handlers for the webrtcbin.

        @param role: The role for the call, either 'initiator' or 'responder'.
        @param audio_pt: The payload type for the audio stream.
        @param video_pt: The payload type for the video stream.

        @raises NotImplementedError: If audio_pt or video_pt is set to None.
        @raises AssertionError: If the role is not 'initiator' or 'responder'.
        """
        assert role in ("initiator", "responder")
        self.role = role
        if audio_pt is None or video_pt is None:
            raise NotImplementedError("None value is not handled yet")

        if self.test_mode:
            video_source_elt = "videotestsrc is-live=true pattern=ball"
            audio_source_elt = "audiotestsrc"
        else:
            video_source_elt = "v4l2src"
            audio_source_elt = "pulsesrc"

        self.gst_pipe_desc = f"""
        webrtcbin latency=100 name=sendrecv bundle-policy=max-compat

        {video_source_elt} name=video_src
        ! videorate
        ! video/x-raw,framerate=30/1
        ! tee name=t

        t.
        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
        ! videoconvert
        ! vp8enc deadline=1 keyframe-max-dist=60
        ! rtpvp8pay picture-id-mode=15-bit
        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
        ! sendrecv.

        t.
        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0
        ! videoconvert
        ! appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 sync=True

        {audio_source_elt} name=audio_src
        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0
        ! audioconvert
        ! audioresample
        ! opusenc audio-type=voice
        ! rtpopuspay
        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
        ! sendrecv.
        """
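        # In the pipeline above, webrtcbin handles WebRTC negotiation and transport;
        # the camera stream is duplicated by the "tee" element: one branch is
        # VP8-encoded and RTP-payloaded for sending, the other feeds the appsink
        # used for local video feedback; audio is OPUS-encoded and sent to the same
        # webrtcbin.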

        log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")

        # Create the pipeline
        self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
        if not self.pipeline:
            raise exceptions.InternalError("Failed to create Gstreamer pipeline.")

        self.webrtc = self.pipeline.get_by_name("sendrecv")

        self.video_src = self.pipeline.get_by_name("video_src")
        self.audio_src = self.pipeline.get_by_name("audio_src")

        # set STUN and TURN servers
        external_disco = data_format.deserialise(
            await G.host.a_bridge.external_disco_get("", self.profile), type_check=list
        )

        for server in external_disco:
            if server["type"] == "stun":
                if server["transport"] == "tcp":
                    log.info(
                        "ignoring TCP STUN server, GStreamer only supports one STUN "
                        "server"
                    )
                    continue
                url = f"stun://{server['host']}:{server['port']}"
                log.debug(f"adding stun server: {url}")
                self.webrtc.set_property("stun-server", url)
            elif server["type"] == "turn":
                url = "{scheme}://{username}:{password}@{host}:{port}".format(
                    scheme="turns" if server["transport"] == "tcp" else "turn",
                    username=quote_plus(server["username"]),
                    password=quote_plus(server["password"]),
                    host=server["host"],
                    port=server["port"],
                )
                log.debug(f"adding turn server: {url}")

                if not self.webrtc.emit("add-turn-server", url):
                    log.warning(f"Error while adding TURN server {url}")

        # local video feedback
        local_video_sink = self.pipeline.get_by_name("local_video_sink")
        local_video_sink.set_property("emit-signals", True)
        local_video_sink.connect(
            "new-sample",
            self.on_new_sample,
            self.update_sample,
            self.parent_calls.local_video,
        )
        local_video_sink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
        local_video_sink.set_property("caps", local_video_sink_caps)

        # Create bus and associate signal watchers
        self.bus = self.pipeline.get_bus()
        if not self.bus:
            log.error("Failed to get bus from pipeline.")
            return

        self.bus.add_signal_watch()
        self.webrtc.connect("pad-added", self.on_pad_added)
        self.bus.connect("message::error", self.on_bus_error)
        self.bus.connect("message::eos", self.on_bus_eos)
        self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed)
        self.webrtc.connect("on-ice-candidate", self.on_ice_candidate)
        self.webrtc.connect(
            "notify::ice-gathering-state", self.on_ice_gathering_state_change
        )
        self.webrtc.connect("notify::ice-connection-state", self.on_ice_connection_state)

    def start_pipeline(self) -> None:
        """Starts the GStreamer pipeline."""
        log.debug("starting the pipeline")
        self.pipeline.set_state(Gst.State.PLAYING)

    def on_negotiation_needed(self, webrtc):
        """Initiate SDP offer when negotiation is needed."""
        log.debug("Negotiation needed.")
        if self.role == "initiator":
            log.debug("Creating offer…")
            promise = Gst.Promise.new_with_change_func(self.on_offer_created)
            self.webrtc.emit("create-offer", None, promise)

    def on_offer_created(self, promise):
        """Callback for when SDP offer is created."""
        log.info("on_offer_created called")
        assert promise.wait() == Gst.PromiseResult.REPLIED
        reply = promise.get_reply()
        if reply is None:
            log.error("Promise reply is None. Offer creation might have failed.")
            return
        offer = reply["offer"]
        self.offer = offer.sdp.as_text()
        log.info(f"SDP offer created: \n{self.offer}")
        self._set_media_types(self.offer)
        promise = Gst.Promise.new()
        self.webrtc.emit("set-local-description", offer, promise)
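        # the result of "set-local-description" is not needed, so the promise is
        # interrupted to tell GStreamer that no reply is expected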
        promise.interrupt()
        self._a_call(self._start_call)

    def on_answer_set(self, promise):
        assert promise.wait() == Gst.PromiseResult.REPLIED

    def on_answer_created(self, promise, _, __):
        """Callback for when SDP answer is created."""
        assert promise.wait() == Gst.PromiseResult.REPLIED
        reply = promise.get_reply()
        answer = reply["answer"]
        promise = Gst.Promise.new()
        self.webrtc.emit("set-local-description", answer, promise)
        promise.interrupt()
        answer_sdp = answer.sdp.as_text()
        log.info(f"SDP answer set: \n{answer_sdp}")
        self.sdp_set = True
        self._a_call(G.host.a_bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)

    def on_offer_set(self, promise):
        assert promise.wait() == Gst.PromiseResult.REPLIED
        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
        self.webrtc.emit("create-answer", None, promise)

    def on_new_sample(
        self,
        video_sink: Gst.Element,
        update_sample_method: Callable,
        video_widget: VideoStreamWidget,
    ) -> bool:
        """Handles a new sample from the video sink.

        @param video_sink: The video sink to pull samples from.
        @param update_sample_method: The method to call for updating the sample.
        @param video_widget: The video widget (either local_video or remote_video).
        @return: Always False
        """
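        # this callback runs on a GStreamer streaming thread: no Kivy/graphics work
        # is done here, the sample is handed over to the main thread with
        # Clock.schedule_once below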
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        try:
            texture_data = self.texture_map[video_widget]
        except KeyError:
            texture_data = self.texture_map[video_widget] = TextureData()

        # Get the current video size
        video_pad = video_sink.get_static_pad("sink")
        assert video_pad is not None
        s = video_pad.get_current_caps().get_structure(0)
        stream_size = (s.get_value("width"), s.get_value("height"))

        # Compare with the texture size
        texture_size = texture_data.size
        if texture_size != stream_size:
            log.debug(f"sample size update: {texture_size} => {stream_size}")
            texture_data.size = stream_size
            # texture needs to be recreated, so we reset the one in texture_data
            texture_data.texture = None

        Clock.schedule_once(
            partial(
                update_sample_method,
                sample=sample,
                video_widget=video_widget,
            )
        )
        return False

    def update_sample(
        self,
        dt: float,
        sample: Optional[Gst.Sample],
        video_widget: VideoStreamWidget,
    ) -> None:
        """Updates the video sample.

        This method runs in the main thread.

        @param dt: Time delta since the last call. This is passed by Clock.schedule_once.
        @param sample: The video sample to update.
        @param video_widget: The video widget to update (either local_video or
            remote_video).
        """
        if sample is None:
            return
        try:
            texture_data = self.texture_map[video_widget]
        except KeyError:
            log.warning(f"no texture data found for {video_widget}")
            return

        if texture_data.texture is None and texture_data.size is not None:
            log.debug(f"===> creating texture for {video_widget}: {texture_data.size=}")
            texture = Texture.create(size=texture_data.size, colorfmt="rgb")
            assert texture is not None
            texture_data.texture = texture
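            # GStreamer frames are top-down while Kivy textures use a bottom-left
            # origin, so the texture is flipped once at creation time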
            texture.flip_vertical()
            video_widget.texture = texture

        mapinfo = None
        buf = None
        try:
            buf = sample.get_buffer()
            _, mapinfo = buf.map(Gst.MapFlags.READ)

            buffer = mapinfo.data.tobytes()

            if texture_data.texture is None:
                log.debug("can't copy the buffer, texture is None")
                return
            texture_data.texture.blit_buffer(buffer, colorfmt="rgb")
            assert video_widget.canvas is not None
            video_widget.canvas.ask_update()
        finally:
            # the buffer is unmapped here, on all code paths, to avoid a double unmap
            if buf is not None and mapinfo is not None:
                buf.unmap(mapinfo)

    def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
        """Handle the stream from the remote decodebin.

        This method processes the incoming stream from the remote decodebin, determining
        whether it's video or audio. It then sets up the appropriate GStreamer elements
        for video/audio processing and adds them to the pipeline.

        @param pad: The Gst.Pad from the remote decodebin producing the stream.
        """
        assert self.pipeline is not None
        if not pad.has_current_caps():
            log.error(f"{pad} has no caps, ignoring")
            return

        self.pipeline.set_state(Gst.State.PAUSED)
        caps = pad.get_current_caps()
        assert len(caps)
        s = caps[0]
        name = s.get_name()
        log.debug(f"====> NAME START: {name}")

        q = Gst.ElementFactory.make("queue")
        q.set_property("max-size-time", 0)
        q.set_property("max-size-bytes", 0)
        q.set_property("max-size-buffers", 5)

        if name.startswith("video"):
            log.debug("===> VIDEO OK")

            self._remote_video_pad = pad

            # Check and log the original size of the video
            width = s.get_int("width").value
            height = s.get_int("height").value
            log.info(f"Original video size: {width}x{height}")

            # This is a fix for an issue found with Movim on desktop: a non-standard
            # resolution is used (990x557), resulting in bad alignment and no color in
            # the rendered image
            adjust_resolution = width % 4 != 0 or height % 4 != 0
            if adjust_resolution:
                log.info("non-standard resolution, we need to adjust the size")
                width = (width + 3) // 4 * 4
                height = (height + 3) // 4 * 4
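                # dimensions are rounded up to the next multiple of 4, e.g. the
                # 990x557 stream mentioned above becomes 992x560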
                log.info(f"Adjusted video size: {width}x{height}")

            conv = Gst.ElementFactory.make("videoconvert")
            remote_video_sink = Gst.ElementFactory.make("appsink")

            appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
            remote_video_sink.set_property("caps", appsink_caps)

            remote_video_sink.set_property("emit-signals", True)
            remote_video_sink.set_property("drop", True)
            remote_video_sink.set_property("max-buffers", 1)
            remote_video_sink.set_property("sync", True)
            remote_video_sink.connect(
                "new-sample",
                self.on_new_sample,
                self.update_sample,
                self.parent_calls.remote_video,
            )

            if adjust_resolution:
                videoscale = Gst.ElementFactory.make("videoscale")
                adjusted_caps = Gst.Caps.from_string(
                    f"video/x-raw,width={width},height={height}"
                )
                capsfilter = Gst.ElementFactory.make("capsfilter")
                capsfilter.set_property("caps", adjusted_caps)

                self.pipeline.add(q, conv, videoscale, capsfilter, remote_video_sink)
                self.pipeline.sync_children_states()
                pad.link(q.get_static_pad("sink"))
                q.link(conv)
                conv.link(videoscale)
                videoscale.link(capsfilter)
                capsfilter.link(remote_video_sink)
            else:
                self.pipeline.add(q, conv, remote_video_sink)
                self.pipeline.sync_children_states()
                pad.link(q.get_static_pad("sink"))
                q.link(conv)
                conv.link(remote_video_sink)

        elif name.startswith("audio"):
            log.debug("===> Audio OK")
            conv = Gst.ElementFactory.make("audioconvert")
            resample = Gst.ElementFactory.make("audioresample")
            remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
            self.pipeline.add(q, conv, resample, remote_audio_sink)
            self.pipeline.sync_children_states()
            pad.link(q.get_static_pad("sink"))
            q.link(conv)
            conv.link(resample)
            resample.link(remote_audio_sink)
        self.pipeline.set_state(Gst.State.PLAYING)

    def on_pad_added(self, __, pad: Gst.Pad) -> None:
        """Handle the addition of a new pad to the element.

        When a new source pad is added to the element, this method creates a decodebin,
        connects it to handle the stream, and links the pad to the decodebin.

        @param __: Placeholder for the signal source. Not used in this method.
        @param pad: The newly added pad.
        """
        log.debug("on_pad_added")
        if pad.direction != Gst.PadDirection.SRC:
            return

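        # decodebin auto-detects and decodes the incoming stream; its dynamic output
        # pads are then handled by on_remote_decodebin_stream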
        decodebin = Gst.ElementFactory.make("decodebin")
        decodebin.connect("pad-added", self.on_remote_decodebin_stream)
        self.pipeline.add(decodebin)
        decodebin.sync_state_with_parent()
        pad.link(decodebin.get_static_pad("sink"))

    async def _start_call(self) -> None:
        """Initiate the call.

        Initiates a call with the callee using the stored offer. If there are any buffered
        local ICE candidates, they are sent as part of the initiation.
        """
        assert self.callee is not None
        self.sid = await G.host.a_bridge.call_start(
            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
        )
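        # candidates gathered before call_start returned a session id have been
        # buffered by on_ice_candidate; they are flushed now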
        if self.local_candidates_buffer:
            log.debug(
                f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
            )
            if self.pwd is None:
                sdp = self.webrtc.props.local_description.sdp.as_text()
                self.extract_ufrag_pwd(sdp)
            ice_data = {}
            for media_type, candidates in self.local_candidates_buffer.items():
                ice_data[media_type] = {
                    "ufrag": self.ufrag,
                    "pwd": self.pwd,
                    "candidates": candidates,
                }
            await G.host.a_bridge.ice_candidates_add(
                self.sid, data_format.serialise(ice_data), self.profile
            )
            self.local_candidates_buffer.clear()

    def _remote_sdp_set(self, promise) -> None:
        assert promise.wait() == Gst.PromiseResult.REPLIED
        self.sdp_set = True

    def on_accepted_call(self, sdp: str, profile: str) -> None:
        """Outgoing call has been accepted.

        @param sdp: The SDP answer string received from the other party.
        @param profile: Profile used for the call.
        """
        log.debug(f"SDP answer received: \n{sdp}")

        __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
        answer = GstWebRTC.WebRTCSessionDescription.new(
            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
        )
        promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
        self.webrtc.emit("set-remote-description", answer, promise)

    async def answer_call(self, sdp: str, profile: str) -> None:
        """Answer an incoming call

        @param sdp: The SDP offer string received from the initiator.
        @param profile: Profile used for the call.

        @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in
            payload types.
        """
        log.debug(f"SDP offer received: \n{sdp}")
        self._set_media_types(sdp)
        __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
        payload_types = self.get_payload_types(
            offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
        )
        assert "VP8" in payload_types
        assert "OPUS" in payload_types
        await self.setup_call(
            "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
        )
        self.start_pipeline()
        offer = GstWebRTC.WebRTCSessionDescription.new(
            GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
        )
        promise = Gst.Promise.new_with_change_func(self.on_offer_set)
        self.webrtc.emit("set-remote-description", offer, promise)

    def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
        """Handles the on-ice-candidate signal of webrtcbin.

        @param webrtc: The webrtcbin element.
        @param mline_index: The mline index.
        @param candidate_sdp: The ICE candidate.
        """
        log.debug(
            f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
        )
        parsed_candidate = self.parse_ice_candidate(candidate_sdp)
        try:
            media_type = self.media_types[mline_index]
        except KeyError:
            raise exceptions.InternalError("can't find media type")

        if self.sid is None:
            log.debug("buffering local ICE candidate")
            self.local_candidates_buffer.setdefault(media_type, []).append(
                parsed_candidate
            )
        else:
            sdp = self.webrtc.props.local_description.sdp.as_text()
            assert sdp is not None
            ufrag, pwd = self.extract_ufrag_pwd(sdp)
            ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
            self._a_call(
                G.host.a_bridge.ice_candidates_add,
                self.sid,
                data_format.serialise({media_type: ice_data}),
                self.profile,
            )

    def on_ice_candidates_new(self, candidates: dict) -> None:
        """Handle new ICE candidates.

        @param candidates: A dictionary containing media types ("audio" or "video") as
            keys and corresponding ICE data as values.

        @raise exceptions.InternalError: Raised when sdp mline index is not found.
        """
        if not self.sdp_set:
            log.debug("buffering remote ICE candidate")
            for media_type in ("audio", "video"):
                media_candidates = candidates.get(media_type)
                if media_candidates:
                    buffer = self.remote_candidates_buffer[media_type]
                    buffer["candidates"].extend(media_candidates["candidates"])
            return
        for media_type, ice_data in candidates.items():
            for candidate in ice_data["candidates"]:
                candidate_sdp = self.build_ice_candidate(candidate)
                try:
                    mline_index = self.get_sdp_mline_index(media_type)
                except Exception as e:
                    raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
                self.webrtc.emit("add-ice-candidate", mline_index, candidate_sdp)
                log.debug(
                    f"Remote ICE candidate added. MLine Index: {mline_index}, "
                    f"{candidate_sdp}"
                )

    def on_ice_gathering_state_change(self, pspec, __):
        state = self.webrtc.get_property("ice-gathering-state")
        log.debug(f"ICE gathering state changed to {state}")

    def on_ice_connection_state(self, pspec, __):
        state = self.webrtc.props.ice_connection_state
        if state == GstWebRTC.WebRTCICEConnectionState.FAILED:
            log.error("ICE connection failed")
        log.info(f"ICE connection state changed to {state}")

    def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
        """Handles the GStreamer bus error messages.

        @param bus: The GStreamer bus.
        @param message: The error message.
        """
        err, debug = message.parse_error()
        log.error(f"Error from {message.src.get_name()}: {err.message}")
        log.error(f"Debugging info: {debug}")

    def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
        """Handles the GStreamer bus eos messages.

        @param bus: The GStreamer bus.
        @param message: The eos message.
        """
        log.info("End of stream")

    async def end_call(self) -> None:
        """Stop streaming and clean instance"""
        self.reset_instance()


class Calls(quick_widgets.QuickWidget, cagou_widget.LiberviaDesktopKivyWidget):
    remote_video = ObjectProperty()
    local_video = ObjectProperty()
    use_header_input = True
    signals_registered = False
    in_call = BooleanProperty(False)

    def __init__(self, host, target, profiles):
        quick_widgets.QuickWidget.__init__(self, G.host, target, profiles)
        cagou_widget.LiberviaDesktopKivyWidget.__init__(self)
        call_btn = CallButton(
            parent_widget=self, on_press=lambda *__: aio.run_async(self.toggle_call())
        )
        self.header_input_add_extra(call_btn)
        self.webrtc = WebRTC(self, self.profile)

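        # signals are registered on the class, not on instances, so that they are
        # only connected once even if several Calls widgets are created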
        if not self.__class__.signals_registered:
            log.debug("registering signals")
            G.host.register_signal(
                "ice_candidates_new",
                handler=self.__class__.ice_candidates_new_handler,
                iface="plugin",
            )
            G.host.register_signal(
                "call_setup", handler=self.__class__.call_setup_handler, iface="plugin"
            )
            G.host.register_signal(
                "call_ended", handler=self.__class__.call_ended_handler, iface="plugin"
            )
            G.host.register_action_handler(
                C.META_TYPE_CALL, self.__class__.action_new_handler
            )
            self.__class__.signals_registered = True

        self.reset_instance()

    @property
    def sid(self):
        return self.webrtc.sid

    async def toggle_call(self):
        """Toggle between making a call and hanging up.

        This function will initiate or terminate a call based on the call state.
        """
        if self.sid is None:
            # Initiate the call
            log.info("initiating call")
            callee = jid.JID(self.header_input.text.strip())
            self.webrtc.callee = callee
            await self.webrtc.setup_call("initiator")
            self.webrtc.start_pipeline()
            self.in_call = True
        else:
            # Terminate the call
            sid = self.sid
            await self.end_call({"reason": "terminated"}, self.profile)
            await G.host.a_bridge.call_end(sid, "", self.profile)
            self.in_call = False

    async def on_incoming_call(
        self, action_data: dict, action_id: str, profile: str
    ) -> None:
        """Handle incoming call notifications.

        @param action_data: A dictionary containing data related to the incoming call.
        @param action_id: The ID corresponding to the incoming call action.
        @param profile: The profile associated with the incoming call.
        """
        if self.sid is not None:
            # FIXME: show a double remote call notification
            log.warning("Ignoring new remote call as we are already in a call.")
            return
        sid = action_data["session_id"]
        self.in_call = True
        self.webrtc.sid = sid
        # FIXME: we accept all remote calls for now; this will change once a proper
        #   UI/UX is implemented
        log.warning("auto-accepting remote call")
        await G.host.a_bridge.action_launch(
            action_id, data_format.serialise({"cancelled": False}), profile
        )

    async def on_call_setup(self, setup_data: dict, profile: str) -> None:
        """Call has been accepted, the connection can be established.

        @param setup_data: Data with the following keys:
            role: "initiator" or "responder"
            sdp: Session Description Protocol data
        @param profile: Profile associated with the call.
        """
        try:
            role = setup_data["role"]
            sdp = setup_data["sdp"]
        except KeyError:
            log.error(f"Invalid setup data received: {setup_data}")
            return
        if role == "initiator":
            self.webrtc.on_accepted_call(sdp, profile)
        elif role == "responder":
            await self.webrtc.answer_call(sdp, profile)
        else:
            log.error(f"Invalid role received during setup: {setup_data}")

    def reset_instance(self) -> None:
        self.in_call = False
        self.local_video.texture = None
        self.remote_video.texture = None

    async def end_call(self, data: dict, profile: str) -> None:
        """End the current call, if any, and reset the instance.

        @param data: End call data; it often includes the reason for ending the call.
        """
        await self.webrtc.end_call()
        self.reset_instance()

    @classmethod
    def ice_candidates_new_handler(
        cls, sid: str, candidates_s: str, profile: str
    ) -> None:
        for wid in G.host.get_visible_list(cls):
            if profile not in wid.profiles or sid != wid.sid:
                continue
            wid.webrtc.on_ice_candidates_new(
                data_format.deserialise(candidates_s),
            )

    @classmethod
    def call_setup_handler(cls, sid: str, setup_data_s: str, profile: str) -> None:
        for wid in G.host.get_visible_list(cls):
            if profile not in wid.profiles or sid != wid.sid:
                continue
            aio.run_async(
                wid.on_call_setup(data_format.deserialise(setup_data_s), profile)
            )

    @classmethod
    def call_ended_handler(cls, sid: str, data_s: str, profile: str) -> None:
        for wid in G.host.get_visible_list(cls):
            if profile not in wid.profiles or sid != wid.sid:
                continue
            aio.run_async(wid.end_call(data_format.deserialise(data_s), profile))

    @classmethod
    def action_new_handler(
        cls, action_data: dict, action_id: str, security_limit: int, profile: str
    ) -> None:
        for wid in G.host.get_visible_list(cls):
            if profile not in wid.profiles:
                continue
            aio.run_async(wid.on_incoming_call(action_data, action_id, profile))