Mercurial > libervia-backend
view libervia/backend/plugins/plugin_pubsub_cache.py @ 4151:18026ce0819c
core (xmpp): message reception workflow refactoring:
- Call methods from a root async one instead of using Deferred callbacks chain.
- Use a queue to be sure to process messages in order.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:50:35 +0100 |
parents | 4b842c1fb686 |
children | 2b000790b197 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for PubSub Caching
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
from datetime import datetime
from typing import Optional, List, Tuple, Dict, Any
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from twisted.internet import defer
from wokkel import pubsub, rsm
from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core import exceptions
from libervia.backend.core.log import getLogger
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.tools import xml_tools, utils
from libervia.backend.tools.common import data_format
from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError


log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "PubSub Cache",
    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "PubsubCache",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
}

# keys copied from a matching analyser to the analyse result
ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
# maximum of items to cache
CACHE_LIMIT = 5000
# number of second before a progress caching is considered failed and tried again
PROGRESS_DEADLINE = 60 * 60 * 6


class PubsubCache:
    """Plugin keeping a local copy of pubsub nodes and serving requests from it"""
    # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
    #   but it would be necessary to have this data if some devices unsubscribe a cached
    #   node, as we can then get out of sync. A protoXEP could be proposed to fix this
    #   situation.
    # TODO: handle configuration events

    def __init__(self, host):
        """Register triggers, pubsub event callbacks and bridge methods

        @param host: backend instance, giving access to memory, plugins, triggers
            and bridge
        """
        log.info(_("PubSub Cache initialization"))
        strategy = host.memory.config_get(None, "pubsub_cache_strategy")
        if strategy == "no_cache":
            log.info(
                _(
                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
                    "setting."
                ).format(value=repr(strategy))
            )
            self.use_cache = False
        else:
            self.use_cache = True
        self.host = host
        self._p = host.plugins["XEP-0060"]
        # registered analysers, indexed by their normalised name
        self.analysers = {}
        # map for caching in progress (service, node) => Deferred
        self.in_progress = {}
        self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger)
        # NOTE(review): the "purge_db" keyword breaks the "*_cb" naming used by the
        #   two other callbacks; it is kept unchanged because the XEP-0060 plugin API
        #   is not visible from this file — confirm against add_managed_node.
        self._p.add_managed_node(
            "",
            items_cb=self.on_items_event,
            delete_cb=self.on_delete_event,
            purge_db=self.on_purge_event,
        )
        host.bridge.add_method(
            "ps_cache_get",
            ".plugin",
            in_sign="ssiassss",
            out_sign="s",
            method=self._get_items_from_cache,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_sync",
            ".plugin",
            "sss",
            out_sign="",
            method=self._synchronise,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_purge",
            ".plugin",
            "s",
            out_sign="",
            method=self._purge,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_reset",
            ".plugin",
            "",
            out_sign="",
            method=self._reset,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_search",
            ".plugin",
            "s",
            out_sign="s",
            method=self._search,
            async_=True,
        )

    def register_analyser(self, analyser: dict) -> None:
        """Register a new pubsub node analyser

        @param analyser: An analyser is a dictionary which may have the following keys
        (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
        must be used):

            :name (str)*:
              a unique name for this analyser. This name will be stored in database
              to retrieve the analyser when necessary (notably to get the parsing
              method), thus it is recommended to use a stable name such as the source
              plugin name instead of a name which may change with standard evolution,
              such as the feature namespace.

            :type (str)*:
              indicates what kind of items we are dealing with. Type must be a human
              readable word, as it may be used in searches. Good types examples are
              **blog** or **event**.

            :node (str):
              prefix of a node name which may be used to identify its type. Example:
              *urn:xmpp:microblog:0* (a node starting with this name will be
              identified as *blog* node).

            :namespace (str):
              root namespace of items. When analysing a node, the first item will be
              retrieved. The analyser will be chosen if its given namespace matches
              the namespace of the first child element of ``<item>`` element.

            :to_sync (bool):
              if True, the node must be synchronised in cache. The default False
              value means that the pubsub service will always be requested.

            :parser (callable):
              method (which may be sync, a coroutine or a method returning a
              "Deferred") to call to parse the ``domish.Element`` of the item. The
              result must be dictionary which can be serialised to JSON.

              The method must have the following signature:

              .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
                                   service: Optional[jid.JID], node: Optional[str]) \
                                   -> dict
                :noindex:

            :match_cb (callable):
              method (which may be sync, a coroutine or a method returning a
              "Deferred") called when the analyser matches. The method is called with
              the current analyse which it can modify **in-place**.

              The method must have the following signature:

              .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
                :noindex:

        @raise ValueError: a mandatory key is missing
        @raise exceptions.Conflict: a analyser with this name already exists
        """
        name = analyser.get("name", "").strip().lower()
        # we want the normalised name
        analyser["name"] = name
        if not name:
            raise ValueError('"name" is mandatory in analyser')
        if "type" not in analyser:
            raise ValueError('"type" is mandatory in analyser')
        type_test_keys = {"node", "namespace"}
        if not type_test_keys.intersection(analyser):
            raise ValueError(f'at least one of {type_test_keys} must be used')
        if name in self.analysers:
            raise exceptions.Conflict(
                f"An analyser with the name {name!r} is already registered"
            )
        self.analysers[name] = analyser

    async def cache_items(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode,
        items: List[domish.Element]
    ) -> None:
        """Store items in cache, parsing them when the node analyser has a parser

        @param client: profile session owning the cache entry
        @param pubsub_node: cached node the items belong to
        @param items: raw ``<item>`` elements to store
        """
        try:
            parser = self.analysers[pubsub_node.analyser].get("parser")
        except KeyError:
            # no analyser (or an unregistered one) is associated with this node
            parser = None

        if parser is not None:
            parsed_items = [
                await utils.as_deferred(
                    parser,
                    client,
                    item,
                    pubsub_node.service,
                    pubsub_node.name
                )
                for item in items
            ]
        else:
            parsed_items = None

        await self.host.memory.storage.cache_pubsub_items(
            client, pubsub_node, items, parsed_items
        )

    async def _cache_latest_items(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode,
        max_items: int = 20
    ) -> None:
        """Fallback caching: request only the latest items of a node and store them"""
        items, __ = await client.pubsub_client.items(
            pubsub_node.service, pubsub_node.name, maxItems=max_items
        )
        await self.cache_items(client, pubsub_node, items)

    async def _cache_node(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> None:
        """Subscribe to a node if needed, then download its items into the cache

        The node sync state is set to IN_PROGRESS at start, then to COMPLETED on
        success. On any failure, the state is set to ERROR, the items already cached
        for this node are deleted, and the exception is raised again.

        @param client: profile session doing the caching
        @param pubsub_node: node to put in cache
        """
        await self.host.memory.storage.update_pubsub_node_sync_state(
            pubsub_node, SyncState.IN_PROGRESS
        )
        service, node = pubsub_node.service, pubsub_node.name
        try:
            log.debug(
                f"Caching node {node!r} at {service} for {client.profile}"
            )
            if not pubsub_node.subscribed:
                try:
                    sub = await self._p.subscribe(client, service, node)
                except Exception as e:
                    # subscription is best effort: items are still cached without it
                    log.warning(
                        _(
                            "Can't subscribe node {pubsub_node}, that means that "
                            "synchronisation can't be maintained: {reason}"
                        ).format(pubsub_node=pubsub_node, reason=e)
                    )
                else:
                    if sub.state == "subscribed":
                        sub_id = sub.subscriptionIdentifier
                        log.debug(
                            f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
                        )
                        pubsub_node.subscribed = True
                        await self.host.memory.storage.add(pubsub_node)
                    else:
                        log.warning(
                            _(
                                "{pubsub_node} is not subscribed, that means that "
                                "synchronisation can't be maintained, and you may have "
                                "to enforce subscription manually. Subscription state: "
                                "{state}"
                            ).format(pubsub_node=pubsub_node, state=sub.state)
                        )

            try:
                await self.host.check_features(
                    client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
                )
            except error.StanzaError as e:
                if e.condition == "service-unavailable":
                    log.warning(
                        f"service {service} is hidding disco infos, we'll only cache "
                        "latest 20 items"
                    )
                    await self._cache_latest_items(client, pubsub_node)
                else:
                    raise
            except exceptions.FeatureNotFound:
                log.warning(
                    f"service {service} doesn't handle Result Set Management "
                    "(XEP-0059), we'll only cache latest 20 items"
                )
                await self._cache_latest_items(client, pubsub_node)
            else:
                # RSM is available: we page through the whole node
                rsm_p = self.host.plugins["XEP-0059"]
                rsm_request = rsm.RSMRequest()
                cached_ids = set()
                while True:
                    items, rsm_response = await client.pubsub_client.items(
                        service, node, rsm_request=rsm_request
                    )
                    await self.cache_items(client, pubsub_node, items)
                    for item in items:
                        item_id = item["id"]
                        if item_id in cached_ids:
                            log.warning(
                                f"Pubsub node {node!r} at {service} is returning several "
                                f"times the same item ({item_id!r}). This is illegal "
                                "behaviour, and it means that Pubsub service "
                                f"{service} is buggy and can't be cached properly. "
                                f"Please report this to {service.host} administrators"
                            )
                            rsm_request = None
                            break
                        cached_ids.add(item_id)
                        if len(cached_ids) >= CACHE_LIMIT:
                            log.warning(
                                f"Pubsub node {node!r} at {service} contains more items "
                                f"than the cache limit ({CACHE_LIMIT}). We stop "
                                f"caching here, at item {item_id!r}."
                            )
                            rsm_request = None
                            break
                    else:
                        # the page has been fully processed without hitting a stop
                        # condition, we can ask for the next one
                        rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
                    if rsm_request is None:
                        break

            await self.host.memory.storage.update_pubsub_node_sync_state(
                pubsub_node, SyncState.COMPLETED
            )
        except Exception as e:
            import traceback
            tb = "".join(traceback.format_tb(e.__traceback__))
            log.error(
                f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
            )
            await self.host.memory.storage.update_pubsub_node_sync_state(
                pubsub_node, SyncState.ERROR
            )
            await self.host.memory.storage.delete_pubsub_items(pubsub_node)
            raise

    def _cache_node_clean(self, __, pubsub_node):
        """Forget the "in progress" entry of a node once its caching has ended

        Used with ``addBoth``. As nothing is returned, the result or failure of the
        caching Deferred is replaced by None.
        """
        # the entry may already have been removed when the caching was cancelled
        self.in_progress.pop((pubsub_node.service, pubsub_node.name), None)

    def cache_node(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> defer.Deferred:
        """Launch node caching as a background task

        @return: Deferred fired when the caching has ended; it is also stored in
            ``self.in_progress`` until then
        """
        d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
        d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
        self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
        return d

    async def analyse_node(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        pubsub_node: Optional[PubsubNode] = None,
    ) -> dict:
        """Use registered analysers on a node to determine what it is used for

        @param client: profile session doing the analyse
        @param service: pubsub service where the node is
        @param node: name of the node to analyse
        @param pubsub_node: cached node if any; when None, the first item and the
            node configuration are requested to the service to feed the analyse
        @return: analyse data. "service" and "node" keys are always set, "namespace"
            and "conf" are set when they could be retrieved, and keys of
            ANALYSER_KEYS_TO_COPY are copied from the first matching analyser.
        """
        analyse = {"service": service, "node": node}
        if pubsub_node is None:
            try:
                first_item = (await client.pubsub_client.items(
                    service, node, 1
                ))[0][0]
            except IndexError:
                # the node is empty
                pass
            except error.StanzaError as e:
                if e.condition == "item-not-found":
                    pass
                else:
                    log.warning(
                        f"Can't retrieve last item on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
            else:
                try:
                    uri = first_item.firstChildElement().uri
                except Exception as e:
                    log.warning(
                        f"Can't retrieve item namespace on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
                else:
                    analyse["namespace"] = uri
            try:
                conf = await self._p.getConfiguration(client, service, node)
            except Exception as e:
                log.warning(
                    f"Can't retrieve configuration for node {node!r} at service {service} "
                    f"for {client.profile}: {e}"
                )
            else:
                analyse["conf"] = conf

        for analyser in self.analysers.values():
            # first we try to match on node name prefix
            try:
                an_node = analyser["node"]
            except KeyError:
                pass
            else:
                if node.startswith(an_node):
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break
            # then on namespace of the first item payload
            try:
                namespace = analyse["namespace"]
                an_namespace = analyser["namespace"]
            except KeyError:
                pass
            else:
                if namespace == an_namespace:
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break
        else:
            found = False
            log.debug(
                f"node {node!r} at service {service} doesn't match any known type"
            )
        if found:
            # "analyser" is the one which has matched, as we left the loop on it
            try:
                match_cb = analyser["match_cb"]
            except KeyError:
                pass
            else:
                await utils.as_deferred(match_cb, client, analyse)
        return analyse

    def _get_items_from_cache(
        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
        extra="", profile_key=C.PROF_KEY_NONE
    ):
        """Bridge method for "ps_cache_get": get serialised items from cache"""
        d = defer.ensureDeferred(self._a_get_items_from_cache(
            service, node, max_items, item_ids, sub_id, extra, profile_key
        ))
        d.addCallback(self._p.trans_items_data)
        d.addCallback(self._p.serialise_items)
        return d

    async def _a_get_items_from_cache(
        self, service, node, max_items, item_ids, sub_id, extra, profile_key
    ):
        """Convert bridge arguments and retrieve items data from cache

        @return: tuple with the list of items data and the metadata
        @raise exceptions.NotFound: the node is not in cache for this profile
        """
        client = self.host.get_client(profile_key)
        service = jid.JID(service) if service else client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.get_pubsub_node(
            client, service, node
        )
        if pubsub_node is None:
            raise exceptions.NotFound(
                f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
            )
        max_items = None if max_items == C.NO_LIMIT else max_items
        extra = self._p.parse_extra(data_format.deserialise(extra))
        items, metadata = await self.get_items_from_cache(
            client,
            pubsub_node,
            max_items,
            item_ids,
            sub_id or None,
            extra.rsm_request,
            extra.extra,
        )
        return [i.data for i in items], metadata

    async def get_items_from_cache(
        self,
        client: SatXMPPEntity,
        node: PubsubNode,
        max_items: Optional[int] = None,
        item_ids: Optional[List[str]] = None,
        sub_id: Optional[str] = None,
        rsm_request: Optional[rsm.RSMRequest] = None,
        extra: Optional[Dict[str, Any]] = None
    ) -> Tuple[List[PubsubItem], dict]:
        """Get items from cache, using same arguments as for external Pubsub request

        @param node: cached node to get items from
        @param max_items: maximum number of items to return. When neither
            ``max_items`` nor ``rsm_request`` is set, 20 is used
        @param item_ids: IDs of the items to retrieve (only usable without
            explicit ``max_items``)
        @param rsm_request: RSM request to paginate the results
        @param extra: extra data; only the "order by" key is used here
        @return: cached items and the metadata returned by the storage
        @raise NotImplementedError: a MAM query is requested
        @raise exceptions.InternalError: incompatible arguments are used together
        """
        if extra is None:
            extra = {}
        if "mam" in extra:
            raise NotImplementedError("MAM queries are not supported yet")
        if max_items is None and rsm_request is None:
            max_items = 20
            pubsub_items, metadata = await self.host.memory.storage.get_items(
                node, max_items=max_items, item_ids=item_ids or None,
                order_by=extra.get(C.KEY_ORDER_BY)
            )
        elif max_items is not None:
            if rsm_request is not None:
                raise exceptions.InternalError(
                    "Pubsub max items and RSM must not be used at the same time"
                )
            elif item_ids:
                raise exceptions.InternalError(
                    "Pubsub max items and item IDs must not be used at the same time"
                )
            pubsub_items, metadata = await self.host.memory.storage.get_items(
                node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
            )
        else:
            desc = False
            if rsm_request.before == "":
                # an empty "before" is a request for the last page
                before = None
                desc = True
            else:
                before = rsm_request.before
            pubsub_items, metadata = await self.host.memory.storage.get_items(
                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
                desc=desc, force_rsm=True,
            )

        return pubsub_items, metadata

    async def on_items_event(self, client, event):
        """Update cache when items are published or retracted on a cached node"""
        node = await self.host.memory.storage.get_pubsub_node(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
            items = []
            retract_ids = []
            for elt in event.items:
                if elt.name == "item":
                    items.append(elt)
                elif elt.name == "retract":
                    item_id = elt.getAttribute("id")
                    if not item_id:
                        log.warning(
                            "Ignoring invalid retract item element: "
                            f"{xml_tools.p_fmt_elt(elt)}"
                        )
                        continue

                    retract_ids.append(item_id)
                else:
                    log.warning(
                        f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
                    )
            if items:
                log.debug(f"[{client.profile}] caching new items received from {node}")
                await self.cache_items(client, node, items)
            if retract_ids:
                log.debug(f"deleting retracted items from {node}")
                await self.host.memory.storage.delete_pubsub_items(
                    node, items_names=retract_ids
                )

    async def on_delete_event(self, client, event):
        """Remove a node from cache when it has been deleted on the service"""
        log.debug(
            f"deleting node {event.nodeIdentifier} from {event.sender} for "
            f"{client.profile}"
        )
        await self.host.memory.storage.delete_pubsub_node(
            [client.profile], [event.sender], [event.nodeIdentifier]
        )

    async def on_purge_event(self, client, event):
        """Remove all cached items of a node when it has been purged on the service"""
        node = await self.host.memory.storage.get_pubsub_node(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        log.debug(f"purging node {node} for {client.profile}")
        await self.host.memory.storage.delete_pubsub_items(node)

    async def _get_items_trigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        max_items: Optional[int],
        item_ids: Optional[List[str]],
        sub_id: Optional[str],
        rsm_request: Optional[rsm.RSMRequest],
        extra: dict
    ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
        """Trigger on "XEP-0060_getItems": serve items from cache when possible

        When the node must be synchronised but is not in cache yet, its caching is
        launched in background and the normal workflow continues.

        @return: ``(True, None)`` to continue with a normal pubsub request, or
            ``(False, (items_data, metadata))`` when the cache has been used
        """
        if not self.use_cache:
            log.debug("cache disabled in settings")
            return True, None
        # a missing key (None) must not disable the cache, hence the comparison
        if extra.get(C.KEY_USE_CACHE) == False:  # noqa: E712
            log.debug("skipping pubsub cache as requested")
            return True, None
        if service is None:
            service = client.jid.userhostJID()
        for __ in range(5):
            pubsub_node = await self.host.memory.storage.get_pubsub_node(
                client, service, node
            )
            if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
                analyse = {"to_sync": True}
            else:
                analyse = await self.analyse_node(client, service, node)

            if pubsub_node is None:
                try:
                    pubsub_node = await self.host.memory.storage.set_pubsub_node(
                        client,
                        service,
                        node,
                        analyser=analyse.get("name"),
                        type_=analyse.get("type"),
                        subtype=analyse.get("subtype"),
                    )
                except IntegrityError as e:
                    if "unique" in str(e.orig).lower():
                        log.debug(
                            "race condition on pubsub node creation in cache, trying "
                            "again"
                        )
                        # the node has been created in the meantime, we retrieve it
                        # on next iteration
                        continue
                    else:
                        raise
            break
        else:
            raise exceptions.InternalError(
                "Too many IntegrityError with UNIQUE constraint, something is going wrong"
            )

        if analyse.get("to_sync"):
            if pubsub_node.sync_state == SyncState.COMPLETED:
                if "mam" in extra:
                    log.debug("MAM caching is not supported yet, skipping cache")
                    return True, None
                pubsub_items, metadata = await self.get_items_from_cache(
                    client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
                )
                return False, ([i.data for i in pubsub_items], metadata)

            if pubsub_node.sync_state == SyncState.IN_PROGRESS:
                if (service, node) not in self.in_progress:
                    log.warning(
                        f"{pubsub_node} is reported as being cached, but not caching is "
                        "in progress, this is most probably due to the backend being "
                        "restarted. Resetting the status, caching will be done again."
                    )
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
                else:
                    elapsed = time.time() - pubsub_node.sync_state_updated
                    if elapsed > PROGRESS_DEADLINE:
                        log.warning(
                            f"{pubsub_node} is in progress for too long "
                            f"({int(elapsed // 60)} minutes), "
                            "cancelling it and retrying."
                        )
                        self.in_progress.pop((service, node)).cancel()
                        pubsub_node.sync_state = None
                        await self.host.memory.storage.delete_pubsub_items(pubsub_node)
                    else:
                        log.debug(
                            f"{pubsub_node} synchronisation is already in progress, "
                            "skipping"
                        )
            if pubsub_node.sync_state is None:
                key = (service, node)
                if key in self.in_progress:
                    raise exceptions.InternalError(
                        f"There is already a caching in progress for {pubsub_node}, this "
                        "should not happen"
                    )
                self.cache_node(client, pubsub_node)
            elif pubsub_node.sync_state == SyncState.ERROR:
                log.debug(
                    f"{pubsub_node} synchronisation has previously failed, skipping"
                )

        return True, None

    async def _subscribe_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID],
        options: Optional[dict],
        subscription: pubsub.Subscription
    ) -> None:
        """Placeholder for a subscription trigger, does nothing for now"""
        pass

    async def _unsubscribe_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid,
        subscriptionIdentifier,
        sender,
    ) -> None:
        """Placeholder for an unsubscription trigger, does nothing for now"""
        pass

    def _synchronise(self, service, node, profile_key):
        """Bridge method for "ps_cache_sync": (re)synchronise a node"""
        client = self.host.get_client(profile_key)
        service = client.jid.userhostJID() if not service else jid.JID(service)
        return defer.ensureDeferred(self.synchronise(client, service, node))

    async def synchronise(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        resync: bool = True
    ) -> None:
        """Synchronise a node with a pubsub service

        The node will be synchronised even if there is no matching analyser.

        Note that when a node is synchronised, it is automatically subscribed.
        @param resync: if True and the node is already synchronised, it will be
            resynchronised (all items will be deleted and re-downloaded).

        """
        pubsub_node = await self.host.memory.storage.get_pubsub_node(
            client, service, node
        )
        if pubsub_node is None:
            log.info(
                _(
                    "Synchronising the new node {node} at {service}"
                ).format(node=node, service=service.full())
            )
            analyse = await self.analyse_node(client, service, node)
            pubsub_node = await self.host.memory.storage.set_pubsub_node(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )
        elif not resync and pubsub_node.sync_state is not None:
            # the node exists, nothing to do
            return

        if (
            pubsub_node.sync_state == SyncState.IN_PROGRESS
            or (service, node) in self.in_progress
        ):
            log.warning(
                _(
                    "{node} at {service} is already being synchronised, can't do a new "
                    "synchronisation."
                ).format(node=node, service=service)
            )
        else:
            log.info(
                _(
                    "(Re)Synchronising the node {node} at {service} on user request"
                ).format(node=node, service=service.full())
            )
            # we first delete and recreate the node (will also delete its items)
            await self.host.memory.storage.delete(pubsub_node)
            analyse = await self.analyse_node(client, service, node)
            pubsub_node = await self.host.memory.storage.set_pubsub_node(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )
            # then we can put node in cache
            await self.cache_node(client, pubsub_node)

    async def purge(self, purge_filters: dict) -> None:
        """Remove items according to filters

        filters can have on of the following keys, all are optional:

            :services:
                list of JIDs of services from which items must be deleted
            :nodes:
                list of node names to delete
            :types:
                list of node types to delete
            :subtypes:
                list of node subtypes to delete
            :profiles:
                list of profiles from which items must be deleted
            :created_before:
                datetime before which items must have been created to be deleted
            :updated_before:
                datetime before which items must have been updated last to be deleted
        """
        # the storage expects node names under the "names" key
        purge_filters["names"] = purge_filters.pop("nodes", None)
        await self.host.memory.storage.purge_pubsub_items(**purge_filters)

    def _purge(self, purge_filters: str) -> defer.Deferred:
        """Bridge method for "ps_cache_purge": purge items with serialised filters

        Timestamps of "created_before" and "updated_before" filters are converted
        to datetime.
        """
        purge_filters = data_format.deserialise(purge_filters)
        for key in "created_before", "updated_before":
            try:
                purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
            except (KeyError, TypeError):
                pass
        return defer.ensureDeferred(self.purge(purge_filters))

    async def reset(self) -> None:
        """Remove ALL nodes and items from cache

        After calling this method, cache will be refilled progressively as if it where
        new
        """
        await self.host.memory.storage.delete_pubsub_node(None, None, None)

    def _reset(self) -> defer.Deferred:
        """Bridge method for "ps_cache_reset": empty the whole cache"""
        return defer.ensureDeferred(self.reset())

    async def search(self, query: dict) -> List[PubsubItem]:
        """Search pubsub items in cache"""
        return await self.host.memory.storage.search_pubsub_items(query)

    async def serialisable_search(self, query: dict) -> List[dict]:
        """Search pubsub items in cache and returns parsed data

        The returned data can be serialised.
        "pubsub_service" and "pubsub_node" will be added to each data (both as
        strings), as well as "node_profile". "item_payload" is added when
        "with_payload" is set in query.
        """
        items = await self.search(query)
        ret = []
        for item in items:
            parsed = item.parsed
            parsed["pubsub_service"] = item.node.service.full()
            parsed["pubsub_node"] = item.node.name
            if query.get("with_payload"):
                parsed["item_payload"] = item.data.toXml()
            parsed["node_profile"] = self.host.memory.storage.get_profile_by_id(
                item.node.profile_id
            )

            ret.append(parsed)
        return ret

    def _search(self, query: str) -> defer.Deferred:
        """Bridge method for "ps_cache_search": search with a serialised query"""
        query = data_format.deserialise(query)
        services = query.get("services")
        if services:
            query["services"] = [jid.JID(s) for s in services]
        d = defer.ensureDeferred(self.serialisable_search(query))
        d.addCallback(data_format.serialise)
        return d