Mercurial > libervia-backend
diff sat/plugins/plugin_pubsub_cache.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
historically, Libervia (SàT before) was using camelCase as allowed by PEP8 when using a
pre-PEP8 code, to use the same coding style as in Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
has been decided to move on full snake_case. Because Libervia has a huge codebase, this
ended with a ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including bridge) that are not coming from Twisted or Wokkel, to use fully snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | 036188fff714 |
children |
line wrap: on
line diff
--- a/sat/plugins/plugin_pubsub_cache.py Fri Apr 07 15:18:39 2023 +0200 +++ b/sat/plugins/plugin_pubsub_cache.py Sat Apr 08 13:54:42 2023 +0200 @@ -65,7 +65,7 @@ def __init__(self, host): log.info(_("PubSub Cache initialization")) - strategy = host.memory.getConfig(None, "pubsub_cache_strategy") + strategy = host.memory.config_get(None, "pubsub_cache_strategy") if strategy == "no_cache": log.info( _( @@ -81,47 +81,47 @@ self.analysers = {} # map for caching in progress (node, service) => Deferred self.in_progress = {} - self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger) - self._p.addManagedNode( + self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger) + self._p.add_managed_node( "", - items_cb=self.onItemsEvent, - delete_cb=self.onDeleteEvent, - purge_db=self.onPurgeEvent, + items_cb=self.on_items_event, + delete_cb=self.on_delete_event, + purge_db=self.on_purge_event, ) - host.bridge.addMethod( - "psCacheGet", + host.bridge.add_method( + "ps_cache_get", ".plugin", in_sign="ssiassss", out_sign="s", - method=self._getItemsFromCache, + method=self._get_items_from_cache, async_=True, ) - host.bridge.addMethod( - "psCacheSync", + host.bridge.add_method( + "ps_cache_sync", ".plugin", "sss", out_sign="", method=self._synchronise, async_=True, ) - host.bridge.addMethod( - "psCachePurge", + host.bridge.add_method( + "ps_cache_purge", ".plugin", "s", out_sign="", method=self._purge, async_=True, ) - host.bridge.addMethod( - "psCacheReset", + host.bridge.add_method( + "ps_cache_reset", ".plugin", "", out_sign="", method=self._reset, async_=True, ) - host.bridge.addMethod( - "psCacheSearch", + host.bridge.add_method( + "ps_cache_search", ".plugin", "s", out_sign="s", @@ -129,7 +129,7 @@ async_=True, ) - def registerAnalyser(self, analyser: dict) -> None: + def register_analyser(self, analyser: dict) -> None: """Register a new pubsub node analyser @param analyser: An analyser is a dictionary which may have the following keys @@ -203,7 +203,7 @@ ) self.analysers[name] = analyser - async def cacheItems( + async def cache_items( self, client: SatXMPPEntity, pubsub_node: PubsubNode, @@ -216,7 +216,7 @@ if parser is not None: parsed_items = [ - await utils.asDeferred( + await utils.as_deferred( parser, client, item, @@ -228,16 +228,16 @@ else: parsed_items = None - await self.host.memory.storage.cachePubsubItems( + await self.host.memory.storage.cache_pubsub_items( client, pubsub_node, items, parsed_items ) - async def _cacheNode( + async def _cache_node( self, client: SatXMPPEntity, pubsub_node: PubsubNode ) -> None: - await self.host.memory.storage.updatePubsubNodeSyncState( + await self.host.memory.storage.update_pubsub_node_sync_state( pubsub_node, SyncState.IN_PROGRESS ) service, node = pubsub_node.service, pubsub_node.name @@ -274,7 +274,7 @@ ) try: - await self.host.checkFeatures( + await self.host.check_features( client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service ) except error.StanzaError as e: @@ -286,7 +286,7 @@ items, __ = await client.pubsub_client.items( pubsub_node.service, pubsub_node.name, maxItems=20 ) - await self.cacheItems( + await self.cache_items( client, pubsub_node, items ) else: @@ -299,7 +299,7 @@ items, __ = await client.pubsub_client.items( pubsub_node.service, pubsub_node.name, maxItems=20 ) - await self.cacheItems( + await self.cache_items( client, pubsub_node, items ) else: @@ -310,7 +310,7 @@ items, rsm_response = await client.pubsub_client.items( service, node, rsm_request=rsm_request ) - await self.cacheItems( + await self.cache_items( client, pubsub_node, items ) for item in items: @@ -334,11 +334,11 @@ ) rsm_request = None break - rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response) + rsm_request = rsm_p.get_next_request(rsm_request, rsm_response) if rsm_request is None: break - await self.host.memory.storage.updatePubsubNodeSyncState( + await self.host.memory.storage.update_pubsub_node_sync_state( pubsub_node, SyncState.COMPLETED ) except Exception as e: @@ -347,27 +347,27 @@ log.error( f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" ) - await self.host.memory.storage.updatePubsubNodeSyncState( + await self.host.memory.storage.update_pubsub_node_sync_state( pubsub_node, SyncState.ERROR ) - await self.host.memory.storage.deletePubsubItems(pubsub_node) + await self.host.memory.storage.delete_pubsub_items(pubsub_node) raise e - def _cacheNodeClean(self, __, pubsub_node): + def _cache_node_clean(self, __, pubsub_node): del self.in_progress[(pubsub_node.service, pubsub_node.name)] - def cacheNode( + def cache_node( self, client: SatXMPPEntity, pubsub_node: PubsubNode ) -> None: """Launch node caching as a background task""" - d = defer.ensureDeferred(self._cacheNode(client, pubsub_node)) - d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node) + d = defer.ensureDeferred(self._cache_node(client, pubsub_node)) + d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node) self.in_progress[(pubsub_node.service, pubsub_node.name)] = d return d - async def analyseNode( + async def analyse_node( self, client: SatXMPPEntity, service: jid.JID, @@ -451,26 +451,26 @@ except KeyError: pass else: - await utils.asDeferred(match_cb, client, analyse) + await utils.as_deferred(match_cb, client, analyse) return analyse - def _getItemsFromCache( + def _get_items_from_cache( self, service="", node="", max_items=10, item_ids=None, sub_id=None, extra="", profile_key=C.PROF_KEY_NONE ): - d = defer.ensureDeferred(self._aGetItemsFromCache( + d = defer.ensureDeferred(self._a_get_items_from_cache( service, node, max_items, item_ids, sub_id, extra, profile_key )) - d.addCallback(self._p.transItemsData) - d.addCallback(self._p.serialiseItems) + d.addCallback(self._p.trans_items_data) + d.addCallback(self._p.serialise_items) return d - async def _aGetItemsFromCache( + async def _a_get_items_from_cache( self, service, node, max_items, item_ids, sub_id, extra, profile_key ): - client = self.host.getClient(profile_key) + client = self.host.get_client(profile_key) service = jid.JID(service) if service else client.jid.userhostJID() - pubsub_node = await self.host.memory.storage.getPubsubNode( + pubsub_node = await self.host.memory.storage.get_pubsub_node( client, service, node ) if pubsub_node is None: @@ -478,8 +478,8 @@ f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" ) max_items = None if max_items == C.NO_LIMIT else max_items - extra = self._p.parseExtra(data_format.deserialise(extra)) - items, metadata = await self.getItemsFromCache( + extra = self._p.parse_extra(data_format.deserialise(extra)) + items, metadata = await self.get_items_from_cache( client, pubsub_node, max_items, @@ -490,7 +490,7 @@ ) return [i.data for i in items], metadata - async def getItemsFromCache( + async def get_items_from_cache( self, client: SatXMPPEntity, node: PubsubNode, @@ -507,7 +507,7 @@ raise NotImplementedError("MAM queries are not supported yet") if max_items is None and rsm_request is None: max_items = 20 - pubsub_items, metadata = await self.host.memory.storage.getItems( + pubsub_items, metadata = await self.host.memory.storage.get_items( node, max_items=max_items, item_ids=item_ids or None, order_by=extra.get(C.KEY_ORDER_BY) ) @@ -520,7 +520,7 @@ raise exceptions.InternalError( "Pubsub max items and item IDs must not be used at the same time" ) - pubsub_items, metadata = await self.host.memory.storage.getItems( + pubsub_items, metadata = await self.host.memory.storage.get_items( node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) ) else: @@ -530,7 +530,7 @@ desc = True else: before = rsm_request.before - pubsub_items, metadata = await self.host.memory.storage.getItems( + pubsub_items, metadata = await self.host.memory.storage.get_items( node, max_items=rsm_request.max, before=before, after=rsm_request.after, from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), desc=desc, force_rsm=True, @@ -538,8 +538,8 @@ return pubsub_items, metadata - async def onItemsEvent(self, client, event): - node = await self.host.memory.storage.getPubsubNode( + async def on_items_event(self, client, event): + node = await self.host.memory.storage.get_pubsub_node( client, event.sender, event.nodeIdentifier ) if node is None: @@ -555,45 +555,45 @@ if not item_id: log.warning( "Ignoring invalid retract item element: " - f"{xml_tools.pFmtElt(elt)}" + f"{xml_tools.p_fmt_elt(elt)}" ) continue retract_ids.append(elt["id"]) else: log.warning( - f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}" + f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}" ) if items: log.debug(f"[{client.profile}] caching new items received from {node}") - await self.cacheItems( + await self.cache_items( client, node, items ) if retract_ids: log.debug(f"deleting retracted items from {node}") - await self.host.memory.storage.deletePubsubItems( + await self.host.memory.storage.delete_pubsub_items( node, items_names=retract_ids ) - async def onDeleteEvent(self, client, event): + async def on_delete_event(self, client, event): log.debug( f"deleting node {event.nodeIdentifier} from {event.sender} for " f"{client.profile}" ) - await self.host.memory.storage.deletePubsubNode( + await self.host.memory.storage.delete_pubsub_node( [client.profile], [event.sender], [event.nodeIdentifier] ) - async def onPurgeEvent(self, client, event): - node = await self.host.memory.storage.getPubsubNode( + async def on_purge_event(self, client, event): + node = await self.host.memory.storage.get_pubsub_node( client, event.sender, event.nodeIdentifier ) if node is None: return log.debug(f"purging node {node} for {client.profile}") - await self.host.memory.storage.deletePubsubItems(node) + await self.host.memory.storage.delete_pubsub_items(node) - async def _getItemsTrigger( + async def _get_items_trigger( self, client: SatXMPPEntity, service: Optional[jid.JID], @@ -613,17 +613,17 @@ if service is None: service = client.jid.userhostJID() for __ in range(5): - pubsub_node = await self.host.memory.storage.getPubsubNode( + pubsub_node = await self.host.memory.storage.get_pubsub_node( client, service, node ) if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: analyse = {"to_sync": True} else: - analyse = await self.analyseNode(client, service, node) + analyse = await self.analyse_node(client, service, node) if pubsub_node is None: try: - pubsub_node = await self.host.memory.storage.setPubsubNode( + pubsub_node = await self.host.memory.storage.set_pubsub_node( client, service, node, @@ -650,7 +650,7 @@ if "mam" in extra: log.debug("MAM caching is not supported yet, skipping cache") return True, None - pubsub_items, metadata = await self.getItemsFromCache( + pubsub_items, metadata = await self.get_items_from_cache( client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra ) return False, ([i.data for i in pubsub_items], metadata) @@ -663,7 +663,7 @@ "restarted. Resetting the status, caching will be done again." ) pubsub_node.sync_state = None - await self.host.memory.storage.deletePubsubItems(pubsub_node) + await self.host.memory.storage.delete_pubsub_items(pubsub_node) elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: log.warning( f"{pubsub_node} is in progress for too long " @@ -672,7 +672,7 @@ ) self.in_progress.pop[(service, node)].cancel() pubsub_node.sync_state = None - await self.host.memory.storage.deletePubsubItems(pubsub_node) + await self.host.memory.storage.delete_pubsub_items(pubsub_node) else: log.debug( f"{pubsub_node} synchronisation is already in progress, skipping" @@ -684,7 +684,7 @@ f"There is already a caching in progress for {pubsub_node}, this " "should not happen" ) - self.cacheNode(client, pubsub_node) + self.cache_node(client, pubsub_node) elif pubsub_node.sync_state == SyncState.ERROR: log.debug( f"{pubsub_node} synchronisation has previously failed, skipping" @@ -692,7 +692,7 @@ return True, None - async def _subscribeTrigger( + async def _subscribe_trigger( self, client: SatXMPPEntity, service: jid.JID, @@ -703,7 +703,7 @@ ) -> None: pass - async def _unsubscribeTrigger( + async def _unsubscribe_trigger( self, client: SatXMPPEntity, service: jid.JID, @@ -715,7 +715,7 @@ pass def _synchronise(self, service, node, profile_key): - client = self.host.getClient(profile_key) + client = self.host.get_client(profile_key) service = client.jid.userhostJID() if not service else jid.JID(service) return defer.ensureDeferred(self.synchronise(client, service, node)) @@ -735,7 +735,7 @@ resynchronised (all items will be deleted and re-downloaded). """ - pubsub_node = await self.host.memory.storage.getPubsubNode( + pubsub_node = await self.host.memory.storage.get_pubsub_node( client, service, node ) if pubsub_node is None: @@ -744,8 +744,8 @@ "Synchronising the new node {node} at {service}" ).format(node=node, service=service.full) ) - analyse = await self.analyseNode(client, service, node) - pubsub_node = await self.host.memory.storage.setPubsubNode( + analyse = await self.analyse_node(client, service, node) + pubsub_node = await self.host.memory.storage.set_pubsub_node( client, service, node, @@ -772,8 +772,8 @@ ) # we first delete and recreate the node (will also delete its items) await self.host.memory.storage.delete(pubsub_node) - analyse = await self.analyseNode(client, service, node) - pubsub_node = await self.host.memory.storage.setPubsubNode( + analyse = await self.analyse_node(client, service, node) + pubsub_node = await self.host.memory.storage.set_pubsub_node( client, service, node, @@ -781,7 +781,7 @@ type_=analyse.get("type"), ) # then we can put node in cache - await self.cacheNode(client, pubsub_node) + await self.cache_node(client, pubsub_node) async def purge(self, purge_filters: dict) -> None: """Remove items according to filters @@ -804,7 +804,7 @@ datetime before which items must have been updated last to be deleted """ purge_filters["names"] = purge_filters.pop("nodes", None) - await self.host.memory.storage.purgePubsubItems(**purge_filters) + await self.host.memory.storage.purge_pubsub_items(**purge_filters) def _purge(self, purge_filters: str) -> None: purge_filters = data_format.deserialise(purge_filters) @@ -820,16 +820,16 @@ After calling this method, cache will be refilled progressively as if it where new """ - await self.host.memory.storage.deletePubsubNode(None, None, None) + await self.host.memory.storage.delete_pubsub_node(None, None, None) def _reset(self) -> defer.Deferred: return defer.ensureDeferred(self.reset()) async def search(self, query: dict) -> List[PubsubItem]: """Search pubsub items in cache""" - return await self.host.memory.storage.searchPubsubItems(query) + return await self.host.memory.storage.search_pubsub_items(query) - async def serialisableSearch(self, query: dict) -> List[dict]: + async def serialisable_search(self, query: dict) -> List[dict]: """Search pubsub items in cache and returns parsed data The returned data can be serialised. @@ -844,7 +844,7 @@ parsed["pubsub_node"] = item.node.name if query.get("with_payload"): parsed["item_payload"] = item.data.toXml() - parsed["node_profile"] = self.host.memory.storage.getProfileById( + parsed["node_profile"] = self.host.memory.storage.get_profile_by_id( item.node.profile_id ) @@ -856,6 +856,6 @@ services = query.get("services") if services: query["services"] = [jid.JID(s) for s in services] - d = defer.ensureDeferred(self.serialisableSearch(query)) + d = defer.ensureDeferred(self.serialisable_search(query)) d.addCallback(data_format.serialise) return d