comparison sat/plugins/plugin_pubsub_cache.py @ 3666:342e3ddefd23

plugin pubsub cache: implement `psCacheSearch` rel 361
author Goffi <goffi@goffi.org>
date Wed, 08 Sep 2021 17:58:48 +0200
parents 32181a45d54b
children ffa8c8c78115
comparison
equal deleted inserted replaced
3665:72b0e4053ab0 3666:342e3ddefd23
115 "psCacheReset", 115 "psCacheReset",
116 ".plugin", 116 ".plugin",
117 "", 117 "",
118 out_sign="", 118 out_sign="",
119 method=self._reset, 119 method=self._reset,
120 async_=True,
121 )
122 host.bridge.addMethod(
123 "psCacheSearch",
124 ".plugin",
125 "s",
126 out_sign="s",
127 method=self._search,
120 async_=True, 128 async_=True,
121 ) 129 )
122 130
123 def registerAnalyser(self, analyser: dict) -> None: 131 def registerAnalyser(self, analyser: dict) -> None:
124 """Register a new pubsub node analyser 132 """Register a new pubsub node analyser
772 780
773 After calling this method, cache will be refilled progressively as if it where new 781 After calling this method, cache will be refilled progressively as if it where new
774 """ 782 """
775 await self.host.memory.storage.deletePubsubNode(None, None, None) 783 await self.host.memory.storage.deletePubsubNode(None, None, None)
776 784
777 def _reset(self) -> None: 785 def _reset(self) -> defer.Deferred:
778 return defer.ensureDeferred(self.reset()) 786 return defer.ensureDeferred(self.reset())
787
788 async def search(self, query: dict) -> List[PubsubItem]:
789 """Search pubsub items in cache"""
790 return await self.host.memory.storage.searchPubsubItems(query)
791
792 async def serialisableSearch(self, query: dict) -> List[dict]:
793 """Search pubsub items in cache and returns parsed data
794
795 The returned data can be serialised.
796
797 "pubsub_service" and "pubsub_name" will be added to each data (both as strings)
798 """
799 items = await self.search(query)
800 ret = []
801 for item in items:
802 parsed = item.parsed
803 parsed["pubsub_service"] = item.node.service.full()
804 parsed["pubsub_node"] = item.node.name
805 if query.get("with_payload"):
806 parsed["item_payload"] = item.data.toXml()
807 parsed["node_profile"] = self.host.memory.storage.getProfileById(
808 item.node.profile_id
809 )
810
811 ret.append(parsed)
812 return ret
813
814 def _search(self, query: str) -> defer.Deferred:
815 query = data_format.deserialise(query)
816 services = query.get("services")
817 if services:
818 query["services"] = [jid.JID(s) for s in services]
819 d = defer.ensureDeferred(self.serialisableSearch(query))
820 d.addCallback(data_format.serialise)
821 return d