diff libervia/backend/plugins/plugin_pubsub_cache.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents ba46d6a0ff3a
children
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_pubsub_cache.py	Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_pubsub_cache.py	Wed Jun 19 18:44:57 2024 +0200
@@ -55,7 +55,6 @@
 PROGRESS_DEADLINE = 60 * 60 * 6
 
 
-
 class PubsubCache:
     # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
     #   but it would be necessary to have this data if some devices unsubscribe a cached
@@ -196,7 +195,7 @@
             raise ValueError('"type" is mandatory in analyser')
         type_test_keys = {"node", "namespace"}
         if not type_test_keys.intersection(analyser):
-            raise ValueError(f'at least one of {type_test_keys} must be used')
+            raise ValueError(f"at least one of {type_test_keys} must be used")
         if name in self.analysers:
             raise exceptions.Conflict(
                 f"An analyser with the name {name!r} is already registered"
@@ -204,10 +203,7 @@
         self.analysers[name] = analyser
 
     async def cache_items(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode,
-        items: List[domish.Element]
+        self, client: SatXMPPEntity, pubsub_node: PubsubNode, items: List[domish.Element]
     ) -> None:
         try:
             parser = self.analysers[pubsub_node.analyser].get("parser")
@@ -217,11 +213,7 @@
         if parser is not None:
             parsed_items = [
                 await utils.as_deferred(
-                    parser,
-                    client,
-                    item,
-                    pubsub_node.service,
-                    pubsub_node.name
+                    parser, client, item, pubsub_node.service, pubsub_node.name
                 )
                 for item in items
             ]
@@ -232,19 +224,13 @@
             client, pubsub_node, items, parsed_items
         )
 
-    async def _cache_node(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode
-    ) -> None:
+    async def _cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
         await self.host.memory.storage.update_pubsub_node_sync_state(
             pubsub_node, SyncState.IN_PROGRESS
         )
         service, node = pubsub_node.service, pubsub_node.name
         try:
-            log.debug(
-                f"Caching node {node!r} at {service} for {client.profile}"
-            )
+            log.debug(f"Caching node {node!r} at {service} for {client.profile}")
             if not pubsub_node.subscribed:
                 try:
                     sub = await self._p.subscribe(client, service, node)
@@ -286,9 +272,7 @@
                     items, __ = await client.pubsub_client.items(
                         pubsub_node.service, pubsub_node.name, maxItems=20
                     )
-                    await self.cache_items(
-                        client, pubsub_node, items
-                    )
+                    await self.cache_items(client, pubsub_node, items)
                 else:
                     raise e
             except exceptions.FeatureNotFound:
@@ -299,9 +283,7 @@
                 items, __ = await client.pubsub_client.items(
                     pubsub_node.service, pubsub_node.name, maxItems=20
                 )
-                await self.cache_items(
-                    client, pubsub_node, items
-                )
+                await self.cache_items(client, pubsub_node, items)
             else:
                 rsm_p = self.host.plugins["XEP-0059"]
                 rsm_request = rsm.RSMRequest()
@@ -310,9 +292,7 @@
                     items, rsm_response = await client.pubsub_client.items(
                         service, node, rsm_request=rsm_request
                     )
-                    await self.cache_items(
-                        client, pubsub_node, items
-                    )
+                    await self.cache_items(client, pubsub_node, items)
                     for item in items:
                         item_id = item["id"]
                         if item_id in cached_ids:
@@ -343,6 +323,7 @@
             )
         except Exception as e:
             import traceback
+
             tb = traceback.format_tb(e.__traceback__)
             log.error(
                 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
@@ -356,11 +337,7 @@
     def _cache_node_clean(self, __, pubsub_node):
         del self.in_progress[(pubsub_node.service, pubsub_node.name)]
 
-    def cache_node(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode
-    ) -> None:
+    def cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
         """Launch node caching as a background task"""
         d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
         d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
@@ -372,15 +349,13 @@
         client: SatXMPPEntity,
         service: jid.JID,
         node: str,
-        pubsub_node : PubsubNode = None,
+        pubsub_node: PubsubNode = None,
     ) -> dict:
         """Use registered analysers on a node to determine what it is used for"""
         analyse = {"service": service, "node": node}
         if pubsub_node is None:
             try:
-                first_item = (await client.pubsub_client.items(
-                    service, node, 1
-                ))[0][0]
+                first_item = (await client.pubsub_client.items(service, node, 1))[0][0]
             except IndexError:
                 pass
             except error.StanzaError as e:
@@ -442,9 +417,7 @@
 
         else:
             found = False
-            log.debug(
-                f"node {node!r} at service {service} doesn't match any known type"
-            )
+            log.debug(f"node {node!r} at service {service} doesn't match any known type")
         if found:
             try:
                 match_cb = analyser["match_cb"]
@@ -455,12 +428,20 @@
         return analyse
 
     def _get_items_from_cache(
-        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
-        extra="", profile_key=C.PROF_KEY_NONE
+        self,
+        service="",
+        node="",
+        max_items=10,
+        item_ids=None,
+        sub_id=None,
+        extra="",
+        profile_key=C.PROF_KEY_NONE,
     ):
-        d = defer.ensureDeferred(self._a_get_items_from_cache(
-            service, node, max_items, item_ids, sub_id, extra, profile_key
-        ))
+        d = defer.ensureDeferred(
+            self._a_get_items_from_cache(
+                service, node, max_items, item_ids, sub_id, extra, profile_key
+            )
+        )
         d.addCallback(self._p.trans_items_data)
         d.addCallback(self._p.serialise_items)
         return d
@@ -498,7 +479,7 @@
         item_ids: Optional[List[str]] = None,
         sub_id: Optional[str] = None,
         rsm_request: Optional[rsm.RSMRequest] = None,
-        extra: Optional[Dict[str, Any]] = None
+        extra: Optional[Dict[str, Any]] = None,
     ) -> Tuple[List[PubsubItem], dict]:
         """Get items from cache, using same arguments as for external Pubsub request"""
         if extra is None:
@@ -508,8 +489,10 @@
         if max_items is None and rsm_request is None:
             max_items = 20
             pubsub_items, metadata = await self.host.memory.storage.get_items(
-                node, max_items=max_items, item_ids=item_ids or None,
-                order_by=extra.get(C.KEY_ORDER_BY)
+                node,
+                max_items=max_items,
+                item_ids=item_ids or None,
+                order_by=extra.get(C.KEY_ORDER_BY),
             )
         elif max_items is not None:
             if rsm_request is not None:
@@ -531,9 +514,14 @@
             else:
                 before = rsm_request.before
             pubsub_items, metadata = await self.host.memory.storage.get_items(
-                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
-                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
-                desc=desc, force_rsm=True,
+                node,
+                max_items=rsm_request.max,
+                before=before,
+                after=rsm_request.after,
+                from_index=rsm_request.index,
+                order_by=extra.get(C.KEY_ORDER_BY),
+                desc=desc,
+                force_rsm=True,
             )
 
         return pubsub_items, metadata
@@ -566,9 +554,7 @@
                     )
             if items:
                 log.debug(f"[{client.profile}] caching new items received from {node}")
-                await self.cache_items(
-                    client, node, items
-                )
+                await self.cache_items(client, node, items)
             if retract_ids:
                 log.debug(f"deleting retracted items from {node}")
                 await self.host.memory.storage.delete_pubsub_items(
@@ -602,7 +588,7 @@
         item_ids: Optional[List[str]],
         sub_id: Optional[str],
         rsm_request: Optional[rsm.RSMRequest],
-        extra: dict
+        extra: dict,
     ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
         if not self.use_cache:
             log.debug("cache disabled in settings")
@@ -699,7 +685,7 @@
         nodeIdentifier: str,
         sub_jid: Optional[jid.JID],
         options: Optional[dict],
-        subscription: pubsub.Subscription
+        subscription: pubsub.Subscription,
     ) -> None:
         pass
 
@@ -720,11 +706,7 @@
         return defer.ensureDeferred(self.synchronise(client, service, node))
 
     async def synchronise(
-        self,
-        client: SatXMPPEntity,
-        service: jid.JID,
-        node: str,
-        resync: bool = True
+        self, client: SatXMPPEntity, service: jid.JID, node: str, resync: bool = True
     ) -> None:
         """Synchronise a node with a pubsub service
 
@@ -740,9 +722,9 @@
         )
         if pubsub_node is None:
             log.info(
-                _(
-                    "Synchronising the new node {node} at {service}"
-                ).format(node=node, service=service.full)
+                _("Synchronising the new node {node} at {service}").format(
+                    node=node, service=service.full
+                )
             )
             analyse = await self.analyse_node(client, service, node)
             pubsub_node = await self.host.memory.storage.set_pubsub_node(
@@ -753,11 +735,13 @@
                 type_=analyse.get("type"),
             )
         elif not resync and pubsub_node.sync_state is not None:
-                # the node exists, nothing to do
-                return
+            # the node exists, nothing to do
+            return
 
-        if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
-             or (service, node) in self.in_progress)):
+        if (
+            pubsub_node.sync_state == SyncState.IN_PROGRESS
+            or (service, node) in self.in_progress
+        ):
             log.warning(
                 _(
                     "{node} at {service} is already being synchronised, can't do a new "