changeset 3213:c2f958dde5d2

plugin XEP-0060: async sendItems + precondition-not-met policy: - sendItems is now an "async" coroutine, and sendItem use it instead of duplicating publication logic - policy to use when a precondition is not met (with publish-options) can now be specified, for now it's either raise the exception (default policy), or try to publish without the options - constants have been added to handle "extra" keys
author Goffi <goffi@goffi.org>
date Wed, 11 Mar 2020 19:15:48 +0100
parents 89d97776fd34
children 8d92d4d829fb
files sat/plugins/plugin_xep_0060.py
diffstat 1 files changed, 74 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0060.py	Fri Mar 06 18:51:04 2020 +0100
+++ b/sat/plugins/plugin_xep_0060.py	Wed Mar 11 19:15:48 2020 +0100
@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 
-
-# SAT plugin for Publish-Subscribe (xep-0060)
+# SàT plugin for Publish-Subscribe (xep-0060)
 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org)
 
 # This program is free software: you can redistribute it and/or modify
@@ -21,29 +20,24 @@
 from collections import namedtuple
 import urllib.request, urllib.parse, urllib.error
 from functools import reduce
-
-from sat.core.i18n import _
-from sat.core.constants import Const as C
-from sat.core.log import getLogger
-
-from sat.core import exceptions
-from sat.tools import sat_defer
-from sat.tools.common import data_format
-
 from zope.interface import implementer
-
+from twisted.words.xish import domish
+from twisted.words.protocols.jabber import jid, error
+from twisted.internet import reactor, defer
 from wokkel import disco
 from wokkel import data_form
 from wokkel import generic
-
 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version
 #      mam and rsm come from sat_tmp.wokkel too
 from wokkel import pubsub
 from wokkel import rsm
 from wokkel import mam
-
-from twisted.words.protocols.jabber import jid, error
-from twisted.internet import reactor, defer
+from sat.core.i18n import _
+from sat.core.constants import Const as C
+from sat.core.log import getLogger
+from sat.core import exceptions
+from sat.tools import sat_defer
+from sat.tools.common import data_format
 
 
 log = getLogger(__name__)
@@ -86,6 +80,8 @@
     ACCESS_AUTHORIZE = "authorize"
     ACCESS_WHITELIST = "whitelist"
     ID_SINGLETON = "current"
+    EXTRA_PUBLISH_OPTIONS = "publish_options"
+    EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met"
 
     def __init__(self, host):
         log.info(_("PubSub plugin initialization"))
@@ -479,17 +475,15 @@
         )
         return d
 
-    def _getPublishedItemId(self, iq_elt, original_id):
-        """return item of published id if found in answer
+    def _getPublishedItemId(self, published_ids, original_id):
+        """Return item of published id if found in answer
 
-        if not found original_id is returned, or empty string if it is None or empty
-            string
+        if not found original_id is returned, which may be None
         """
         try:
-            item_id = iq_elt.pubsub.publish.item["id"]
-        except (AttributeError, KeyError):
-            item_id = None
-        return item_id or original_id
+            return published_ids[0]
+        except IndexError:
+            return original_id
 
     def sendItem(self, client, service, nodeIdentifier, payload, item_id=None,
                  extra=None):
@@ -503,25 +497,21 @@
         @param extra(dict, None): extra option, not used yet
         @return (unicode, None): id of the created item
         """
-        if extra is None:
-            extra = {}
-        publish_options = extra.get('publish_options')
-        item_elt = pubsub.Item(id=item_id, payload=payload)
-        d = self.publish(
-            client, service, nodeIdentifier, [item_elt], options=publish_options)
+        item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
+        if item_id is not None:
+            item_elt['id'] = item_id
+        item_elt.addChild(payload)
+        d = defer.ensureDeferred(self.sendItems(
+            client,
+            service,
+            nodeIdentifier,
+            [item_elt],
+            extra
+        ))
         d.addCallback(self._getPublishedItemId, item_id)
         return d
 
-    def _publishCb(self, iq_result):
-        """Parse publish result, and return ids given by pubsub service"""
-        try:
-            item_ids = [item['id']
-                for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')]
-        except AttributeError:
-            return []
-        return item_ids
-
-    def sendItems(self, client, service, nodeIdentifier, items, extra=None):
+    async def sendItems(self, client, service, nodeIdentifier, items, extra=None):
         """High level method to send several items at once
 
         @param service(jid.JID, None): service to send the item to
@@ -530,7 +520,14 @@
         @param items(list[domish.Element]): whole item elements to send,
             "id" will be used if set
         @param extra(dict, None): extra options. Key can be:
-            - publish_options: dict of publish-options
+            - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5
+                the dict maps option name to value(s)
+            - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to have when publishing is
+                failing du to failing precondition. Value can be:
+                * raise (default): raise the exception
+                * publish_without_options: re-publish without the publish-options.
+                    A warning will be logged showing that the publish-options could not
+                    be used
         @return (list[unicode]): ids of the created items
         """
         if extra is None:
@@ -541,11 +538,42 @@
                 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml()))
             item_id = item.getAttribute("id")
             parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
-        publish_options = extra.get('publish_options')
-        d = self.publish(
-            client, service, nodeIdentifier, parsed_items, options=publish_options)
-        d.addCallback(self._publishCb)
-        return d
+        publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS)
+        try:
+            iq_result = await self.publish(
+                client, service, nodeIdentifier, parsed_items, options=publish_options)
+        except error.StanzaError as e:
+            if ((e.condition == 'conflict' and e.appCondition
+                 and e.appCondition.name == 'precondition-not-met'
+                 and publish_options is not None)):
+                # this usually happens when publish-options can't be set
+                policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise')
+                if policy == 'raise':
+                    raise e
+                elif policy == 'publish_without_options':
+                    log.warning(_(
+                        "Can't use publish-options ({options}) on node {node}, "
+                        "re-publishing without them: {reason}").format(
+                            options=', '.join(f'{k} = {v}'
+                                    for k,v in publish_options.items()),
+                            node=nodeIdentifier,
+                            reason=e,
+                        )
+                    )
+                    iq_result = await self.publish(
+                        client, service, nodeIdentifier, parsed_items)
+                else:
+                    raise exceptions.InternalError(
+                        f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: "
+                        f"{policy}"
+                    )
+            else:
+                raise e
+        try:
+            return [item['id']
+                for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')]
+        except AttributeError:
+            return []
 
     def publish(self, client, service, nodeIdentifier, items=None, options=None):
         return client.pubsub_client.publish(