Mercurial > libervia-web
view libervia/web/pages/calls/_browser/__init__.py @ 1534:49ad8dd210d0
server (restricted_bridge): add `message_send` method
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 22 Jun 2023 16:36:18 +0200 |
parents | eb00d593801d |
children | e47c24204449 |
line wrap: on
line source
import json
import re

from bridge import AsyncBridge as Bridge
from browser import aio, console as log, document, timer, window
import errors
import loading

# The browser console exposes ``warn``; alias it so ``log.warning`` works too.
log.warning = log.warn
profile = window.profile or ""
bridge = Bridge()
# Delay passed to ``timer.set_timeout`` before ICE gathering is failed.
GATHER_TIMEOUT = 10000


class WebRTCCall:
    """Handle a WebRTC call: peer connection, ICE candidates and bridge signals."""

    # Compiled once: this pattern is applied to every local ICE candidate.
    _CANDIDATE_RE = re.compile(
        r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
        r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
        r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
        r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
    )

    def __init__(self):
        self.reset_instance()

    @staticmethod
    def _new_candidates_buffer() -> dict:
        """Return an empty buffer for peer candidates received too early."""
        return {
            "audio": {"candidates": []},
            "video": {"candidates": []},
        }

    def reset_instance(self):
        """Inits or resets the instance variables to their default state."""
        self._peer_connection = None
        self._media_types = None
        self.sid = None
        self.local_candidates = None
        self.remote_stream = None
        self.candidates_buffer = self._new_candidates_buffer()
        self.media_candidates = {}
        self.candidates_gathered = aio.Future()

    @property
    def media_types(self):
        """Mapping of SDP m-line index to media type.

        @raise Exception: the media types have not been set yet.
        """
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    def get_sdp_mline_index(self, media_type):
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        @return: Index of the first m-line registered with this media type.
        @raise ValueError: no m-line is registered for this media type.
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def extract_pwd_ufrag(self, sdp):
        """Retrieves ICE password and user fragment for SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A ``(ufrag, pwd)`` tuple.
        @raise ValueError: ``ice-ufrag`` or ``ice-pwd`` is missing from the SDP.
        """
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
        if ufrag_line and pwd_line:
            return ufrag_line.group(1), pwd_line.group(1)
        log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
        raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    def extract_fingerprint_data(self, sdp):
        """Retrieves fingerprint data from an SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A dictionary containing the fingerprint data ("hash" and
            "fingerprint", plus "setup" when an ``a=setup`` line is present).
        @raise ValueError: no ``a=fingerprint`` line is found in the SDP.
        """
        fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
        if not fingerprint_line:
            raise ValueError("fingerprint should not be missing")

        algorithm, fingerprint = fingerprint_line.groups()
        fingerprint_data = {
            "hash": algorithm,
            "fingerprint": fingerprint
        }

        setup_line = re.search(r"a=setup:(\S+)", sdp)
        if setup_line:
            fingerprint_data["setup"] = setup_line.group(1)

        return fingerprint_data

    def parse_ice_candidate(self, candidate_string):
        """Parses the ice candidate string.

        @param candidate_string: The ice candidate string to be parsed.
        @return: Dictionary of the candidate fields with unset optional fields
            removed, or None if the string can't be parsed.
        """
        match = self._CANDIDATE_RE.match(candidate_string)
        if not match:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

        candidate_dict = match.groupdict()

        # Apply the correct types to the dictionary values
        # ("generation" is left as the matched string).
        candidate_dict["component_id"] = int(candidate_dict["component_id"])
        candidate_dict["priority"] = int(candidate_dict["priority"])
        candidate_dict["port"] = int(candidate_dict["port"])
        if candidate_dict["rel_port"]:
            candidate_dict["rel_port"] = int(candidate_dict["rel_port"])

        # Remove None values
        return {k: v for k, v in candidate_dict.items() if v is not None}

    def build_ice_candidate(self, parsed_candidate):
        """Builds ICE candidate

        @param parsed_candidate: Dictionary containing parsed ICE candidate
        @return: The candidate line rebuilt from the dictionary values.
        """
        base_format = (
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        )

        if parsed_candidate.get('rel_addr') and parsed_candidate.get('rel_port'):
            base_format += " raddr {rel_addr} rport {rel_port}"

        if parsed_candidate.get('generation'):
            base_format += " generation {generation}"

        return base_format.format(**parsed_candidate)

    def on_ice_candidate(self, event):
        """Handles ICE candidate event

        Parsed candidates are stored in ``self.media_candidates``, grouped by
        media type.

        @param event: Event containing the ICE candidate
        """
        log.debug(f"on ice candidate {event.candidate=}")
        if event.candidate and event.candidate.candidate:
            # Kept on ``window`` so the last event can be inspected from the console.
            window.last_event = event
            parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
            if parsed_candidate is None:
                return
            try:
                media_type = self.media_types[event.candidate.sdpMLineIndex]
            except (TypeError, LookupError):
                # ``media_types`` is a dict: an unknown index raises KeyError,
                # which LookupError covers (IndexError alone would miss it).
                log.error(
                    f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
                )
                return
            self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
            log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")
        else:
            log.debug("All ICE candidates gathered")

    def _set_media_types(self, offer):
        """Sets media types from offer SDP

        @param offer: RTC session description containing the offer
        """
        media_types = {}
        mline_index = 0

        for line in offer.sdp.splitlines():
            if line.startswith("m="):
                # "m=<media> ...": keep what sits between "m=" and the first space.
                media_types[mline_index] = line[2:line.find(" ")]
                mline_index += 1

        self._media_types = media_types

    def on_ice_gathering_state_change(self, event):
        """Handles ICE gathering state change

        @param event: Event containing the ICE gathering state change
        """
        connection = event.target
        log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
        if connection.iceGatheringState == "complete":
            log.info("ICE candidates gathering done")
            self.candidates_gathered.set_result(None)

    async def _create_peer_connection(
        self,
    ):
        """Creates peer connection

        ICE servers are built from the "stun" and "turn" entries returned by
        ``bridge.external_disco_get``; other entry types are ignored.

        @raise Exception: a peer connection already exists.
        """
        if self._peer_connection is not None:
            raise Exception("create_peer_connection can't be called twice!")

        external_disco = json.loads(await bridge.external_disco_get(""))
        ice_servers = []

        for server in external_disco:
            ice_server = {}
            if server["type"] == "stun":
                ice_server["urls"] = f"stun:{server['host']}:{server['port']}"
            elif server["type"] == "turn":
                ice_server["urls"] = (
                    f"turn:{server['host']}:{server['port']}?transport={server['transport']}"
                )
                ice_server["username"] = server["username"]
                ice_server["credential"] = server["password"]
            else:
                # An entry without "urls" is not a usable ICE server: skip it
                # instead of handing an empty dict to RTCPeerConnection.
                log.warning(
                    f"ignoring unsupported external service type: {server['type']!r}"
                )
                continue
            ice_servers.append(ice_server)

        rtc_configuration = {"iceServers": ice_servers}

        peer_connection = window.RTCPeerConnection.new(rtc_configuration)
        peer_connection.addEventListener("track", self.on_track)
        peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed)
        peer_connection.addEventListener("icecandidate", self.on_ice_candidate)
        peer_connection.addEventListener("icegatheringstatechange", self.on_ice_gathering_state_change)

        self._peer_connection = peer_connection
        # Exposed on ``window`` so the connection can be inspected from the console.
        window.pc = self._peer_connection

    async def _get_user_media(
        self,
        audio: bool = True,
        video: bool = True
    ):
        """Gets user media

        The local stream is shown in the "local_video" element and each of its
        tracks is added to the peer connection.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        media_constraints = {'audio': audio, 'video': video}
        local_stream = await window.navigator.mediaDevices.getUserMedia(media_constraints)
        document["local_video"].srcObject = local_stream

        for track in local_stream.getTracks():
            self._peer_connection.addTrack(track)

    async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
        """Get ICE candidates and wait to have them all before returning them

        @param is_initiator: Boolean indicating if the user is the initiator of the
            connection
        @param remote_candidates: Remote ICE candidates, if any (currently unused)
        @return: Dictionary with the local "ufrag", "pwd" and "candidates".
        @raise Exception: the peer connection has not been created.
        """
        if self._peer_connection is None:
            raise Exception("The peer connection must be created before gathering ICE candidates!")

        self.media_candidates.clear()
        gather_timeout = timer.set_timeout(
            lambda: self.candidates_gathered.set_exception(
                errors.TimeoutError("ICE gathering time out")
            ),
            GATHER_TIMEOUT
        )

        try:
            if is_initiator:
                offer = await self._peer_connection.createOffer()
                self._set_media_types(offer)
                await self._peer_connection.setLocalDescription(offer)
            else:
                answer = await self._peer_connection.createAnswer()
                self._set_media_types(answer)
                await self._peer_connection.setLocalDescription(answer)
                log.debug(self._peer_connection.localDescription.sdp)

            await self.candidates_gathered
        finally:
            # Always drop the timer, so a failure above can't leave it pending
            # and later set an exception on ``candidates_gathered``.
            timer.clear_timeout(gather_timeout)

        log.debug(self._peer_connection.localDescription.sdp)
        ufrag, pwd = self.extract_pwd_ufrag(self._peer_connection.localDescription.sdp)
        return {
            "ufrag": ufrag,
            "pwd": pwd,
            "candidates": self.media_candidates,
        }

    def on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Called when a call is received

        Actions whose type is not "call" are ignored, and so is a new call
        while ``self.sid`` is already set.

        @param action_data_s: Action data serialized
        @param action_id: Unique identifier for the action
        @param security_limit: Security limit for the action
        @param profile: Profile associated with the action
        """
        action_data = json.loads(action_data_s)
        if action_data.get("type") != "call":
            return
        peer_jid = action_data["from_jid"]
        log.info(
            f"{peer_jid} wants to start a call ({action_data['sub_type']})"
        )
        if self.sid is not None:
            log.warning(
                f"already in a call ({self.sid}), can't receive a new call from "
                f"{peer_jid}"
            )
            return
        self.sid = action_data["session_id"]
        log.debug(f"Call SID: {self.sid}")

        # Answer the call
        offer_sdp = action_data["sdp"]
        aio.run(self.answer_call(offer_sdp, action_id))

    def _on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
        """Called when we have received answer SDP from responder

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated with the action
        """
        aio.run(self.on_call_accepted(session_id, sdp, profile))

    def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None:
        """Call has been terminated

        @param session_id: Session identifier
        @param data_s: Serialised additional data on why the call has ended
        @param profile: Profile associated
        """
        if self.sid is None:
            log.debug("there are no calls in progress")
            return
        if session_id != self.sid:
            log.debug(
                f"ignoring call_ended not linked to our call ({self.sid}): {session_id}"
            )
            return
        aio.run(self.end_call())

    async def _flush_candidates_buffer(self) -> None:
        """Apply the buffered peer candidates and leave an empty buffer behind.

        The buffer is replaced by a fresh one instead of being cleared, so it
        keeps the "audio" and "video" entries used when buffering candidates.
        """
        buffered = self.candidates_buffer
        self.candidates_buffer = self._new_candidates_buffer()
        await self.on_ice_candidates_new(buffered)

    async def on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
        """Call has been accepted, connection can be established

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        if self.sid != session_id:
            log.debug(
                f"Call ignored due to different session ID ({self.sid=} {session_id=})"
            )
            return
        await self._peer_connection.setRemoteDescription({
            "type": "answer",
            "sdp": sdp
        })
        await self._flush_candidates_buffer()

    def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
        """Called when new ICE candidates are received

        @param sid: Session identifier
        @param candidates_s: ICE candidates serialized
        @param profile: Profile associated with the action
        """
        if sid != self.sid:
            log.debug(
                f"ignoring peer ice candidates for {sid=} ({self.sid=})."
            )
            return
        candidates = json.loads(candidates_s)
        aio.run(self.on_ice_candidates_new(candidates))

    async def on_ice_candidates_new(self, candidates: dict) -> None:
        """Called when new ICE candidates are received from peer

        Candidates are buffered while there is no peer connection or no remote
        description; otherwise they are added to the peer connection.

        @param candidates: Dictionary containing new ICE candidates
        """
        log.debug(f"new peer candidates received: {candidates}")
        if (
            self._peer_connection is None
            or self._peer_connection.remoteDescription is None
        ):
            for media_type in ("audio", "video"):
                media_candidates = candidates.get(media_type)
                if media_candidates:
                    buffer = self.candidates_buffer.setdefault(
                        media_type, {"candidates": []}
                    )
                    buffer["candidates"].extend(media_candidates["candidates"])
            return

        for media_type, ice_data in candidates.items():
            ice_candidates = ice_data["candidates"]
            if not ice_candidates:
                continue
            # The m-line index only depends on the media type: resolve it once
            # per media type rather than once per candidate.
            try:
                sdp_mline_index = self.get_sdp_mline_index(media_type)
            except Exception as e:
                log.warning(e)
                continue
            for candidate in ice_candidates:
                candidate_sdp = self.build_ice_candidate(candidate)
                ice_candidate = window.RTCIceCandidate.new({
                    "candidate": candidate_sdp,
                    "sdpMLineIndex": sdp_mline_index
                })
                await self._peer_connection.addIceCandidate(ice_candidate)

    def on_track(self, event):
        """New track has been received from peer

        @param event: Event associated with the new track
        """
        if event.streams and event.streams[0]:
            remote_stream = event.streams[0]
            document["remote_video"].srcObject = remote_stream
        else:
            # No stream comes with the track: collect tracks in our own stream.
            if self.remote_stream is None:
                self.remote_stream = window.MediaStream.new()
                document["remote_video"].srcObject = self.remote_stream
            self.remote_stream.addTrack(event.track)

        document["call_btn"].classList.add("is-hidden")
        document["hangup_btn"].classList.remove("is-hidden")

    def on_negotiation_needed(self, event) -> None:
        """Log the "negotiationneeded" event (handling is not implemented yet).

        @param event: Event emitted by the peer connection
        """
        log.debug(f"on_negotiation_needed {event=}")
        # TODO

    async def answer_call(self, offer_sdp: str, action_id: str):
        """We respond to the call

        @param offer_sdp: SDP of the offer received from the caller
        @param action_id: Identifier of the action to launch with our answer
        """
        log.debug("answering call")
        await self._create_peer_connection()

        await self._peer_connection.setRemoteDescription({
            "type": "offer",
            "sdp": offer_sdp
        })
        await self._flush_candidates_buffer()

        await self._get_user_media()

        # Gather local ICE candidates
        local_ice_data = await self._gather_ice_candidates(False)
        self.local_candidates = local_ice_data["candidates"]

        await bridge.action_launch(
            action_id,
            json.dumps({
                "sdp": self._peer_connection.localDescription.sdp,
            })
        )

    async def make_call(self, audio: bool = True, video: bool = True) -> None:
        """Start a WebRTC call

        The callee is read from the "callee_jid" element of the document.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        await self._create_peer_connection()
        await self._get_user_media(audio, video)
        await self._gather_ice_candidates(True)
        callee_jid = document["callee_jid"].value

        call_data = {
            "sdp": self._peer_connection.localDescription.sdp
        }
        log.info(f"calling {callee_jid!r}")
        self.sid = await bridge.call_start(
            callee_jid,
            json.dumps(call_data)
        )
        log.debug(f"Call SID: {self.sid}")

    async def end_call(self) -> None:
        """Stop streaming and clean instance"""
        document["hangup_btn"].classList.add("is-hidden")
        document["call_btn"].classList.remove("is-hidden")
        if self._peer_connection is None:
            log.debug("There is currently no call to end.")
        else:
            self._peer_connection.removeEventListener("track", self.on_track)
            self._peer_connection.removeEventListener("negotiationneeded", self.on_negotiation_needed)
            self._peer_connection.removeEventListener("icecandidate", self.on_ice_candidate)
            self._peer_connection.removeEventListener("icegatheringstatechange", self.on_ice_gathering_state_change)

            local_video = document["local_video"]
            remote_video = document["remote_video"]
            if local_video.srcObject:
                for track in local_video.srcObject.getTracks():
                    track.stop()
            if remote_video.srcObject:
                for track in remote_video.srcObject.getTracks():
                    track.stop()

            self._peer_connection.close()
            self.reset_instance()

    async def hand_up(self) -> None:
        """Terminate the call"""
        # ``end_call`` may reset ``self.sid``: keep it for the bridge call.
        session_id = self.sid
        await self.end_call()
        await bridge.call_end(
            session_id,
            ""
        )


webrtc_call = WebRTCCall()

document["call_btn"].bind(
    "click",
    lambda __: aio.run(webrtc_call.make_call())
)
document["hangup_btn"].bind(
    "click",
    lambda __: aio.run(webrtc_call.hand_up())
)

bridge.register_signal("action_new", webrtc_call.on_action_new)
bridge.register_signal("call_accepted", webrtc_call._on_call_accepted)
bridge.register_signal("call_ended", webrtc_call._on_call_ended)
bridge.register_signal("ice_candidates_new", webrtc_call._on_ice_candidates_new)

loading.remove_loading_screen()