comparison sat/plugins/plugin_xep_0060.py @ 3213:c2f958dde5d2

plugin XEP-0060: async sendItems + precondition-not-met policy: - sendItems is now an "async" coroutine, and sendItem use it instead of duplicating publication logic - policy to use when a precondition is not met (with publish-options) can now be specified, for now it's either raise the exception (default policy), or try to publish without the options - constants have been added to handle "extra" keys
author Goffi <goffi@goffi.org>
date Wed, 11 Mar 2020 19:15:48 +0100
parents 559a625a236b
children 4c98f4972db5
comparison
equal deleted inserted replaced
3212:89d97776fd34 3213:c2f958dde5d2
1 #!/usr/bin/env python3 1 #!/usr/bin/env python3
2 2
3 3 # SàT plugin for Publish-Subscribe (xep-0060)
4 # SAT plugin for Publish-Subscribe (xep-0060)
5 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) 4 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org)
6 5
7 # This program is free software: you can redistribute it and/or modify 6 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by 7 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or 8 # the Free Software Foundation, either version 3 of the License, or
19 18
20 19
21 from collections import namedtuple 20 from collections import namedtuple
22 import urllib.request, urllib.parse, urllib.error 21 import urllib.request, urllib.parse, urllib.error
23 from functools import reduce 22 from functools import reduce
24
25 from sat.core.i18n import _
26 from sat.core.constants import Const as C
27 from sat.core.log import getLogger
28
29 from sat.core import exceptions
30 from sat.tools import sat_defer
31 from sat.tools.common import data_format
32
33 from zope.interface import implementer 23 from zope.interface import implementer
34 24 from twisted.words.xish import domish
25 from twisted.words.protocols.jabber import jid, error
26 from twisted.internet import reactor, defer
35 from wokkel import disco 27 from wokkel import disco
36 from wokkel import data_form 28 from wokkel import data_form
37 from wokkel import generic 29 from wokkel import generic
38
39 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version 30 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version
40 # mam and rsm come from sat_tmp.wokkel too 31 # mam and rsm come from sat_tmp.wokkel too
41 from wokkel import pubsub 32 from wokkel import pubsub
42 from wokkel import rsm 33 from wokkel import rsm
43 from wokkel import mam 34 from wokkel import mam
44 35 from sat.core.i18n import _
45 from twisted.words.protocols.jabber import jid, error 36 from sat.core.constants import Const as C
46 from twisted.internet import reactor, defer 37 from sat.core.log import getLogger
38 from sat.core import exceptions
39 from sat.tools import sat_defer
40 from sat.tools.common import data_format
47 41
48 42
49 log = getLogger(__name__) 43 log = getLogger(__name__)
50 44
51 PLUGIN_INFO = { 45 PLUGIN_INFO = {
84 ACCESS_ROSTER = "roster" 78 ACCESS_ROSTER = "roster"
85 ACCESS_PUBLISHER_ROSTER = "publisher-roster" 79 ACCESS_PUBLISHER_ROSTER = "publisher-roster"
86 ACCESS_AUTHORIZE = "authorize" 80 ACCESS_AUTHORIZE = "authorize"
87 ACCESS_WHITELIST = "whitelist" 81 ACCESS_WHITELIST = "whitelist"
88 ID_SINGLETON = "current" 82 ID_SINGLETON = "current"
83 EXTRA_PUBLISH_OPTIONS = "publish_options"
84 EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met"
89 85
90 def __init__(self, host): 86 def __init__(self, host):
91 log.info(_("PubSub plugin initialization")) 87 log.info(_("PubSub plugin initialization"))
92 self.host = host 88 self.host = host
93 self._rsm = host.plugins.get("XEP-0059") 89 self._rsm = host.plugins.get("XEP-0059")
477 d = self.sendItems( 473 d = self.sendItems(
478 client, service, nodeIdentifier, items, extra 474 client, service, nodeIdentifier, items, extra
479 ) 475 )
480 return d 476 return d
481 477
482 def _getPublishedItemId(self, iq_elt, original_id): 478 def _getPublishedItemId(self, published_ids, original_id):
483 """return item of published id if found in answer 479 """Return item of published id if found in answer
484 480
485 if not found original_id is returned, or empty string if it is None or empty 481 if not found original_id is returned, which may be None
486 string
487 """ 482 """
488 try: 483 try:
489 item_id = iq_elt.pubsub.publish.item["id"] 484 return published_ids[0]
490 except (AttributeError, KeyError): 485 except IndexError:
491 item_id = None 486 return original_id
492 return item_id or original_id
493 487
494 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, 488 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None,
495 extra=None): 489 extra=None):
496 """High level method to send one item 490 """High level method to send one item
497 491
501 @param payload(domish.Element, unicode): payload of the item to send 495 @param payload(domish.Element, unicode): payload of the item to send
502 @param item_id(unicode, None): id to use or None to create one 496 @param item_id(unicode, None): id to use or None to create one
503 @param extra(dict, None): extra option, not used yet 497 @param extra(dict, None): extra option, not used yet
504 @return (unicode, None): id of the created item 498 @return (unicode, None): id of the created item
505 """ 499 """
506 if extra is None: 500 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
507 extra = {} 501 if item_id is not None:
508 publish_options = extra.get('publish_options') 502 item_elt['id'] = item_id
509 item_elt = pubsub.Item(id=item_id, payload=payload) 503 item_elt.addChild(payload)
510 d = self.publish( 504 d = defer.ensureDeferred(self.sendItems(
511 client, service, nodeIdentifier, [item_elt], options=publish_options) 505 client,
506 service,
507 nodeIdentifier,
508 [item_elt],
509 extra
510 ))
512 d.addCallback(self._getPublishedItemId, item_id) 511 d.addCallback(self._getPublishedItemId, item_id)
513 return d 512 return d
514 513
515 def _publishCb(self, iq_result): 514 async def sendItems(self, client, service, nodeIdentifier, items, extra=None):
516 """Parse publish result, and return ids given by pubsub service"""
517 try:
518 item_ids = [item['id']
519 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')]
520 except AttributeError:
521 return []
522 return item_ids
523
524 def sendItems(self, client, service, nodeIdentifier, items, extra=None):
525 """High level method to send several items at once 515 """High level method to send several items at once
526 516
527 @param service(jid.JID, None): service to send the item to 517 @param service(jid.JID, None): service to send the item to
528 None to use PEP 518 None to use PEP
529 @param NodeIdentifier(unicode): PubSub node to use 519 @param NodeIdentifier(unicode): PubSub node to use
530 @param items(list[domish.Element]): whole item elements to send, 520 @param items(list[domish.Element]): whole item elements to send,
531 "id" will be used if set 521 "id" will be used if set
532 @param extra(dict, None): extra options. Key can be: 522 @param extra(dict, None): extra options. Key can be:
533 - publish_options: dict of publish-options 523 - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5
524 the dict maps option name to value(s)
525 - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to have when publishing is
526 failing du to failing precondition. Value can be:
527 * raise (default): raise the exception
528 * publish_without_options: re-publish without the publish-options.
529 A warning will be logged showing that the publish-options could not
530 be used
534 @return (list[unicode]): ids of the created items 531 @return (list[unicode]): ids of the created items
535 """ 532 """
536 if extra is None: 533 if extra is None:
537 extra = {} 534 extra = {}
538 parsed_items = [] 535 parsed_items = []
539 for item in items: 536 for item in items:
540 if item.name != 'item': 537 if item.name != 'item':
541 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) 538 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml()))
542 item_id = item.getAttribute("id") 539 item_id = item.getAttribute("id")
543 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) 540 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
544 publish_options = extra.get('publish_options') 541 publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS)
545 d = self.publish( 542 try:
546 client, service, nodeIdentifier, parsed_items, options=publish_options) 543 iq_result = await self.publish(
547 d.addCallback(self._publishCb) 544 client, service, nodeIdentifier, parsed_items, options=publish_options)
548 return d 545 except error.StanzaError as e:
546 if ((e.condition == 'conflict' and e.appCondition
547 and e.appCondition.name == 'precondition-not-met'
548 and publish_options is not None)):
549 # this usually happens when publish-options can't be set
550 policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise')
551 if policy == 'raise':
552 raise e
553 elif policy == 'publish_without_options':
554 log.warning(_(
555 "Can't use publish-options ({options}) on node {node}, "
556 "re-publishing without them: {reason}").format(
557 options=', '.join(f'{k} = {v}'
558 for k,v in publish_options.items()),
559 node=nodeIdentifier,
560 reason=e,
561 )
562 )
563 iq_result = await self.publish(
564 client, service, nodeIdentifier, parsed_items)
565 else:
566 raise exceptions.InternalError(
567 f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: "
568 f"{policy}"
569 )
570 else:
571 raise e
572 try:
573 return [item['id']
574 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')]
575 except AttributeError:
576 return []
549 577
550 def publish(self, client, service, nodeIdentifier, items=None, options=None): 578 def publish(self, client, service, nodeIdentifier, items=None, options=None):
551 return client.pubsub_client.publish( 579 return client.pubsub_client.publish(
552 service, nodeIdentifier, items, client.pubsub_client.parent.jid, 580 service, nodeIdentifier, items, client.pubsub_client.parent.jid,
553 options=options 581 options=options