# HG changeset patch # User Goffi # Date 1631116728 -7200 # Node ID 342e3ddefd2306e879fa27bef8aceea177333572 # Parent 72b0e4053ab02908cfc3fa03b77f609c5fe5dff7 plugin pubsub cache: implement `psCacheSearch` rel 361 diff -r 72b0e4053ab0 -r 342e3ddefd23 sat/plugins/plugin_pubsub_cache.py --- a/sat/plugins/plugin_pubsub_cache.py Wed Sep 08 17:58:48 2021 +0200 +++ b/sat/plugins/plugin_pubsub_cache.py Wed Sep 08 17:58:48 2021 +0200 @@ -119,6 +119,14 @@ method=self._reset, async_=True, ) + host.bridge.addMethod( + "psCacheSearch", + ".plugin", + "s", + out_sign="s", + method=self._search, + async_=True, + ) def registerAnalyser(self, analyser: dict) -> None: """Register a new pubsub node analyser @@ -774,5 +782,40 @@ """ await self.host.memory.storage.deletePubsubNode(None, None, None) - def _reset(self) -> None: + def _reset(self) -> defer.Deferred: return defer.ensureDeferred(self.reset()) + + async def search(self, query: dict) -> List[PubsubItem]: + """Search pubsub items in cache""" + return await self.host.memory.storage.searchPubsubItems(query) + + async def serialisableSearch(self, query: dict) -> List[dict]: + """Search pubsub items in cache and returns parsed data + + The returned data can be serialised. + + "pubsub_service" and "pubsub_name" will be added to each data (both as strings) + """ + items = await self.search(query) + ret = [] + for item in items: + parsed = item.parsed + parsed["pubsub_service"] = item.node.service.full() + parsed["pubsub_node"] = item.node.name + if query.get("with_payload"): + parsed["item_payload"] = item.data.toXml() + parsed["node_profile"] = self.host.memory.storage.getProfileById( + item.node.profile_id + ) + + ret.append(parsed) + return ret + + def _search(self, query: str) -> defer.Deferred: + query = data_format.deserialise(query) + services = query.get("services") + if services: + query["services"] = [jid.JID(s) for s in services] + d = defer.ensureDeferred(self.serialisableSearch(query)) + d.addCallback(data_format.serialise) + return d