Mercurial > libervia-web
view libervia/web/pages/calls/_browser/webrtc.py @ 1555:0796bb8bad2f
doc (calls): minor markup fix
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 14 Aug 2023 17:10:27 +0200 |
parents | 83c2a6faa2ae |
children | 410064b31dca |
line wrap: on
line source
import json
import re

from bridge import AsyncBridge as Bridge
from browser import aio, console as log, document, timer, window
import dialog
import errors
from javascript import JSObject
import jid

# The browser console exposes ``warn``; alias it so ``log.warning`` can be used.
log.warning = log.warn
profile = window.profile or ""
bridge = Bridge()
# Delay handed to ``timer.set_timeout`` while waiting for ICE gathering
# (presumably milliseconds, as for the browser's setTimeout -- TODO confirm).
GATHER_TIMEOUT = 10000


class WebRTC:
    """Drive a single WebRTC call from the browser.

    The instance owns the ``RTCPeerConnection``, the local/remote ``<video>``
    elements (``local_video`` and ``remote_video`` in the document) and the ICE
    candidates exchanged with the peer through the bridge.
    """

    # Compiled once: pattern used by ``parse_ice_candidate``.
    _CANDIDATE_RE = re.compile(
        r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
        r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
        r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
        r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
    )

    def __init__(self):
        self.reset_instance()
        bridge.register_signal("ice_candidates_new", self._on_ice_candidates_new)
        # None until the corresponding media has been toggled at least once.
        self.is_audio_muted = None
        self.is_video_muted = None
        self._is_sharing_screen = False
        # Optional callable invoked with the new state when screen sharing changes.
        self.screen_sharing_cb = None
        self.local_video_elt = document["local_video"]
        self.remote_video_elt = document["remote_video"]

    @property
    def is_sharing_screen(self) -> bool:
        """True while the screen is shared instead of the camera."""
        return self._is_sharing_screen

    @is_sharing_screen.setter
    def is_sharing_screen(self, sharing: bool) -> None:
        # The callback is only triggered on an actual state change.
        if sharing != self._is_sharing_screen:
            self._is_sharing_screen = sharing
            if self.screen_sharing_cb is not None:
                self.screen_sharing_cb(sharing)

    def reset_instance(self):
        """Inits or resets the instance variables to their default state."""
        self._peer_connection = None
        self._media_types = None
        self._media_types_inv = None
        self._callee = None
        self.sid = None
        self.local_candidates = None
        self.remote_stream = None
        # Peer candidates received before they can be added to the connection.
        self.candidates_buffer = {
            "audio": {"candidates": []},
            "video": {"candidates": []},
        }
        # Local candidates, grouped by media type.
        self.media_candidates = {}
        # Resolved by ``on_ice_gathering_state_change`` once gathering is complete.
        self.candidates_gathered = aio.Future()

    @property
    def media_types(self):
        """Mapping of SDP m-line index to media type.

        @raise Exception: the mapping has not been set yet.
        """
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    @media_types.setter
    def media_types(self, new_media_types: dict) -> None:
        self._media_types = new_media_types
        # Keep the reverse mapping (media type to m-line index) in sync.
        self._media_types_inv = {v: k for k, v in new_media_types.items()}

    @property
    def media_types_inv(self) -> dict:
        """Mapping of media type to SDP m-line index.

        @raise Exception: the mapping has not been set yet.
        """
        if self._media_types_inv is None:
            raise Exception("self._media_types_inv should not be None!")
        return self._media_types_inv

    def get_sdp_mline_index(self, media_type):
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        @return: Index of the first m-line using this media type.
        @raise ValueError: no m-line uses this media type.
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def extract_pwd_ufrag(self, sdp):
        """Retrieves ICE password and user fragment for SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: ``(ufrag, pwd)`` tuple.
        @raise ValueError: ``ice-ufrag`` or ``ice-pwd`` is missing from the SDP.
        """
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)

        if ufrag_line and pwd_line:
            return ufrag_line.group(1), pwd_line.group(1)
        else:
            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    def extract_fingerprint_data(self, sdp):
        """Retrieves fingerprint data from an SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A dictionary containing the fingerprint data (``hash``,
            ``fingerprint`` and, when an ``a=setup`` line is present, ``setup``).
        @raise ValueError: no ``a=fingerprint`` line has been found.
        """
        fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
        if fingerprint_line:
            algorithm, fingerprint = fingerprint_line.groups()
            fingerprint_data = {"hash": algorithm, "fingerprint": fingerprint}

            setup_line = re.search(r"a=setup:(\S+)", sdp)
            if setup_line:
                setup = setup_line.group(1)
                fingerprint_data["setup"] = setup

            return fingerprint_data
        else:
            raise ValueError("fingerprint should not be missing")

    def parse_ice_candidate(self, candidate_string):
        """Parses the ice candidate string.

        @param candidate_string: The ice candidate string to be parsed.
        @return: Dictionary of the candidate fields (absent optional fields are
            omitted), or None if the string can't be parsed.
        """
        match = self._CANDIDATE_RE.match(candidate_string)
        if match:
            candidate_dict = match.groupdict()

            # Apply the correct types to the dictionary values
            # ("generation", when present, is left as a string)
            candidate_dict["component_id"] = int(candidate_dict["component_id"])
            candidate_dict["priority"] = int(candidate_dict["priority"])
            candidate_dict["port"] = int(candidate_dict["port"])

            if candidate_dict["rel_port"]:
                candidate_dict["rel_port"] = int(candidate_dict["rel_port"])

            # Remove None values
            return {k: v for k, v in candidate_dict.items() if v is not None}
        else:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

    def build_ice_candidate(self, parsed_candidate):
        """Builds ICE candidate

        @param parsed_candidate: Dictionary containing parsed ICE candidate
        @return: The candidate serialised as an SDP ``candidate:`` string.
        """
        base_format = (
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        )

        # Optional parts are only emitted when the corresponding data is set.
        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
            base_format += " raddr {rel_addr} rport {rel_port}"

        if parsed_candidate.get("generation"):
            base_format += " generation {generation}"

        return base_format.format(**parsed_candidate)

    def on_ice_candidate(self, event):
        """Handles ICE candidate event

        The parsed candidate is stored in ``self.media_candidates`` under its
        media type.

        @param event: Event containing the ICE candidate
        """
        log.debug(f"on ice candidate {event.candidate=}")
        if event.candidate and event.candidate.candidate:
            window.last_event = event
            parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
            if parsed_candidate is None:
                return
            try:
                media_type = self.media_types[event.candidate.sdpMLineIndex]
            except (TypeError, IndexError, KeyError):
                # ``media_types`` is a dict: an unknown m-line index raises KeyError.
                log.error(
                    f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
                )
                return
            self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
            log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")
        else:
            log.debug("All ICE candidates gathered")

    def _set_media_types(self, offer):
        """Sets media types from offer SDP

        Each ``m=`` line is mapped, in order of appearance, to the media type
        which follows the ``m=`` prefix.

        @param offer: RTC session description containing the offer
        """
        sdp_lines = offer.sdp.splitlines()
        media_types = {}
        mline_index = 0

        for line in sdp_lines:
            if line.startswith("m="):
                media_types[mline_index] = line[2 : line.find(" ")]
                mline_index += 1

        self.media_types = media_types

    def on_ice_gathering_state_change(self, event):
        """Handles ICE gathering state change

        @param event: Event containing the ICE gathering state change
        """
        connection = event.target
        log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
        if connection.iceGatheringState == "complete":
            log.info("ICE candidates gathering done")
            self.candidates_gathered.set_result(None)

    async def _create_peer_connection(
        self,
    ):
        """Creates peer connection

        ICE servers are built from the ``stun`` and ``turn`` services returned by
        ``bridge.external_disco_get``; services of any other type are ignored.

        @raise Exception: a peer connection already exists.
        """
        if self._peer_connection is not None:
            raise Exception("create_peer_connection can't be called twice!")

        external_disco = json.loads(await bridge.external_disco_get(""))
        ice_servers = []

        for server in external_disco:
            server_type = server["type"]
            if server_type == "stun":
                ice_server = {"urls": f"stun:{server['host']}:{server['port']}"}
            elif server_type == "turn":
                ice_server = {
                    "urls": (
                        f"turn:{server['host']}:{server['port']}"
                        f"?transport={server['transport']}"
                    ),
                    "username": server["username"],
                    "credential": server["password"],
                }
            else:
                # An entry without "urls" is not a valid ICE server, and would
                # make the RTCPeerConnection creation fail.
                log.warning(f"ignoring unsupported external service: {server_type!r}")
                continue
            ice_servers.append(ice_server)

        rtc_configuration = {"iceServers": ice_servers}

        peer_connection = window.RTCPeerConnection.new(rtc_configuration)
        peer_connection.addEventListener("track", self.on_track)
        peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed)
        peer_connection.addEventListener("icecandidate", self.on_ice_candidate)
        peer_connection.addEventListener(
            "icegatheringstatechange", self.on_ice_gathering_state_change
        )

        self._peer_connection = peer_connection
        window.pc = self._peer_connection

    async def _get_user_media(self, audio: bool = True, video: bool = True) -> None:
        """
        Gets user media (camera and microphone).

        The stream is displayed in the local video element, and its tracks are
        added to the peer connection.

        @param audio: True if an audio flux is required.
        @param video: True if a video flux is required.
        """
        media_constraints = {"audio": audio, "video": video}
        local_stream = await window.navigator.mediaDevices.getUserMedia(media_constraints)

        if not local_stream:
            log.error("Failed to get the media stream.")
            return

        self.local_video_elt.srcObject = local_stream

        for track in local_stream.getTracks():
            self._peer_connection.addTrack(track)

    async def _replace_user_video(
        self,
        screen: bool = False,
    ) -> JSObject | None:
        """Replaces the user video track with either a camera or desktop sharing track.

        @param screen: True if desktop sharing is required. False will use the camera.
        @return: The local media stream or None if failed.
        """
        if screen:
            media_constraints = {"video": {"cursor": "always"}}
            new_stream = await window.navigator.mediaDevices.getDisplayMedia(
                media_constraints
            )
        else:
            # Stop the video tracks currently displayed before requesting the camera.
            if self.local_video_elt.srcObject:
                for track in self.local_video_elt.srcObject.getTracks():
                    if track.kind == "video":
                        track.stop()
            media_constraints = {"video": True}
            new_stream = await window.navigator.mediaDevices.getUserMedia(
                media_constraints
            )

        if not new_stream:
            log.error("Failed to get the media stream.")
            return None

        new_video_tracks = [
            track for track in new_stream.getTracks() if track.kind == "video"
        ]

        if not new_video_tracks:
            log.error("Failed to retrieve the video track from the new stream.")
            return None

        # Retrieve the current local stream's video track.
        local_stream = self.local_video_elt.srcObject
        if local_stream:
            local_video_tracks = [
                track for track in local_stream.getTracks() if track.kind == "video"
            ]
            if local_video_tracks:
                # Remove the old video track and add the new one to the local stream.
                local_stream.removeTrack(local_video_tracks[0])
                local_stream.addTrack(new_video_tracks[0])

        # The track sent to the peer is replaced in place on the existing sender.
        video_sender = next(
            (
                sender
                for sender in self._peer_connection.getSenders()
                if sender.track and sender.track.kind == "video"
            ),
            None,
        )
        if video_sender:
            await video_sender.replaceTrack(new_video_tracks[0])

        if screen:
            # For screen sharing, we track the end event to properly stop the sharing when
            # the user clicks on the browser's stop sharing dialog.
            def on_track_ended(event):
                aio.run(self.toggle_screen_sharing())

            new_video_tracks[0].bind("ended", on_track_ended)

        self.is_sharing_screen = screen

        return local_stream

    async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
        """Get ICE candidates and wait to have them all before returning them

        @param is_initiator: Boolean indicating if the user is the initiator of the
            connection
        @param remote_candidates: Remote ICE candidates, if any
        @return: Dictionary with the local ``ufrag``, ``pwd`` and ``candidates``.
        @raise Exception: the peer connection has not been created.
        """
        if self._peer_connection is None:
            raise Exception(
                "The peer connection must be created before gathering ICE candidates!"
            )

        self.media_candidates.clear()
        gather_timeout = timer.set_timeout(
            lambda: self.candidates_gathered.set_exception(
                errors.TimeoutError("ICE gathering time out")
            ),
            GATHER_TIMEOUT,
        )

        try:
            if is_initiator:
                offer = await self._peer_connection.createOffer()
                self._set_media_types(offer)
                await self._peer_connection.setLocalDescription(offer)
            else:
                answer = await self._peer_connection.createAnswer()
                self._set_media_types(answer)
                await self._peer_connection.setLocalDescription(answer)
                log.debug(self._peer_connection.localDescription.sdp)

            await self.candidates_gathered
            log.debug(self._peer_connection.localDescription.sdp)
        finally:
            # Always disarm the timer, so a failure above can't leave a pending
            # timeout which would later set an exception on the future.
            timer.clear_timeout(gather_timeout)

        ufrag, pwd = self.extract_pwd_ufrag(self._peer_connection.localDescription.sdp)
        return {
            "ufrag": ufrag,
            "pwd": pwd,
            "candidates": self.media_candidates,
        }

    async def accept_call(self, session_id: str, sdp: str, profile: str) -> None:
        """Call has been accepted, connection can be established

        The SDP is set as remote answer, then buffered peer candidates are added.

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        await self._peer_connection.setRemoteDescription({"type": "answer", "sdp": sdp})
        await self.on_ice_candidates_new(self.candidates_buffer)
        self.candidates_buffer.clear()

    def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
        """Called when new ICE candidates are received

        @param sid: Session identifier
        @param candidates_s: ICE candidates serialized
        @param profile: Profile associated with the action
        """
        if sid != self.sid:
            log.debug(f"ignoring peer ice candidates for {sid=} ({self.sid=}).")
            return
        candidates = json.loads(candidates_s)
        aio.run(self.on_ice_candidates_new(candidates))

    async def on_ice_candidates_new(self, candidates: dict) -> None:
        """Called when new ICE canidates are received from peer

        Candidates are buffered as long as there is no peer connection with a
        remote description, otherwise they are added to the peer connection.

        @param candidates: Dictionary containing new ICE candidates
        """
        log.debug(f"new peer candidates received: {candidates}")
        if (
            self._peer_connection is None
            or self._peer_connection.remoteDescription is None
        ):
            for media_type in ("audio", "video"):
                media_candidates = candidates.get(media_type)
                if media_candidates:
                    # The buffer may have been emptied with ``clear()``, the entry
                    # is then re-created instead of raising a KeyError.
                    buffer = self.candidates_buffer.setdefault(
                        media_type, {"candidates": []}
                    )
                    buffer["candidates"].extend(media_candidates["candidates"])
            return
        for media_type, ice_data in candidates.items():
            for candidate in ice_data["candidates"]:
                candidate_sdp = self.build_ice_candidate(candidate)
                try:
                    sdp_mline_index = self.get_sdp_mline_index(media_type)
                except Exception as e:
                    log.warning(e)
                    continue
                ice_candidate = window.RTCIceCandidate.new(
                    {"candidate": candidate_sdp, "sdpMLineIndex": sdp_mline_index}
                )
                await self._peer_connection.addIceCandidate(ice_candidate)

    def on_track(self, event):
        """New track has been received from peer

        @param event: Event associated with the new track
        """
        if event.streams and event.streams[0]:
            remote_stream = event.streams[0]
            self.remote_video_elt.srcObject = remote_stream
        else:
            # The track comes without stream: tracks are gathered in our own one.
            if self.remote_stream is None:
                self.remote_stream = window.MediaStream.new()
                self.remote_video_elt.srcObject = self.remote_stream
            self.remote_stream.addTrack(event.track)

    def on_negotiation_needed(self, event) -> None:
        """Log the ``negotiationneeded`` event (no negotiation is done yet)."""
        log.debug(f"on_negotiation_needed {event=}")
        # TODO

    async def answer_call(self, sid: str, offer_sdp: str, profile: str):
        """We respond to the call

        @param sid: Session identifier, must match ``self.sid``
        @param offer_sdp: SDP of the offer received from the peer
        @param profile: Profile associated
        @raise Exception: ``sid`` is not the current session identifier.
        """
        log.debug("answering call")
        if sid != self.sid:
            raise Exception(f"Internal Error: unexpected sid: {sid=} {self.sid=}")
        await self._create_peer_connection()

        await self._peer_connection.setRemoteDescription(
            {"type": "offer", "sdp": offer_sdp}
        )
        await self.on_ice_candidates_new(self.candidates_buffer)
        self.candidates_buffer.clear()
        await self._get_user_media()

        # Gather local ICE candidates
        local_ice_data = await self._gather_ice_candidates(False)
        self.local_candidates = local_ice_data["candidates"]

        await bridge.call_answer_sdp(sid, self._peer_connection.localDescription.sdp)

    async def make_call(
        self, callee_jid: jid.JID, audio: bool = True, video: bool = True
    ) -> None:
        """Start a WebRTC call

        @param callee_jid: JID of the entity to call
        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        await self._create_peer_connection()
        await self._get_user_media(audio, video)
        await self._gather_ice_candidates(True)

        call_data = {"sdp": self._peer_connection.localDescription.sdp}
        log.info(f"calling {callee_jid!r}")
        self.sid = await bridge.call_start(str(callee_jid), json.dumps(call_data))
        log.debug(f"Call SID: {self.sid}")

    async def end_call(self) -> None:
        """Stop streaming and clean instance"""
        if self._peer_connection is None:
            log.debug("There is currently no call to end.")
        else:
            self._peer_connection.removeEventListener("track", self.on_track)
            self._peer_connection.removeEventListener(
                "negotiationneeded", self.on_negotiation_needed
            )
            self._peer_connection.removeEventListener(
                "icecandidate", self.on_ice_candidate
            )
            self._peer_connection.removeEventListener(
                "icegatheringstatechange", self.on_ice_gathering_state_change
            )

            # Base64 encoded 1x1 black pixel image
            # this is a trick to reset the image displayed, so we don't see last image of
            # last stream
            # NOTE(review): the literal starts with an empty string and has no
            #   "data:" URI prefix, it looks truncated -- confirm against upstream.
            black_image_data = (
                ""
                "lEQVR42mP8/wcAAwAB/uzNq7sAAAAASUVORK5CYII="
            )

            local_video = self.local_video_elt
            remote_video = self.remote_video_elt
            if local_video.srcObject:
                for track in local_video.srcObject.getTracks():
                    track.stop()
                local_video.src = black_image_data

            if remote_video.srcObject:
                for track in remote_video.srcObject.getTracks():
                    track.stop()
                remote_video.src = black_image_data

            self._peer_connection.close()
            self.reset_instance()

    def toggle_media_mute(self, media_type: str) -> bool:
        """Toggle mute/unmute for media tracks.

        When the media type is part of the session, the peer is notified through
        ``bridge.call_info``.

        @param media_type: 'audio' or 'video'. Determines which media tracks
            to process.
        @return: Value of the ``is_<media_type>_muted`` attribute after the toggle.
        """
        assert media_type in ("audio", "video"), "Invalid media type"

        local_video = self.local_video_elt
        is_muted_attr = f"is_{media_type}_muted"

        if local_video.srcObject:
            log.debug(f"{local_video.srcObject=}")
            # i.e. ``getAudioTracks`` or ``getVideoTracks``
            track_getter = getattr(
                local_video.srcObject, f"get{media_type.capitalize()}Tracks"
            )
            log.debug("track go")
            for track in track_getter():
                log.debug(f"{track=}")
                track.enabled = not track.enabled
                setattr(self, is_muted_attr, not track.enabled)

        media_name = self.media_types_inv.get(media_type)
        if media_name is not None:
            extra = {"name": str(media_name)}
            aio.run(
                bridge.call_info(
                    self.sid,
                    "mute" if getattr(self, is_muted_attr) else "unmute",
                    json.dumps(extra),
                )
            )

        return getattr(self, is_muted_attr)

    def toggle_audio_mute(self) -> bool:
        """Toggle mute/unmute for audio tracks."""
        return self.toggle_media_mute("audio")

    def toggle_video_mute(self) -> bool:
        """Toggle mute/unmute for video tracks."""
        return self.toggle_media_mute("video")

    async def toggle_screen_sharing(self):
        """Switch the video sent between the camera and screen sharing."""
        log.debug(f"toggle_screen_sharing {self._is_sharing_screen=}")

        if self._is_sharing_screen:
            await self._replace_user_video(screen=False)
        else:
            await self._replace_user_video(screen=True)