diff libervia/backend/plugins/plugin_pubsub_cache.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
| author   | Goffi <goffi@goffi.org>                         |
|----------|-------------------------------------------------|
| date     | Fri, 02 Jun 2023 11:49:51 +0200                 |
| parents  | sat/plugins/plugin_pubsub_cache.py@524856bd7b19 |
| children | 2b000790b197                                    |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_pubsub_cache.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,861 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for PubSub Caching
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import time
+from datetime import datetime
+from typing import Optional, List, Tuple, Dict, Any
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from twisted.internet import defer
+from wokkel import pubsub, rsm
+from libervia.backend.core.i18n import _
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core import exceptions
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.tools import xml_tools, utils
+from libervia.backend.tools.common import data_format
+from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError
+
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "PubSub Cache",
+    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
+    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "PubsubCache",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
+}
+
+ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
+# maximum number of items to cache
+CACHE_LIMIT = 5000
+# number of seconds before an in-progress caching is considered failed and tried again
+PROGRESS_DEADLINE = 60 * 60 * 6
+
+
+
+class PubsubCache:
+    # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
+    #   but it would be necessary to have this data if some devices unsubscribe a cached
+    #   node, as we can then get out of sync. A protoXEP could be proposed to fix this
+    #   situation.
+    # TODO: handle configuration events
+
+    def __init__(self, host):
+        log.info(_("PubSub Cache initialization"))
+        strategy = host.memory.config_get(None, "pubsub_cache_strategy")
+        if strategy == "no_cache":
+            log.info(
+                _(
+                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
+                    "setting."
+                ).format(value=repr(strategy))
+            )
+            self.use_cache = False
+        else:
+            self.use_cache = True
+        self.host = host
+        self._p = host.plugins["XEP-0060"]
+        self.analysers = {}
+        # map for caching in progress (node, service) => Deferred
+        self.in_progress = {}
+        self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger)
+        self._p.add_managed_node(
+            "",
+            items_cb=self.on_items_event,
+            delete_cb=self.on_delete_event,
+            purge_db=self.on_purge_event,
+        )
+        host.bridge.add_method(
+            "ps_cache_get",
+            ".plugin",
+            in_sign="ssiassss",
+            out_sign="s",
+            method=self._get_items_from_cache,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_cache_sync",
+            ".plugin",
+            "sss",
+            out_sign="",
+            method=self._synchronise,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_cache_purge",
+            ".plugin",
+            "s",
+            out_sign="",
+            method=self._purge,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_cache_reset",
+            ".plugin",
+            "",
+            out_sign="",
+            method=self._reset,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_cache_search",
+            ".plugin",
+            "s",
+            out_sign="s",
+            method=self._search,
+            async_=True,
+        )
+
+    def register_analyser(self, analyser: dict) -> None:
+        """Register a new pubsub node analyser
+
+        @param analyser: An analyser is a dictionary which may have the following keys
+            (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
+            must be used):
+
+            :name (str)*:
+                a unique name for this analyser. This name will be stored in database
+                to retrieve the analyser when necessary (notably to get the parsing method),
+                thus it is recommended to use a stable name such as the source plugin name
+                instead of a name which may change with standard evolution, such as the
+                feature namespace.
+
+            :type (str)*:
+                indicates what kind of items we are dealing with. Type must be a human
+                readable word, as it may be used in searches. Good type examples are
+                **blog** or **event**.
+
+            :node (str):
+                prefix of a node name which may be used to identify its type. Example:
+                *urn:xmpp:microblog:0* (a node starting with this name will be identified as
+                a *blog* node).
+
+            :namespace (str):
+                root namespace of items. When analysing a node, the first item will be
+                retrieved. The analyser will be chosen if its given namespace matches the
+                namespace of the first child element of the ``<item>`` element.
+
+            :to_sync (bool):
+                if True, the node must be synchronised in cache. The default False value
+                means that the pubsub service will always be requested.
+
+            :parser (callable):
+                method (which may be sync, a coroutine or a method returning a "Deferred")
+                to call to parse the ``domish.Element`` of the item. The result must be
+                a dictionary which can be serialised to JSON.
+
+                The method must have the following signature:
+
+                .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
+                    service: Optional[jid.JID], node: Optional[str]) \
+                    -> dict
+                    :noindex:
+
+            :match_cb (callable):
+                method (which may be sync, a coroutine or a method returning a "Deferred")
+                called when the analyser matches. The method is called with the current
+                analyse, which it can modify **in-place**.
+
+                The method must have the following signature:
+
+                .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
+                    :noindex:
+
+        @raise exceptions.Conflict: an analyser with this name already exists
+        """
+
+        name = analyser.get("name", "").strip().lower()
+        # we want the normalised name
+        analyser["name"] = name
+        if not name:
+            raise ValueError('"name" is mandatory in analyser')
+        if "type" not in analyser:
+            raise ValueError('"type" is mandatory in analyser')
+        type_test_keys = {"node", "namespace"}
+        if not type_test_keys.intersection(analyser):
+            raise ValueError(f'at least one of {type_test_keys} must be used')
+        if name in self.analysers:
+            raise exceptions.Conflict(
+                f"An analyser with the name {name!r} is already registered"
+            )
+        self.analysers[name] = analyser
+
+    async def cache_items(
+        self,
+        client: SatXMPPEntity,
+        pubsub_node: PubsubNode,
+        items: List[domish.Element]
+    ) -> None:
+        try:
+            parser = self.analysers[pubsub_node.analyser].get("parser")
+        except KeyError:
+            parser = None
+
+        if parser is not None:
+            parsed_items = [
+                await utils.as_deferred(
+                    parser,
+                    client,
+                    item,
+                    pubsub_node.service,
+                    pubsub_node.name
+                )
+                for item in items
+            ]
+        else:
+            parsed_items = None
+
+        await self.host.memory.storage.cache_pubsub_items(
+            client, pubsub_node, items, parsed_items
+        )
+
+    async def _cache_node(
+        self,
+        client: SatXMPPEntity,
+        pubsub_node: PubsubNode
+    ) -> None:
+        await self.host.memory.storage.update_pubsub_node_sync_state(
+            pubsub_node, SyncState.IN_PROGRESS
+        )
+        service, node = pubsub_node.service, pubsub_node.name
+        try:
+            log.debug(
+                f"Caching node {node!r} at {service} for {client.profile}"
+            )
+            if not pubsub_node.subscribed:
+                try:
+                    sub = await self._p.subscribe(client, service, node)
+                except Exception as e:
+                    log.warning(
+                        _(
+                            "Can't subscribe node {pubsub_node}, that means that "
+                            "synchronisation can't be maintained: {reason}"
+                        ).format(pubsub_node=pubsub_node, reason=e)
+                    )
+                else:
+                    if sub.state == "subscribed":
+                        sub_id = sub.subscriptionIdentifier
+                        log.debug(
+                            f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
+                        )
+                        pubsub_node.subscribed = True
+                        await self.host.memory.storage.add(pubsub_node)
+                    else:
+                        log.warning(
+                            _(
+                                "{pubsub_node} is not subscribed, that means that "
+                                "synchronisation can't be maintained, and you may have "
+                                "to enforce subscription manually. Subscription state: "
Subscription state: " + "{state}" + ).format(pubsub_node=pubsub_node, state=sub.state) + ) + + try: + await self.host.check_features( + client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service + ) + except error.StanzaError as e: + if e.condition == "service-unavailable": + log.warning( + "service {service} is hidding disco infos, we'll only cache " + "latest 20 items" + ) + items, __ = await client.pubsub_client.items( + pubsub_node.service, pubsub_node.name, maxItems=20 + ) + await self.cache_items( + client, pubsub_node, items + ) + else: + raise e + except exceptions.FeatureNotFound: + log.warning( + f"service {service} doesn't handle Result Set Management " + "(XEP-0059), we'll only cache latest 20 items" + ) + items, __ = await client.pubsub_client.items( + pubsub_node.service, pubsub_node.name, maxItems=20 + ) + await self.cache_items( + client, pubsub_node, items + ) + else: + rsm_p = self.host.plugins["XEP-0059"] + rsm_request = rsm.RSMRequest() + cached_ids = set() + while True: + items, rsm_response = await client.pubsub_client.items( + service, node, rsm_request=rsm_request + ) + await self.cache_items( + client, pubsub_node, items + ) + for item in items: + item_id = item["id"] + if item_id in cached_ids: + log.warning( + f"Pubsub node {node!r} at {service} is returning several " + f"times the same item ({item_id!r}). This is illegal " + "behaviour, and it means that Pubsub service " + f"{service} is buggy and can't be cached properly. " + f"Please report this to {service.host} administrators" + ) + rsm_request = None + break + cached_ids.add(item["id"]) + if len(cached_ids) >= CACHE_LIMIT: + log.warning( + f"Pubsub node {node!r} at {service} contains more items " + f"than the cache limit ({CACHE_LIMIT}). We stop " + "caching here, at item {item['id']!r}." 
+                            )
+                            rsm_request = None
+                            break
+                    rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
+                    if rsm_request is None:
+                        break
+
+            await self.host.memory.storage.update_pubsub_node_sync_state(
+                pubsub_node, SyncState.COMPLETED
+            )
+        except Exception as e:
+            import traceback
+            tb = traceback.format_tb(e.__traceback__)
+            log.error(
+                f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
+            )
+            await self.host.memory.storage.update_pubsub_node_sync_state(
+                pubsub_node, SyncState.ERROR
+            )
+            await self.host.memory.storage.delete_pubsub_items(pubsub_node)
+            raise e
+
+    def _cache_node_clean(self, __, pubsub_node):
+        del self.in_progress[(pubsub_node.service, pubsub_node.name)]
+
+    def cache_node(
+        self,
+        client: SatXMPPEntity,
+        pubsub_node: PubsubNode
+    ) -> None:
+        """Launch node caching as a background task"""
+        d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
+        d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
+        self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
+        return d
+
+    async def analyse_node(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        pubsub_node: PubsubNode = None,
+    ) -> dict:
+        """Use registered analysers on a node to determine what it is used for"""
+        analyse = {"service": service, "node": node}
+        if pubsub_node is None:
+            try:
+                first_item = (await client.pubsub_client.items(
+                    service, node, 1
+                ))[0][0]
+            except IndexError:
+                pass
+            except error.StanzaError as e:
+                if e.condition == "item-not-found":
+                    pass
+                else:
+                    log.warning(
+                        f"Can't retrieve last item on node {node!r} at service "
+                        f"{service} for {client.profile}: {e}"
+                    )
+            else:
+                try:
+                    uri = first_item.firstChildElement().uri
+                except Exception as e:
+                    log.warning(
+                        f"Can't retrieve item namespace on node {node!r} at service "
+                        f"{service} for {client.profile}: {e}"
+                    )
+                else:
+                    analyse["namespace"] = uri
+            try:
+                conf = await self._p.getConfiguration(client, service, node)
+            except Exception as e:
+                log.warning(
+                    f"Can't retrieve configuration for node {node!r} at service {service} "
+                    f"for {client.profile}: {e}"
+                )
+            else:
+                analyse["conf"] = conf
+
+        for analyser in self.analysers.values():
+            try:
+                an_node = analyser["node"]
+            except KeyError:
+                pass
+            else:
+                if node.startswith(an_node):
+                    for key in ANALYSER_KEYS_TO_COPY:
+                        try:
+                            analyse[key] = analyser[key]
+                        except KeyError:
+                            pass
+                    found = True
+                    break
+            try:
+                namespace = analyse["namespace"]
+                an_namespace = analyser["namespace"]
+            except KeyError:
+                pass
+            else:
+                if namespace == an_namespace:
+                    for key in ANALYSER_KEYS_TO_COPY:
+                        try:
+                            analyse[key] = analyser[key]
+                        except KeyError:
+                            pass
+                    found = True
+                    break
+
+        else:
+            found = False
+            log.debug(
+                f"node {node!r} at service {service} doesn't match any known type"
+            )
+        if found:
+            try:
+                match_cb = analyser["match_cb"]
+            except KeyError:
+                pass
+            else:
+                await utils.as_deferred(match_cb, client, analyse)
+        return analyse
+
+    def _get_items_from_cache(
+        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
+        extra="", profile_key=C.PROF_KEY_NONE
+    ):
+        d = defer.ensureDeferred(self._a_get_items_from_cache(
+            service, node, max_items, item_ids, sub_id, extra, profile_key
+        ))
+        d.addCallback(self._p.trans_items_data)
+        d.addCallback(self._p.serialise_items)
+        return d
+
+    async def _a_get_items_from_cache(
+        self, service, node, max_items, item_ids, sub_id, extra, profile_key
+    ):
+        client = self.host.get_client(profile_key)
+        service = jid.JID(service) if service else client.jid.userhostJID()
+        pubsub_node = await self.host.memory.storage.get_pubsub_node(
+            client, service, node
+        )
+        if pubsub_node is None:
+            raise exceptions.NotFound(
+                f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
+            )
+        max_items = None if max_items == C.NO_LIMIT else max_items
+        extra = self._p.parse_extra(data_format.deserialise(extra))
+        items, metadata = await self.get_items_from_cache(
+            client,
+            pubsub_node,
+            max_items,
+            item_ids,
+            sub_id or None,
+            extra.rsm_request,
+            extra.extra,
+        )
+        return [i.data for i in items], metadata
+
+    async def get_items_from_cache(
+        self,
+        client: SatXMPPEntity,
+        node: PubsubNode,
+        max_items: Optional[int] = None,
+        item_ids: Optional[List[str]] = None,
+        sub_id: Optional[str] = None,
+        rsm_request: Optional[rsm.RSMRequest] = None,
+        extra: Optional[Dict[str, Any]] = None
+    ) -> Tuple[List[PubsubItem], dict]:
+        """Get items from cache, using same arguments as for external Pubsub request"""
+        if extra is None:
+            extra = {}
+        if "mam" in extra:
+            raise NotImplementedError("MAM queries are not supported yet")
+        if max_items is None and rsm_request is None:
+            max_items = 20
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
+                node, max_items=max_items, item_ids=item_ids or None,
+                order_by=extra.get(C.KEY_ORDER_BY)
+            )
+        elif max_items is not None:
+            if rsm_request is not None:
+                raise exceptions.InternalError(
+                    "Pubsub max items and RSM must not be used at the same time"
+                )
+            elif item_ids:
+                raise exceptions.InternalError(
+                    "Pubsub max items and item IDs must not be used at the same time"
+                )
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
+                node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
+            )
+        else:
+            desc = False
+            if rsm_request.before == "":
+                before = None
+                desc = True
+            else:
+                before = rsm_request.before
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
+                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
+                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
+                desc=desc, force_rsm=True,
+            )
+
+        return pubsub_items, metadata
+
+    async def on_items_event(self, client, event):
+        node = await self.host.memory.storage.get_pubsub_node(
+            client, event.sender, event.nodeIdentifier
+        )
+        if node is None:
+            return
+        if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
+            items = []
+            retract_ids = []
+            for elt in event.items:
+                if elt.name == "item":
+                    items.append(elt)
+                elif elt.name == "retract":
+                    item_id = elt.getAttribute("id")
+                    if not item_id:
+                        log.warning(
+                            "Ignoring invalid retract item element: "
+                            f"{xml_tools.p_fmt_elt(elt)}"
+                        )
+                        continue
+
+                    retract_ids.append(elt["id"])
+                else:
+                    log.warning(
+                        f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
+                    )
+            if items:
+                log.debug(f"[{client.profile}] caching new items received from {node}")
+                await self.cache_items(
+                    client, node, items
+                )
+            if retract_ids:
+                log.debug(f"deleting retracted items from {node}")
+                await self.host.memory.storage.delete_pubsub_items(
+                    node, items_names=retract_ids
+                )
+
+    async def on_delete_event(self, client, event):
+        log.debug(
+            f"deleting node {event.nodeIdentifier} from {event.sender} for "
+            f"{client.profile}"
+        )
+        await self.host.memory.storage.delete_pubsub_node(
+            [client.profile], [event.sender], [event.nodeIdentifier]
+        )
+
+    async def on_purge_event(self, client, event):
+        node = await self.host.memory.storage.get_pubsub_node(
+            client, event.sender, event.nodeIdentifier
+        )
+        if node is None:
+            return
+        log.debug(f"purging node {node} for {client.profile}")
+        await self.host.memory.storage.delete_pubsub_items(node)
+
+    async def _get_items_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        max_items: Optional[int],
+        item_ids: Optional[List[str]],
+        sub_id: Optional[str],
+        rsm_request: Optional[rsm.RSMRequest],
+        extra: dict
+    ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
+        if not self.use_cache:
+            log.debug("cache disabled in settings")
+            return True, None
+        if extra.get(C.KEY_USE_CACHE) == False:
+            log.debug("skipping pubsub cache as requested")
+            return True, None
+        if service is None:
+            service = client.jid.userhostJID()
+        for __ in range(5):
+            pubsub_node = await self.host.memory.storage.get_pubsub_node(
+                client, service, node
+            )
+            if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
+                analyse = {"to_sync": True}
+            else:
+                analyse = await self.analyse_node(client, service, node)
+
+            if pubsub_node is None:
+                try:
+                    pubsub_node = await self.host.memory.storage.set_pubsub_node(
+                        client,
+                        service,
+                        node,
+                        analyser=analyse.get("name"),
+                        type_=analyse.get("type"),
+                        subtype=analyse.get("subtype"),
+                    )
+                except IntegrityError as e:
+                    if "unique" in str(e.orig).lower():
+                        log.debug(
+                            "race condition on pubsub node creation in cache, trying "
+                            "again"
+                        )
+                    else:
+                        raise e
+            break
+        else:
+            raise exceptions.InternalError(
+                "Too many IntegrityError with UNIQUE constraint, something is going wrong"
+            )
+
+        if analyse.get("to_sync"):
+            if pubsub_node.sync_state == SyncState.COMPLETED:
+                if "mam" in extra:
+                    log.debug("MAM caching is not supported yet, skipping cache")
+                    return True, None
+                pubsub_items, metadata = await self.get_items_from_cache(
+                    client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
+                )
+                return False, ([i.data for i in pubsub_items], metadata)
+
+            if pubsub_node.sync_state == SyncState.IN_PROGRESS:
+                if (service, node) not in self.in_progress:
+                    log.warning(
+                        f"{pubsub_node} is reported as being cached, but no caching is "
+                        "in progress, this is most probably due to the backend being "
+                        "restarted. Resetting the status, caching will be done again."
+                    )
+                    pubsub_node.sync_state = None
+                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
+                elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
+                    log.warning(
+                        f"{pubsub_node} is in progress for too long "
+                        f"({pubsub_node.sync_state_updated//60} minutes), "
+                        "cancelling it and retrying."
+                    )
+                    self.in_progress.pop((service, node)).cancel()
+                    pubsub_node.sync_state = None
+                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
+                else:
+                    log.debug(
+                        f"{pubsub_node} synchronisation is already in progress, skipping"
+                    )
+            if pubsub_node.sync_state is None:
+                key = (service, node)
+                if key in self.in_progress:
+                    raise exceptions.InternalError(
+                        f"There is already a caching in progress for {pubsub_node}, this "
+                        "should not happen"
+                    )
+                self.cache_node(client, pubsub_node)
+            elif pubsub_node.sync_state == SyncState.ERROR:
+                log.debug(
+                    f"{pubsub_node} synchronisation has previously failed, skipping"
+                )
+
+        return True, None
+
+    async def _subscribe_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        nodeIdentifier: str,
+        sub_jid: Optional[jid.JID],
+        options: Optional[dict],
+        subscription: pubsub.Subscription
+    ) -> None:
+        pass
+
+    async def _unsubscribe_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        nodeIdentifier: str,
+        sub_jid,
+        subscriptionIdentifier,
+        sender,
+    ) -> None:
+        pass
+
+    def _synchronise(self, service, node, profile_key):
+        client = self.host.get_client(profile_key)
+        service = client.jid.userhostJID() if not service else jid.JID(service)
+        return defer.ensureDeferred(self.synchronise(client, service, node))
+
+    async def synchronise(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        resync: bool = True
+    ) -> None:
+        """Synchronise a node with a pubsub service
+
+        The node will be synchronised even if there is no matching analyser.
+
+        Note that when a node is synchronised, it is automatically subscribed.
+        @param resync: if True and the node is already synchronised, it will be
+            resynchronised (all items will be deleted and re-downloaded).
+
+        """
+        pubsub_node = await self.host.memory.storage.get_pubsub_node(
+            client, service, node
+        )
+        if pubsub_node is None:
+            log.info(
+                _(
+                    "Synchronising the new node {node} at {service}"
+                ).format(node=node, service=service.full())
+            )
+            analyse = await self.analyse_node(client, service, node)
+            pubsub_node = await self.host.memory.storage.set_pubsub_node(
+                client,
+                service,
+                node,
+                analyser=analyse.get("name"),
+                type_=analyse.get("type"),
+            )
+        elif not resync and pubsub_node.sync_state is not None:
+            # the node exists, nothing to do
+            return
+
+        if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
+             or (service, node) in self.in_progress)):
+            log.warning(
+                _(
+                    "{node} at {service} is already being synchronised, can't do a new "
+                    "synchronisation."
+                ).format(node=node, service=service)
+            )
+        else:
+            log.info(
+                _(
+                    "(Re)Synchronising the node {node} at {service} on user request"
+                ).format(node=node, service=service.full())
+            )
+            # we first delete and recreate the node (will also delete its items)
+            await self.host.memory.storage.delete(pubsub_node)
+            analyse = await self.analyse_node(client, service, node)
+            pubsub_node = await self.host.memory.storage.set_pubsub_node(
+                client,
+                service,
+                node,
+                analyser=analyse.get("name"),
+                type_=analyse.get("type"),
+            )
+            # then we can put node in cache
+            await self.cache_node(client, pubsub_node)
+
+    async def purge(self, purge_filters: dict) -> None:
+        """Remove items according to filters
+
+        filters can have any of the following keys, all are optional:
+
+            :services:
+                list of JIDs of services from which items must be deleted
+            :nodes:
+                list of node names to delete
+            :types:
+                list of node types to delete
+            :subtypes:
+                list of node subtypes to delete
+            :profiles:
+                list of profiles from which items must be deleted
+            :created_before:
+                datetime before which items must have been created to be deleted
+            :updated_before:
+                datetime before which items must have been updated last to be deleted
+        """
+        purge_filters["names"] = purge_filters.pop("nodes", None)
+        await self.host.memory.storage.purge_pubsub_items(**purge_filters)
+
+    def _purge(self, purge_filters: str) -> defer.Deferred:
+        purge_filters = data_format.deserialise(purge_filters)
+        for key in "created_before", "updated_before":
+            try:
+                purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
+            except (KeyError, TypeError):
+                pass
+        return defer.ensureDeferred(self.purge(purge_filters))
+
+    async def reset(self) -> None:
+        """Remove ALL nodes and items from cache
+
+        After calling this method, cache will be refilled progressively as if it were new
+        """
+        await self.host.memory.storage.delete_pubsub_node(None, None, None)
+
+    def _reset(self) -> defer.Deferred:
+        return defer.ensureDeferred(self.reset())
+
+    async def search(self, query: dict) -> List[PubsubItem]:
+        """Search pubsub items in cache"""
+        return await self.host.memory.storage.search_pubsub_items(query)
+
+    async def serialisable_search(self, query: dict) -> List[dict]:
+        """Search pubsub items in cache and return parsed data
+
+        The returned data can be serialised.
+
+        "pubsub_service" and "pubsub_node" will be added to each data (both as strings)
+        """
+        items = await self.search(query)
+        ret = []
+        for item in items:
+            parsed = item.parsed
+            parsed["pubsub_service"] = item.node.service.full()
+            parsed["pubsub_node"] = item.node.name
+            if query.get("with_payload"):
+                parsed["item_payload"] = item.data.toXml()
+            parsed["node_profile"] = self.host.memory.storage.get_profile_by_id(
+                item.node.profile_id
+            )
+
+            ret.append(parsed)
+        return ret
+
+    def _search(self, query: str) -> defer.Deferred:
+        query = data_format.deserialise(query)
+        services = query.get("services")
+        if services:
+            query["services"] = [jid.JID(s) for s in services]
+        d = defer.ensureDeferred(self.serialisable_search(query))
+        d.addCallback(data_format.serialise)
+        return d
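
Usage note (not part of the changeset above): the sketch below shows how another plugin could hook into this cache through `register_analyser`, following the docstring in the diff. The plugin class name, the microblog namespace reused as both `node` prefix and item `namespace`, and the parser's output fields are assumptions made for the example.

```python
# Hypothetical example plugin; names and namespace are illustrative only.
class ExampleBlogCachePlugin:

    def __init__(self, host):
        self.host = host
        cache = host.plugins["PUBSUB_CACHE"]
        cache.register_analyser({
            "name": "example_blog",               # stable, unique analyser name
            "type": "blog",                       # human readable item type
            "node": "urn:xmpp:microblog:0",       # node name prefix to match
            "namespace": "urn:xmpp:microblog:0",  # namespace of item's first child
            "to_sync": True,                      # keep matching nodes in cache
            "parser": self.parse_item,            # sync, coroutine or Deferred
            "match_cb": self.on_match,            # may modify `analyse` in-place
        })

    async def parse_item(self, client, item_elt, service, node):
        # must return a dictionary which can be serialised to JSON
        return {"id": item_elt.getAttribute("id", ""), "xml": item_elt.toXml()}

    async def on_match(self, client, analyse):
        # e.g. add a subtype, which _get_items_trigger stores with the node
        analyse["subtype"] = "microblog"
```

With `to_sync` set, `_get_items_trigger` answers matching requests from the local cache once the node reaches the `COMPLETED` state, and falls back to the pubsub service otherwise.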