Mercurial > libervia-backend
comparison sat/plugins/plugin_pubsub_cache.py @ 3666:342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
rel 361
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 08 Sep 2021 17:58:48 +0200 |
parents | 32181a45d54b |
children | ffa8c8c78115 |
comparison
equal
deleted
inserted
replaced
3665:72b0e4053ab0 | 3666:342e3ddefd23 |
---|---|
115 "psCacheReset", | 115 "psCacheReset", |
116 ".plugin", | 116 ".plugin", |
117 "", | 117 "", |
118 out_sign="", | 118 out_sign="", |
119 method=self._reset, | 119 method=self._reset, |
120 async_=True, | |
121 ) | |
122 host.bridge.addMethod( | |
123 "psCacheSearch", | |
124 ".plugin", | |
125 "s", | |
126 out_sign="s", | |
127 method=self._search, | |
120 async_=True, | 128 async_=True, |
121 ) | 129 ) |
122 | 130 |
123 def registerAnalyser(self, analyser: dict) -> None: | 131 def registerAnalyser(self, analyser: dict) -> None: |
124 """Register a new pubsub node analyser | 132 """Register a new pubsub node analyser |
772 | 780 |
773 After calling this method, cache will be refilled progressively as if it were new | 781 After calling this method, cache will be refilled progressively as if it were new |
774 """ | 782 """ |
775 await self.host.memory.storage.deletePubsubNode(None, None, None) | 783 await self.host.memory.storage.deletePubsubNode(None, None, None) |
776 | 784 |
777 def _reset(self) -> None: | 785 def _reset(self) -> defer.Deferred: |
778 return defer.ensureDeferred(self.reset()) | 786 return defer.ensureDeferred(self.reset()) |
787 | |
788 async def search(self, query: dict) -> List[PubsubItem]: | |
789 """Search pubsub items in cache""" | |
790 return await self.host.memory.storage.searchPubsubItems(query) | |
791 | |
792 async def serialisableSearch(self, query: dict) -> List[dict]: | |
793 """Search pubsub items in cache and returns parsed data | |
794 | |
795 The returned data can be serialised. | |
796 | |
797 "pubsub_service" and "pubsub_node" will be added to each data (both as strings) | |
798 """ | |
799 items = await self.search(query) | |
800 ret = [] | |
801 for item in items: | |
802 parsed = item.parsed | |
803 parsed["pubsub_service"] = item.node.service.full() | |
804 parsed["pubsub_node"] = item.node.name | |
805 if query.get("with_payload"): | |
806 parsed["item_payload"] = item.data.toXml() | |
807 parsed["node_profile"] = self.host.memory.storage.getProfileById( | |
808 item.node.profile_id | |
809 ) | |
810 | |
811 ret.append(parsed) | |
812 return ret | |
813 | |
814 def _search(self, query: str) -> defer.Deferred: | |
815 query = data_format.deserialise(query) | |
816 services = query.get("services") | |
817 if services: | |
818 query["services"] = [jid.JID(s) for s in services] | |
819 d = defer.ensureDeferred(self.serialisableSearch(query)) | |
820 d.addCallback(data_format.serialise) | |
821 return d |