comparison sat/plugins/plugin_xep_0060.py @ 3715:b9718216a1c0 0.9

merge bookmark 0.9
author Goffi <goffi@goffi.org>
date Wed, 01 Dec 2021 16:13:31 +0100
parents 5d108ce026d7
children 1cdb9d9fad6b
comparison
equal deleted inserted replaced
3714:af09b5aaa5d7 3715:b9718216a1c0
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 19
20 from typing import Optional 20 from typing import Optional, List, Tuple
21 from collections import namedtuple 21 from collections import namedtuple
22 import urllib.request, urllib.parse, urllib.error 22 import urllib.request, urllib.parse, urllib.error
23 from functools import reduce 23 from functools import reduce
24 from zope.interface import implementer 24 from zope.interface import implementer
25 from twisted.words.xish import domish 25 from twisted.words.xish import domish
33 from wokkel import rsm 33 from wokkel import rsm
34 from wokkel import mam 34 from wokkel import mam
35 from sat.core.i18n import _ 35 from sat.core.i18n import _
36 from sat.core.constants import Const as C 36 from sat.core.constants import Const as C
37 from sat.core.log import getLogger 37 from sat.core.log import getLogger
38 from sat.core.xmpp import SatXMPPEntity 38 from sat.core.core_types import SatXMPPEntity
39 from sat.core import exceptions 39 from sat.core import exceptions
40 from sat.tools import utils
40 from sat.tools import sat_defer 41 from sat.tools import sat_defer
41 from sat.tools import xml_tools 42 from sat.tools import xml_tools
42 from sat.tools.common import data_format 43 from sat.tools.common import data_format
43 44
44 45
88 OWPOL_ORIGINAL = "original_publisher" 89 OWPOL_ORIGINAL = "original_publisher"
89 OWPOL_ANY_PUB = "any_publisher" 90 OWPOL_ANY_PUB = "any_publisher"
90 ID_SINGLETON = "current" 91 ID_SINGLETON = "current"
91 EXTRA_PUBLISH_OPTIONS = "publish_options" 92 EXTRA_PUBLISH_OPTIONS = "publish_options"
92 EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met" 93 EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met"
94 # extra disco needed for RSM, cf. XEP-0060 § 6.5.4
95 DISCO_RSM = "http://jabber.org/protocol/pubsub#rsm"
93 96
94 def __init__(self, host): 97 def __init__(self, host):
95 log.info(_("PubSub plugin initialization")) 98 log.info(_("PubSub plugin initialization"))
96 self.host = host 99 self.host = host
97 self._rsm = host.plugins.get("XEP-0059") 100 self._rsm = host.plugins.get("XEP-0059")
195 async_=True, 198 async_=True,
196 ) 199 )
197 host.bridge.addMethod( 200 host.bridge.addMethod(
198 "psItemsGet", 201 "psItemsGet",
199 ".plugin", 202 ".plugin",
200 in_sign="ssiassa{ss}s", 203 in_sign="ssiassss",
201 out_sign="s", 204 out_sign="s",
202 method=self._getItems, 205 method=self._getItems,
203 async_=True, 206 async_=True,
204 ) 207 )
205 host.bridge.addMethod( 208 host.bridge.addMethod(
282 async_=True, 285 async_=True,
283 ) 286 )
284 host.bridge.addMethod( 287 host.bridge.addMethod(
285 "psGetFromMany", 288 "psGetFromMany",
286 ".plugin", 289 ".plugin",
287 in_sign="a(ss)ia{ss}s", 290 in_sign="a(ss)iss",
288 out_sign="s", 291 out_sign="s",
289 method=self._getFromMany, 292 method=self._getFromMany,
290 ) 293 )
291 host.bridge.addMethod( 294 host.bridge.addMethod(
292 "psGetFromManyRTResult", 295 "psGetFromManyRTResult",
389 all node *prefixed* with this one will be triggered 392 all node *prefixed* with this one will be triggered
390 @param **kwargs: method(s) to call when the node is found 393 @param **kwargs: method(s) to call when the node is found
391 the method must be named after PubSub constants in lower case 394 the method must be named after PubSub constants in lower case
392 and suffixed with "_cb" 395 and suffixed with "_cb"
393 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE 396 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE
397 note: only C.PS_ITEMS and C.PS_DELETE are implemented so far
394 """ 398 """
395 assert node is not None 399 assert node is not None
396 assert kwargs 400 assert kwargs
397 callbacks = self._node_cb.setdefault(node, {}) 401 callbacks = self._node_cb.setdefault(node, {})
398 for event, cb in kwargs.items(): 402 for event, cb in kwargs.items():
469 profile_key=C.PROF_KEY_NONE): 473 profile_key=C.PROF_KEY_NONE):
470 client = self.host.getClient(profile_key) 474 client = self.host.getClient(profile_key)
471 service = None if not service else jid.JID(service) 475 service = None if not service else jid.JID(service)
472 payload = xml_tools.parse(payload) 476 payload = xml_tools.parse(payload)
473 extra = data_format.deserialise(extra_ser) 477 extra = data_format.deserialise(extra_ser)
474 d = self.sendItem( 478 d = defer.ensureDeferred(self.sendItem(
475 client, service, nodeIdentifier, payload, item_id or None, extra 479 client, service, nodeIdentifier, payload, item_id or None, extra
476 ) 480 ))
477 d.addCallback(lambda ret: ret or "") 481 d.addCallback(lambda ret: ret or "")
478 return d 482 return d
479 483
480 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None, 484 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None,
481 profile_key=C.PROF_KEY_NONE): 485 profile_key=C.PROF_KEY_NONE):
485 items = [xml_tools.parse(item) for item in items] 489 items = [xml_tools.parse(item) for item in items]
486 except Exception as e: 490 except Exception as e:
487 raise exceptions.DataError(_("Can't parse items: {msg}").format( 491 raise exceptions.DataError(_("Can't parse items: {msg}").format(
488 msg=e)) 492 msg=e))
489 extra = data_format.deserialise(extra_ser) 493 extra = data_format.deserialise(extra_ser)
490 d = self.sendItems( 494 return defer.ensureDeferred(self.sendItems(
491 client, service, nodeIdentifier, items, extra 495 client, service, nodeIdentifier, items, extra
492 ) 496 ))
493 return d 497
494 498 async def sendItem(
495 def _getPublishedItemId(self, published_ids, original_id): 499 self, client, service, nodeIdentifier, payload, item_id=None, extra=None
496 """Return item of published id if found in answer 500 ):
497
498 if not found original_id is returned, which may be None
499 """
500 try:
501 return published_ids[0]
502 except IndexError:
503 return original_id
504
505 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None,
506 extra=None):
507 """High level method to send one item 501 """High level method to send one item
508 502
509 @param service(jid.JID, None): service to send the item to 503 @param service(jid.JID, None): service to send the item to
510 None to use PEP 504 None to use PEP
511 @param NodeIdentifier(unicode): PubSub node to use 505 @param NodeIdentifier(unicode): PubSub node to use
517 assert isinstance(payload, domish.Element) 511 assert isinstance(payload, domish.Element)
518 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) 512 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
519 if item_id is not None: 513 if item_id is not None:
520 item_elt['id'] = item_id 514 item_elt['id'] = item_id
521 item_elt.addChild(payload) 515 item_elt.addChild(payload)
522 d = defer.ensureDeferred(self.sendItems( 516 published_ids = await self.sendItems(
523 client, 517 client,
524 service, 518 service,
525 nodeIdentifier, 519 nodeIdentifier,
526 [item_elt], 520 [item_elt],
527 extra 521 extra
528 )) 522 )
529 d.addCallback(self._getPublishedItemId, item_id) 523 try:
530 return d 524 return published_ids[0]
525 except IndexError:
526 return item_id
531 527
532 async def sendItems(self, client, service, nodeIdentifier, items, extra=None): 528 async def sendItems(self, client, service, nodeIdentifier, items, extra=None):
533 """High level method to send several items at once 529 """High level method to send several items at once
534 530
535 @param service(jid.JID, None): service to send the item to 531 @param service(jid.JID, None): service to send the item to
591 return [item['id'] 587 return [item['id']
592 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] 588 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')]
593 except AttributeError: 589 except AttributeError:
594 return [] 590 return []
595 591
596 def publish(self, client, service, nodeIdentifier, items=None, options=None): 592 async def publish(
597 return client.pubsub_client.publish( 593 self,
594 client: SatXMPPEntity,
595 service: jid.JID,
596 nodeIdentifier: str,
597 items: Optional[List[domish.Element]] = None,
598 options: Optional[dict] = None
599 ) -> List[str]:
600 published_ids = await client.pubsub_client.publish(
598 service, nodeIdentifier, items, client.pubsub_client.parent.jid, 601 service, nodeIdentifier, items, client.pubsub_client.parent.jid,
599 options=options 602 options=options
600 ) 603 )
604
605 await self.host.trigger.asyncPoint(
606 "XEP-0060_publish", client, service, nodeIdentifier, items, options,
607 published_ids
608 )
609 return published_ids
601 610
602 def _unwrapMAMMessage(self, message_elt): 611 def _unwrapMAMMessage(self, message_elt):
603 try: 612 try:
604 item_elt = reduce( 613 item_elt = reduce(
605 lambda elt, ns_name: next(elt.elements(*ns_name)), 614 lambda elt, ns_name: next(elt.elements(*ns_name)),
619 items, metadata = items_data 628 items, metadata = items_data
620 metadata['items'] = items 629 metadata['items'] = items
621 return data_format.serialise(metadata) 630 return data_format.serialise(metadata)
622 631
623 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, 632 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None,
624 extra_dict=None, profile_key=C.PROF_KEY_NONE): 633 extra="", profile_key=C.PROF_KEY_NONE):
625 """Get items from pubsub node 634 """Get items from pubsub node
626 635
627 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit 636 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
628 """ 637 """
629 client = self.host.getClient(profile_key) 638 client = self.host.getClient(profile_key)
630 service = jid.JID(service) if service else None 639 service = jid.JID(service) if service else None
631 max_items = None if max_items == C.NO_LIMIT else max_items 640 max_items = None if max_items == C.NO_LIMIT else max_items
632 extra = self.parseExtra(extra_dict) 641 extra = self.parseExtra(data_format.deserialise(extra))
633 d = self.getItems( 642 d = defer.ensureDeferred(self.getItems(
634 client, 643 client,
635 service, 644 service,
636 node or None, 645 node or None,
637 max_items or None, 646 max_items,
638 item_ids, 647 item_ids,
639 sub_id or None, 648 sub_id or None,
640 extra.rsm_request, 649 extra.rsm_request,
641 extra.extra, 650 extra.extra,
642 ) 651 ))
643 d.addCallback(self.transItemsData) 652 d.addCallback(self.transItemsData)
644 d.addCallback(self.serialiseItems) 653 d.addCallback(self.serialiseItems)
645 return d 654 return d
646 655
647 def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None, 656 async def getItems(
648 rsm_request=None, extra=None): 657 self,
658 client: SatXMPPEntity,
659 service: Optional[jid.JID],
660 node: str,
661 max_items: Optional[int] = None,
662 item_ids: Optional[List[str]] = None,
663 sub_id: Optional[str] = None,
664 rsm_request: Optional[rsm.RSMRequest] = None,
665 extra: Optional[dict] = None
666 ) -> Tuple[List[dict], dict]:
649 """Retrieve pubsub items from a node. 667 """Retrieve pubsub items from a node.
650 668
651 @param service (JID, None): pubsub service. 669 @param service (JID, None): pubsub service.
652 @param node (str): node id. 670 @param node (str): node id.
653 @param max_items (int): optional limit on the number of retrieved items. 671 @param max_items (int): optional limit on the number of retrieved items.
666 max_items = None 684 max_items = None
667 if rsm_request and item_ids: 685 if rsm_request and item_ids:
668 raise ValueError("items_id can't be used with rsm") 686 raise ValueError("items_id can't be used with rsm")
669 if extra is None: 687 if extra is None:
670 extra = {} 688 extra = {}
689 cont, ret = await self.host.trigger.asyncReturnPoint(
690 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id,
691 rsm_request, extra
692 )
693 if not cont:
694 return ret
671 try: 695 try:
672 mam_query = extra["mam"] 696 mam_query = extra["mam"]
673 except KeyError: 697 except KeyError:
674 d = client.pubsub_client.items( 698 d = client.pubsub_client.items(
675 service = service, 699 service = service,
680 itemIdentifiers = item_ids, 704 itemIdentifiers = item_ids,
681 orderBy = extra.get(C.KEY_ORDER_BY), 705 orderBy = extra.get(C.KEY_ORDER_BY),
682 rsm_request = rsm_request 706 rsm_request = rsm_request
683 ) 707 )
684 # we have no MAM data here, so we add None 708 # we have no MAM data here, so we add None
685 d.addCallback(lambda data: data + (None,))
686 d.addErrback(sat_defer.stanza2NotFound) 709 d.addErrback(sat_defer.stanza2NotFound)
687 d.addTimeout(TIMEOUT, reactor) 710 d.addTimeout(TIMEOUT, reactor)
711 items, rsm_response = await d
712 mam_response = None
688 else: 713 else:
689 # if mam is requested, we have to do a totally different query 714 # if mam is requested, we have to do a totally different query
690 if self._mam is None: 715 if self._mam is None:
691 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") 716 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available")
692 if max_items is not None: 717 if max_items is not None:
704 else: 729 else:
705 if mam_query.rsm != rsm_request: 730 if mam_query.rsm != rsm_request:
706 raise exceptions.DataError( 731 raise exceptions.DataError(
707 "Conflict between RSM request and MAM's RSM request" 732 "Conflict between RSM request and MAM's RSM request"
708 ) 733 )
709 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) 734 items, rsm_response, mam_response = await self._mam.getArchives(
735 client, mam_query, service, self._unwrapMAMMessage
736 )
710 737
711 try: 738 try:
712 subscribe = C.bool(extra["subscribe"]) 739 subscribe = C.bool(extra["subscribe"])
713 except KeyError: 740 except KeyError:
714 subscribe = False 741 subscribe = False
715 742
716 def subscribeEb(failure, service, node): 743 if subscribe:
717 failure.trap(error.StanzaError) 744 try:
718 log.warning( 745 await self.subscribe(client, service, node)
719 "Could not subscribe to node {} on service {}: {}".format( 746 except error.StanzaError as e:
720 node, str(service), str(failure.value) 747 log.warning(
748 f"Could not subscribe to node {node} on service {service}: {e}"
721 ) 749 )
722 ) 750
723 751 # TODO: handle mam_response
724 def doSubscribe(data): 752 service_jid = service if service else client.jid.userhostJID()
725 self.subscribe(client, service, node).addErrback( 753 metadata = {
726 subscribeEb, service, node 754 "service": service_jid,
727 ) 755 "node": node,
728 return data 756 "uri": self.getNodeURI(service_jid, node),
729 757 }
730 if subscribe: 758 if mam_response is not None:
731 d.addCallback(doSubscribe) 759 # mam_response is a dict with "complete" and "stable" keys
732 760 # we can put them directly in metadata
733 def addMetadata(result): 761 metadata.update(mam_response)
734 # TODO: handle the third argument (mam_response) 762 if rsm_request is not None and rsm_response is not None:
735 items, rsm_response, mam_response = result 763 metadata['rsm'] = rsm_response.toDict()
736 service_jid = service if service else client.jid.userhostJID() 764 if mam_response is None:
737 metadata = { 765 index = rsm_response.index
738 "service": service_jid, 766 count = rsm_response.count
739 "node": node, 767 if index is None or count is None:
740 "uri": self.getNodeURI(service_jid, node), 768 # we don't have enough information to know if the data is complete
741 } 769 # or not
742 if mam_response is not None: 770 metadata["complete"] = None
743 # mam_response is a dict with "complete" and "stable" keys 771 else:
744 # we can put them directly in metadata 772 # normally we have a strict equality here but XEP-0059 states
745 metadata.update(mam_response) 773 # that index MAY be approximative, so just in case…
746 if rsm_request is not None and rsm_response is not None: 774 metadata["complete"] = index + len(items) >= count
747 metadata['rsm'] = rsm_response.toDict() 775
748 if mam_response is None: 776 return (items, metadata)
749 index = rsm_response.index
750 count = rsm_response.count
751 if index is None or count is None:
752 # we don't have enough information to know if the data is complete
753 # or not
754 metadata["complete"] = None
755 else:
756 # normally we have a strict equality here but XEP-0059 states
757 # that index MAY be approximative, so just in case…
758 metadata["complete"] = index + len(items) >= count
759
760 return (items, metadata)
761
762 d.addCallback(addMetadata)
763 return d
764 777
765 # @defer.inlineCallbacks 778 # @defer.inlineCallbacks
766 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): 779 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
767 # """Massively retrieve pubsub items from many nodes. 780 # """Massively retrieve pubsub items from many nodes.
768 # @param service (JID): target service. 781 # @param service (JID): target service.
1057 nodeIdentifier, 1070 nodeIdentifier,
1058 itemIdentifiers, 1071 itemIdentifiers,
1059 notify=True, 1072 notify=True,
1060 ): 1073 ):
1061 return client.pubsub_client.retractItems( 1074 return client.pubsub_client.retractItems(
1062 service, nodeIdentifier, itemIdentifiers, notify=True 1075 service, nodeIdentifier, itemIdentifiers, notify=notify
1063 ) 1076 )
1064 1077
1065 def _renameItem( 1078 def _renameItem(
1066 self, 1079 self,
1067 service, 1080 service,
1098 await self.retractItems(client, service, node, [item_id]) 1111 await self.retractItems(client, service, node, [item_id])
1099 1112
1100 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): 1113 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
1101 client = self.host.getClient(profile_key) 1114 client = self.host.getClient(profile_key)
1102 service = None if not service else jid.JID(service) 1115 service = None if not service else jid.JID(service)
1103 d = self.subscribe(client, service, nodeIdentifier, options=options or None) 1116 d = defer.ensureDeferred(
1117 self.subscribe(client, service, nodeIdentifier, options=options or None)
1118 )
1104 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") 1119 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
1105 return d 1120 return d
1106 1121
1107 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): 1122 async def subscribe(
1123 self,
1124 client: SatXMPPEntity,
1125 service: jid.JID,
1126 nodeIdentifier: str,
1127 sub_jid: Optional[jid.JID] = None,
1128 options: Optional[dict] = None
1129 ) -> pubsub.Subscription:
1108 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe 1130 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
1109 return client.pubsub_client.subscribe( 1131 subscription = await client.pubsub_client.subscribe(
1110 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options 1132 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options
1111 ) 1133 )
1134 await self.host.trigger.asyncPoint(
1135 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options,
1136 subscription
1137 )
1138 return subscription
1112 1139
1113 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): 1140 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
1114 client = self.host.getClient(profile_key) 1141 client = self.host.getClient(profile_key)
1115 service = None if not service else jid.JID(service) 1142 service = None if not service else jid.JID(service)
1116 return self.unsubscribe(client, service, nodeIdentifier) 1143 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier))
1117 1144
1118 def unsubscribe( 1145 async def unsubscribe(
1119 self, 1146 self,
1120 client, 1147 client: SatXMPPEntity,
1121 service, 1148 service: jid.JID,
1122 nodeIdentifier, 1149 nodeIdentifier: str,
1123 sub_jid=None, 1150 sub_jid=None,
1124 subscriptionIdentifier=None, 1151 subscriptionIdentifier=None,
1125 sender=None, 1152 sender=None,
1126 ): 1153 ):
1127 return client.pubsub_client.unsubscribe( 1154 await client.pubsub_client.unsubscribe(
1128 service, 1155 service,
1129 nodeIdentifier, 1156 nodeIdentifier,
1130 sub_jid or client.jid.userhostJID(), 1157 sub_jid or client.jid.userhostJID(),
1131 subscriptionIdentifier, 1158 subscriptionIdentifier,
1132 sender, 1159 sender,
1160 )
1161 await self.host.trigger.asyncPoint(
1162 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid,
1163 subscriptionIdentifier, sender
1133 ) 1164 )
1134 1165
1135 def _subscriptions(self, service, nodeIdentifier="", profile_key=C.PROF_KEY_NONE): 1166 def _subscriptions(self, service, nodeIdentifier="", profile_key=C.PROF_KEY_NONE):
1136 client = self.host.getClient(profile_key) 1167 client = self.host.getClient(profile_key)
1137 service = None if not service else jid.JID(service) 1168 service = None if not service else jid.JID(service)
1392 @return (str): RT Deferred session id 1423 @return (str): RT Deferred session id
1393 """ 1424 """
1394 client = self.host.getClient(profile_key) 1425 client = self.host.getClient(profile_key)
1395 deferreds = {} 1426 deferreds = {}
1396 for service, node in node_data: 1427 for service, node in node_data:
1397 deferreds[(service, node)] = client.pubsub_client.subscribe( 1428 deferreds[(service, node)] = defer.ensureDeferred(
1398 service, node, subscriber, options=options 1429 client.pubsub_client.subscribe(
1430 service, node, subscriber, options=options
1431 )
1399 ) 1432 )
1400 return self.rt_sessions.newSession(deferreds, client.profile) 1433 return self.rt_sessions.newSession(deferreds, client.profile)
1401 # found_nodes = yield self.listNodes(service, profile=client.profile) 1434 # found_nodes = yield self.listNodes(service, profile=client.profile)
1402 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) 1435 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
1403 # d_list = [] 1436 # d_list = []
1443 ) 1476 )
1444 ) 1477 )
1445 return d 1478 return d
1446 1479
1447 def _getFromMany( 1480 def _getFromMany(
1448 self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE 1481 self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE
1449 ): 1482 ):
1450 """ 1483 """
1451 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit 1484 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit
1452 """ 1485 """
1453 max_item = None if max_item == C.NO_LIMIT else max_item 1486 max_item = None if max_item == C.NO_LIMIT else max_item
1454 extra = self.parseExtra(extra_dict) 1487 extra = self.parseExtra(data_format.deserialise(extra))
1455 return self.getFromMany( 1488 return self.getFromMany(
1456 [(jid.JID(service), str(node)) for service, node in node_data], 1489 [(jid.JID(service), str(node)) for service, node in node_data],
1457 max_item, 1490 max_item,
1458 extra.rsm_request, 1491 extra.rsm_request,
1459 extra.extra, 1492 extra.extra,
1473 @return (str): RT Deferred session id 1506 @return (str): RT Deferred session id
1474 """ 1507 """
1475 client = self.host.getClient(profile_key) 1508 client = self.host.getClient(profile_key)
1476 deferreds = {} 1509 deferreds = {}
1477 for service, node in node_data: 1510 for service, node in node_data:
1478 deferreds[(service, node)] = self.getItems( 1511 deferreds[(service, node)] = defer.ensureDeferred(self.getItems(
1479 client, service, node, max_item, rsm_request=rsm_request, extra=extra 1512 client, service, node, max_item, rsm_request=rsm_request, extra=extra
1480 ) 1513 ))
1481 return self.rt_sessions.newSession(deferreds, client.profile) 1514 return self.rt_sessions.newSession(deferreds, client.profile)
1482 1515
1483 1516
1484 @implementer(disco.IDisco) 1517 @implementer(disco.IDisco)
1485 class SatPubSubClient(rsm.PubSubClient): 1518 class SatPubSubClient(rsm.PubSubClient):
1511 1544
1512 1545
1513 def itemsReceived(self, event): 1546 def itemsReceived(self, event):
1514 log.debug("Pubsub items received") 1547 log.debug("Pubsub items received")
1515 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): 1548 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS):
1516 callback(self.parent, event) 1549 d = utils.asDeferred(callback, self.parent, event)
1550 d.addErrback(lambda f: log.error(
1551 f"Error while running items event callback {callback}: {f}"
1552 ))
1517 client = self.parent 1553 client = self.parent
1518 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1554 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1519 raw_items = [i.toXml() for i in event.items] 1555 raw_items = [i.toXml() for i in event.items]
1520 self.host.bridge.psEventRaw( 1556 self.host.bridge.psEventRaw(
1521 event.sender.full(), 1557 event.sender.full(),
1526 ) 1562 )
1527 1563
1528 def deleteReceived(self, event): 1564 def deleteReceived(self, event):
1529 log.debug(("Publish node deleted")) 1565 log.debug(("Publish node deleted"))
1530 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): 1566 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE):
1531 callback(self.parent, event) 1567 d = utils.asDeferred(callback, self.parent, event)
1568 d.addErrback(lambda f: log.error(
1569 f"Error while running delete event callback {callback}: {f}"
1570 ))
1532 client = self.parent 1571 client = self.parent
1533 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1572 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1534 self.host.bridge.psEventRaw( 1573 self.host.bridge.psEventRaw(
1535 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile 1574 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile
1575 )
1576
1577 def purgeReceived(self, event):
1578 log.debug(("Publish node purged"))
1579 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_PURGE):
1580 d = utils.asDeferred(callback, self.parent, event)
1581 d.addErrback(lambda f: log.error(
1582 f"Error while running purge event callback {callback}: {f}"
1583 ))
1584 client = self.parent
1585 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1586 self.host.bridge.psEventRaw(
1587 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile
1536 ) 1588 )
1537 1589
1538 def subscriptions(self, service, nodeIdentifier, sender=None): 1590 def subscriptions(self, service, nodeIdentifier, sender=None):
1539 """Return the list of subscriptions to the given service and node. 1591 """Return the list of subscriptions to the given service and node.
1540 1592