comparison libervia/web/pages/calls/_browser/__init__.py @ 1518:eb00d593801d

refactoring: rename `libervia` to `libervia.web` + update imports following backend changes
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 16:49:28 +0200
parents libervia/pages/calls/_browser/__init__.py@b8ed9726525b
children e47c24204449
comparison
equal deleted inserted replaced
1517:b8ed9726525b 1518:eb00d593801d
1 import json
2 import re
3
4 from bridge import AsyncBridge as Bridge
5 from browser import aio, console as log, document, timer, window
6 import errors
7 import loading
8
# ``log`` is the browser ``console`` object (see the imports above); alias ``warn``
# so that Python-style ``log.warning(...)`` calls used in this module work.
log.warning = log.warn
# Profile exposed on the page's ``window`` object; falls back to an empty string
# when it is missing or falsy.
profile = window.profile or ""
# Bridge instance shared by the whole module for calls to and signals from the backend.
bridge = Bridge()
# Delay handed to ``timer.set_timeout`` while waiting for ICE gathering to finish
# (presumably milliseconds, i.e. 10 seconds -- TODO confirm against Brython's timer).
GATHER_TIMEOUT = 10000
13
14
class WebRTCCall:
    """Handle a single WebRTC audio/video call from the browser page.

    The instance owns the ``RTCPeerConnection``, keeps track of the local ICE
    candidates gathered by the browser, buffers remote candidates received before
    the remote description is known, and talks to the backend through the
    module-level ``bridge``.
    """

    # Compiled once at class creation instead of on every parsed candidate.
    _CANDIDATE_RE = re.compile(
        r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
        r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
        r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
        r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
    )

    def __init__(self):
        self.reset_instance()

    @staticmethod
    def _new_candidates_buffer() -> dict:
        """Build an empty buffer for remote candidates received too early.

        @return: A fresh dictionary with one empty candidates list per media type.
        """
        return {
            "audio": {"candidates": []},
            "video": {"candidates": []},
        }

    @staticmethod
    def _parse_media_types(sdp: str) -> dict:
        """Map each ``m=`` line index of an SDP to its media type.

        @param sdp: The Session Description Protocol string.
        @return: Dictionary mapping the m-line index (starting at 0) to the media
            type found right after ``m=`` (e.g. ``"audio"``).
        """
        media_types = {}
        for line in sdp.splitlines():
            if line.startswith("m="):
                # Split instead of slicing up to ``find(" ")``: the latter would
                # drop the last character if the line held no space at all.
                media_types[len(media_types)] = line[2:].split(" ", 1)[0]
        return media_types

    def reset_instance(self):
        """Inits or resets the instance variables to their default state."""
        self._peer_connection = None
        self._media_types = None
        self.sid = None
        self.local_candidates = None
        self.remote_stream = None
        self.candidates_buffer = self._new_candidates_buffer()
        self.media_candidates = {}
        self.candidates_gathered = aio.Future()

    @property
    def media_types(self):
        """Mapping of m-line index to media type.

        @return: The dictionary set by ``_set_media_types``.
        @raise Exception: The media types have not been set yet.
        """
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    def get_sdp_mline_index(self, media_type):
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        @return: Index of the first m-line using this media type.
        @raise ValueError: No m-line uses this media type.
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def extract_pwd_ufrag(self, sdp):
        """Retrieves ICE password and user fragment for SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A ``(ufrag, pwd)`` tuple.
        @raise ValueError: ``ice-ufrag`` or ``ice-pwd`` is missing from the SDP.
        """
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)

        if ufrag_line and pwd_line:
            return ufrag_line.group(1), pwd_line.group(1)
        log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
        raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    def extract_fingerprint_data(self, sdp):
        """Retrieves fingerprint data from an SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A dictionary containing the fingerprint data (``hash``,
            ``fingerprint`` and, when an ``a=setup`` line exists, ``setup``).
        @raise ValueError: No ``a=fingerprint`` line was found.
        """
        fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
        if not fingerprint_line:
            raise ValueError("fingerprint should not be missing")

        algorithm, fingerprint = fingerprint_line.groups()
        fingerprint_data = {
            "hash": algorithm,
            "fingerprint": fingerprint,
        }

        setup_line = re.search(r"a=setup:(\S+)", sdp)
        if setup_line:
            fingerprint_data["setup"] = setup_line.group(1)

        return fingerprint_data

    def parse_ice_candidate(self, candidate_string):
        """Parses the ice candidate string.

        @param candidate_string: The ice candidate string to be parsed.
        @return: Dictionary of the parsed fields without the missing optional
            ones, or None if the string can't be parsed.
            ``component_id``, ``priority``, ``port`` and ``rel_port`` are integers;
            every other value (including ``generation``) is kept as a string.
        """
        match = self._CANDIDATE_RE.match(candidate_string)
        if not match:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

        candidate_dict = match.groupdict()

        # Apply the correct types to the dictionary values
        for key in ("component_id", "priority", "port"):
            candidate_dict[key] = int(candidate_dict[key])

        if candidate_dict["rel_port"]:
            candidate_dict["rel_port"] = int(candidate_dict["rel_port"])

        # "generation" is deliberately left as a string: ``build_ice_candidate``
        # tests its truthiness, and an integer 0 would be silently dropped there.

        # Remove None values
        return {k: v for k, v in candidate_dict.items() if v is not None}

    def build_ice_candidate(self, parsed_candidate):
        """Builds ICE candidate

        @param parsed_candidate: Dictionary containing parsed ICE candidate
        @return: The candidate line, in the format read by ``parse_ice_candidate``.
        """
        base_format = (
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        )

        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
            base_format += " raddr {rel_addr} rport {rel_port}"

        if parsed_candidate.get("generation"):
            base_format += " generation {generation}"

        return base_format.format(**parsed_candidate)

    def on_ice_candidate(self, event):
        """Handles ICE candidate event

        @param event: Event containing the ICE candidate
        """
        log.debug(f"on ice candidate {event.candidate=}")
        if not (event.candidate and event.candidate.candidate):
            log.debug("All ICE candidates gathered")
            return

        # Kept from the original code; presumably a debugging aid for the console.
        window.last_event = event
        parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
        if parsed_candidate is None:
            return
        try:
            # ``_media_types`` is a dict (or None before any offer/answer):
            # an unknown index raises KeyError and None raises TypeError.
            media_type = self._media_types[event.candidate.sdpMLineIndex]
        except (TypeError, KeyError, IndexError):
            log.error(
                f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
            )
            return
        self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
        log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")

    def _set_media_types(self, offer):
        """Sets media types from offer SDP

        @param offer: RTC session description containing the offer
        """
        self._media_types = self._parse_media_types(offer.sdp)

    def on_ice_gathering_state_change(self, event):
        """Handles ICE gathering state change

        @param event: Event containing the ICE gathering state change
        """
        connection = event.target
        log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
        if connection.iceGatheringState == "complete":
            log.info("ICE candidates gathering done")
            self.candidates_gathered.set_result(None)

    async def _create_peer_connection(
        self,
    ):
        """Creates peer connection

        STUN/TURN servers are requested from the backend and given to the new
        ``RTCPeerConnection``, then the event listeners are attached.

        @raise Exception: A peer connection already exists.
        """
        if self._peer_connection is not None:
            raise Exception("create_peer_connection can't be called twice!")

        external_disco = json.loads(await bridge.external_disco_get(""))
        ice_servers = []

        for server in external_disco:
            server_type = server["type"]
            if server_type == "stun":
                ice_server = {"urls": f"stun:{server['host']}:{server['port']}"}
            elif server_type == "turn":
                ice_server = {
                    "urls": (
                        f"turn:{server['host']}:{server['port']}"
                        f"?transport={server['transport']}"
                    ),
                    "username": server["username"],
                    "credential": server["password"],
                }
            else:
                # An entry without "urls" would make RTCPeerConnection reject the
                # whole configuration, so unknown services are skipped.
                log.warning(f"ignoring unsupported external service: {server_type!r}")
                continue
            ice_servers.append(ice_server)

        rtc_configuration = {"iceServers": ice_servers}

        peer_connection = window.RTCPeerConnection.new(rtc_configuration)
        peer_connection.addEventListener("track", self.on_track)
        peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed)
        peer_connection.addEventListener("icecandidate", self.on_ice_candidate)
        peer_connection.addEventListener(
            "icegatheringstatechange", self.on_ice_gathering_state_change
        )

        self._peer_connection = peer_connection
        # Kept from the original code; presumably a debugging aid for the console.
        window.pc = self._peer_connection

    async def _get_user_media(
        self,
        audio: bool = True,
        video: bool = True
    ):
        """Gets user media

        The local stream is shown in the ``local_video`` element and each of its
        tracks is added to the peer connection.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        media_constraints = {'audio': audio, 'video': video}
        local_stream = await window.navigator.mediaDevices.getUserMedia(media_constraints)
        document["local_video"].srcObject = local_stream

        for track in local_stream.getTracks():
            self._peer_connection.addTrack(track)

    async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
        """Get ICE candidates and wait to have them all before returning them

        @param is_initiator: Boolean indicating if the user is the initiator of the connection
        @param remote_candidates: Remote ICE candidates, if any (currently unused)
        @return: Dictionary with the local ``ufrag``, ``pwd`` and ``candidates``
            (parsed candidates grouped by media type).
        @raise Exception: The peer connection has not been created.
        """
        if self._peer_connection is None:
            raise Exception("The peer connection must be created before gathering ICE candidates!")

        peer_connection = self._peer_connection
        # Bind the future now: if the instance is reset while we wait, the timeout
        # must fail this gathering and not the future of a later call.
        gathered = self.candidates_gathered

        self.media_candidates.clear()
        gather_timeout = timer.set_timeout(
            lambda: gathered.set_exception(
                errors.TimeoutError("ICE gathering time out")
            ),
            GATHER_TIMEOUT
        )

        try:
            if is_initiator:
                description = await peer_connection.createOffer()
            else:
                description = await peer_connection.createAnswer()
            self._set_media_types(description)
            await peer_connection.setLocalDescription(description)

            if not is_initiator:
                log.debug(peer_connection.localDescription.sdp)
            await gathered
        finally:
            # Always cancel the timer, including when offer/answer creation fails.
            timer.clear_timeout(gather_timeout)

        log.debug(peer_connection.localDescription.sdp)
        ufrag, pwd = self.extract_pwd_ufrag(peer_connection.localDescription.sdp)
        return {
            "ufrag": ufrag,
            "pwd": pwd,
            "candidates": self.media_candidates,
        }

    async def _flush_candidates_buffer(self) -> None:
        """Apply the buffered remote candidates and leave an empty buffer."""
        # Swap in a fresh buffer rather than calling ``dict.clear()``, which would
        # remove the "audio"/"video" keys needed for later buffering.
        buffered = self.candidates_buffer
        self.candidates_buffer = self._new_candidates_buffer()
        await self.on_ice_candidates_new(buffered)

    def on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Called when a call is received

        Actions whose type is not ``call`` are ignored, as are calls received
        while another call is in progress.

        @param action_data_s: Action data serialized
        @param action_id: Unique identifier for the action
        @param security_limit: Security limit for the action
        @param profile: Profile associated with the action
        """
        action_data = json.loads(action_data_s)
        if action_data.get("type") != "call":
            return
        peer_jid = action_data["from_jid"]
        log.info(
            f"{peer_jid} wants to start a call ({action_data['sub_type']})"
        )
        if self.sid is not None:
            log.warning(
                f"already in a call ({self.sid}), can't receive a new call from "
                f"{peer_jid}"
            )
            return
        self.sid = action_data["session_id"]
        log.debug(f"Call SID: {self.sid}")

        # Answer the call
        offer_sdp = action_data["sdp"]
        aio.run(self.answer_call(offer_sdp, action_id))

    def _on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
        """Called when we have received answer SDP from responder

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated with the action
        """
        aio.run(self.on_call_accepted(session_id, sdp, profile))

    def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None:
        """Call has been terminated

        @param session_id: Session identifier
        @param data_s: Serialised additional data on why the call has ended
        @param profile: Profile associated
        """
        if self.sid is None:
            log.debug("there are no calls in progress")
            return
        if session_id != self.sid:
            log.debug(
                f"ignoring call_ended not linked to our call ({self.sid}): {session_id}"
            )
            return
        aio.run(self.end_call())

    async def on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
        """Call has been accepted, connection can be established

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        if self.sid != session_id:
            log.debug(
                f"Call ignored due to different session ID ({self.sid=} {session_id=})"
            )
            return
        await self._peer_connection.setRemoteDescription({
            "type": "answer",
            "sdp": sdp
        })
        await self._flush_candidates_buffer()

    def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
        """Called when new ICE candidates are received

        @param sid: Session identifier
        @param candidates_s: ICE candidates serialized
        @param profile: Profile associated with the action
        """
        if sid != self.sid:
            log.debug(
                f"ignoring peer ice candidates for {sid=} ({self.sid=})."
            )
            return
        candidates = json.loads(candidates_s)
        aio.run(self.on_ice_candidates_new(candidates))

    async def on_ice_candidates_new(self, candidates: dict) -> None:
        """Called when new ICE candidates are received from peer

        Candidates are buffered while there is no peer connection or no remote
        description; otherwise they are added to the peer connection.

        @param candidates: Dictionary containing new ICE candidates
        """
        log.debug(f"new peer candidates received: {candidates}")
        if (
            self._peer_connection is None
            or self._peer_connection.remoteDescription is None
        ):
            for media_type in ("audio", "video"):
                media_candidates = candidates.get(media_type)
                if media_candidates:
                    buffer = self.candidates_buffer[media_type]
                    buffer["candidates"].extend(media_candidates["candidates"])
            return
        for media_type, ice_data in candidates.items():
            if not ice_data["candidates"]:
                continue
            # The m-line index only depends on the media type: resolve it once.
            try:
                sdp_mline_index = self.get_sdp_mline_index(media_type)
            except Exception as e:
                log.warning(e)
                continue
            for candidate in ice_data["candidates"]:
                ice_candidate = window.RTCIceCandidate.new({
                    "candidate": self.build_ice_candidate(candidate),
                    "sdpMLineIndex": sdp_mline_index
                })
                await self._peer_connection.addIceCandidate(ice_candidate)

    def on_track(self, event):
        """New track has been received from peer

        The remote stream (or a stream built from the lone tracks) is attached to
        the ``remote_video`` element, and the call button is replaced by the
        hang-up one.

        @param event: Event associated with the new track
        """
        if event.streams and event.streams[0]:
            remote_stream = event.streams[0]
            document["remote_video"].srcObject = remote_stream
        else:
            # Track sent without a stream: gather such tracks in our own stream.
            if self.remote_stream is None:
                self.remote_stream = window.MediaStream.new()
                document["remote_video"].srcObject = self.remote_stream
            self.remote_stream.addTrack(event.track)

        document["call_btn"].classList.add("is-hidden")
        document["hangup_btn"].classList.remove("is-hidden")

    def on_negotiation_needed(self, event) -> None:
        """Log the ``negotiationneeded`` event (not handled yet).

        @param event: The negotiation needed event
        """
        log.debug(f"on_negotiation_needed {event=}")
        # TODO

    async def answer_call(self, offer_sdp: str, action_id: str):
        """We respond to the call

        @param offer_sdp: SDP of the offer sent by the caller
        @param action_id: Identifier of the action to launch to accept the call
        """
        log.debug("answering call")
        await self._create_peer_connection()

        await self._peer_connection.setRemoteDescription({
            "type": "offer",
            "sdp": offer_sdp
        })
        # Buffered candidates need the m-line mapping, which is otherwise only
        # known once the answer is created. The answer uses the same m-lines, in
        # the same order, as the offer, so the offer gives the same mapping.
        self._media_types = self._parse_media_types(offer_sdp)
        await self._flush_candidates_buffer()
        await self._get_user_media()

        # Gather local ICE candidates
        local_ice_data = await self._gather_ice_candidates(False)
        self.local_candidates = local_ice_data["candidates"]

        await bridge.action_launch(
            action_id,
            json.dumps({
                "sdp": self._peer_connection.localDescription.sdp,
            })
        )

    async def make_call(self, audio: bool = True, video: bool = True) -> None:
        """Start a WebRTC call

        The callee is read from the ``callee_jid`` element of the page.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        await self._create_peer_connection()
        await self._get_user_media(audio, video)
        await self._gather_ice_candidates(True)
        callee_jid = document["callee_jid"].value

        call_data = {
            "sdp": self._peer_connection.localDescription.sdp
        }
        log.info(f"calling {callee_jid!r}")
        self.sid = await bridge.call_start(
            callee_jid,
            json.dumps(call_data)
        )
        log.debug(f"Call SID: {self.sid}")

    async def end_call(self) -> None:
        """Stop streaming and clean instance"""
        document["hangup_btn"].classList.add("is-hidden")
        document["call_btn"].classList.remove("is-hidden")
        if self._peer_connection is None:
            log.debug("There is currently no call to end.")
            return

        peer_connection = self._peer_connection
        peer_connection.removeEventListener("track", self.on_track)
        peer_connection.removeEventListener("negotiationneeded", self.on_negotiation_needed)
        peer_connection.removeEventListener("icecandidate", self.on_ice_candidate)
        peer_connection.removeEventListener(
            "icegatheringstatechange", self.on_ice_gathering_state_change
        )

        for video_id in ("local_video", "remote_video"):
            stream = document[video_id].srcObject
            if stream:
                for track in stream.getTracks():
                    track.stop()

        peer_connection.close()
        self.reset_instance()

    async def hand_up(self) -> None:
        """Terminate the call"""
        # ``end_call`` resets ``self.sid``: keep it to notify the backend.
        session_id = self.sid
        await self.end_call()
        if session_id is None:
            # No session was started, there is nothing to end on the backend side.
            log.debug("no session to terminate")
            return
        await bridge.call_end(
            session_id,
            ""
        )
495
496
# Single call handler used by the whole page.
webrtc_call = WebRTCCall()

# The buttons start coroutines through ``aio.run`` as the DOM handlers are
# synchronous; the click event itself is not used.
document["call_btn"].bind(
    "click",
    lambda __: aio.run(webrtc_call.make_call())
)
document["hangup_btn"].bind(
    "click",
    lambda __: aio.run(webrtc_call.hand_up())
)

# Signals from the backend are routed to the call handler.
bridge.register_signal("action_new", webrtc_call.on_action_new)
bridge.register_signal("call_accepted", webrtc_call._on_call_accepted)
bridge.register_signal("call_ended", webrtc_call._on_call_ended)
bridge.register_signal("ice_candidates_new", webrtc_call._on_ice_candidates_new)

# Everything is wired: the loading screen can now be removed.
loading.remove_loading_screen()