comparison libervia/web/pages/calls/_browser/webrtc.py @ 1549:e47c24204449

browser (calls): update call to handle search, control buttons, and better UI/UX: - adapt to backend changes - UI and WebRTC parts are not separated - users can now be searched - add mute/fullscreen buttons - ring - cancellable dialog when a call is received - status of the call - animations - various UI/UX improvements rel 423
author Goffi <goffi@goffi.org>
date Wed, 09 Aug 2023 00:22:18 +0200
parents libervia/web/pages/calls/_browser/__init__.py@eb00d593801d
children 83c2a6faa2ae
comparison
equal deleted inserted replaced
1548:66aa6e140ebb 1549:e47c24204449
1 import json
2 import re
3
4 from bridge import AsyncBridge as Bridge
5 from browser import aio, console as log, document, timer, window
6 import errors
7 import jid
8
# Expose the console's ``warn`` under the ``warning`` name, so that logging-style
# ``log.warning(...)`` calls used in this module work.
log.warning = log.warn
# Delay handed to ``timer.set_timeout`` while waiting for ICE gathering
# (presumably milliseconds — confirm against the Brython timer documentation).
GATHER_TIMEOUT = 10000
bridge = Bridge()
# Fall back to an empty string when the page provides no usable profile.
profile = window.profile or ""
13
14
15 class WebRTC:
16
def __init__(self):
    """Prepare the handler: per-call state, bridge signal and video elements.

    Per-call attributes are initialised by ``reset_instance``; the mute flags and
    the two video elements are only set here.
    """
    self.reset_instance()
    # peer candidates are delivered through this bridge signal
    bridge.register_signal("ice_candidates_new", self._on_ice_candidates_new)
    # None until a mute toggle has actually been applied to a track
    self.is_audio_muted = self.is_video_muted = None
    self.local_video_elt = document["local_video"]
    self.remote_video_elt = document["remote_video"]
24
def reset_instance(self):
    """Inits or resets the per-call instance variables to their default state.

    Mute flags and video elements are not touched by this method.
    """
    cleared_attributes = (
        "_peer_connection",
        "_media_types",
        "_media_types_inv",
        "_callee",
        "sid",
        "local_candidates",
        "remote_stream",
    )
    for attribute in cleared_attributes:
        setattr(self, attribute, None)

    # candidates received from peer before they can be added to the connection
    self.candidates_buffer = {
        media: {"candidates": []} for media in ("audio", "video")
    }
    # local candidates, grouped by media type
    self.media_candidates = {}
    # resolved by ``on_ice_gathering_state_change`` once gathering is complete
    self.candidates_gathered = aio.Future()
40
@property
def media_types(self):
    """Mapping of SDP m-line index to media type.

    @return: the mapping previously stored through the setter
    @raise Exception: no mapping has been stored yet
    """
    current = self._media_types
    if current is not None:
        return current
    raise Exception("self._media_types should not be None!")

@media_types.setter
def media_types(self, new_media_types: dict) -> None:
    """Store the mapping, and its inverse (media type to m-line index).

    @param new_media_types: mapping of m-line index to media type
    """
    self._media_types = new_media_types
    self._media_types_inv = dict(
        (media_type, index) for index, media_type in new_media_types.items()
    )
51
@property
def media_types_inv(self) -> dict:
    """Inverse of ``media_types``: mapping of media type to m-line index.

    @return: the inverse mapping computed by the ``media_types`` setter
    @raise Exception: no mapping has been stored yet
    """
    inverse = self._media_types_inv
    if inverse is not None:
        return inverse
    raise Exception("self._media_types_inv should not be None!")
57
def get_sdp_mline_index(self, media_type):
    """Gets the sdpMLineIndex for a given media type.

    @param media_type: The type of the media.
    @return: index of the first m-line using this media type
    @raise ValueError: no m-line uses this media type
    """
    not_found = object()
    matching_indexes = (
        index for index, m_type in self.media_types.items() if m_type == media_type
    )
    first_index = next(matching_indexes, not_found)
    if first_index is not_found:
        raise ValueError(f"Media type '{media_type}' not found")
    return first_index
67
def extract_pwd_ufrag(self, sdp):
    """Retrieves ICE password and user fragment for SDP offer.

    @param sdp: The Session Description Protocol offer string.
    @return: tuple of (user fragment, password), in that order
    @raise ValueError: ``ice-ufrag`` or ``ice-pwd`` is missing from the SDP
    """
    ufrag_match = re.search(r"ice-ufrag:(\S+)", sdp)
    pwd_match = re.search(r"ice-pwd:(\S+)", sdp)

    if not (ufrag_match and pwd_match):
        log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
        raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    return ufrag_match.group(1), pwd_match.group(1)
81
def extract_fingerprint_data(self, sdp):
    """Retrieves fingerprint data from an SDP offer.

    @param sdp: The Session Description Protocol offer string.
    @return: A dictionary containing the fingerprint data: "hash" and
        "fingerprint", plus "setup" when an ``a=setup`` line is present.
    @raise ValueError: no ``a=fingerprint`` line is found
    """
    fingerprint_match = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
    if fingerprint_match is None:
        raise ValueError("fingerprint should not be missing")

    hash_name, digest = fingerprint_match.groups()
    data = {"hash": hash_name, "fingerprint": digest}

    # the setup attribute is optional
    setup_match = re.search(r"a=setup:(\S+)", sdp)
    if setup_match:
        data["setup"] = setup_match.group(1)

    return data
104
def parse_ice_candidate(self, candidate_string):
    """Parses the ice candidate string.

    @param candidate_string: The ice candidate string to be parsed.
    @return: dictionary of the parsed fields, or None if the string can't be
        parsed. "component_id", "priority", "port", "rel_port" and "generation"
        are integers, other values are strings. Optional fields which are not
        in the candidate string are absent from the dictionary.
    """
    pattern = re.compile(
        r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
        r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
        r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
        r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
    )
    match = pattern.match(candidate_string)
    if match is None:
        log.warning(f"can't parse candidate: {candidate_string!r}")
        return None

    candidate_dict = match.groupdict()

    # Apply the correct types to the dictionary values. "rel_port" and
    # "generation" belong to optional groups, they are None when unmatched.
    for numeric_key in ("component_id", "priority", "port", "rel_port", "generation"):
        raw_value = candidate_dict[numeric_key]
        if raw_value is not None:
            candidate_dict[numeric_key] = int(raw_value)

    # Remove None values
    return {k: v for k, v in candidate_dict.items() if v is not None}
136
def build_ice_candidate(self, parsed_candidate):
    """Builds ICE candidate

    @param parsed_candidate: Dictionary containing parsed ICE candidate
    @return: the candidate string. The related address/port part is only added
        when both "rel_addr" and "rel_port" are set, and the generation part only
        when "generation" is set.
    @raise KeyError: a mandatory field is missing from parsed_candidate
    """
    def is_set(key):
        # A value of 0 is legitimate (e.g. "rport 0" or "generation 0"), so
        # presence is tested explicitly instead of relying on truthiness.
        value = parsed_candidate.get(key)
        return value is not None and value != ""

    parts = [
        "candidate:{foundation} {component_id} {transport} {priority} "
        "{address} {port} typ {type}"
    ]

    if is_set("rel_addr") and is_set("rel_port"):
        parts.append(" raddr {rel_addr} rport {rel_port}")

    if is_set("generation"):
        parts.append(" generation {generation}")

    return "".join(parts).format(**parsed_candidate)
155
def on_ice_candidate(self, event):
    """Handles ICE candidate event

    The candidate is parsed and stored in ``self.media_candidates`` under its
    media type. A candidate whose media type can't be found is logged and
    ignored.

    @param event: Event containing the ICE candidate
    """
    log.debug(f"on ice candidate {event.candidate=}")
    if not (event.candidate and event.candidate.candidate):
        log.debug("All ICE candidates gathered")
        return

    window.last_event = event
    parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
    if parsed_candidate is None:
        return
    try:
        # The private attribute is used on purpose: the ``media_types`` property
        # raises a bare Exception when nothing is set. Here a None mapping raises
        # TypeError, and an unknown m-line index raises KeyError (it's a dict).
        media_type = self._media_types[event.candidate.sdpMLineIndex]
    except (KeyError, TypeError, IndexError):
        log.error(
            f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
        )
        return
    self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
    log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")
178
179 def _set_media_types(self, offer):
180 """Sets media types from offer SDP
181
182 @param offer: RTC session description containing the offer
183 """
184 sdp_lines = offer.sdp.splitlines()
185 media_types = {}
186 mline_index = 0
187
188 for line in sdp_lines:
189 if line.startswith("m="):
190 media_types[mline_index] = line[2:line.find(" ")]
191 mline_index += 1
192
193 self.media_types = media_types
194
def on_ice_gathering_state_change(self, event):
    """Handles ICE gathering state change

    When the state of the connection is "complete", the ``candidates_gathered``
    future is resolved.

    @param event: Event containing the ICE gathering state change
    """
    connection = event.target
    log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
    if connection.iceGatheringState != "complete":
        return
    log.info("ICE candidates gathering done")
    self.candidates_gathered.set_result(None)
205
async def _create_peer_connection(
    self,
):
    """Creates peer connection

    ICE servers are retrieved with ``bridge.external_disco_get``. Only "stun" and
    "turn" services are used; any other service type is ignored.
    The new connection is stored in ``self._peer_connection`` (and exposed as
    ``window.pc``).

    @raise Exception: a peer connection already exists
    """
    if self._peer_connection is not None:
        raise Exception("create_peer_connection can't be called twice!")

    external_disco = json.loads(await bridge.external_disco_get(""))
    ice_servers = []

    for server in external_disco:
        server_type = server["type"]
        if server_type == "stun":
            ice_server = {"urls": f"stun:{server['host']}:{server['port']}"}
        elif server_type == "turn":
            ice_server = {
                "urls": (
                    f"turn:{server['host']}:{server['port']}"
                    f"?transport={server['transport']}"
                ),
                "username": server["username"],
                "credential": server["password"],
            }
        else:
            # An entry without "urls" makes the RTCPeerConnection constructor
            # fail, so unhandled service types must not be added.
            log.debug(f"ignoring unsupported external service type: {server_type!r}")
            continue
        ice_servers.append(ice_server)

    rtc_configuration = {"iceServers": ice_servers}

    peer_connection = window.RTCPeerConnection.new(rtc_configuration)
    peer_connection.addEventListener("track", self.on_track)
    peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed)
    peer_connection.addEventListener("icecandidate", self.on_ice_candidate)
    peer_connection.addEventListener(
        "icegatheringstatechange", self.on_ice_gathering_state_change
    )

    self._peer_connection = peer_connection
    window.pc = self._peer_connection
238
async def _get_user_media(
    self,
    audio: bool = True,
    video: bool = True
):
    """Gets user media

    The obtained stream is displayed in the local video element, and each of its
    tracks is added to the peer connection.

    @param audio: True if an audio flux is required
    @param video: True if a video flux is required
    """
    local_stream = await window.navigator.mediaDevices.getUserMedia(
        {'audio': audio, 'video': video}
    )
    self.local_video_elt.srcObject = local_stream

    local_tracks = local_stream.getTracks()
    for local_track in local_tracks:
        self._peer_connection.addTrack(local_track)
255
async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
    """Get ICE candidates and wait to have them all before returning them

    The local description (offer if we are initiator, answer otherwise) is
    created and set, then the method waits for the end of the gathering.

    @param is_initiator: Boolean indicating if the user is the initiator of the connection
    @param remote_candidates: Remote ICE candidates, if any (currently unused)
    @return: dictionary with "ufrag", "pwd" and "candidates" keys; "candidates" is
        ``self.media_candidates``
    @raise Exception: the peer connection has not been created
    @raise errors.TimeoutError: gathering is not complete after GATHER_TIMEOUT
    """
    peer_connection = self._peer_connection
    if peer_connection is None:
        raise Exception("The peer connection must be created before gathering ICE candidates!")

    self.media_candidates.clear()
    # The future is bound now, so the timeout can only ever fail this gathering,
    # even if the instance is reset (and the attribute replaced) in the meantime.
    gathered = self.candidates_gathered
    gather_timeout = timer.set_timeout(
        lambda: gathered.set_exception(
            errors.TimeoutError("ICE gathering time out")
        ),
        GATHER_TIMEOUT
    )

    try:
        if is_initiator:
            description = await peer_connection.createOffer()
        else:
            description = await peer_connection.createAnswer()
        self._set_media_types(description)
        await peer_connection.setLocalDescription(description)

        if not is_initiator:
            log.debug(peer_connection.localDescription.sdp)
        await gathered
        log.debug(peer_connection.localDescription.sdp)
    finally:
        # always cancel the timer, so it can't fire after a failure
        timer.clear_timeout(gather_timeout)

    ufrag, pwd = self.extract_pwd_ufrag(peer_connection.localDescription.sdp)
    return {
        "ufrag": ufrag,
        "pwd": pwd,
        "candidates": self.media_candidates,
    }
293
async def accept_call(self, session_id: str, sdp: str, profile: str) -> None:
    """Call has been accepted, connection can be established

    The SDP is set as remote description (answer), then buffered peer candidates
    are handled and the buffer is emptied.

    @param session_id: Session identifier
    @param sdp: Session Description Protocol data
    @param profile: Profile associated
    """
    answer_description = {"type": "answer", "sdp": sdp}
    await self._peer_connection.setRemoteDescription(answer_description)
    # candidates received before the answer can now be handled
    await self.on_ice_candidates_new(self.candidates_buffer)
    self.candidates_buffer.clear()
307
def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
    """Called when new ICE candidates are received

    Candidates of a session other than the current one are ignored.

    @param sid: Session identifier
    @param candidates_s: ICE candidates serialized
    @param profile: Profile associated with the action
    """
    if sid == self.sid:
        aio.run(self.on_ice_candidates_new(json.loads(candidates_s)))
    else:
        log.debug(
            f"ignoring peer ice candidates for {sid=} ({self.sid=})."
        )
322
async def on_ice_candidates_new(self, candidates: dict) -> None:
    """Called when new ICE candidates are received from peer

    If the peer connection is not ready (not created, or without remote
    description), "audio" and "video" candidates are added to
    ``self.candidates_buffer``. Otherwise candidates are added to the peer
    connection; candidates of a media type without known m-line are skipped.

    @param candidates: Dictionary containing new ICE candidates
    """
    log.debug(f"new peer candidates received: {candidates}")
    if (
        self._peer_connection is None
        or self._peer_connection.remoteDescription is None
    ):
        for media_type in ("audio", "video"):
            media_candidates = candidates.get(media_type)
            if media_candidates:
                buffer = self.candidates_buffer[media_type]
                buffer["candidates"].extend(media_candidates["candidates"])
        return

    for media_type, ice_data in candidates.items():
        media_candidates = ice_data["candidates"]
        if not media_candidates:
            # nothing to add, and no reason to warn about an unused media type
            continue
        # The m-line index only depends on the media type: it is resolved once,
        # before any candidate string is built.
        try:
            sdp_mline_index = self.get_sdp_mline_index(media_type)
        except Exception as e:
            # broad on purpose: ``media_types`` raises a bare Exception when unset
            log.warning(e)
            continue
        for candidate in media_candidates:
            ice_candidate = window.RTCIceCandidate.new({
                "candidate": self.build_ice_candidate(candidate),
                "sdpMLineIndex": sdp_mline_index
            })
            await self._peer_connection.addIceCandidate(ice_candidate)
353
def on_track(self, event):
    """New track has been received from peer

    If the event comes with a stream, this stream is shown in the remote video
    element. Otherwise the track is added to ``self.remote_stream``, which is
    created (and attached to the remote video element) when it doesn't exist yet.

    @param event: Event associated with the new track
    """
    streams = event.streams
    if streams and streams[0]:
        self.remote_video_elt.srcObject = streams[0]
        return

    if self.remote_stream is None:
        self.remote_stream = window.MediaStream.new()
        self.remote_video_elt.srcObject = self.remote_stream
    self.remote_stream.addTrack(event.track)
367
def on_negotiation_needed(self, event) -> None:
    """Handler of the "negotiationneeded" event: the event is only logged for now

    @param event: the "negotiationneeded" event
    """
    # TODO
    log.debug(f"on_negotiation_needed {event=}")
371
async def answer_call(self, sid: str, offer_sdp: str, profile: str):
    """We respond to the call

    The peer connection is created with the offer as remote description, local
    media are requested, and the answer SDP is sent through the bridge once local
    ICE candidates are gathered.

    @param sid: Session identifier, must be the current ``self.sid``
    @param offer_sdp: SDP of the offer
    @param profile: Profile associated
    @raise Exception: sid is not the current session identifier
    """
    log.debug("answering call")
    if sid != self.sid:
        raise Exception(
            f"Internal Error: unexpected sid: {sid=} {self.sid=}"
        )
    await self._create_peer_connection()

    offer_description = {"type": "offer", "sdp": offer_sdp}
    await self._peer_connection.setRemoteDescription(offer_description)
    # candidates received before the connection was ready can now be handled
    await self.on_ice_candidates_new(self.candidates_buffer)
    self.candidates_buffer.clear()
    await self._get_user_media()

    # Gather local ICE candidates
    gathered_ice_data = await self._gather_ice_candidates(False)
    self.local_candidates = gathered_ice_data["candidates"]

    answer_sdp = self._peer_connection.localDescription.sdp
    await bridge.call_answer_sdp(sid, answer_sdp)
394
async def make_call(
    self,
    callee_jid: jid.JID,
    audio: bool = True,
    video: bool = True
) -> None:
    """Start a WebRTC call

    The local SDP is sent with ``bridge.call_start`` once ICE candidates are
    gathered, and the returned session identifier is stored in ``self.sid``.

    @param callee_jid: entity to call
    @param audio: True if an audio flux is required
    @param video: True if a video flux is required
    """
    await self._create_peer_connection()
    await self._get_user_media(audio, video)
    await self._gather_ice_candidates(True)

    local_sdp = self._peer_connection.localDescription.sdp
    call_data = {"sdp": local_sdp}
    log.info(f"calling {callee_jid!r}")
    session_id = await bridge.call_start(
        str(callee_jid),
        json.dumps(call_data)
    )
    self.sid = session_id
    log.debug(f"Call SID: {self.sid}")
419
async def end_call(self) -> None:
    """Stop streaming and clean instance

    Nothing is done (except logging) when there is no peer connection. Otherwise
    event listeners are removed, tracks of both video elements are stopped, the
    connection is closed and the per-call state is reset.
    """
    peer_connection = self._peer_connection
    if peer_connection is None:
        log.debug("There is currently no call to end.")
        return

    registered_listeners = (
        ("track", self.on_track),
        ("negotiationneeded", self.on_negotiation_needed),
        ("icecandidate", self.on_ice_candidate),
        ("icegatheringstatechange", self.on_ice_gathering_state_change),
    )
    for event_name, listener in registered_listeners:
        peer_connection.removeEventListener(event_name, listener)

    # Base64 encoded 1x1 black pixel image
    # this is a trick to reset the image displayed, so we don't see last image of
    # last stream
    # NOTE(review): the first fragment is empty here, the beginning of the data
    # (presumably a "data:" URI prefix) looks truncated — confirm against the
    # original file.
    black_image_data = (
        ""
        "lEQVR42mP8/wcAAwAB/uzNq7sAAAAASUVORK5CYII="
    )

    # local element first, then remote one
    for video_elt in (self.local_video_elt, self.remote_video_elt):
        if video_elt.srcObject:
            for track in video_elt.srcObject.getTracks():
                track.stop()
            video_elt.src = black_image_data

    peer_connection.close()
    self.reset_instance()
452
def toggle_media_mute(self, media_type: str) -> bool:
    """Toggle mute/unmute for media tracks.

    Tracks of the local stream are toggled, and the peer is notified with
    ``bridge.call_info`` when there is a current session with a known media name
    for this media type. Without such a session (e.g. no call in progress), only
    the local tracks are toggled.

    @param media_type: 'audio' or 'video'. Determines which media tracks
        to process.
    @return: the new mute state (value of ``is_<media_type>_muted``), which stays
        None as long as no track has been toggled
    """
    assert media_type in ("audio", "video"), "Invalid media type"

    is_muted_attr = f"is_{media_type}_muted"
    local_stream = self.local_video_elt.srcObject

    if local_stream:
        track_getter = getattr(local_stream, f"get{media_type.capitalize()}Tracks")
        for track in track_getter():
            track.enabled = not track.enabled
            setattr(self, is_muted_attr, not track.enabled)

    is_muted = getattr(self, is_muted_attr)

    # The private attribute is read on purpose: the ``media_types_inv`` property
    # raises when media types are not set, which is the case once a call is
    # ended while the local stream is still attached to the video element.
    media_types_inv = self._media_types_inv
    media_name = None if media_types_inv is None else media_types_inv.get(media_type)
    if media_name is not None and self.sid is not None:
        extra = {"name": str(media_name)}
        aio.run(
            bridge.call_info(
                self.sid,
                "mute" if is_muted else "unmute",
                json.dumps(extra),
            )
        )

    return is_muted
482
def toggle_audio_mute(self) -> bool:
    """Toggle mute/unmute for audio tracks.

    @return: the value returned by ``toggle_media_mute`` for "audio"
    """
    audio_muted = self.toggle_media_mute("audio")
    return audio_muted
486
def toggle_video_mute(self) -> bool:
    """Toggle mute/unmute for video tracks.

    @return: the value returned by ``toggle_media_mute`` for "video"
    """
    video_muted = self.toggle_media_mute("video")
    return video_muted