diff sat/plugins/plugin_pubsub_cache.py @ 3666:342e3ddefd23

plugin pubsub cache: implement `psCacheSearch` rel 361
author Goffi <goffi@goffi.org>
date Wed, 08 Sep 2021 17:58:48 +0200
parents 32181a45d54b
children ffa8c8c78115
line wrap: on
line diff
--- a/sat/plugins/plugin_pubsub_cache.py	Wed Sep 08 17:58:48 2021 +0200
+++ b/sat/plugins/plugin_pubsub_cache.py	Wed Sep 08 17:58:48 2021 +0200
@@ -119,6 +119,14 @@
             method=self._reset,
             async_=True,
         )
+        host.bridge.addMethod(
+            "psCacheSearch",
+            ".plugin",
+            "s",
+            out_sign="s",
+            method=self._search,
+            async_=True,
+        )
 
     def registerAnalyser(self, analyser: dict) -> None:
         """Register a new pubsub node analyser
@@ -774,5 +782,40 @@
         """
         await self.host.memory.storage.deletePubsubNode(None, None, None)
 
-    def _reset(self) -> None:
+    def _reset(self) -> defer.Deferred:
         return defer.ensureDeferred(self.reset())
+
+    async def search(self, query: dict) -> List[PubsubItem]:
+        """Search pubsub items in cache"""
+        return await self.host.memory.storage.searchPubsubItems(query)
+
+    async def serialisableSearch(self, query: dict) -> List[dict]:
+        """Search pubsub items in cache and returns parsed data
+
+        The returned data can be serialised.
+
+        "pubsub_service" and "pubsub_name" will be added to each data (both as strings)
+        """
+        items = await self.search(query)
+        ret = []
+        for item in items:
+            parsed = item.parsed
+            parsed["pubsub_service"] = item.node.service.full()
+            parsed["pubsub_node"] = item.node.name
+            if query.get("with_payload"):
+                parsed["item_payload"] = item.data.toXml()
+            parsed["node_profile"] = self.host.memory.storage.getProfileById(
+                item.node.profile_id
+            )
+
+            ret.append(parsed)
+        return ret
+
+    def _search(self, query: str) -> defer.Deferred:
+        query = data_format.deserialise(query)
+        services = query.get("services")
+        if services:
+            query["services"] = [jid.JID(s) for s in services]
+        d = defer.ensureDeferred(self.serialisableSearch(query))
+        d.addCallback(data_format.serialise)
+        return d