Mercurial > libervia-web
comparison libervia/web/pages/calls/_browser/__init__.py @ 1518:eb00d593801d
refactoring: rename `libervia` to `libervia.web` + update imports following backend changes
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 16:49:28 +0200 |
parents | libervia/pages/calls/_browser/__init__.py@b8ed9726525b |
children | e47c24204449 |
comparison
equal
deleted
inserted
replaced
1517:b8ed9726525b | 1518:eb00d593801d |
---|---|
1 import json | |
2 import re | |
3 | |
4 from bridge import AsyncBridge as Bridge | |
5 from browser import aio, console as log, document, timer, window | |
6 import errors | |
7 import loading | |
8 | |
# Delay handed to ``timer.set_timeout`` while waiting for local ICE candidates
# (milliseconds).
GATHER_TIMEOUT = 10_000

# ``log`` is the browser console, which exposes ``warn``; alias it so that the
# rest of the module can call ``log.warning``.
log.warning = log.warn

# Profile exposed by the page, or an empty string when the page sets none.
profile = window.profile or ""

# Asynchronous bridge used for all the signalling with the backend.
bridge = Bridge()
13 | |
14 | |
class WebRTCCall:
    """Handle a WebRTC call from the browser.

    The peer connection is created with the browser ``RTCPeerConnection`` API, and
    the signalling (call start, answer, ICE candidates, call end) goes through the
    module-level ``bridge``. Only one call is handled at a time: a new incoming call
    is refused while ``sid`` is set.
    """

    # Media types for which remote ICE candidates can be buffered while the peer
    # connection is not ready to accept them.
    BUFFERED_MEDIA_TYPES = ("audio", "video")

    # Compiled once: pattern used to split an ICE candidate line into its fields.
    _CANDIDATE_RE = re.compile(
        r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
        r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
        r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
        r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
    )

    def __init__(self):
        self.reset_instance()

    @classmethod
    def _new_candidates_buffer(cls):
        """Build an empty buffer for remote ICE candidates.

        @return: A dictionary with one empty candidates list per buffered media type.
        """
        return {
            media_type: {"candidates": []} for media_type in cls.BUFFERED_MEDIA_TYPES
        }

    @staticmethod
    def _parse_media_types(sdp):
        """Map each media line index of an SDP to its media type.

        @param sdp: The Session Description Protocol string.
        @return: A dictionary mapping the index of each ``m=`` line to its media type.
        """
        m_lines = (line for line in sdp.splitlines() if line.startswith("m="))
        return {index: line[2:line.find(" ")] for index, line in enumerate(m_lines)}

    def reset_instance(self):
        """Inits or resets the instance variables to their default state."""
        self._peer_connection = None
        self._media_types = None
        self.sid = None
        self.local_candidates = None
        self.remote_stream = None
        self.candidates_buffer = self._new_candidates_buffer()
        self.media_candidates = {}
        self.candidates_gathered = aio.Future()

    @property
    def media_types(self):
        """Mapping of media line index to media type.

        @raise Exception: The media types have not been set yet.
        """
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    def get_sdp_mline_index(self, media_type):
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        @return: Index of the first media line using this media type.
        @raise ValueError: No media line uses this media type.
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def extract_pwd_ufrag(self, sdp):
        """Retrieves ICE password and user fragment for SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A ``(ufrag, pwd)`` tuple.
        @raise ValueError: ``ice-ufrag`` or ``ice-pwd`` is missing from the SDP.
        """
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)

        if not (ufrag_line and pwd_line):
            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
        return ufrag_line.group(1), pwd_line.group(1)

    def extract_fingerprint_data(self, sdp):
        """Retrieves fingerprint data from an SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A dictionary containing the fingerprint data.
        @raise ValueError: No fingerprint attribute is found in the SDP.
        """
        fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
        if fingerprint_line is None:
            raise ValueError("fingerprint should not be missing")

        algorithm, fingerprint = fingerprint_line.groups()
        fingerprint_data = {
            "hash": algorithm,
            "fingerprint": fingerprint,
        }

        # The setup attribute is optional and only reported when present.
        setup_line = re.search(r"a=setup:(\S+)", sdp)
        if setup_line:
            fingerprint_data["setup"] = setup_line.group(1)

        return fingerprint_data

    def parse_ice_candidate(self, candidate_string):
        """Parses the ice candidate string.

        @param candidate_string: The ice candidate string to be parsed.
        @return: A dictionary of the fields found in the candidate, or None if the
            string can't be parsed. Fields which are absent are left out.
        """
        match = self._CANDIDATE_RE.match(candidate_string)
        if match is None:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

        candidate_dict = match.groupdict()

        # Apply the correct types to the dictionary values
        for key in ("component_id", "priority", "port"):
            candidate_dict[key] = int(candidate_dict[key])

        if candidate_dict["rel_port"]:
            candidate_dict["rel_port"] = int(candidate_dict["rel_port"])

        # "generation" is deliberately kept as the string found in the candidate.

        # Remove None values
        return {k: v for k, v in candidate_dict.items() if v is not None}

    def build_ice_candidate(self, parsed_candidate):
        """Builds ICE candidate

        @param parsed_candidate: Dictionary containing parsed ICE candidate
        @return: The candidate serialised as an ICE candidate string.
        """
        base_format = (
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        )

        # Related address and port are only meaningful together.
        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
            base_format += " raddr {rel_addr} rport {rel_port}"

        if parsed_candidate.get("generation"):
            base_format += " generation {generation}"

        return base_format.format(**parsed_candidate)

    def on_ice_candidate(self, event):
        """Handles ICE candidate event

        @param event: Event containing the ICE candidate
        """
        log.debug(f"on ice candidate {event.candidate=}")
        if not (event.candidate and event.candidate.candidate):
            log.debug("All ICE candidates gathered")
            return

        window.last_event = event
        parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
        if parsed_candidate is None:
            return
        try:
            # The mapping is a dict (or None when not set yet): a missing index
            # raises KeyError, which LookupError covers.
            media_type = self._media_types[event.candidate.sdpMLineIndex]
        except (TypeError, LookupError):
            log.error(
                f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
            )
            return
        self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
        log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")

    def _set_media_types(self, offer):
        """Sets media types from offer SDP

        @param offer: RTC session description containing the offer
        """
        self._media_types = self._parse_media_types(offer.sdp)

    def on_ice_gathering_state_change(self, event):
        """Handles ICE gathering state change

        @param event: Event containing the ICE gathering state change
        """
        connection = event.target
        log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
        if connection.iceGatheringState == "complete":
            log.info("ICE candidates gathering done")
            self.candidates_gathered.set_result(None)

    async def _create_peer_connection(
        self,
    ):
        """Creates peer connection

        The ICE servers are retrieved from the backend with ``external_disco_get``.

        @raise Exception: A peer connection already exists.
        """
        if self._peer_connection is not None:
            raise Exception("create_peer_connection can't be called twice!")

        external_disco = json.loads(await bridge.external_disco_get(""))
        ice_servers = []

        for server in external_disco:
            server_type = server["type"]
            if server_type == "stun":
                ice_server = {"urls": f"stun:{server['host']}:{server['port']}"}
            elif server_type == "turn":
                ice_server = {
                    "urls": (
                        f"turn:{server['host']}:{server['port']}"
                        f"?transport={server['transport']}"
                    ),
                    "username": server["username"],
                    "credential": server["password"],
                }
            else:
                # An entry without "urls" is not a valid ICE server description, so
                # services which are neither STUN nor TURN are skipped.
                log.warning(
                    f"ignoring unsupported external service type: {server_type!r}"
                )
                continue
            ice_servers.append(ice_server)

        rtc_configuration = {"iceServers": ice_servers}

        peer_connection = window.RTCPeerConnection.new(rtc_configuration)
        peer_connection.addEventListener("track", self.on_track)
        peer_connection.addEventListener(
            "negotiationneeded", self.on_negotiation_needed
        )
        peer_connection.addEventListener("icecandidate", self.on_ice_candidate)
        peer_connection.addEventListener(
            "icegatheringstatechange", self.on_ice_gathering_state_change
        )

        self._peer_connection = peer_connection
        # Exposed on the window object, next to ``window.last_event``.
        window.pc = self._peer_connection

    async def _get_user_media(
        self,
        audio: bool = True,
        video: bool = True
    ):
        """Gets user media

        The local stream is shown in the "local_video" element, and its tracks are
        added to the peer connection.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        media_constraints = {'audio': audio, 'video': video}
        local_stream = await window.navigator.mediaDevices.getUserMedia(
            media_constraints
        )
        document["local_video"].srcObject = local_stream

        for track in local_stream.getTracks():
            self._peer_connection.addTrack(track)

    async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
        """Get ICE candidates and wait to have them all before returning them

        @param is_initiator: Boolean indicating if the user is the initiator of the
            connection
        @param remote_candidates: Remote ICE candidates, if any (currently unused)
        @return: A dictionary with the local "ufrag", "pwd" and "candidates".
        @raise Exception: The peer connection has not been created.
        """
        if self._peer_connection is None:
            raise Exception(
                "The peer connection must be created before gathering ICE candidates!"
            )

        peer_connection = self._peer_connection
        self.media_candidates.clear()

        # The timer is bound to the future of this gathering, so that it can never
        # reject the future created by a later ``reset_instance``.
        gathered = self.candidates_gathered
        gather_timeout = timer.set_timeout(
            lambda: gathered.set_exception(
                errors.TimeoutError("ICE gathering time out")
            ),
            GATHER_TIMEOUT
        )

        try:
            if is_initiator:
                description = await peer_connection.createOffer()
            else:
                description = await peer_connection.createAnswer()
            self._set_media_types(description)
            await peer_connection.setLocalDescription(description)

            if not is_initiator:
                log.debug(peer_connection.localDescription.sdp)
            await gathered
        finally:
            # Also clear the timer when something failed above, otherwise it would
            # fire later on, outside of this gathering.
            timer.clear_timeout(gather_timeout)

        log.debug(peer_connection.localDescription.sdp)
        ufrag, pwd = self.extract_pwd_ufrag(peer_connection.localDescription.sdp)
        return {
            "ufrag": ufrag,
            "pwd": pwd,
            "candidates": self.media_candidates,
        }

    def on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Called when a call is received

        Actions which are not of type "call" are ignored, and so is any call received
        while another one is in progress.

        @param action_data_s: Action data serialized
        @param action_id: Unique identifier for the action
        @param security_limit: Security limit for the action
        @param profile: Profile associated with the action
        """
        action_data = json.loads(action_data_s)
        if action_data.get("type") != "call":
            return
        peer_jid = action_data["from_jid"]
        log.info(
            f"{peer_jid} wants to start a call ({action_data['sub_type']})"
        )
        if self.sid is not None:
            log.warning(
                f"already in a call ({self.sid}), can't receive a new call from "
                f"{peer_jid}"
            )
            return
        self.sid = action_data["session_id"]
        log.debug(f"Call SID: {self.sid}")

        # Answer the call
        offer_sdp = action_data["sdp"]
        aio.run(self.answer_call(offer_sdp, action_id))

    def _on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
        """Called when we have received answer SDP from responder

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated with the action
        """
        aio.run(self.on_call_accepted(session_id, sdp, profile))

    def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None:
        """Call has been terminated

        @param session_id: Session identifier
        @param data_s: Serialised additional data on why the call has ended
        @param profile: Profile associated
        """
        if self.sid is None:
            log.debug("there are no calls in progress")
            return
        if session_id != self.sid:
            log.debug(
                f"ignoring call_ended not linked to our call ({self.sid}): {session_id}"
            )
            return
        aio.run(self.end_call())

    async def _flush_candidates_buffer(self) -> None:
        """Hand the buffered remote ICE candidates over to the peer connection.

        The buffer is replaced by a fresh one before being processed, so that it
        always keeps one entry per buffered media type.
        """
        buffered = self.candidates_buffer
        self.candidates_buffer = self._new_candidates_buffer()
        await self.on_ice_candidates_new(buffered)

    async def on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
        """Call has been accepted, connection can be established

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        if self.sid != session_id:
            log.debug(
                f"Call ignored due to different session ID ({self.sid=} {session_id=})"
            )
            return
        await self._peer_connection.setRemoteDescription({
            "type": "answer",
            "sdp": sdp
        })
        await self._flush_candidates_buffer()

    def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
        """Called when new ICE candidates are received

        @param sid: Session identifier
        @param candidates_s: ICE candidates serialized
        @param profile: Profile associated with the action
        """
        if sid != self.sid:
            log.debug(
                f"ignoring peer ice candidates for {sid=} ({self.sid=})."
            )
            return
        candidates = json.loads(candidates_s)
        aio.run(self.on_ice_candidates_new(candidates))

    async def on_ice_candidates_new(self, candidates: dict) -> None:
        """Called when new ICE candidates are received from peer

        The candidates are buffered as long as the peer connection has no remote
        description, otherwise they are added to the peer connection.

        @param candidates: Dictionary containing new ICE candidates
        """
        log.debug(f"new peer candidates received: {candidates}")
        if (
            self._peer_connection is None
            or self._peer_connection.remoteDescription is None
        ):
            for media_type in self.BUFFERED_MEDIA_TYPES:
                media_candidates = candidates.get(media_type)
                if media_candidates:
                    buffer = self.candidates_buffer[media_type]
                    buffer["candidates"].extend(media_candidates["candidates"])
            return

        for media_type, ice_data in candidates.items():
            media_candidates = ice_data["candidates"]
            if not media_candidates:
                continue
            # The index only depends on the media type: resolve it once.
            try:
                sdp_mline_index = self.get_sdp_mline_index(media_type)
            except Exception as e:
                log.warning(e)
                continue
            for candidate in media_candidates:
                ice_candidate = window.RTCIceCandidate.new({
                    "candidate": self.build_ice_candidate(candidate),
                    "sdpMLineIndex": sdp_mline_index
                })
                await self._peer_connection.addIceCandidate(ice_candidate)

    def on_track(self, event):
        """New track has been received from peer

        @param event: Event associated with the new track
        """
        if event.streams and event.streams[0]:
            remote_stream = event.streams[0]
            document["remote_video"].srcObject = remote_stream
        else:
            # The track comes without any stream: collect it in our own stream.
            if self.remote_stream is None:
                self.remote_stream = window.MediaStream.new()
                document["remote_video"].srcObject = self.remote_stream
            self.remote_stream.addTrack(event.track)

        document["call_btn"].classList.add("is-hidden")
        document["hangup_btn"].classList.remove("is-hidden")

    def on_negotiation_needed(self, event) -> None:
        """Log the "negotiationneeded" event (no renegotiation is done yet).

        @param event: Event associated with the negotiation request
        """
        log.debug(f"on_negotiation_needed {event=}")
        # TODO

    async def answer_call(self, offer_sdp: str, action_id: str):
        """We respond to the call

        @param offer_sdp: SDP of the offer sent by the caller
        @param action_id: Identifier of the action to launch to send our answer
        """
        log.debug("answering call")
        await self._create_peer_connection()

        await self._peer_connection.setRemoteDescription({
            "type": "offer",
            "sdp": offer_sdp
        })
        # Media types are needed to map the buffered candidates to their media line,
        # and our answer is only created later: use the media lines of the offer.
        self._media_types = self._parse_media_types(offer_sdp)
        await self._flush_candidates_buffer()
        await self._get_user_media()

        # Gather local ICE candidates
        local_ice_data = await self._gather_ice_candidates(False)
        self.local_candidates = local_ice_data["candidates"]

        await bridge.action_launch(
            action_id,
            json.dumps({
                "sdp": self._peer_connection.localDescription.sdp,
            })
        )

    async def make_call(self, audio: bool = True, video: bool = True) -> None:
        """Start a WebRTC call

        The callee is read from the "callee_jid" element of the page.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        await self._create_peer_connection()
        await self._get_user_media(audio, video)
        await self._gather_ice_candidates(True)
        callee_jid = document["callee_jid"].value

        call_data = {
            "sdp": self._peer_connection.localDescription.sdp
        }
        log.info(f"calling {callee_jid!r}")
        self.sid = await bridge.call_start(
            callee_jid,
            json.dumps(call_data)
        )
        log.debug(f"Call SID: {self.sid}")

    async def end_call(self) -> None:
        """Stop streaming and clean instance"""
        document["hangup_btn"].classList.add("is-hidden")
        document["call_btn"].classList.remove("is-hidden")

        peer_connection = self._peer_connection
        if peer_connection is None:
            log.debug("There is currently no call to end.")
        else:
            peer_connection.removeEventListener("track", self.on_track)
            peer_connection.removeEventListener(
                "negotiationneeded", self.on_negotiation_needed
            )
            peer_connection.removeEventListener("icecandidate", self.on_ice_candidate)
            peer_connection.removeEventListener(
                "icegatheringstatechange", self.on_ice_gathering_state_change
            )

            for video_id in ("local_video", "remote_video"):
                stream = document[video_id].srcObject
                if stream:
                    for track in stream.getTracks():
                        track.stop()

            peer_connection.close()

        # Reset even without a peer connection, so that a session which has been
        # registered but never fully established can't block the next calls.
        self.reset_instance()

    async def hand_up(self) -> None:
        """Terminate the call"""
        session_id = self.sid
        await self.end_call()
        if session_id is None:
            # No session has been started: there is nothing to tell the backend.
            log.debug("no call session to terminate")
            return
        await bridge.call_end(
            session_id,
            ""
        )
495 | |
496 | |
# Single call handler shared by the buttons and the bridge signals of this page.
webrtc_call = WebRTCCall()


def _on_call_click(__):
    """Start a call when the call button is clicked."""
    aio.run(webrtc_call.make_call())


def _on_hangup_click(__):
    """Terminate the call when the hang up button is clicked."""
    aio.run(webrtc_call.hand_up())


document["call_btn"].bind("click", _on_call_click)
document["hangup_btn"].bind("click", _on_hangup_click)

# Signals are registered in the same order as before: incoming call, call
# accepted, call ended, then new ICE candidates from the peer.
for signal_name, signal_handler in (
    ("action_new", webrtc_call.on_action_new),
    ("call_accepted", webrtc_call._on_call_accepted),
    ("call_ended", webrtc_call._on_call_ended),
    ("ice_candidates_new", webrtc_call._on_ice_candidates_new),
):
    bridge.register_signal(signal_name, signal_handler)

# Everything is set up: the page can be shown.
loading.remove_loading_screen()