# HG changeset patch # User Goffi # Date 1583950548 -3600 # Node ID c2f958dde5d20a5d77af5ea3c1cc8469f69df85c # Parent 89d97776fd34fa70b482cc86fc9927cca6f9750e plugin XEP-0060: async sendItems + precondition-not-met policy: - sendItems is now an "async" coroutine, and sendItem use it instead of duplicating publication logic - policy to use when a precondition is not met (with publish-options) can now be specified, for now it's either raise the exception (default policy), or try to publish without the options - constants have been added to handle "extra" keys diff -r 89d97776fd34 -r c2f958dde5d2 sat/plugins/plugin_xep_0060.py --- a/sat/plugins/plugin_xep_0060.py Fri Mar 06 18:51:04 2020 +0100 +++ b/sat/plugins/plugin_xep_0060.py Wed Mar 11 19:15:48 2020 +0100 @@ -1,7 +1,6 @@ #!/usr/bin/env python3 - -# SAT plugin for Publish-Subscribe (xep-0060) +# SàT plugin for Publish-Subscribe (xep-0060) # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify @@ -21,29 +20,24 @@ from collections import namedtuple import urllib.request, urllib.parse, urllib.error from functools import reduce - -from sat.core.i18n import _ -from sat.core.constants import Const as C -from sat.core.log import getLogger - -from sat.core import exceptions -from sat.tools import sat_defer -from sat.tools.common import data_format - from zope.interface import implementer - +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid, error +from twisted.internet import reactor, defer from wokkel import disco from wokkel import data_form from wokkel import generic - # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version # mam and rsm come from sat_tmp.wokkel too from wokkel import pubsub from wokkel import rsm from wokkel import mam - -from twisted.words.protocols.jabber import jid, error -from twisted.internet import reactor, defer +from sat.core.i18n import _ +from sat.core.constants import Const as C +from sat.core.log import getLogger +from sat.core import exceptions +from sat.tools import sat_defer +from sat.tools.common import data_format log = getLogger(__name__) @@ -86,6 +80,8 @@ ACCESS_AUTHORIZE = "authorize" ACCESS_WHITELIST = "whitelist" ID_SINGLETON = "current" + EXTRA_PUBLISH_OPTIONS = "publish_options" + EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met" def __init__(self, host): log.info(_("PubSub plugin initialization")) @@ -479,17 +475,15 @@ ) return d - def _getPublishedItemId(self, iq_elt, original_id): - """return item of published id if found in answer + def _getPublishedItemId(self, published_ids, original_id): + """Return item of published id if found in answer - if not found original_id is returned, or empty string if it is None or empty - string + if not found original_id is returned, which may be None """ try: - item_id = iq_elt.pubsub.publish.item["id"] - except (AttributeError, KeyError): - item_id = None - return item_id or original_id + return published_ids[0] + except IndexError: + return original_id def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, extra=None): @@ -503,25 +497,21 @@ @param extra(dict, None): extra option, not used yet @return (unicode, None): id of the created item """ - if extra is None: - extra = {} - publish_options = extra.get('publish_options') - item_elt = pubsub.Item(id=item_id, payload=payload) - d = self.publish( - client, service, nodeIdentifier, [item_elt], options=publish_options) + item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) + if item_id is not None: + item_elt['id'] = item_id + item_elt.addChild(payload) + d = defer.ensureDeferred(self.sendItems( + client, + service, + nodeIdentifier, + [item_elt], + extra + )) d.addCallback(self._getPublishedItemId, item_id) return d - def _publishCb(self, iq_result): - """Parse publish result, and return ids given by pubsub service""" - try: - item_ids = [item['id'] - for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] - except AttributeError: - return [] - return item_ids - - def sendItems(self, client, service, nodeIdentifier, items, extra=None): + async def sendItems(self, client, service, nodeIdentifier, items, extra=None): """High level method to send several items at once @param service(jid.JID, None): service to send the item to @@ -530,7 +520,14 @@ @param items(list[domish.Element]): whole item elements to send, "id" will be used if set @param extra(dict, None): extra options. Key can be: - - publish_options: dict of publish-options + - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5 + the dict maps option name to value(s) + - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to have when publishing is + failing du to failing precondition. Value can be: + * raise (default): raise the exception + * publish_without_options: re-publish without the publish-options. + A warning will be logged showing that the publish-options could not + be used @return (list[unicode]): ids of the created items """ if extra is None: @@ -541,11 +538,42 @@ raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) item_id = item.getAttribute("id") parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) - publish_options = extra.get('publish_options') - d = self.publish( - client, service, nodeIdentifier, parsed_items, options=publish_options) - d.addCallback(self._publishCb) - return d + publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS) + try: + iq_result = await self.publish( + client, service, nodeIdentifier, parsed_items, options=publish_options) + except error.StanzaError as e: + if ((e.condition == 'conflict' and e.appCondition + and e.appCondition.name == 'precondition-not-met' + and publish_options is not None)): + # this usually happens when publish-options can't be set + policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise') + if policy == 'raise': + raise e + elif policy == 'publish_without_options': + log.warning(_( + "Can't use publish-options ({options}) on node {node}, " + "re-publishing without them: {reason}").format( + options=', '.join(f'{k} = {v}' + for k,v in publish_options.items()), + node=nodeIdentifier, + reason=e, + ) + ) + iq_result = await self.publish( + client, service, nodeIdentifier, parsed_items) + else: + raise exceptions.InternalError( + f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: " + f"{policy}" + ) + else: + raise e + try: + return [item['id'] + for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] + except AttributeError: + return [] def publish(self, client, service, nodeIdentifier, items=None, options=None): return client.pubsub_client.publish(