Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0060.py @ 3213:c2f958dde5d2
plugin XEP-0060: async sendItems + precondition-not-met policy:
- sendItems is now an "async" coroutine, and sendItem uses it instead of duplicating
the publication logic
- the policy to use when a precondition is not met (with publish-options) can now be
specified; for now it is either to raise the exception (default policy) or to try to
publish without the options
- constants have been added to handle "extra" keys
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 Mar 2020 19:15:48 +0100 |
parents | 559a625a236b |
children | 4c98f4972db5 |
comparison
equal
deleted
inserted
replaced
3212:89d97776fd34 | 3213:c2f958dde5d2 |
---|---|
1 #!/usr/bin/env python3 | 1 #!/usr/bin/env python3 |
2 | 2 |
3 | 3 # SàT plugin for Publish-Subscribe (xep-0060) |
4 # SAT plugin for Publish-Subscribe (xep-0060) | |
5 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) | 4 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) |
6 | 5 |
7 # This program is free software: you can redistribute it and/or modify | 6 # This program is free software: you can redistribute it and/or modify |
8 # it under the terms of the GNU Affero General Public License as published by | 7 # it under the terms of the GNU Affero General Public License as published by |
9 # the Free Software Foundation, either version 3 of the License, or | 8 # the Free Software Foundation, either version 3 of the License, or |
19 | 18 |
20 | 19 |
21 from collections import namedtuple | 20 from collections import namedtuple |
22 import urllib.request, urllib.parse, urllib.error | 21 import urllib.request, urllib.parse, urllib.error |
23 from functools import reduce | 22 from functools import reduce |
24 | |
25 from sat.core.i18n import _ | |
26 from sat.core.constants import Const as C | |
27 from sat.core.log import getLogger | |
28 | |
29 from sat.core import exceptions | |
30 from sat.tools import sat_defer | |
31 from sat.tools.common import data_format | |
32 | |
33 from zope.interface import implementer | 23 from zope.interface import implementer |
34 | 24 from twisted.words.xish import domish |
25 from twisted.words.protocols.jabber import jid, error | |
26 from twisted.internet import reactor, defer | |
35 from wokkel import disco | 27 from wokkel import disco |
36 from wokkel import data_form | 28 from wokkel import data_form |
37 from wokkel import generic | 29 from wokkel import generic |
38 | |
39 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version | 30 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version |
40 # mam and rsm come from sat_tmp.wokkel too | 31 # mam and rsm come from sat_tmp.wokkel too |
41 from wokkel import pubsub | 32 from wokkel import pubsub |
42 from wokkel import rsm | 33 from wokkel import rsm |
43 from wokkel import mam | 34 from wokkel import mam |
44 | 35 from sat.core.i18n import _ |
45 from twisted.words.protocols.jabber import jid, error | 36 from sat.core.constants import Const as C |
46 from twisted.internet import reactor, defer | 37 from sat.core.log import getLogger |
38 from sat.core import exceptions | |
39 from sat.tools import sat_defer | |
40 from sat.tools.common import data_format | |
47 | 41 |
48 | 42 |
49 log = getLogger(__name__) | 43 log = getLogger(__name__) |
50 | 44 |
51 PLUGIN_INFO = { | 45 PLUGIN_INFO = { |
84 ACCESS_ROSTER = "roster" | 78 ACCESS_ROSTER = "roster" |
85 ACCESS_PUBLISHER_ROSTER = "publisher-roster" | 79 ACCESS_PUBLISHER_ROSTER = "publisher-roster" |
86 ACCESS_AUTHORIZE = "authorize" | 80 ACCESS_AUTHORIZE = "authorize" |
87 ACCESS_WHITELIST = "whitelist" | 81 ACCESS_WHITELIST = "whitelist" |
88 ID_SINGLETON = "current" | 82 ID_SINGLETON = "current" |
83 EXTRA_PUBLISH_OPTIONS = "publish_options" | |
84 EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met" | |
89 | 85 |
90 def __init__(self, host): | 86 def __init__(self, host): |
91 log.info(_("PubSub plugin initialization")) | 87 log.info(_("PubSub plugin initialization")) |
92 self.host = host | 88 self.host = host |
93 self._rsm = host.plugins.get("XEP-0059") | 89 self._rsm = host.plugins.get("XEP-0059") |
477 d = self.sendItems( | 473 d = self.sendItems( |
478 client, service, nodeIdentifier, items, extra | 474 client, service, nodeIdentifier, items, extra |
479 ) | 475 ) |
480 return d | 476 return d |
481 | 477 |
482 def _getPublishedItemId(self, iq_elt, original_id): | 478 def _getPublishedItemId(self, published_ids, original_id): |
483 """return item of published id if found in answer | 479 """Return item of published id if found in answer |
484 | 480 |
485 if not found original_id is returned, or empty string if it is None or empty | 481 if not found original_id is returned, which may be None |
486 string | |
487 """ | 482 """ |
488 try: | 483 try: |
489 item_id = iq_elt.pubsub.publish.item["id"] | 484 return published_ids[0] |
490 except (AttributeError, KeyError): | 485 except IndexError: |
491 item_id = None | 486 return original_id |
492 return item_id or original_id | |
493 | 487 |
494 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, | 488 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, |
495 extra=None): | 489 extra=None): |
496 """High level method to send one item | 490 """High level method to send one item |
497 | 491 |
501 @param payload(domish.Element, unicode): payload of the item to send | 495 @param payload(domish.Element, unicode): payload of the item to send |
502 @param item_id(unicode, None): id to use or None to create one | 496 @param item_id(unicode, None): id to use or None to create one |
503 @param extra(dict, None): extra option, not used yet | 497 @param extra(dict, None): extra option, not used yet |
504 @return (unicode, None): id of the created item | 498 @return (unicode, None): id of the created item |
505 """ | 499 """ |
506 if extra is None: | 500 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) |
507 extra = {} | 501 if item_id is not None: |
508 publish_options = extra.get('publish_options') | 502 item_elt['id'] = item_id |
509 item_elt = pubsub.Item(id=item_id, payload=payload) | 503 item_elt.addChild(payload) |
510 d = self.publish( | 504 d = defer.ensureDeferred(self.sendItems( |
511 client, service, nodeIdentifier, [item_elt], options=publish_options) | 505 client, |
506 service, | |
507 nodeIdentifier, | |
508 [item_elt], | |
509 extra | |
510 )) | |
512 d.addCallback(self._getPublishedItemId, item_id) | 511 d.addCallback(self._getPublishedItemId, item_id) |
513 return d | 512 return d |
514 | 513 |
515 def _publishCb(self, iq_result): | 514 async def sendItems(self, client, service, nodeIdentifier, items, extra=None): |
516 """Parse publish result, and return ids given by pubsub service""" | |
517 try: | |
518 item_ids = [item['id'] | |
519 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] | |
520 except AttributeError: | |
521 return [] | |
522 return item_ids | |
523 | |
524 def sendItems(self, client, service, nodeIdentifier, items, extra=None): | |
525 """High level method to send several items at once | 515 """High level method to send several items at once |
526 | 516 |
527 @param service(jid.JID, None): service to send the item to | 517 @param service(jid.JID, None): service to send the item to |
528 None to use PEP | 518 None to use PEP |
529 @param NodeIdentifier(unicode): PubSub node to use | 519 @param NodeIdentifier(unicode): PubSub node to use |
530 @param items(list[domish.Element]): whole item elements to send, | 520 @param items(list[domish.Element]): whole item elements to send, |
531 "id" will be used if set | 521 "id" will be used if set |
532 @param extra(dict, None): extra options. Key can be: | 522 @param extra(dict, None): extra options. Key can be: |
533 - publish_options: dict of publish-options | 523 - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5 |
524 the dict maps option name to value(s) | |
525 - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to have when publishing is | |
526 failing due to failing precondition. Value can be: | |
527 * raise (default): raise the exception | |
528 * publish_without_options: re-publish without the publish-options. | |
529 A warning will be logged showing that the publish-options could not | |
530 be used | |
534 @return (list[unicode]): ids of the created items | 531 @return (list[unicode]): ids of the created items |
535 """ | 532 """ |
536 if extra is None: | 533 if extra is None: |
537 extra = {} | 534 extra = {} |
538 parsed_items = [] | 535 parsed_items = [] |
539 for item in items: | 536 for item in items: |
540 if item.name != 'item': | 537 if item.name != 'item': |
541 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) | 538 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) |
542 item_id = item.getAttribute("id") | 539 item_id = item.getAttribute("id") |
543 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) | 540 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) |
544 publish_options = extra.get('publish_options') | 541 publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS) |
545 d = self.publish( | 542 try: |
546 client, service, nodeIdentifier, parsed_items, options=publish_options) | 543 iq_result = await self.publish( |
547 d.addCallback(self._publishCb) | 544 client, service, nodeIdentifier, parsed_items, options=publish_options) |
548 return d | 545 except error.StanzaError as e: |
546 if ((e.condition == 'conflict' and e.appCondition | |
547 and e.appCondition.name == 'precondition-not-met' | |
548 and publish_options is not None)): | |
549 # this usually happens when publish-options can't be set | |
550 policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise') | |
551 if policy == 'raise': | |
552 raise e | |
553 elif policy == 'publish_without_options': | |
554 log.warning(_( | |
555 "Can't use publish-options ({options}) on node {node}, " | |
556 "re-publishing without them: {reason}").format( | |
557 options=', '.join(f'{k} = {v}' | |
558 for k,v in publish_options.items()), | |
559 node=nodeIdentifier, | |
560 reason=e, | |
561 ) | |
562 ) | |
563 iq_result = await self.publish( | |
564 client, service, nodeIdentifier, parsed_items) | |
565 else: | |
566 raise exceptions.InternalError( | |
567 f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: " | |
568 f"{policy}" | |
569 ) | |
570 else: | |
571 raise e | |
572 try: | |
573 return [item['id'] | |
574 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] | |
575 except AttributeError: | |
576 return [] | |
549 | 577 |
550 def publish(self, client, service, nodeIdentifier, items=None, options=None): | 578 def publish(self, client, service, nodeIdentifier, items=None, options=None): |
551 return client.pubsub_client.publish( | 579 return client.pubsub_client.publish( |
552 service, nodeIdentifier, items, client.pubsub_client.parent.jid, | 580 service, nodeIdentifier, items, client.pubsub_client.parent.jid, |
553 options=options | 581 options=options |