Mercurial > libervia-web
view libervia/web/server/session_iface.py @ 1576:c7d15ded4cbb
server (restricted_bridge): add `message_reactions_set` method.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 16:31:36 +0100 |
parents | eb00d593801d |
children | 7941444c1671 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: a SàT frontend # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from collections import OrderedDict, deque import os.path import time from typing import List, Dict, Optional import shortuuid from zope.interface import Attribute, Interface from zope.interface import implementer from libervia.backend.core.log import getLogger from libervia.web.server.classes import Notification from libervia.web.server.constants import Const as C log = getLogger(__name__) FLAGS_KEY = "_flags" NOTIFICATIONS_KEY = "_notifications" MAX_CACHE_AFFILIATIONS = 100 # number of nodes to keep in cache class IWebSession(Interface): profile = Attribute("Sat profile") jid = Attribute("JID associated with the profile") uuid = Attribute("uuid associated with the profile session") identities = Attribute("Identities of XMPP entities") @implementer(IWebSession) class WebSession: profiles_map: Dict[Optional[str], List["WebSession"]] = {} def __init__(self, session): self._profile = None self.jid = None self.started = time.time() # time when the backend session was started self.backend_started = None self.uuid = str(shortuuid.uuid()) self.identities = {} self.csrf_token = str(shortuuid.uuid()) self.ws_token = str(shortuuid.uuid()) self.ws_socket = None self.ws_buffer = deque(maxlen=200) self.locale = None # i18n of the pages self.theme = None self.pages_data = {} # used to keep data accross reloads (key is page instance) self.affiliations = OrderedDict() # cache for node affiliations self.profiles_map.setdefault(C.SERVICE_PROFILE, []).append(self) log.debug( f"session started for {C.SERVICE_PROFILE} " f"({len(self.get_profile_sessions(C.SERVICE_PROFILE))} session(s) active)" ) @property def profile(self) -> Optional[str]: return self._profile @profile.setter def profile(self, profile: Optional[str]) -> None: old_profile = self._profile or C.SERVICE_PROFILE new_profile = profile or C.SERVICE_PROFILE try: self.profiles_map[old_profile].remove(self) except (ValueError, KeyError): log.warning(f"session was not registered for profile {old_profile}") else: nb_session_old = len(self.get_profile_sessions(old_profile)) log.debug(f"{old_profile} has now {nb_session_old} session(s) active") self._profile = profile self.profiles_map.setdefault(new_profile, []).append(self) nb_session_new = len(self.get_profile_sessions(new_profile)) log.debug(f"{new_profile} has now {nb_session_new} session(s) active") @property def cache_dir(self): if self.profile is None: return self.service_cache_url + "/" return os.path.join("/", C.CACHE_DIR, self.uuid) + "/" @property def connected(self): return self.profile is not None @property def guest(self): """True if this is a guest session""" if self.profile is None: return False else: return self.profile.startswith("guest@@") @classmethod def send(cls, profile: str, data_type: str, data: dict) -> None: """Send a message to all session If the session doesn't have an active websocket, the message is buffered until a socket is available """ for session in cls.profiles_map.get(profile, []): if session.ws_socket is None or not session.ws_socket.init_ok: session.ws_buffer.append({"data_type": data_type, "data": data}) else: session.ws_socket.send(data_type, data) def on_expire(self) -> None: profile = self._profile or C.SERVICE_PROFILE try: self.profiles_map[profile].remove(self) except (ValueError, KeyError): log.warning(f"session was not registered for profile {profile}") else: nb_session = len(self.get_profile_sessions(profile)) log.debug( f"Session for profile {profile} expired. {profile} has now {nb_session} " f"session(s) active." ) @classmethod def get_profile_sessions(cls, profile: str) -> List["WebSession"]: return cls.profiles_map.get(profile, []) def get_page_data(self, page, key): """get session data for a page @param page(LiberviaPage): instance of the page @param key(object): data key return (None, object): value of the key None if not found or page_data doesn't exist """ return self.pages_data.get(page, {}).get(key) def pop_page_data(self, page, key, default=None): """like get_page_data, but remove key once value is gotten @param page(LiberviaPage): instance of the page @param key(object): data key @param default(object): value to return if key is not found @return (object): found value or default """ page_data = self.pages_data.get(page) if page_data is None: return default value = page_data.pop(key, default) if not page_data: # no need to keep unused page_data del self.pages_data[page] return value def set_page_data(self, page, key, value): """set data to persist on reload @param page(LiberviaPage): instance of the page @param key(object): data key @param value(object): value to set @return (object): set value """ page_data = self.pages_data.setdefault(page, {}) page_data[key] = value return value def set_page_flag(self, page, flag): """set a flag for this page @param page(LiberviaPage): instance of the page @param flag(unicode): flag to set """ flags = self.get_page_data(page, FLAGS_KEY) if flags is None: flags = self.set_page_data(page, FLAGS_KEY, set()) flags.add(flag) def pop_page_flag(self, page, flag): """return True if flag is set flag is removed if it was set @param page(LiberviaPage): instance of the page @param flag(unicode): flag to set @return (bool): True if flaag was set """ page_data = self.pages_data.get(page, {}) flags = page_data.get(FLAGS_KEY) if flags is None: return False if flag in flags: flags.remove(flag) # we remove data if they are not used anymore if not flags: del page_data[FLAGS_KEY] if not page_data: del self.pages_data[page] return True else: return False def set_page_notification(self, page, message, level=C.LVL_INFO): """set a flag for this page @param page(LiberviaPage): instance of the page @param flag(unicode): flag to set """ notif = Notification(message, level) notifs = self.get_page_data(page, NOTIFICATIONS_KEY) if notifs is None: notifs = self.set_page_data(page, NOTIFICATIONS_KEY, []) notifs.append(notif) def pop_page_notifications(self, page): """Return and remove last page notification @param page(LiberviaPage): instance of the page @return (list[Notification]): notifications if any """ page_data = self.pages_data.get(page, {}) notifs = page_data.get(NOTIFICATIONS_KEY) if not notifs: return [] ret = notifs[:] del notifs[:] return ret def get_affiliation(self, service, node): """retrieve affiliation for a pubsub node @param service(jid.JID): pubsub service @param node(unicode): pubsub node @return (unicode, None): affiliation, or None if it is not in cache """ if service.resource: raise ValueError("Service must not have a resource") if not node: raise ValueError("node must be set") try: affiliation = self.affiliations.pop((service, node)) except KeyError: return None else: # we replace at the top to get the most recently used on top # so less recently used will be removed if cache is full self.affiliations[(service, node)] = affiliation return affiliation def set_affiliation(self, service, node, affiliation): """cache affiliation for a node will empty cache when it become too big @param service(jid.JID): pubsub service @param node(unicode): pubsub node @param affiliation(unicode): affiliation to this node """ if service.resource: raise ValueError("Service must not have a resource") if not node: raise ValueError("node must be set") self.affiliations[(service, node)] = affiliation while len(self.affiliations) > MAX_CACHE_AFFILIATIONS: self.affiliations.popitem(last=False) class IWebGuestSession(Interface): id = Attribute("UUID of the guest") data = Attribute("data associated with the guest") @implementer(IWebGuestSession) class WebGuestSession(object): def __init__(self, session): self.id = None self.data = None