diff libervia/backend/plugins/plugin_pubsub_cache.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_pubsub_cache.py@524856bd7b19
children 2b000790b197
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_pubsub_cache.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,861 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for PubSub Caching
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import time
+from datetime import datetime
+from typing import Optional, List, Tuple, Dict, Any
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from twisted.internet import defer
+from wokkel import pubsub, rsm
+from libervia.backend.core.i18n import _
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core import exceptions
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.tools import xml_tools, utils
+from libervia.backend.tools.common import data_format
+from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError
+
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "PubSub Cache",
+    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
+    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "PubsubCache",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
+}
+
+ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
+# maximum of items to cache
+CACHE_LIMIT = 5000
+# number of second before a progress caching is considered failed and tried again
+PROGRESS_DEADLINE = 60 * 60 * 6
+
+
+
+class PubsubCache:
+    # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
+    #   but it would be necessary to have this data if some devices unsubscribe a cached
+    #   node, as we can then get out of sync. A protoXEP could be proposed to fix this
+    #   situation.
+    # TODO: handle configuration events
+
+    def __init__(self, host):
+        log.info(_("PubSub Cache initialization"))
+        strategy = host.memory.config_get(None, "pubsub_cache_strategy")
+        if strategy == "no_cache":
+            log.info(
+                _(
+                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
+                    "setting."
+                ).format(value=repr(strategy))
+            )
+            self.use_cache = False
+        else:
+            self.use_cache = True
+        self.host = host
+        self._p = host.plugins["XEP-0060"]
+        self.analysers = {}
+        # map for caching in progress (node, service) => Deferred
+        self.in_progress = {}
+        self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger)
+        self._p.add_managed_node(
+            "",
+            items_cb=self.on_items_event,
+            delete_cb=self.on_delete_event,
+            purge_db=self.on_purge_event,
+        )
+        host.bridge.add_method(
+            "ps_cache_get",
+            ".plugin",
+            in_sign="ssiassss",
+            out_sign="s",
+            method=self._get_items_from_cache,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_cache_sync",
+            ".plugin",
+            "sss",
+            out_sign="",
+            method=self._synchronise,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_cache_purge",
+            ".plugin",
+            "s",
+            out_sign="",
+            method=self._purge,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_cache_reset",
+            ".plugin",
+            "",
+            out_sign="",
+            method=self._reset,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_cache_search",
+            ".plugin",
+            "s",
+            out_sign="s",
+            method=self._search,
+            async_=True,
+        )
+
+    def register_analyser(self, analyser: dict) -> None:
+        """Register a new pubsub node analyser
+
+        @param analyser: An analyser is a dictionary which may have the following keys
+        (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
+        must be used):
+
+            :name (str)*:
+              a unique name for this analyser. This name will be stored in database
+              to retrieve the analyser when necessary (notably to get the parsing method),
+              thus it is recommended to use a stable name such as the source plugin name
+              instead of a name which may change with standard evolution, such as the
+              feature namespace.
+
+            :type (str)*:
+              indicates what kind of items we are dealing with. Type must be a human
+              readable word, as it may be used in searches. Good types examples are
+              **blog** or **event**.
+
+            :node (str):
+              prefix of a node name which may be used to identify its type. Example:
+              *urn:xmpp:microblog:0* (a node starting with this name will be identified as
+              *blog* node).
+
+            :namespace (str):
+              root namespace of items. When analysing a node, the first item will be
+              retrieved. The analyser will be chosen its given namespace match the
+              namespace of the first child element of ``<item>`` element.
+
+            :to_sync (bool):
+              if True, the node must be synchronised in cache. The default False value
+              means that the pubsub service will always be requested.
+
+            :parser (callable):
+              method (which may be sync, a coroutine or a method returning a "Deferred")
+              to call to parse the ``domish.Element`` of the item. The result must be
+              dictionary which can be serialised to JSON.
+
+              The method must have the following signature:
+
+              .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
+                                   service: Optional[jid.JID], node: Optional[str]) \
+                                   -> dict
+                :noindex:
+
+            :match_cb (callable):
+              method (which may be sync, a coroutine or a method returning a "Deferred")
+              called when the analyser matches. The method is called with the curreny
+              analyse which is can modify **in-place**.
+
+              The method must have the following signature:
+
+              .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
+                :noindex:
+
+        @raise exceptions.Conflict: a analyser with this name already exists
+        """
+
+        name = analyser.get("name", "").strip().lower()
+        # we want the normalised name
+        analyser["name"] = name
+        if not name:
+            raise ValueError('"name" is mandatory in analyser')
+        if "type" not in analyser:
+            raise ValueError('"type" is mandatory in analyser')
+        type_test_keys = {"node", "namespace"}
+        if not type_test_keys.intersection(analyser):
+            raise ValueError(f'at least one of {type_test_keys} must be used')
+        if name in self.analysers:
+            raise exceptions.Conflict(
+                f"An analyser with the name {name!r} is already registered"
+            )
+        self.analysers[name] = analyser
+
+    async def cache_items(
+        self,
+        client: SatXMPPEntity,
+        pubsub_node: PubsubNode,
+        items: List[domish.Element]
+    ) -> None:
+        try:
+            parser = self.analysers[pubsub_node.analyser].get("parser")
+        except KeyError:
+            parser = None
+
+        if parser is not None:
+            parsed_items = [
+                await utils.as_deferred(
+                    parser,
+                    client,
+                    item,
+                    pubsub_node.service,
+                    pubsub_node.name
+                )
+                for item in items
+            ]
+        else:
+            parsed_items = None
+
+        await self.host.memory.storage.cache_pubsub_items(
+            client, pubsub_node, items, parsed_items
+        )
+
+    async def _cache_node(
+        self,
+        client: SatXMPPEntity,
+        pubsub_node: PubsubNode
+    ) -> None:
+        await self.host.memory.storage.update_pubsub_node_sync_state(
+            pubsub_node, SyncState.IN_PROGRESS
+        )
+        service, node = pubsub_node.service, pubsub_node.name
+        try:
+            log.debug(
+                f"Caching node {node!r} at {service} for {client.profile}"
+            )
+            if not pubsub_node.subscribed:
+                try:
+                    sub = await self._p.subscribe(client, service, node)
+                except Exception as e:
+                    log.warning(
+                        _(
+                            "Can't subscribe node {pubsub_node}, that means that "
+                            "synchronisation can't be maintained: {reason}"
+                        ).format(pubsub_node=pubsub_node, reason=e)
+                    )
+                else:
+                    if sub.state == "subscribed":
+                        sub_id = sub.subscriptionIdentifier
+                        log.debug(
+                            f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
+                        )
+                        pubsub_node.subscribed = True
+                        await self.host.memory.storage.add(pubsub_node)
+                    else:
+                        log.warning(
+                            _(
+                                "{pubsub_node} is not subscribed, that means that "
+                                "synchronisation can't be maintained, and you may have "
+                                "to enforce subscription manually. Subscription state: "
+                                "{state}"
+                            ).format(pubsub_node=pubsub_node, state=sub.state)
+                        )
+
+            try:
+                await self.host.check_features(
+                    client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
+                )
+            except error.StanzaError as e:
+                if e.condition == "service-unavailable":
+                    log.warning(
+                        "service {service} is hidding disco infos, we'll only cache "
+                        "latest 20 items"
+                    )
+                    items, __ = await client.pubsub_client.items(
+                        pubsub_node.service, pubsub_node.name, maxItems=20
+                    )
+                    await self.cache_items(
+                        client, pubsub_node, items
+                    )
+                else:
+                    raise e
+            except exceptions.FeatureNotFound:
+                log.warning(
+                    f"service {service} doesn't handle Result Set Management "
+                    "(XEP-0059), we'll only cache latest 20 items"
+                )
+                items, __ = await client.pubsub_client.items(
+                    pubsub_node.service, pubsub_node.name, maxItems=20
+                )
+                await self.cache_items(
+                    client, pubsub_node, items
+                )
+            else:
+                rsm_p = self.host.plugins["XEP-0059"]
+                rsm_request = rsm.RSMRequest()
+                cached_ids = set()
+                while True:
+                    items, rsm_response = await client.pubsub_client.items(
+                        service, node, rsm_request=rsm_request
+                    )
+                    await self.cache_items(
+                        client, pubsub_node, items
+                    )
+                    for item in items:
+                        item_id = item["id"]
+                        if item_id in cached_ids:
+                            log.warning(
+                                f"Pubsub node {node!r} at {service} is returning several "
+                                f"times the same item ({item_id!r}). This is illegal "
+                                "behaviour, and it means that Pubsub service "
+                                f"{service} is buggy and can't be cached properly. "
+                                f"Please report this to {service.host} administrators"
+                            )
+                            rsm_request = None
+                            break
+                        cached_ids.add(item["id"])
+                        if len(cached_ids) >= CACHE_LIMIT:
+                            log.warning(
+                                f"Pubsub node {node!r} at {service} contains more items "
+                                f"than the cache limit ({CACHE_LIMIT}). We stop "
+                                "caching here, at item {item['id']!r}."
+                            )
+                            rsm_request = None
+                            break
+                    rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
+                    if rsm_request is None:
+                        break
+
+            await self.host.memory.storage.update_pubsub_node_sync_state(
+                pubsub_node, SyncState.COMPLETED
+            )
+        except Exception as e:
+            import traceback
+            tb = traceback.format_tb(e.__traceback__)
+            log.error(
+                f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
+            )
+            await self.host.memory.storage.update_pubsub_node_sync_state(
+                pubsub_node, SyncState.ERROR
+            )
+            await self.host.memory.storage.delete_pubsub_items(pubsub_node)
+            raise e
+
+    def _cache_node_clean(self, __, pubsub_node):
+        del self.in_progress[(pubsub_node.service, pubsub_node.name)]
+
+    def cache_node(
+        self,
+        client: SatXMPPEntity,
+        pubsub_node: PubsubNode
+    ) -> None:
+        """Launch node caching as a background task"""
+        d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
+        d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
+        self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
+        return d
+
+    async def analyse_node(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        pubsub_node : PubsubNode = None,
+    ) -> dict:
+        """Use registered analysers on a node to determine what it is used for"""
+        analyse = {"service": service, "node": node}
+        if pubsub_node is None:
+            try:
+                first_item = (await client.pubsub_client.items(
+                    service, node, 1
+                ))[0][0]
+            except IndexError:
+                pass
+            except error.StanzaError as e:
+                if e.condition == "item-not-found":
+                    pass
+                else:
+                    log.warning(
+                        f"Can't retrieve last item on node {node!r} at service "
+                        f"{service} for {client.profile}: {e}"
+                    )
+            else:
+                try:
+                    uri = first_item.firstChildElement().uri
+                except Exception as e:
+                    log.warning(
+                        f"Can't retrieve item namespace on node {node!r} at service "
+                        f"{service} for {client.profile}: {e}"
+                    )
+                else:
+                    analyse["namespace"] = uri
+            try:
+                conf = await self._p.getConfiguration(client, service, node)
+            except Exception as e:
+                log.warning(
+                    f"Can't retrieve configuration for node {node!r} at service {service} "
+                    f"for {client.profile}: {e}"
+                )
+            else:
+                analyse["conf"] = conf
+
+        for analyser in self.analysers.values():
+            try:
+                an_node = analyser["node"]
+            except KeyError:
+                pass
+            else:
+                if node.startswith(an_node):
+                    for key in ANALYSER_KEYS_TO_COPY:
+                        try:
+                            analyse[key] = analyser[key]
+                        except KeyError:
+                            pass
+                    found = True
+                    break
+            try:
+                namespace = analyse["namespace"]
+                an_namespace = analyser["namespace"]
+            except KeyError:
+                pass
+            else:
+                if namespace == an_namespace:
+                    for key in ANALYSER_KEYS_TO_COPY:
+                        try:
+                            analyse[key] = analyser[key]
+                        except KeyError:
+                            pass
+                    found = True
+                    break
+
+        else:
+            found = False
+            log.debug(
+                f"node {node!r} at service {service} doesn't match any known type"
+            )
+        if found:
+            try:
+                match_cb = analyser["match_cb"]
+            except KeyError:
+                pass
+            else:
+                await utils.as_deferred(match_cb, client, analyse)
+        return analyse
+
+    def _get_items_from_cache(
+        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
+        extra="", profile_key=C.PROF_KEY_NONE
+    ):
+        d = defer.ensureDeferred(self._a_get_items_from_cache(
+            service, node, max_items, item_ids, sub_id, extra, profile_key
+        ))
+        d.addCallback(self._p.trans_items_data)
+        d.addCallback(self._p.serialise_items)
+        return d
+
+    async def _a_get_items_from_cache(
+        self, service, node, max_items, item_ids, sub_id, extra, profile_key
+    ):
+        client = self.host.get_client(profile_key)
+        service = jid.JID(service) if service else client.jid.userhostJID()
+        pubsub_node = await self.host.memory.storage.get_pubsub_node(
+            client, service, node
+        )
+        if pubsub_node is None:
+            raise exceptions.NotFound(
+                f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
+            )
+        max_items = None if max_items == C.NO_LIMIT else max_items
+        extra = self._p.parse_extra(data_format.deserialise(extra))
+        items, metadata = await self.get_items_from_cache(
+            client,
+            pubsub_node,
+            max_items,
+            item_ids,
+            sub_id or None,
+            extra.rsm_request,
+            extra.extra,
+        )
+        return [i.data for i in items], metadata
+
+    async def get_items_from_cache(
+        self,
+        client: SatXMPPEntity,
+        node: PubsubNode,
+        max_items: Optional[int] = None,
+        item_ids: Optional[List[str]] = None,
+        sub_id: Optional[str] = None,
+        rsm_request: Optional[rsm.RSMRequest] = None,
+        extra: Optional[Dict[str, Any]] = None
+    ) -> Tuple[List[PubsubItem], dict]:
+        """Get items from cache, using same arguments as for external Pubsub request"""
+        if extra is None:
+            extra = {}
+        if "mam" in extra:
+            raise NotImplementedError("MAM queries are not supported yet")
+        if max_items is None and rsm_request is None:
+            max_items = 20
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
+                node, max_items=max_items, item_ids=item_ids or None,
+                order_by=extra.get(C.KEY_ORDER_BY)
+            )
+        elif max_items is not None:
+            if rsm_request is not None:
+                raise exceptions.InternalError(
+                    "Pubsub max items and RSM must not be used at the same time"
+                )
+            elif item_ids:
+                raise exceptions.InternalError(
+                    "Pubsub max items and item IDs must not be used at the same time"
+                )
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
+                node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
+            )
+        else:
+            desc = False
+            if rsm_request.before == "":
+                before = None
+                desc = True
+            else:
+                before = rsm_request.before
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
+                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
+                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
+                desc=desc, force_rsm=True,
+            )
+
+        return pubsub_items, metadata
+
+    async def on_items_event(self, client, event):
+        node = await self.host.memory.storage.get_pubsub_node(
+            client, event.sender, event.nodeIdentifier
+        )
+        if node is None:
+            return
+        if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
+            items = []
+            retract_ids = []
+            for elt in event.items:
+                if elt.name == "item":
+                    items.append(elt)
+                elif elt.name == "retract":
+                    item_id = elt.getAttribute("id")
+                    if not item_id:
+                        log.warning(
+                            "Ignoring invalid retract item element: "
+                            f"{xml_tools.p_fmt_elt(elt)}"
+                        )
+                        continue
+
+                    retract_ids.append(elt["id"])
+                else:
+                    log.warning(
+                        f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
+                    )
+            if items:
+                log.debug(f"[{client.profile}] caching new items received from {node}")
+                await self.cache_items(
+                    client, node, items
+                )
+            if retract_ids:
+                log.debug(f"deleting retracted items from {node}")
+                await self.host.memory.storage.delete_pubsub_items(
+                    node, items_names=retract_ids
+                )
+
+    async def on_delete_event(self, client, event):
+        log.debug(
+            f"deleting node {event.nodeIdentifier} from {event.sender} for "
+            f"{client.profile}"
+        )
+        await self.host.memory.storage.delete_pubsub_node(
+            [client.profile], [event.sender], [event.nodeIdentifier]
+        )
+
+    async def on_purge_event(self, client, event):
+        node = await self.host.memory.storage.get_pubsub_node(
+            client, event.sender, event.nodeIdentifier
+        )
+        if node is None:
+            return
+        log.debug(f"purging node {node} for {client.profile}")
+        await self.host.memory.storage.delete_pubsub_items(node)
+
+    async def _get_items_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        max_items: Optional[int],
+        item_ids: Optional[List[str]],
+        sub_id: Optional[str],
+        rsm_request: Optional[rsm.RSMRequest],
+        extra: dict
+    ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
+        if not self.use_cache:
+            log.debug("cache disabled in settings")
+            return True, None
+        if extra.get(C.KEY_USE_CACHE) == False:
+            log.debug("skipping pubsub cache as requested")
+            return True, None
+        if service is None:
+            service = client.jid.userhostJID()
+        for __ in range(5):
+            pubsub_node = await self.host.memory.storage.get_pubsub_node(
+                client, service, node
+            )
+            if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
+                analyse = {"to_sync": True}
+            else:
+                analyse = await self.analyse_node(client, service, node)
+
+            if pubsub_node is None:
+                try:
+                    pubsub_node = await self.host.memory.storage.set_pubsub_node(
+                        client,
+                        service,
+                        node,
+                        analyser=analyse.get("name"),
+                        type_=analyse.get("type"),
+                        subtype=analyse.get("subtype"),
+                    )
+                except IntegrityError as e:
+                    if "unique" in str(e.orig).lower():
+                        log.debug(
+                            "race condition on pubsub node creation in cache, trying "
+                            "again"
+                        )
+                    else:
+                        raise e
+            break
+        else:
+            raise exceptions.InternalError(
+                "Too many IntegrityError with UNIQUE constraint, something is going wrong"
+            )
+
+        if analyse.get("to_sync"):
+            if pubsub_node.sync_state == SyncState.COMPLETED:
+                if "mam" in extra:
+                    log.debug("MAM caching is not supported yet, skipping cache")
+                    return True, None
+                pubsub_items, metadata = await self.get_items_from_cache(
+                    client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
+                )
+                return False, ([i.data for i in pubsub_items], metadata)
+
+            if pubsub_node.sync_state == SyncState.IN_PROGRESS:
+                if (service, node) not in self.in_progress:
+                    log.warning(
+                        f"{pubsub_node} is reported as being cached, but not caching is "
+                        "in progress, this is most probably due to the backend being "
+                        "restarted. Resetting the status, caching will be done again."
+                    )
+                    pubsub_node.sync_state = None
+                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
+                elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
+                    log.warning(
+                        f"{pubsub_node} is in progress for too long "
+                        f"({pubsub_node.sync_state_updated//60} minutes), "
+                        "cancelling it and retrying."
+                    )
+                    self.in_progress.pop[(service, node)].cancel()
+                    pubsub_node.sync_state = None
+                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
+                else:
+                    log.debug(
+                        f"{pubsub_node} synchronisation is already in progress, skipping"
+                    )
+            if pubsub_node.sync_state is None:
+                key = (service, node)
+                if key in self.in_progress:
+                    raise exceptions.InternalError(
+                        f"There is already a caching in progress for {pubsub_node}, this "
+                        "should not happen"
+                    )
+                self.cache_node(client, pubsub_node)
+            elif pubsub_node.sync_state == SyncState.ERROR:
+                log.debug(
+                    f"{pubsub_node} synchronisation has previously failed, skipping"
+                )
+
+        return True, None
+
+    async def _subscribe_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        nodeIdentifier: str,
+        sub_jid: Optional[jid.JID],
+        options: Optional[dict],
+        subscription: pubsub.Subscription
+    ) -> None:
+        pass
+
+    async def _unsubscribe_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        nodeIdentifier: str,
+        sub_jid,
+        subscriptionIdentifier,
+        sender,
+    ) -> None:
+        pass
+
+    def _synchronise(self, service, node, profile_key):
+        client = self.host.get_client(profile_key)
+        service = client.jid.userhostJID() if not service else jid.JID(service)
+        return defer.ensureDeferred(self.synchronise(client, service, node))
+
+    async def synchronise(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        resync: bool = True
+    ) -> None:
+        """Synchronise a node with a pubsub service
+
+        The node will be synchronised even if there is no matching analyser.
+
+        Note that when a node is synchronised, it is automatically subscribed.
+        @param resync: if True and the node is already synchronised, it will be
+            resynchronised (all items will be deleted and re-downloaded).
+
+        """
+        pubsub_node = await self.host.memory.storage.get_pubsub_node(
+            client, service, node
+        )
+        if pubsub_node is None:
+            log.info(
+                _(
+                    "Synchronising the new node {node} at {service}"
+                ).format(node=node, service=service.full)
+            )
+            analyse = await self.analyse_node(client, service, node)
+            pubsub_node = await self.host.memory.storage.set_pubsub_node(
+                client,
+                service,
+                node,
+                analyser=analyse.get("name"),
+                type_=analyse.get("type"),
+            )
+        elif not resync and pubsub_node.sync_state is not None:
+                # the node exists, nothing to do
+                return
+
+        if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
+             or (service, node) in self.in_progress)):
+            log.warning(
+                _(
+                    "{node} at {service} is already being synchronised, can't do a new "
+                    "synchronisation."
+                ).format(node=node, service=service)
+            )
+        else:
+            log.info(
+                _(
+                    "(Re)Synchronising the node {node} at {service} on user request"
+                ).format(node=node, service=service.full())
+            )
+            # we first delete and recreate the node (will also delete its items)
+            await self.host.memory.storage.delete(pubsub_node)
+            analyse = await self.analyse_node(client, service, node)
+            pubsub_node = await self.host.memory.storage.set_pubsub_node(
+                client,
+                service,
+                node,
+                analyser=analyse.get("name"),
+                type_=analyse.get("type"),
+            )
+            # then we can put node in cache
+            await self.cache_node(client, pubsub_node)
+
+    async def purge(self, purge_filters: dict) -> None:
+        """Remove items according to filters
+
+        filters can have on of the following keys, all are optional:
+
+            :services:
+                list of JIDs of services from which items must be deleted
+            :nodes:
+                list of node names to delete
+            :types:
+                list of node types to delete
+            :subtypes:
+                list of node subtypes to delete
+            :profiles:
+                list of profiles from which items must be deleted
+            :created_before:
+                datetime before which items must have been created to be deleted
+            :created_update:
+                datetime before which items must have been updated last to be deleted
+        """
+        purge_filters["names"] = purge_filters.pop("nodes", None)
+        await self.host.memory.storage.purge_pubsub_items(**purge_filters)
+
+    def _purge(self, purge_filters: str) -> None:
+        purge_filters = data_format.deserialise(purge_filters)
+        for key in "created_before", "updated_before":
+            try:
+                purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
+            except (KeyError, TypeError):
+                pass
+        return defer.ensureDeferred(self.purge(purge_filters))
+
+    async def reset(self) -> None:
+        """Remove ALL nodes and items from cache
+
+        After calling this method, cache will be refilled progressively as if it where new
+        """
+        await self.host.memory.storage.delete_pubsub_node(None, None, None)
+
+    def _reset(self) -> defer.Deferred:
+        return defer.ensureDeferred(self.reset())
+
+    async def search(self, query: dict) -> List[PubsubItem]:
+        """Search pubsub items in cache"""
+        return await self.host.memory.storage.search_pubsub_items(query)
+
+    async def serialisable_search(self, query: dict) -> List[dict]:
+        """Search pubsub items in cache and returns parsed data
+
+        The returned data can be serialised.
+
+        "pubsub_service" and "pubsub_name" will be added to each data (both as strings)
+        """
+        items = await self.search(query)
+        ret = []
+        for item in items:
+            parsed = item.parsed
+            parsed["pubsub_service"] = item.node.service.full()
+            parsed["pubsub_node"] = item.node.name
+            if query.get("with_payload"):
+                parsed["item_payload"] = item.data.toXml()
+            parsed["node_profile"] = self.host.memory.storage.get_profile_by_id(
+                item.node.profile_id
+            )
+
+            ret.append(parsed)
+        return ret
+
+    def _search(self, query: str) -> defer.Deferred:
+        query = data_format.deserialise(query)
+        services = query.get("services")
+        if services:
+            query["services"] = [jid.JID(s) for s in services]
+        d = defer.ensureDeferred(self.serialisable_search(query))
+        d.addCallback(data_format.serialise)
+        return d