view libervia/web/pages/_browser/bridge.py @ 1616:6bfeb9f0fb84

browser (calls): conferences implementation: - Handle A/V conference call creation/joining by entering a conference room JID in the search box. - Group call box has been improved and is used both for group calls (small number of participants) and A/V conferences (larger number of participants). - Fullscreen button for group calls is working. - Avatar/user nickname are shown in group calls on peer users, as an overlay on the video stream. - Use `user` metadata when present to display the right user avatar/name when receiving a stream from an SFU (i.e. A/V conference). - Peer users have a new 3-dot menu with a `pin` item to (un)pin them (i.e. display them on the full container, on top). - Updated webrtc to handle unidirectional streams correctly and to adapt to the A/V conference specification. rel 448
author Goffi <goffi@goffi.org>
date Wed, 07 Aug 2024 00:01:57 +0200
parents 5db55d01ce05
children
line wrap: on
line source

from browser import window, aio, timer, console as log
import time
import random
import json
import dialog
import javascript


# `log` is the browser console (imported above as `console as log`); alias its
# `warn` method so that the `log.warning(...)` calls used in this module work.
log.warning = log.warn
# Random identifier of this browser tab. It is attached (as a string) to every
# message posted on the BroadcastChannel, so that tabs can tell each other apart.
tab_id = random.randint(0, 2**64)
log.info(f"TAB ID is {tab_id}")


class BridgeException(Exception):
    """An exception which has been raised from the backend and arrived to the frontend.

    An instance compares equal to the bare class name of the backend exception
    (e.g. ``exc == "NotFound"``), and hashes like that class name so that equality
    and hashing stay consistent.
    """

    def __init__(self, name, message="", condition=""):
        """

        @param name (str): full exception class name (with module)
        @param message (str): error message
        @param condition (str) : error condition
        """
        Exception.__init__(self)
        self.fullname = str(name)
        self.message = str(message)
        self.condition = str(condition) if condition else ""
        # "package.module.ClassName" is split on the last dot; when there is no dot,
        # ``module`` is an empty string and ``classname`` is the whole name
        self.module, __, self.classname = self.fullname.rpartition(".")

    def __str__(self):
        return f"{self.classname}: {self.message or ''}"

    def __eq__(self, other):
        # allows ``exc == "ClassName"``; comparing two BridgeException ends up
        # comparing their class names through the reflected comparison
        return self.classname == other

    def __hash__(self):
        # defining __eq__ alone would make instances unhashable; hash on the same
        # value that is used for equality
        return hash(self.classname)


class WebSocket:
    """Wrapper around the browser WebSocket connected to ``window.ws_url``.

    Bridge messages received from the server are posted on the BroadcastChannel
    which owns this socket. Connection errors trigger either a page reload or a
    retry, see [on_error] and [on_close].
    """

    # numeric value of ``readyState`` for an open connection (``WebSocket.OPEN`` in
    # the browser API)
    _READY_STATE_OPEN = 1

    def __init__(self, broadcast_channel):
        """

        @param broadcast_channel (BroadcastChannel): channel owning this socket, bridge
            messages are posted on it
        """
        self.broadcast_channel = broadcast_channel
        self.token = window.ws_token
        self.socket = None
        # state is initialised before the socket is created, so that it already
        # exists when the handlers bound in [create_socket] are called
        self.retrying = False
        self.network_error = False
        self._ready_fut = aio.Future()
        self.create_socket()

    @property
    def profile(self):
        """Profile of the owning BroadcastChannel"""
        return self.broadcast_channel.profile

    @property
    def is_ready(self):
        """True if a socket exists and its connection is open"""
        if self.socket is None:
            return False
        # readyState is a number in the browser API, not a state name
        return self.socket.readyState == self._READY_STATE_OPEN

    @property
    def connection_ready_fut(self):
        """Future resolved when connection is ready"""
        return self._ready_fut

    def retry_connect(self) -> None:
        """Show a retry notification which will re-create the socket

        Nothing is done if a retry is already in progress.
        """
        if self.retrying:
            return
        self.retrying = True
        try:
            notif = dialog.RetryNotification(self.create_socket)
            notif.show(
                "Can't connect to server",
                delay=random.randint(0, 30)
            )
        except Exception as e:
            # for security reasons, browser don't give the reason of the error with
            # WebSockets, thus we try to detect network error here, as if we can't show
            # the retry dialog, that probably means that it's not reachable
            if getattr(e, "name", None) == "NetworkError":
                self.network_error = True
                log.warning("network error detected, server may be down")
                log.error(f"Can't show retry dialog: {e}")
                log.info("retrying in 30s")
                timer.set_timeout(self.create_socket, 30000)
            else:
                raise
        else:
            # if we can show the retry dialog, the network is fine
            self.network_error = False

    def create_socket(self) -> None:
        """Open a new websocket and bind its event handlers"""
        log.debug("creating socket")
        self.retrying = False
        self.socket = window.WebSocket.new(window.ws_url, "libervia-page")
        # creation time is used in [on_error] to detect a fast failure
        self.socket_start = time.time()
        self.socket.bind("open", self.on_open)
        self.socket.bind("error", self.on_error)
        self.socket.bind("close", self.on_close)
        self.socket.bind("message", self.on_message)

    def send(self, data_type: str, data: dict) -> None:
        """Send a JSON message to the server

        @param data_type: value of the "type" field of the message
        @param data: payload, sent in the "data" field of the message
        """
        self.socket.send(json.dumps({
            "type": data_type,
            "data": data
        }, ensure_ascii=False))

    def close(self) -> None:
        """Close the socket on local request"""
        log.debug("closing socket")
        # the reference is removed first: this is how [on_close] recognises a close
        # which has been requested locally
        self.broadcast_channel.ws = None
        self.socket.close()

    def on_open(self, evt) -> None:
        """Resolve the ready future and send the "init" message"""
        log.info("websocket connection opened")
        self._ready_fut.set_result(None)
        self.send("init", {"profile": self.profile, "token": self.token})

    def on_error(self, evt) -> None:
        """Reload the page on a fast failure, otherwise try to reconnect"""
        if not self.network_error and time.time() - self.socket_start < 5:
            # disconnection is happening fast, we try to reload
            log.warning("Reloading due to suspected session error")
            window.location.reload()
        else:
            self.retry_connect()

    def on_close(self, evt) -> None:
        """Handle socket closing: ignore local close, reload on 4401, else retry"""
        log.warning(f"websocket is closed {evt.code=} {evt.reason=}")
        if self.broadcast_channel.ws is None:
            # this is a close requested locally
            return
        elif evt.code == 4401:
            log.info(
                "no authorized, the session is probably not valid anymore, reloading"
            )
            window.location.reload()
        else:
            # close event may be due to normal tab closing, thus we try to reconnect only
            # after a delay
            timer.set_timeout(self.retry_connect, 5000)

    def on_message(self, message_evt):
        """Dispatch a message received from the server

        "bridge" messages are posted on the BroadcastChannel, "force_close" closes the
        socket, anything else is reported to the user as an error.
        """
        msg_data = json.loads(message_evt.data)
        msg_type = msg_data.get("type")
        if msg_type == "bridge":
            log.debug(
                f"==> bridge message: {msg_data=}"
            )
            self.broadcast_channel.post(
                msg_type,
                msg_data["data"]
            )
        elif msg_type == "force_close":
            log.warning(f"force closing connection: {msg_data.get('reason')}")
            self.close()
        else:
            # "error" is a separate argument, without the comma it would be
            # concatenated to the message
            dialog.notification.show(
                f"Unexpected message type {msg_type}",
                "error"
            )


class BroadcastChannel:
    """Coordination between tabs through the browser BroadcastChannel "libervia".

    Tabs announce themselves to each other, and a single "connecting tab" holds the
    WebSocket to the server. Bridge signals are posted on the channel and forwarded
    to the handlers registered in [handlers].
    """
    # signal name => list of handlers, shared by all instances (filled by the
    # ``register_signal`` methods of the bridge classes)
    handlers = {}

    def __init__(self):
        log.debug(f"BroadcastChannel init with profile {self.profile!r}")
        # WebSocket instance, only set if this tab is the connecting tab
        self.ws = None
        self._connected = False
        self.start = time.time()
        self.bc = window.BroadcastChannel.new("libervia")
        self.bc.bind("message", self.on_message)
        # there is no way to check if there is already a connection in BroadcastChannel
        # API, thus we wait a bit to see if somebody is answering. If not, we are probably
        # the first tab.
        self.check_connection_timer = timer.set_timeout(self.establish_connection, 20)
        # set of all known tab ids
        self.tabs_ids = {tab_id}
        self.post("salut_a_vous", {
            "profile": self.profile
        })
        window.bind("unload", self.on_unload)
        self._wait_connection_fut = aio.Future()

    @property
    def profile(self):
        """Profile set on ``window``, or an empty string if there is none"""
        return window.profile or ""

    @property
    def connecting_tab(self) -> bool:
        """True is this tab is the one establishing the websocket connection"""
        return self.ws is not None

    async def _wait_for_ws(self):
        """Wait for the websocket to be ready, then mark this channel as connected"""
        assert self.ws is not None
        await self.ws.connection_ready_fut
        self._wait_connection_fut.set_result(None)
        self._connected = True

    @connecting_tab.setter
    def connecting_tab(self, connecting: bool) -> None:
        """Create the websocket if ``connecting`` is True, close it otherwise"""
        if connecting:
            if self.ws is None:
                self.ws = WebSocket(self)
                # other tabs are notified that this tab is now the connecting one
                self.post("connection", {})
                aio.run(self._wait_for_ws())

        elif self.ws is not None:
            self.ws.close()

    @property
    def connected(self):
        """True once a connection to the server is known to exist"""
        return self._connected

    async def wait_for_connection(self):
        """Return once a connection is known to exist (immediately if already the case)"""
        if self._connected:
            return
        await self._wait_connection_fut

    def establish_connection(self) -> None:
        """Called when there is no existing connection"""
        timer.clear_timeout(self.check_connection_timer)
        log.debug(f"Establishing connection {tab_id=}")
        self.connecting_tab = True

    def handle_bridge_signal(self, data: dict) -> None:
        """Forward bridge signals to registered handlers

        @param data: signal data, the "signal" key is the signal name and the "args" key
            holds the positional arguments given to each handler
        """
        signal = data["signal"]
        handlers = self.handlers.get(signal, [])
        for handler in handlers:
            handler(*data["args"])

    def on_message(self, evt) -> None:
        """Handle a message posted on the channel by another tab

        The "type" field of the message selects the action:
            - bridge: forward the signal to registered handlers
            - salut_a_toi: answer of an existing tab to our "salut_a_vous"
            - salut_a_vous: a new tab is announcing itself
            - connection: another tab has become the connecting tab
            - salut_a_rantanplan: a tab is being closed
        """
        data = json.loads(evt.data)
        # FIXME: we convert back to int, see FIXME in [post] for details
        data["id"] = int(data["id"])
        if data["type"] == "bridge":
            self.handle_bridge_signal(data)
        elif data["type"] == "salut_a_toi":
            # this is a response from existing tabs
            other_tab_id = data["id"]
            if other_tab_id == tab_id:
                # in the unlikely case that this happens, we simply reload this tab to get
                # a new ID
                log.warning(f"duplicate tab id, we reload the page: {tab_id=}")
                window.location.reload()
                return
            self.tabs_ids.add(other_tab_id)
            if data["connecting_tab"] and self.check_connection_timer is not None:
                # this tab has the websocket connection to server
                log.info(f"there is already a connection to server at tab {other_tab_id}")
                timer.clear_timeout(self.check_connection_timer)
                self.check_connection_timer = None
                self._wait_connection_fut.set_result(None)
                self._connected = True
        elif data["type"] == "salut_a_vous":
            # a new tab has just been created
            if data["profile"] != self.profile:
                log.info(
                    f"we are now connected with the profile {data['profile']}, "
                    "reloading the page"
                )
                window.location.reload()
            else:
                self.tabs_ids.add(data["id"])
                self.post("salut_a_toi", {
                    "id": tab_id,
                    "connecting_tab": self.connecting_tab
                })
        elif data["type"] == "connection":
            log.info(f"tab {data['id']} is the new connecting tab")
        elif data["type"] == "salut_a_rantanplan":
            # a tab is being closed
            other_tab_id = data["id"]
            # it is unlikely that there is a collision, but just in case we check it
            if other_tab_id != tab_id:
                self.tabs_ids.discard(other_tab_id)
            if data["connecting_tab"]:
                log.info(f"connecting tab with id {other_tab_id} has been closed")
                # the remaining tab with the highest id takes over the connection
                if max(self.tabs_ids) == tab_id:
                    log.info("this is the new connecting tab, establish_connection")
                    self.connecting_tab = True
            else:
                log.info(f"tab with id {other_tab_id} has been closed")
        else:
            log.warning(f"unknown message type: {data}")

    def post(self, data_type, data: dict):
        """Post a message on the channel

        @param data_type: type of the message, set in the "type" field
        @param data: payload of the message
            The dict is modified in place: "type" and "id" keys are set.
        """
        data["type"] = str(data_type)
        # FIXME: for some reason, JSON.stringify fail when a random.randint is used with
        #   Brython 3.11 . See https://github.com/brython-dev/brython/issues/2332,
        #   workaround may be removed once fixed version is used.
        data["id"] = str(tab_id)
        # FIXME: json.dumps doesn't support "ensure_ascii=False" and fails to correctly
        #   dump emoji. See https://github.com/brython-dev/brython/issues/2331, workaround
        #   may be removed once fixed version is used.
        dumped = javascript.JSON.stringify(data)
        self.bc.postMessage(dumped)
        if data_type == "bridge":
            # bridge signals are also dispatched to the handlers of this tab
            self.handle_bridge_signal(data)

    def on_unload(self, evt) -> None:
        """Send a message to indicate that the tab is being closed"""
        self.post("salut_a_rantanplan", {
            "id": tab_id,
            "connecting_tab": self.connecting_tab
        })


class Bridge:
    """Callback based proxy to bridge methods.

    Any attribute which is not defined on the class is returned as a function calling
    the bridge method of the same name through [call].
    """
    # BroadcastChannel shared by all instances, created by the first instantiation
    bc: BroadcastChannel | None = None

    def __init__(self) -> None:
        if Bridge.bc is None:
            Bridge.bc = BroadcastChannel()

    def __getattr__(self, attr):
        # only reached for attributes which are not found normally: they are
        # considered to be bridge method names
        return lambda *args, **kwargs: self.call(attr, *args, **kwargs)

    def on_load(self, xhr, ev, callback, errback):
        """Handle the response of a bridge call

        @param xhr: the XMLHttpRequest used for the call
        @param ev: "load" event (unused)
        @param callback: called on success (HTTP 200), without argument if the parsed
            response is None, with the parsed response otherwise. May be None.
        @param errback: called on failure with the parsed response (HTTP 502) or with a
            dict with "fullname" and "message" keys (other status). May be None.
        """
        if xhr.status == 200:
            ret = javascript.JSON.parse(xhr.response)
            if callback is not None:
                if ret is None:
                    callback()
                else:
                    callback(ret)
        elif xhr.status == 502:
            # PROXY_ERROR is used for bridge error
            ret = javascript.JSON.parse(xhr.response)
            if errback is not None:
                errback(ret)
        else:
            # the HTTP status is logged as the code (not the response body)
            log.error(
                f"bridge call failed: code: {xhr.status}, text: {xhr.statusText}"
            )
            if errback is not None:
                errback({"fullname": "BridgeInternalError", "message": xhr.statusText})

    def call(self, method_name, *args, callback, errback, **kwargs):
        """Call a bridge method with a POST request on ``/_bridge/<method_name>``

        @param method_name: name of the bridge method
        @param args: positional arguments of the bridge method
        @param callback: called on success, see [on_load]. May be None.
        @param errback: called on failure, see [on_load]. May be None.
        @param kwargs: keyword arguments of the bridge method
        """
        xhr = window.XMLHttpRequest.new()

        def on_error(ev):
            # errback is optional, as in [on_load]
            if errback is not None:
                errback({"fullname": "ConnectionError", "message": xhr.statusText})

        xhr.bind('load', lambda ev: self.on_load(xhr, ev, callback, errback))
        xhr.bind('error', on_error)
        xhr.open("POST", f"/_bridge/{method_name}", True)
        data = javascript.JSON.stringify({
            "args": args,
            "kwargs": kwargs,
        })
        xhr.setRequestHeader('X-Csrf-Token', window.csrf_token)
        xhr.send(data)

    def register_signal(self, signal: str, handler, iface=None) -> None:
        """Register a handler for a bridge signal

        @param signal: name of the signal
        @param handler: callable called with the signal arguments
        @param iface: unused
        """
        BroadcastChannel.handlers.setdefault(signal, []).append(handler)
        log.debug(f"signal {signal} has been registered")


class AsyncBridge:
    """Coroutine based proxy to bridge methods.

    Any attribute which is not defined on the class is returned as a function which
    returns the coroutine of [call] for the bridge method of the same name.
    """

    def __getattr__(self, attr):
        # only reached for attributes which are not found normally: they are
        # considered to be bridge method names
        return lambda *args, **kwargs: self.call(attr, *args, **kwargs)

    async def call(self, method_name, *args, **kwargs):
        """Call a bridge method with a POST request on ``/_bridge/<method_name>``

        @param method_name: name of the bridge method
        @param args: positional arguments of the bridge method
        @param kwargs: keyword arguments of the bridge method
        @return: parsed JSON response when the HTTP status is 200
        @raise BridgeException: the HTTP status is 502 (exception built from the
            "fullname", "message" and "condition" fields of the response) or any other
            non 200 status ("InternalError")
        """
        # the console logger is used here like in the rest of the module
        log.debug(f"calling {method_name}")
        data = javascript.JSON.stringify({
            "args": args,
            "kwargs": kwargs,
        })
        url = f"/_bridge/{method_name}"
        r = await aio.post(
            url,
            headers={
                'X-Csrf-Token': window.csrf_token,
            },
            data=data,
        )

        if r.status == 200:
            return javascript.JSON.parse(r.data)
        elif r.status == 502:
            ret = javascript.JSON.parse(r.data)
            raise BridgeException(ret['fullname'], ret['message'], ret['condition'])
        else:
            log.error(f"bridge called failed: code: {r.status}, text: {r.statusText}")
            raise BridgeException("InternalError", r.statusText)

    def register_signal(self, signal: str, handler, iface=None) -> None:
        """Register a handler for a bridge signal

        @param signal: name of the signal
        @param handler: callable called with the signal arguments
        @param iface: unused
        """
        BroadcastChannel.handlers.setdefault(signal, []).append(handler)
        log.debug(f"signal {signal} has been registered")