diff libervia/backend/plugins/plugin_xep_0060.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 2b000790b197
children 23842a63ea00
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0060.py	Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0060.py	Wed Jun 19 18:44:57 2024 +0200
@@ -70,11 +70,12 @@
 TIMEOUT = 30
 # minimum features that a pubsub service must have to be selectable as default
 DEFAULT_PUBSUB_MIN_FEAT = {
- 'http://jabber.org/protocol/pubsub#persistent-items',
- 'http://jabber.org/protocol/pubsub#publish',
- 'http://jabber.org/protocol/pubsub#retract-items',
+    "http://jabber.org/protocol/pubsub#persistent-items",
+    "http://jabber.org/protocol/pubsub#publish",
+    "http://jabber.org/protocol/pubsub#retract-items",
 }
 
+
 class XEP_0060(object):
     OPT_ACCESS_MODEL = "pubsub#access_model"
     OPT_PERSIST_ITEMS = "pubsub#persist_items"
@@ -109,7 +110,9 @@
         self.host = host
         self._rsm = host.plugins.get("XEP-0059")
         self._mam = host.plugins.get("XEP-0313")
-        self._node_cb = {}  # dictionnary of callbacks for node (key: node, value: list of callbacks)
+        self._node_cb = (
+            {}
+        )  # dictionnary of callbacks for node (key: node, value: list of callbacks)
         self.rt_sessions = sat_defer.RTDeferredSessions()
         host.bridge.add_method(
             "ps_node_create",
@@ -376,9 +379,11 @@
             return {}
         try:
             return {
-                "service": client.pubsub_service.full()
-                if client.pubsub_service is not None
-                else ""
+                "service": (
+                    client.pubsub_service.full()
+                    if client.pubsub_service is not None
+                    else ""
+                )
             }
         except AttributeError:
             if self.host.is_connected(profile):
@@ -423,12 +428,7 @@
 
         return Extra(rsm_request, extra)
 
-    def add_managed_node(
-        self,
-        node: str,
-        priority: int = 0,
-        **kwargs: Callable
-    ):
+    def add_managed_node(self, node: str, priority: int = 0, **kwargs: Callable):
         """Add a handler for a node
 
         @param node: node to monitor
@@ -514,31 +514,40 @@
     #     d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
     #     return d
 
-    def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="",
-                  profile_key=C.PROF_KEY_NONE):
+    def _send_item(
+        self,
+        service,
+        nodeIdentifier,
+        payload,
+        item_id=None,
+        extra_ser="",
+        profile_key=C.PROF_KEY_NONE,
+    ):
         client = self.host.get_client(profile_key)
         service = None if not service else jid.JID(service)
         payload = xml_tools.parse(payload)
         extra = data_format.deserialise(extra_ser)
-        d = defer.ensureDeferred(self.send_item(
-            client, service, nodeIdentifier, payload, item_id or None, extra
-        ))
+        d = defer.ensureDeferred(
+            self.send_item(
+                client, service, nodeIdentifier, payload, item_id or None, extra
+            )
+        )
         d.addCallback(lambda ret: ret or "")
         return d
 
-    def _send_items(self, service, nodeIdentifier, items, extra_ser=None,
-                  profile_key=C.PROF_KEY_NONE):
+    def _send_items(
+        self, service, nodeIdentifier, items, extra_ser=None, profile_key=C.PROF_KEY_NONE
+    ):
         client = self.host.get_client(profile_key)
         service = None if not service else jid.JID(service)
         try:
             items = [xml_tools.parse(item) for item in items]
         except Exception as e:
-            raise exceptions.DataError(_("Can't parse items: {msg}").format(
-                msg=e))
+            raise exceptions.DataError(_("Can't parse items: {msg}").format(msg=e))
         extra = data_format.deserialise(extra_ser)
-        return defer.ensureDeferred(self.send_items(
-            client, service, nodeIdentifier, items, extra=extra
-        ))
+        return defer.ensureDeferred(
+            self.send_items(client, service, nodeIdentifier, items, extra=extra)
+        )
 
     async def send_item(
         self,
@@ -547,7 +556,7 @@
         nodeIdentifier: str,
         payload: domish.Element,
         item_id: Optional[str] = None,
-        extra: Optional[Dict[str, Any]] = None
+        extra: Optional[Dict[str, Any]] = None,
     ) -> Optional[str]:
         """High level method to send one item
 
@@ -559,16 +568,12 @@
         @return: id of the created item
         """
         assert isinstance(payload, domish.Element)
-        item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
+        item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
         if item_id is not None:
-            item_elt['id'] = item_id
+            item_elt["id"] = item_id
         item_elt.addChild(payload)
         published_ids = await self.send_items(
-            client,
-            service,
-            nodeIdentifier,
-            [item_elt],
-            extra=extra
+            client, service, nodeIdentifier, [item_elt], extra=extra
         )
         try:
             return published_ids[0]
@@ -582,7 +587,7 @@
         nodeIdentifier: str,
         items: List[domish.Element],
         sender: Optional[jid.JID] = None,
-        extra: Optional[Dict[str, Any]] = None
+        extra: Optional[Dict[str, Any]] = None,
     ) -> List[str]:
         """High level method to send several items at once
 
@@ -608,36 +613,47 @@
             service = client.jid.userhostJID()
         parsed_items = []
         for item in items:
-            if item.name != 'item':
+            if item.name != "item":
                 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml()))
             item_id = item.getAttribute("id")
             parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
         publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS)
         try:
             iq_result = await self.publish(
-                client, service, nodeIdentifier, parsed_items, options=publish_options,
-                sender=sender
+                client,
+                service,
+                nodeIdentifier,
+                parsed_items,
+                options=publish_options,
+                sender=sender,
             )
         except error.StanzaError as e:
-            if ((e.condition == 'conflict' and e.appCondition
-                 and e.appCondition.name == 'precondition-not-met'
-                 and publish_options is not None)):
+            if (
+                e.condition == "conflict"
+                and e.appCondition
+                and e.appCondition.name == "precondition-not-met"
+                and publish_options is not None
+            ):
                 # this usually happens when publish-options can't be set
-                policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise')
-                if policy == 'raise':
+                policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, "raise")
+                if policy == "raise":
                     raise e
-                elif policy == 'publish_without_options':
-                    log.warning(_(
-                        "Can't use publish-options ({options}) on node {node}, "
-                        "re-publishing without them: {reason}").format(
-                            options=', '.join(f'{k} = {v}'
-                                    for k,v in publish_options.items()),
+                elif policy == "publish_without_options":
+                    log.warning(
+                        _(
+                            "Can't use publish-options ({options}) on node {node}, "
+                            "re-publishing without them: {reason}"
+                        ).format(
+                            options=", ".join(
+                                f"{k} = {v}" for k, v in publish_options.items()
+                            ),
                             node=nodeIdentifier,
                             reason=e,
                         )
                     )
                     iq_result = await self.publish(
-                        client, service, nodeIdentifier, parsed_items)
+                        client, service, nodeIdentifier, parsed_items
+                    )
                 else:
                     raise exceptions.InternalError(
                         f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: "
@@ -647,8 +663,8 @@
                 raise e
         try:
             return [
-                item['id']
-                for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')
+                item["id"]
+                for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, "item")
             ]
         except AttributeError:
             return []
@@ -661,7 +677,7 @@
         items: Optional[List[domish.Element]] = None,
         options: Optional[dict] = None,
         sender: Optional[jid.JID] = None,
-        extra: Optional[Dict[str, Any]] = None
+        extra: Optional[Dict[str, Any]] = None,
     ) -> domish.Element:
         """Publish pubsub items
 
@@ -679,13 +695,18 @@
         if extra is None:
             extra = {}
         if not await self.host.trigger.async_point(
-            "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender,
-            extra
+            "XEP-0060_publish",
+            client,
+            service,
+            nodeIdentifier,
+            items,
+            options,
+            sender,
+            extra,
         ):
             return extra["iq_result_elt"]
         iq_result_elt = await client.pubsub_client.publish(
-            service, nodeIdentifier, items, sender,
-            options=options
+            service, nodeIdentifier, items, sender, options=options
         )
         return iq_result_elt
 
@@ -693,25 +714,35 @@
         try:
             item_elt = reduce(
                 lambda elt, ns_name: next(elt.elements(*ns_name)),
-                (message_elt,
-                 (mam.NS_MAM, "result"),
-                 (C.NS_FORWARD, "forwarded"),
-                 (C.NS_CLIENT, "message"),
-                 ("http://jabber.org/protocol/pubsub#event", "event"),
-                 ("http://jabber.org/protocol/pubsub#event", "items"),
-                 ("http://jabber.org/protocol/pubsub#event", "item"),
-                ))
+                (
+                    message_elt,
+                    (mam.NS_MAM, "result"),
+                    (C.NS_FORWARD, "forwarded"),
+                    (C.NS_CLIENT, "message"),
+                    ("http://jabber.org/protocol/pubsub#event", "event"),
+                    ("http://jabber.org/protocol/pubsub#event", "items"),
+                    ("http://jabber.org/protocol/pubsub#event", "item"),
+                ),
+            )
         except StopIteration:
             raise exceptions.DataError("Can't find Item in MAM message element")
         return item_elt
 
     def serialise_items(self, items_data):
         items, metadata = items_data
-        metadata['items'] = items
+        metadata["items"] = items
         return data_format.serialise(metadata)
 
-    def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None,
-                  extra="", profile_key=C.PROF_KEY_NONE):
+    def _get_items(
+        self,
+        service="",
+        node="",
+        max_items=10,
+        item_ids=None,
+        sub_id=None,
+        extra="",
+        profile_key=C.PROF_KEY_NONE,
+    ):
         """Get items from pubsub node
 
         @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
@@ -720,16 +751,18 @@
         service = jid.JID(service) if service else None
         max_items = None if max_items == C.NO_LIMIT else max_items
         extra = self.parse_extra(data_format.deserialise(extra))
-        d = defer.ensureDeferred(self.get_items(
-            client,
-            service,
-            node,
-            max_items,
-            item_ids,
-            sub_id or None,
-            extra.rsm_request,
-            extra.extra,
-        ))
+        d = defer.ensureDeferred(
+            self.get_items(
+                client,
+                service,
+                node,
+                max_items,
+                item_ids,
+                sub_id or None,
+                extra.rsm_request,
+                extra.extra,
+            )
+        )
         d.addCallback(self.trans_items_data)
         d.addCallback(self.serialise_items)
         return d
@@ -743,7 +776,7 @@
         item_ids: Optional[List[str]] = None,
         sub_id: Optional[str] = None,
         rsm_request: Optional[rsm.RSMRequest] = None,
-        extra: Optional[dict] = None
+        extra: Optional[dict] = None,
     ) -> Tuple[List[domish.Element], dict]:
         """Retrieve pubsub items from a node.
 
@@ -770,25 +803,34 @@
         if extra is None:
             extra = {}
         cont, ret = await self.host.trigger.async_return_point(
-            "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id,
-            rsm_request, extra
+            "XEP-0060_getItems",
+            client,
+            service,
+            node,
+            max_items,
+            item_ids,
+            sub_id,
+            rsm_request,
+            extra,
         )
         if not cont:
             return ret
         try:
             mam_query = extra["mam"]
         except KeyError:
-            d = defer.ensureDeferred(client.pubsub_client.items(
-                service = service,
-                nodeIdentifier = node,
-                maxItems = max_items,
-                subscriptionIdentifier = sub_id,
-                sender = None,
-                itemIdentifiers = item_ids,
-                orderBy = extra.get(C.KEY_ORDER_BY),
-                rsm_request = rsm_request,
-                extra = extra
-            ))
+            d = defer.ensureDeferred(
+                client.pubsub_client.items(
+                    service=service,
+                    nodeIdentifier=node,
+                    maxItems=max_items,
+                    subscriptionIdentifier=sub_id,
+                    sender=None,
+                    itemIdentifiers=item_ids,
+                    orderBy=extra.get(C.KEY_ORDER_BY),
+                    rsm_request=rsm_request,
+                    extra=extra,
+                )
+            )
             # we have no MAM data here, so we add None
             d.addErrback(sat_defer.stanza_2_not_found)
             d.addTimeout(TIMEOUT, reactor)
@@ -844,7 +886,7 @@
             # we can put them directly in metadata
             metadata.update(mam_response)
         if rsm_request is not None and rsm_response is not None:
-            metadata['rsm'] = rsm_response.toDict()
+            metadata["rsm"] = rsm_response.toDict()
             if mam_response is None:
                 index = rsm_response.index
                 count = rsm_response.count
@@ -887,15 +929,28 @@
     #         d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile)
     #     defer.returnValue(d_dict)
 
-    def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None,
-                   profile_key=C.PROF_KEY_NONE):
+    def getOptions(
+        self,
+        service,
+        nodeIdentifier,
+        subscriber,
+        subscriptionIdentifier=None,
+        profile_key=C.PROF_KEY_NONE,
+    ):
         client = self.host.get_client(profile_key)
         return client.pubsub_client.getOptions(
             service, nodeIdentifier, subscriber, subscriptionIdentifier
         )
 
-    def setOptions(self, service, nodeIdentifier, subscriber, options,
-                   subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
+    def setOptions(
+        self,
+        service,
+        nodeIdentifier,
+        subscriber,
+        options,
+        subscriptionIdentifier=None,
+        profile_key=C.PROF_KEY_NONE,
+    ):
         client = self.host.get_client(profile_key)
         return client.pubsub_client.setOptions(
             service, nodeIdentifier, subscriber, options, subscriptionIdentifier
@@ -912,7 +967,7 @@
         client: SatXMPPClient,
         service: jid.JID,
         nodeIdentifier: Optional[str] = None,
-        options: Optional[Dict[str, str]] = None
+        options: Optional[Dict[str, str]] = None,
     ) -> str:
         """Create a new node
 
@@ -1038,9 +1093,7 @@
         d = self.get_node_affiliations(
             client, jid.JID(service_s) if service_s else None, nodeIdentifier
         )
-        d.addCallback(
-            lambda affiliations: {j.full(): a for j, a in affiliations.items()}
-        )
+        d.addCallback(lambda affiliations: {j.full(): a for j, a in affiliations.items()})
         return d
 
     def get_node_affiliations(self, client, service, nodeIdentifier):
@@ -1122,10 +1175,7 @@
         )
 
     def deleteNode(
-        self,
-        client: SatXMPPClient,
-        service: jid.JID,
-        nodeIdentifier: str
+        self, client: SatXMPPClient, service: jid.JID, nodeIdentifier: str
     ) -> defer.Deferred:
         return client.pubsub_client.deleteNode(service, nodeIdentifier)
 
@@ -1188,9 +1238,9 @@
     ):
         client = self.host.get_client(profile_key)
         service = jid.JID(service) if service else None
-        return defer.ensureDeferred(self.rename_item(
-            client, service, node, item_id, new_id
-        ))
+        return defer.ensureDeferred(
+            self.rename_item(client, service, node, item_id, new_id)
+        )
 
     async def rename_item(
         self,
@@ -1198,7 +1248,7 @@
         service: Optional[jid.JID],
         node: str,
         item_id: str,
-        new_id: str
+        new_id: str,
     ) -> None:
         """Rename an item by recreating it then deleting it
 
@@ -1218,10 +1268,7 @@
         service = None if not service else jid.JID(service)
         d = defer.ensureDeferred(
             self.subscribe(
-                client,
-                service,
-                nodeIdentifier,
-                options=data_format.deserialise(options)
+                client, service, nodeIdentifier, options=data_format.deserialise(options)
             )
         )
         d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
@@ -1233,23 +1280,31 @@
         service: Optional[jid.JID],
         nodeIdentifier: str,
         sub_jid: Optional[jid.JID] = None,
-        options: Optional[dict] = None
+        options: Optional[dict] = None,
     ) -> pubsub.Subscription:
         # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
         if service is None:
             service = client.jid.userhostJID()
         cont, trigger_sub = await self.host.trigger.async_return_point(
-            "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options,
+            "XEP-0060_subscribe",
+            client,
+            service,
+            nodeIdentifier,
+            sub_jid,
+            options,
         )
         if not cont:
             return trigger_sub
         try:
             subscription = await client.pubsub_client.subscribe(
-                service, nodeIdentifier, sub_jid or client.jid.userhostJID(),
-                options=options, sender=client.jid.userhostJID()
+                service,
+                nodeIdentifier,
+                sub_jid or client.jid.userhostJID(),
+                options=options,
+                sender=client.jid.userhostJID(),
             )
         except error.StanzaError as e:
-            if e.condition == 'item-not-found':
+            if e.condition == "item-not-found":
                 raise exceptions.NotFound(e.text or e.condition)
             else:
                 raise e
@@ -1270,18 +1325,23 @@
         sender: Optional[jid.JID] = None,
     ) -> None:
         if not await self.host.trigger.async_point(
-            "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid,
-            subscriptionIdentifier, sender
+            "XEP-0060_unsubscribe",
+            client,
+            service,
+            nodeIdentifier,
+            sub_jid,
+            subscriptionIdentifier,
+            sender,
         ):
             return
         try:
             await client.pubsub_client.unsubscribe(
-            service,
-            nodeIdentifier,
-            sub_jid or client.jid.userhostJID(),
-            subscriptionIdentifier,
-            sender,
-        )
+                service,
+                nodeIdentifier,
+                sub_jid or client.jid.userhostJID(),
+                subscriptionIdentifier,
+                sender,
+            )
         except error.StanzaError as e:
             try:
                 next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed"))
@@ -1295,10 +1355,7 @@
 
     @utils.ensure_deferred
     async def _subscriptions(
-        self,
-        service="",
-        nodeIdentifier="",
-        profile_key=C.PROF_KEY_NONE
+        self, service="", nodeIdentifier="", profile_key=C.PROF_KEY_NONE
     ) -> str:
         client = self.host.get_client(profile_key)
         service = None if not service else jid.JID(service)
@@ -1309,7 +1366,7 @@
         self,
         client: SatXMPPEntity,
         service: Optional[jid.JID] = None,
-        node: Optional[str] = None
+        node: Optional[str] = None,
     ) -> List[Dict[str, Union[str, bool]]]:
         """Retrieve subscriptions from a service
 
@@ -1399,10 +1456,9 @@
             log.warning(f"Error while parsing item: {failure_.value}")
 
         d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items])
-        d.addCallback(lambda parsed_items: (
-            [i for i in parsed_items if i is not None],
-            metadata
-        ))
+        d.addCallback(
+            lambda parsed_items: ([i for i in parsed_items if i is not None], metadata)
+        )
         return d
 
     def ser_d_list(self, results, failure_result=None):
@@ -1418,9 +1474,11 @@
         if failure_result is None:
             failure_result = ()
         return [
-            ("", result)
-            if success
-            else (str(result.result) or UNSPECIFIED, failure_result)
+            (
+                ("", result)
+                if success
+                else (str(result.result) or UNSPECIFIED, failure_result)
+            )
             for success, result in results
         ]
 
@@ -1428,10 +1486,7 @@
 
     @utils.ensure_deferred
     async def _get_node_subscriptions(
-        self,
-        service: str,
-        node: str,
-        profile_key: str
+        self, service: str, node: str, profile_key: str
     ) -> Dict[str, str]:
         client = self.host.get_client(profile_key)
         subs = await self.get_node_subscriptions(
@@ -1440,10 +1495,7 @@
         return {j.full(): a for j, a in subs.items()}
 
     async def get_node_subscriptions(
-        self,
-        client: SatXMPPEntity,
-        service: Optional[jid.JID],
-        nodeIdentifier: str
+        self, client: SatXMPPEntity, service: Optional[jid.JID], nodeIdentifier: str
     ) -> Dict[jid.JID, str]:
         """Retrieve subscriptions to a node
 
@@ -1471,15 +1523,11 @@
         try:
             return {
                 jid.JID(s["jid"]): s["subscription"]
-                for s in subscriptions_elt.elements(
-                    (pubsub.NS_PUBSUB, "subscription")
-                )
+                for s in subscriptions_elt.elements((pubsub.NS_PUBSUB, "subscription"))
             }
         except KeyError:
             raise ValueError(
-                _("Invalid result: bad <subscription> element: {}").format(
-                    iq_elt.toXml
-                )
+                _("Invalid result: bad <subscription> element: {}").format(iq_elt.toXml)
             )
 
     def _set_node_subscriptions(
@@ -1487,8 +1535,7 @@
     ):
         client = self.host.get_client(profile_key)
         subscriptions = {
-            jid.JID(jid_): subscription
-            for jid_, subscription in subscriptions.items()
+            jid.JID(jid_): subscription for jid_, subscription in subscriptions.items()
         }
         d = self.set_node_subscriptions(
             client,
@@ -1572,9 +1619,7 @@
         deferreds = {}
         for service, node in node_data:
             deferreds[(service, node)] = defer.ensureDeferred(
-                client.pubsub_client.subscribe(
-                    service, node, subscriber, options=options
-                )
+                client.pubsub_client.subscribe(service, node, subscriber, options=options)
             )
         return self.rt_sessions.new_session(deferreds, client.profile)
         # found_nodes = yield self.listNodes(service, profile=client.profile)
@@ -1639,8 +1684,14 @@
             profile_key,
         )
 
-    def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None,
-                    profile_key=C.PROF_KEY_NONE):
+    def get_from_many(
+        self,
+        node_data,
+        max_item=None,
+        rsm_request=None,
+        extra=None,
+        profile_key=C.PROF_KEY_NONE,
+    ):
         """Get items from many nodes at once
 
         @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
@@ -1654,9 +1705,11 @@
         client = self.host.get_client(profile_key)
         deferreds = {}
         for service, node in node_data:
-            deferreds[(service, node)] = defer.ensureDeferred(self.get_items(
-                client, service, node, max_item, rsm_request=rsm_request, extra=extra
-            ))
+            deferreds[(service, node)] = defer.ensureDeferred(
+                self.get_items(
+                    client, service, node, max_item, rsm_request=rsm_request, extra=extra
+                )
+            )
         return self.rt_sessions.new_session(deferreds, client.profile)
 
 
@@ -1686,14 +1739,25 @@
         if extra is None:
             extra = {}
         items, rsm_response = await super().items(
-            service, nodeIdentifier, maxItems, subscriptionIdentifier, sender,
-            itemIdentifiers, orderBy, rsm_request
+            service,
+            nodeIdentifier,
+            maxItems,
+            subscriptionIdentifier,
+            sender,
+            itemIdentifiers,
+            orderBy,
+            rsm_request,
         )
         # items must be returned, thus this async point can't stop the workflow (but it
         # can modify returned items)
         await self.host.trigger.async_point(
-            "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response,
-            extra
+            "XEP-0060_items",
+            self.parent,
+            service,
+            nodeIdentifier,
+            items,
+            rsm_response,
+            extra,
         )
         return items, rsm_response
 
@@ -1725,9 +1789,7 @@
             try:
                 await utils.as_deferred(callback, client, event)
             except Exception as e:
-                log.error(
-                    f"Error while running items event callback {callback}: {e}"
-                )
+                log.error(f"Error while running items event callback {callback}: {e}")
 
     def itemsReceived(self, event):
         log.debug("Pubsub items received")
@@ -1747,9 +1809,11 @@
         log.debug(("Publish node deleted"))
         for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE):
             d = utils.as_deferred(callback, self.parent, event)
-            d.addErrback(lambda f: log.error(
-                f"Error while running delete event callback {callback}: {f}"
-            ))
+            d.addErrback(
+                lambda f: log.error(
+                    f"Error while running delete event callback {callback}: {f}"
+                )
+            )
         client = self.parent
         if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
             self.host.bridge.ps_event_raw(
@@ -1760,9 +1824,11 @@
         log.debug(("Publish node purged"))
         for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE):
             d = utils.as_deferred(callback, self.parent, event)
-            d.addErrback(lambda f: log.error(
-                f"Error while running purge event callback {callback}: {f}"
-            ))
+            d.addErrback(
+                lambda f: log.error(
+                    f"Error while running purge event callback {callback}: {f}"
+                )
+            )
         client = self.parent
         if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
             self.host.bridge.ps_event_raw(
@@ -1808,7 +1874,7 @@
         @param NodeIdentifier(unicode): PubSub node to use
         """
         # TODO: propose this upstream and remove it once merged
-        request = pubsub.PubSubRequest('purge')
+        request = pubsub.PubSubRequest("purge")
         request.recipient = service
         request.nodeIdentifier = nodeIdentifier
         return request.send(self.xmlstream)