view libervia/web/pages/calls/_browser/webrtc.py @ 1607:64bfb6554131

doc (installation): add missing pipx link
author Goffi <goffi@goffi.org>
date Thu, 30 May 2024 23:56:05 +0200
parents 4a9679369856
children 6bfeb9f0fb84
line wrap: on
line source

import json
import re

from bridge import AsyncBridge as Bridge
from browser import aio, console as log, document, window, DOMNode
import dialog
from javascript import JSObject, NULL
import jid
from js_modules.cbor_x import CBOR

# ``log`` is the browser ``console`` object imported above; alias ``warn`` so that the
# ``log.warning(...)`` calls made in this module work on it.
log.warning = log.warn
# Profile as exposed on the page's ``window`` object, or an empty string when unset.
profile = window.profile or ""
# Module-wide bridge instance, used for backend calls and signal registration.
bridge = Bridge()


class FileSender:
    """Send a file through a data channel, one fixed-size chunk after the other.

    The transfer starts when the data channel emits its ``open`` event. Each chunk is
    read with a ``FileReader``; the next chunk is only requested once the previous one
    has been handed over to the data channel.
    """

    # Size in bytes of each slice read from the file and sent on the channel.
    CHUNK_SIZE = 64 * 1024

    def __init__(self, session_id, file, data_channel):
        """Store the transfer parameters and wait for the data channel to open.

        @param session_id: ID of the session to end once the file is sent, or None if
            there is no session to terminate.
        @param file: file to send; its ``name``, ``size`` and ``slice`` are used.
        @param data_channel: data channel on which the chunks are sent.
        """
        self.offset = 0
        self.session_id = session_id
        self.file = file
        self.data_channel = data_channel
        data_channel.bind("open", self._on_open)

    def _on_open(self, __):
        """Start the transfer once the data channel is open."""
        log.info(f"Data channel open, starting to send {self.file.name}.")
        self.send_file()

    def _on_reader_load(self, event):
        """Send the chunk which has just been read, then continue with the next one."""
        chunk_data = event.target.result
        self.data_channel.send(chunk_data)
        self.offset += self.CHUNK_SIZE
        self.send_file()

    def send_file(self):
        """Read the next chunk of the file, or finish the transfer if none is left.

        When the whole file has been sent, the data channel is closed and the session
        is ended through the bridge (if a session ID was given).
        """
        if self.offset < self.file.size:
            start = self.offset
            chunk = self.file.slice(start, start + self.CHUNK_SIZE)
            reader = window.FileReader.new()
            # The chunk is actually sent from the ``onload`` handler.
            reader.onload = self._on_reader_load
            reader.readAsArrayBuffer(chunk)
            return

        log.info(f"file {self.file.name} sent.")
        self.data_channel.close()
        if self.session_id is not None:
            aio.run(bridge.call_end(self.session_id, ""))


class FileReceiver:
    """Receive a file from a data channel and offer it as a download when done."""

    def __init__(self, session_id: str, data_channel, extra_data: dict) -> None:
        """Bind the receiver to a data channel.

        @param session_id: ID of the session to end once the file is received.
        @param data_channel: The RTCDataChannel through which file data is received.
        @param extra_data: extra data of the transfer. The optional ``file_data`` key
            is kept, its ``name`` is used as the name of the downloaded file.
        """
        self.session_id = session_id
        self.data_channel = data_channel
        self.file_data = extra_data.get("file_data", {})
        # Chunks are kept in arrival order until the channel is closed.
        self.received_chunks = []
        for event_name, handler in (
            ("message", self._on_message),
            ("close", self._on_close),
        ):
            data_channel.bind(event_name, handler)
        log.debug("File receiver created.")

    def _on_message(self, event) -> None:
        """Store the data chunk carried by a data channel message.

        @param event: The event containing the data chunk.
        """
        chunk = event.data
        self.received_chunks.append(chunk)

    def _on_close(self, __) -> None:
        """Build the file from the received chunks once the data channel is closed.

        The chunks are assembled in a Blob, which is downloaded through a temporary
        ``<a>`` element. The session is then ended through the bridge.
        """
        file_blob = window.Blob.new(self.received_chunks)
        blob_url = window.URL.createObjectURL(file_blob)

        # A temporary link is clicked to trigger the download.
        link_elt = document.createElement("a")
        link_elt.href = blob_url
        link_elt.download = self.file_data.get("name", "received_file")
        document.body.appendChild(link_elt)
        link_elt.click()

        # The link and the object URL are not needed anymore.
        document.body.removeChild(link_elt)
        window.URL.revokeObjectURL(blob_url)
        log.info("File received.")
        aio.run(bridge.call_end(self.session_id, ""))


class RemoteControler:
    """Capture input events on an element and send them to the controlled device."""

    def __init__(
        self,
        session_id: str,
        capture_elt: DOMNode,
        data_channel: JSObject,
        simulate_mouse: bool = True
    ) -> None:
        """Initialize a RemoteControler instance.

        Input events are only captured once the data channel is open.

        @param session_id: ID of the Jingle Session
        @param capture_elt: element where the input events are captured.
        @param data_channel: WebRTCDataChannel instance linking to controlled device.
        @param simulate_mouse: if True, touch event will be converted to mouse events.
        """
        self.session_id = session_id
        self.simulate_mouse = simulate_mouse
        # Used for simulated mouse events when a touch event has no touch point left.
        self.last_mouse_position = (0, 0)
        self.capture_elt = capture_elt
        self.capture_elt.bind("click", self._on_capture_elt_click)
        self.data_channel = data_channel
        data_channel.bind("open", self._on_open)

    def _on_capture_elt_click(self, __):
        """Give focus to the capture element when it is clicked."""
        self.capture_elt.focus()

    def _on_open(self, __):
        """Start capturing inputs as soon as the data channel is open."""
        log.info("Data channel open, starting to send inputs.")
        self.start_capture()

    def start_capture(self) -> None:
        """Start capturing input events to send them to the controlled device."""
        captured_events = (
            "mousedown",
            "mouseup",
            "mousemove",
            "keydown",
            "keyup",
            "touchstart",
            "touchend",
            "touchmove",
            "wheel",
        )
        for event_name in captured_events:
            self.capture_elt.bind(event_name, self._send_event)
        self.capture_elt.focus()

    def get_stream_coordinates(self, client_x: float, client_y: float) -> tuple[float, float]:
        """Convert viewport coordinates to coordinates inside the video stream.

        The video may be scaled and letterboxed inside the capture element when aspect
        ratios differ: this is taken into account, and the result is clamped to the
        stream dimensions.

        @param client_x: The clientX value from the event, relative to the viewport.
        @param client_y: The clientY value from the event, relative to the viewport.
        @return: The x and y coordinates relative to the actual video stream.
            If any dimension of the element or of the stream is missing or zero, the
            coordinates relative to the capture element are returned instead.
        """
        video = self.capture_elt
        rect = video.getBoundingClientRect()

        # Position inside the capture element box.
        rel_x = client_x - rect.left
        rel_y = client_y - rect.top

        elt_w, elt_h = rect.width, rect.height
        stream_w, stream_h = video.videoWidth, video.videoHeight

        if not (elt_w and elt_h and stream_w and stream_h):
            log.warning("Invalid dimensions for video or element, using offsets.")
            return rel_x, rel_y

        if stream_w / stream_h > elt_w / elt_h:
            # Stream is wider than the element: bars are above and below the video.
            scale = elt_w / stream_w
            displayed_height = stream_h * scale
            pad_x, pad_y = 0, (elt_h - displayed_height) / 2
        else:
            # Stream is taller than the element: bars are on the sides of the video.
            scale = elt_h / stream_h
            displayed_width = stream_w * scale
            pad_x, pad_y = (elt_w - displayed_width) / 2, 0

        # Remove the bars, go back to stream scale, then clamp to the stream size.
        x = max(0.0, min((rel_x - pad_x) / scale, stream_w))
        y = max(0.0, min((rel_y - pad_y) / scale, stream_h))
        return x, y

    def _send_event(self, event: JSObject) -> None:
        """Serialize an input event and send it CBOR-encoded on the data channel."""
        event.preventDefault()
        payload = self._serialize_event(event)
        # TODO: we should join events instead
        self.data_channel.send(CBOR.encode(payload))

    def _serialize_touch_event(self, event: JSObject) -> dict[str, object]:
        """Serialize a touch event, as a mouse event if ``simulate_mouse`` is set.

        @param event: a touch event.
        @return: event data to send to peer.
        """
        touches = []
        for touch in event.touches:
            touch_x, touch_y = self.get_stream_coordinates(touch.clientX, touch.clientY)
            touches.append({"identifier": touch.identifier, "x": touch_x, "y": touch_y})

        if not self.simulate_mouse:
            # Normal mode, touch events are sent as is.
            return {"type": event.type, "touches": touches}

        # Mouse simulation: the first touch point gives the pointer position. When
        # there is no touch point, the last known position is used.
        if touches:
            first_touch = touches[0]
            x, y = first_touch["x"], first_touch["y"]
            self.last_mouse_position = (x, y)
        else:
            x, y = self.last_mouse_position

        mouse_event: dict[str, object] = {"x": x, "y": y}
        simulated = {
            "touchstart": {"type": "mousedown", "buttons": 1},
            "touchend": {"type": "mouseup", "buttons": 1},
            "touchmove": {"type": "mousemove"},
        }
        mouse_event.update(simulated.get(event.type, {}))
        return mouse_event

    def _serialize_event(self, event: JSObject) -> dict[str, object]:
        """Serialize event data for transmission.

        @param event: an input event (key, mouse, touch or wheel).
        @return: event data to send to peer.
        @raise Exception: the event type is not handled.
        """
        event_type = event.type

        if event_type.startswith("key"):
            key_data = {"type": event.type, "key": event.key}
            if event.location:
                key_data["location"] = event.location
            return key_data

        if event_type.startswith("mouse"):
            x, y = self.get_stream_coordinates(event.clientX, event.clientY)
            return {"type": event.type, "buttons": event.buttons, "x": x, "y": y}

        if event_type.startswith("touch"):
            return self._serialize_touch_event(event)

        if event_type == "wheel":
            return {
                "type": event.type,
                "deltaX": event.deltaX,
                "deltaY": event.deltaY,
                "deltaZ": event.deltaZ,
                "deltaMode": event.deltaMode,
            }

        raise Exception(f"Internal Error: unexpected event {event.type=}")


class WebRTC:

    def __init__(
        self,
        screen_sharing_cb=None,
        on_connection_established_cb=None,
        on_reconnect_cb=None,
        on_connection_lost_cb=None,
        on_video_devices=None,
        on_reset_cb=None,
        file_only: bool = False,
        extra_data: dict | None = None,
        local_video_elt=None,
        remote_video_elt=None,
        local_stream=None
    ):
        """Initialise WebRTC instance.

        @param screen_sharing_cb: called with the new state when screen sharing state
            changes.
        @param on_connection_established_cb: called when the connection is
            established.
        @param on_reconnect_cb: called when a reconnection is triggered.
        @param on_connection_lost_cb: called when the connection is lost.
        @param on_video_devices: called when new video devices are set.
        @param on_reset_cb: called on instance reset (including the reset done during
            initialisation).
        @param file_only: indicates a file transfer only session.
        @param extra_data: optional dictionary containing additional data.
            Notably used for file transfer, where ``file_data`` key is used.
        @param local_video_elt: element showing the local video, if any.
            Ignored if ``file_only`` is set.
        @param remote_video_elt: element showing the remote video.
            Mandatory unless ``file_only`` is set.
        @param local_stream: already existing local media stream to use, if any.
        """
        # The reset callback must be set before the reset, which calls it.
        self.on_reset_cb = on_reset_cb
        self.reset_instance()

        # ICE related signals from the backend
        for signal_name, handler in (
            ("ice_candidates_new", self._on_ice_candidates_new),
            ("ice_restart", self._on_ice_restart),
        ):
            bridge.register_signal(signal_name, handler)

        # callbacks on connection state
        self.on_connection_established_cb = on_connection_established_cb
        self.on_reconnect_cb = on_reconnect_cb
        self.on_connection_lost_cb = on_connection_lost_cb

        # video devices state, filled by ``_populate_video_devices``
        self.on_video_devices = on_video_devices
        self.video_devices = []
        self.has_multiple_cameras = False
        self.current_camera = None

        self.file_only = file_only
        if file_only:
            self.file_sender = None
        else:
            # The list of video devices is populated in the background.
            aio.run(self._populate_video_devices())

            self.local_video_elt = local_video_elt
            assert remote_video_elt is not None
            self.remote_video_elt = remote_video_elt

        self.local_stream = local_stream

        # Muting state is unknown until it is explicitly set.
        self.is_audio_muted = None
        self.is_video_muted = None

        # The private attribute is set directly to avoid calling the callback.
        self._is_sharing_screen = False
        self.screen_sharing_cb = screen_sharing_cb

        self.extra_data = {} if extra_data is None else extra_data

    @property
    def is_sharing_screen(self) -> bool:
        return self._is_sharing_screen

    @is_sharing_screen.setter
    def is_sharing_screen(self, sharing: bool) -> None:
        if sharing != self._is_sharing_screen:
            self._is_sharing_screen = sharing
            if self.screen_sharing_cb is not None:
                self.screen_sharing_cb(sharing)

    def reset_instance(self):
        """Inits or resets the instance variables to their default state."""
        self._peer_connection = None
        self._media_types = None
        self._media_types_inv = None
        self.ufrag = None
        self.pwd = None
        self.sid = None
        self.local_candidates = None
        self.remote_stream = None
        self.remote_candidates_buffer = {
            "audio": {"candidates": []},
            "video": {"candidates": []},
            "application": {"candidates": []},
        }
        self.local_candidates_buffer = {}
        self.media_candidates = {}
        if self.on_reset_cb is not None:
            self.on_reset_cb()

    async def _populate_video_devices(self):
        """Fill ``video_devices`` with the video input devices currently available.

        ``has_multiple_cameras`` is updated, ``on_video_devices`` (if set) is called
        with it, and ``current_camera`` is set to the first device found (if any).
        """
        all_devices = await window.navigator.mediaDevices.enumerateDevices()
        known_ids = set()
        self.video_devices.clear()
        for device in all_devices:
            # we can have multiple devices with same IDs in some corner cases (e.g.
            # infrared camera), only the first one is kept.
            if device.kind == "videoinput" and device.deviceId not in known_ids:
                known_ids.add(device.deviceId)
                self.video_devices.append(device)

        self.has_multiple_cameras = len(self.video_devices) > 1
        notify = self.on_video_devices
        if notify is not None:
            notify(self.has_multiple_cameras)

        # The first device is the initial camera (usually front on mobile)
        if self.video_devices:
            first_device = self.video_devices[0]
            self.current_camera = first_device.deviceId
        log.debug(
            f"devices populated: {self.video_devices=} {self.has_multiple_cameras=}"
        )

    @property
    def media_types(self):
        if self._media_types is None:
            raise Exception("self._media_types should not be None!")
        return self._media_types

    @media_types.setter
    def media_types(self, new_media_types: dict) -> None:
        self._media_types = new_media_types
        self._media_types_inv = {v: k for k, v in new_media_types.items()}

    @property
    def media_types_inv(self) -> dict:
        if self._media_types_inv is None:
            raise Exception("self._media_types_inv should not be None!")
        return self._media_types_inv

    def get_sdp_mline_index(self, media_type):
        """Gets the sdpMLineIndex for a given media type.

        @param media_type: The type of the media.
        """
        for index, m_type in self.media_types.items():
            if m_type == media_type:
                return index
        raise ValueError(f"Media type '{media_type}' not found")

    def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
        """Retrieves ICE password and user fragment for SDP offer.

        @param sdp: The Session Description Protocol offer string.
        @return: ufrag and pwd
        @raise ValueError: Can't extract ufrag and password
        """
        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)

        if ufrag_line and pwd_line:
            ufrag = self.ufrag = ufrag_line.group(1)
            pwd = self.pwd = pwd_line.group(1)
            return ufrag, pwd
        else:
            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")

    def extract_fingerprint_data(self, sdp):
        """Retrieves fingerprint data from SDP.

        @param sdp: The Session Description Protocol string.
        @return: A dictionary containing the fingerprint data: ``hash`` and
            ``fingerprint`` keys, plus ``setup`` if an ``a=setup`` line is found.
        @raise ValueError: no fingerprint line found.
        """
        fingerprint_match = re.search(r"a=fingerprint:(\S+)\s+(\S+)", sdp)
        if not fingerprint_match:
            raise ValueError("fingerprint should not be missing")

        hash_name, fingerprint_value = fingerprint_match.groups()
        data = {"hash": hash_name, "fingerprint": fingerprint_value}

        # The setup line is optional.
        setup_match = re.search(r"a=setup:(\S+)", sdp)
        if setup_match:
            data["setup"] = setup_match.group(1)
        return data

    def parse_ice_candidate(self, candidate_string):
        """Parses the ice candidate string.

        @param candidate_string: The ice candidate string to be parsed.
        @return: parsed candidate data, or None if the string can't be parsed.
            ``component_id``, ``priority``, ``port`` and ``rel_port`` are integers,
            other values (including ``generation``) are kept as strings. Fields
            missing from the candidate string are not in the result.
        """
        candidate_re = re.compile(
            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
        )
        parsed = candidate_re.match(candidate_string)
        if not parsed:
            log.warning(f"can't parse candidate: {candidate_string!r}")
            return None

        fields = parsed.groupdict()

        # Mandatory numeric fields are always matched, they can be converted directly.
        for numeric_field in ("component_id", "priority", "port"):
            fields[numeric_field] = int(fields[numeric_field])

        # The related port is optional, it is None when not matched.
        if fields["rel_port"]:
            fields["rel_port"] = int(fields["rel_port"])

        # Unmatched optional fields are dropped.
        return {name: value for name, value in fields.items() if value is not None}

    def build_ice_candidate(self, parsed_candidate):
        """Builds ICE candidate string

        @param parsed_candidate: Dictionary containing parsed ICE candidate
        @return: the candidate string. Related address/port and generation are only
            included when they are set in ``parsed_candidate``.
        """
        template_parts = [
            "candidate:{foundation} {component_id} {transport} {priority} "
            "{address} {port} typ {type}"
        ]

        has_related = parsed_candidate.get("rel_addr") and parsed_candidate.get(
            "rel_port"
        )
        if has_related:
            template_parts.append(" raddr {rel_addr} rport {rel_port}")

        if parsed_candidate.get("generation"):
            template_parts.append(" generation {generation}")

        return "".join(template_parts).format(**parsed_candidate)

    def on_ice_candidate(self, event):
        """Handles ICE candidate event

        @param event: Event containing the ICE candidate
        """
        log.debug(f"on ice candidate {event.candidate=}")
        if event.candidate and event.candidate.candidate:
            parsed_candidate = self.parse_ice_candidate(event.candidate.candidate)
            if parsed_candidate is None:
                return
            try:
                media_type = self.media_types[event.candidate.sdpMLineIndex]
            except (TypeError, IndexError):
                log.error(
                    f"Can't find media type.\n{event.candidate=}\n{self._media_types=}"
                )
                return
            self.media_candidates.setdefault(media_type, []).append(parsed_candidate)
            log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}")
            if self.sid is None:
                log.debug("buffering candidate")
                self.local_candidates_buffer.setdefault(media_type, []).append(
                    parsed_candidate
                )
            else:
                ufrag, pwd = self.extract_ufrag_pwd(
                    self._peer_connection.localDescription.sdp
                )

                ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
                aio.run(
                    bridge.ice_candidates_add(
                        self.sid, json.dumps({media_type: ice_data})
                    )
                )

        else:
            log.debug("All ICE candidates gathered")

    def on_ice_connection_state_change(self, event):
        """Log ICE connection change, mainly used for debugging"""
        state = self._peer_connection.iceConnectionState
        log.info(f"ICE Connection State changed to: {state}")

        if state == "connected":
            if self.on_connection_established_cb is not None:
                self.on_connection_established_cb()
        elif state == "failed":
            log.error(
                "ICE connection failed. Check network connectivity and ICE"
                " configurations."
            )
        elif state == "disconnected":
            log.warning("ICE connection was disconnected.")
            if self.on_connection_lost_cb is not None:
                self.on_connection_lost_cb()

    def on_ice_candidate_error(self, event):
        """Log ICE candidate errors, useful for debugging.

        @param event: ICE candidate error event.
        """
        summary = f"ICE Candidate Error: {event.errorText} (Code: {event.errorCode})"
        log.error(summary)
        details = (
            f"URL: {event.url}, Host candidate: {event.hostCandidate}, Port: {event.port}"
        )
        log.debug(details)

    def _set_media_types(self, offer):
        """Sets media types from offer SDP

        @param offer: RTC session description containing the offer
        """
        sdp_lines = offer.sdp.splitlines()
        media_types = {}
        mline_index = 0

        for line in sdp_lines:
            if line.startswith("m="):
                media_types[mline_index] = line[2 : line.find(" ")]
                mline_index += 1

        self.media_types = media_types

    def on_ice_gathering_state_change(self, event):
        """Log ICE gathering state changes.

        @param event: Event whose target is the connection with the new gathering
            state.
        """
        connection = event.target
        log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}")
        if connection.iceGatheringState != "complete":
            return
        log.info("ICE candidates gathering done")

    async def _create_peer_connection(
        self,
    ) -> JSObject:
        """Creates peer connection

        STUN and TURN servers are retrieved from the backend (external services
        discovery) and used as ICE servers. Services of any other type are ignored.

        The connection is stored in ``self._peer_connection`` (and exposed as
        ``window.pc``).

        @return: the new peer connection.
        @raise Exception: a peer connection already exists.
        """
        if self._peer_connection is not None:
            raise Exception("create_peer_connection can't be called twice!")

        external_disco = json.loads(await bridge.external_disco_get(""))
        ice_servers = []

        for server in external_disco:
            server_type = server["type"]
            if server_type == "stun":
                ice_server = {"urls": f"stun:{server['host']}:{server['port']}"}
            elif server_type == "turn":
                ice_server = {
                    "urls": (
                        f"turn:{server['host']}:{server['port']}"
                        f"?transport={server['transport']}"
                    ),
                    "username": server["username"],
                    "credential": server["password"],
                }
            else:
                # An ICE server without "urls" is invalid and would make the
                # RTCPeerConnection constructor fail, so unknown services are skipped.
                log.warning(f"ignoring unsupported external service: {server_type!r}")
                continue
            ice_servers.append(ice_server)

        rtc_configuration = {"iceServers": ice_servers}

        peer_connection = window.RTCPeerConnection.new(rtc_configuration)
        for event_name, handler in (
            ("track", self.on_track),
            ("negotiationneeded", self.on_negotiation_needed),
            ("iceconnectionstatechange", self.on_ice_connection_state_change),
            ("icecandidate", self.on_ice_candidate),
            ("icecandidateerror", self.on_ice_candidate_error),
            ("icegatheringstatechange", self.on_ice_gathering_state_change),
        ):
            peer_connection.addEventListener(event_name, handler)

        self._peer_connection = peer_connection
        window.pc = self._peer_connection
        return peer_connection

    async def _get_user_media(self, audio: bool = True, video: bool = True) -> None:
        """
        Gets user media (camera and microphone).

        @param audio: True if an audio flux is required.
        @param video: True if a video flux is required.
        """
        media_constraints = {"audio": audio, "video": video}
        if self.local_stream is None:
            self.local_stream = await window.navigator.mediaDevices.getUserMedia(
                media_constraints
            )

        if not self.local_stream:
            log.error("Failed to get the media stream.")
            return

        if self.local_video_elt is not None:
            self.local_video_elt.srcObject = self.local_stream

        for track in self.local_stream.getTracks():
            self._peer_connection.addTrack(track)

    async def _replace_user_video(
        self,
        screen: bool = False,
    ) -> JSObject | None:
        """Replaces the user video track with either a camera or desktop sharing track.

        The new video track replaces the one of the stream attached to the local video
        element (if any), and the one sent to the peer (if a video sender exists).
        ``is_sharing_screen`` is updated with ``screen``.

        @param screen: True if desktop sharing is required. False will use the camera.
        @return: The stream attached to the local video element.
            None is returned if the new stream or its video track can't be retrieved,
            and also when there is no local video element.
        """
        if screen:
            media_constraints = {"video": {"cursor": "always"}}
            new_stream = await window.navigator.mediaDevices.getDisplayMedia(
                media_constraints
            )
        else:
            # Video tracks currently attached to the local video element are stopped
            # before requesting the camera.
            if self.local_video_elt is not None and self.local_video_elt.srcObject:
                for track in self.local_video_elt.srcObject.getTracks():
                    if track.kind == "video":
                        track.stop()

            # The currently selected camera is requested if there is one, otherwise
            # any video source is accepted.
            media_constraints = {
                "video": (
                    {"deviceId": self.current_camera} if self.current_camera else True
                )
            }

            new_stream = await window.navigator.mediaDevices.getUserMedia(
                media_constraints
            )

        if not new_stream:
            log.error("Failed to get the media stream.")
            return None

        new_video_tracks = [
            track for track in new_stream.getTracks() if track.kind == "video"
        ]

        if not new_video_tracks:
            log.error("Failed to retrieve the video track from the new stream.")
            return None

        # Retrieve the current local stream's video track.
        if self.local_video_elt is None:
            local_stream = None
        else:
            local_stream = self.local_video_elt.srcObject
        if local_stream:
            local_video_tracks = [
                track for track in local_stream.getTracks() if track.kind == "video"
            ]
            if local_video_tracks:
                # Remove the old video track and add the new one to the local stream.
                local_stream.removeTrack(local_video_tracks[0])
                local_stream.addTrack(new_video_tracks[0])

        # First sender currently sending a video track, None if there is none.
        video_sender = next(
            (
                sender
                for sender in self._peer_connection.getSenders()
                if sender.track and sender.track.kind == "video"
            ),
            None,
        )
        if video_sender:
            await video_sender.replaceTrack(new_video_tracks[0])

            if screen:
                # For screen sharing, we track the end event to properly stop the sharing
                # when the user clicks on the browser's stop sharing dialog.
                def on_track_ended(event):
                    aio.run(self.toggle_screen_sharing())

                new_video_tracks[0].bind("ended", on_track_ended)

        # Going through the property notifies ``screen_sharing_cb`` on state change.
        self.is_sharing_screen = screen

        return local_stream

    async def switch_camera(self) -> None:
        """Switches to the next available camera.

        This method cycles through the list of available video devices, replaces the
        current video track with the next one in the user's local video element, and then
        updates the sender's track in the peer connection. If there's only one camera or
        if an error occurs while switching, the method logs the error and does nothing.
        """
        log.info("switching camera")
        if not self.has_multiple_cameras:
            log.error("No multiple cameras to switch.")
            return

        current_camera_index = -1
        for i, device_info in enumerate(self.video_devices):
            if device_info.deviceId == self.current_camera:
                current_camera_index = i
                break

        if current_camera_index == -1:
            log.error("Current camera not found in available devices.")
            return

        # Switch to the next camera in the list
        next_camera_index = (current_camera_index + 1) % len(self.video_devices)
        self.current_camera = self.video_devices[next_camera_index].deviceId
        log.debug(f"{next_camera_index=} {self.current_camera=}")

        new_stream = await window.navigator.mediaDevices.getUserMedia(
            {"video": {"deviceId": self.current_camera}}
        )

        new_video_tracks = [
            track for track in new_stream.getTracks() if track.kind == "video"
        ]

        if not new_video_tracks:
            log.error("Failed to retrieve the video track from the new stream.")
            return

        # Update local video element's stream
        if self.local_video_elt is None:
            local_stream = None
        else:
            local_stream = self.local_video_elt.srcObject
        if local_stream:
            local_video_tracks = [
                track for track in local_stream.getTracks() if track.kind == "video"
            ]
            if local_video_tracks:
                local_video_tracks[0].stop()
                local_stream.removeTrack(local_video_tracks[0])
                local_stream.addTrack(new_video_tracks[0])
            self.local_video_elt.srcObject = local_stream

        # update remove video stream
        video_sender = next(
            (
                sender
                for sender in self._peer_connection.getSenders()
                if sender.track and sender.track.kind == "video"
            ),
            None,
        )

        if video_sender:
            await video_sender.replaceTrack(new_video_tracks[0])

    async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None):
        """Create and apply the local description, then return the local ICE data

        An SDP offer (initiator) or answer (responder) is created, given to
        ``_set_media_types`` and set as local description. ICE credentials are then
        extracted from the resulting local SDP.

        This method does not wait for the end of ICE gathering: the returned
        "candidates" value is the ``self.media_candidates`` mapping itself, which is
        emptied at the beginning of this method (presumably it is filled afterwards by
        the ICE candidate handler, which is not visible from here).

        @param is_initiator: Boolean indicating if the user is the initiator of the
            connection
        @param remote_candidates: Remote ICE candidates, if any (currently unused)
        @return: dict with "ufrag", "pwd" and "candidates" keys
        @raise Exception: the peer connection has not been created yet
        """
        if self._peer_connection is None:
            raise Exception(
                "The peer connection must be created before gathering ICE candidates!"
            )

        self.media_candidates.clear()

        # Offer and answer only differ by the method creating the description.
        if is_initiator:
            description = await self._peer_connection.createOffer()
        else:
            description = await self._peer_connection.createAnswer()
        self._set_media_types(description)
        await self._peer_connection.setLocalDescription(description)

        # The SDP is logged once, whatever the side is.
        local_sdp = self._peer_connection.localDescription.sdp
        log.debug(local_sdp)
        ufrag, pwd = self.extract_ufrag_pwd(local_sdp)
        return {
            "ufrag": ufrag,
            "pwd": pwd,
            "candidates": self.media_candidates,
        }

    async def accept_call(self, session_id: str, sdp: str, profile: str) -> None:
        """Apply the answer of the peer, then handle the buffered remote ICE candidates

        @param session_id: Session identifier (not used by this method)
        @param sdp: Session Description Protocol data of the answer
        @param profile: Profile associated (not used by this method)
        """
        answer = {"type": "answer", "sdp": sdp}
        await self._peer_connection.setRemoteDescription(answer)

        # Candidates buffered so far are handled, then the buffer is emptied.
        pending_candidates = self.remote_candidates_buffer
        await self.on_ice_candidates_new(pending_candidates)
        self.remote_candidates_buffer.clear()

    def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None:
        """Called when new ICE candidates are received

        Candidates of another session are ignored. Otherwise they are deserialised from
        JSON and given to ``on_ice_candidates_new``, which is run with ``aio.run``.

        @param sid: Session identifier
        @param candidates_s: ICE candidates serialized
        @param profile: Profile associated with the action (not used by this method)
        """
        if sid == self.sid:
            parsed_candidates = json.loads(candidates_s)
            aio.run(self.on_ice_candidates_new(parsed_candidates))
        else:
            log.debug(f"ignoring peer ice candidates for {sid=} ({self.sid=}).")

    def _on_ice_restart(self, sid: str, side: str, profile: str):
        """Called when an ICE restart is notified

        The reconnection callback, if any, is called when the restart is about the
        current session. Restarts of other sessions are ignored.

        @param sid: Session identifier
        @param side: not used by this method
        @param profile: Profile associated with the action (not used by this method)
        """
        if sid != self.sid:
            # The message names the ignored event (ICE restart, not candidates).
            log.debug(f"ignoring ICE restart for {sid=} ({self.sid=}).")
            return
        log.debug("ICE has been restarted")
        if self.on_reconnect_cb is not None:
            self.on_reconnect_cb()

    async def on_ice_candidates_new(self, candidates: dict) -> None:
        """Called when new ICE candidates are received from peer

        If there is no peer connection or no remote description yet, the "audio",
        "video" and "application" candidates are appended to
        ``self.remote_candidates_buffer`` and nothing else is done. Otherwise each
        candidate is added to the peer connection.

        @param candidates: Dictionary containing new ICE candidates. It maps a media
            type to a dict with a "candidates" list.
        """
        log.debug(f"new peer candidates received: {candidates}")

        # NOTE: the missing remote description is detected with ``javascript.NULL``; the
        #   previous workaround based on a ``None`` check is not used anymore.
        if (
            self._peer_connection is None
            or self._peer_connection.remoteDescription is NULL
        ):
            for media_type in ("audio", "video", "application"):
                media_candidates = candidates.get(media_type)
                if media_candidates:
                    # ``setdefault`` is used because the buffer may have been emptied
                    # with ``clear()``, which removes the per media entries too.
                    buffer = self.remote_candidates_buffer.setdefault(
                        media_type, {"candidates": []}
                    )
                    buffer["candidates"].extend(media_candidates["candidates"])
            return

        for media_type, ice_data in candidates.items():
            media_candidates = ice_data["candidates"]
            if not media_candidates:
                # Nothing to add, and no warning for a media type without candidate.
                continue
            # The m-line index only depends on the media type: it is retrieved once
            # per media type, and a failure is logged once instead of per candidate.
            try:
                sdp_mline_index = self.get_sdp_mline_index(media_type)
            except Exception as e:
                log.warning(f"Can't get sdp_mline_index: {e}")
                continue
            for candidate in media_candidates:
                ice_candidate = window.RTCIceCandidate.new(
                    {
                        "candidate": self.build_ice_candidate(candidate),
                        "sdpMLineIndex": sdp_mline_index,
                    }
                )
                await self._peer_connection.addIceCandidate(ice_candidate)

    def on_track(self, event):
        """New track has been received from peer

        When the event comes with a stream, this stream is attached to the remote video
        element. Otherwise the track is added to ``self.remote_stream``, which is
        created and attached to the remote video element on first use.

        @param event: Event associated with the new track
        """
        event_streams = event.streams
        if event_streams and event_streams[0]:
            self.remote_video_elt.srcObject = event_streams[0]
            return

        if self.remote_stream is None:
            # No stream yet: tracks are collected in a stream of our own.
            self.remote_stream = window.MediaStream.new()
            self.remote_video_elt.srcObject = self.remote_stream
        self.remote_stream.addTrack(event.track)

    def on_negotiation_needed(self, event) -> None:
        """Log the negotiation request; the negotiation itself is not implemented yet

        @param event: Event received with the negotiation request
        """
        # TODO
        debug_msg = f"on_negotiation_needed {event=}"
        log.debug(debug_msg)

    def _on_data_channel(self, event) -> None:
        """Handles the data channel event from the peer connection.

        A ``FileReceiver`` is created for the channel of the event, with the current
        session ID and ``self.extra_data``, and stored in ``self.file_receiver``.

        @param event: The event associated with the opening of a data channel.
        """
        self.file_receiver = FileReceiver(self.sid, event.channel, self.extra_data)

    async def answer_call(self, sid: str, offer_sdp: str, profile: str):
        """We respond to the call

        The peer connection is created, the offer is set as remote description and the
        buffered remote ICE candidates are handled. Then the data channel handler is
        bound (file only mode) or the user media are requested, and the local answer is
        sent through the bridge.

        @param sid: Session identifier, it must be the current one
        @param offer_sdp: SDP of the offer received from the peer
        @param profile: Profile associated (not used by this method)
        @raise Exception: ``sid`` is not the current session ID
        """
        log.debug("answering call")
        if sid != self.sid:
            raise Exception(f"Internal Error: unexpected sid: {sid=} {self.sid=}")
        await self._create_peer_connection()

        remote_offer = {"type": "offer", "sdp": offer_sdp}
        await self._peer_connection.setRemoteDescription(remote_offer)
        await self.on_ice_candidates_new(self.remote_candidates_buffer)
        self.remote_candidates_buffer.clear()

        if not self.file_only:
            await self._get_user_media()
        else:
            self._peer_connection.bind("datachannel", self._on_data_channel)

        # Gather local ICE candidates
        answer_ice_data = await self._gather_ice_candidates(False)
        self.local_candidates = answer_ice_data["candidates"]

        local_sdp = self._peer_connection.localDescription.sdp
        await bridge.call_answer_sdp(sid, local_sdp)

    async def _get_call_data(self) -> dict:
        """Prepare the local description as initiator and return the call data

        @return: dict with the local SDP in the "sdp" key
        """
        # ``True`` means that we are the initiator of the session.
        await self._gather_ice_candidates(True)

        local_description = self._peer_connection.localDescription
        return {"sdp": local_description.sdp}

    async def _send_buffered_local_candidates(self) -> None:
        """Send the buffered local ICE candidates through the bridge, if any

        Each media type of ``self.local_candidates_buffer`` is sent with the current
        ``ufrag`` and ``pwd``. The buffer is emptied once the candidates are sent.
        Nothing is done when the buffer is empty.
        """
        if not self.local_candidates_buffer:
            return

        log.debug(
            f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
        )
        assert self.pwd is not None
        ice_data = {
            media_type: {
                "ufrag": self.ufrag,
                "pwd": self.pwd,
                "candidates": media_candidates,
            }
            for media_type, media_candidates in self.local_candidates_buffer.items()
        }
        serialised_ice_data = json.dumps(ice_data)
        await bridge.ice_candidates_add(self.sid, serialised_ice_data)
        self.local_candidates_buffer.clear()

    async def prepare_call(self, audio: bool = True, video: bool = True) -> dict:
        """Prepare a call.

        Create the peer connection, get the user media, then build the call data.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        @return: Call Data
        """
        await self._create_peer_connection()
        await self._get_user_media(audio, video)
        call_data = await self._get_call_data()
        return call_data

    async def make_call(
        self, callee_jid: jid.JID, audio: bool = True, video: bool = True
    ) -> None:
        """Prepare a call, start it through the bridge and send buffered candidates

        The session ID returned by the bridge is stored in ``self.sid``.

        @param callee_jid: JID of the callee, given to the bridge as a string
        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        call_data = await self.prepare_call(audio, video)
        log.info(f"calling {callee_jid!r}")
        callee = str(callee_jid)
        serialised_call_data = json.dumps(call_data)
        self.sid = await bridge.call_start(callee, serialised_call_data)
        log.debug(f"Call SID: {self.sid}")
        # Local candidates buffered so far can now be sent.
        await self._send_buffered_local_candidates()

    async def start_remote_control(
        self, callee_jid: jid.JID, audio: bool = True, video: bool = True
    ) -> None:
        """Starts a Remote Control session.

        Only video screenshare without audio is supported for now: any other
        combination raises ``NotImplementedError``. This includes the default values,
        thus ``audio=False`` must be explicitly used.

        A receive only video transceiver and a "remote-control" data channel are
        created, then the session is requested through the bridge with keyboard, mouse
        and wheel devices. If the request fails, an error notification is shown and
        the method returns without setting the session ID.

        @param callee_jid: JID of the callee, given to the bridge as a string
        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        @raise NotImplementedError: ``audio`` is set or ``video`` is not
        """
        if audio or not video:
            raise NotImplementedError("Only video screenshare is supported for now.")
        peer_connection = await self._create_peer_connection()
        if video:
            peer_connection.addTransceiver("video", {"direction": "recvonly"})
        control_channel = peer_connection.createDataChannel("remote-control")

        call_data = await self._get_call_data()
        request = {
            "devices": {"keyboard": {}, "mouse": {}, "wheel": {}},
            "call_data": call_data,
        }

        # Serialisation is kept in the ``try`` block, so any failure is notified.
        try:
            response = await bridge.remote_control_start(
                str(callee_jid), json.dumps(request)
            )
            remote_control_data = json.loads(response)
        except Exception as e:
            dialog.notification.show(f"Can't start remote control: {e}", level="error")
            return

        self.sid = remote_control_data["session_id"]

        log.debug(f"Remote Control SID: {self.sid}")
        await self._send_buffered_local_candidates()
        self.remote_controller = RemoteControler(
            self.sid, self.remote_video_elt, control_channel
        )

    def _on_opened_data_channel(self, event):
        """Log that the data channel has been opened

        @param event: event received (not used by this method)
        """
        message = "Datachannel has been opened."
        log.info(message)

    async def send_file(self, callee_jid: jid.JID, file: JSObject) -> None:
        """Send a file through a data channel

        The instance must be in file only mode. A "file" data channel is created, then
        the transfer is requested through the bridge with the call data, the file size
        and, when known, its media type. If the request fails, an error notification is
        shown and the method returns without setting the session ID. Otherwise a
        ``FileSender`` is created for the file and the data channel.

        @param callee_jid: JID of the recipient, given to the bridge as a string
        @param file: file to send; its ``name``, ``size`` and ``type`` are used
        """
        assert self.file_only
        peer_connection = await self._create_peer_connection()
        file_channel = peer_connection.createDataChannel("file")
        call_data = await self._get_call_data()
        log.info(f"sending file to {callee_jid!r}")

        extra = {"webrtc": True, "call_data": call_data, "size": file.size}
        if file.type:
            extra["media_type"] = file.type

        # Serialisation is kept in the ``try`` block, so any failure is notified.
        try:
            response = await bridge.file_jingle_send(
                str(callee_jid), "", file.name, "", json.dumps(extra)
            )
            file_data = json.loads(response)
        except Exception as e:
            dialog.notification.show(f"Can't send file: {e}", level="error")
            return

        self.sid = file_data["session_id"]

        log.debug(f"File Transfer SID: {self.sid}")
        await self._send_buffered_local_candidates()
        self.file_sender = FileSender(self.sid, file, file_channel)

    async def end_call(self) -> None:
        """Stop streaming and clean instance

        If there is a peer connection, its event listeners are removed, the tracks of
        the local and remote video elements are stopped, the displayed image is reset
        and the connection is closed. The instance is reset in all cases.

        A missing (``None``) video element is skipped, be it the local or the remote
        one, so the connection is always closed and the instance always reset.
        """
        peer_connection = self._peer_connection
        if peer_connection is None:
            log.debug("There is currently no call to end.")
        else:
            listeners = (
                ("track", self.on_track),
                ("negotiationneeded", self.on_negotiation_needed),
                ("icecandidate", self.on_ice_candidate),
                ("icegatheringstatechange", self.on_ice_gathering_state_change),
            )
            for event_name, handler in listeners:
                peer_connection.removeEventListener(event_name, handler)

            # Base64 encoded 1x1 black pixel image
            # this is a trick to reset the image displayed, so we don't see last image of
            # last stream
            black_image_data = (
                "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0"
                "lEQVR42mP8/wcAAwAB/uzNq7sAAAAASUVORK5CYII="
            )

            # Local element first, then remote one. Both may be missing.
            for video_elt in (self.local_video_elt, self.remote_video_elt):
                if video_elt is None:
                    continue
                if video_elt.srcObject:
                    for track in video_elt.srcObject.getTracks():
                        track.stop()
                video_elt.src = black_image_data

            peer_connection.close()
        self.reset_instance()

    def toggle_media_mute(self, media_type: str) -> bool:
        """Toggle mute/unmute for media tracks.

        The ``enabled`` state of each matching track of the local stream is inverted,
        and the ``is_<media_type>_muted`` attribute is updated accordingly. When a
        media name is known for this media type, the new state is notified with a
        "mute" or "unmute" call info.

        @param media_type: "audio" or "video". Determines which media tracks
            to process.
        @return: value of the ``is_<media_type>_muted`` attribute after the toggle
        """
        assert media_type in ("audio", "video"), "Invalid media type"

        muted_attr_name = f"is_{media_type}_muted"
        video_elt = self.local_video_elt

        if video_elt is not None and video_elt.srcObject:
            getter_name = f"get{media_type.capitalize()}Tracks"
            get_tracks = getattr(video_elt.srcObject, getter_name)
            for media_track in get_tracks():
                media_track.enabled = not media_track.enabled
                setattr(self, muted_attr_name, not media_track.enabled)

        media_name = self.media_types_inv.get(media_type)
        if media_name is not None:
            extra = {"name": str(media_name)}
            aio.run(
                bridge.call_info(
                    self.sid,
                    "mute" if getattr(self, muted_attr_name) else "unmute",
                    json.dumps(extra),
                )
            )

        return getattr(self, muted_attr_name)

    def toggle_audio_mute(self) -> bool:
        """Toggle mute/unmute for audio tracks.

        @return: value returned by ``toggle_media_mute`` for the "audio" media type
        """
        audio_muted = self.toggle_media_mute("audio")
        return audio_muted

    def toggle_video_mute(self) -> bool:
        """Toggle mute/unmute for video tracks.

        @return: value returned by ``toggle_media_mute`` for the "video" media type
        """
        video_muted = self.toggle_media_mute("video")
        return video_muted

    async def toggle_screen_sharing(self):
        log.debug(f"toggle_screen_sharing {self._is_sharing_screen=}")

        if self._is_sharing_screen:
            await self._replace_user_video(screen=False)
        else:
            await self._replace_user_video(screen=True)