# HG changeset patch # User Goffi # Date 1685648522 -7200 # Node ID b8ed9726525b959b85f11566cf63fa8e9dee9794 # Parent a3ca1bab6eb16696710d2b2b652e83c9331bbde4 browser: "calls" implementation, first draft: Basic page to make a call. Call are automatically accepted for the moment. fix 422 diff -r a3ca1bab6eb1 -r b8ed9726525b libervia/pages/calls/_browser/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/pages/calls/_browser/__init__.py Thu Jun 01 21:42:02 2023 +0200 @@ -0,0 +1,513 @@ +import json +import re + +from bridge import AsyncBridge as Bridge +from browser import aio, console as log, document, timer, window +import errors +import loading + +log.warning = log.warn +profile = window.profile or "" +bridge = Bridge() +GATHER_TIMEOUT = 10000 + + +class WebRTCCall: + + def __init__(self): + self.reset_instance() + + def reset_instance(self): + """Inits or resets the instance variables to their default state.""" + self._peer_connection = None + self._media_types = None + self.sid = None + self.local_candidates = None + self.remote_stream = None + self.candidates_buffer = { + "audio": {"candidates": []}, + "video": {"candidates": []}, + } + self.media_candidates = {} + self.candidates_gathered = aio.Future() + + @property + def media_types(self): + if self._media_types is None: + raise Exception("self._media_types should not be None!") + return self._media_types + + def get_sdp_mline_index(self, media_type): + """Gets the sdpMLineIndex for a given media type. + + @param media_type: The type of the media. + """ + for index, m_type in self.media_types.items(): + if m_type == media_type: + return index + raise ValueError(f"Media type '{media_type}' not found") + + def extract_pwd_ufrag(self, sdp): + """Retrieves ICE password and user fragment for SDP offer. + + @param sdp: The Session Description Protocol offer string. + """ + ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) + pwd_line = re.search(r"ice-pwd:(\S+)", sdp) + + if ufrag_line and pwd_line: + return ufrag_line.group(1), pwd_line.group(1) + else: + log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") + raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") + + def extract_fingerprint_data(self, sdp): + """Retrieves fingerprint data from an SDP offer. + + @param sdp: The Session Description Protocol offer string. + @return: A dictionary containing the fingerprint data. + """ + fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp) + if fingerprint_line: + algorithm, fingerprint = fingerprint_line.groups() + fingerprint_data = { + "hash": algorithm, + "fingerprint": fingerprint + } + + setup_line = re.search(r"a=setup:(\S+)", sdp) + if setup_line: + setup = setup_line.group(1) + fingerprint_data["setup"] = setup + + return fingerprint_data + else: + raise ValueError("fingerprint should not be missing") + + def parse_ice_candidate(self, candidate_string): + """Parses the ice candidate string. + + @param candidate_string: The ice candidate string to be parsed. + """ + pattern = re.compile( + r"candidate:(?P\S+) (?P\d+) (?P\S+) " + r"(?P\d+) (?P
\S+) (?P\d+) typ " + r"(?P\S+)(?: raddr (?P\S+) rport " + r"(?P\d+))?(?: generation (?P\d+))?" + ) + match = pattern.match(candidate_string) + if match: + candidate_dict = match.groupdict() + + # Apply the correct types to the dictionary values + candidate_dict["component_id"] = int(candidate_dict["component_id"]) + candidate_dict["priority"] = int(candidate_dict["priority"]) + candidate_dict["port"] = int(candidate_dict["port"]) + + if candidate_dict["rel_port"]: + candidate_dict["rel_port"] = int(candidate_dict["rel_port"]) + + if candidate_dict["generation"]: + candidate_dict["generation"] = candidate_dict["generation"] + + # Remove None values + return {k: v for k, v in candidate_dict.items() if v is not None} + else: + log.warning(f"can't parse candidate: {candidate_string!r}") + return None + + def build_ice_candidate(self, parsed_candidate): + """Builds ICE candidate + + @param parsed_candidate: Dictionary containing parsed ICE candidate + """ + base_format = ( + "candidate:{foundation} {component_id} {transport} {priority} " + "{address} {port} typ {type}" + ) + + if ((parsed_candidate.get('rel_addr') + and parsed_candidate.get('rel_port'))): + base_format += " raddr {rel_addr} rport {rel_port}" + + if parsed_candidate.get('generation'): + base_format += " generation {generation}" + + return base_format.format(**parsed_candidate) + + def on_ice_candidate(self, event): + """Handles ICE candidate event + + @param event: Event containing the ICE candidate + """ + log.debug(f"on ice candidate {event.candidate=}") + if event.candidate and event.candidate.candidate: + window.last_event = event + parsed_candidate = self.parse_ice_candidate(event.candidate.candidate) + if parsed_candidate is None: + return + try: + media_type = self.media_types[event.candidate.sdpMLineIndex] + except (TypeError, IndexError): + log.error( + f"Can't find media type.\n{event.candidate=}\n{self._media_types=}" + ) + return + self.media_candidates.setdefault(media_type, []).append(parsed_candidate) + log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}") + else: + log.debug("All ICE candidates gathered") + + def _set_media_types(self, offer): + """Sets media types from offer SDP + + @param offer: RTC session description containing the offer + """ + sdp_lines = offer.sdp.splitlines() + media_types = {} + mline_index = 0 + + for line in sdp_lines: + if line.startswith("m="): + media_types[mline_index] = line[2:line.find(" ")] + mline_index += 1 + + self._media_types = media_types + + def on_ice_gathering_state_change(self, event): + """Handles ICE gathering state change + + @param event: Event containing the ICE gathering state change + """ + connection = event.target + log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}") + if connection.iceGatheringState == "complete": + log.info("ICE candidates gathering done") + self.candidates_gathered.set_result(None) + + async def _create_peer_connection( + self, + ): + """Creates peer connection""" + if self._peer_connection is not None: + raise Exception("create_peer_connection can't be called twice!") + + external_disco = json.loads(await bridge.external_disco_get("")) + ice_servers = [] + + for server in external_disco: + ice_server = {} + if server["type"] == "stun": + ice_server["urls"] = f"stun:{server['host']}:{server['port']}" + elif server["type"] == "turn": + ice_server["urls"] = ( + f"turn:{server['host']}:{server['port']}?transport={server['transport']}" + ) + ice_server["username"] = server["username"] + ice_server["credential"] = server["password"] + ice_servers.append(ice_server) + + rtc_configuration = {"iceServers": ice_servers} + + peer_connection = window.RTCPeerConnection.new(rtc_configuration) + peer_connection.addEventListener("track", self.on_track) + peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed) + peer_connection.addEventListener("icecandidate", self.on_ice_candidate) + peer_connection.addEventListener("icegatheringstatechange", self.on_ice_gathering_state_change) + + self._peer_connection = peer_connection + window.pc = self._peer_connection + + async def _get_user_media( + self, + audio: bool = True, + video: bool = True + ): + """Gets user media + + @param audio: True if an audio flux is required + @param video: True if a video flux is required + """ + media_constraints = {'audio': audio, 'video': video} + local_stream = await window.navigator.mediaDevices.getUserMedia(media_constraints) + document["local_video"].srcObject = local_stream + + for track in local_stream.getTracks(): + self._peer_connection.addTrack(track) + + async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None): + """Get ICE candidates and wait to have them all before returning them + + @param is_initiator: Boolean indicating if the user is the initiator of the connection + @param remote_candidates: Remote ICE candidates, if any + """ + if self._peer_connection is None: + raise Exception("The peer connection must be created before gathering ICE candidates!") + + self.media_candidates.clear() + gather_timeout = timer.set_timeout( + lambda: self.candidates_gathered.set_exception( + errors.TimeoutError("ICE gathering time out") + ), + GATHER_TIMEOUT + ) + + if is_initiator: + offer = await self._peer_connection.createOffer() + self._set_media_types(offer) + await self._peer_connection.setLocalDescription(offer) + else: + answer = await self._peer_connection.createAnswer() + self._set_media_types(answer) + await self._peer_connection.setLocalDescription(answer) + + if not is_initiator: + log.debug(self._peer_connection.localDescription.sdp) + await self.candidates_gathered + log.debug(self._peer_connection.localDescription.sdp) + timer.clear_timeout(gather_timeout) + ufrag, pwd = self.extract_pwd_ufrag(self._peer_connection.localDescription.sdp) + return { + "ufrag": ufrag, + "pwd": pwd, + "candidates": self.media_candidates, + } + + def on_action_new( + self, action_data_s: str, action_id: str, security_limit: int, profile: str + ) -> None: + """Called when a call is received + + @param action_data_s: Action data serialized + @param action_id: Unique identifier for the action + @param security_limit: Security limit for the action + @param profile: Profile associated with the action + """ + action_data = json.loads(action_data_s) + if action_data.get("type") != "call": + return + peer_jid = action_data["from_jid"] + log.info( + f"{peer_jid} wants to start a call ({action_data['sub_type']})" + ) + if self.sid is not None: + log.warning( + f"already in a call ({self.sid}), can't receive a new call from " + f"{peer_jid}" + ) + return + self.sid = action_data["session_id"] + log.debug(f"Call SID: {self.sid}") + + # Answer the call + offer_sdp = action_data["sdp"] + aio.run(self.answer_call(offer_sdp, action_id)) + + def _on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None: + """Called when we have received answer SDP from responder + + @param session_id: Session identifier + @param sdp: Session Description Protocol data + @param profile: Profile associated with the action + """ + aio.run(self.on_call_accepted(session_id, sdp, profile)) + + def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None: + """Call has been terminated + + @param session_id: Session identifier + @param data_s: Serialised additional data on why the call has ended + @param profile: Profile associated + """ + if self.sid is None: + log.debug("there are no calls in progress") + return + if session_id != self.sid: + log.debug( + f"ignoring call_ended not linked to our call ({self.sid}): {session_id}" + ) + return + aio.run(self.end_call()) + + async def on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None: + """Call has been accepted, connection can be established + + @param session_id: Session identifier + @param sdp: Session Description Protocol data + @param profile: Profile associated + """ + if self.sid != session_id: + log.debug( + f"Call ignored due to different session ID ({self.sid=} {session_id=})" + ) + return + await self._peer_connection.setRemoteDescription({ + "type": "answer", + "sdp": sdp + }) + await self.on_ice_candidates_new(self.candidates_buffer) + self.candidates_buffer.clear() + + def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None: + """Called when new ICE candidates are received + + @param sid: Session identifier + @param candidates_s: ICE candidates serialized + @param profile: Profile associated with the action + """ + if sid != self.sid: + log.debug( + f"ignoring peer ice candidates for {sid=} ({self.sid=})." + ) + return + candidates = json.loads(candidates_s) + aio.run(self.on_ice_candidates_new(candidates)) + + async def on_ice_candidates_new(self, candidates: dict) -> None: + """Called when new ICE canidates are received from peer + + @param candidates: Dictionary containing new ICE candidates + """ + log.debug(f"new peer candidates received: {candidates}") + if ( + self._peer_connection is None + or self._peer_connection.remoteDescription is None + ): + for media_type in ("audio", "video"): + media_candidates = candidates.get(media_type) + if media_candidates: + buffer = self.candidates_buffer[media_type] + buffer["candidates"].extend(media_candidates["candidates"]) + return + for media_type, ice_data in candidates.items(): + for candidate in ice_data["candidates"]: + candidate_sdp = self.build_ice_candidate(candidate) + try: + sdp_mline_index = self.get_sdp_mline_index(media_type) + except Exception as e: + log.warning(e) + continue + ice_candidate = window.RTCIceCandidate.new({ + "candidate": candidate_sdp, + "sdpMLineIndex": sdp_mline_index + } + ) + await self._peer_connection.addIceCandidate(ice_candidate) + + def on_track(self, event): + """New track has been received from peer + + @param event: Event associated with the new track + """ + if event.streams and event.streams[0]: + remote_stream = event.streams[0] + document["remote_video"].srcObject = remote_stream + else: + if self.remote_stream is None: + self.remote_stream = window.MediaStream.new() + document["remote_video"].srcObject = self.remote_stream + self.remote_stream.addTrack(event.track) + + document["call_btn"].classList.add("is-hidden") + document["hangup_btn"].classList.remove("is-hidden") + + def on_negotiation_needed(self, event) -> None: + log.debug(f"on_negotiation_needed {event=}") + # TODO + + async def answer_call(self, offer_sdp: str, action_id: str): + """We respond to the call""" + log.debug("answering call") + await self._create_peer_connection() + + await self._peer_connection.setRemoteDescription({ + "type": "offer", + "sdp": offer_sdp + }) + await self.on_ice_candidates_new(self.candidates_buffer) + self.candidates_buffer.clear() + await self._get_user_media() + + # Gather local ICE candidates + local_ice_data = await self._gather_ice_candidates(False) + self.local_candidates = local_ice_data["candidates"] + + await bridge.action_launch( + action_id, + json.dumps({ + "sdp": self._peer_connection.localDescription.sdp, + }) + ) + + async def make_call(self, audio: bool = True, video: bool = True) -> None: + """Start a WebRTC call + + @param audio: True if an audio flux is required + @param video: True if a video flux is required + """ + await self._create_peer_connection() + await self._get_user_media(audio, video) + await self._gather_ice_candidates(True) + callee_jid = document["callee_jid"].value + + call_data = { + "sdp": self._peer_connection.localDescription.sdp + } + log.info(f"calling {callee_jid!r}") + self.sid = await bridge.call_start( + callee_jid, + json.dumps(call_data) + ) + log.debug(f"Call SID: {self.sid}") + + async def end_call(self) -> None: + """Stop streaming and clean instance""" + document["hangup_btn"].classList.add("is-hidden") + document["call_btn"].classList.remove("is-hidden") + if self._peer_connection is None: + log.debug("There is currently no call to end.") + else: + self._peer_connection.removeEventListener("track", self.on_track) + self._peer_connection.removeEventListener("negotiationneeded", self.on_negotiation_needed) + self._peer_connection.removeEventListener("icecandidate", self.on_ice_candidate) + self._peer_connection.removeEventListener("icegatheringstatechange", self.on_ice_gathering_state_change) + + local_video = document["local_video"] + remote_video = document["remote_video"] + if local_video.srcObject: + for track in local_video.srcObject.getTracks(): + track.stop() + if remote_video.srcObject: + for track in remote_video.srcObject.getTracks(): + track.stop() + + self._peer_connection.close() + self.reset_instance() + + async def hand_up(self) -> None: + """Terminate the call""" + session_id = self.sid + await self.end_call() + await bridge.call_end( + session_id, + "" + ) + + +webrtc_call = WebRTCCall() + +document["call_btn"].bind( + "click", + lambda __: aio.run(webrtc_call.make_call()) +) +document["hangup_btn"].bind( + "click", + lambda __: aio.run(webrtc_call.hand_up()) +) + +bridge.register_signal("action_new", webrtc_call.on_action_new) +bridge.register_signal("call_accepted", webrtc_call._on_call_accepted) +bridge.register_signal("call_ended", webrtc_call._on_call_ended) +bridge.register_signal("ice_candidates_new", webrtc_call._on_ice_candidates_new) + +loading.remove_loading_screen() diff -r a3ca1bab6eb1 -r b8ed9726525b libervia/pages/calls/page_meta.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/pages/calls/page_meta.py Thu Jun 01 21:42:02 2023 +0200 @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + + +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.tools.common import data_format +from twisted.internet import defer +import datetime +import time +from dateutil import tz + +from libervia.server.constants import Const as C + +log = getLogger(__name__) + + +name = "calls" +access = C.PAGES_ACCESS_PROFILE +template = "call/call.html" diff -r a3ca1bab6eb1 -r b8ed9726525b libervia/server/constants.py --- a/libervia/server/constants.py Thu Jun 01 20:44:57 2023 +0200 +++ b/libervia/server/constants.py Thu Jun 01 21:42:02 2023 +0200 @@ -95,6 +95,7 @@ "events", "lists", "merge-requests", + "calls" # XXX: app is not available anymore since removal of pyjamas code with Python 3 # port. It should come back at a later point with an alternative (Brython # probably).