diff sat/plugins/plugin_pubsub_cache.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (formerly SàT) used camelCase, as PEP 8 allows for code predating the style guide, in order to match Twisted's coding style. However, snake_case is more readable and it is better to follow PEP 8 best practices, so the decision was made to switch fully to snake_case. Because Libervia has a huge codebase, the transition had left an ugly mix of camelCase and snake_case. To fix that, this patch performs a big refactoring, renaming every function and method (including bridge methods) not coming from Twisted or Wokkel to snake_case. This is a massive change and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents 036188fff714
children
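The rename applies the usual camelCase to snake_case convention to every identifier defined by Libervia itself (bridge methods included), while names coming from Twisted or Wokkel are left untouched. As a rough illustration only, and not the actual tool used for this refactoring, such a conversion can be expressed with two regular expressions:

import re

def camel_to_snake(name: str) -> str:
    """Illustrative camelCase -> snake_case converter (hypothetical helper)."""
    # split an uppercase run from a following capitalised word: "psURLGet" -> "psURL_Get"
    name = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    # split a lowercase letter or digit from a following capital: "psCache" -> "ps_Cache"
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    return name.lower()

# the rule reproduces the renames visible in this diff
assert camel_to_snake("getItemsFromCache") == "get_items_from_cache"
assert camel_to_snake("onItemsEvent") == "on_items_event"
assert camel_to_snake("psCacheSearch") == "ps_cache_search"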
--- a/sat/plugins/plugin_pubsub_cache.py	Fri Apr 07 15:18:39 2023 +0200
+++ b/sat/plugins/plugin_pubsub_cache.py	Sat Apr 08 13:54:42 2023 +0200
@@ -65,7 +65,7 @@
 
     def __init__(self, host):
         log.info(_("PubSub Cache initialization"))
-        strategy = host.memory.getConfig(None, "pubsub_cache_strategy")
+        strategy = host.memory.config_get(None, "pubsub_cache_strategy")
         if strategy == "no_cache":
             log.info(
                 _(
@@ -81,47 +81,47 @@
         self.analysers = {}
         # map for caching in progress (node, service) => Deferred
         self.in_progress = {}
-        self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger)
-        self._p.addManagedNode(
+        self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger)
+        self._p.add_managed_node(
             "",
-            items_cb=self.onItemsEvent,
-            delete_cb=self.onDeleteEvent,
-            purge_db=self.onPurgeEvent,
+            items_cb=self.on_items_event,
+            delete_cb=self.on_delete_event,
+            purge_db=self.on_purge_event,
         )
-        host.bridge.addMethod(
-            "psCacheGet",
+        host.bridge.add_method(
+            "ps_cache_get",
             ".plugin",
             in_sign="ssiassss",
             out_sign="s",
-            method=self._getItemsFromCache,
+            method=self._get_items_from_cache,
             async_=True,
         )
-        host.bridge.addMethod(
-            "psCacheSync",
+        host.bridge.add_method(
+            "ps_cache_sync",
             ".plugin",
             "sss",
             out_sign="",
             method=self._synchronise,
             async_=True,
         )
-        host.bridge.addMethod(
-            "psCachePurge",
+        host.bridge.add_method(
+            "ps_cache_purge",
             ".plugin",
             "s",
             out_sign="",
             method=self._purge,
             async_=True,
         )
-        host.bridge.addMethod(
-            "psCacheReset",
+        host.bridge.add_method(
+            "ps_cache_reset",
             ".plugin",
             "",
             out_sign="",
             method=self._reset,
             async_=True,
         )
-        host.bridge.addMethod(
-            "psCacheSearch",
+        host.bridge.add_method(
+            "ps_cache_search",
             ".plugin",
             "s",
             out_sign="s",
@@ -129,7 +129,7 @@
             async_=True,
         )
 
-    def registerAnalyser(self, analyser: dict) -> None:
+    def register_analyser(self, analyser: dict) -> None:
         """Register a new pubsub node analyser
 
         @param analyser: An analyser is a dictionary which may have the following keys
@@ -203,7 +203,7 @@
             )
         self.analysers[name] = analyser
 
-    async def cacheItems(
+    async def cache_items(
         self,
         client: SatXMPPEntity,
         pubsub_node: PubsubNode,
@@ -216,7 +216,7 @@
 
         if parser is not None:
             parsed_items = [
-                await utils.asDeferred(
+                await utils.as_deferred(
                     parser,
                     client,
                     item,
@@ -228,16 +228,16 @@
         else:
             parsed_items = None
 
-        await self.host.memory.storage.cachePubsubItems(
+        await self.host.memory.storage.cache_pubsub_items(
             client, pubsub_node, items, parsed_items
         )
 
-    async def _cacheNode(
+    async def _cache_node(
         self,
         client: SatXMPPEntity,
         pubsub_node: PubsubNode
     ) -> None:
-        await self.host.memory.storage.updatePubsubNodeSyncState(
+        await self.host.memory.storage.update_pubsub_node_sync_state(
             pubsub_node, SyncState.IN_PROGRESS
         )
         service, node = pubsub_node.service, pubsub_node.name
@@ -274,7 +274,7 @@
                         )
 
             try:
-                await self.host.checkFeatures(
+                await self.host.check_features(
                     client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
                 )
             except error.StanzaError as e:
@@ -286,7 +286,7 @@
                     items, __ = await client.pubsub_client.items(
                         pubsub_node.service, pubsub_node.name, maxItems=20
                     )
-                    await self.cacheItems(
+                    await self.cache_items(
                         client, pubsub_node, items
                     )
                 else:
@@ -299,7 +299,7 @@
                 items, __ = await client.pubsub_client.items(
                     pubsub_node.service, pubsub_node.name, maxItems=20
                 )
-                await self.cacheItems(
+                await self.cache_items(
                     client, pubsub_node, items
                 )
             else:
@@ -310,7 +310,7 @@
                     items, rsm_response = await client.pubsub_client.items(
                         service, node, rsm_request=rsm_request
                     )
-                    await self.cacheItems(
+                    await self.cache_items(
                         client, pubsub_node, items
                     )
                     for item in items:
@@ -334,11 +334,11 @@
                             )
                             rsm_request = None
                             break
-                    rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response)
+                    rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
                     if rsm_request is None:
                         break
 
-            await self.host.memory.storage.updatePubsubNodeSyncState(
+            await self.host.memory.storage.update_pubsub_node_sync_state(
                 pubsub_node, SyncState.COMPLETED
             )
         except Exception as e:
@@ -347,27 +347,27 @@
             log.error(
                 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
             )
-            await self.host.memory.storage.updatePubsubNodeSyncState(
+            await self.host.memory.storage.update_pubsub_node_sync_state(
                 pubsub_node, SyncState.ERROR
             )
-            await self.host.memory.storage.deletePubsubItems(pubsub_node)
+            await self.host.memory.storage.delete_pubsub_items(pubsub_node)
             raise e
 
-    def _cacheNodeClean(self, __, pubsub_node):
+    def _cache_node_clean(self, __, pubsub_node):
         del self.in_progress[(pubsub_node.service, pubsub_node.name)]
 
-    def cacheNode(
+    def cache_node(
         self,
         client: SatXMPPEntity,
         pubsub_node: PubsubNode
     ) -> None:
         """Launch node caching as a background task"""
-        d = defer.ensureDeferred(self._cacheNode(client, pubsub_node))
-        d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node)
+        d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
+        d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
         self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
         return d
 
-    async def analyseNode(
+    async def analyse_node(
         self,
         client: SatXMPPEntity,
         service: jid.JID,
@@ -451,26 +451,26 @@
             except KeyError:
                 pass
             else:
-                await utils.asDeferred(match_cb, client, analyse)
+                await utils.as_deferred(match_cb, client, analyse)
         return analyse
 
-    def _getItemsFromCache(
+    def _get_items_from_cache(
         self, service="", node="", max_items=10, item_ids=None, sub_id=None,
         extra="", profile_key=C.PROF_KEY_NONE
     ):
-        d = defer.ensureDeferred(self._aGetItemsFromCache(
+        d = defer.ensureDeferred(self._a_get_items_from_cache(
             service, node, max_items, item_ids, sub_id, extra, profile_key
         ))
-        d.addCallback(self._p.transItemsData)
-        d.addCallback(self._p.serialiseItems)
+        d.addCallback(self._p.trans_items_data)
+        d.addCallback(self._p.serialise_items)
         return d
 
-    async def _aGetItemsFromCache(
+    async def _a_get_items_from_cache(
         self, service, node, max_items, item_ids, sub_id, extra, profile_key
     ):
-        client = self.host.getClient(profile_key)
+        client = self.host.get_client(profile_key)
         service = jid.JID(service) if service else client.jid.userhostJID()
-        pubsub_node = await self.host.memory.storage.getPubsubNode(
+        pubsub_node = await self.host.memory.storage.get_pubsub_node(
             client, service, node
         )
         if pubsub_node is None:
@@ -478,8 +478,8 @@
                 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
             )
         max_items = None if max_items == C.NO_LIMIT else max_items
-        extra = self._p.parseExtra(data_format.deserialise(extra))
-        items, metadata = await self.getItemsFromCache(
+        extra = self._p.parse_extra(data_format.deserialise(extra))
+        items, metadata = await self.get_items_from_cache(
             client,
             pubsub_node,
             max_items,
@@ -490,7 +490,7 @@
         )
         return [i.data for i in items], metadata
 
-    async def getItemsFromCache(
+    async def get_items_from_cache(
         self,
         client: SatXMPPEntity,
         node: PubsubNode,
@@ -507,7 +507,7 @@
             raise NotImplementedError("MAM queries are not supported yet")
         if max_items is None and rsm_request is None:
             max_items = 20
-            pubsub_items, metadata = await self.host.memory.storage.getItems(
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
                 node, max_items=max_items, item_ids=item_ids or None,
                 order_by=extra.get(C.KEY_ORDER_BY)
             )
@@ -520,7 +520,7 @@
                 raise exceptions.InternalError(
                     "Pubsub max items and item IDs must not be used at the same time"
                 )
-            pubsub_items, metadata = await self.host.memory.storage.getItems(
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
                 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
             )
         else:
@@ -530,7 +530,7 @@
                 desc = True
             else:
                 before = rsm_request.before
-            pubsub_items, metadata = await self.host.memory.storage.getItems(
+            pubsub_items, metadata = await self.host.memory.storage.get_items(
                 node, max_items=rsm_request.max, before=before, after=rsm_request.after,
                 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
                 desc=desc, force_rsm=True,
@@ -538,8 +538,8 @@
 
         return pubsub_items, metadata
 
-    async def onItemsEvent(self, client, event):
-        node = await self.host.memory.storage.getPubsubNode(
+    async def on_items_event(self, client, event):
+        node = await self.host.memory.storage.get_pubsub_node(
             client, event.sender, event.nodeIdentifier
         )
         if node is None:
@@ -555,45 +555,45 @@
                     if not item_id:
                         log.warning(
                             "Ignoring invalid retract item element: "
-                            f"{xml_tools.pFmtElt(elt)}"
+                            f"{xml_tools.p_fmt_elt(elt)}"
                         )
                         continue
 
                     retract_ids.append(elt["id"])
                 else:
                     log.warning(
-                        f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}"
+                        f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
                     )
             if items:
                 log.debug(f"[{client.profile}] caching new items received from {node}")
-                await self.cacheItems(
+                await self.cache_items(
                     client, node, items
                 )
             if retract_ids:
                 log.debug(f"deleting retracted items from {node}")
-                await self.host.memory.storage.deletePubsubItems(
+                await self.host.memory.storage.delete_pubsub_items(
                     node, items_names=retract_ids
                 )
 
-    async def onDeleteEvent(self, client, event):
+    async def on_delete_event(self, client, event):
         log.debug(
             f"deleting node {event.nodeIdentifier} from {event.sender} for "
             f"{client.profile}"
         )
-        await self.host.memory.storage.deletePubsubNode(
+        await self.host.memory.storage.delete_pubsub_node(
             [client.profile], [event.sender], [event.nodeIdentifier]
         )
 
-    async def onPurgeEvent(self, client, event):
-        node = await self.host.memory.storage.getPubsubNode(
+    async def on_purge_event(self, client, event):
+        node = await self.host.memory.storage.get_pubsub_node(
             client, event.sender, event.nodeIdentifier
         )
         if node is None:
             return
         log.debug(f"purging node {node} for {client.profile}")
-        await self.host.memory.storage.deletePubsubItems(node)
+        await self.host.memory.storage.delete_pubsub_items(node)
 
-    async def _getItemsTrigger(
+    async def _get_items_trigger(
         self,
         client: SatXMPPEntity,
         service: Optional[jid.JID],
@@ -613,17 +613,17 @@
         if service is None:
             service = client.jid.userhostJID()
         for __ in range(5):
-            pubsub_node = await self.host.memory.storage.getPubsubNode(
+            pubsub_node = await self.host.memory.storage.get_pubsub_node(
                 client, service, node
             )
             if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
                 analyse = {"to_sync": True}
             else:
-                analyse = await self.analyseNode(client, service, node)
+                analyse = await self.analyse_node(client, service, node)
 
             if pubsub_node is None:
                 try:
-                    pubsub_node = await self.host.memory.storage.setPubsubNode(
+                    pubsub_node = await self.host.memory.storage.set_pubsub_node(
                         client,
                         service,
                         node,
@@ -650,7 +650,7 @@
                 if "mam" in extra:
                     log.debug("MAM caching is not supported yet, skipping cache")
                     return True, None
-                pubsub_items, metadata = await self.getItemsFromCache(
+                pubsub_items, metadata = await self.get_items_from_cache(
                     client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
                 )
                 return False, ([i.data for i in pubsub_items], metadata)
@@ -663,7 +663,7 @@
                         "restarted. Resetting the status, caching will be done again."
                     )
                     pubsub_node.sync_state = None
-                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
+                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
                 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
                     log.warning(
                         f"{pubsub_node} is in progress for too long "
@@ -672,7 +672,7 @@
                     )
                     self.in_progress.pop((service, node)).cancel()
                     pubsub_node.sync_state = None
-                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
+                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
                 else:
                     log.debug(
                         f"{pubsub_node} synchronisation is already in progress, skipping"
@@ -684,7 +684,7 @@
                         f"There is already a caching in progress for {pubsub_node}, this "
                         "should not happen"
                     )
-                self.cacheNode(client, pubsub_node)
+                self.cache_node(client, pubsub_node)
             elif pubsub_node.sync_state == SyncState.ERROR:
                 log.debug(
                     f"{pubsub_node} synchronisation has previously failed, skipping"
@@ -692,7 +692,7 @@
 
         return True, None
 
-    async def _subscribeTrigger(
+    async def _subscribe_trigger(
         self,
         client: SatXMPPEntity,
         service: jid.JID,
@@ -703,7 +703,7 @@
     ) -> None:
         pass
 
-    async def _unsubscribeTrigger(
+    async def _unsubscribe_trigger(
         self,
         client: SatXMPPEntity,
         service: jid.JID,
@@ -715,7 +715,7 @@
         pass
 
     def _synchronise(self, service, node, profile_key):
-        client = self.host.getClient(profile_key)
+        client = self.host.get_client(profile_key)
         service = client.jid.userhostJID() if not service else jid.JID(service)
         return defer.ensureDeferred(self.synchronise(client, service, node))
 
@@ -735,7 +735,7 @@
             resynchronised (all items will be deleted and re-downloaded).
 
         """
-        pubsub_node = await self.host.memory.storage.getPubsubNode(
+        pubsub_node = await self.host.memory.storage.get_pubsub_node(
             client, service, node
         )
         if pubsub_node is None:
@@ -744,8 +744,8 @@
                     "Synchronising the new node {node} at {service}"
                 ).format(node=node, service=service.full)
             )
-            analyse = await self.analyseNode(client, service, node)
-            pubsub_node = await self.host.memory.storage.setPubsubNode(
+            analyse = await self.analyse_node(client, service, node)
+            pubsub_node = await self.host.memory.storage.set_pubsub_node(
                 client,
                 service,
                 node,
@@ -772,8 +772,8 @@
             )
             # we first delete and recreate the node (will also delete its items)
             await self.host.memory.storage.delete(pubsub_node)
-            analyse = await self.analyseNode(client, service, node)
-            pubsub_node = await self.host.memory.storage.setPubsubNode(
+            analyse = await self.analyse_node(client, service, node)
+            pubsub_node = await self.host.memory.storage.set_pubsub_node(
                 client,
                 service,
                 node,
@@ -781,7 +781,7 @@
                 type_=analyse.get("type"),
             )
             # then we can put node in cache
-            await self.cacheNode(client, pubsub_node)
+            await self.cache_node(client, pubsub_node)
 
     async def purge(self, purge_filters: dict) -> None:
         """Remove items according to filters
@@ -804,7 +804,7 @@
                 datetime before which items must have been updated last to be deleted
         """
         purge_filters["names"] = purge_filters.pop("nodes", None)
-        await self.host.memory.storage.purgePubsubItems(**purge_filters)
+        await self.host.memory.storage.purge_pubsub_items(**purge_filters)
 
     def _purge(self, purge_filters: str) -> None:
         purge_filters = data_format.deserialise(purge_filters)
@@ -820,16 +820,16 @@
 
         After calling this method, cache will be refilled progressively as if it were new
         """
-        await self.host.memory.storage.deletePubsubNode(None, None, None)
+        await self.host.memory.storage.delete_pubsub_node(None, None, None)
 
     def _reset(self) -> defer.Deferred:
         return defer.ensureDeferred(self.reset())
 
     async def search(self, query: dict) -> List[PubsubItem]:
         """Search pubsub items in cache"""
-        return await self.host.memory.storage.searchPubsubItems(query)
+        return await self.host.memory.storage.search_pubsub_items(query)
 
-    async def serialisableSearch(self, query: dict) -> List[dict]:
+    async def serialisable_search(self, query: dict) -> List[dict]:
         """Search pubsub items in cache and returns parsed data
 
         The returned data can be serialised.
@@ -844,7 +844,7 @@
             parsed["pubsub_node"] = item.node.name
             if query.get("with_payload"):
                 parsed["item_payload"] = item.data.toXml()
-            parsed["node_profile"] = self.host.memory.storage.getProfileById(
+            parsed["node_profile"] = self.host.memory.storage.get_profile_by_id(
                 item.node.profile_id
             )
 
@@ -856,6 +856,6 @@
         services = query.get("services")
         if services:
             query["services"] = [jid.JID(s) for s in services]
-        d = defer.ensureDeferred(self.serialisableSearch(query))
+        d = defer.ensureDeferred(self.serialisable_search(query))
         d.addCallback(data_format.serialise)
         return d
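
As context for the cache_node/_cache_node_clean hunk above: the plugin runs the cache fill as a background task, keeps the pending Deferred in self.in_progress so concurrent requests can detect (or cancel) it, and always removes the entry when the task finishes. A minimal standalone sketch of that pattern, with hypothetical names rather than Libervia's actual API, assuming only Twisted:

from twisted.internet import defer, reactor, task

# registry of pending cache fills, keyed like PubsubCache.in_progress
in_progress = {}

async def fill_cache(key):
    # stand-in for the real work (paginated item retrieval and storage)
    await task.deferLater(reactor, 0.1, lambda: None)

def _clean(result, key):
    # runs on success or failure, like _cache_node_clean(); returning the
    # result keeps errors visible to any later callback or errback
    in_progress.pop(key, None)
    return result

def cache_in_background(key):
    d = defer.ensureDeferred(fill_cache(key))  # wrap the coroutine in a Deferred
    d.addBoth(_clean, key)                     # always drop the registry entry
    in_progress[key] = d                       # lets other callers detect or cancel it
    return d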