diff libervia/server/websockets.py @ 1504:409d10211b20

server, browser: dynamic pages refactoring: dynamic pages has been reworked, to change the initial basic implementation. Pages are now dynamic by default, and a websocket is established by the first connected page of a session. The socket is used to transmit bridge signals, and then the signal is broadcasted to other tabs using broadcast channel. If the connecting tab is closed, an other one is chosen. Some tests are made to retry connecting in case of problem, and sometimes reload the pages (e.g. if profile is connected). Signals (or other data) are cached during reconnection phase, to avoid lost of data. All previous partial rendering mechanism have been removed, chat page is temporarily not working anymore, but will be eventually redone (one of the goal of this work is to have proper chat).
author Goffi <goffi@goffi.org>
date Wed, 01 Mar 2023 18:02:44 +0100
parents 822bd0139769
children ce879da7fcf7
line wrap: on
line diff
--- a/libervia/server/websockets.py	Wed Mar 01 17:55:25 2023 +0100
+++ b/libervia/server/websockets.py	Wed Mar 01 18:02:44 2023 +0100
@@ -1,6 +1,5 @@
 #!/usr/bin/env python3
 
-
 # Libervia: a Salut à Toi frontend
 # Copyright (C) 2011-2021 Jérôme Poisson <goffi@goffi.org>
 
@@ -19,7 +18,8 @@
 
 
 import json
-from twisted.internet import error
+from typing import Optional
+
 from autobahn.twisted import websocket
 from autobahn.twisted import resource as resource
 from autobahn.websocket import types
@@ -27,168 +27,195 @@
 from sat.core.i18n import _
 from sat.core.log import getLogger
 
+from . import session_iface
+from .constants import Const as C
+
 log = getLogger(__name__)
 
-LIBERVIA_PROTOCOL = "libervia_page"
-
-
-class WebsocketRequest(object):
-    """Wrapper around autobahn's ConnectionRequest and Twisted's server.Request
-
-    This is used to have a common interface in Libervia page with request object
-    """
-
-    def __init__(self, ws_protocol, connection_request, server_request):
-        """
-        @param connection_request: websocket request
-        @param serveur_request: original request of the page
-        """
-        self.ws_protocol = ws_protocol
-        self.ws_request = connection_request
-        if self.isSecure():
-            cookie_name = "TWISTED_SECURE_SESSION"
-        else:
-            cookie_name = "TWISTED_SESSION"
-        cookie_value = server_request.getCookie(cookie_name.encode('utf-8'))
-        try:
-            raw_cookies = ws_protocol.http_headers['cookie']
-        except KeyError:
-            raise ValueError("missing expected cookie header")
-        self.cookies = {k:v for k,v in (c.split('=') for c in raw_cookies.split(';'))}
-        if self.cookies[cookie_name] != cookie_value.decode('utf-8'):
-            raise exceptions.PermissionError(
-                "Bad cookie value, this should never happen.\n"
-                "headers: {headers}".format(headers=ws_protocol.http_headers))
-
-        self.template_data = server_request.template_data
-        self.data = server_request.data
-        self.session = server_request.getSession()
-        self._signals_registered = server_request._signals_registered
-        self._signals_cache = server_request._signals_cache
-        # signal id is needed to link original request with signal handler
-        self.signal_id = server_request._signal_id
-
-    def isSecure(self):
-        return self.ws_protocol.factory.isSecure
-
-    def getSession(self, sessionInterface=None):
-        try:
-            self.session.touch()
-        except (error.AlreadyCalled, error.AlreadyCancelled):
-            # Session has already expired.
-            self.session = None
-
-        if sessionInterface:
-            return self.session.getComponent(sessionInterface)
-
-        return self.session
-
-    def sendData(self, type_, **data):
-        assert "type" not in data
-        data["type"] = type_
-        self.ws_protocol.sendMessage(json.dumps(data, ensure_ascii=False).encode("utf8"))
+host = None
 
 
 class LiberviaPageWSProtocol(websocket.WebSocketServerProtocol):
-    host = None
-    tokens_map = {}
+
+    def __init__(self):
+        super().__init__()
+        self._init_ok: bool = False
+        self.__profile: Optional[str] = None
+        self.__session: Optional[session_iface.SATSession] = None
+
+    @property
+    def init_ok(self):
+        return self._init_ok
+
+    def send(self, data_type: str, data: dict) -> None:
+        """Send data to frontend"""
+        if not self._init_ok and data_type != "error":
+            raise exceptions.InternalError(
+                "send called when not initialized, this should not happend! Please use "
+                "SATSession.send which takes care of sending correctly the data to all "
+                "sessions."
+            )
+
+        data_root = {
+            "type": data_type,
+            "data": data
+        }
+        self.sendMessage(json.dumps(data_root, ensure_ascii=False).encode())
+
+    def close(self) -> None:
+        log.debug(f"closing websocket for profile {self.__profile}")
+
+    def error(self, error_type: str, msg: str) -> None:
+        """Send an error message to frontend and log it locally"""
+        log.warning(
+            f"websocket error {error_type}: {msg}"
+        )
+        self.send("error", {
+            "type": error_type,
+            "msg": msg,
+        })
 
     def onConnect(self, request):
-        prefix = LIBERVIA_PROTOCOL + "_"
-        for protocol in request.protocols:
-            if protocol.startswith(prefix):
-                token = protocol[len(prefix) :].strip()
-                if token:
-                    break
-        else:
+        if "libervia-page" not in request.protocols:
             raise types.ConnectionDeny(
-                types.ConnectionDeny.NOT_IMPLEMENTED, "Can't use this subprotocol"
+                types.ConnectionDeny.NOT_IMPLEMENTED, "No supported protocol"
+            )
+        self._init_ok = False
+        cookies = {}
+        for cookie in request.headers.get("cookie", "").split(";"):
+            k, __, v = cookie.partition("=")
+            cookies[k.strip()] = v.strip()
+        session_uid = (
+            cookies.get("TWISTED_SECURE_SESSION")
+            or cookies.get("TWISTED_SESSION")
+            or ""
+        )
+        if not session_uid:
+            raise types.ConnectionDeny(
+                types.ConnectionDeny.FORBIDDEN, "No session set"
+            )
+        try:
+            session = host.site.getSession(session_uid.encode())
+        except KeyError:
+            raise types.ConnectionDeny(
+                types.ConnectionDeny.FORBIDDEN, "Invalid session"
             )
 
-        if token not in self.tokens_map:
-            log.warning(_("Can't activate page socket: unknown token"))
-            raise types.ConnectionDeny(
-                types.ConnectionDeny.FORBIDDEN, "Bad token, please reload page"
+        session.touch()
+        session_data = session.getComponent(session_iface.ISATSession)
+        if session_data.ws_socket is not None:
+            log.warning("Session socket is already set, force closing it")
+            session_data.ws_socket.send(
+                "force_close", {"reason": "duplicate connection detected"}
             )
-        self.token = token
-        token_map = self.tokens_map.pop(token)
-        self.page = token_map["page"]
-        self.request = WebsocketRequest(self, request, token_map["request"])
-        return protocol
+        session_data.ws_socket = self
+        self.__session = session_data
+        self.__profile = session_data.profile or C.SERVICE_PROFILE
+        log.debug(f"websocket connection connected for profile {self.__profile}")
+        return "libervia-page"
 
     def onOpen(self):
-        log.debug(
-            _(
-                "Websocket opened for {page} (token: {token})".format(
-                    page=self.page, token=self.token
-                )
-            )
-        )
-        self.page.onSocketOpen(self.request)
+        log.debug("websocket connection opened")
 
-    def onMessage(self, payload, isBinary):
+    def onMessage(self, payload: bytes, isBinary: bool) -> None:
+        if self.__session is None:
+            raise exceptions.InternalError("empty session, this should never happen")
         try:
-            data_json = json.loads(payload.decode("utf8"))
+            data_full = json.loads(payload.decode())
+            data_type = data_full["type"]
+            data = data_full["data"]
         except ValueError as e:
-            log.warning(
-                _("Not valid JSON, ignoring data: {msg}\n{data}").format(
-                    msg=e, data=payload
-                )
+            self.error(
+                "bad_request",
+                f"Not valid JSON, ignoring data ({e}): {payload!r}"
+            )
+            return
+        except KeyError:
+            self.error(
+                "bad_request",
+                'Invalid request (missing "type" or "data")'
             )
             return
-        #  we request page first, to raise an AttributeError
-        # if it is not set (which should never happen)
-        page = self.page
-        try:
-            cb = page.on_data
-        except AttributeError:
-            log.warning(
-                _(
-                    'No "on_data" method set on dynamic page, ignoring data:\n{data}'
-                ).format(data=data_json)
+
+        if data_type == "init":
+            if self._init_ok:
+                self.error(
+                    "bad_request",
+                    "double init"
+                )
+                self.sendClose(4400, "Bad Request")
+                return
+
+            try:
+                profile = data["profile"] or C.SERVICE_PROFILE
+                token = data["token"]
+            except KeyError:
+                self.error(
+                    "bad_request",
+                    "Invalid init data (missing profile or token)"
+                )
+                self.sendClose(4400, "Bad Request")
+                return
+            if ((
+                profile != self.__profile
+                or (token != self.__session.ws_token and profile != C.SERVICE_PROFILE)
+            )):
+                log.debug(
+                    f"profile got {profile}, was expecting {self.__profile}, "
+                    f"token got {token}, was expecting {self.__session.ws_token}, "
+                )
+                self.error(
+                    "Unauthorized",
+                    "Invalid profile or token"
+                )
+                self.sendClose(4401, "Unauthorized")
+                return
+            else:
+                log.debug(f"websocket connection initialized for {profile}")
+                self._init_ok = True
+                # we now send all cached data, if any
+                while True:
+                    try:
+                        session_kw = self.__session.ws_buffer.popleft()
+                    except IndexError:
+                        break
+                    else:
+                        self.send(**session_kw)
+
+        if not self._init_ok:
+            self.error(
+                "Unauthorized",
+                "session not authorized"
             )
-        else:
-            cb(page, self.request, data_json)
+            self.sendClose(4401, "Unauthorized")
+            return
 
     def onClose(self, wasClean, code, reason):
-        try:
-            page = self.page
-        except AttributeError:
-            log.debug(
-                "page is not available, the socket was probably not opened cleanly.\n"
-                "reason: {reason}".format(reason=reason))
-            return
-        page.onSocketClose(self.request)
+        log.debug(f"closing websocket (profile: {self.__profile}, reason: {reason})")
+        if self.__profile is None:
+            log.error("self.__profile should not be None")
+            self.__profile = C.SERVICE_PROFILE
 
-        log.debug(
-            _(
-                "Websocket closed for {page} (token: {token}). {reason}".format(
-                    page=self.page,
-                    token=self.token,
-                    reason=""
-                    if wasClean
-                    else _("Reason: {reason}").format(reason=reason),
-                )
-            )
+        if self.__session is None:
+            log.warning("closing a socket without attached session")
+        elif self.__session.ws_socket != self:
+            log.error("session socket is not linked to our instance")
+        else:
+            log.debug(f"reseting websocket session for {self.__profile}")
+            self.__session.ws_socket = None
+        sessions = session_iface.SATSession.get_profile_sessions(self.__profile)
+        log.debug(f"websocket connection for profile {self.__profile} closed")
+        self.__profile = None
+
+    @classmethod
+    def getBaseURL(cls, secure):
+        return "ws{sec}://localhost:{port}".format(
+            sec="s" if secure else "",
+            port=host.options["port_https" if secure else "port"],
         )
 
     @classmethod
-    def getBaseURL(cls, host, secure):
-        return "ws{sec}://localhost:{port}".format(
-            sec="s" if secure else "",
-            port=cls.host.options["port_https" if secure else "port"],
-        )
-
-    @classmethod
-    def getResource(cls, host, secure):
-        if cls.host is None:
-            cls.host = host
-        factory = websocket.WebSocketServerFactory(cls.getBaseURL(host, secure))
+    def getResource(cls, secure):
+        factory = websocket.WebSocketServerFactory(cls.getBaseURL(secure))
         factory.protocol = cls
         return resource.WebSocketResource(factory)
-
-    @classmethod
-    def registerToken(cls, token, page, request):
-        if token in cls.tokens_map:
-            raise exceptions.ConflictError(_("This token is already registered"))
-        cls.tokens_map[token] = {"page": page, "request": request}