diff libervia/web/pages/calls/_browser/__init__.py @ 1518:eb00d593801d

refactoring: rename `libervia` to `libervia.web` + update imports following backend changes
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 16:49:28 +0200
parents libervia/pages/calls/_browser/__init__.py@b8ed9726525b
children e47c24204449
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/web/pages/calls/_browser/__init__.py	Fri Jun 02 16:49:28 2023 +0200
@@ -0,0 +1,513 @@
+import json
+import re
+
+from bridge import AsyncBridge as Bridge
+from browser import aio, console as log, document, timer, window
+import errors
+import loading
+
+log.warning = log.warn
+profile = window.profile or ""
+bridge = Bridge()
+GATHER_TIMEOUT = 10000
+
+
+class WebRTCCall:
+
+    def __init__(self):
+        self.reset_instance()
+
+    def reset_instance(self):
+        """Inits or resets the instance variables to their default state."""
+        self._peer_connection = None
+        self._media_types = None
+        self.sid = None
+        self.local_candidates = None
+        self.remote_stream = None
+        self.candidates_buffer = {
+            "audio": {"candidates": []},
+            "video": {"candidates": []},
+        }
+        self.media_candidates = {}
+        self.candidates_gathered = aio.Future()
+
+    @property
+    def media_types(self):
+        if self._media_types is None:
+            raise Exception("self._media_types should not be None!")
+        return self._media_types
+
+    def get_sdp_mline_index(self, media_type):
+        """Gets the sdpMLineIndex for a given media type.
+
+        @param media_type: The type of the media.
+        """
+        for index, m_type in self.media_types.items():
+            if m_type == media_type:
+                return index
+        raise ValueError(f"Media type '{media_type}' not found")
+
+    def extract_pwd_ufrag(self, sdp):
+        """Retrieves ICE password and user fragment for SDP offer.
+
+        @param sdp: The Session Description Protocol offer string.
+        """
+        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
+        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
+
+        if ufrag_line and pwd_line:
+            return ufrag_line.group(1), pwd_line.group(1)
+        else:
+            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
+            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
+
+    def extract_fingerprint_data(self, sdp):
+        """Retrieves fingerprint data from an SDP offer.
+
+        @param sdp: The Session Description Protocol offer string.
+        @return: A dictionary containing the fingerprint data.
+        """
+        fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
+        if fingerprint_line:
+            algorithm, fingerprint = fingerprint_line.groups()
+            fingerprint_data = {
+                "hash": algorithm,
+                "fingerprint": fingerprint
+            }
+
+            setup_line = re.search(r"a=setup:(\S+)", sdp)
+            if setup_line:
+                setup = setup_line.group(1)
+                fingerprint_data["setup"] = setup
+
+            return fingerprint_data
+        else:
+            raise ValueError("fingerprint should not be missing")
+
+    def parse_ice_candidate(self, candidate_string):
+        """Parses the ice candidate string.
+
+        @param candidate_string: The ice candidate string to be parsed.
+        """
+        pattern = re.compile(
+            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
+            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
+            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
+            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
+        )
+        match = pattern.match(candidate_string)
+        if match:
+            candidate_dict = match.groupdict()
+
+            # Apply the correct types to the dictionary values
+            candidate_dict["component_id"] = int(candidate_dict["component_id"])
+            candidate_dict["priority"] = int(candidate_dict["priority"])
+            candidate_dict["port"] = int(candidate_dict["port"])
+
+            if candidate_dict["rel_port"]:
+                candidate_dict["rel_port"] = int(candidate_dict["rel_port"])
+
+            if candidate_dict["generation"]:
+                candidate_dict["generation"] = candidate_dict["generation"]
+
+            # Remove None values
+            return {k: v for k, v in candidate_dict.items() if v is not None}
+        else:
+            log.warning(f"can't parse candidate: {candidate_string!r}")
+            return None
+
+    def build_ice_candidate(self, parsed_candidate):
+        """Builds ICE candidate
+
+        @param parsed_candidate: Dictionary containing parsed ICE candidate
+        """
+        base_format = (
+            "candidate:{foundation} {component_id} {transport} {priority} "
+            "{address} {port} typ {type}"
+        )
+
+        if ((parsed_candidate.get('rel_addr')
+             and parsed_candidate.get('rel_port'))):
+            base_format += " raddr {rel_addr} rport {rel_port}"
+
+        if parsed_candidate.get('generation'):
+            base_format += " generation {generation}"
+
+        return base_format.format(**parsed_candidate)
+
+    def on_ice_candidate(self, event):
+        """Handles ICE candidate event
+
+        @param event: Event containing the ICE candidate
+        """
+        log.debug(f"on ice candidate {event.candidate=}")
+        if event.candidate and event.candidate.candidate:
+            window.last_event = event
+            parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
+            if parsed_candidate is None:
+                return
+            try:
+                media_type = self.media_types[event.candidate.sdpMLineIndex]
+            except (TypeError, IndexError):
+                log.error(
+                    f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
+                )
+                return
+            self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
+            log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")
+        else:
+            log.debug("All ICE candidates gathered")
+
+    def _set_media_types(self, offer):
+        """Sets media types from offer SDP
+
+        @param offer: RTC session description containing the offer
+        """
+        sdp_lines = offer.sdp.splitlines()
+        media_types = {}
+        mline_index = 0
+
+        for line in sdp_lines:
+            if line.startswith("m="):
+                media_types[mline_index] = line[2:line.find(" ")]
+                mline_index += 1
+
+        self._media_types = media_types
+
+    def on_ice_gathering_state_change(self, event):
+        """Handles ICE gathering state change
+
+        @param event: Event containing the ICE gathering state change
+        """
+        connection = event.target
+        log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
+        if connection.iceGatheringState == "complete":
+            log.info("ICE candidates gathering done")
+            self.candidates_gathered.set_result(None)
+
+    async def _create_peer_connection(
+        self,
+    ):
+        """Creates peer connection"""
+        if self._peer_connection is not None:
+            raise Exception("create_peer_connection can't be called twice!")
+
+        external_disco = json.loads(await bridge.external_disco_get(""))
+        ice_servers = []
+
+        for server in external_disco:
+            ice_server = {}
+            if server["type"] == "stun":
+                ice_server["urls"] = f"stun:{server['host']}:{server['port']}"
+            elif server["type"] == "turn":
+                ice_server["urls"] = (
+                    f"turn:{server['host']}:{server['port']}?transport={server['transport']}"
+                )
+                ice_server["username"] = server["username"]
+                ice_server["credential"] = server["password"]
+            ice_servers.append(ice_server)
+
+        rtc_configuration = {"iceServers": ice_servers}
+
+        peer_connection = window.RTCPeerConnection.new(rtc_configuration)
+        peer_connection.addEventListener("track", self.on_track)
+        peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed)
+        peer_connection.addEventListener("icecandidate", self.on_ice_candidate)
+        peer_connection.addEventListener("icegatheringstatechange", self.on_ice_gathering_state_change)
+
+        self._peer_connection = peer_connection
+        window.pc = self._peer_connection
+
+    async def _get_user_media(
+        self,
+        audio: bool = True,
+        video: bool = True
+    ):
+        """Gets user media
+
+        @param audio: True if an audio flux is required
+        @param video: True if a video flux is required
+        """
+        media_constraints = {'audio': audio, 'video': video}
+        local_stream = await window.navigator.mediaDevices.getUserMedia(media_constraints)
+        document["local_video"].srcObject = local_stream
+
+        for track in local_stream.getTracks():
+            self._peer_connection.addTrack(track)
+
+    async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
+        """Get ICE candidates and wait to have them all before returning them
+
+        @param is_initiator: Boolean indicating if the user is the initiator of the connection
+        @param remote_candidates: Remote ICE candidates, if any
+        """
+        if self._peer_connection is None:
+            raise Exception("The peer connection must be created before gathering ICE candidates!")
+
+        self.media_candidates.clear()
+        gather_timeout = timer.set_timeout(
+            lambda: self.candidates_gathered.set_exception(
+                errors.TimeoutError("ICE gathering time out")
+            ),
+            GATHER_TIMEOUT
+        )
+
+        if is_initiator:
+            offer = await self._peer_connection.createOffer()
+            self._set_media_types(offer)
+            await self._peer_connection.setLocalDescription(offer)
+        else:
+            answer = await self._peer_connection.createAnswer()
+            self._set_media_types(answer)
+            await self._peer_connection.setLocalDescription(answer)
+
+        if not is_initiator:
+            log.debug(self._peer_connection.localDescription.sdp)
+        await self.candidates_gathered
+        log.debug(self._peer_connection.localDescription.sdp)
+        timer.clear_timeout(gather_timeout)
+        ufrag, pwd = self.extract_pwd_ufrag(self._peer_connection.localDescription.sdp)
+        return {
+            "ufrag": ufrag,
+            "pwd": pwd,
+            "candidates": self.media_candidates,
+        }
+
+    def on_action_new(
+        self, action_data_s: str, action_id: str, security_limit: int, profile: str
+    ) -> None:
+        """Called when a call is received
+
+        @param action_data_s: Action data serialized
+        @param action_id: Unique identifier for the action
+        @param security_limit: Security limit for the action
+        @param profile: Profile associated with the action
+        """
+        action_data = json.loads(action_data_s)
+        if action_data.get("type") != "call":
+            return
+        peer_jid = action_data["from_jid"]
+        log.info(
+            f"{peer_jid} wants to start a call ({action_data['sub_type']})"
+        )
+        if self.sid is not None:
+            log.warning(
+                f"already in a call ({self.sid}), can't receive a new call from "
+                f"{peer_jid}"
+            )
+            return
+        self.sid = action_data["session_id"]
+        log.debug(f"Call SID: {self.sid}")
+
+        # Answer the call
+        offer_sdp = action_data["sdp"]
+        aio.run(self.answer_call(offer_sdp, action_id))
+
+    def _on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
+        """Called when we have received answer SDP from responder
+
+        @param session_id: Session identifier
+        @param sdp: Session Description Protocol data
+        @param profile: Profile associated with the action
+        """
+        aio.run(self.on_call_accepted(session_id, sdp, profile))
+
+    def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None:
+        """Call has been terminated
+
+        @param session_id: Session identifier
+        @param data_s: Serialised additional data on why the call has ended
+        @param profile: Profile associated
+        """
+        if self.sid is None:
+            log.debug("there are no calls in progress")
+            return
+        if session_id != self.sid:
+            log.debug(
+                f"ignoring call_ended not linked to our call ({self.sid}): {session_id}"
+            )
+            return
+        aio.run(self.end_call())
+
+    async def on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
+        """Call has been accepted, connection can be established
+
+        @param session_id: Session identifier
+        @param sdp: Session Description Protocol data
+        @param profile: Profile associated
+        """
+        if self.sid != session_id:
+            log.debug(
+                f"Call ignored due to different session ID ({self.sid=} {session_id=})"
+            )
+            return
+        await self._peer_connection.setRemoteDescription({
+            "type": "answer",
+            "sdp": sdp
+        })
+        await self.on_ice_candidates_new(self.candidates_buffer)
+        self.candidates_buffer.clear()
+
+    def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
+        """Called when new ICE candidates are received
+
+        @param sid: Session identifier
+        @param candidates_s: ICE candidates serialized
+        @param profile: Profile associated with the action
+        """
+        if sid != self.sid:
+            log.debug(
+                f"ignoring peer ice candidates for {sid=} ({self.sid=})."
+            )
+            return
+        candidates = json.loads(candidates_s)
+        aio.run(self.on_ice_candidates_new(candidates))
+
+    async def on_ice_candidates_new(self, candidates: dict) -> None:
+        """Called when new ICE canidates are received from peer
+
+        @param candidates: Dictionary containing new ICE candidates
+        """
+        log.debug(f"new peer candidates received: {candidates}")
+        if (
+            self._peer_connection is None
+            or self._peer_connection.remoteDescription is None
+        ):
+            for media_type in ("audio", "video"):
+                media_candidates = candidates.get(media_type)
+                if media_candidates:
+                    buffer = self.candidates_buffer[media_type]
+                    buffer["candidates"].extend(media_candidates["candidates"])
+            return
+        for media_type, ice_data in candidates.items():
+            for candidate in ice_data["candidates"]:
+                candidate_sdp = self.build_ice_candidate(candidate)
+                try:
+                    sdp_mline_index = self.get_sdp_mline_index(media_type)
+                except Exception as e:
+                    log.warning(e)
+                    continue
+                ice_candidate = window.RTCIceCandidate.new({
+                    "candidate": candidate_sdp,
+                    "sdpMLineIndex": sdp_mline_index
+                }
+                                                           )
+                await self._peer_connection.addIceCandidate(ice_candidate)
+
+    def on_track(self, event):
+        """New track has been received from peer
+
+        @param event: Event associated with the new track
+        """
+        if event.streams and event.streams[0]:
+            remote_stream = event.streams[0]
+            document["remote_video"].srcObject = remote_stream
+        else:
+            if self.remote_stream is None:
+                self.remote_stream = window.MediaStream.new()
+                document["remote_video"].srcObject = self.remote_stream
+            self.remote_stream.addTrack(event.track)
+
+        document["call_btn"].classList.add("is-hidden")
+        document["hangup_btn"].classList.remove("is-hidden")
+
+    def on_negotiation_needed(self, event) -> None:
+        log.debug(f"on_negotiation_needed {event=}")
+        # TODO
+
+    async def answer_call(self, offer_sdp: str, action_id: str):
+        """We respond to the call"""
+        log.debug("answering call")
+        await self._create_peer_connection()
+
+        await self._peer_connection.setRemoteDescription({
+            "type": "offer",
+            "sdp": offer_sdp
+        })
+        await self.on_ice_candidates_new(self.candidates_buffer)
+        self.candidates_buffer.clear()
+        await self._get_user_media()
+
+        # Gather local ICE candidates
+        local_ice_data = await self._gather_ice_candidates(False)
+        self.local_candidates = local_ice_data["candidates"]
+
+        await bridge.action_launch(
+            action_id,
+            json.dumps({
+                "sdp": self._peer_connection.localDescription.sdp,
+            })
+        )
+
+    async def make_call(self, audio: bool = True, video: bool = True) -> None:
+        """Start a WebRTC call
+
+        @param audio: True if an audio flux is required
+        @param video: True if a video flux is required
+        """
+        await self._create_peer_connection()
+        await self._get_user_media(audio, video)
+        await self._gather_ice_candidates(True)
+        callee_jid = document["callee_jid"].value
+
+        call_data = {
+            "sdp": self._peer_connection.localDescription.sdp
+        }
+        log.info(f"calling {callee_jid!r}")
+        self.sid = await bridge.call_start(
+            callee_jid,
+            json.dumps(call_data)
+        )
+        log.debug(f"Call SID: {self.sid}")
+
+    async def end_call(self) -> None:
+        """Stop streaming and clean instance"""
+        document["hangup_btn"].classList.add("is-hidden")
+        document["call_btn"].classList.remove("is-hidden")
+        if self._peer_connection is None:
+            log.debug("There is currently no call to end.")
+        else:
+            self._peer_connection.removeEventListener("track", self.on_track)
+            self._peer_connection.removeEventListener("negotiationneeded", self.on_negotiation_needed)
+            self._peer_connection.removeEventListener("icecandidate", self.on_ice_candidate)
+            self._peer_connection.removeEventListener("icegatheringstatechange", self.on_ice_gathering_state_change)
+
+            local_video = document["local_video"]
+            remote_video = document["remote_video"]
+            if local_video.srcObject:
+                for track in local_video.srcObject.getTracks():
+                    track.stop()
+            if remote_video.srcObject:
+                for track in remote_video.srcObject.getTracks():
+                    track.stop()
+
+            self._peer_connection.close()
+            self.reset_instance()
+
+    async def hand_up(self) -> None:
+        """Terminate the call"""
+        session_id = self.sid
+        await self.end_call()
+        await bridge.call_end(
+            session_id,
+            ""
+        )
+
+
+webrtc_call = WebRTCCall()
+
+document["call_btn"].bind(
+    "click",
+    lambda __: aio.run(webrtc_call.make_call())
+)
+document["hangup_btn"].bind(
+    "click",
+    lambda __: aio.run(webrtc_call.hand_up())
+)
+
+bridge.register_signal("action_new", webrtc_call.on_action_new)
+bridge.register_signal("call_accepted", webrtc_call._on_call_accepted)
+bridge.register_signal("call_ended", webrtc_call._on_call_ended)
+bridge.register_signal("ice_candidates_new", webrtc_call._on_ice_candidates_new)
+
+loading.remove_loading_screen()