Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0060.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 2b000790b197 |
children | 23842a63ea00 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0060.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0060.py Wed Jun 19 18:44:57 2024 +0200 @@ -70,11 +70,12 @@ TIMEOUT = 30 # minimum features that a pubsub service must have to be selectable as default DEFAULT_PUBSUB_MIN_FEAT = { - 'http://jabber.org/protocol/pubsub#persistent-items', - 'http://jabber.org/protocol/pubsub#publish', - 'http://jabber.org/protocol/pubsub#retract-items', + "http://jabber.org/protocol/pubsub#persistent-items", + "http://jabber.org/protocol/pubsub#publish", + "http://jabber.org/protocol/pubsub#retract-items", } + class XEP_0060(object): OPT_ACCESS_MODEL = "pubsub#access_model" OPT_PERSIST_ITEMS = "pubsub#persist_items" @@ -109,7 +110,9 @@ self.host = host self._rsm = host.plugins.get("XEP-0059") self._mam = host.plugins.get("XEP-0313") - self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) + self._node_cb = ( + {} + ) # dictionnary of callbacks for node (key: node, value: list of callbacks) self.rt_sessions = sat_defer.RTDeferredSessions() host.bridge.add_method( "ps_node_create", @@ -376,9 +379,11 @@ return {} try: return { - "service": client.pubsub_service.full() - if client.pubsub_service is not None - else "" + "service": ( + client.pubsub_service.full() + if client.pubsub_service is not None + else "" + ) } except AttributeError: if self.host.is_connected(profile): @@ -423,12 +428,7 @@ return Extra(rsm_request, extra) - def add_managed_node( - self, - node: str, - priority: int = 0, - **kwargs: Callable - ): + def add_managed_node(self, node: str, priority: int = 0, **kwargs: Callable): """Add a handler for a node @param node: node to monitor @@ -514,31 +514,40 @@ # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) # return d - def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="", - profile_key=C.PROF_KEY_NONE): + def _send_item( + self, + service, + nodeIdentifier, + payload, + item_id=None, + extra_ser="", + profile_key=C.PROF_KEY_NONE, + ): client = self.host.get_client(profile_key) service = None if not service else jid.JID(service) payload = xml_tools.parse(payload) extra = data_format.deserialise(extra_ser) - d = defer.ensureDeferred(self.send_item( - client, service, nodeIdentifier, payload, item_id or None, extra - )) + d = defer.ensureDeferred( + self.send_item( + client, service, nodeIdentifier, payload, item_id or None, extra + ) + ) d.addCallback(lambda ret: ret or "") return d - def _send_items(self, service, nodeIdentifier, items, extra_ser=None, - profile_key=C.PROF_KEY_NONE): + def _send_items( + self, service, nodeIdentifier, items, extra_ser=None, profile_key=C.PROF_KEY_NONE + ): client = self.host.get_client(profile_key) service = None if not service else jid.JID(service) try: items = [xml_tools.parse(item) for item in items] except Exception as e: - raise exceptions.DataError(_("Can't parse items: {msg}").format( - msg=e)) + raise exceptions.DataError(_("Can't parse items: {msg}").format(msg=e)) extra = data_format.deserialise(extra_ser) - return defer.ensureDeferred(self.send_items( - client, service, nodeIdentifier, items, extra=extra - )) + return defer.ensureDeferred( + self.send_items(client, service, nodeIdentifier, items, extra=extra) + ) async def send_item( self, @@ -547,7 +556,7 @@ nodeIdentifier: str, payload: domish.Element, item_id: Optional[str] = None, - extra: Optional[Dict[str, Any]] = None + extra: Optional[Dict[str, Any]] = None, ) -> Optional[str]: """High level method to send one item @@ -559,16 +568,12 @@ @return: id of the created item """ assert isinstance(payload, domish.Element) - item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) + item_elt = domish.Element((pubsub.NS_PUBSUB, "item")) if item_id is not None: - item_elt['id'] = item_id + item_elt["id"] = item_id item_elt.addChild(payload) published_ids = await self.send_items( - client, - service, - nodeIdentifier, - [item_elt], - extra=extra + client, service, nodeIdentifier, [item_elt], extra=extra ) try: return published_ids[0] @@ -582,7 +587,7 @@ nodeIdentifier: str, items: List[domish.Element], sender: Optional[jid.JID] = None, - extra: Optional[Dict[str, Any]] = None + extra: Optional[Dict[str, Any]] = None, ) -> List[str]: """High level method to send several items at once @@ -608,36 +613,47 @@ service = client.jid.userhostJID() parsed_items = [] for item in items: - if item.name != 'item': + if item.name != "item": raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) item_id = item.getAttribute("id") parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS) try: iq_result = await self.publish( - client, service, nodeIdentifier, parsed_items, options=publish_options, - sender=sender + client, + service, + nodeIdentifier, + parsed_items, + options=publish_options, + sender=sender, ) except error.StanzaError as e: - if ((e.condition == 'conflict' and e.appCondition - and e.appCondition.name == 'precondition-not-met' - and publish_options is not None)): + if ( + e.condition == "conflict" + and e.appCondition + and e.appCondition.name == "precondition-not-met" + and publish_options is not None + ): # this usually happens when publish-options can't be set - policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise') - if policy == 'raise': + policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, "raise") + if policy == "raise": raise e - elif policy == 'publish_without_options': - log.warning(_( - "Can't use publish-options ({options}) on node {node}, " - "re-publishing without them: {reason}").format( - options=', '.join(f'{k} = {v}' - for k,v in publish_options.items()), + elif policy == "publish_without_options": + log.warning( + _( + "Can't use publish-options ({options}) on node {node}, " + "re-publishing without them: {reason}" + ).format( + options=", ".join( + f"{k} = {v}" for k, v in publish_options.items() + ), node=nodeIdentifier, reason=e, ) ) iq_result = await self.publish( - client, service, nodeIdentifier, parsed_items) + client, service, nodeIdentifier, parsed_items + ) else: raise exceptions.InternalError( f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: " @@ -647,8 +663,8 @@ raise e try: return [ - item['id'] - for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item') + item["id"] + for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, "item") ] except AttributeError: return [] @@ -661,7 +677,7 @@ items: Optional[List[domish.Element]] = None, options: Optional[dict] = None, sender: Optional[jid.JID] = None, - extra: Optional[Dict[str, Any]] = None + extra: Optional[Dict[str, Any]] = None, ) -> domish.Element: """Publish pubsub items @@ -679,13 +695,18 @@ if extra is None: extra = {} if not await self.host.trigger.async_point( - "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender, - extra + "XEP-0060_publish", + client, + service, + nodeIdentifier, + items, + options, + sender, + extra, ): return extra["iq_result_elt"] iq_result_elt = await client.pubsub_client.publish( - service, nodeIdentifier, items, sender, - options=options + service, nodeIdentifier, items, sender, options=options ) return iq_result_elt @@ -693,25 +714,35 @@ try: item_elt = reduce( lambda elt, ns_name: next(elt.elements(*ns_name)), - (message_elt, - (mam.NS_MAM, "result"), - (C.NS_FORWARD, "forwarded"), - (C.NS_CLIENT, "message"), - ("http://jabber.org/protocol/pubsub#event", "event"), - ("http://jabber.org/protocol/pubsub#event", "items"), - ("http://jabber.org/protocol/pubsub#event", "item"), - )) + ( + message_elt, + (mam.NS_MAM, "result"), + (C.NS_FORWARD, "forwarded"), + (C.NS_CLIENT, "message"), + ("http://jabber.org/protocol/pubsub#event", "event"), + ("http://jabber.org/protocol/pubsub#event", "items"), + ("http://jabber.org/protocol/pubsub#event", "item"), + ), + ) except StopIteration: raise exceptions.DataError("Can't find Item in MAM message element") return item_elt def serialise_items(self, items_data): items, metadata = items_data - metadata['items'] = items + metadata["items"] = items return data_format.serialise(metadata) - def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None, - extra="", profile_key=C.PROF_KEY_NONE): + def _get_items( + self, + service="", + node="", + max_items=10, + item_ids=None, + sub_id=None, + extra="", + profile_key=C.PROF_KEY_NONE, + ): """Get items from pubsub node @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit @@ -720,16 +751,18 @@ service = jid.JID(service) if service else None max_items = None if max_items == C.NO_LIMIT else max_items extra = self.parse_extra(data_format.deserialise(extra)) - d = defer.ensureDeferred(self.get_items( - client, - service, - node, - max_items, - item_ids, - sub_id or None, - extra.rsm_request, - extra.extra, - )) + d = defer.ensureDeferred( + self.get_items( + client, + service, + node, + max_items, + item_ids, + sub_id or None, + extra.rsm_request, + extra.extra, + ) + ) d.addCallback(self.trans_items_data) d.addCallback(self.serialise_items) return d @@ -743,7 +776,7 @@ item_ids: Optional[List[str]] = None, sub_id: Optional[str] = None, rsm_request: Optional[rsm.RSMRequest] = None, - extra: Optional[dict] = None + extra: Optional[dict] = None, ) -> Tuple[List[domish.Element], dict]: """Retrieve pubsub items from a node. @@ -770,25 +803,34 @@ if extra is None: extra = {} cont, ret = await self.host.trigger.async_return_point( - "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, - rsm_request, extra + "XEP-0060_getItems", + client, + service, + node, + max_items, + item_ids, + sub_id, + rsm_request, + extra, ) if not cont: return ret try: mam_query = extra["mam"] except KeyError: - d = defer.ensureDeferred(client.pubsub_client.items( - service = service, - nodeIdentifier = node, - maxItems = max_items, - subscriptionIdentifier = sub_id, - sender = None, - itemIdentifiers = item_ids, - orderBy = extra.get(C.KEY_ORDER_BY), - rsm_request = rsm_request, - extra = extra - )) + d = defer.ensureDeferred( + client.pubsub_client.items( + service=service, + nodeIdentifier=node, + maxItems=max_items, + subscriptionIdentifier=sub_id, + sender=None, + itemIdentifiers=item_ids, + orderBy=extra.get(C.KEY_ORDER_BY), + rsm_request=rsm_request, + extra=extra, + ) + ) # we have no MAM data here, so we add None d.addErrback(sat_defer.stanza_2_not_found) d.addTimeout(TIMEOUT, reactor) @@ -844,7 +886,7 @@ # we can put them directly in metadata metadata.update(mam_response) if rsm_request is not None and rsm_response is not None: - metadata['rsm'] = rsm_response.toDict() + metadata["rsm"] = rsm_response.toDict() if mam_response is None: index = rsm_response.index count = rsm_response.count @@ -887,15 +929,28 @@ # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile) # defer.returnValue(d_dict) - def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, - profile_key=C.PROF_KEY_NONE): + def getOptions( + self, + service, + nodeIdentifier, + subscriber, + subscriptionIdentifier=None, + profile_key=C.PROF_KEY_NONE, + ): client = self.host.get_client(profile_key) return client.pubsub_client.getOptions( service, nodeIdentifier, subscriber, subscriptionIdentifier ) - def setOptions(self, service, nodeIdentifier, subscriber, options, - subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): + def setOptions( + self, + service, + nodeIdentifier, + subscriber, + options, + subscriptionIdentifier=None, + profile_key=C.PROF_KEY_NONE, + ): client = self.host.get_client(profile_key) return client.pubsub_client.setOptions( service, nodeIdentifier, subscriber, options, subscriptionIdentifier @@ -912,7 +967,7 @@ client: SatXMPPClient, service: jid.JID, nodeIdentifier: Optional[str] = None, - options: Optional[Dict[str, str]] = None + options: Optional[Dict[str, str]] = None, ) -> str: """Create a new node @@ -1038,9 +1093,7 @@ d = self.get_node_affiliations( client, jid.JID(service_s) if service_s else None, nodeIdentifier ) - d.addCallback( - lambda affiliations: {j.full(): a for j, a in affiliations.items()} - ) + d.addCallback(lambda affiliations: {j.full(): a for j, a in affiliations.items()}) return d def get_node_affiliations(self, client, service, nodeIdentifier): @@ -1122,10 +1175,7 @@ ) def deleteNode( - self, - client: SatXMPPClient, - service: jid.JID, - nodeIdentifier: str + self, client: SatXMPPClient, service: jid.JID, nodeIdentifier: str ) -> defer.Deferred: return client.pubsub_client.deleteNode(service, nodeIdentifier) @@ -1188,9 +1238,9 @@ ): client = self.host.get_client(profile_key) service = jid.JID(service) if service else None - return defer.ensureDeferred(self.rename_item( - client, service, node, item_id, new_id - )) + return defer.ensureDeferred( + self.rename_item(client, service, node, item_id, new_id) + ) async def rename_item( self, @@ -1198,7 +1248,7 @@ service: Optional[jid.JID], node: str, item_id: str, - new_id: str + new_id: str, ) -> None: """Rename an item by recreating it then deleting it @@ -1218,10 +1268,7 @@ service = None if not service else jid.JID(service) d = defer.ensureDeferred( self.subscribe( - client, - service, - nodeIdentifier, - options=data_format.deserialise(options) + client, service, nodeIdentifier, options=data_format.deserialise(options) ) ) d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") @@ -1233,23 +1280,31 @@ service: Optional[jid.JID], nodeIdentifier: str, sub_jid: Optional[jid.JID] = None, - options: Optional[dict] = None + options: Optional[dict] = None, ) -> pubsub.Subscription: # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe if service is None: service = client.jid.userhostJID() cont, trigger_sub = await self.host.trigger.async_return_point( - "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, + "XEP-0060_subscribe", + client, + service, + nodeIdentifier, + sub_jid, + options, ) if not cont: return trigger_sub try: subscription = await client.pubsub_client.subscribe( - service, nodeIdentifier, sub_jid or client.jid.userhostJID(), - options=options, sender=client.jid.userhostJID() + service, + nodeIdentifier, + sub_jid or client.jid.userhostJID(), + options=options, + sender=client.jid.userhostJID(), ) except error.StanzaError as e: - if e.condition == 'item-not-found': + if e.condition == "item-not-found": raise exceptions.NotFound(e.text or e.condition) else: raise e @@ -1270,18 +1325,23 @@ sender: Optional[jid.JID] = None, ) -> None: if not await self.host.trigger.async_point( - "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, - subscriptionIdentifier, sender + "XEP-0060_unsubscribe", + client, + service, + nodeIdentifier, + sub_jid, + subscriptionIdentifier, + sender, ): return try: await client.pubsub_client.unsubscribe( - service, - nodeIdentifier, - sub_jid or client.jid.userhostJID(), - subscriptionIdentifier, - sender, - ) + service, + nodeIdentifier, + sub_jid or client.jid.userhostJID(), + subscriptionIdentifier, + sender, + ) except error.StanzaError as e: try: next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed")) @@ -1295,10 +1355,7 @@ @utils.ensure_deferred async def _subscriptions( - self, - service="", - nodeIdentifier="", - profile_key=C.PROF_KEY_NONE + self, service="", nodeIdentifier="", profile_key=C.PROF_KEY_NONE ) -> str: client = self.host.get_client(profile_key) service = None if not service else jid.JID(service) @@ -1309,7 +1366,7 @@ self, client: SatXMPPEntity, service: Optional[jid.JID] = None, - node: Optional[str] = None + node: Optional[str] = None, ) -> List[Dict[str, Union[str, bool]]]: """Retrieve subscriptions from a service @@ -1399,10 +1456,9 @@ log.warning(f"Error while parsing item: {failure_.value}") d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) - d.addCallback(lambda parsed_items: ( - [i for i in parsed_items if i is not None], - metadata - )) + d.addCallback( + lambda parsed_items: ([i for i in parsed_items if i is not None], metadata) + ) return d def ser_d_list(self, results, failure_result=None): @@ -1418,9 +1474,11 @@ if failure_result is None: failure_result = () return [ - ("", result) - if success - else (str(result.result) or UNSPECIFIED, failure_result) + ( + ("", result) + if success + else (str(result.result) or UNSPECIFIED, failure_result) + ) for success, result in results ] @@ -1428,10 +1486,7 @@ @utils.ensure_deferred async def _get_node_subscriptions( - self, - service: str, - node: str, - profile_key: str + self, service: str, node: str, profile_key: str ) -> Dict[str, str]: client = self.host.get_client(profile_key) subs = await self.get_node_subscriptions( @@ -1440,10 +1495,7 @@ return {j.full(): a for j, a in subs.items()} async def get_node_subscriptions( - self, - client: SatXMPPEntity, - service: Optional[jid.JID], - nodeIdentifier: str + self, client: SatXMPPEntity, service: Optional[jid.JID], nodeIdentifier: str ) -> Dict[jid.JID, str]: """Retrieve subscriptions to a node @@ -1471,15 +1523,11 @@ try: return { jid.JID(s["jid"]): s["subscription"] - for s in subscriptions_elt.elements( - (pubsub.NS_PUBSUB, "subscription") - ) + for s in subscriptions_elt.elements((pubsub.NS_PUBSUB, "subscription")) } except KeyError: raise ValueError( - _("Invalid result: bad <subscription> element: {}").format( - iq_elt.toXml - ) + _("Invalid result: bad <subscription> element: {}").format(iq_elt.toXml) ) def _set_node_subscriptions( @@ -1487,8 +1535,7 @@ ): client = self.host.get_client(profile_key) subscriptions = { - jid.JID(jid_): subscription - for jid_, subscription in subscriptions.items() + jid.JID(jid_): subscription for jid_, subscription in subscriptions.items() } d = self.set_node_subscriptions( client, @@ -1572,9 +1619,7 @@ deferreds = {} for service, node in node_data: deferreds[(service, node)] = defer.ensureDeferred( - client.pubsub_client.subscribe( - service, node, subscriber, options=options - ) + client.pubsub_client.subscribe(service, node, subscriber, options=options) ) return self.rt_sessions.new_session(deferreds, client.profile) # found_nodes = yield self.listNodes(service, profile=client.profile) @@ -1639,8 +1684,14 @@ profile_key, ) - def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None, - profile_key=C.PROF_KEY_NONE): + def get_from_many( + self, + node_data, + max_item=None, + rsm_request=None, + extra=None, + profile_key=C.PROF_KEY_NONE, + ): """Get items from many nodes at once @param node_data (iterable[tuple]): iterable of tuple (service, node) where: @@ -1654,9 +1705,11 @@ client = self.host.get_client(profile_key) deferreds = {} for service, node in node_data: - deferreds[(service, node)] = defer.ensureDeferred(self.get_items( - client, service, node, max_item, rsm_request=rsm_request, extra=extra - )) + deferreds[(service, node)] = defer.ensureDeferred( + self.get_items( + client, service, node, max_item, rsm_request=rsm_request, extra=extra + ) + ) return self.rt_sessions.new_session(deferreds, client.profile) @@ -1686,14 +1739,25 @@ if extra is None: extra = {} items, rsm_response = await super().items( - service, nodeIdentifier, maxItems, subscriptionIdentifier, sender, - itemIdentifiers, orderBy, rsm_request + service, + nodeIdentifier, + maxItems, + subscriptionIdentifier, + sender, + itemIdentifiers, + orderBy, + rsm_request, ) # items must be returned, thus this async point can't stop the workflow (but it # can modify returned items) await self.host.trigger.async_point( - "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response, - extra + "XEP-0060_items", + self.parent, + service, + nodeIdentifier, + items, + rsm_response, + extra, ) return items, rsm_response @@ -1725,9 +1789,7 @@ try: await utils.as_deferred(callback, client, event) except Exception as e: - log.error( - f"Error while running items event callback {callback}: {e}" - ) + log.error(f"Error while running items event callback {callback}: {e}") def itemsReceived(self, event): log.debug("Pubsub items received") @@ -1747,9 +1809,11 @@ log.debug(("Publish node deleted")) for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE): d = utils.as_deferred(callback, self.parent, event) - d.addErrback(lambda f: log.error( - f"Error while running delete event callback {callback}: {f}" - )) + d.addErrback( + lambda f: log.error( + f"Error while running delete event callback {callback}: {f}" + ) + ) client = self.parent if (event.sender, event.nodeIdentifier) in client.pubsub_watching: self.host.bridge.ps_event_raw( @@ -1760,9 +1824,11 @@ log.debug(("Publish node purged")) for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE): d = utils.as_deferred(callback, self.parent, event) - d.addErrback(lambda f: log.error( - f"Error while running purge event callback {callback}: {f}" - )) + d.addErrback( + lambda f: log.error( + f"Error while running purge event callback {callback}: {f}" + ) + ) client = self.parent if (event.sender, event.nodeIdentifier) in client.pubsub_watching: self.host.bridge.ps_event_raw( @@ -1808,7 +1874,7 @@ @param NodeIdentifier(unicode): PubSub node to use """ # TODO: propose this upstream and remove it once merged - request = pubsub.PubSubRequest('purge') + request = pubsub.PubSubRequest("purge") request.recipient = service request.nodeIdentifier = nodeIdentifier return request.send(self.xmlstream)