Mercurial > libervia-web
changeset 1504:409d10211b20
server, browser: dynamic pages refactoring:
dynamic pages has been reworked, to change the initial basic implementation.
Pages are now dynamic by default, and a websocket is established by the first connected
page of a session. The socket is used to transmit bridge signals, and then the signal is
broadcasted to other tabs using broadcast channel.
If the connecting tab is closed, an other one is chosen.
Some tests are made to retry connecting in case of problem, and sometimes reload the pages
(e.g. if profile is connected).
Signals (or other data) are cached during reconnection phase, to avoid lost of data.
All previous partial rendering mechanism have been removed, chat page is temporarily not
working anymore, but will be eventually redone (one of the goal of this work is to have
proper chat).
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Mar 2023 18:02:44 +0100 |
parents | 2796e73ed50c |
children | a169cbc315f0 |
files | libervia/pages/_browser/__init__.py libervia/pages/_browser/bridge.py libervia/pages/_browser/dialog.py libervia/pages/chat/page_meta.py libervia/pages/login/page_meta.py libervia/server/pages.py libervia/server/server.py libervia/server/session_iface.py libervia/server/websockets.py |
diffstat | 9 files changed, 595 insertions(+), 376 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/pages/_browser/__init__.py Wed Mar 01 18:02:44 2023 +0100 @@ -0,0 +1,5 @@ +import bridge + + +# we create a Bridge instance to receive signals +bridge.Bridge()
--- a/libervia/pages/_browser/bridge.py Wed Mar 01 17:55:25 2023 +0100 +++ b/libervia/pages/_browser/bridge.py Wed Mar 01 18:02:44 2023 +0100 @@ -1,8 +1,251 @@ -from browser import window +from browser import window, timer, console as log +import time +import random +import json +import dialog import javascript +log.warning = log.warn +tab_id = random.randint(0, 2**64) +log.info(f"TAB ID is {tab_id}") + + +class WebSocket: + + def __init__(self, broadcast_channel): + self.broadcast_channel = broadcast_channel + self.token = window.ws_token + self.create_socket() + self.retrying = False + self.network_error = False + + @property + def profile(self): + return self.broadcast_channel.profile + + def retry_connect(self) -> None: + if self.retrying: + return + self.retrying = True + try: + notif = dialog.RetryNotification(self.create_socket) + notif.show( + "Can't connect to server", + delay=random.randint(0, 30) + ) + except Exception as e: + # for security reasons, browser don't give the reason of the error with + # WebSockets, thus we try to detect network error here, as if we can't show + # the retry dialog, that probably means that it's not reachable + try: + name = e.name + except AttributeError: + name = None + if name == "NetworkError": + self.network_error = True + log.warning("network error detected, server may be down") + log.error(f"Can't show retry dialog: {e}") + log.info("retrying in 30s") + timer.set_timeout(self.create_socket, 30000) + else: + raise e + else: + # if we can show the retry dialog, the network is fine + self.network_error = False + + def create_socket(self) -> None: + log.debug("creating socket") + self.retrying = False + self.socket = window.WebSocket.new(window.ws_url, "libervia-page") + self.socket_start = time.time() + self.socket.bind("open", self.on_open) + self.socket.bind("error", self.on_error) + self.socket.bind("close", self.on_close) + self.socket.bind("message", self.on_message) + + def send(self, data_type: str, data: dict) -> None: + self.socket.send(json.dumps({ + "type": data_type, + "data": data + })) + + def close(self) -> None: + log.debug("closing socket") + self.broadcast_channel.ws = None + self.socket.close() + + def on_open(self, evt) -> None: + log.info("websocket connection opened") + self.send("init", {"profile": self.profile, "token": self.token}) + + def on_error(self, evt) -> None: + if not self.network_error and time.time() - self.socket_start < 5: + # disconnection is happening fast, we try to reload + log.warning("Reloading due to suspected session error") + window.location.reload() + else: + self.retry_connect() + + def on_close(self, evt) -> None: + log.warning(f"websocket is closed {evt.code=} {evt.reason=}") + if self.broadcast_channel.ws is None: + # this is a close requested locally + return + elif evt.code == 4401: + log.info( + "no authorized, the session is probably not valid anymore, reloading" + ) + window.location.reload() + else: + # close event may be due to normal tab closing, thus we try to reconnect only + # after a delay + timer.set_timeout(self.retry_connect, 5000) + + def on_message(self, message_evt): + msg_data = json.loads(message_evt.data) + msg_type = msg_data.get("type") + if msg_type == "bridge": + self.broadcast_channel.post( + msg_type, + msg_data["data"] + ) + elif msg_type == "force_close": + log.warning(f"force closing connection: {msg_data.get('reason')}") + self.close() + else: + dialog.notification.show( + f"Unexpected message type {msg_type}" + "error" + ) + + +class BroadcastChannel: + handlers = {} + + def __init__(self): + log.debug(f"BroadcastChannel init with profile {self.profile!r}") + self.start = time.time() + self.bc = window.BroadcastChannel.new("libervia") + self.bc.bind("message", self.on_message) + # there is no way to check if there is already a connection in BroadcastChannel + # API, thus we wait a bit to see if somebody is answering. If not, we are probably + # the first tab. + self.check_connection_timer = timer.set_timeout(self.establish_connection, 20) + self.ws = None + # set of all known tab ids + self.tabs_ids = {tab_id} + self.post("salut_a_vous", { + "id": tab_id, + "profile": self.profile + }) + window.bind("unload", self.on_unload) + + @property + def profile(self): + return window.profile or "" + + @property + def connecting_tab(self) -> bool: + """True is this tab is the one establishing the websocket connection""" + return self.ws is not None + + @connecting_tab.setter + def connecting_tab(self, connecting: bool) -> None: + if connecting: + if self.ws is None: + self.ws = WebSocket(self) + self.post("connection", { + "tab_id": tab_id + }) + elif self.ws is not None: + self.ws.close() + + def establish_connection(self) -> None: + """Called when there is no existing connection""" + timer.clear_timeout(self.check_connection_timer) + log.debug(f"Establishing connection {tab_id=}") + self.connecting_tab = True + + def handle_bridge_signal(self, data: dict) -> None: + """Forward bridge signals to registered handlers""" + signal = data["signal"] + handlers = self.handlers.get(signal, []) + for handler in handlers: + handler(*data["args"]) + + def on_message(self, evt) -> None: + data = json.loads(evt.data) + if data["type"] == "bridge": + self.handle_bridge_signal(data) + elif data["type"] == "salut_a_toi": + # this is a response from existing tabs + other_tab_id = data["id"] + if other_tab_id == tab_id: + # in the unlikely case that this happens, we simply reload this tab to get + # a new ID + log.warning("duplicate tab id, we reload the page: {tab_id=}") + window.location.reload() + return + self.tabs_ids.add(other_tab_id) + if data["connecting_tab"] and self.check_connection_timer is not None: + # this tab has the websocket connection to server + log.info(f"there is already a connection to server at tab {other_tab_id}") + timer.clear_timeout(self.check_connection_timer) + self.check_connection_timer = None + elif data["type"] == "salut_a_vous": + # a new tab has just been created + if data["profile"] != self.profile: + log.info( + f"we are now connected with the profile {data['profile']}, " + "reloading the page" + ) + window.location.reload() + else: + self.tabs_ids.add(data["id"]) + self.post("salut_a_toi", { + "id": tab_id, + "connecting_tab": self.connecting_tab + }) + elif data["type"] == "connection": + log.info(f"tab {data['id']} is the new connecting tab") + elif data["type"] == "salut_a_rantanplan": + # a tab is being closed + other_tab_id = data["id"] + # it is unlikely that there is a collision, but just in case we check it + if other_tab_id != tab_id: + self.tabs_ids.discard(other_tab_id) + if data["connecting_tab"]: + log.info(f"connecting tab with id {other_tab_id} has been closed") + if max(self.tabs_ids) == tab_id: + log.info("this is the new connecting tab, establish_connection") + self.connecting_tab = True + else: + log.info(f"tab with id {other_tab_id} has been closed") + else: + log.warning(f"unknown message type: {data}") + + def post(self, data_type, data: dict): + data["type"] = data_type + data["id"] = tab_id + self.bc.postMessage(json.dumps(data)) + if data_type == "bridge": + self.handle_bridge_signal(data) + + def on_unload(self, evt) -> None: + """Send a message to indicate that the tab is being closed""" + self.post("salut_a_rantanplan", { + "id": tab_id, + "connecting_tab": self.connecting_tab + }) + + class Bridge: + bc: BroadcastChannel | None = None + + def __init__(self) -> None: + if Bridge.bc is None: + Bridge.bc = BroadcastChannel() def __getattr__(self, attr): return lambda *args, **kwargs: self.call(attr, *args, **kwargs) @@ -21,7 +264,9 @@ if errback is not None: errback(ret) else: - print(f"bridge called failed: code: {xhr.response}, text: {xhr.statusText}") + log.error( + f"bridge called failed: code: {xhr.response}, text: {xhr.statusText}" + ) if errback is not None: errback({"fullname": "BridgeInternalError", "message": xhr.statusText}) @@ -37,3 +282,7 @@ }) xhr.setRequestHeader('X-Csrf-Token', window.csrf_token) xhr.send(data) + + def register_signal(self, signal: str, handler, iface=None) -> None: + BroadcastChannel.handlers.setdefault(signal, []).append(handler) + log.debug(f"signal {signal} has been registered")
--- a/libervia/pages/_browser/dialog.py Wed Mar 01 17:55:25 2023 +0100 +++ b/libervia/pages/_browser/dialog.py Wed Mar 01 18:02:44 2023 +0100 @@ -1,8 +1,10 @@ """manage common dialogs""" -from browser import document, window, timer +from browser import document, window, timer, console as log from template import Template +log.warning = log.warn + class Confirm: @@ -78,4 +80,52 @@ elt.bind('click', lambda __: self.close(notif_elt)) +class RetryNotification: + def __init__(self, retry_cb): + self._tpl = Template("dialogs/retry-notification.html") + self.retry_cb = retry_cb + self.counter = 0 + self.timer = None + + def retry(self, notif_elt): + if self.timer is not None: + timer.clear_interval(self.timer) + self.timer = None + notif_elt.classList.remove('state_appended') + notif_elt.bind("transitionend", lambda __: notif_elt.remove()) + self.retry_cb() + + def update_counter(self, notif_elt): + counter = notif_elt.select_one(".retry_counter") + counter.text = str(self.counter) + self.counter -= 1 + if self.counter < 0: + self.retry(notif_elt) + + def show( + self, + message: str, + level: str = "warning", + delay: int = 5 + ) -> None: + # we log in console error messages, may be useful + if level == "error": + log.error(message) + elif level == "warning": + log.warning(message) + self.counter = delay + notif_elt = self._tpl.get_elt({ + "message": message, + "level": level, + }) + self.update_counter(notif_elt) + document["notifs_area"] <= notif_elt + timer.set_timeout(lambda: notif_elt.classList.add('state_appended'), 0) + self.timer = timer.set_interval(self.update_counter, 1000, notif_elt) + for elt in notif_elt.select('.click_to_retry'): + elt.bind('click', lambda __: self.retry(notif_elt)) + + + + notification = Notification()
--- a/libervia/pages/chat/page_meta.py Wed Mar 01 17:55:25 2023 +0100 +++ b/libervia/pages/chat/page_meta.py Wed Mar 01 18:02:44 2023 +0100 @@ -120,36 +120,3 @@ ) else: log.warning("unknown message type: {type}".format(type=data_type)) - - -@defer.inlineCallbacks -def on_signal(self, request, signal, *args): - if signal == "messageNew": - rdata = self.getRData(request) - template_data_update = {"msg": data_objects.Message((args))} - target_jid = rdata["target"] - identities = rdata["identities"] - uid, timestamp, from_jid_s, to_jid_s, message, subject, mess_type, extra_s, __ = ( - args - ) - from_jid = jid.JID(from_jid_s) - to_jid = jid.JID(to_jid_s) - if ( - target_jid.userhostJID() != from_jid.userhostJID() - and target_jid.userhostJID() != to_jid.userhostJID() - ): - # the message is not linked with page's room/user - return - - if from_jid_s not in identities: - profile = self.getProfile(request) - id_raw = yield self.host.bridgeCall( - "identityGet", from_jid_s, [], True, profile - ) - identities[from_jid_s] = data_format.deserialise(id_raw) - template_data_update["identities"] = identities - self.renderAndUpdate( - request, "chat/message.html", "#messages", template_data_update - ) - else: - log.error(_("Unexpected signal: {signal}").format(signal=signal))
--- a/libervia/pages/login/page_meta.py Wed Mar 01 17:55:25 2023 +0100 +++ b/libervia/pages/login/page_meta.py Wed Mar 01 18:02:44 2023 +0100 @@ -54,8 +54,7 @@ return C.POST_NO_CONFIRM -@defer.inlineCallbacks -def on_data_post(self, request): +async def on_data_post(self, request): profile = self.getProfile(request) type_ = self.getPostedData(request, "type") if type_ == "disconnect": @@ -64,25 +63,25 @@ self.pageError(request, C.HTTP_BAD_REQUEST) else: self.host.purgeSession(request) - defer.returnValue(C.POST_NO_CONFIRM) + return C.POST_NO_CONFIRM elif type_ == "login": login, password = self.getPostedData(request, ("login", "password")) try: - status = yield self.host.connect(request, login, password) + status = await self.host.connect(request, login, password) except exceptions.ProfileUnknownError: # the profile doesn't exist, we return the same error as for invalid password # to avoid bruteforcing valid profiles log.warning(f"login tentative with invalid profile: {login!r}") - defer.returnValue(login_error(self, request, C.PROFILE_AUTH_ERROR)) + return login_error(self, request, C.PROFILE_AUTH_ERROR) except ValueError as e: message = str(e) if message in (C.XMPP_AUTH_ERROR, C.PROFILE_AUTH_ERROR): - defer.returnValue(login_error(self, request, message)) + return login_error(self, request, message) else: # this error was not expected! raise e except exceptions.TimeOutError: - defer.returnValue(login_error(self, request, C.NO_REPLY)) + return login_error(self, request, C.NO_REPLY) else: if status in (C.PROFILE_LOGGED, C.PROFILE_LOGGED_EXT_JID, C.SESSION_ACTIVE): # Profile has been logged correctly
--- a/libervia/server/pages.py Wed Mar 01 17:55:25 2023 +0100 +++ b/libervia/server/pages.py Wed Mar 01 18:02:44 2023 +0100 @@ -112,7 +112,6 @@ class LiberviaPage(web_resource.Resource): isLeaf = True # we handle subpages ourself - signals_handlers = {} cache = {} # Set of tuples (service/node/sub_id) of nodes subscribed for caching # sub_id can be empty string if not handled by service @@ -120,9 +119,9 @@ def __init__( self, host, vhost_root, root_dir, url, name=None, label=None, redirect=None, - access=None, dynamic=False, parse_url=None, add_breadcrumb=None, + access=None, dynamic=True, parse_url=None, add_breadcrumb=None, prepare_render=None, render=None, template=None, on_data_post=None, on_data=None, - on_signal=None, url_cache=False, replace_on_conflict=False + url_cache=False, replace_on_conflict=False ): """Initiate LiberviaPage instance @@ -173,8 +172,6 @@ as a notification @param on_data(callable, None): method to call when dynamic data is sent this method is used with Libervia's websocket mechanism - @param on_signal(callable, None): method to call when a registered signal is - received. This method is used with Libervia's websocket mechanism @param url_cache(boolean): if set, result of parse_url is cached (per profile). Useful when costly calls (e.g. network) are done while parsing URL. @param replace_on_conflict(boolean): if True, don't raise ConflictError if a @@ -231,7 +228,6 @@ self.render_method = render self.on_data_post = on_data_post self.on_data = on_data - self.on_signal = on_signal self.url_cache = url_cache if access == C.PAGES_ACCESS_NONE: # none pages just return a 404, no further check is needed @@ -305,7 +301,7 @@ label=page_data.get("label"), redirect=page_data.get("redirect"), access=page_data.get("access"), - dynamic=page_data.get("dynamic", False), + dynamic=page_data.get("dynamic", True), parse_url=page_data.get("parse_url"), add_breadcrumb=page_data.get("add_breadcrumb"), prepare_render=page_data.get("prepare_render"), @@ -313,7 +309,6 @@ template=page_data.get("template"), on_data_post=page_data.get("on_data_post"), on_data=page_data.get("on_data"), - on_signal=page_data.get("on_signal"), url_cache=page_data.get("url_cache", False), replace_on_conflict=replace_on_conflict ) @@ -321,9 +316,9 @@ @staticmethod def createBrowserData( vhost_root, - resource: Optional(LiberviaPage), + resource: Optional[LiberviaPage], browser_path: Path, - path_elts: Optional(List[str]), + path_elts: Optional[List[str]], engine: str = "brython" ) -> None: """create and store data for browser dynamic code""" @@ -605,45 +600,6 @@ .format( *uri_tuple)) self.uri_callbacks[uri_tuple] = (self, get_uri_cb) - def getSignalId(self, request): - """Retrieve signal_id for a request - - signal_id is used for dynamic page, to associate a initial request with a - signal handler. For WebsocketRequest, signal_id attribute is used (which must - be orginal request's id) - For server.Request it's id(request) - """ - return getattr(request, 'signal_id', id(request)) - - def registerSignal(self, request, signal, check_profile=True): - r"""register a signal handler - - the page must be dynamic - when signal is received, self.on_signal will be called with: - - request - - signal name - - signal arguments - signal handler will be removed when connection with dynamic page will be lost - @param signal(unicode): name of the signal - last arg of signal must be profile, as it will be checked to filter signals - @param check_profile(bool): if True, signal profile (which MUST be last arg) - will be checked against session profile. - /!\ if False, profile will not be checked/filtered, be sure to know what you - are doing if you unset this option /!\ - """ - # FIXME: add a timeout; if socket is not opened before it, signal handler - # must be removed - if not self.dynamic: - log.error(_("You can't register signal if page is not dynamic")) - return - signal_id = self.getSignalId(request) - LiberviaPage.signals_handlers.setdefault(signal, {})[signal_id] = [ - self, - request, - check_profile, - ] - request._signals_registered.append(signal) - def getConfig(self, key, default=None, value_type=None): return self.host.getConfig(self.vhost_root, key=key, default=default, value_type=value_type) @@ -1213,71 +1169,6 @@ # signals, server => browser communication - @classmethod - def onSignal(cls, host, signal, *args): - """Generic method which receive registered signals - - if a callback is registered for this signal, call it - @param host: Libervia instance - @param signal(unicode): name of the signal - @param *args: args of the signals - """ - for page, request, check_profile in cls.signals_handlers.get( - signal, {} - ).values(): - if check_profile: - signal_profile = args[-1] - request_profile = page.getProfile(request) - if not request_profile: - # if you want to use signal without session, unset check_profile - # (be sure to know what you are doing) - log.error(_("no session started, signal can't be checked")) - continue - if signal_profile != request_profile: - # we ignore the signal, it's not for our profile - continue - if request._signals_cache is not None: - # socket is not yet opened, we cache the signal - request._signals_cache.append((request, signal, args)) - log.debug( - "signal [{signal}] cached: {args}".format(signal=signal, args=args) - ) - else: - page.on_signal(page, request, signal, *args) - - def onSocketOpen(self, request): - """Called for dynamic pages when socket has just been opened - - we send all cached signals - """ - assert request._signals_cache is not None - # we need to replace corresponding original requests by this websocket request - # in signals_handlers - signal_id = request.signal_id - for signal_handlers_map in self.__class__.signals_handlers.values(): - if signal_id in signal_handlers_map: - signal_handlers_map[signal_id][1] = request - - cache = request._signals_cache - request._signals_cache = None - for request, signal, args in cache: - self.on_signal(self, request, signal, *args) - - def onSocketClose(self, request): - """Called for dynamic pages when socket has just been closed - - we remove signal handler - """ - for signal in request._signals_registered: - signal_id = self.getSignalId(request) - try: - del LiberviaPage.signals_handlers[signal][signal_id] - except KeyError: - log.error(_("Can't find signal handler for [{signal}], this should not " - "happen").format(signal=signal)) - else: - log.debug(_("Removed signal handler")) - def delegateToResource(self, request, resource): """continue workflow with Twisted Resource""" buf = resource.render(request) @@ -1447,10 +1338,14 @@ raise failure.Failure(exceptions.CancelError("subpage page is used")) def _prepare_dynamic(self, request): + session_data = self.host.getSessionData(request, session_iface.ISATSession) # we need to activate dynamic page # we set data for template, and create/register token - socket_token = str(uuid.uuid4()) - socket_url = self.host.getWebsocketURL(request) + # socket_token = str(uuid.uuid4()) + socket_url = self.host.get_websocket_url(request) + # as for CSRF, it is important to not let the socket token if we use the service + # profile, as those pages can be cached, and then the token leaked. + socket_token = '' if session_data.profile is None else session_data.ws_token socket_debug = C.boolConst(self.host.debug) request.template_data["websocket"] = WebsocketMeta( socket_url, socket_token, socket_debug @@ -1459,7 +1354,6 @@ request._signals_registered = [] # we will cache registered signals until socket is opened request._signals_cache = [] - self.host.registerWSToken(socket_token, self, request) def _render_template(self, request): template_data = request.template_data @@ -1759,62 +1653,6 @@ session_data.locale = a return - def renderPartial(self, request, template, template_data): - """Render a template to be inserted in dynamic page - - this is NOT the normal page rendering method, it is used only to update - dynamic pages - @param template(unicode): path of the template to render - @param template_data(dict): template_data to use - """ - if not self.dynamic: - raise exceptions.InternalError( - _("renderPartial must only be used with dynamic pages") - ) - session_data = self.host.getSessionData(request, session_iface.ISATSession) - if session_data.locale is not None: - template_data['locale'] = session_data.locale - if self.vhost_root.site_name: - template_data['site'] = self.vhost_root.site_name - - return self.host.renderer.render( - template, - theme=session_data.theme or self.default_theme, - site_themes=self.site_themes, - page_url=self.getURL(), - media_path=f"/{C.MEDIA_DIR}", - build_path=f"/{C.BUILD_DIR}/", - cache_path=session_data.cache_dir, - main_menu=self.main_menu, - **template_data - ) - - def renderAndUpdate( - self, request, template, selectors, template_data_update, update_type="append" - ): - """Helper method to render a partial page element and update the page - - this is NOT the normal page rendering method, it is used only to update - dynamic pages - @param request(server.Request): current HTTP request - @param template: same as for [renderPartial] - @param selectors: CSS selectors to use - @param template_data_update: template data to use - template data cached in request will be copied then updated - with this data - @parap update_type(unicode): one of: - append: append rendered element to selected element - """ - template_data = request.template_data.copy() - template_data.update(template_data_update) - html = self.renderPartial(request, template, template_data) - try: - request.sendData( - "dom", selectors=selectors, update_type=update_type, html=html) - except Exception as e: - log.error("Can't renderAndUpdate, html was: {html}".format(html=html)) - raise e - async def renderPage(self, request, skip_parse_url=False): """Main method to handle the workflow of a LiberviaPage""" # template_data are the variables passed to template
--- a/libervia/server/server.py Wed Mar 01 17:55:25 2023 +0100 +++ b/libervia/server/server.py Wed Mar 01 18:02:44 2023 +0100 @@ -844,6 +844,7 @@ def __init__(self, options): self.options = options + websockets.host = self def _init(self): # we do init here and not in __init__ to avoid doule initialisation with twistd @@ -1199,10 +1200,10 @@ # websocket if self.options["connection_type"] in ("https", "both"): - wss = websockets.LiberviaPageWSProtocol.getResource(self, secure=True) + wss = websockets.LiberviaPageWSProtocol.getResource(secure=True) self.putChildAll(b'wss', wss) if self.options["connection_type"] in ("http", "both"): - ws = websockets.LiberviaPageWSProtocol.getResource(self, secure=False) + ws = websockets.LiberviaPageWSProtocol.getResource(secure=False) self.putChildAll(b'ws', ws) ## following signal is needed for cache handling in Libervia pages @@ -1210,7 +1211,7 @@ "psEventRaw", partial(LiberviaPage.onNodeEvent, self), "plugin" ) self.bridge.register_signal( - "messageNew", partial(LiberviaPage.onSignal, self, "messageNew") + "messageNew", partial(self.on_signal, "messageNew") ) # Progress handling @@ -1325,6 +1326,19 @@ getattr(self.bridge, method_name)(*args, **kwargs) return d + def on_signal(self, signal_name, *args): + profile = args[-1] + if not profile: + log.error(f"got signal without profile: {signal_name}, {args}") + return + try: + sockets = websockets.LiberviaPageWSProtocol.profile_map[profile] + except KeyError: + log.debug(f"no socket opened for profile {profile}") + return + for socket in sockets: + socket.send("bridge", {"signal": signal_name, "args": args}) + async def _logged(self, profile, request): """Set everything when a user just logged in @@ -1360,16 +1374,19 @@ ) ) - def onExpire(): + def on_expire(): log.info("Session expired (profile={profile})".format(profile=profile)) self.cache_resource.delEntity(sat_session.uuid.encode('utf-8')) log.debug( _("profile cache resource {uuid} deleted").format(uuid=sat_session.uuid) ) + sat_session.on_expire() + if sat_session.ws_socket is not None: + sat_session.ws_socket.close() # and now we disconnect the profile self.bridgeCall("disconnect", profile) - session.notifyOnExpire(onExpire) + session.notifyOnExpire(on_expire) # FIXME: those session infos should be returned by connect or isConnected infos = await self.bridgeCall("sessionInfosGet", profile) @@ -1768,6 +1785,11 @@ session = request.session if session is not None: log.debug(_("session purge")) + sat_session = self.getSessionData(request, session_iface.ISATSession) + socket = sat_session.ws_socket + if socket is not None: + socket.close() + session.ws_socket = None session.expire() # FIXME: not clean but it seems that it's the best way to reset # session during request handling @@ -1824,7 +1846,7 @@ ## Websocket (dynamic pages) ## - def getWebsocketURL(self, request): + def get_websocket_url(self, request): base_url_split = self.getExtBaseURLData(request) if base_url_split.scheme.endswith("s"): scheme = "wss" @@ -1833,12 +1855,6 @@ return self.getExtBaseURL(request, path=scheme, scheme=scheme) - def registerWSToken(self, token, page, request): - # we make a shallow copy of request to avoid losing request.channel when - # connection is lost (which would result as request.isSecure() being always - # False). See #327 - request._signal_id = id(request) - websockets.LiberviaPageWSProtocol.registerToken(token, page, copy.copy(request)) ## Various utils ##
--- a/libervia/server/session_iface.py Wed Mar 01 17:55:25 2023 +0100 +++ b/libervia/server/session_iface.py Wed Mar 01 18:02:44 2023 +0100 @@ -15,14 +15,22 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from zope.interface import Interface, Attribute +from collections import OrderedDict, deque +import os.path +import time +from typing import List, Dict, Optional + +import shortuuid +from zope.interface import Attribute, Interface from zope.interface import implementer -from libervia.server.constants import Const as C + +from sat.core.log import getLogger + from libervia.server.classes import Notification -from collections import OrderedDict -import os.path -import shortuuid -import time +from libervia.server.constants import Const as C + + +log = getLogger(__name__) FLAGS_KEY = "_flags" NOTIFICATIONS_KEY = "_notifications" @@ -37,10 +45,11 @@ @implementer(ISATSession) -class SATSession(object): +class SATSession: + profiles_map: Dict[Optional[str], List["SATSession"]] = {} def __init__(self, session): - self.profile = None + self._profile = None self.jid = None self.started = time.time() # time when the backend session was started @@ -48,10 +57,39 @@ self.uuid = str(shortuuid.uuid()) self.identities = {} self.csrf_token = str(shortuuid.uuid()) + self.ws_token = str(shortuuid.uuid()) + self.ws_socket = None + self.ws_buffer = deque(maxlen=200) self.locale = None # i18n of the pages self.theme = None self.pages_data = {} # used to keep data accross reloads (key is page instance) self.affiliations = OrderedDict() # cache for node affiliations + self.profiles_map.setdefault(C.SERVICE_PROFILE, []).append(self) + log.debug( + f"session started for {C.SERVICE_PROFILE} " + f"({len(self.get_profile_sessions(C.SERVICE_PROFILE))} session(s) active)" + ) + + @property + def profile(self) -> Optional[str]: + return self._profile + + @profile.setter + def profile(self, profile: Optional[str]) -> None: + old_profile = self._profile or C.SERVICE_PROFILE + new_profile = profile or C.SERVICE_PROFILE + try: + self.profiles_map[old_profile].remove(self) + except (ValueError, KeyError): + log.warning(f"session was not registered for profile {old_profile}") + else: + nb_session_old = len(self.get_profile_sessions(old_profile)) + log.debug(f"{old_profile} has now {nb_session_old} session(s) active") + + self._profile = profile + self.profiles_map.setdefault(new_profile, []).append(self) + nb_session_new = len(self.get_profile_sessions(new_profile)) + log.debug(f"{new_profile} has now {nb_session_new} session(s) active") @property def cache_dir(self): @@ -71,6 +109,36 @@ else: return self.profile.startswith("guest@@") + @classmethod + def send(cls, profile: str, data_type: str, data: dict) -> None: + """Send a message to all session + + If the session doesn't have an active websocket, the message is buffered until a + socket is available + """ + for session in cls.profiles_map.get(profile, []): + if session.ws_socket is None or not session.ws_socket.init_ok: + session.ws_buffer.append({"data_type": data_type, "data": data}) + else: + session.ws_socket.send(data_type, data) + + def on_expire(self) -> None: + profile = self._profile or C.SERVICE_PROFILE + try: + self.profiles_map[profile].remove(self) + except (ValueError, KeyError): + log.warning(f"session was not registered for profile {profile}") + else: + nb_session = len(self.get_profile_sessions(profile)) + log.debug( + f"Session for profile {profile} expired. {profile} has now {nb_session} " + f"session(s) active." + ) + + @classmethod + def get_profile_sessions(cls, profile: str) -> List["SATSession"]: + return cls.profiles_map.get(profile, []) + def getPageData(self, page, key): """get session data for a page
--- a/libervia/server/websockets.py Wed Mar 01 17:55:25 2023 +0100 +++ b/libervia/server/websockets.py Wed Mar 01 18:02:44 2023 +0100 @@ -1,6 +1,5 @@ #!/usr/bin/env python3 - # Libervia: a Salut à Toi frontend # Copyright (C) 2011-2021 Jérôme Poisson <goffi@goffi.org> @@ -19,7 +18,8 @@ import json -from twisted.internet import error +from typing import Optional + from autobahn.twisted import websocket from autobahn.twisted import resource as resource from autobahn.websocket import types @@ -27,168 +27,195 @@ from sat.core.i18n import _ from sat.core.log import getLogger +from . import session_iface +from .constants import Const as C + log = getLogger(__name__) -LIBERVIA_PROTOCOL = "libervia_page" - - -class WebsocketRequest(object): - """Wrapper around autobahn's ConnectionRequest and Twisted's server.Request - - This is used to have a common interface in Libervia page with request object - """ - - def __init__(self, ws_protocol, connection_request, server_request): - """ - @param connection_request: websocket request - @param serveur_request: original request of the page - """ - self.ws_protocol = ws_protocol - self.ws_request = connection_request - if self.isSecure(): - cookie_name = "TWISTED_SECURE_SESSION" - else: - cookie_name = "TWISTED_SESSION" - cookie_value = server_request.getCookie(cookie_name.encode('utf-8')) - try: - raw_cookies = ws_protocol.http_headers['cookie'] - except KeyError: - raise ValueError("missing expected cookie header") - self.cookies = {k:v for k,v in (c.split('=') for c in raw_cookies.split(';'))} - if self.cookies[cookie_name] != cookie_value.decode('utf-8'): - raise exceptions.PermissionError( - "Bad cookie value, this should never happen.\n" - "headers: {headers}".format(headers=ws_protocol.http_headers)) - - self.template_data = server_request.template_data - self.data = server_request.data - self.session = server_request.getSession() - self._signals_registered = server_request._signals_registered - self._signals_cache = server_request._signals_cache - # signal id is needed to link original request with signal handler - self.signal_id = server_request._signal_id - - def isSecure(self): - return self.ws_protocol.factory.isSecure - - def getSession(self, sessionInterface=None): - try: - self.session.touch() - except (error.AlreadyCalled, error.AlreadyCancelled): - # Session has already expired. - self.session = None - - if sessionInterface: - return self.session.getComponent(sessionInterface) - - return self.session - - def sendData(self, type_, **data): - assert "type" not in data - data["type"] = type_ - self.ws_protocol.sendMessage(json.dumps(data, ensure_ascii=False).encode("utf8")) +host = None class LiberviaPageWSProtocol(websocket.WebSocketServerProtocol): - host = None - tokens_map = {} + + def __init__(self): + super().__init__() + self._init_ok: bool = False + self.__profile: Optional[str] = None + self.__session: Optional[session_iface.SATSession] = None + + @property + def init_ok(self): + return self._init_ok + + def send(self, data_type: str, data: dict) -> None: + """Send data to frontend""" + if not self._init_ok and data_type != "error": + raise exceptions.InternalError( + "send called when not initialized, this should not happend! Please use " + "SATSession.send which takes care of sending correctly the data to all " + "sessions." + ) + + data_root = { + "type": data_type, + "data": data + } + self.sendMessage(json.dumps(data_root, ensure_ascii=False).encode()) + + def close(self) -> None: + log.debug(f"closing websocket for profile {self.__profile}") + + def error(self, error_type: str, msg: str) -> None: + """Send an error message to frontend and log it locally""" + log.warning( + f"websocket error {error_type}: {msg}" + ) + self.send("error", { + "type": error_type, + "msg": msg, + }) def onConnect(self, request): - prefix = LIBERVIA_PROTOCOL + "_" - for protocol in request.protocols: - if protocol.startswith(prefix): - token = protocol[len(prefix) :].strip() - if token: - break - else: + if "libervia-page" not in request.protocols: raise types.ConnectionDeny( - types.ConnectionDeny.NOT_IMPLEMENTED, "Can't use this subprotocol" + types.ConnectionDeny.NOT_IMPLEMENTED, "No supported protocol" + ) + self._init_ok = False + cookies = {} + for cookie in request.headers.get("cookie", "").split(";"): + k, __, v = cookie.partition("=") + cookies[k.strip()] = v.strip() + session_uid = ( + cookies.get("TWISTED_SECURE_SESSION") + or cookies.get("TWISTED_SESSION") + or "" + ) + if not session_uid: + raise types.ConnectionDeny( + types.ConnectionDeny.FORBIDDEN, "No session set" + ) + try: + session = host.site.getSession(session_uid.encode()) + except KeyError: + raise types.ConnectionDeny( + types.ConnectionDeny.FORBIDDEN, "Invalid session" ) - if token not in self.tokens_map: - log.warning(_("Can't activate page socket: unknown token")) - raise types.ConnectionDeny( - types.ConnectionDeny.FORBIDDEN, "Bad token, please reload page" + session.touch() + session_data = session.getComponent(session_iface.ISATSession) + if session_data.ws_socket is not None: + log.warning("Session socket is already set, force closing it") + session_data.ws_socket.send( + "force_close", {"reason": "duplicate connection detected"} ) - self.token = token - token_map = self.tokens_map.pop(token) - self.page = token_map["page"] - self.request = WebsocketRequest(self, request, token_map["request"]) - return protocol + session_data.ws_socket = self + self.__session = session_data + self.__profile = session_data.profile or C.SERVICE_PROFILE + log.debug(f"websocket connection connected for profile {self.__profile}") + return "libervia-page" def onOpen(self): - log.debug( - _( - "Websocket opened for {page} (token: {token})".format( - page=self.page, token=self.token - ) - ) - ) - self.page.onSocketOpen(self.request) + log.debug("websocket connection opened") - def onMessage(self, payload, isBinary): + def onMessage(self, payload: bytes, isBinary: bool) -> None: + if self.__session is None: + raise exceptions.InternalError("empty session, this should never happen") try: - data_json = json.loads(payload.decode("utf8")) + data_full = json.loads(payload.decode()) + data_type = data_full["type"] + data = data_full["data"] except ValueError as e: - log.warning( - _("Not valid JSON, ignoring data: {msg}\n{data}").format( - msg=e, data=payload - ) + self.error( + "bad_request", + f"Not valid JSON, ignoring data ({e}): {payload!r}" + ) + return + except KeyError: + self.error( + "bad_request", + 'Invalid request (missing "type" or "data")' ) return - # we request page first, to raise an AttributeError - # if it is not set (which should never happen) - page = self.page - try: - cb = page.on_data - except AttributeError: - log.warning( - _( - 'No "on_data" method set on dynamic page, ignoring data:\n{data}' - ).format(data=data_json) + + if data_type == "init": + if self._init_ok: + self.error( + "bad_request", + "double init" + ) + self.sendClose(4400, "Bad Request") + return + + try: + profile = data["profile"] or C.SERVICE_PROFILE + token = data["token"] + except KeyError: + self.error( + "bad_request", + "Invalid init data (missing profile or token)" + ) + self.sendClose(4400, "Bad Request") + return + if (( + profile != self.__profile + or (token != self.__session.ws_token and profile != C.SERVICE_PROFILE) + )): + log.debug( + f"profile got {profile}, was expecting {self.__profile}, " + f"token got {token}, was expecting {self.__session.ws_token}, " + ) + self.error( + "Unauthorized", + "Invalid profile or token" + ) + self.sendClose(4401, "Unauthorized") + return + else: + log.debug(f"websocket connection initialized for {profile}") + self._init_ok = True + # we now send all cached data, if any + while True: + try: + session_kw = self.__session.ws_buffer.popleft() + except IndexError: + break + else: + self.send(**session_kw) + + if not self._init_ok: + self.error( + "Unauthorized", + "session not authorized" ) - else: - cb(page, self.request, data_json) + self.sendClose(4401, "Unauthorized") + return def onClose(self, wasClean, code, reason): - try: - page = self.page - except AttributeError: - log.debug( - "page is not available, the socket was probably not opened cleanly.\n" - "reason: {reason}".format(reason=reason)) - return - page.onSocketClose(self.request) + log.debug(f"closing websocket (profile: {self.__profile}, reason: {reason})") + if self.__profile is None: + log.error("self.__profile should not be None") + self.__profile = C.SERVICE_PROFILE - log.debug( - _( - "Websocket closed for {page} (token: {token}). {reason}".format( - page=self.page, - token=self.token, - reason="" - if wasClean - else _("Reason: {reason}").format(reason=reason), - ) - ) + if self.__session is None: + log.warning("closing a socket without attached session") + elif self.__session.ws_socket != self: + log.error("session socket is not linked to our instance") + else: + log.debug(f"reseting websocket session for {self.__profile}") + self.__session.ws_socket = None + sessions = session_iface.SATSession.get_profile_sessions(self.__profile) + log.debug(f"websocket connection for profile {self.__profile} closed") + self.__profile = None + + @classmethod + def getBaseURL(cls, secure): + return "ws{sec}://localhost:{port}".format( + sec="s" if secure else "", + port=host.options["port_https" if secure else "port"], ) @classmethod - def getBaseURL(cls, host, secure): - return "ws{sec}://localhost:{port}".format( - sec="s" if secure else "", - port=cls.host.options["port_https" if secure else "port"], - ) - - @classmethod - def getResource(cls, host, secure): - if cls.host is None: - cls.host = host - factory = websocket.WebSocketServerFactory(cls.getBaseURL(host, secure)) + def getResource(cls, secure): + factory = websocket.WebSocketServerFactory(cls.getBaseURL(secure)) factory.protocol = cls return resource.WebSocketResource(factory) - - @classmethod - def registerToken(cls, token, page, request): - if token in cls.tokens_map: - raise exceptions.ConflictError(_("This token is already registered")) - cls.tokens_map[token] = {"page": page, "request": request}