view libervia/pages/_browser/bridge.py @ 1504:409d10211b20

server, browser: dynamic pages refactoring: dynamic pages have been reworked to replace the initial basic implementation. Pages are now dynamic by default, and a websocket is established by the first connected page of a session. The socket is used to transmit bridge signals, and each signal is then broadcast to the other tabs using a broadcast channel. If the connecting tab is closed, another one is chosen. Some checks are made to retry connecting in case of a problem, and sometimes to reload the pages (e.g. if a profile is connected). Signals (or other data) are cached during the reconnection phase, to avoid loss of data. All previous partial rendering mechanisms have been removed; the chat page is temporarily not working anymore, but will eventually be redone (one of the goals of this work is to have a proper chat).
author Goffi <goffi@goffi.org>
date Wed, 01 Mar 2023 18:02:44 +0100
parents b28025a7cc28
children ce879da7fcf7
line wrap: on
line source

from browser import window, timer, console as log
import time
import random
import json
import dialog
import javascript


# the rest of this module logs warnings through ``log.warning``: make it an alias of
# the ``warn`` method of the imported console object
log.warning = log.warn
# random identifier of this tab, sent with every message posted on the broadcast
# channel so that tabs can tell each other apart
tab_id = random.randint(0, 1 << 64)
log.info(f"TAB ID is {tab_id}")


class WebSocket:
    """Websocket connection to the server, held by the connecting tab.

    Messages of type ``bridge`` coming from the server are re-posted through the
    broadcast channel wrapper received at construction. When the connection is lost,
    the socket is either re-created (possibly after showing a retry dialog) or the
    page is reloaded, depending on how the connection was lost.
    """

    def __init__(self, broadcast_channel):
        """Store the channel wrapper and open the websocket

        @param broadcast_channel: object whose ``profile``, ``post`` and ``ws``
            members are used by this class
        """
        self.broadcast_channel = broadcast_channel
        self.token = window.ws_token
        # flags are initialised before the socket is created, so that they are
        # always defined by the time a socket event handler reads them
        self.retrying = False
        self.network_error = False
        self.create_socket()

    @property
    def profile(self):
        """Profile exposed by the broadcast channel wrapper"""
        return self.broadcast_channel.profile

    def retry_connect(self) -> None:
        """Show the retry dialog, or schedule a new attempt if it can't be shown

        Nothing is done if a retry is already pending.
        """
        if self.retrying:
            return
        self.retrying = True
        try:
            notif = dialog.RetryNotification(self.create_socket)
            notif.show(
                "Can't connect to server",
                delay=random.randint(0, 30)
            )
        except Exception as e:
            # for security reasons, browsers don't give the reason of the error with
            # WebSockets, thus we try to detect network error here, as if we can't
            # show the retry dialog, that probably means that the server is not
            # reachable
            if getattr(e, "name", None) != "NetworkError":
                # unexpected failure: no retry has been scheduled, thus the flag is
                # reset to let a later call try again
                self.retrying = False
                raise
            self.network_error = True
            log.warning("network error detected, server may be down")
            log.error(f"Can't show retry dialog: {e}")
            log.info("retrying in 30s")
            timer.set_timeout(self.create_socket, 30000)
        else:
            # if we can show the retry dialog, the network is fine
            self.network_error = False

    def create_socket(self) -> None:
        """Open a new websocket and bind its event handlers

        The pending retry flag is cleared and the creation time is recorded (it is
        used by ``on_error`` to detect connections failing quickly).
        """
        log.debug("creating socket")
        self.retrying = False
        self.socket = window.WebSocket.new(window.ws_url, "libervia-page")
        self.socket_start = time.time()
        self.socket.bind("open", self.on_open)
        self.socket.bind("error", self.on_error)
        self.socket.bind("close", self.on_close)
        self.socket.bind("message", self.on_message)

    def send(self, data_type: str, data: dict) -> None:
        """Send a JSON encoded message through the websocket

        @param data_type: value of the ``type`` field of the message
        @param data: value of the ``data`` field of the message
        """
        self.socket.send(json.dumps({
            "type": data_type,
            "data": data
        }))

    def close(self) -> None:
        """Close the websocket on local request"""
        log.debug("closing socket")
        # the reference is removed first: this is how ``on_close`` recognises a
        # close requested locally
        self.broadcast_channel.ws = None
        self.socket.close()

    def on_open(self, evt) -> None:
        """Send the ``init`` message once the connection is opened"""
        log.info("websocket connection opened")
        self.send("init", {"profile": self.profile, "token": self.token})

    def on_error(self, evt) -> None:
        """Reload the page or retry to connect after a socket error

        The page is reloaded when no network error has been detected and the error
        happens less than 5 seconds after the socket creation, otherwise a new
        connection is attempted.
        """
        if not self.network_error and time.time() - self.socket_start < 5:
            # disconnection is happening fast, we try to reload
            log.warning("Reloading due to suspected session error")
            window.location.reload()
        else:
            self.retry_connect()

    def on_close(self, evt) -> None:
        """React to the websocket being closed

        Nothing is done for a close requested locally, the page is reloaded on close
        code 4401, and a reconnection is scheduled in the other cases.
        """
        log.warning(f"websocket is closed {evt.code=} {evt.reason=}")
        if self.broadcast_channel.ws is None:
            # this is a close requested locally
            return
        elif evt.code == 4401:
            log.info(
                "no authorized, the session is probably not valid anymore, reloading"
            )
            window.location.reload()
        else:
            # close event may be due to normal tab closing, thus we try to reconnect
            # only after a delay
            timer.set_timeout(self.retry_connect, 5000)

    def on_message(self, message_evt):
        """Dispatch a message received from the server

        ``bridge`` messages are posted on the broadcast channel, ``force_close``
        closes the socket, any other type is reported with an error notification.
        """
        msg_data = json.loads(message_evt.data)
        msg_type = msg_data.get("type")
        if msg_type == "bridge":
            self.broadcast_channel.post(
                msg_type,
                msg_data["data"]
            )
        elif msg_type == "force_close":
            log.warning(f"force closing connection: {msg_data.get('reason')}")
            self.close()
        else:
            # the level is a separate argument: without the comma both literals
            # would be concatenated in a single message
            dialog.notification.show(
                f"Unexpected message type {msg_type}",
                "error"
            )


class BroadcastChannel:
    """Coordination between tabs through the browser ``BroadcastChannel`` "libervia"

    Tabs announce themselves on the channel, keep track of the ids of the other tabs,
    and agree on which one holds the websocket connection to the server (the
    "connecting tab"). Bridge signals posted on the channel are forwarded to the
    handlers registered in ``handlers``.
    """
    # signal name => list of handlers, shared at class level (it is filled through
    # ``Bridge.register_signal``)
    handlers = {}

    def __init__(self):
        log.debug(f"BroadcastChannel init with profile {self.profile!r}")
        self.start = time.time()
        self.ws = None
        # set of all known tab ids
        self.tabs_ids = {tab_id}
        self.bc = window.BroadcastChannel.new("libervia")
        self.bc.bind("message", self.on_message)
        # there is no way to check if there is already a connection in
        # BroadcastChannel API, thus we wait a bit to see if somebody is answering.
        # If not, we are probably the first tab.
        self.check_connection_timer = timer.set_timeout(self.establish_connection, 20)
        self.post("salut_a_vous", {
            "id": tab_id,
            "profile": self.profile
        })
        window.bind("unload", self.on_unload)

    @property
    def profile(self):
        """Profile set on the window, or an empty string if there is none"""
        return window.profile or ""

    @property
    def connecting_tab(self) -> bool:
        """True if this tab is the one establishing the websocket connection"""
        return self.ws is not None

    @connecting_tab.setter
    def connecting_tab(self, connecting: bool) -> None:
        if connecting:
            if self.ws is None:
                self.ws = WebSocket(self)
                self.post("connection", {
                    "tab_id": tab_id
                })
        elif self.ws is not None:
            self.ws.close()

    def establish_connection(self) -> None:
        """Called when there is no existing connection"""
        if self.check_connection_timer is not None:
            timer.clear_timeout(self.check_connection_timer)
            # the timer is not pending anymore: forget it, so a late "salut_a_toi"
            # doesn't report an existing connection and clear it again
            self.check_connection_timer = None
        log.debug(f"Establishing connection {tab_id=}")
        self.connecting_tab = True

    def handle_bridge_signal(self, data: dict) -> None:
        """Forward bridge signals to registered handlers

        @param data: message with the signal name in ``signal`` and the positional
            arguments of the handlers in ``args``
        """
        signal = data["signal"]
        handlers = self.handlers.get(signal, [])
        for handler in handlers:
            handler(*data["args"])

    def on_message(self, evt) -> None:
        """Handle a message posted on the broadcast channel by another tab"""
        data = json.loads(evt.data)
        # a message without type ends in the "unknown message type" branch
        msg_type = data.get("type")
        if msg_type == "bridge":
            self.handle_bridge_signal(data)
        elif msg_type == "salut_a_toi":
            # this is a response from existing tabs
            other_tab_id = data["id"]
            if other_tab_id == tab_id:
                # in the unlikely case that this happens, we simply reload this tab to
                # get a new ID
                log.warning(f"duplicate tab id, we reload the page: {tab_id=}")
                window.location.reload()
                return
            self.tabs_ids.add(other_tab_id)
            if data["connecting_tab"] and self.check_connection_timer is not None:
                # this tab has the websocket connection to server
                log.info(f"there is already a connection to server at tab {other_tab_id}")
                timer.clear_timeout(self.check_connection_timer)
                self.check_connection_timer = None
        elif msg_type == "salut_a_vous":
            # a new tab has just been created
            if data["profile"] != self.profile:
                log.info(
                    f"we are now connected with the profile {data['profile']}, "
                    "reloading the page"
                )
                window.location.reload()
            else:
                self.tabs_ids.add(data["id"])
                self.post("salut_a_toi", {
                    "id": tab_id,
                    "connecting_tab": self.connecting_tab
                })
        elif msg_type == "connection":
            log.info(f"tab {data['id']} is the new connecting tab")
        elif msg_type == "salut_a_rantanplan":
            # a tab is being closed
            other_tab_id = data["id"]
            # it is unlikely that there is a collision, but just in case we check it
            if other_tab_id != tab_id:
                self.tabs_ids.discard(other_tab_id)
            if data["connecting_tab"]:
                log.info(f"connecting tab with id {other_tab_id} has been closed")
                # the remaining tab with the highest id takes over the connection
                if max(self.tabs_ids) == tab_id:
                    log.info("this is the new connecting tab, establish_connection")
                    self.connecting_tab = True
            else:
                log.info(f"tab with id {other_tab_id} has been closed")
        else:
            log.warning(f"unknown message type: {data}")

    def post(self, data_type, data: dict):
        """Post a message on the broadcast channel

        The message is ``data`` completed with the ``type`` and ``id`` (id of this
        tab) fields. Bridge signals are also handled locally.

        @param data_type: type of the message
        @param data: payload of the message, it is not modified
        """
        # a copy is sent so the dict of the caller is left untouched; the key order
        # is the same as with an in-place update
        payload = {**data, "type": data_type, "id": tab_id}
        self.bc.postMessage(json.dumps(payload))
        if data_type == "bridge":
            self.handle_bridge_signal(payload)

    def on_unload(self, evt) -> None:
        """Send a message to indicate that the tab is being closed"""
        self.post("salut_a_rantanplan", {
            "id": tab_id,
            "connecting_tab": self.connecting_tab
        })


class Bridge:
    """Proxy calling bridge methods of the server through HTTP requests

    Any attribute which is not defined on the class is turned into a call of the
    bridge method with the same name: ``bridge.some_method(arg, callback=cb)`` POSTs
    the arguments to ``/_bridge/some_method``.
    """
    # channel shared by all Bridge instances, created by the first one; the
    # annotation is quoted so that it is not evaluated when the class is created
    bc: "BroadcastChannel | None" = None

    def __init__(self) -> None:
        if Bridge.bc is None:
            Bridge.bc = BroadcastChannel()

    def __getattr__(self, attr):
        """Return a callable running the bridge method named ``attr``"""
        return lambda *args, **kwargs: self.call(attr, *args, **kwargs)

    def on_load(self, xhr, ev, callback, errback):
        """Dispatch the response of a bridge call to ``callback`` or ``errback``

        @param xhr: request which has been loaded
        @param ev: load event (unused)
        @param callback: called on status 200, with the decoded response unless it is
            None; may be None
        @param errback: called with the decoded response on status 502, or with a
            ``BridgeInternalError`` dict on any other status; may be None
        """
        if xhr.status == 200:
            ret = javascript.JSON.parse(xhr.response)
            if callback is not None:
                if ret is None:
                    callback()
                else:
                    callback(ret)
        elif xhr.status == 502:
            # PROXY_ERROR is used for bridge error
            ret = javascript.JSON.parse(xhr.response)
            if errback is not None:
                errback(ret)
        else:
            # the status code is logged, not the body of the response
            log.error(
                f"bridge called failed: code: {xhr.status}, text: {xhr.statusText}"
            )
            if errback is not None:
                errback({"fullname": "BridgeInternalError", "message": xhr.statusText})

    def call(self, method_name, *args, callback=None, errback=None, **kwargs):
        """Call a bridge method with an asynchronous POST request

        @param method_name: name of the bridge method, used in the request URL
        @param args: positional arguments of the bridge method
        @param callback: called with the result on success, may be omitted
        @param errback: called with a dict describing the failure, may be omitted
        @param kwargs: keyword arguments of the bridge method
        """
        xhr = window.XMLHttpRequest.new()

        def on_error(ev):
            failure = {"fullname": "ConnectionError", "message": xhr.statusText}
            if errback is None:
                # nobody handles the failure, at least leave a trace of it
                log.error(f"bridge call {method_name!r} failed: {failure}")
            else:
                errback(failure)

        xhr.bind('load', lambda ev: self.on_load(xhr, ev, callback, errback))
        xhr.bind('error', on_error)
        xhr.open("POST", f"/_bridge/{method_name}", True)
        data = javascript.JSON.stringify({
            "args": args,
            "kwargs": kwargs,
        })
        xhr.setRequestHeader('X-Csrf-Token', window.csrf_token)
        xhr.send(data)

    def register_signal(self, signal: str, handler, iface=None) -> None:
        """Register a handler called when a bridge signal is received

        @param signal: name of the signal
        @param handler: callable receiving the arguments of the signal
        @param iface: unused
        """
        BroadcastChannel.handlers.setdefault(signal, []).append(handler)
        log.debug(f"signal {signal} has been registered")