Mercurial > libervia-web
comparison libervia/web/pages/calls/_browser/webrtc.py @ 1549:e47c24204449
browser (calls): update call to handle search, control buttons, and better UI/UX:
- adapt to backend changes
- UI and WebRTC parts are not separated
- users can now be searched
- add mute/fullscreen buttons
- ring
- cancellable dialog when a call is received
- status of the call
- animations
- various UI/UX improvments
rel 423
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 09 Aug 2023 00:22:18 +0200 |
parents | libervia/web/pages/calls/_browser/__init__.py@eb00d593801d |
children | 83c2a6faa2ae |
comparison
equal
deleted
inserted
replaced
1548:66aa6e140ebb | 1549:e47c24204449 |
---|---|
1 import json | |
2 import re | |
3 | |
4 from bridge import AsyncBridge as Bridge | |
5 from browser import aio, console as log, document, timer, window | |
6 import errors | |
7 import jid | |
8 | |
9 log.warning = log.warn | |
10 profile = window.profile or "" | |
11 bridge = Bridge() | |
12 GATHER_TIMEOUT = 10000 | |
13 | |
14 | |
15 class WebRTC: | |
16 | |
17 def __init__(self): | |
18 self.reset_instance() | |
19 bridge.register_signal("ice_candidates_new", self._on_ice_candidates_new) | |
20 self.is_audio_muted = None | |
21 self.is_video_muted = None | |
22 self.local_video_elt = document["local_video"] | |
23 self.remote_video_elt = document["remote_video"] | |
24 | |
25 def reset_instance(self): | |
26 """Inits or resets the instance variables to their default state.""" | |
27 self._peer_connection = None | |
28 self._media_types = None | |
29 self._media_types_inv = None | |
30 self._callee = None | |
31 self.sid = None | |
32 self.local_candidates = None | |
33 self.remote_stream = None | |
34 self.candidates_buffer = { | |
35 "audio": {"candidates": []}, | |
36 "video": {"candidates": []}, | |
37 } | |
38 self.media_candidates = {} | |
39 self.candidates_gathered = aio.Future() | |
40 | |
41 @property | |
42 def media_types(self): | |
43 if self._media_types is None: | |
44 raise Exception("self._media_types should not be None!") | |
45 return self._media_types | |
46 | |
47 @media_types.setter | |
48 def media_types(self, new_media_types: dict) -> None: | |
49 self._media_types = new_media_types | |
50 self._media_types_inv = {v:k for k,v in new_media_types.items()} | |
51 | |
52 @property | |
53 def media_types_inv(self) -> dict: | |
54 if self._media_types_inv is None: | |
55 raise Exception("self._media_types_inv should not be None!") | |
56 return self._media_types_inv | |
57 | |
58 def get_sdp_mline_index(self, media_type): | |
59 """Gets the sdpMLineIndex for a given media type. | |
60 | |
61 @param media_type: The type of the media. | |
62 """ | |
63 for index, m_type in self.media_types.items(): | |
64 if m_type == media_type: | |
65 return index | |
66 raise ValueError(f"Media type '{media_type}' not found") | |
67 | |
68 def extract_pwd_ufrag(self, sdp): | |
69 """Retrieves ICE password and user fragment for SDP offer. | |
70 | |
71 @param sdp: The Session Description Protocol offer string. | |
72 """ | |
73 ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) | |
74 pwd_line = re.search(r"ice-pwd:(\S+)", sdp) | |
75 | |
76 if ufrag_line and pwd_line: | |
77 return ufrag_line.group(1), pwd_line.group(1) | |
78 else: | |
79 log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") | |
80 raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") | |
81 | |
82 def extract_fingerprint_data(self, sdp): | |
83 """Retrieves fingerprint data from an SDP offer. | |
84 | |
85 @param sdp: The Session Description Protocol offer string. | |
86 @return: A dictionary containing the fingerprint data. | |
87 """ | |
88 fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp) | |
89 if fingerprint_line: | |
90 algorithm, fingerprint = fingerprint_line.groups() | |
91 fingerprint_data = { | |
92 "hash": algorithm, | |
93 "fingerprint": fingerprint | |
94 } | |
95 | |
96 setup_line = re.search(r"a=setup:(\S+)", sdp) | |
97 if setup_line: | |
98 setup = setup_line.group(1) | |
99 fingerprint_data["setup"] = setup | |
100 | |
101 return fingerprint_data | |
102 else: | |
103 raise ValueError("fingerprint should not be missing") | |
104 | |
105 def parse_ice_candidate(self, candidate_string): | |
106 """Parses the ice candidate string. | |
107 | |
108 @param candidate_string: The ice candidate string to be parsed. | |
109 """ | |
110 pattern = re.compile( | |
111 r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) " | |
112 r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ " | |
113 r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport " | |
114 r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?" | |
115 ) | |
116 match = pattern.match(candidate_string) | |
117 if match: | |
118 candidate_dict = match.groupdict() | |
119 | |
120 # Apply the correct types to the dictionary values | |
121 candidate_dict["component_id"] = int(candidate_dict["component_id"]) | |
122 candidate_dict["priority"] = int(candidate_dict["priority"]) | |
123 candidate_dict["port"] = int(candidate_dict["port"]) | |
124 | |
125 if candidate_dict["rel_port"]: | |
126 candidate_dict["rel_port"] = int(candidate_dict["rel_port"]) | |
127 | |
128 if candidate_dict["generation"]: | |
129 candidate_dict["generation"] = candidate_dict["generation"] | |
130 | |
131 # Remove None values | |
132 return {k: v for k, v in candidate_dict.items() if v is not None} | |
133 else: | |
134 log.warning(f"can't parse candidate: {candidate_string!r}") | |
135 return None | |
136 | |
137 def build_ice_candidate(self, parsed_candidate): | |
138 """Builds ICE candidate | |
139 | |
140 @param parsed_candidate: Dictionary containing parsed ICE candidate | |
141 """ | |
142 base_format = ( | |
143 "candidate:{foundation} {component_id} {transport} {priority} " | |
144 "{address} {port} typ {type}" | |
145 ) | |
146 | |
147 if ((parsed_candidate.get('rel_addr') | |
148 and parsed_candidate.get('rel_port'))): | |
149 base_format += " raddr {rel_addr} rport {rel_port}" | |
150 | |
151 if parsed_candidate.get('generation'): | |
152 base_format += " generation {generation}" | |
153 | |
154 return base_format.format(**parsed_candidate) | |
155 | |
156 def on_ice_candidate(self, event): | |
157 """Handles ICE candidate event | |
158 | |
159 @param event: Event containing the ICE candidate | |
160 """ | |
161 log.debug(f"on ice candidate {event.candidate=}") | |
162 if event.candidate and event.candidate.candidate: | |
163 window.last_event = event | |
164 parsed_candidate = self.parse_ice_candidate(event.candidate.candidate) | |
165 if parsed_candidate is None: | |
166 return | |
167 try: | |
168 media_type = self.media_types[event.candidate.sdpMLineIndex] | |
169 except (TypeError, IndexError): | |
170 log.error( | |
171 f"Can't find media type.\n{event.candidate=}\n{self._media_types=}" | |
172 ) | |
173 return | |
174 self.media_candidates.setdefault(media_type, []).append(parsed_candidate) | |
175 log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}") | |
176 else: | |
177 log.debug("All ICE candidates gathered") | |
178 | |
179 def _set_media_types(self, offer): | |
180 """Sets media types from offer SDP | |
181 | |
182 @param offer: RTC session description containing the offer | |
183 """ | |
184 sdp_lines = offer.sdp.splitlines() | |
185 media_types = {} | |
186 mline_index = 0 | |
187 | |
188 for line in sdp_lines: | |
189 if line.startswith("m="): | |
190 media_types[mline_index] = line[2:line.find(" ")] | |
191 mline_index += 1 | |
192 | |
193 self.media_types = media_types | |
194 | |
195 def on_ice_gathering_state_change(self, event): | |
196 """Handles ICE gathering state change | |
197 | |
198 @param event: Event containing the ICE gathering state change | |
199 """ | |
200 connection = event.target | |
201 log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}") | |
202 if connection.iceGatheringState == "complete": | |
203 log.info("ICE candidates gathering done") | |
204 self.candidates_gathered.set_result(None) | |
205 | |
206 async def _create_peer_connection( | |
207 self, | |
208 ): | |
209 """Creates peer connection""" | |
210 if self._peer_connection is not None: | |
211 raise Exception("create_peer_connection can't be called twice!") | |
212 | |
213 external_disco = json.loads(await bridge.external_disco_get("")) | |
214 ice_servers = [] | |
215 | |
216 for server in external_disco: | |
217 ice_server = {} | |
218 if server["type"] == "stun": | |
219 ice_server["urls"] = f"stun:{server['host']}:{server['port']}" | |
220 elif server["type"] == "turn": | |
221 ice_server["urls"] = ( | |
222 f"turn:{server['host']}:{server['port']}?transport={server['transport']}" | |
223 ) | |
224 ice_server["username"] = server["username"] | |
225 ice_server["credential"] = server["password"] | |
226 ice_servers.append(ice_server) | |
227 | |
228 rtc_configuration = {"iceServers": ice_servers} | |
229 | |
230 peer_connection = window.RTCPeerConnection.new(rtc_configuration) | |
231 peer_connection.addEventListener("track", self.on_track) | |
232 peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed) | |
233 peer_connection.addEventListener("icecandidate", self.on_ice_candidate) | |
234 peer_connection.addEventListener("icegatheringstatechange", self.on_ice_gathering_state_change) | |
235 | |
236 self._peer_connection = peer_connection | |
237 window.pc = self._peer_connection | |
238 | |
239 async def _get_user_media( | |
240 self, | |
241 audio: bool = True, | |
242 video: bool = True | |
243 ): | |
244 """Gets user media | |
245 | |
246 @param audio: True if an audio flux is required | |
247 @param video: True if a video flux is required | |
248 """ | |
249 media_constraints = {'audio': audio, 'video': video} | |
250 local_stream = await window.navigator.mediaDevices.getUserMedia(media_constraints) | |
251 self.local_video_elt.srcObject = local_stream | |
252 | |
253 for track in local_stream.getTracks(): | |
254 self._peer_connection.addTrack(track) | |
255 | |
256 async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None): | |
257 """Get ICE candidates and wait to have them all before returning them | |
258 | |
259 @param is_initiator: Boolean indicating if the user is the initiator of the connection | |
260 @param remote_candidates: Remote ICE candidates, if any | |
261 """ | |
262 if self._peer_connection is None: | |
263 raise Exception("The peer connection must be created before gathering ICE candidates!") | |
264 | |
265 self.media_candidates.clear() | |
266 gather_timeout = timer.set_timeout( | |
267 lambda: self.candidates_gathered.set_exception( | |
268 errors.TimeoutError("ICE gathering time out") | |
269 ), | |
270 GATHER_TIMEOUT | |
271 ) | |
272 | |
273 if is_initiator: | |
274 offer = await self._peer_connection.createOffer() | |
275 self._set_media_types(offer) | |
276 await self._peer_connection.setLocalDescription(offer) | |
277 else: | |
278 answer = await self._peer_connection.createAnswer() | |
279 self._set_media_types(answer) | |
280 await self._peer_connection.setLocalDescription(answer) | |
281 | |
282 if not is_initiator: | |
283 log.debug(self._peer_connection.localDescription.sdp) | |
284 await self.candidates_gathered | |
285 log.debug(self._peer_connection.localDescription.sdp) | |
286 timer.clear_timeout(gather_timeout) | |
287 ufrag, pwd = self.extract_pwd_ufrag(self._peer_connection.localDescription.sdp) | |
288 return { | |
289 "ufrag": ufrag, | |
290 "pwd": pwd, | |
291 "candidates": self.media_candidates, | |
292 } | |
293 | |
294 async def accept_call(self, session_id: str, sdp: str, profile: str) -> None: | |
295 """Call has been accepted, connection can be established | |
296 | |
297 @param session_id: Session identifier | |
298 @param sdp: Session Description Protocol data | |
299 @param profile: Profile associated | |
300 """ | |
301 await self._peer_connection.setRemoteDescription({ | |
302 "type": "answer", | |
303 "sdp": sdp | |
304 }) | |
305 await self.on_ice_candidates_new(self.candidates_buffer) | |
306 self.candidates_buffer.clear() | |
307 | |
308 def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None: | |
309 """Called when new ICE candidates are received | |
310 | |
311 @param sid: Session identifier | |
312 @param candidates_s: ICE candidates serialized | |
313 @param profile: Profile associated with the action | |
314 """ | |
315 if sid != self.sid: | |
316 log.debug( | |
317 f"ignoring peer ice candidates for {sid=} ({self.sid=})." | |
318 ) | |
319 return | |
320 candidates = json.loads(candidates_s) | |
321 aio.run(self.on_ice_candidates_new(candidates)) | |
322 | |
323 async def on_ice_candidates_new(self, candidates: dict) -> None: | |
324 """Called when new ICE canidates are received from peer | |
325 | |
326 @param candidates: Dictionary containing new ICE candidates | |
327 """ | |
328 log.debug(f"new peer candidates received: {candidates}") | |
329 if ( | |
330 self._peer_connection is None | |
331 or self._peer_connection.remoteDescription is None | |
332 ): | |
333 for media_type in ("audio", "video"): | |
334 media_candidates = candidates.get(media_type) | |
335 if media_candidates: | |
336 buffer = self.candidates_buffer[media_type] | |
337 buffer["candidates"].extend(media_candidates["candidates"]) | |
338 return | |
339 for media_type, ice_data in candidates.items(): | |
340 for candidate in ice_data["candidates"]: | |
341 candidate_sdp = self.build_ice_candidate(candidate) | |
342 try: | |
343 sdp_mline_index = self.get_sdp_mline_index(media_type) | |
344 except Exception as e: | |
345 log.warning(e) | |
346 continue | |
347 ice_candidate = window.RTCIceCandidate.new({ | |
348 "candidate": candidate_sdp, | |
349 "sdpMLineIndex": sdp_mline_index | |
350 } | |
351 ) | |
352 await self._peer_connection.addIceCandidate(ice_candidate) | |
353 | |
354 def on_track(self, event): | |
355 """New track has been received from peer | |
356 | |
357 @param event: Event associated with the new track | |
358 """ | |
359 if event.streams and event.streams[0]: | |
360 remote_stream = event.streams[0] | |
361 self.remote_video_elt.srcObject = remote_stream | |
362 else: | |
363 if self.remote_stream is None: | |
364 self.remote_stream = window.MediaStream.new() | |
365 self.remote_video_elt.srcObject = self.remote_stream | |
366 self.remote_stream.addTrack(event.track) | |
367 | |
368 def on_negotiation_needed(self, event) -> None: | |
369 log.debug(f"on_negotiation_needed {event=}") | |
370 # TODO | |
371 | |
372 async def answer_call(self, sid: str, offer_sdp: str, profile: str): | |
373 """We respond to the call""" | |
374 log.debug("answering call") | |
375 if sid != self.sid: | |
376 raise Exception( | |
377 f"Internal Error: unexpected sid: {sid=} {self.sid=}" | |
378 ) | |
379 await self._create_peer_connection() | |
380 | |
381 await self._peer_connection.setRemoteDescription({ | |
382 "type": "offer", | |
383 "sdp": offer_sdp | |
384 }) | |
385 await self.on_ice_candidates_new(self.candidates_buffer) | |
386 self.candidates_buffer.clear() | |
387 await self._get_user_media() | |
388 | |
389 # Gather local ICE candidates | |
390 local_ice_data = await self._gather_ice_candidates(False) | |
391 self.local_candidates = local_ice_data["candidates"] | |
392 | |
393 await bridge.call_answer_sdp(sid, self._peer_connection.localDescription.sdp) | |
394 | |
395 async def make_call( | |
396 self, | |
397 callee_jid: jid.JID, | |
398 audio: bool = True, | |
399 video: bool = True | |
400 ) -> None: | |
401 """Start a WebRTC call | |
402 | |
403 @param audio: True if an audio flux is required | |
404 @param video: True if a video flux is required | |
405 """ | |
406 await self._create_peer_connection() | |
407 await self._get_user_media(audio, video) | |
408 await self._gather_ice_candidates(True) | |
409 | |
410 call_data = { | |
411 "sdp": self._peer_connection.localDescription.sdp | |
412 } | |
413 log.info(f"calling {callee_jid!r}") | |
414 self.sid = await bridge.call_start( | |
415 str(callee_jid), | |
416 json.dumps(call_data) | |
417 ) | |
418 log.debug(f"Call SID: {self.sid}") | |
419 | |
420 async def end_call(self) -> None: | |
421 """Stop streaming and clean instance""" | |
422 if self._peer_connection is None: | |
423 log.debug("There is currently no call to end.") | |
424 else: | |
425 self._peer_connection.removeEventListener("track", self.on_track) | |
426 self._peer_connection.removeEventListener("negotiationneeded", self.on_negotiation_needed) | |
427 self._peer_connection.removeEventListener("icecandidate", self.on_ice_candidate) | |
428 self._peer_connection.removeEventListener("icegatheringstatechange", self.on_ice_gathering_state_change) | |
429 | |
430 # Base64 encoded 1x1 black pixel image | |
431 # this is a trick to reset the image displayed, so we don't see last image of | |
432 # last stream | |
433 black_image_data = ( | |
434 "" | |
435 "lEQVR42mP8/wcAAwAB/uzNq7sAAAAASUVORK5CYII=" | |
436 ) | |
437 | |
438 local_video = self.local_video_elt | |
439 remote_video = self.remote_video_elt | |
440 if local_video.srcObject: | |
441 for track in local_video.srcObject.getTracks(): | |
442 track.stop() | |
443 local_video.src = black_image_data | |
444 | |
445 if remote_video.srcObject: | |
446 for track in remote_video.srcObject.getTracks(): | |
447 track.stop() | |
448 remote_video.src = black_image_data | |
449 | |
450 self._peer_connection.close() | |
451 self.reset_instance() | |
452 | |
453 def toggle_media_mute(self, media_type: str) -> bool: | |
454 """Toggle mute/unmute for media tracks. | |
455 | |
456 @param media_type: 'audio' or 'video'. Determines which media tracks | |
457 to process. | |
458 """ | |
459 assert media_type in ("audio", "video"), "Invalid media type" | |
460 | |
461 local_video = self.local_video_elt | |
462 is_muted_attr = f"is_{media_type}_muted" | |
463 | |
464 if local_video.srcObject: | |
465 track_getter = getattr(local_video.srcObject, f"get{media_type.capitalize()}Tracks") | |
466 for track in track_getter(): | |
467 track.enabled = not track.enabled | |
468 setattr(self, is_muted_attr, not track.enabled) | |
469 | |
470 media_name = self.media_types_inv.get(media_type) | |
471 if media_name is not None: | |
472 extra = {"name": str(media_name)} | |
473 aio.run( | |
474 bridge.call_info( | |
475 self.sid, | |
476 "mute" if getattr(self, is_muted_attr) else "unmute", | |
477 json.dumps(extra), | |
478 ) | |
479 ) | |
480 | |
481 return getattr(self, is_muted_attr) | |
482 | |
483 def toggle_audio_mute(self) -> bool: | |
484 """Toggle mute/unmute for audio tracks.""" | |
485 return self.toggle_media_mute("audio") | |
486 | |
487 def toggle_video_mute(self) -> bool: | |
488 """Toggle mute/unmute for video tracks.""" | |
489 return self.toggle_media_mute("video") |