comparison sat_pubsub/backend.py @ 478:b544109ab4c4

Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription /!\ pgsql schema needs to be updated /!\ /!\ server conf needs to be updated for privileged entity: only the new `urn:xmpp:privilege:2` namespace is handled now /!\ Privileged entity has been updated to handle the new namespace and IQ permission. Roster pushes are not managed yet. XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now only "Subscribing", "Unsubscribing" and "Listing Subscriptions" are implemented; "Auto Subscriptions" and "Filtering" are not. Public Pubsub Subscription (https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented; the XEP has been accepted by council but is not yet published. It will be updated to use subscription options instead of the <public> element currently specified; I'm waiting for publication to update the XEP. `unsubscribe` has been updated to return the `<subscription>` element as expected by XEP-0060 (sat_tmp needs to be updated). The database schema has been updated to add the columns necessary to keep track of subscriptions to external nodes and to mark subscriptions as public.
author Goffi <goffi@goffi.org>
date Wed, 11 May 2022 13:39:08 +0200
parents ed9e12701e0f
children 0e801ae1869f
comparison
equal deleted inserted replaced
477:9125a6e440c0 478:b544109ab4c4
60 publish-subscribe protocol. 60 publish-subscribe protocol.
61 """ 61 """
62 62
63 import copy 63 import copy
64 import uuid 64 import uuid
65 import hashlib
65 from typing import Optional, List, Tuple 66 from typing import Optional, List, Tuple
66 67
67 from zope.interface import implementer 68 from zope.interface import implementer
68 69
69 from twisted.application import service 70 from twisted.application import service
238 self.nodeOptions[const.OPT_FTS_LANGUAGE]["options"][l] = l.title() 239 self.nodeOptions[const.OPT_FTS_LANGUAGE]["options"][l] = l.title()
239 240
240 def _getFTSLanguagesEb(self, failure_): 241 def _getFTSLanguagesEb(self, failure_):
241 log.msg(f"WARNING: can get FTS languages: {failure_}") 242 log.msg(f"WARNING: can get FTS languages: {failure_}")
242 243
243 def isAdmin(self, entity_jid): 244 def isAdmin(self, entity_jid: jid.JID) -> bool:
244 """Return True if an entity is an administrator""" 245 """Return True if an entity is an administrator"""
245 return entity_jid.userhostJID() in self.admins 246 return entity_jid.userhostJID() in self.admins
247
248 def isFromServer(self, entity_jid: jid.JID) -> bool:
249 """Return True if an entity come from our server"""
250 return entity_jid.host == self.server_jid.host
246 251
247 def supportsPublishOptions(self): 252 def supportsPublishOptions(self):
248 return True 253 return True
249 254
250 def supportsPublisherAffiliation(self): 255 def supportsPublisherAffiliation(self):
593 self.addObserver('//event/pubsub/delete', observerfn, *args, **kwargs) 598 self.addObserver('//event/pubsub/delete', observerfn, *args, **kwargs)
594 599
595 def registerPurgeNotifier(self, observerfn, *args, **kwargs): 600 def registerPurgeNotifier(self, observerfn, *args, **kwargs):
596 self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs) 601 self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs)
597 602
598 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): 603 async def subscribe(
604 self,
605 nodeIdentifier: str,
606 subscriber: jid.JID,
607 requestor: jid.JID,
608 options: Optional[dict],
609 pep: bool,
610 recipient: jid.JID
611 ) -> pubsub.Subscription:
599 subscriberEntity = subscriber.userhostJID() 612 subscriberEntity = subscriber.userhostJID()
600 if subscriberEntity != requestor.userhostJID(): 613 if subscriberEntity != requestor.userhostJID():
601 return defer.fail(error.Forbidden()) 614 raise error.Forbidden()
602 615
603 d = self.storage.getNode(nodeIdentifier, pep, recipient) 616 node = await self.storage.getNode(nodeIdentifier, pep, recipient)
604 d.addCallback(_getAffiliation, subscriberEntity) 617 __, affiliation = await _getAffiliation(node, subscriberEntity)
605 d.addCallback(self._doSubscribe, subscriber, pep, recipient)
606 return d
607
608 def _doSubscribe(self, result, subscriber, pep, recipient):
609 node, affiliation = result
610 618
611 if affiliation == 'outcast': 619 if affiliation == 'outcast':
612 raise error.Forbidden() 620 raise error.Forbidden()
613 621
614 access_model = node.getAccessModel() 622 access_model = node.getAccessModel()
615 623
616 if access_model == const.VAL_AMODEL_OPEN: 624 if access_model == const.VAL_AMODEL_OPEN:
617 d = defer.succeed(None) 625 pass
618 elif access_model == const.VAL_AMODEL_PRESENCE: 626 elif access_model == const.VAL_AMODEL_PRESENCE:
619 d = self.checkPresenceSubscription(node, subscriber) 627 await self.checkPresenceSubscription(node, subscriber)
620 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: 628 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
621 d = self.checkRosterGroups(node, subscriber) 629 await self.checkRosterGroups(node, subscriber)
622 elif access_model == const.VAL_AMODEL_WHITELIST: 630 elif access_model == const.VAL_AMODEL_WHITELIST:
623 d = self.checkNodeAffiliations(node, subscriber) 631 await self.checkNodeAffiliations(node, subscriber)
624 else: 632 else:
625 raise NotImplementedError 633 raise NotImplementedError
626 634
627 def trapExists(failure): 635 config = {}
628 failure.trap(error.SubscriptionExists) 636 if options and options.get(f"{{{const.NS_PPS}}}public"):
629 return False 637 config["public"] = True
630 638 try:
631 def cb(sendLast): 639 await node.addSubscription(subscriber, 'subscribed', config)
632 d = node.getSubscription(subscriber) 640 except error.SubscriptionExists:
633 if sendLast: 641 send_last = False
634 d.addCallback(self._sendLastPublished, node, pep, recipient) 642 else:
635 return d 643 send_last = True
636 644
637 d.addCallback(lambda _: node.addSubscription(subscriber, 'subscribed', {})) 645 subscription = await node.getSubscription(subscriber)
638 d.addCallbacks(lambda _: True, trapExists) 646
639 d.addCallback(cb) 647 if send_last:
640 648 config = node.getConfiguration()
641 return d 649 sendLastPublished = config.get(
642 650 'pubsub#send_last_published_item', 'never'
643 def _sendLastPublished(self, subscription, node, pep, recipient): 651 )
644 652 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
645 def notifyItem(items_data): 653 entity = subscription.subscriber.userhostJID()
646 if items_data: 654 items_data, __ = await self.getItemsData(
647 reactor.callLater(0, self.dispatch, 655 node.nodeIdentifier, entity, recipient, maxItems=1,
648 {'items_data': items_data, 656 ext_data={'pep': pep}
649 'node': node,
650 'pep': pep,
651 'recipient': recipient,
652 'subscription': subscription,
653 },
654 '//event/pubsub/notify')
655
656 config = node.getConfiguration()
657 sendLastPublished = config.get('pubsub#send_last_published_item',
658 'never')
659 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
660 entity = subscription.subscriber.userhostJID()
661 d = defer.ensureDeferred(
662 self.getItemsData(
663 node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep}
664 ) 657 )
665 ) 658 if items_data:
666 d.addCallback(notifyItem) 659 reactor.callLater(
667 d.addErrback(log.err) 660 0,
661 self.dispatch,
662 {'items_data': items_data,
663 'node': node,
664 'pep': pep,
665 'recipient': recipient,
666 'subscription': subscription,
667 },
668 '//event/pubsub/notify'
669 )
668 670
669 return subscription 671 return subscription
670 672
671 def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): 673 async def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
672 if subscriber.userhostJID() != requestor.userhostJID(): 674 if subscriber.userhostJID() != requestor.userhostJID():
673 return defer.fail(error.Forbidden()) 675 raise error.Forbidden()
674 676
675 d = self.storage.getNode(nodeIdentifier, pep, recipient) 677 node = await self.storage.getNode(nodeIdentifier, pep, recipient)
676 d.addCallback(lambda node: node.removeSubscription(subscriber)) 678 await node.removeSubscription(subscriber)
677 return d 679 return pubsub.Subscription(nodeIdentifier, subscriber, "none")
678 680
679 def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient): 681 def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient):
680 """retrieve subscriptions of an entity 682 """retrieve subscriptions of an entity
681 683
682 @param requestor(jid.JID): entity who want to check subscriptions 684 @param requestor(jid.JID): entity who want to check subscriptions
683 @param nodeIdentifier(unicode, None): identifier of the node 685 @param nodeIdentifier(unicode, None): identifier of the node
684 node to get all subscriptions of a service 686 node to get all subscriptions of a service
685 @param pep(bool): True if it's a PEP request 687 @param pep(bool): True if it's a PEP request
686 @param recipient(jid.JID, None): recipient of the PEP request 688 @param recipient(jid.JID, None): recipient of the PEP request
687 """ 689 """
688 return self.storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient) 690 return self.storage.getSubscriptions(
691 requestor, nodeIdentifier, None, pep, recipient
692 )
689 693
690 def supportsAutoCreate(self): 694 def supportsAutoCreate(self):
691 return True 695 return True
692 696
693 def supportsInstantNodes(self): 697 def supportsInstantNodes(self):
746 return d 750 return d
747 751
748 def getNodeConfiguration(self, nodeIdentifier, pep, recipient): 752 def getNodeConfiguration(self, nodeIdentifier, pep, recipient):
749 if not nodeIdentifier: 753 if not nodeIdentifier:
750 return defer.fail(error.NoRootNode()) 754 return defer.fail(error.NoRootNode())
755
756 if ((nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS
757 or nodeIdentifier.startswith(const.PPS_SUBSCRIBERS_PREFIX))):
758 return defer.succeed({const.OPT_ACCESS_MODEL: const.VAL_AMODEL_OPEN})
751 759
752 d = self.storage.getNode(nodeIdentifier, pep, recipient) 760 d = self.storage.getNode(nodeIdentifier, pep, recipient)
753 d.addCallback(lambda node: node.getConfiguration()) 761 d.addCallback(lambda node: node.getConfiguration())
754 762
755 return d 763 return d
1065 maxItems, 1073 maxItems,
1066 ext_data 1074 ext_data
1067 ) 1075 )
1068 return ids 1076 return ids
1069 1077
1070 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, 1078 async def getItems(
1071 itemIdentifiers=None, ext_data=None): 1079 self,
1072 d = defer.ensureDeferred( 1080 nodeIdentifier: str,
1073 self.getItemsData( 1081 requestor: jid.JID,
1074 nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data 1082 recipient: jid.JID,
1075 ) 1083 maxItems: Optional[int] = None,
1084 itemIdentifiers: Optional[List[str]] = None,
1085 ext_data: Optional[dict] = None
1086 ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
1087 items_data, rsm_response = await self.getItemsData(
1088 nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data
1076 ) 1089 )
1077 d.addCallback(lambda items_data: [item_data.item for item_data in items_data]) 1090 return [item_data.item for item_data in items_data], rsm_response
1078 return d
1079 1091
1080 async def getOwnerRoster(self, node, owners=None): 1092 async def getOwnerRoster(self, node, owners=None):
1081 # FIXME: roster of publisher, not owner, must be used 1093 # FIXME: roster of publisher, not owner, must be used
1082 if owners is None: 1094 if owners is None:
1083 owners = await node.getOwners() 1095 owners = await node.getOwners()
1097 owner_jid = owner_jid.full(), 1109 owner_jid = owner_jid.full(),
1098 msg = e)) 1110 msg = e))
1099 return 1111 return
1100 return roster 1112 return roster
1101 1113
1102 async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None, 1114 async def getItemsData(
1103 itemIdentifiers=None, ext_data=None): 1115 self,
1116 nodeIdentifier: str,
1117 requestor: jid.JID,
1118 recipient: jid.JID,
1119 maxItems: Optional[int] = None,
1120 itemIdentifiers: Optional[List[str]] = None,
1121 ext_data: Optional[dict] = None
1122 ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]:
1104 """like getItems but return the whole ItemData""" 1123 """like getItems but return the whole ItemData"""
1105 if maxItems == 0: 1124 if maxItems == 0:
1106 log.msg("WARNING: maxItems=0 on items retrieval") 1125 log.msg("WARNING: maxItems=0 on items retrieval")
1107 return [] 1126 return [], None
1108 1127
1109 if ext_data is None: 1128 if ext_data is None:
1110 ext_data = {} 1129 ext_data = {}
1130
1131 if nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS:
1132 return await self.getPublicSubscriptions(
1133 requestor, maxItems, itemIdentifiers, ext_data,
1134 ext_data.pop("pep", False), recipient
1135 )
1136 elif nodeIdentifier.startswith(f"{const.NS_PPS_SUBSCRIBERS}/"):
1137 target_node = nodeIdentifier[len(const.NS_PPS_SUBSCRIBERS)+1:]
1138 return await self.getPublicNodeSubscriptions(
1139 target_node, requestor, maxItems, itemIdentifiers, ext_data,
1140 ext_data.pop("pep", False), recipient
1141 )
1142
1111 node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) 1143 node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
1112 try: 1144 try:
1113 affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor) 1145 affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor)
1114 except error.NotLeafNodeError: 1146 except error.NotLeafNodeError:
1115 return [] 1147 return [], None
1116 1148
1117 # at this point node access is checked 1149 # at this point node access is checked
1118 1150
1119 if owner: 1151 if owner:
1120 # requestor_groups is only used in restricted access 1152 # requestor_groups is only used in restricted access
1153 1185
1154 schema = node.getSchema() 1186 schema = node.getSchema()
1155 if schema is not None: 1187 if schema is not None:
1156 self.filterItemsWithSchema(items_data, schema, owner) 1188 self.filterItemsWithSchema(items_data, schema, owner)
1157 1189
1158 await self._items_rsm( 1190 return await self._items_rsm(
1159 items_data, node, requestor_groups, owner, itemIdentifiers, ext_data) 1191 items_data, node, requestor_groups, owner, itemIdentifiers, ext_data
1160 return items_data 1192 )
1161 1193
1162 def _setCount(self, value, response): 1194 def _setCount(self, value, response):
1163 response.count = value 1195 response.count = value
1164 1196
1165 def _setIndex(self, value, response, adjust): 1197 def _setIndex(self, value, response, adjust):
1178 # An other optimisation would be to look for index first and use it as offset 1210 # An other optimisation would be to look for index first and use it as offset
1179 try: 1211 try:
1180 rsm_request = ext_data['rsm'] 1212 rsm_request = ext_data['rsm']
1181 except KeyError: 1213 except KeyError:
1182 # No RSM in this request, nothing to do 1214 # No RSM in this request, nothing to do
1183 return items_data 1215 return items_data, None
1184 1216
1185 if itemIdentifiers: 1217 if itemIdentifiers:
1186 log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part") 1218 log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part")
1187 return items_data 1219 return items_data, None
1188 1220
1189 response = rsm.RSMResponse() 1221 rsm_response = rsm.RSMResponse()
1190 1222
1191 d_count = node.getItemsCount(authorized_groups, owner, ext_data) 1223 d_count = node.getItemsCount(authorized_groups, owner, ext_data)
1192 d_count.addCallback(self._setCount, response) 1224 d_count.addCallback(self._setCount, rsm_response)
1193 d_list = [d_count] 1225 d_list = [d_count]
1194 1226
1195 if items_data: 1227 if items_data:
1196 response.first = items_data[0].item['id'] 1228 rsm_response.first = items_data[0].item['id']
1197 response.last = items_data[-1].item['id'] 1229 rsm_response.last = items_data[-1].item['id']
1198 1230
1199 # index handling 1231 # index handling
1200 if rsm_request.index is not None: 1232 if rsm_request.index is not None:
1201 response.index = rsm_request.index 1233 rsm_response.index = rsm_request.index
1202 elif rsm_request.before: 1234 elif rsm_request.before:
1203 # The last page case (before == '') is managed in render method 1235 # The last page case (before == '') is managed in render method
1204 d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data) 1236 d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data)
1205 d_index.addCallback(self._setIndex, response, -len(items_data)) 1237 d_index.addCallback(self._setIndex, rsm_response, -len(items_data))
1206 d_list.append(d_index) 1238 d_list.append(d_index)
1207 elif rsm_request.after is not None: 1239 elif rsm_request.after is not None:
1208 d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data) 1240 d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data)
1209 d_index.addCallback(self._setIndex, response, 1) 1241 d_index.addCallback(self._setIndex, rsm_response, 1)
1210 d_list.append(d_index) 1242 d_list.append(d_index)
1211 else: 1243 else:
1212 # the first page was requested 1244 # the first page was requested
1213 response.index = 0 1245 rsm_response.index = 0
1214 1246
1215 1247
1216 await defer.DeferredList(d_list) 1248 await defer.DeferredList(d_list)
1217 1249
1218 if rsm_request.before == '': 1250 if rsm_request.before == '':
1219 # the last page was requested 1251 # the last page was requested
1220 response.index = response.count - len(items_data) 1252 rsm_response.index = rsm_response.count - len(items_data)
1221 1253
1222 items_data.append(container.ItemData(response.toElement())) 1254 return items_data, rsm_response
1223 1255
1224 return items_data 1256 def addEltFromSubDict(
1257 self,
1258 parent_elt: domish.Element,
1259 from_jid: Optional[jid.JID],
1260 sub_dict: dict[str, str],
1261 namespace: Optional[str] = None,
1262 ) -> None:
1263 """Generate <subscription> element from storage.getAllSubscriptions's dict
1264
1265 @param parent_elt: element where the new subscription element must be added
1266 @param sub_dict: subscription data as returned by storage.getAllSubscriptions
1267 @param namespace: if not None, namespace to use for <subscription> element
1268 @param service_attribute: name of the attribute to use for the subscribed service
1269 """
1270 subscription_elt = parent_elt.addElement(
1271 "subscription" if namespace is None else (namespace, "subscription")
1272 )
1273 if from_jid is not None:
1274 subscription_elt["jid"] = from_jid.userhost()
1275 if sub_dict["node"] is not None:
1276 if sub_dict["pep"] is not None:
1277 subscription_elt["service"] = sub_dict["pep"]
1278 else:
1279 subscription_elt["service"] = self.jid.full()
1280 subscription_elt["node"] = sub_dict["node"]
1281 else:
1282 subscription_elt["service"] = sub_dict["ext_service"]
1283 subscription_elt["node"] = sub_dict["ext_node"]
1284 subscription_elt["subscription"] = sub_dict["state"]
1285
1286 async def getPublicSubscriptions(
1287 self,
1288 requestor: jid.JID,
1289 maxItems: Optional[int],
1290 itemIdentifiers: Optional[List[str]],
1291 ext_data: dict,
1292 pep: bool,
1293 recipient: jid.JID
1294 ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]:
1295
1296 if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"):
1297 raise NotImplementedError(
1298 "item identifiers, RSM and MAM are not implemented yet"
1299 )
1300
1301 if not pep:
1302 return [], None
1303
1304 subs = await self.storage.getAllSubscriptions(recipient, True)
1305 items_data = []
1306 for sub in subs:
1307 if sub["state"] != "subscribed":
1308 continue
1309 item = domish.Element((pubsub.NS_PUBSUB, "item"))
1310 item["id"] = sub["id"]
1311 self.addEltFromSubDict(item, None, sub, const.NS_PPS)
1312 items_data.append(container.ItemData(item))
1313
1314 return items_data, None
1315
1316 async def getPublicNodeSubscriptions(
1317 self,
1318 nodeIdentifier: str,
1319 requestor: jid.JID,
1320 maxItems: Optional[int],
1321 itemIdentifiers: Optional[List[str]],
1322 ext_data: dict,
1323 pep: bool,
1324 recipient: jid.JID
1325 ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]:
1326
1327 if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"):
1328 raise NotImplementedError(
1329 "item identifiers, RSM and MAM are not implemented yet"
1330 )
1331
1332 node = await self.storage.getNode(nodeIdentifier, pep, recipient)
1333
1334 subs = await node.getSubscriptions(public=True)
1335 items_data = []
1336 for sub in subs:
1337 item = domish.Element((pubsub.NS_PUBSUB, "item"))
1338 item["id"] = sub.id
1339 subscriber_elt = item.addElement((const.NS_PPS, "subscriber"))
1340 subscriber_elt["jid"] = sub.subscriber.full()
1341 items_data.append(container.ItemData(item))
1342
1343 return items_data, None
1225 1344
1226 async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): 1345 async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
1227 node = await self.storage.getNode(nodeIdentifier, pep, recipient) 1346 node = await self.storage.getNode(nodeIdentifier, pep, recipient)
1228 node, affiliation = await _getAffiliation(node, requestor) 1347 node, affiliation = await _getAffiliation(node, requestor)
1229 1348
1822 )) 1941 ))
1823 d.addErrback(self._publish_errb, request) 1942 d.addErrback(self._publish_errb, request)
1824 return d.addErrback(self._mapErrors) 1943 return d.addErrback(self._mapErrors)
1825 1944
1826 def subscribe(self, request): 1945 def subscribe(self, request):
1827 d = self.backend.subscribe(request.nodeIdentifier, 1946 d = defer.ensureDeferred(
1828 request.subscriber, 1947 self.backend.subscribe(
1829 request.sender, 1948 request.nodeIdentifier,
1830 self._isPep(request), 1949 request.subscriber,
1831 request.recipient) 1950 request.sender,
1951 request.options,
1952 self._isPep(request),
1953 request.recipient
1954 )
1955 )
1832 return d.addErrback(self._mapErrors) 1956 return d.addErrback(self._mapErrors)
1833 1957
1834 def unsubscribe(self, request): 1958 def unsubscribe(self, request):
1835 d = self.backend.unsubscribe(request.nodeIdentifier, 1959 d = defer.ensureDeferred(
1960 self.backend.unsubscribe(request.nodeIdentifier,
1836 request.subscriber, 1961 request.subscriber,
1837 request.sender, 1962 request.sender,
1838 self._isPep(request), 1963 self._isPep(request),
1839 request.recipient) 1964 request.recipient)
1965 )
1840 return d.addErrback(self._mapErrors) 1966 return d.addErrback(self._mapErrors)
1841 1967
1842 def subscriptions(self, request): 1968 def subscriptions(self, request):
1843 d = self.backend.getSubscriptions(request.sender, 1969 d = self.backend.getSubscriptions(request.sender,
1844 request.nodeIdentifier, 1970 request.nodeIdentifier,
1931 try: 2057 try:
1932 ext_data['pep'] = request.delegated 2058 ext_data['pep'] = request.delegated
1933 except AttributeError: 2059 except AttributeError:
1934 pass 2060 pass
1935 ext_data['order_by'] = request.orderBy or [] 2061 ext_data['order_by'] = request.orderBy or []
1936 d = self.backend.getItems(request.nodeIdentifier, 2062 d = defer.ensureDeferred(
1937 request.sender, 2063 self.backend.getItems(
1938 request.recipient, 2064 request.nodeIdentifier,
1939 request.maxItems, 2065 request.sender,
1940 request.itemIdentifiers, 2066 request.recipient,
1941 ext_data) 2067 request.maxItems,
2068 request.itemIdentifiers,
2069 ext_data
2070 )
2071 )
1942 return d.addErrback(self._mapErrors) 2072 return d.addErrback(self._mapErrors)
1943 2073
1944 def retract(self, request): 2074 def retract(self, request):
1945 d = defer.ensureDeferred( 2075 d = defer.ensureDeferred(
1946 self.backend.retractItem(request.nodeIdentifier, 2076 self.backend.retractItem(request.nodeIdentifier,
1985 return [ 2115 return [
1986 disco.DiscoFeature(rsm.NS_RSM), 2116 disco.DiscoFeature(rsm.NS_RSM),
1987 # cf. https://xmpp.org/extensions/xep-0060.html#subscriber-retrieve-returnsome 2117 # cf. https://xmpp.org/extensions/xep-0060.html#subscriber-retrieve-returnsome
1988 disco.DiscoFeature(const.NS_PUBSUB_RSM), 2118 disco.DiscoFeature(const.NS_PUBSUB_RSM),
1989 disco.DiscoFeature(pubsub.NS_ORDER_BY), 2119 disco.DiscoFeature(pubsub.NS_ORDER_BY),
1990 disco.DiscoFeature(const.NS_FDP) 2120 disco.DiscoFeature(const.NS_FDP),
2121 disco.DiscoFeature(const.NS_PPS)
1991 ] 2122 ]
1992 2123
1993 def getDiscoItems(self, requestor, service, nodeIdentifier=''): 2124 def getDiscoItems(self, requestor, service, nodeIdentifier=''):
1994 return [] 2125 return []