view libervia/web/pages/calls/_browser/webrtc.py @ 1552:c62027660ec1

doc (installation): update `pipx` instruction and remove `requirements.txt` mention
author Goffi <goffi@goffi.org>
date Wed, 09 Aug 2023 00:48:21 +0200
parents e47c24204449
children 83c2a6faa2ae
line wrap: on
line source

import json
import re

from bridge import AsyncBridge as Bridge
from browser import aio, console as log, document, timer, window
import errors
import  jid

# the browser console object provides ``warn``; alias it so that ``log.warning``
# can be used throughout this module
log.warning = log.warn
# profile read from the JS ``window`` object, empty string when it is not set
profile = window.profile or ""
# bridge instance shared by the whole module
bridge = Bridge()
# delay handed to ``timer.set_timeout`` while waiting for ICE candidates gathering
# (presumably milliseconds, i.e. 10 s — TODO confirm)
GATHER_TIMEOUT = 10000


class WebRTC:

    def __init__(self):
        """Initialise call state, subscribe to peer candidates and bind video elements.

        The ``ice_candidates_new`` bridge signal is routed to
        ``_on_ice_candidates_new``, and the ``local_video`` and ``remote_video``
        elements are looked up in the current document.
        """
        self.reset_instance()
        bridge.register_signal("ice_candidates_new", self._on_ice_candidates_new)
        # mute states stay None until a toggle has been applied to a local track
        self.is_audio_muted = self.is_video_muted = None
        # elements used to render the local and the remote streams
        self.local_video_elt = document["local_video"]
        self.remote_video_elt = document["remote_video"]

    def reset_instance(self):
        """Put every per-call attribute back to its initial value.

        Used by the constructor and at the end of ``end_call``.
        """
        # peer connection and the media description derived from the local SDP
        self._peer_connection = None
        self._media_types = None
        self._media_types_inv = None
        # session related data
        self._callee = None
        self.sid = None
        self.local_candidates = None
        self.remote_stream = None
        # peer candidates received while no remote description is set are kept
        # here, per media type, until they can be applied
        self.candidates_buffer = {
            media: {"candidates": []} for media in ("audio", "video")
        }
        # local candidates, grouped by media type
        self.media_candidates = {}
        # resolved once the local ICE gathering is complete
        self.candidates_gathered = aio.Future()

    @property
    def media_types(self):
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    @media_types.setter
    def media_types(self, new_media_types: dict) -> None:
        self._media_types = new_media_types
        self._media_types_inv = {v:k for k,v in new_media_types.items()}

    @property
    def media_types_inv(self) -> dict:
        if self._media_types_inv is None:
            raise Exception("self._media_types_inv should not be None!")
        return self._media_types_inv

    def get_sdp_mline_index(self, media_type):
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def extract_pwd_ufrag(self, sdp):
        """Retrieves ICE password and user fragment for SDP offer.

        @param sdp: The Session Description Protocol offer string.
        """
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)

        if ufrag_line and pwd_line:
            return ufrag_line.group(1), pwd_line.group(1)
        else:
            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    def extract_fingerprint_data(self, sdp):
        """Retrieves fingerprint data from an SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: A dictionary containing the fingerprint data.
        """
        fingerprint_line = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
        if fingerprint_line:
            algorithm, fingerprint = fingerprint_line.groups()
            fingerprint_data = {
                "hash": algorithm,
                "fingerprint": fingerprint
            }

            setup_line = re.search(r"a=setup:(\S+)", sdp)
            if setup_line:
                setup = setup_line.group(1)
                fingerprint_data["setup"] = setup

            return fingerprint_data
        else:
            raise ValueError("fingerprint should not be missing")

    def parse_ice_candidate(self, candidate_string):
        """Parses the ice candidate string.

        @param candidate_string: The ice candidate string to be parsed.
        """
        pattern = re.compile(
            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
        )
        match = pattern.match(candidate_string)
        if match:
            candidate_dict = match.groupdict()

            # Apply the correct types to the dictionary values
            candidate_dict["component_id"] = int(candidate_dict["component_id"])
            candidate_dict["priority"] = int(candidate_dict["priority"])
            candidate_dict["port"] = int(candidate_dict["port"])

            if candidate_dict["rel_port"]:
                candidate_dict["rel_port"] = int(candidate_dict["rel_port"])

            if candidate_dict["generation"]:
                candidate_dict["generation"] = candidate_dict["generation"]

            # Remove None values
            return {k: v for k, v in candidate_dict.items() if v is not None}
        else:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

    def build_ice_candidate(self, parsed_candidate):
        """Builds ICE candidate

        @param parsed_candidate: Dictionary containing parsed ICE candidate
        """
        base_format = (
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        )

        if ((parsed_candidate.get('rel_addr')
             and parsed_candidate.get('rel_port'))):
            base_format += " raddr {rel_addr} rport {rel_port}"

        if parsed_candidate.get('generation'):
            base_format += " generation {generation}"

        return base_format.format(**parsed_candidate)

    def on_ice_candidate(self, event):
        """Handles ICE candidate event

        A candidate which can be parsed is stored in ``media_candidates`` under the
        media type of its m-line. An event without candidate is only logged as the
        end of gathering.

        @param event: Event containing the ICE candidate
        """
        log.debug(f"on ice candidate {event.candidate=}")
        if not (event.candidate and event.candidate.candidate):
            log.debug("All ICE candidates gathered")
            return

        # NOTE(review): the last event is exposed on ``window``, presumably for
        # debugging from the browser console
        window.last_event = event
        parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
        if parsed_candidate is None:
            return
        try:
            # the private attribute is read on purpose: when no media type has
            # been set yet it is None, the lookup raises TypeError and the error
            # is logged below. The mapping is a dict, so an unknown m-line index
            # raises KeyError.
            media_type = self._media_types[event.candidate.sdpMLineIndex]
        except (TypeError, KeyError, IndexError):
            log.error(
                f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
            )
            return
        self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
        log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")

    def _set_media_types(self, offer):
        """Sets media types from offer SDP

        @param offer: RTC session description containing the offer
        """
        sdp_lines = offer.sdp.splitlines()
        media_types = {}
        mline_index = 0

        for line in sdp_lines:
            if line.startswith("m="):
                media_types[mline_index] = line[2:line.find(" ")]
                mline_index += 1

        self.media_types = media_types

    def on_ice_gathering_state_change(self, event):
        """React to a change of the ICE gathering state

        When the state of the event target becomes "complete", the
        ``candidates_gathered`` future is resolved.

        @param event: Event containing the ICE gathering state change
        """
        connection = event.target
        log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
        if connection.iceGatheringState != "complete":
            # gathering is still in progress, nothing to do
            return
        log.info("ICE candidates gathering done")
        self.candidates_gathered.set_result(None)

    async def _create_peer_connection(
        self,
    ):
        """Creates peer connection

        The ICE servers are built from the external services returned by the
        bridge, then the peer connection is created with the listeners for the
        ``track``, ``negotiationneeded``, ``icecandidate`` and
        ``icegatheringstatechange`` events.

        @raise Exception: A peer connection already exists.
        """
        if self._peer_connection is not None:
            raise Exception("create_peer_connection can't be called twice!")

        external_disco = json.loads(await bridge.external_disco_get(""))
        ice_servers = []

        for server in external_disco:
            server_type = server["type"]
            if server_type == "stun":
                ice_servers.append(
                    {"urls": f"stun:{server['host']}:{server['port']}"}
                )
            elif server_type == "turn":
                ice_servers.append({
                    "urls": (
                        f"turn:{server['host']}:{server['port']}"
                        f"?transport={server['transport']}"
                    ),
                    "username": server["username"],
                    "credential": server["password"],
                })
            else:
                # an entry without "urls" is not a usable ICE server, so services
                # of any other type are skipped instead of being added as an
                # empty dict
                log.warning(
                    f"ignoring external service of unsupported type {server_type!r}"
                )

        rtc_configuration = {"iceServers": ice_servers}

        peer_connection = window.RTCPeerConnection.new(rtc_configuration)
        peer_connection.addEventListener("track", self.on_track)
        peer_connection.addEventListener("negotiationneeded", self.on_negotiation_needed)
        peer_connection.addEventListener("icecandidate", self.on_ice_candidate)
        peer_connection.addEventListener("icegatheringstatechange", self.on_ice_gathering_state_change)

        self._peer_connection = peer_connection
        # NOTE(review): the connection is also exposed on ``window``, presumably
        # for debugging from the browser console
        window.pc = self._peer_connection

    async def _get_user_media(
        self,
        audio: bool = True,
        video: bool = True
    ):
        """Request the local media stream and attach it to the call

        The stream is shown in the local video element and each of its tracks is
        added to the peer connection.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        constraints = dict(audio=audio, video=video)
        media_devices = window.navigator.mediaDevices
        stream = await media_devices.getUserMedia(constraints)
        self.local_video_elt.srcObject = stream

        for media_track in stream.getTracks():
            self._peer_connection.addTrack(media_track)

    async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
        """Get ICE candidates and wait to have them all before returning them

        An offer (initiator) or an answer (responder) is created and set as local
        description, then the method waits for the ``candidates_gathered`` future.
        A timer rejects this future with ``errors.TimeoutError`` after
        ``GATHER_TIMEOUT``.

        @param is_initiator: Boolean indicating if the user is the initiator of the connection
        @param remote_candidates: Remote ICE candidates, if any (not used by this
            method)
        @return: A dictionary with the local ``ufrag`` and ``pwd``, and the gathered
            ``candidates`` grouped by media type.
        @raise Exception: No peer connection has been created.
        """
        if self._peer_connection is None:
            raise Exception("The peer connection must be created before gathering ICE candidates!")

        self.media_candidates.clear()
        gather_timeout = timer.set_timeout(
            lambda: self.candidates_gathered.set_exception(
                errors.TimeoutError("ICE gathering time out")
            ),
            GATHER_TIMEOUT
        )

        try:
            if is_initiator:
                description = await self._peer_connection.createOffer()
            else:
                description = await self._peer_connection.createAnswer()
            self._set_media_types(description)
            await self._peer_connection.setLocalDescription(description)

            if not is_initiator:
                log.debug(self._peer_connection.localDescription.sdp)
            await self.candidates_gathered
        finally:
            # the timer is cleared on failure too, otherwise it would reject
            # ``candidates_gathered`` later, after this call has already failed
            timer.clear_timeout(gather_timeout)

        local_sdp = self._peer_connection.localDescription.sdp
        log.debug(local_sdp)
        ufrag, pwd = self.extract_pwd_ufrag(local_sdp)
        return {
            "ufrag": ufrag,
            "pwd": pwd,
            "candidates": self.media_candidates,
        }

    async def accept_call(self, session_id: str, sdp: str, profile: str) -> None:
        """The peer has accepted the call, its answer can be applied

        The SDP is set as remote description (type "answer"), then the peer
        candidates buffered so far are processed and the buffer is emptied.

        @param session_id: Session identifier
        @param sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        remote_description = {"type": "answer", "sdp": sdp}
        await self._peer_connection.setRemoteDescription(remote_description)
        # candidates received before the answer can now be handled
        await self.on_ice_candidates_new(self.candidates_buffer)
        self.candidates_buffer.clear()

    def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
        """Bridge signal handler for new ICE candidates

        Candidates of another session than the current one are ignored, otherwise
        they are deserialised and handed to ``on_ice_candidates_new``.

        @param sid: Session identifier
        @param candidates_s: ICE candidates serialized
        @param profile: Profile associated with the action
        """
        if sid == self.sid:
            peer_candidates = json.loads(candidates_s)
            aio.run(self.on_ice_candidates_new(peer_candidates))
        else:
            log.debug(
                f"ignoring peer ice candidates for {sid=} ({self.sid=})."
            )

    async def on_ice_candidates_new(self, candidates: dict) -> None:
        """Called when new ICE candidates are received from peer

        While there is no peer connection or no remote description, the "audio"
        and "video" candidates are appended to ``candidates_buffer``. Otherwise
        every candidate is added to the peer connection; candidates whose media
        type has no known m-line index are skipped with a warning.

        @param candidates: Dictionary containing new ICE candidates
        """
        log.debug(f"new peer candidates received: {candidates}")
        connection = self._peer_connection
        not_ready = connection is None or connection.remoteDescription is None
        if not_ready:
            # candidates can't be applied yet, keep them for later
            for media_type in ("audio", "video"):
                received = candidates.get(media_type)
                if not received:
                    continue
                buffered = self.candidates_buffer[media_type]["candidates"]
                buffered.extend(received["candidates"])
            return

        for media_type, ice_data in candidates.items():
            for candidate in ice_data["candidates"]:
                candidate_sdp = self.build_ice_candidate(candidate)
                try:
                    sdp_mline_index = self.get_sdp_mline_index(media_type)
                except Exception as e:
                    log.warning(e)
                    continue
                candidate_init = {
                    "candidate": candidate_sdp,
                    "sdpMLineIndex": sdp_mline_index
                }
                ice_candidate = window.RTCIceCandidate.new(candidate_init)
                await self._peer_connection.addIceCandidate(ice_candidate)

    def on_track(self, event):
        """New track has been received from peer

        @param event: Event associated with the new track
        """
        if event.streams and event.streams[0]:
            remote_stream = event.streams[0]
            self.remote_video_elt.srcObject = remote_stream
        else:
            if self.remote_stream is None:
                self.remote_stream = window.MediaStream.new()
                self.remote_video_elt.srcObject = self.remote_stream
            self.remote_stream.addTrack(event.track)

    def on_negotiation_needed(self, event) -> None:
        """Handle the ``negotiationneeded`` event of the peer connection

        For now the event is only logged, no negotiation is started.

        @param event: Event emitted by the peer connection
        """
        log.debug(f"on_negotiation_needed {event=}")
        # TODO

    async def answer_call(self, sid: str, offer_sdp: str, profile: str):
        """Answer an incoming call

        A peer connection is created, the offer is set as remote description, the
        buffered peer candidates are processed, the local media are requested, and
        once the local candidates are gathered the local SDP is sent through the
        bridge.

        @param sid: Session identifier, must be the current one
        @param offer_sdp: SDP of the offer received from the peer
        @param profile: Profile associated
        @raise Exception: ``sid`` is not the identifier of the current session.
        """
        log.debug("answering call")
        if sid != self.sid:
            raise Exception(
                f"Internal Error: unexpected sid: {sid=} {self.sid=}"
            )
        await self._create_peer_connection()

        remote_offer = {"type": "offer", "sdp": offer_sdp}
        await self._peer_connection.setRemoteDescription(remote_offer)
        # candidates received before the offer was applied can now be handled
        await self.on_ice_candidates_new(self.candidates_buffer)
        self.candidates_buffer.clear()
        await self._get_user_media()

        # local candidates are gathered as responder
        gathered = await self._gather_ice_candidates(False)
        self.local_candidates = gathered["candidates"]

        answer_sdp = self._peer_connection.localDescription.sdp
        await bridge.call_answer_sdp(sid, answer_sdp)

    async def make_call(
        self,
        callee_jid: jid.JID,
        audio: bool = True,
        video: bool = True
    ) -> None:
        """Initiate a WebRTC call

        The peer connection is created, local media are requested and local ICE
        candidates are gathered, then the call is started through the bridge with
        the local SDP. The session identifier returned by the bridge is stored in
        ``sid``.

        @param callee_jid: Entity to call
        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        await self._create_peer_connection()
        await self._get_user_media(audio, video)
        await self._gather_ice_candidates(True)

        local_sdp = self._peer_connection.localDescription.sdp
        log.info(f"calling {callee_jid!r}")
        self.sid = await bridge.call_start(
            str(callee_jid),
            json.dumps({"sdp": local_sdp})
        )
        log.debug(f"Call SID: {self.sid}")

    async def end_call(self) -> None:
        """Stop the streams, close the peer connection and reset the instance

        Without a peer connection only the instance state is reset.
        """
        connection = self._peer_connection
        if connection is None:
            log.debug("There is currently no call to end.")
        else:
            # listeners set when the connection was created
            listeners = (
                ("track", self.on_track),
                ("negotiationneeded", self.on_negotiation_needed),
                ("icecandidate", self.on_ice_candidate),
                ("icegatheringstatechange", self.on_ice_gathering_state_change),
            )
            for event_name, handler in listeners:
                connection.removeEventListener(event_name, handler)

            # Base64 encoded 1x1 black pixel image
            # this is a trick to reset the image displayed, so we don't see last image of
            # last stream
            # NOTE(review): the first literal is empty, so the value has no "data:"
            # URI prefix — it looks truncated, confirm against the upstream file
            black_image_data = (
                ""
                "lEQVR42mP8/wcAAwAB/uzNq7sAAAAASUVORK5CYII="
            )

            # local element first, then remote one
            for video_elt in (self.local_video_elt, self.remote_video_elt):
                stream = video_elt.srcObject
                if stream:
                    for track in stream.getTracks():
                        track.stop()
                video_elt.src = black_image_data

            connection.close()
        self.reset_instance()

    def toggle_media_mute(self, media_type: str) -> bool:
        """Toggle mute/unmute for media tracks.

        @param media_type: 'audio' or 'video'. Determines which media tracks
        to process.
        """
        assert media_type in ("audio", "video"), "Invalid media type"

        local_video = self.local_video_elt
        is_muted_attr = f"is_{media_type}_muted"

        if local_video.srcObject:
            track_getter = getattr(local_video.srcObject, f"get{media_type.capitalize()}Tracks")
            for track in track_getter():
                track.enabled = not track.enabled
                setattr(self, is_muted_attr, not track.enabled)

        media_name = self.media_types_inv.get(media_type)
        if media_name is not None:
            extra = {"name": str(media_name)}
            aio.run(
                bridge.call_info(
                    self.sid,
                    "mute" if getattr(self, is_muted_attr) else "unmute",
                    json.dumps(extra),
                )
            )

        return getattr(self, is_muted_attr)

    def toggle_audio_mute(self) -> bool:
        """Toggle mute/unmute for audio tracks."""
        return self.toggle_media_mute("audio")

    def toggle_video_mute(self) -> bool:
        """Toggle mute/unmute for video tracks."""
        return self.toggle_media_mute("video")