view sat/plugins/plugin_pubsub_cache.py @ 3673:bd13391ee29e

core (memory/sqla): fix `fileUpdate`
author Goffi <goffi@goffi.org>
date Wed, 08 Sep 2021 17:58:48 +0200
parents 342e3ddefd23
children ffa8c8c78115
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for PubSub Caching
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
from datetime import datetime
from typing import Optional, List, Tuple
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from twisted.internet import defer
from wokkel import pubsub, rsm
from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools import xml_tools, utils
from sat.tools.common import data_format
from sat.memory.sqla import PubsubNode, PubsubItem, SyncState


# module-level logger
log = getLogger(__name__)

# plugin metadata
PLUGIN_INFO = {
    C.PI_NAME: "PubSub Cache",
    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "PubsubCache",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
}

# keys copied from a matching analyser to the analyse result (see ``analyseNode``)
ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
# maximum number of items to cache for a node
CACHE_LIMIT = 5000
# number of seconds before a caching in progress is considered failed and tried again
PROGRESS_DEADLINE = 60 * 60 * 6



class PubsubCache:
    # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
    #   but it would be necessary to have this data if some devices unsubscribe a cached
    #   node, as we can then get out of sync. A protoXEP could be proposed to fix this
    #   situation.
    # TODO: handle configuration events

    def __init__(self, host):
        """Initialise the cache and register trigger, event callbacks and bridge methods

        The cache is used unless the ``pubsub_cache_strategy`` configuration option is
        set to ``no_cache``.

        @param host: backend instance, used to access configuration (``memory``),
            plugins, triggers and bridge
        """
        log.info(_("PubSub Cache initialization"))
        strategy = host.memory.getConfig(None, "pubsub_cache_strategy")
        self.use_cache = strategy != "no_cache"
        if not self.use_cache:
            log.info(
                _(
                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
                    "setting."
                ).format(value=repr(strategy))
            )
        self.host = host
        self._p = host.plugins["XEP-0060"]
        # registered analysers: normalised name => analyser (see registerAnalyser)
        self.analysers = {}
        # map for caching in progress: (service, node name) => Deferred
        self.in_progress = {}
        self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger)
        # callbacks are registered with an empty node name
        # NOTE(review): presumably an empty name matches every node, confirm with
        #   XEP-0060 plugin's addManagedNode
        self._p.addManagedNode(
            "",
            items_cb=self.onItemsEvent,
            delete_cb=self.onDeleteEvent,
            # was "purge_db": the "_cb" suffix is used for consistency with the other
            # callbacks
            purge_cb=self.onPurgeEvent,
        )
        host.bridge.addMethod(
            "psCacheGet",
            ".plugin",
            in_sign="ssiassss",
            out_sign="s",
            method=self._getItemsFromCache,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheSync",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._synchronise,
            async_=True,
        )
        host.bridge.addMethod(
            "psCachePurge",
            ".plugin",
            in_sign="s",
            out_sign="",
            method=self._purge,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheReset",
            ".plugin",
            in_sign="",
            out_sign="",
            method=self._reset,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheSearch",
            ".plugin",
            in_sign="s",
            out_sign="s",
            method=self._search,
            async_=True,
        )

    def registerAnalyser(self, analyser: dict) -> None:
        """Register a new pubsub node analyser

        @param analyser: An analyser is a dictionary which may have the following keys
        (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
        must be used):

            :name (str)*:
              a unique name for this analyser. This name will be stored in database
              to retrieve the analyser when necessary (notably to get the parsing method),
              thus it is recommended to use a stable name such as the source plugin name
              instead of a name which may change with standard evolution, such as the
              feature namespace.

            :type (str)*:
              indicates what kind of items we are dealing with. Type must be a human
              readable word, as it may be used in searches. Good types examples are
              **blog** or **event**.

            :node (str):
              prefix of a node name which may be used to identify its type. Example:
              *urn:xmpp:microblog:0* (a node starting with this name will be identified as
              *blog* node).

            :namespace (str):
              root namespace of items. When analysing a node, the first item will be
              retrieved. The analyser will be chosen its given namespace match the
              namespace of the first child element of ``<item>`` element.

            :to_sync (bool):
              if True, the node must be synchronised in cache. The default False value
              means that the pubsub service will always be requested.

            :parser (callable):
              method (which may be sync, a coroutine or a method returning a "Deferred")
              to call to parse the ``domish.Element`` of the item. The result must be
              dictionary which can be serialised to JSON.

              The method must have the following signature:

              .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
                                   service: Optional[jid.JID], node: Optional[str]) \
                                   -> dict
                :noindex:

            :match_cb (callable):
              method (which may be sync, a coroutine or a method returning a "Deferred")
              called when the analyser matches. The method is called with the curreny
              analyse which is can modify **in-place**.

              The method must have the following signature:

              .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
                :noindex:

        @raise exceptions.Conflict: a analyser with this name already exists
        """

        name = analyser.get("name", "").strip().lower()
        # we want the normalised name
        analyser["name"] = name
        if not name:
            raise ValueError('"name" is mandatory in analyser')
        if "type" not in analyser:
            raise ValueError('"type" is mandatory in analyser')
        type_test_keys = {"node", "namespace"}
        if not type_test_keys.intersection(analyser):
            raise ValueError(f'at least one of {type_test_keys} must be used')
        if name in self.analysers:
            raise exceptions.Conflict(
                f"An analyser with the name {name!r} is already registered"
            )
        self.analysers[name] = analyser

    async def cacheItems(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode,
        items: List[domish.Element]
    ) -> None:
        """Store items in cache, with their parsed version when a parser is available

        The parser is the one of the analyser associated to the node, if any.

        @param client: profile session
        @param pubsub_node: cached node the items belong to
        @param items: items elements to cache
        """
        node_analyser = self.analysers.get(pubsub_node.analyser)
        if node_analyser is None:
            parser = None
        else:
            parser = node_analyser.get("parser")

        parsed_items = None
        if parser is not None:
            # items are parsed one after the other, in the order they were received
            parsed_items = []
            for item_elt in items:
                parsed = await utils.asDeferred(
                    parser,
                    client,
                    item_elt,
                    pubsub_node.service,
                    pubsub_node.name
                )
                parsed_items.append(parsed)

        await self.host.memory.storage.cachePubsubItems(
            client, pubsub_node, items, parsed_items
        )

    async def _cacheNode(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> None:
        """Retrieve items of a node from its pubsub service and put them in cache

        The node sync state is set to ``IN_PROGRESS`` at the beginning, then to
        ``COMPLETED`` on success. On any error, the state is set to ``ERROR``, cached
        items of the node are deleted, and the exception is raised again.

        If the node is not marked as subscribed, a subscription is attempted first (a
        failure there is only logged).

        If the service handles RSM (XEP-0059), pages are retrieved until the end of the
        node, until ``CACHE_LIMIT`` items have been seen, or until the service returns
        an item which has already been returned. Otherwise only 20 items are requested.

        @param client: profile session
        @param pubsub_node: node to cache
        """
        await self.host.memory.storage.updatePubsubNodeSyncState(
            pubsub_node, SyncState.IN_PROGRESS
        )
        service, node = pubsub_node.service, pubsub_node.name
        try:
            log.debug(
                f"Caching node {node!r} at {service} for {client.profile}"
            )
            if not pubsub_node.subscribed:
                try:
                    sub = await self._p.subscribe(client, service, node)
                except Exception as e:
                    # best effort: caching is still done without subscription
                    log.warning(
                        _(
                            "Can't subscribe node {pubsub_node}, that means that "
                            "synchronisation can't be maintained: {reason}"
                        ).format(pubsub_node=pubsub_node, reason=e)
                    )
                else:
                    if sub.state == "subscribed":
                        sub_id = sub.subscriptionIdentifier
                        log.debug(
                            f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
                        )
                        pubsub_node.subscribed = True
                        await self.host.memory.storage.add(pubsub_node)
                    else:
                        log.warning(
                            _(
                                "{pubsub_node} is not subscribed, that means that "
                                "synchronisation can't be maintained, and you may have "
                                "to enforce subscription manually. Subscription state: "
                                "{state}"
                            ).format(pubsub_node=pubsub_node, state=sub.state)
                        )

            try:
                await self.host.checkFeatures(
                    client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
                )
            except exceptions.FeatureNotFound:
                log.warning(
                    f"service {service} doesn't handle Result Set Management "
                    "(XEP-0059), we'll only cache latest 20 items"
                )
                items, __ = await client.pubsub_client.items(
                    pubsub_node.service, pubsub_node.name, maxItems=20
                )
                await self.cacheItems(
                    client, pubsub_node, items
                )
            else:
                rsm_p = self.host.plugins["XEP-0059"]
                rsm_request = rsm.RSMRequest()
                cached_ids = set()
                # rsm_request is set to None when pagination must stop
                while rsm_request is not None:
                    items, rsm_response = await client.pubsub_client.items(
                        service, node, rsm_request=rsm_request
                    )
                    await self.cacheItems(
                        client, pubsub_node, items
                    )
                    for item in items:
                        item_id = item["id"]
                        if item_id in cached_ids:
                            log.warning(
                                f"Pubsub node {node!r} at {service} is returning several "
                                f"times the same item ({item_id!r}). This is illegal "
                                "behaviour, and it means that Pubsub service "
                                f"{service} is buggy and can't be cached properly. "
                                f"Please report this to {service.host} administrators"
                            )
                            rsm_request = None
                            break
                        cached_ids.add(item_id)
                        if len(cached_ids) >= CACHE_LIMIT:
                            log.warning(
                                f"Pubsub node {node!r} at {service} contains more items "
                                f"than the cache limit ({CACHE_LIMIT}). We stop "
                                f"caching here, at item {item_id!r}."
                            )
                            rsm_request = None
                            break
                    else:
                        # next page is requested only if the page has been fully
                        # handled: the current request must not be replaced by None
                        # before being given to getNextRequest
                        rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response)

            await self.host.memory.storage.updatePubsubNodeSyncState(
                pubsub_node, SyncState.COMPLETED
            )
        except Exception as e:
            import traceback
            # format_tb returns a list of strings, they are joined to get a readable log
            tb = "".join(traceback.format_tb(e.__traceback__))
            log.error(
                f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
            )
            await self.host.memory.storage.updatePubsubNodeSyncState(
                pubsub_node, SyncState.ERROR
            )
            await self.host.memory.storage.deletePubsubItems(pubsub_node)
            raise

    def _cacheNodeClean(self, __, pubsub_node):
        del self.in_progress[(pubsub_node.service, pubsub_node.name)]

    def cacheNode(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> defer.Deferred:
        """Launch node caching as a background task

        The task is stored in ``in_progress`` with the ``(service, node name)`` key
        until it is over.

        @param client: profile session
        @param pubsub_node: node to cache
        @return: Deferred of the caching task
        """
        d = defer.ensureDeferred(self._cacheNode(client, pubsub_node))
        # the task is registered before the cleaning callback is attached, so the
        # callback always finds its entry, even if the Deferred has already fired
        self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
        d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node)
        return d

    async def analyseNode(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        pubsub_node: PubsubNode = None,
    ) -> dict:
        """Check a node against registered analysers to find what it is used for

        When ``pubsub_node`` is not given, the service is requested to get the namespace
        of the first item payload (``namespace`` key) and the node configuration
        (``conf`` key). Failures of those requests are only logged.

        The first analyser whose ``node`` prefix matches the node name, or whose
        ``namespace`` is the one found, is used: its keys listed in
        ``ANALYSER_KEYS_TO_COPY`` are copied to the analyse, then its ``match_cb`` (if
        any) is called.

        @param client: profile session
        @param service: pubsub service of the node
        @param node: name of the node
        @param pubsub_node: cached node, if given the service is not requested
        @return: analyse data, with at least ``service`` and ``node`` keys
        """
        analyse = {"service": service, "node": node}

        if pubsub_node is None:
            try:
                items_result = await client.pubsub_client.items(service, node, 1)
                first_item = items_result[0][0]
            except IndexError:
                # no item in this node
                pass
            except error.StanzaError as e:
                if e.condition != "item-not-found":
                    log.warning(
                        f"Can't retrieve last item on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
            else:
                try:
                    analyse["namespace"] = first_item.firstChildElement().uri
                except Exception as e:
                    log.warning(
                        f"Can't retrieve item namespace on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )

            try:
                analyse["conf"] = await self._p.getConfiguration(client, service, node)
            except Exception as e:
                log.warning(
                    f"Can't retrieve configuration for node {node!r} at service {service} "
                    f"for {client.profile}: {e}"
                )

        matching = None
        for candidate in self.analysers.values():
            # node name prefix is tested first, then items namespace
            if "node" in candidate and node.startswith(candidate["node"]):
                matching = candidate
                break
            if (
                "namespace" in analyse
                and "namespace" in candidate
                and analyse["namespace"] == candidate["namespace"]
            ):
                matching = candidate
                break

        if matching is None:
            log.debug(
                f"node {node!r} at service {service} doesn't match any known type"
            )
            return analyse

        for key in ANALYSER_KEYS_TO_COPY:
            if key in matching:
                analyse[key] = matching[key]

        if "match_cb" in matching:
            await utils.asDeferred(matching["match_cb"], client, analyse)

        return analyse

    def _getItemsFromCache(
        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
        extra="", profile_key=C.PROF_KEY_NONE
    ):
        """Bridge method to get items from cache

        The result of ``_aGetItemsFromCache`` goes through ``transItemsData`` then
        ``serialiseItems`` of XEP-0060 plugin.

        @return: Deferred firing with the serialised items
        """
        coro = self._aGetItemsFromCache(
            service, node, max_items, item_ids, sub_id, extra, profile_key
        )
        d = defer.ensureDeferred(coro)
        return d.addCallback(self._p.transItemsData).addCallback(
            self._p.serialiseItems
        )

    async def _aGetItemsFromCache(
        self, service, node, max_items, item_ids, sub_id, extra, profile_key
    ):
        """Convert bridge arguments and get items from cache

        @param service: JID of the service as a string, profile's bare JID is used if
            empty
        @param extra: serialised extra data
        @return: tuple with the list of items data and the metadata
        @raise exceptions.NotFound: the node is not in cache for this profile
        """
        client = self.host.getClient(profile_key)
        if service:
            service = jid.JID(service)
        else:
            service = client.jid.userhostJID()

        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is None:
            raise exceptions.NotFound(
                f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
            )

        if max_items == C.NO_LIMIT:
            max_items = None
        parsed_extra = self._p.parseExtra(data_format.deserialise(extra))
        cached_items, metadata = await self.getItemsFromCache(
            client,
            pubsub_node,
            max_items,
            item_ids,
            sub_id or None,
            parsed_extra.rsm_request,
            parsed_extra.extra,
        )
        items_data = [cached_item.data for cached_item in cached_items]
        return items_data, metadata

    async def getItemsFromCache(
        self,
        client: SatXMPPEntity,
        node: PubsubNode,
        max_items: Optional[int] = None,
        item_ids: Optional[List[str]] = None,
        sub_id: Optional[str] = None,
        rsm_request: Optional[rsm.RSMRequest] = None,
        extra: Optional[dict] = None
    ) -> Tuple[List[PubsubItem], dict]:
        """Get items from cache, using same arguments as for external Pubsub request

        @param client: profile session
        @param node: cached node to get items from
        @param max_items: maximum number of items to retrieve. If neither ``max_items``
            nor ``rsm_request`` is set, 20 items are requested.
        @param item_ids: IDs of items, can't be used together with ``max_items``
        @param sub_id: subscription ID (not used for cache requests)
        @param rsm_request: RSM request, can't be used together with ``max_items``
        @param extra: extra data, ``C.KEY_ORDER_BY`` is used if present
        @return: cached items and metadata, as returned by storage
        @raise NotImplementedError: a MAM query is requested
        @raise exceptions.InternalError: incompatible arguments are used together
        """
        if extra is None:
            # None is the default value, it must not break key lookups below
            extra = {}
        if "mam" in extra:
            raise NotImplementedError("MAM queries are not supported yet")
        order_by = extra.get(C.KEY_ORDER_BY)

        if rsm_request is None:
            if max_items is None:
                max_items = 20
            elif item_ids:
                # the check was inverted: it was failing when item IDs were NOT used
                raise exceptions.InternalError(
                    "Pubsub max items and item IDs must not be used at the same time"
                )
            pubsub_items, metadata = await self.host.memory.storage.getItems(
                node, max_items=max_items, order_by=order_by
            )
            return pubsub_items, metadata

        if max_items is not None:
            raise exceptions.InternalError(
                "Pubsub max items and RSM must not be used at the same time"
            )

        # an empty "before" means that the last page is requested: items are then
        # retrieved in descending order
        desc = rsm_request.before == ""
        before = None if desc else rsm_request.before
        pubsub_items, metadata = await self.host.memory.storage.getItems(
            node, max_items=rsm_request.max, before=before, after=rsm_request.after,
            from_index=rsm_request.index, order_by=order_by,
            desc=desc, force_rsm=True,
        )
        return pubsub_items, metadata

    async def onItemsEvent(self, client, event):
        """Update cache when items are published on or retracted from a cached node

        Nothing is done if the node is not in cache, or if its sync state is neither
        ``COMPLETED`` nor ``IN_PROGRESS``.

        @param client: profile session
        @param event: pubsub items event (``sender``, ``nodeIdentifier`` and ``items``
            attributes are used)
        """
        node = await self.host.memory.storage.getPubsubNode(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        if node.sync_state not in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
            return

        items = []
        retract_ids = []
        for elt in event.items:
            if elt.name == "item":
                items.append(elt)
            elif elt.name == "retract":
                item_id = elt.getAttribute("id")
                if not item_id:
                    log.warning(
                        "Ignoring invalid retract item element: "
                        f"{xml_tools.pFmtElt(elt)}"
                    )
                    continue
                retract_ids.append(item_id)
            else:
                log.warning(
                    f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}"
                )

        if items:
            log.debug(f"caching new items received from {node}")
            await self.cacheItems(
                client, node, items
            )
        if retract_ids:
            log.debug(f"deleting retracted items from {node}")
            await self.host.memory.storage.deletePubsubItems(
                node, items_names=retract_ids
            )

    async def onDeleteEvent(self, client, event):
        """Remove a node from cache when its deletion is notified

        @param client: profile session
        @param event: pubsub delete event (``sender`` and ``nodeIdentifier`` attributes
            are used)
        """
        service, node_name = event.sender, event.nodeIdentifier
        log.debug(f"deleting node {node_name} from {service} for {client.profile}")
        storage = self.host.memory.storage
        await storage.deletePubsubNode([client.profile], [service], [node_name])

    async def onPurgeEvent(self, client, event):
        """Delete all cached items of a node when its purge is notified

        Nothing is done if the node is not in cache.

        @param client: profile session
        @param event: pubsub purge event (``sender`` and ``nodeIdentifier`` attributes
            are used)
        """
        storage = self.host.memory.storage
        node = await storage.getPubsubNode(client, event.sender, event.nodeIdentifier)
        if node is not None:
            log.debug(f"purging node {node} for {client.profile}")
            await self.host.memory.storage.deletePubsubItems(node)

    async def _getItemsTrigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        max_items: Optional[int],
        item_ids: Optional[List[str]],
        sub_id: Optional[str],
        rsm_request: Optional[rsm.RSMRequest],
        extra: dict
    ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
        """Trigger on "XEP-0060_getItems": use cached items when they are available

        If the node must be synchronised and its caching is completed, items are
        retrieved from cache. Otherwise caching is launched in background if necessary
        (including when a previous caching has been interrupted or is in progress for
        more than ``PROGRESS_DEADLINE`` seconds).

        @param service: pubsub service, profile's bare JID is used if None
        @param extra: extra data, cache is skipped if ``C.KEY_USE_CACHE`` is False or if
            a ``mam`` key is present
        @return: ``(False, (items_data, metadata))`` when items come from cache,
            ``(True, None)`` otherwise
            NOTE(review): presumably a True first value tells the caller to continue
            with the normal request, to be confirmed with XEP-0060 plugin
        """
        if not self.use_cache:
            log.debug("cache disabled in settings")
            return True, None
        # "==" is kept as in the initial check: a value equal to False skips the cache,
        # while a missing key (None) doesn't
        if extra.get(C.KEY_USE_CACHE) == False:
            log.debug("skipping pubsub cache as requested")
            return True, None
        if service is None:
            service = client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
            # the node is already fully cached, no need to analyse it again
            analyse = {"to_sync": True}
        else:
            analyse = await self.analyseNode(client, service, node)

        if pubsub_node is None:
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
                subtype=analyse.get("subtype"),
            )

        if analyse.get("to_sync"):
            if pubsub_node.sync_state == SyncState.COMPLETED:
                if "mam" in extra:
                    log.debug("MAM caching is not supported yet, skipping cache")
                    return True, None
                pubsub_items, metadata = await self.getItemsFromCache(
                    client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
                )
                return False, ([i.data for i in pubsub_items], metadata)

            key = (service, node)
            if pubsub_node.sync_state == SyncState.IN_PROGRESS:
                if key not in self.in_progress:
                    log.warning(
                        f"{pubsub_node} is reported as being cached, but not caching is "
                        "in progress, this is most probably due to the backend being "
                        "restarted. Resetting the status, caching will be done again."
                    )
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
                else:
                    elapsed = time.time() - pubsub_node.sync_state_updated
                    if elapsed > PROGRESS_DEADLINE:
                        log.warning(
                            f"{pubsub_node} is in progress for too long "
                            f"({int(elapsed // 60)} minutes), "
                            "cancelling it and retrying."
                        )
                        # the task is removed from the map before being cancelled, so
                        # a new caching can be launched below
                        self.in_progress.pop(key).cancel()
                        pubsub_node.sync_state = None
                        await self.host.memory.storage.deletePubsubItems(pubsub_node)
                    else:
                        log.debug(
                            f"{pubsub_node} synchronisation is already in progress, "
                            "skipping"
                        )
            if pubsub_node.sync_state is None:
                if key in self.in_progress:
                    raise exceptions.InternalError(
                        f"There is already a caching in progress for {pubsub_node}, this "
                        "should not happen"
                    )
                self.cacheNode(client, pubsub_node)
            elif pubsub_node.sync_state == SyncState.ERROR:
                log.debug(
                    f"{pubsub_node} synchronisation has previously failed, skipping"
                )

        return True, None

    async def _subscribeTrigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID],
        options: Optional[dict],
        subscription: pubsub.Subscription
    ) -> None:
        """Placeholder for a subscription trigger, nothing is done for now"""
        pass

    async def _unsubscribeTrigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid,
        subscriptionIdentifier,
        sender,
    ) -> None:
        """Placeholder for an unsubscription trigger, nothing is done for now"""
        pass

    def _synchronise(self, service, node, profile_key):
        """Bridge method to (re)synchronise a node

        @param service: JID of the service as a string, profile's bare JID is used if
            empty
        @return: Deferred of the synchronisation
        """
        client = self.host.getClient(profile_key)
        if service:
            service_jid = jid.JID(service)
        else:
            service_jid = client.jid.userhostJID()
        coro = self.synchronise(client, service_jid, node)
        return defer.ensureDeferred(coro)

    async def synchronise(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str
    ) -> None:
        """Synchronise a node with a pubsub service

        If the node is already synchronised, it will be resynchronised (all items will be
        deleted and re-downloaded).

        The node will be synchronised even if there is no matching analyser.

        Note that when a node is synchronised, it is automatically subscribed.

        If a synchronisation of this node is already in progress, a warning is logged
        and nothing else is done.

        @param client: profile session
        @param service: pubsub service of the node
        @param node: name of the node
        """
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is None:
            log.info(
                _(
                    "Synchronising the new node {node} at {service}"
                ).format(node=node, service=service.full())
            )
            analyse = await self.analyseNode(client, service, node)
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
                # subtype is stored as it is done in _getItemsTrigger
                subtype=analyse.get("subtype"),
            )

        if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
             or (service, node) in self.in_progress)):
            log.warning(
                _(
                    "{node} at {service} is already being synchronised, can't do a new "
                    "synchronisation."
                ).format(node=node, service=service)
            )
        else:
            log.info(
                _(
                    "(Re)Synchronising the node {node} at {service} on user request"
                ).format(node=node, service=service.full())
            )
            # we first delete and recreate the node (will also delete its items)
            await self.host.memory.storage.delete(pubsub_node)
            analyse = await self.analyseNode(client, service, node)
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
                subtype=analyse.get("subtype"),
            )
            # then we can put node in cache
            await self.cacheNode(client, pubsub_node)

    async def purge(self, purge_filters: dict) -> None:
        """Remove items according to filters

        filters can have on of the following keys, all are optional:

            :services:
                list of JIDs of services from which items must be deleted
            :nodes:
                list of node names to delete
            :types:
                list of node types to delete
            :subtypes:
                list of node subtypes to delete
            :profiles:
                list of profiles from which items must be deleted
            :created_before:
                datetime before which items must have been created to be deleted
            :created_update:
                datetime before which items must have been updated last to be deleted
        """
        purge_filters["names"] = purge_filters.pop("nodes", None)
        await self.host.memory.storage.purgePubsubItems(**purge_filters)

    def _purge(self, purge_filters: str) -> defer.Deferred:
        """Bridge method to purge items from cache

        @param purge_filters: serialised filters (see ``purge``), where
            ``created_before`` and ``updated_before`` are timestamps
        @return: Deferred of the purge
        """
        filters = data_format.deserialise(purge_filters)
        for key in ("created_before", "updated_before"):
            try:
                filters[key] = datetime.fromtimestamp(filters[key])
            except (KeyError, TypeError):
                # the filter is not used, or its value is not a number (e.g. None):
                # filters are left as they are
                pass
        return defer.ensureDeferred(self.purge(filters))

    async def reset(self) -> None:
        """Remove ALL nodes and items from cache

        After calling this method, cache will be refilled progressively as if it where new
        """
        await self.host.memory.storage.deletePubsubNode(None, None, None)

    def _reset(self) -> defer.Deferred:
        """Bridge method to remove all nodes and items from cache"""
        coro = self.reset()
        return defer.ensureDeferred(coro)

    async def search(self, query: dict) -> List[PubsubItem]:
        """Search pubsub items in cache"""
        return await self.host.memory.storage.searchPubsubItems(query)

    async def serialisableSearch(self, query: dict) -> List[dict]:
        """Search pubsub items in cache and returns parsed data

        The returned data can be serialised.

        "pubsub_service" and "pubsub_name" will be added to each data (both as strings)
        """
        items = await self.search(query)
        ret = []
        for item in items:
            parsed = item.parsed
            parsed["pubsub_service"] = item.node.service.full()
            parsed["pubsub_node"] = item.node.name
            if query.get("with_payload"):
                parsed["item_payload"] = item.data.toXml()
            parsed["node_profile"] = self.host.memory.storage.getProfileById(
                item.node.profile_id
            )

            ret.append(parsed)
        return ret

    def _search(self, query: str) -> defer.Deferred:
        """Bridge method to search items in cache

        @param query: serialised search query, "services" are converted to JIDs if
            present
        @return: Deferred firing with serialised search result
        """
        parsed_query = data_format.deserialise(query)
        services = parsed_query.get("services")
        if services:
            parsed_query["services"] = [jid.JID(service) for service in services]
        d = defer.ensureDeferred(self.serialisableSearch(parsed_query))
        return d.addCallback(data_format.serialise)