view libervia/pages/_browser/bridge.py @ 1506:ce879da7fcf7

server: fix `on_signal` callback
author Goffi <goffi@goffi.org>
date Thu, 23 Mar 2023 17:50:54 +0100
parents 409d10211b20
children 106bae41f5c8
line wrap: on
line source

from browser import window, timer, console as log
import time
import random
import json
import dialog
import javascript


# the browser console exposes ``warn``; alias it so the rest of this module can
# call ``log.warning`` like with a regular Python logger
log.warning = log.warn
# random identifier of this tab, sent in every BroadcastChannel message so that
# tabs can tell each other apart
tab_id = random.randint(0, 2**64)
log.info(f"TAB ID is {tab_id}")


class WebSocket:
    """WebSocket connection to the server, held by the connecting tab.

    The socket is opened on ``window.ws_url``. Messages of type "bridge"
    received from the server are relayed through the ``broadcast_channel``
    given at construction, and connection failures are handled either by
    reloading the page or by scheduling a reconnection.
    """

    def __init__(self, broadcast_channel):
        """
        @param broadcast_channel: object exposing ``profile``, ``ws`` and ``post``,
            used to relay bridge messages
        """
        self.broadcast_channel = broadcast_channel
        self.token = window.ws_token
        # flags are set before the socket is created so that they exist whatever
        # happens during socket creation
        self.retrying = False
        self.network_error = False
        self.create_socket()

    @property
    def profile(self):
        """Profile exposed by the broadcast channel"""
        return self.broadcast_channel.profile

    def retry_connect(self) -> None:
        """Show a retry notification which will re-create the socket

        Does nothing if a retry is already pending.
        """
        if self.retrying:
            return
        self.retrying = True
        try:
            notif = dialog.RetryNotification(self.create_socket)
            notif.show(
                "Can't connect to server",
                delay=random.randint(0, 30)
            )
        except Exception as e:
            # for security reasons, browser don't give the reason of the error with
            # WebSockets, thus we try to detect network error here, as if we can't show
            # the retry dialog, that probably means that it's not reachable
            if getattr(e, "name", None) != "NetworkError":
                raise
            self.network_error = True
            log.warning("network error detected, server may be down")
            log.error(f"Can't show retry dialog: {e}")
            log.info("retrying in 30s")
            timer.set_timeout(self.create_socket, 30000)
        else:
            # if we can show the retry dialog, the network is fine
            self.network_error = False

    def create_socket(self) -> None:
        """Open a new socket and bind its event handlers"""
        log.debug("creating socket")
        self.retrying = False
        self.socket = window.WebSocket.new(window.ws_url, "libervia-page")
        # creation time is used by ``on_error`` to detect fast failures
        self.socket_start = time.time()
        self.socket.bind("open", self.on_open)
        self.socket.bind("error", self.on_error)
        self.socket.bind("close", self.on_close)
        self.socket.bind("message", self.on_message)

    def send(self, data_type: str, data: dict) -> None:
        """Send a JSON encoded message to the server

        @param data_type: value of the "type" field of the message
        @param data: payload, sent in the "data" field
        """
        self.socket.send(json.dumps({
            "type": data_type,
            "data": data
        }))

    def close(self) -> None:
        """Close the socket on local request"""
        log.debug("closing socket")
        # ``ws`` is reset first: ``on_close`` uses it to recognise a local close
        self.broadcast_channel.ws = None
        self.socket.close()

    def on_open(self, evt) -> None:
        """Send the "init" message once the socket is opened"""
        log.info("websocket connection opened")
        self.send("init", {"profile": self.profile, "token": self.token})

    def on_error(self, evt) -> None:
        """Reload the page on fast failure, otherwise try to reconnect"""
        if not self.network_error and time.time() - self.socket_start < 5:
            # disconnection is happening fast, we try to reload
            log.warning("Reloading due to suspected session error")
            window.location.reload()
        else:
            self.retry_connect()

    def on_close(self, evt) -> None:
        """Handle socket closing, reconnecting or reloading if it was not requested"""
        log.warning(f"websocket is closed {evt.code=} {evt.reason=}")
        if self.broadcast_channel.ws is None:
            # this is a close requested locally
            return
        elif evt.code == 4401:
            log.info(
                "no authorized, the session is probably not valid anymore, reloading"
            )
            window.location.reload()
        else:
            # close event may be due to normal tab closing, thus we try to reconnect only
            # after a delay
            timer.set_timeout(self.retry_connect, 5000)

    def on_message(self, message_evt):
        """Dispatch a message received from the server according to its type"""
        msg_data = json.loads(message_evt.data)
        msg_type = msg_data.get("type")
        if msg_type == "bridge":
            log.debug(
                f"==> bridge message: {msg_data=}"
            )
            self.broadcast_channel.post(
                msg_type,
                msg_data["data"]
            )
        elif msg_type == "force_close":
            log.warning(f"force closing connection: {msg_data.get('reason')}")
            self.close()
        else:
            # message and level are two distinct arguments: without the comma the
            # literals would be concatenated into a single string
            dialog.notification.show(
                f"Unexpected message type {msg_type}",
                "error"
            )


class BroadcastChannel:
    """Coordination between tabs through the "libervia" BroadcastChannel.

    Tabs announce themselves, keep track of each other's ids, and agree on which
    one holds the websocket connection. Bridge signals are forwarded to the
    handlers registered in ``handlers``.
    """
    # signal name => list of callables, shared by all instances
    handlers = {}

    def __init__(self):
        log.debug(f"BroadcastChannel init with profile {self.profile!r}")
        self.start = time.time()
        # instance state is set before any handler or timer is registered
        self.ws = None
        # set of all known tab ids
        self.tabs_ids = {tab_id}
        self.bc = window.BroadcastChannel.new("libervia")
        self.bc.bind("message", self.on_message)
        # there is no way to check if there is already a connection in BroadcastChannel
        # API, thus we wait a bit to see if somebody is answering. If not, we are probably
        # the first tab.
        self.check_connection_timer = timer.set_timeout(self.establish_connection, 20)
        self.post("salut_a_vous", {
            "id": tab_id,
            "profile": self.profile
        })
        window.bind("unload", self.on_unload)

    @property
    def profile(self):
        """Profile set on ``window``, or an empty string if there is none"""
        return window.profile or ""

    @property
    def connecting_tab(self) -> bool:
        """True is this tab is the one establishing the websocket connection"""
        return self.ws is not None

    @connecting_tab.setter
    def connecting_tab(self, connecting: bool) -> None:
        if connecting:
            if self.ws is None:
                self.ws = WebSocket(self)
                self.post("connection", {
                    "tab_id": tab_id
                })
        elif self.ws is not None:
            self.ws.close()

    def establish_connection(self) -> None:
        """Called when there is no existing connection"""
        timer.clear_timeout(self.check_connection_timer)
        log.debug(f"Establishing connection {tab_id=}")
        self.connecting_tab = True

    def handle_bridge_signal(self, data: dict) -> None:
        """Forward bridge signals to registered handlers

        @param data: message with a "signal" name and the "args" to give to handlers
        """
        for handler in self.handlers.get(data["signal"], []):
            handler(*data["args"])

    def on_message(self, evt) -> None:
        """Handle a message posted by another tab"""
        data = json.loads(evt.data)
        if data["type"] == "bridge":
            self.handle_bridge_signal(data)
        elif data["type"] == "salut_a_toi":
            # this is a response from existing tabs
            other_tab_id = data["id"]
            if other_tab_id == tab_id:
                # in the unlikely case that this happens, we simply reload this tab to get
                # a new ID
                log.warning(f"duplicate tab id, we reload the page: {tab_id=}")
                window.location.reload()
                return
            self.tabs_ids.add(other_tab_id)
            if data["connecting_tab"] and self.check_connection_timer is not None:
                # this tab has the websocket connection to server
                log.info(f"there is already a connection to server at tab {other_tab_id}")
                timer.clear_timeout(self.check_connection_timer)
                self.check_connection_timer = None
        elif data["type"] == "salut_a_vous":
            # a new tab has just been created
            if data["profile"] != self.profile:
                log.info(
                    f"we are now connected with the profile {data['profile']}, "
                    "reloading the page"
                )
                window.location.reload()
            else:
                self.tabs_ids.add(data["id"])
                self.post("salut_a_toi", {
                    "id": tab_id,
                    "connecting_tab": self.connecting_tab
                })
        elif data["type"] == "connection":
            log.info(f"tab {data['id']} is the new connecting tab")
        elif data["type"] == "salut_a_rantanplan":
            # a tab is being closed
            other_tab_id = data["id"]
            # it is unlikely that there is a collision, but just in case we check it
            if other_tab_id != tab_id:
                self.tabs_ids.discard(other_tab_id)
            if data["connecting_tab"]:
                log.info(f"connecting tab with id {other_tab_id} has been closed")
                # the remaining tab with the highest id takes over the connection
                if max(self.tabs_ids) == tab_id:
                    log.info("this is the new connecting tab, establish_connection")
                    self.connecting_tab = True
            else:
                log.info(f"tab with id {other_tab_id} has been closed")
        else:
            log.warning(f"unknown message type: {data}")

    def post(self, data_type, data: dict):
        """Post a message to the other tabs

        @param data_type: value of the "type" field of the message
        @param data: payload; "type" and "id" are set in the message which is sent,
            the dict given by the caller is left untouched
        """
        message = {**data, "type": data_type, "id": tab_id}
        self.bc.postMessage(json.dumps(message))
        if data_type == "bridge":
            # handlers of this tab must also be called for the signal
            self.handle_bridge_signal(message)

    def on_unload(self, evt) -> None:
        """Send a message to indicate that the tab is being closed"""
        self.post("salut_a_rantanplan", {
            "id": tab_id,
            "connecting_tab": self.connecting_tab
        })


class Bridge:
    """Proxy to call bridge methods through HTTP and to register signal handlers.

    Any attribute which is not defined on the class is treated as a bridge method
    name: ``bridge.some_method(*args, callback=..., errback=...)`` does a POST
    request on ``/_bridge/some_method``.
    """
    # BroadcastChannel shared by all instances, created on first instantiation
    # (annotation is quoted so it is not evaluated when the class is created)
    bc: "BroadcastChannel | None" = None

    def __init__(self) -> None:
        if Bridge.bc is None:
            Bridge.bc = BroadcastChannel()

    def __getattr__(self, attr):
        # only called for names not found by normal lookup: they are bridge methods
        return lambda *args, **kwargs: self.call(attr, *args, **kwargs)

    def on_load(self, xhr, ev, callback, errback):
        """Handle the response of a bridge call

        @param xhr: request which has been loaded
        @param ev: load event (unused)
        @param callback: called on success, with the decoded result if it is not None
        @param errback: called on failure with a dict describing the error
        """
        if xhr.status == 200:
            ret = javascript.JSON.parse(xhr.response)
            if callback is not None:
                if ret is None:
                    callback()
                else:
                    callback(ret)
        elif xhr.status == 502:
            # PROXY_ERROR is used for bridge error
            ret = javascript.JSON.parse(xhr.response)
            if errback is not None:
                errback(ret)
        else:
            log.error(
                f"bridge called failed: code: {xhr.status}, text: {xhr.statusText}"
            )
            if errback is not None:
                errback({"fullname": "BridgeInternalError", "message": xhr.statusText})

    def call(self, method_name, *args, callback=None, errback=None, **kwargs):
        """Call a bridge method with a POST request on ``/_bridge/<method_name>``

        @param method_name: name of the bridge method
        @param args: positional arguments of the bridge method
        @param callback: called on success, may be None
        @param errback: called on failure, may be None
        @param kwargs: keyword arguments of the bridge method
        """
        xhr = window.XMLHttpRequest.new()

        def on_error(ev):
            # errback is optional, as in ``on_load``
            if errback is None:
                log.error(f"bridge call to {method_name} failed: connection error")
            else:
                errback({"fullname": "ConnectionError", "message": xhr.statusText})

        xhr.bind('load', lambda ev: self.on_load(xhr, ev, callback, errback))
        xhr.bind('error', on_error)
        xhr.open("POST", f"/_bridge/{method_name}", True)
        data = javascript.JSON.stringify({
            "args": args,
            "kwargs": kwargs,
        })
        xhr.setRequestHeader('X-Csrf-Token', window.csrf_token)
        xhr.send(data)

    def register_signal(self, signal: str, handler, iface=None) -> None:
        """Register a handler to call when a bridge signal is received

        @param signal: name of the signal
        @param handler: callable which will receive the arguments of the signal
        @param iface: unused here
        """
        BroadcastChannel.handlers.setdefault(signal, []).append(handler)
        log.debug(f"signal {signal} has been registered")