view libervia/web/pages/calls/_browser/__init__.py @ 1534:49ad8dd210d0

server (restricted_bridge): add `message_send` method
author Goffi <goffi@goffi.org>
date Thu, 22 Jun 2023 16:36:18 +0200
parents eb00d593801d
children e47c24204449
line wrap: on
line source

import json
import re

from bridge import AsyncBridge as Bridge
from browser import aio, console as log, document, timer, window
import errors
import loading

# The browser console object is used as logger; alias ``warn`` so that the
# ``log.warning(...)`` calls made in this module work.
log.warning = log.warn
# Profile read from the JS ``window`` object, empty string when unset/falsy.
profile = window.profile or ""
# Bridge instance used for every backend call and signal registration below.
bridge = Bridge()
# Delay given to ``timer.set_timeout`` in ``WebRTCCall._gather_ice_candidates``
# before ICE gathering is considered failed (presumably milliseconds, i.e.
# 10 s -- confirm against the Brython ``timer`` documentation).
GATHER_TIMEOUT = 10000


class WebRTCCall:
    """Manage one WebRTC call (outgoing or incoming) from the browser.

    An instance handles at most one call at a time: ``sid`` holds the identifier of
    the session in progress and is ``None`` when idle. The peer connection is created
    lazily by ``make_call``/``answer_call`` and everything is put back to its default
    state by ``end_call``.
    """

    # Compiled once at class creation instead of on every ``parse_ice_candidate`` call.
    _CANDIDATE_RE = re.compile(
        r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
        r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
        r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
        r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
    )
    # Parsed fields which are converted to ``int`` when present.
    _CANDIDATE_INT_FIELDS = ("component_id", "priority", "port", "rel_port")

    def __init__(self):
        self.reset_instance()

    @staticmethod
    def _new_candidates_buffer():
        """Return an empty buffer for remote candidates, keyed by media type."""
        return {
            "audio": {"candidates": []},
            "video": {"candidates": []},
        }

    @staticmethod
    def _media_types_from_sdp(sdp):
        """Map each ``m=`` line index of an SDP to its media type.

        @param sdp: SDP as a string
        @return: dict mapping m-line index (0-based, in order of appearance) to the
            media type (first token after ``m=``)
        """
        m_lines = (line for line in sdp.splitlines() if line.startswith("m="))
        # ``partition`` keeps the whole token even when the line has no space
        # (slicing up to ``find(" ")`` would cut the last character in that case).
        return {
            index: line[2:].partition(" ")[0] for index, line in enumerate(m_lines)
        }

    def reset_instance(self):
        """Inits or resets the instance variables to their default state."""
        self._peer_connection = None
        self._media_types = None
        self.sid = None
        self.local_candidates = None
        self.remote_stream = None
        # remote candidates received before they can be added to the connection
        self.candidates_buffer = self._new_candidates_buffer()
        # local candidates, by media type
        self.media_candidates = {}
        # resolved by ``on_ice_gathering_state_change`` when gathering is complete
        self.candidates_gathered = aio.Future()

    @property
    def media_types(self):
        """Mapping of m-line index to media type.

        @raise Exception: media types have not been set yet
        """
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    def get_sdp_mline_index(self, media_type):
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        @return: index of the first m-line using this media type
        @raise ValueError: no m-line uses this media type
        @raise Exception: media types have not been set yet
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def extract_pwd_ufrag(self, sdp):
        """Retrieves ICE password and user fragment for SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: tuple ``(ufrag, pwd)``
        @raise ValueError: ``ice-ufrag`` or ``ice-pwd`` is missing from the SDP
        """
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)

        if not (ufrag_line and pwd_line):
            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
        return ufrag_line.group(1), pwd_line.group(1)

    def extract_fingerprint_data(self, sdp):
        """Retrieves fingerprint data from an SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A dictionary containing the fingerprint data: ``hash`` and
            ``fingerprint``, plus ``setup`` when an ``a=setup`` line is found.
        @raise ValueError: no ``a=fingerprint`` line in the SDP
        """
        fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
        if fingerprint_line is None:
            raise ValueError("fingerprint should not be missing")

        algorithm, fingerprint = fingerprint_line.groups()
        fingerprint_data = {
            "hash": algorithm,
            "fingerprint": fingerprint,
        }

        setup_line = re.search(r"a=setup:(\S+)", sdp)
        if setup_line:
            fingerprint_data["setup"] = setup_line.group(1)

        return fingerprint_data

    def parse_ice_candidate(self, candidate_string):
        """Parses the ice candidate string.

        @param candidate_string: The ice candidate string to be parsed.
        @return: dict of the fields found in the candidate (fields which are absent
            are not in the dict), or None if the string can't be parsed.
            ``component_id``, ``priority``, ``port`` and ``rel_port`` are ints, other
            values are strings.
        """
        match = self._CANDIDATE_RE.match(candidate_string)
        if match is None:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

        # Remove None values (optional groups which did not match)
        candidate_dict = {
            key: value for key, value in match.groupdict().items() if value is not None
        }
        # Apply the correct types to the dictionary values.
        # "generation" is intentionally left as a string: generation "0" must stay
        # truthy so that ``build_ice_candidate`` writes it back.
        for key in self._CANDIDATE_INT_FIELDS:
            if key in candidate_dict:
                candidate_dict[key] = int(candidate_dict[key])
        return candidate_dict

    def build_ice_candidate(self, parsed_candidate):
        """Builds ICE candidate

        @param parsed_candidate: Dictionary containing parsed ICE candidate
        @return: candidate string, in the format handled by ``parse_ice_candidate``
        @raise KeyError: a mandatory field is missing from ``parsed_candidate``
        """
        base_format = (
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        )

        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
            base_format += " raddr {rel_addr} rport {rel_port}"

        if parsed_candidate.get("generation"):
            base_format += " generation {generation}"

        return base_format.format(**parsed_candidate)

    def on_ice_candidate(self, event):
        """Handles ICE candidate event

        The parsed candidate is stored in ``media_candidates`` under its media type.

        @param event: Event containing the ICE candidate
        """
        log.debug(f"on ice candidate {event.candidate=}")
        if not (event.candidate and event.candidate.candidate):
            log.debug("All ICE candidates gathered")
            return

        window.last_event = event
        parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
        if parsed_candidate is None:
            return
        try:
            # ``_media_types`` is read directly: it is a dict (KeyError on unknown
            # index) or None when not set yet (TypeError). Going through the
            # ``media_types`` property would raise a bare Exception instead.
            media_type = self._media_types[event.candidate.sdpMLineIndex]
        except (TypeError, KeyError, IndexError):
            log.error(
                f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
            )
            return
        self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
        log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")

    def _set_media_types(self, offer):
        """Sets media types from offer SDP

        @param offer: RTC session description containing the offer (any object with a
            ``sdp`` attribute)
        """
        self._media_types = self._media_types_from_sdp(offer.sdp)

    def on_ice_gathering_state_change(self, event):
        """Handles ICE gathering state change

        @param event: Event containing the ICE gathering state change
        """
        connection = event.target
        log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
        if connection.iceGatheringState == "complete":
            log.info("ICE candidates gathering done")
            self.candidates_gathered.set_result(None)

    async def _create_peer_connection(
        self,
    ):
        """Creates peer connection

        STUN/TURN servers are retrieved from backend with ``external_disco_get``.

        @raise Exception: a peer connection already exists
        """
        if self._peer_connection is not None:
            raise Exception("create_peer_connection can't be called twice!")

        external_disco = json.loads(await bridge.external_disco_get(""))
        ice_servers = []

        for server in external_disco:
            server_type = server["type"]
            if server_type == "stun":
                ice_server = {"urls": f"stun:{server['host']}:{server['port']}"}
            elif server_type == "turn":
                ice_server = {
                    "urls": (
                        f"turn:{server['host']}:{server['port']}"
                        f"?transport={server['transport']}"
                    ),
                    "username": server["username"],
                    "credential": server["password"],
                }
            else:
                # An entry without "urls" is not a valid RTCIceServer and would make
                # the RTCPeerConnection constructor fail: skip other service types.
                log.debug(f"ignoring external service of type {server_type!r}")
                continue
            ice_servers.append(ice_server)

        rtc_configuration = {"iceServers": ice_servers}

        peer_connection = window.RTCPeerConnection.new(rtc_configuration)
        peer_connection.addEventListener("track", self.on_track)
        peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed)
        peer_connection.addEventListener("icecandidate", self.on_ice_candidate)
        peer_connection.addEventListener(
            "icegatheringstatechange", self.on_ice_gathering_state_change
        )

        self._peer_connection = peer_connection
        # exposed on ``window`` too
        window.pc = self._peer_connection

    async def _get_user_media(
        self,
        audio: bool = True,
        video: bool = True
    ):
        """Gets user media

        The local stream is shown in the ``local_video`` element, and its tracks are
        added to the peer connection.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        media_constraints = {'audio': audio, 'video': video}
        local_stream = await window.navigator.mediaDevices.getUserMedia(media_constraints)
        document["local_video"].srcObject = local_stream

        for track in local_stream.getTracks():
            self._peer_connection.addTrack(track)

    async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
        """Get ICE candidates and wait to have them all before returning them

        The local description (offer for the initiator, answer otherwise) is created
        and set here, which starts the gathering.

        @param is_initiator: Boolean indicating if the user is the initiator of the connection
        @param remote_candidates: Remote ICE candidates, if any (currently unused)
        @return: dict with ``ufrag``, ``pwd`` and ``candidates`` (local candidates by
            media type)
        @raise Exception: the peer connection has not been created
        """
        if self._peer_connection is None:
            raise Exception("The peer connection must be created before gathering ICE candidates!")

        self.media_candidates.clear()
        gather_timeout = timer.set_timeout(
            lambda: self.candidates_gathered.set_exception(
                errors.TimeoutError("ICE gathering time out")
            ),
            GATHER_TIMEOUT
        )

        try:
            if is_initiator:
                description = await self._peer_connection.createOffer()
            else:
                description = await self._peer_connection.createAnswer()
            self._set_media_types(description)
            await self._peer_connection.setLocalDescription(description)

            if not is_initiator:
                log.debug(self._peer_connection.localDescription.sdp)
            await self.candidates_gathered
        finally:
            # Always cancel the timer, so it can't fire later if something failed
            # before the end of the gathering.
            timer.clear_timeout(gather_timeout)

        log.debug(self._peer_connection.localDescription.sdp)
        ufrag, pwd = self.extract_pwd_ufrag(self._peer_connection.localDescription.sdp)
        return {
            "ufrag": ufrag,
            "pwd": pwd,
            "candidates": self.media_candidates,
        }

    async def _flush_candidates_buffer(self) -> None:
        """Add buffered remote candidates to the connection and empty the buffer.

        The buffer is replaced by a fresh one (with its per media type entries)
        rather than cleared, so later buffering still finds the expected keys.
        """
        buffered = self.candidates_buffer
        self.candidates_buffer = self._new_candidates_buffer()
        await self.on_ice_candidates_new(buffered)

    def on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Called when a call is received

        Actions which are not of type "call" are ignored, and so are calls received
        while another call is in progress. Otherwise the call is answered.

        @param action_data_s: Action data serialized
        @param action_id: Unique identifier for the action
        @param security_limit: Security limit for the action
        @param profile: Profile associated with the action
        """
        action_data = json.loads(action_data_s)
        if action_data.get("type") != "call":
            return
        peer_jid = action_data["from_jid"]
        log.info(
            f"{peer_jid} wants to start a call ({action_data['sub_type']})"
        )
        if self.sid is not None:
            log.warning(
                f"already in a call ({self.sid}), can't receive a new call from "
                f"{peer_jid}"
            )
            return
        self.sid = action_data["session_id"]
        log.debug(f"Call SID: {self.sid}")

        # Answer the call
        offer_sdp = action_data["sdp"]
        aio.run(self.answer_call(offer_sdp, action_id))

    def _on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
        """Called when we have received answer SDP from responder

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated with the action
        """
        aio.run(self.on_call_accepted(session_id, sdp, profile))

    def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None:
        """Call has been terminated

        @param session_id: Session identifier
        @param data_s: Serialised additional data on why the call has ended
        @param profile: Profile associated
        """
        if self.sid is None:
            log.debug("there are no calls in progress")
            return
        if session_id != self.sid:
            log.debug(
                f"ignoring call_ended not linked to our call ({self.sid}): {session_id}"
            )
            return
        aio.run(self.end_call())

    async def on_call_accepted(self, session_id: str, sdp: str, profile: str) -> None:
        """Call has been accepted, connection can be established

        The answer is set as remote description, then remote candidates buffered in
        the meantime are added to the connection.

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        if self.sid != session_id:
            log.debug(
                f"Call ignored due to different session ID ({self.sid=} {session_id=})"
            )
            return
        await self._peer_connection.setRemoteDescription({
            "type": "answer",
            "sdp": sdp
        })
        await self._flush_candidates_buffer()

    def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
        """Called when new ICE candidates are received

        @param sid: Session identifier
        @param candidates_s: ICE candidates serialized
        @param profile: Profile associated with the action
        """
        if sid != self.sid:
            log.debug(
                f"ignoring peer ice candidates for {sid=} ({self.sid=})."
            )
            return
        candidates = json.loads(candidates_s)
        aio.run(self.on_ice_candidates_new(candidates))

    async def on_ice_candidates_new(self, candidates: dict) -> None:
        """Called when new ICE candidates are received from peer

        While there is no peer connection or no remote description, audio and video
        candidates are buffered. Otherwise they are added to the peer connection.

        @param candidates: Dictionary containing new ICE candidates: media type mapped
            to a dict with a ``candidates`` list of parsed candidates
        """
        log.debug(f"new peer candidates received: {candidates}")
        if (
            self._peer_connection is None
            or self._peer_connection.remoteDescription is None
        ):
            for media_type in ("audio", "video"):
                media_candidates = candidates.get(media_type)
                if media_candidates:
                    buffer = self.candidates_buffer.setdefault(
                        media_type, {"candidates": []}
                    )
                    buffer["candidates"].extend(media_candidates["candidates"])
            return

        for media_type, ice_data in candidates.items():
            media_candidates = ice_data["candidates"]
            if not media_candidates:
                continue
            # The m-line index only depends on the media type: resolve it once.
            try:
                sdp_mline_index = self.get_sdp_mline_index(media_type)
            except Exception as e:
                # broad on purpose: ``media_types`` raises a bare Exception when
                # media types are not set
                log.warning(e)
                continue
            for candidate in media_candidates:
                ice_candidate = window.RTCIceCandidate.new({
                    "candidate": self.build_ice_candidate(candidate),
                    "sdpMLineIndex": sdp_mline_index,
                })
                await self._peer_connection.addIceCandidate(ice_candidate)

    def on_track(self, event):
        """New track has been received from peer

        The remote stream is attached to the ``remote_video`` element, and the call
        button is replaced by the hang up one.

        @param event: Event associated with the new track
        """
        if event.streams and event.streams[0]:
            remote_stream = event.streams[0]
            document["remote_video"].srcObject = remote_stream
        else:
            # the track is not associated with a stream, we use our own one
            if self.remote_stream is None:
                self.remote_stream = window.MediaStream.new()
                document["remote_video"].srcObject = self.remote_stream
            self.remote_stream.addTrack(event.track)

        document["call_btn"].classList.add("is-hidden")
        document["hangup_btn"].classList.remove("is-hidden")

    def on_negotiation_needed(self, event) -> None:
        """Handles negotiation needed event (only logged for now)

        @param event: Event associated with the negotiation request
        """
        log.debug(f"on_negotiation_needed {event=}")
        # TODO

    async def answer_call(self, offer_sdp: str, action_id: str):
        """We respond to the call

        @param offer_sdp: SDP of the offer received from the caller
        @param action_id: identifier of the action to launch with our answer
        """
        log.debug("answering call")
        await self._create_peer_connection()

        # Media types must be known before remote candidates can be mapped to their
        # m-line; without this the buffered candidates flushed below would all be
        # discarded. ``_gather_ice_candidates`` sets them again from our answer.
        self._media_types = self._media_types_from_sdp(offer_sdp)

        await self._peer_connection.setRemoteDescription({
            "type": "offer",
            "sdp": offer_sdp
        })
        await self._flush_candidates_buffer()
        await self._get_user_media()

        # Gather local ICE candidates
        local_ice_data = await self._gather_ice_candidates(False)
        self.local_candidates = local_ice_data["candidates"]

        await bridge.action_launch(
            action_id,
            json.dumps({
                "sdp": self._peer_connection.localDescription.sdp,
            })
        )

    async def make_call(self, audio: bool = True, video: bool = True) -> None:
        """Start a WebRTC call

        The callee is read from the ``callee_jid`` element.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        await self._create_peer_connection()
        await self._get_user_media(audio, video)
        await self._gather_ice_candidates(True)
        callee_jid = document["callee_jid"].value

        call_data = {
            "sdp": self._peer_connection.localDescription.sdp
        }
        log.info(f"calling {callee_jid!r}")
        self.sid = await bridge.call_start(
            callee_jid,
            json.dumps(call_data)
        )
        log.debug(f"Call SID: {self.sid}")

    async def end_call(self) -> None:
        """Stop streaming and clean instance"""
        document["hangup_btn"].classList.add("is-hidden")
        document["call_btn"].classList.remove("is-hidden")
        if self._peer_connection is None:
            log.debug("There is currently no call to end.")
        else:
            peer_connection = self._peer_connection
            peer_connection.removeEventListener("track", self.on_track)
            peer_connection.removeEventListener(
                "negotiationneeded", self.on_negotiation_needed
            )
            peer_connection.removeEventListener("icecandidate", self.on_ice_candidate)
            peer_connection.removeEventListener(
                "icegatheringstatechange", self.on_ice_gathering_state_change
            )

            for video_elt in (document["local_video"], document["remote_video"]):
                if video_elt.srcObject:
                    for track in video_elt.srcObject.getTracks():
                        track.stop()

            peer_connection.close()

        # Reset even without peer connection: ``sid`` may already be set (e.g. the
        # call could not be answered), and would block any further incoming call.
        self.reset_instance()

    async def hand_up(self) -> None:
        """Terminate the call"""
        # ``end_call`` resets ``sid``, keep it to notify the backend
        session_id = self.sid
        await self.end_call()
        if session_id is None:
            log.debug("no session in progress, nothing to terminate")
            return
        await bridge.call_end(
            session_id,
            ""
        )


# Single call handler used by the button bindings and the signal handlers below.
webrtc_call = WebRTCCall()

# The coroutines are scheduled with ``aio.run`` because the click handlers
# themselves are plain (non async) callbacks.
document["call_btn"].bind(
    "click",
    lambda __: aio.run(webrtc_call.make_call())
)
document["hangup_btn"].bind(
    "click",
    lambda __: aio.run(webrtc_call.hand_up())
)

# Bridge signals handled by the call handler.
bridge.register_signal("action_new", webrtc_call.on_action_new)
bridge.register_signal("call_accepted", webrtc_call._on_call_accepted)
bridge.register_signal("call_ended", webrtc_call._on_call_ended)
bridge.register_signal("ice_candidates_new", webrtc_call._on_ice_candidates_new)

# Everything is wired: the loading screen can be removed.
loading.remove_loading_screen()