Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0060.py @ 3715:b9718216a1c0 0.9
merge bookmark 0.9
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Dec 2021 16:13:31 +0100 |
parents | 5d108ce026d7 |
children | 1cdb9d9fad6b |
comparison
equal
deleted
inserted
replaced
3714:af09b5aaa5d7 | 3715:b9718216a1c0 |
---|---|
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 | 19 |
20 from typing import Optional | 20 from typing import Optional, List, Tuple |
21 from collections import namedtuple | 21 from collections import namedtuple |
22 import urllib.request, urllib.parse, urllib.error | 22 import urllib.request, urllib.parse, urllib.error |
23 from functools import reduce | 23 from functools import reduce |
24 from zope.interface import implementer | 24 from zope.interface import implementer |
25 from twisted.words.xish import domish | 25 from twisted.words.xish import domish |
33 from wokkel import rsm | 33 from wokkel import rsm |
34 from wokkel import mam | 34 from wokkel import mam |
35 from sat.core.i18n import _ | 35 from sat.core.i18n import _ |
36 from sat.core.constants import Const as C | 36 from sat.core.constants import Const as C |
37 from sat.core.log import getLogger | 37 from sat.core.log import getLogger |
38 from sat.core.xmpp import SatXMPPEntity | 38 from sat.core.core_types import SatXMPPEntity |
39 from sat.core import exceptions | 39 from sat.core import exceptions |
40 from sat.tools import utils | |
40 from sat.tools import sat_defer | 41 from sat.tools import sat_defer |
41 from sat.tools import xml_tools | 42 from sat.tools import xml_tools |
42 from sat.tools.common import data_format | 43 from sat.tools.common import data_format |
43 | 44 |
44 | 45 |
88 OWPOL_ORIGINAL = "original_publisher" | 89 OWPOL_ORIGINAL = "original_publisher" |
89 OWPOL_ANY_PUB = "any_publisher" | 90 OWPOL_ANY_PUB = "any_publisher" |
90 ID_SINGLETON = "current" | 91 ID_SINGLETON = "current" |
91 EXTRA_PUBLISH_OPTIONS = "publish_options" | 92 EXTRA_PUBLISH_OPTIONS = "publish_options" |
92 EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met" | 93 EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met" |
94 # extra disco needed for RSM, cf. XEP-0060 § 6.5.4 | |
95 DISCO_RSM = "http://jabber.org/protocol/pubsub#rsm" | |
93 | 96 |
94 def __init__(self, host): | 97 def __init__(self, host): |
95 log.info(_("PubSub plugin initialization")) | 98 log.info(_("PubSub plugin initialization")) |
96 self.host = host | 99 self.host = host |
97 self._rsm = host.plugins.get("XEP-0059") | 100 self._rsm = host.plugins.get("XEP-0059") |
195 async_=True, | 198 async_=True, |
196 ) | 199 ) |
197 host.bridge.addMethod( | 200 host.bridge.addMethod( |
198 "psItemsGet", | 201 "psItemsGet", |
199 ".plugin", | 202 ".plugin", |
200 in_sign="ssiassa{ss}s", | 203 in_sign="ssiassss", |
201 out_sign="s", | 204 out_sign="s", |
202 method=self._getItems, | 205 method=self._getItems, |
203 async_=True, | 206 async_=True, |
204 ) | 207 ) |
205 host.bridge.addMethod( | 208 host.bridge.addMethod( |
282 async_=True, | 285 async_=True, |
283 ) | 286 ) |
284 host.bridge.addMethod( | 287 host.bridge.addMethod( |
285 "psGetFromMany", | 288 "psGetFromMany", |
286 ".plugin", | 289 ".plugin", |
287 in_sign="a(ss)ia{ss}s", | 290 in_sign="a(ss)iss", |
288 out_sign="s", | 291 out_sign="s", |
289 method=self._getFromMany, | 292 method=self._getFromMany, |
290 ) | 293 ) |
291 host.bridge.addMethod( | 294 host.bridge.addMethod( |
292 "psGetFromManyRTResult", | 295 "psGetFromManyRTResult", |
389 all node *prefixed* with this one will be triggered | 392 all node *prefixed* with this one will be triggered |
390 @param **kwargs: method(s) to call when the node is found | 393 @param **kwargs: method(s) to call when the node is found |
391 the method must be named after PubSub constants in lower case | 394 the method must be named after PubSub constants in lower case |
392 and suffixed with "_cb" | 395 and suffixed with "_cb" |
393 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE | 396 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE |
397 note: only C.PS_ITEMS and C.PS_DELETE are implemented so far | |
394 """ | 398 """ |
395 assert node is not None | 399 assert node is not None |
396 assert kwargs | 400 assert kwargs |
397 callbacks = self._node_cb.setdefault(node, {}) | 401 callbacks = self._node_cb.setdefault(node, {}) |
398 for event, cb in kwargs.items(): | 402 for event, cb in kwargs.items(): |
469 profile_key=C.PROF_KEY_NONE): | 473 profile_key=C.PROF_KEY_NONE): |
470 client = self.host.getClient(profile_key) | 474 client = self.host.getClient(profile_key) |
471 service = None if not service else jid.JID(service) | 475 service = None if not service else jid.JID(service) |
472 payload = xml_tools.parse(payload) | 476 payload = xml_tools.parse(payload) |
473 extra = data_format.deserialise(extra_ser) | 477 extra = data_format.deserialise(extra_ser) |
474 d = self.sendItem( | 478 d = defer.ensureDeferred(self.sendItem( |
475 client, service, nodeIdentifier, payload, item_id or None, extra | 479 client, service, nodeIdentifier, payload, item_id or None, extra |
476 ) | 480 )) |
477 d.addCallback(lambda ret: ret or "") | 481 d.addCallback(lambda ret: ret or "") |
478 return d | 482 return d |
479 | 483 |
480 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None, | 484 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None, |
481 profile_key=C.PROF_KEY_NONE): | 485 profile_key=C.PROF_KEY_NONE): |
485 items = [xml_tools.parse(item) for item in items] | 489 items = [xml_tools.parse(item) for item in items] |
486 except Exception as e: | 490 except Exception as e: |
487 raise exceptions.DataError(_("Can't parse items: {msg}").format( | 491 raise exceptions.DataError(_("Can't parse items: {msg}").format( |
488 msg=e)) | 492 msg=e)) |
489 extra = data_format.deserialise(extra_ser) | 493 extra = data_format.deserialise(extra_ser) |
490 d = self.sendItems( | 494 return defer.ensureDeferred(self.sendItems( |
491 client, service, nodeIdentifier, items, extra | 495 client, service, nodeIdentifier, items, extra |
492 ) | 496 )) |
493 return d | 497 |
494 | 498 async def sendItem( |
495 def _getPublishedItemId(self, published_ids, original_id): | 499 self, client, service, nodeIdentifier, payload, item_id=None, extra=None |
496 """Return item of published id if found in answer | 500 ): |
497 | |
498 if not found original_id is returned, which may be None | |
499 """ | |
500 try: | |
501 return published_ids[0] | |
502 except IndexError: | |
503 return original_id | |
504 | |
505 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, | |
506 extra=None): | |
507 """High level method to send one item | 501 """High level method to send one item |
508 | 502 |
509 @param service(jid.JID, None): service to send the item to | 503 @param service(jid.JID, None): service to send the item to |
510 None to use PEP | 504 None to use PEP |
511 @param NodeIdentifier(unicode): PubSub node to use | 505 @param NodeIdentifier(unicode): PubSub node to use |
517 assert isinstance(payload, domish.Element) | 511 assert isinstance(payload, domish.Element) |
518 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) | 512 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) |
519 if item_id is not None: | 513 if item_id is not None: |
520 item_elt['id'] = item_id | 514 item_elt['id'] = item_id |
521 item_elt.addChild(payload) | 515 item_elt.addChild(payload) |
522 d = defer.ensureDeferred(self.sendItems( | 516 published_ids = await self.sendItems( |
523 client, | 517 client, |
524 service, | 518 service, |
525 nodeIdentifier, | 519 nodeIdentifier, |
526 [item_elt], | 520 [item_elt], |
527 extra | 521 extra |
528 )) | 522 ) |
529 d.addCallback(self._getPublishedItemId, item_id) | 523 try: |
530 return d | 524 return published_ids[0] |
525 except IndexError: | |
526 return item_id | |
531 | 527 |
532 async def sendItems(self, client, service, nodeIdentifier, items, extra=None): | 528 async def sendItems(self, client, service, nodeIdentifier, items, extra=None): |
533 """High level method to send several items at once | 529 """High level method to send several items at once |
534 | 530 |
535 @param service(jid.JID, None): service to send the item to | 531 @param service(jid.JID, None): service to send the item to |
591 return [item['id'] | 587 return [item['id'] |
592 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] | 588 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] |
593 except AttributeError: | 589 except AttributeError: |
594 return [] | 590 return [] |
595 | 591 |
596 def publish(self, client, service, nodeIdentifier, items=None, options=None): | 592 async def publish( |
597 return client.pubsub_client.publish( | 593 self, |
594 client: SatXMPPEntity, | |
595 service: jid.JID, | |
596 nodeIdentifier: str, | |
597 items: Optional[List[domish.Element]] = None, | |
598 options: Optional[dict] = None | |
599 ) -> List[str]: | |
600 published_ids = await client.pubsub_client.publish( | |
598 service, nodeIdentifier, items, client.pubsub_client.parent.jid, | 601 service, nodeIdentifier, items, client.pubsub_client.parent.jid, |
599 options=options | 602 options=options |
600 ) | 603 ) |
604 | |
605 await self.host.trigger.asyncPoint( | |
606 "XEP-0060_publish", client, service, nodeIdentifier, items, options, | |
607 published_ids | |
608 ) | |
609 return published_ids | |
601 | 610 |
602 def _unwrapMAMMessage(self, message_elt): | 611 def _unwrapMAMMessage(self, message_elt): |
603 try: | 612 try: |
604 item_elt = reduce( | 613 item_elt = reduce( |
605 lambda elt, ns_name: next(elt.elements(*ns_name)), | 614 lambda elt, ns_name: next(elt.elements(*ns_name)), |
619 items, metadata = items_data | 628 items, metadata = items_data |
620 metadata['items'] = items | 629 metadata['items'] = items |
621 return data_format.serialise(metadata) | 630 return data_format.serialise(metadata) |
622 | 631 |
623 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, | 632 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, |
624 extra_dict=None, profile_key=C.PROF_KEY_NONE): | 633 extra="", profile_key=C.PROF_KEY_NONE): |
625 """Get items from pubsub node | 634 """Get items from pubsub node |
626 | 635 |
627 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit | 636 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit |
628 """ | 637 """ |
629 client = self.host.getClient(profile_key) | 638 client = self.host.getClient(profile_key) |
630 service = jid.JID(service) if service else None | 639 service = jid.JID(service) if service else None |
631 max_items = None if max_items == C.NO_LIMIT else max_items | 640 max_items = None if max_items == C.NO_LIMIT else max_items |
632 extra = self.parseExtra(extra_dict) | 641 extra = self.parseExtra(data_format.deserialise(extra)) |
633 d = self.getItems( | 642 d = defer.ensureDeferred(self.getItems( |
634 client, | 643 client, |
635 service, | 644 service, |
636 node or None, | 645 node or None, |
637 max_items or None, | 646 max_items, |
638 item_ids, | 647 item_ids, |
639 sub_id or None, | 648 sub_id or None, |
640 extra.rsm_request, | 649 extra.rsm_request, |
641 extra.extra, | 650 extra.extra, |
642 ) | 651 )) |
643 d.addCallback(self.transItemsData) | 652 d.addCallback(self.transItemsData) |
644 d.addCallback(self.serialiseItems) | 653 d.addCallback(self.serialiseItems) |
645 return d | 654 return d |
646 | 655 |
647 def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None, | 656 async def getItems( |
648 rsm_request=None, extra=None): | 657 self, |
658 client: SatXMPPEntity, | |
659 service: Optional[jid.JID], | |
660 node: str, | |
661 max_items: Optional[int] = None, | |
662 item_ids: Optional[List[str]] = None, | |
663 sub_id: Optional[str] = None, | |
664 rsm_request: Optional[rsm.RSMRequest] = None, | |
665 extra: Optional[dict] = None | |
666 ) -> Tuple[List[dict], dict]: | |
649 """Retrieve pubsub items from a node. | 667 """Retrieve pubsub items from a node. |
650 | 668 |
651 @param service (JID, None): pubsub service. | 669 @param service (JID, None): pubsub service. |
652 @param node (str): node id. | 670 @param node (str): node id. |
653 @param max_items (int): optional limit on the number of retrieved items. | 671 @param max_items (int): optional limit on the number of retrieved items. |
666 max_items = None | 684 max_items = None |
667 if rsm_request and item_ids: | 685 if rsm_request and item_ids: |
668 raise ValueError("items_id can't be used with rsm") | 686 raise ValueError("items_id can't be used with rsm") |
669 if extra is None: | 687 if extra is None: |
670 extra = {} | 688 extra = {} |
689 cont, ret = await self.host.trigger.asyncReturnPoint( | |
690 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, | |
691 rsm_request, extra | |
692 ) | |
693 if not cont: | |
694 return ret | |
671 try: | 695 try: |
672 mam_query = extra["mam"] | 696 mam_query = extra["mam"] |
673 except KeyError: | 697 except KeyError: |
674 d = client.pubsub_client.items( | 698 d = client.pubsub_client.items( |
675 service = service, | 699 service = service, |
680 itemIdentifiers = item_ids, | 704 itemIdentifiers = item_ids, |
681 orderBy = extra.get(C.KEY_ORDER_BY), | 705 orderBy = extra.get(C.KEY_ORDER_BY), |
682 rsm_request = rsm_request | 706 rsm_request = rsm_request |
683 ) | 707 ) |
684 # we have no MAM data here, so we add None | 708 # we have no MAM data here, so we add None |
685 d.addCallback(lambda data: data + (None,)) | |
686 d.addErrback(sat_defer.stanza2NotFound) | 709 d.addErrback(sat_defer.stanza2NotFound) |
687 d.addTimeout(TIMEOUT, reactor) | 710 d.addTimeout(TIMEOUT, reactor) |
711 items, rsm_response = await d | |
712 mam_response = None | |
688 else: | 713 else: |
689 # if mam is requested, we have to do a totally different query | 714 # if mam is requested, we have to do a totally different query |
690 if self._mam is None: | 715 if self._mam is None: |
691 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") | 716 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") |
692 if max_items is not None: | 717 if max_items is not None: |
704 else: | 729 else: |
705 if mam_query.rsm != rsm_request: | 730 if mam_query.rsm != rsm_request: |
706 raise exceptions.DataError( | 731 raise exceptions.DataError( |
707 "Conflict between RSM request and MAM's RSM request" | 732 "Conflict between RSM request and MAM's RSM request" |
708 ) | 733 ) |
709 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) | 734 items, rsm_response, mam_response = await self._mam.getArchives( |
735 client, mam_query, service, self._unwrapMAMMessage | |
736 ) | |
710 | 737 |
711 try: | 738 try: |
712 subscribe = C.bool(extra["subscribe"]) | 739 subscribe = C.bool(extra["subscribe"]) |
713 except KeyError: | 740 except KeyError: |
714 subscribe = False | 741 subscribe = False |
715 | 742 |
716 def subscribeEb(failure, service, node): | 743 if subscribe: |
717 failure.trap(error.StanzaError) | 744 try: |
718 log.warning( | 745 await self.subscribe(client, service, node) |
719 "Could not subscribe to node {} on service {}: {}".format( | 746 except error.StanzaError as e: |
720 node, str(service), str(failure.value) | 747 log.warning( |
748 f"Could not subscribe to node {node} on service {service}: {e}" | |
721 ) | 749 ) |
722 ) | 750 |
723 | 751 # TODO: handle mam_response |
724 def doSubscribe(data): | 752 service_jid = service if service else client.jid.userhostJID() |
725 self.subscribe(client, service, node).addErrback( | 753 metadata = { |
726 subscribeEb, service, node | 754 "service": service_jid, |
727 ) | 755 "node": node, |
728 return data | 756 "uri": self.getNodeURI(service_jid, node), |
729 | 757 } |
730 if subscribe: | 758 if mam_response is not None: |
731 d.addCallback(doSubscribe) | 759 # mam_response is a dict with "complete" and "stable" keys |
732 | 760 # we can put them directly in metadata |
733 def addMetadata(result): | 761 metadata.update(mam_response) |
734 # TODO: handle the third argument (mam_response) | 762 if rsm_request is not None and rsm_response is not None: |
735 items, rsm_response, mam_response = result | 763 metadata['rsm'] = rsm_response.toDict() |
736 service_jid = service if service else client.jid.userhostJID() | 764 if mam_response is None: |
737 metadata = { | 765 index = rsm_response.index |
738 "service": service_jid, | 766 count = rsm_response.count |
739 "node": node, | 767 if index is None or count is None: |
740 "uri": self.getNodeURI(service_jid, node), | 768 # we don't have enough information to know if the data is complete |
741 } | 769 # or not |
742 if mam_response is not None: | 770 metadata["complete"] = None |
743 # mam_response is a dict with "complete" and "stable" keys | 771 else: |
744 # we can put them directly in metadata | 772 # normally we have a strict equality here but XEP-0059 states |
745 metadata.update(mam_response) | 773 # that index MAY be approximative, so just in case… |
746 if rsm_request is not None and rsm_response is not None: | 774 metadata["complete"] = index + len(items) >= count |
747 metadata['rsm'] = rsm_response.toDict() | 775 |
748 if mam_response is None: | 776 return (items, metadata) |
749 index = rsm_response.index | |
750 count = rsm_response.count | |
751 if index is None or count is None: | |
752 # we don't have enough information to know if the data is complete | |
753 # or not | |
754 metadata["complete"] = None | |
755 else: | |
756 # normally we have a strict equality here but XEP-0059 states | |
757 # that index MAY be approximative, so just in case… | |
758 metadata["complete"] = index + len(items) >= count | |
759 | |
760 return (items, metadata) | |
761 | |
762 d.addCallback(addMetadata) | |
763 return d | |
764 | 777 |
765 # @defer.inlineCallbacks | 778 # @defer.inlineCallbacks |
766 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): | 779 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): |
767 # """Massively retrieve pubsub items from many nodes. | 780 # """Massively retrieve pubsub items from many nodes. |
768 # @param service (JID): target service. | 781 # @param service (JID): target service. |
1057 nodeIdentifier, | 1070 nodeIdentifier, |
1058 itemIdentifiers, | 1071 itemIdentifiers, |
1059 notify=True, | 1072 notify=True, |
1060 ): | 1073 ): |
1061 return client.pubsub_client.retractItems( | 1074 return client.pubsub_client.retractItems( |
1062 service, nodeIdentifier, itemIdentifiers, notify=True | 1075 service, nodeIdentifier, itemIdentifiers, notify=notify |
1063 ) | 1076 ) |
1064 | 1077 |
1065 def _renameItem( | 1078 def _renameItem( |
1066 self, | 1079 self, |
1067 service, | 1080 service, |
1098 await self.retractItems(client, service, node, [item_id]) | 1111 await self.retractItems(client, service, node, [item_id]) |
1099 | 1112 |
1100 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): | 1113 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): |
1101 client = self.host.getClient(profile_key) | 1114 client = self.host.getClient(profile_key) |
1102 service = None if not service else jid.JID(service) | 1115 service = None if not service else jid.JID(service) |
1103 d = self.subscribe(client, service, nodeIdentifier, options=options or None) | 1116 d = defer.ensureDeferred( |
1117 self.subscribe(client, service, nodeIdentifier, options=options or None) | |
1118 ) | |
1104 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") | 1119 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") |
1105 return d | 1120 return d |
1106 | 1121 |
1107 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): | 1122 async def subscribe( |
1123 self, | |
1124 client: SatXMPPEntity, | |
1125 service: jid.JID, | |
1126 nodeIdentifier: str, | |
1127 sub_jid: Optional[jid.JID] = None, | |
1128 options: Optional[dict] = None | |
1129 ) -> pubsub.Subscription: | |
1108 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe | 1130 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe |
1109 return client.pubsub_client.subscribe( | 1131 subscription = await client.pubsub_client.subscribe( |
1110 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options | 1132 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options |
1111 ) | 1133 ) |
1134 await self.host.trigger.asyncPoint( | |
1135 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, | |
1136 subscription | |
1137 ) | |
1138 return subscription | |
1112 | 1139 |
1113 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): | 1140 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): |
1114 client = self.host.getClient(profile_key) | 1141 client = self.host.getClient(profile_key) |
1115 service = None if not service else jid.JID(service) | 1142 service = None if not service else jid.JID(service) |
1116 return self.unsubscribe(client, service, nodeIdentifier) | 1143 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier)) |
1117 | 1144 |
1118 def unsubscribe( | 1145 async def unsubscribe( |
1119 self, | 1146 self, |
1120 client, | 1147 client: SatXMPPEntity, |
1121 service, | 1148 service: jid.JID, |
1122 nodeIdentifier, | 1149 nodeIdentifier: str, |
1123 sub_jid=None, | 1150 sub_jid=None, |
1124 subscriptionIdentifier=None, | 1151 subscriptionIdentifier=None, |
1125 sender=None, | 1152 sender=None, |
1126 ): | 1153 ): |
1127 return client.pubsub_client.unsubscribe( | 1154 await client.pubsub_client.unsubscribe( |
1128 service, | 1155 service, |
1129 nodeIdentifier, | 1156 nodeIdentifier, |
1130 sub_jid or client.jid.userhostJID(), | 1157 sub_jid or client.jid.userhostJID(), |
1131 subscriptionIdentifier, | 1158 subscriptionIdentifier, |
1132 sender, | 1159 sender, |
1160 ) | |
1161 await self.host.trigger.asyncPoint( | |
1162 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, | |
1163 subscriptionIdentifier, sender | |
1133 ) | 1164 ) |
1134 | 1165 |
1135 def _subscriptions(self, service, nodeIdentifier="", profile_key=C.PROF_KEY_NONE): | 1166 def _subscriptions(self, service, nodeIdentifier="", profile_key=C.PROF_KEY_NONE): |
1136 client = self.host.getClient(profile_key) | 1167 client = self.host.getClient(profile_key) |
1137 service = None if not service else jid.JID(service) | 1168 service = None if not service else jid.JID(service) |
1392 @return (str): RT Deferred session id | 1423 @return (str): RT Deferred session id |
1393 """ | 1424 """ |
1394 client = self.host.getClient(profile_key) | 1425 client = self.host.getClient(profile_key) |
1395 deferreds = {} | 1426 deferreds = {} |
1396 for service, node in node_data: | 1427 for service, node in node_data: |
1397 deferreds[(service, node)] = client.pubsub_client.subscribe( | 1428 deferreds[(service, node)] = defer.ensureDeferred( |
1398 service, node, subscriber, options=options | 1429 client.pubsub_client.subscribe( |
1430 service, node, subscriber, options=options | |
1431 ) | |
1399 ) | 1432 ) |
1400 return self.rt_sessions.newSession(deferreds, client.profile) | 1433 return self.rt_sessions.newSession(deferreds, client.profile) |
1401 # found_nodes = yield self.listNodes(service, profile=client.profile) | 1434 # found_nodes = yield self.listNodes(service, profile=client.profile) |
1402 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | 1435 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) |
1403 # d_list = [] | 1436 # d_list = [] |
1443 ) | 1476 ) |
1444 ) | 1477 ) |
1445 return d | 1478 return d |
1446 | 1479 |
1447 def _getFromMany( | 1480 def _getFromMany( |
1448 self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE | 1481 self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE |
1449 ): | 1482 ): |
1450 """ | 1483 """ |
1451 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit | 1484 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit |
1452 """ | 1485 """ |
1453 max_item = None if max_item == C.NO_LIMIT else max_item | 1486 max_item = None if max_item == C.NO_LIMIT else max_item |
1454 extra = self.parseExtra(extra_dict) | 1487 extra = self.parseExtra(data_format.deserialise(extra)) |
1455 return self.getFromMany( | 1488 return self.getFromMany( |
1456 [(jid.JID(service), str(node)) for service, node in node_data], | 1489 [(jid.JID(service), str(node)) for service, node in node_data], |
1457 max_item, | 1490 max_item, |
1458 extra.rsm_request, | 1491 extra.rsm_request, |
1459 extra.extra, | 1492 extra.extra, |
1473 @return (str): RT Deferred session id | 1506 @return (str): RT Deferred session id |
1474 """ | 1507 """ |
1475 client = self.host.getClient(profile_key) | 1508 client = self.host.getClient(profile_key) |
1476 deferreds = {} | 1509 deferreds = {} |
1477 for service, node in node_data: | 1510 for service, node in node_data: |
1478 deferreds[(service, node)] = self.getItems( | 1511 deferreds[(service, node)] = defer.ensureDeferred(self.getItems( |
1479 client, service, node, max_item, rsm_request=rsm_request, extra=extra | 1512 client, service, node, max_item, rsm_request=rsm_request, extra=extra |
1480 ) | 1513 )) |
1481 return self.rt_sessions.newSession(deferreds, client.profile) | 1514 return self.rt_sessions.newSession(deferreds, client.profile) |
1482 | 1515 |
1483 | 1516 |
1484 @implementer(disco.IDisco) | 1517 @implementer(disco.IDisco) |
1485 class SatPubSubClient(rsm.PubSubClient): | 1518 class SatPubSubClient(rsm.PubSubClient): |
1511 | 1544 |
1512 | 1545 |
1513 def itemsReceived(self, event): | 1546 def itemsReceived(self, event): |
1514 log.debug("Pubsub items received") | 1547 log.debug("Pubsub items received") |
1515 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): | 1548 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): |
1516 callback(self.parent, event) | 1549 d = utils.asDeferred(callback, self.parent, event) |
1550 d.addErrback(lambda f: log.error( | |
1551 f"Error while running items event callback {callback}: {f}" | |
1552 )) | |
1517 client = self.parent | 1553 client = self.parent |
1518 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1554 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1519 raw_items = [i.toXml() for i in event.items] | 1555 raw_items = [i.toXml() for i in event.items] |
1520 self.host.bridge.psEventRaw( | 1556 self.host.bridge.psEventRaw( |
1521 event.sender.full(), | 1557 event.sender.full(), |
1526 ) | 1562 ) |
1527 | 1563 |
1528 def deleteReceived(self, event): | 1564 def deleteReceived(self, event): |
1529 log.debug(("Publish node deleted")) | 1565 log.debug(("Publish node deleted")) |
1530 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): | 1566 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): |
1531 callback(self.parent, event) | 1567 d = utils.asDeferred(callback, self.parent, event) |
1568 d.addErrback(lambda f: log.error( | |
1569 f"Error while running delete event callback {callback}: {f}" | |
1570 )) | |
1532 client = self.parent | 1571 client = self.parent |
1533 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1572 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1534 self.host.bridge.psEventRaw( | 1573 self.host.bridge.psEventRaw( |
1535 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile | 1574 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile |
1575 ) | |
1576 | |
1577 def purgeReceived(self, event): | |
1578 log.debug(("Publish node purged")) | |
1579 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_PURGE): | |
1580 d = utils.asDeferred(callback, self.parent, event) | |
1581 d.addErrback(lambda f: log.error( | |
1582 f"Error while running purge event callback {callback}: {f}" | |
1583 )) | |
1584 client = self.parent | |
1585 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | |
1586 self.host.bridge.psEventRaw( | |
1587 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile | |
1536 ) | 1588 ) |
1537 | 1589 |
1538 def subscriptions(self, service, nodeIdentifier, sender=None): | 1590 def subscriptions(self, service, nodeIdentifier, sender=None): |
1539 """Return the list of subscriptions to the given service and node. | 1591 """Return the list of subscriptions to the given service and node. |
1540 | 1592 |