diff libervia/backend/plugins/plugin_pubsub_cache.py @ 4270:0d7bb4df2343
Reformatted code base using black.
| author | Goffi <goffi@goffi.org> |
|---|---|
| date | Wed, 19 Jun 2024 18:44:57 +0200 |
| parents | ba46d6a0ff3a |
| children | |
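For readers unfamiliar with what this changeset does: Black rewrites layout only, collapsing signatures and calls that fit within its default 88-character line length onto a single line and exploding (with trailing commas) those that do not. The snippet below is a minimal sketch, not part of the commit, showing the same transformation through Black's Python API (`black.format_str` and `black.Mode`, available after `pip install black`); the sample source mirrors the `cache_node` signature changed in this diff, and the type names in it are just placeholders.

import black

# Hypothetical sample mirroring the pre-commit layout of cache_node();
# Black only parses the source, so the undefined type names are harmless.
source = (
    "def cache_node(\n"
    "    self,\n"
    "    client: SatXMPPEntity,\n"
    "    pubsub_node: PubsubNode\n"
    ") -> None:\n"
    "    pass\n"
)

# With the default Mode() (88-character lines), the signature collapses to
# a single line, matching the change shown in the diff below.
print(black.format_str(source, mode=black.Mode()))

Running the tool over the whole repository (e.g. `black .` from the project root) would produce this kind of diff; the exact Black version and options the author used are not recorded in this changeset.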
--- a/libervia/backend/plugins/plugin_pubsub_cache.py Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_pubsub_cache.py Wed Jun 19 18:44:57 2024 +0200
@@ -55,7 +55,6 @@
 PROGRESS_DEADLINE = 60 * 60 * 6
 
 
-
 class PubsubCache:
     # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
     # but it would be necessary to have this data if some devices unsubscribe a cached
@@ -196,7 +195,7 @@
             raise ValueError('"type" is mandatory in analyser')
         type_test_keys = {"node", "namespace"}
         if not type_test_keys.intersection(analyser):
-            raise ValueError(f'at least one of {type_test_keys} must be used')
+            raise ValueError(f"at least one of {type_test_keys} must be used")
         if name in self.analysers:
             raise exceptions.Conflict(
                 f"An analyser with the name {name!r} is already registered"
@@ -204,10 +203,7 @@
         self.analysers[name] = analyser
 
     async def cache_items(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode,
-        items: List[domish.Element]
+        self, client: SatXMPPEntity, pubsub_node: PubsubNode, items: List[domish.Element]
     ) -> None:
         try:
             parser = self.analysers[pubsub_node.analyser].get("parser")
@@ -217,11 +213,7 @@
         if parser is not None:
             parsed_items = [
                 await utils.as_deferred(
-                    parser,
-                    client,
-                    item,
-                    pubsub_node.service,
-                    pubsub_node.name
+                    parser, client, item, pubsub_node.service, pubsub_node.name
                 )
                 for item in items
             ]
@@ -232,19 +224,13 @@
             client, pubsub_node, items, parsed_items
         )
 
-    async def _cache_node(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode
-    ) -> None:
+    async def _cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
         await self.host.memory.storage.update_pubsub_node_sync_state(
             pubsub_node, SyncState.IN_PROGRESS
         )
         service, node = pubsub_node.service, pubsub_node.name
         try:
-            log.debug(
-                f"Caching node {node!r} at {service} for {client.profile}"
-            )
+            log.debug(f"Caching node {node!r} at {service} for {client.profile}")
             if not pubsub_node.subscribed:
                 try:
                     sub = await self._p.subscribe(client, service, node)
@@ -286,9 +272,7 @@
                     items, __ = await client.pubsub_client.items(
                         pubsub_node.service, pubsub_node.name, maxItems=20
                     )
-                    await self.cache_items(
-                        client, pubsub_node, items
-                    )
+                    await self.cache_items(client, pubsub_node, items)
                 else:
                     raise e
         except exceptions.FeatureNotFound:
@@ -299,9 +283,7 @@
                 items, __ = await client.pubsub_client.items(
                     pubsub_node.service, pubsub_node.name, maxItems=20
                 )
-                await self.cache_items(
-                    client, pubsub_node, items
-                )
+                await self.cache_items(client, pubsub_node, items)
             else:
                 rsm_p = self.host.plugins["XEP-0059"]
                 rsm_request = rsm.RSMRequest()
@@ -310,9 +292,7 @@
                     items, rsm_response = await client.pubsub_client.items(
                         service, node, rsm_request=rsm_request
                     )
-                    await self.cache_items(
-                        client, pubsub_node, items
-                    )
+                    await self.cache_items(client, pubsub_node, items)
                     for item in items:
                         item_id = item["id"]
                         if item_id in cached_ids:
@@ -343,6 +323,7 @@
             )
         except Exception as e:
             import traceback
+
             tb = traceback.format_tb(e.__traceback__)
             log.error(
                 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
@@ -356,11 +337,7 @@
     def _cache_node_clean(self, __, pubsub_node):
         del self.in_progress[(pubsub_node.service, pubsub_node.name)]
 
-    def cache_node(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode
-    ) -> None:
+    def cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
         """Launch node caching as a background task"""
         d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
         d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
@@ -372,15 +349,13 @@
         client: SatXMPPEntity,
         service: jid.JID,
         node: str,
-        pubsub_node : PubsubNode = None,
+        pubsub_node: PubsubNode = None,
     ) -> dict:
         """Use registered analysers on a node to determine what it is used for"""
         analyse = {"service": service, "node": node}
         if pubsub_node is None:
             try:
-                first_item = (await client.pubsub_client.items(
-                    service, node, 1
-                ))[0][0]
+                first_item = (await client.pubsub_client.items(service, node, 1))[0][0]
             except IndexError:
                 pass
             except error.StanzaError as e:
@@ -442,9 +417,7 @@
 
         else:
             found = False
-            log.debug(
-                f"node {node!r} at service {service} doesn't match any known type"
-            )
+            log.debug(f"node {node!r} at service {service} doesn't match any known type")
         if found:
             try:
                 match_cb = analyser["match_cb"]
@@ -455,12 +428,20 @@
         return analyse
 
     def _get_items_from_cache(
-        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
-        extra="", profile_key=C.PROF_KEY_NONE
+        self,
+        service="",
+        node="",
+        max_items=10,
+        item_ids=None,
+        sub_id=None,
+        extra="",
+        profile_key=C.PROF_KEY_NONE,
     ):
-        d = defer.ensureDeferred(self._a_get_items_from_cache(
-            service, node, max_items, item_ids, sub_id, extra, profile_key
-        ))
+        d = defer.ensureDeferred(
+            self._a_get_items_from_cache(
+                service, node, max_items, item_ids, sub_id, extra, profile_key
+            )
+        )
         d.addCallback(self._p.trans_items_data)
         d.addCallback(self._p.serialise_items)
         return d
@@ -498,7 +479,7 @@
         item_ids: Optional[List[str]] = None,
         sub_id: Optional[str] = None,
         rsm_request: Optional[rsm.RSMRequest] = None,
-        extra: Optional[Dict[str, Any]] = None
+        extra: Optional[Dict[str, Any]] = None,
     ) -> Tuple[List[PubsubItem], dict]:
         """Get items from cache, using same arguments as for external Pubsub request"""
         if extra is None:
@@ -508,8 +489,10 @@
         if max_items is None and rsm_request is None:
             max_items = 20
             pubsub_items, metadata = await self.host.memory.storage.get_items(
-                node, max_items=max_items, item_ids=item_ids or None,
-                order_by=extra.get(C.KEY_ORDER_BY)
+                node,
+                max_items=max_items,
+                item_ids=item_ids or None,
+                order_by=extra.get(C.KEY_ORDER_BY),
             )
         elif max_items is not None:
             if rsm_request is not None:
@@ -531,9 +514,14 @@
             else:
                 before = rsm_request.before
             pubsub_items, metadata = await self.host.memory.storage.get_items(
-                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
-                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
-                desc=desc, force_rsm=True,
+                node,
+                max_items=rsm_request.max,
+                before=before,
+                after=rsm_request.after,
+                from_index=rsm_request.index,
+                order_by=extra.get(C.KEY_ORDER_BY),
+                desc=desc,
+                force_rsm=True,
             )
 
         return pubsub_items, metadata
@@ -566,9 +554,7 @@
             )
             if items:
                 log.debug(f"[{client.profile}] caching new items received from {node}")
-                await self.cache_items(
-                    client, node, items
-                )
+                await self.cache_items(client, node, items)
             if retract_ids:
                 log.debug(f"deleting retracted items from {node}")
                 await self.host.memory.storage.delete_pubsub_items(
@@ -602,7 +588,7 @@
         item_ids: Optional[List[str]],
         sub_id: Optional[str],
         rsm_request: Optional[rsm.RSMRequest],
-        extra: dict
+        extra: dict,
     ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
         if not self.use_cache:
             log.debug("cache disabled in settings")
@@ -699,7 +685,7 @@
         nodeIdentifier: str,
         sub_jid: Optional[jid.JID],
         options: Optional[dict],
-        subscription: pubsub.Subscription
+        subscription: pubsub.Subscription,
     ) -> None:
         pass
 
@@ -720,11 +706,7 @@
         return defer.ensureDeferred(self.synchronise(client, service, node))
 
     async def synchronise(
-        self,
-        client: SatXMPPEntity,
-        service: jid.JID,
-        node: str,
-        resync: bool = True
+        self, client: SatXMPPEntity, service: jid.JID, node: str, resync: bool = True
     ) -> None:
         """Synchronise a node with a pubsub service
 
@@ -740,9 +722,9 @@
         )
         if pubsub_node is None:
             log.info(
-                _(
-                    "Synchronising the new node {node} at {service}"
-                ).format(node=node, service=service.full)
+                _("Synchronising the new node {node} at {service}").format(
+                    node=node, service=service.full
+                )
             )
             analyse = await self.analyse_node(client, service, node)
             pubsub_node = await self.host.memory.storage.set_pubsub_node(
@@ -753,11 +735,13 @@
                 type_=analyse.get("type"),
             )
         elif not resync and pubsub_node.sync_state is not None:
-              # the node exists, nothing to do
-              return
+            # the node exists, nothing to do
+            return
 
-        if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
-            or (service, node) in self.in_progress)):
+        if (
+            pubsub_node.sync_state == SyncState.IN_PROGRESS
+            or (service, node) in self.in_progress
+        ):
             log.warning(
                 _(
                     "{node} at {service} is already being synchronised, can't do a new "