Mercurial > libervia-pubsub
comparison sat_pubsub/backend.py @ 478:b544109ab4c4
Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription
/!\ pgsql schema needs to be updated /!\
/!\ server conf needs to be updated for privileged entity: only the new
`urn:xmpp:privilege:2` namespace is handled now /!\
Privileged entity has been updated to handle the new namespace and IQ permission. Roster
pushes are not managed yet.
XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully
specified at the moment, and my messages on standard@ haven't received any reply. Thus for now
only "Subscribing", "Unsubscribing" and "Listing Subscriptions" are implemented; "Auto
Subscriptions" and "Filtering" are not.
Public Pubsub Subscription
(https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented;
the XEP has been accepted by council but is not yet published. It will be updated to use
subscription options instead of the <public> element currently specified; I'm waiting
for publication to update the XEP.
unsubscribe has been updated to return the `<subscription>` element as expected by
XEP-0060 (sat_tmp needs to be updated).
database schema has been updated to add columns necessary to keep track of subscriptions
to external nodes and to mark subscriptions as public.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 May 2022 13:39:08 +0200 |
parents | ed9e12701e0f |
children | 0e801ae1869f |
comparison
equal
deleted
inserted
replaced
477:9125a6e440c0 | 478:b544109ab4c4 |
---|---|
60 publish-subscribe protocol. | 60 publish-subscribe protocol. |
61 """ | 61 """ |
62 | 62 |
63 import copy | 63 import copy |
64 import uuid | 64 import uuid |
65 import hashlib | |
65 from typing import Optional, List, Tuple | 66 from typing import Optional, List, Tuple |
66 | 67 |
67 from zope.interface import implementer | 68 from zope.interface import implementer |
68 | 69 |
69 from twisted.application import service | 70 from twisted.application import service |
238 self.nodeOptions[const.OPT_FTS_LANGUAGE]["options"][l] = l.title() | 239 self.nodeOptions[const.OPT_FTS_LANGUAGE]["options"][l] = l.title() |
239 | 240 |
240 def _getFTSLanguagesEb(self, failure_): | 241 def _getFTSLanguagesEb(self, failure_): |
241 log.msg(f"WARNING: can get FTS languages: {failure_}") | 242 log.msg(f"WARNING: can get FTS languages: {failure_}") |
242 | 243 |
243 def isAdmin(self, entity_jid): | 244 def isAdmin(self, entity_jid: jid.JID) -> bool: |
244 """Return True if an entity is an administrator""" | 245 """Return True if an entity is an administrator""" |
245 return entity_jid.userhostJID() in self.admins | 246 return entity_jid.userhostJID() in self.admins |
247 | |
248 def isFromServer(self, entity_jid: jid.JID) -> bool: | |
249 """Return True if an entity come from our server""" | |
250 return entity_jid.host == self.server_jid.host | |
246 | 251 |
247 def supportsPublishOptions(self): | 252 def supportsPublishOptions(self): |
248 return True | 253 return True |
249 | 254 |
250 def supportsPublisherAffiliation(self): | 255 def supportsPublisherAffiliation(self): |
593 self.addObserver('//event/pubsub/delete', observerfn, *args, **kwargs) | 598 self.addObserver('//event/pubsub/delete', observerfn, *args, **kwargs) |
594 | 599 |
595 def registerPurgeNotifier(self, observerfn, *args, **kwargs): | 600 def registerPurgeNotifier(self, observerfn, *args, **kwargs): |
596 self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs) | 601 self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs) |
597 | 602 |
598 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): | 603 async def subscribe( |
604 self, | |
605 nodeIdentifier: str, | |
606 subscriber: jid.JID, | |
607 requestor: jid.JID, | |
608 options: Optional[dict], | |
609 pep: bool, | |
610 recipient: jid.JID | |
611 ) -> pubsub.Subscription: | |
599 subscriberEntity = subscriber.userhostJID() | 612 subscriberEntity = subscriber.userhostJID() |
600 if subscriberEntity != requestor.userhostJID(): | 613 if subscriberEntity != requestor.userhostJID(): |
601 return defer.fail(error.Forbidden()) | 614 raise error.Forbidden() |
602 | 615 |
603 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 616 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
604 d.addCallback(_getAffiliation, subscriberEntity) | 617 __, affiliation = await _getAffiliation(node, subscriberEntity) |
605 d.addCallback(self._doSubscribe, subscriber, pep, recipient) | |
606 return d | |
607 | |
608 def _doSubscribe(self, result, subscriber, pep, recipient): | |
609 node, affiliation = result | |
610 | 618 |
611 if affiliation == 'outcast': | 619 if affiliation == 'outcast': |
612 raise error.Forbidden() | 620 raise error.Forbidden() |
613 | 621 |
614 access_model = node.getAccessModel() | 622 access_model = node.getAccessModel() |
615 | 623 |
616 if access_model == const.VAL_AMODEL_OPEN: | 624 if access_model == const.VAL_AMODEL_OPEN: |
617 d = defer.succeed(None) | 625 pass |
618 elif access_model == const.VAL_AMODEL_PRESENCE: | 626 elif access_model == const.VAL_AMODEL_PRESENCE: |
619 d = self.checkPresenceSubscription(node, subscriber) | 627 await self.checkPresenceSubscription(node, subscriber) |
620 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | 628 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: |
621 d = self.checkRosterGroups(node, subscriber) | 629 await self.checkRosterGroups(node, subscriber) |
622 elif access_model == const.VAL_AMODEL_WHITELIST: | 630 elif access_model == const.VAL_AMODEL_WHITELIST: |
623 d = self.checkNodeAffiliations(node, subscriber) | 631 await self.checkNodeAffiliations(node, subscriber) |
624 else: | 632 else: |
625 raise NotImplementedError | 633 raise NotImplementedError |
626 | 634 |
627 def trapExists(failure): | 635 config = {} |
628 failure.trap(error.SubscriptionExists) | 636 if options and options.get(f"{{{const.NS_PPS}}}public"): |
629 return False | 637 config["public"] = True |
630 | 638 try: |
631 def cb(sendLast): | 639 await node.addSubscription(subscriber, 'subscribed', config) |
632 d = node.getSubscription(subscriber) | 640 except error.SubscriptionExists: |
633 if sendLast: | 641 send_last = False |
634 d.addCallback(self._sendLastPublished, node, pep, recipient) | 642 else: |
635 return d | 643 send_last = True |
636 | 644 |
637 d.addCallback(lambda _: node.addSubscription(subscriber, 'subscribed', {})) | 645 subscription = await node.getSubscription(subscriber) |
638 d.addCallbacks(lambda _: True, trapExists) | 646 |
639 d.addCallback(cb) | 647 if send_last: |
640 | 648 config = node.getConfiguration() |
641 return d | 649 sendLastPublished = config.get( |
642 | 650 'pubsub#send_last_published_item', 'never' |
643 def _sendLastPublished(self, subscription, node, pep, recipient): | 651 ) |
644 | 652 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': |
645 def notifyItem(items_data): | 653 entity = subscription.subscriber.userhostJID() |
646 if items_data: | 654 items_data, __ = await self.getItemsData( |
647 reactor.callLater(0, self.dispatch, | 655 node.nodeIdentifier, entity, recipient, maxItems=1, |
648 {'items_data': items_data, | 656 ext_data={'pep': pep} |
649 'node': node, | |
650 'pep': pep, | |
651 'recipient': recipient, | |
652 'subscription': subscription, | |
653 }, | |
654 '//event/pubsub/notify') | |
655 | |
656 config = node.getConfiguration() | |
657 sendLastPublished = config.get('pubsub#send_last_published_item', | |
658 'never') | |
659 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': | |
660 entity = subscription.subscriber.userhostJID() | |
661 d = defer.ensureDeferred( | |
662 self.getItemsData( | |
663 node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep} | |
664 ) | 657 ) |
665 ) | 658 if items_data: |
666 d.addCallback(notifyItem) | 659 reactor.callLater( |
667 d.addErrback(log.err) | 660 0, |
661 self.dispatch, | |
662 {'items_data': items_data, | |
663 'node': node, | |
664 'pep': pep, | |
665 'recipient': recipient, | |
666 'subscription': subscription, | |
667 }, | |
668 '//event/pubsub/notify' | |
669 ) | |
668 | 670 |
669 return subscription | 671 return subscription |
670 | 672 |
671 def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): | 673 async def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): |
672 if subscriber.userhostJID() != requestor.userhostJID(): | 674 if subscriber.userhostJID() != requestor.userhostJID(): |
673 return defer.fail(error.Forbidden()) | 675 raise error.Forbidden() |
674 | 676 |
675 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 677 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
676 d.addCallback(lambda node: node.removeSubscription(subscriber)) | 678 await node.removeSubscription(subscriber) |
677 return d | 679 return pubsub.Subscription(nodeIdentifier, subscriber, "none") |
678 | 680 |
679 def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient): | 681 def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient): |
680 """retrieve subscriptions of an entity | 682 """retrieve subscriptions of an entity |
681 | 683 |
682 @param requestor(jid.JID): entity who want to check subscriptions | 684 @param requestor(jid.JID): entity who want to check subscriptions |
683 @param nodeIdentifier(unicode, None): identifier of the node | 685 @param nodeIdentifier(unicode, None): identifier of the node |
684 node to get all subscriptions of a service | 686 node to get all subscriptions of a service |
685 @param pep(bool): True if it's a PEP request | 687 @param pep(bool): True if it's a PEP request |
686 @param recipient(jid.JID, None): recipient of the PEP request | 688 @param recipient(jid.JID, None): recipient of the PEP request |
687 """ | 689 """ |
688 return self.storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient) | 690 return self.storage.getSubscriptions( |
691 requestor, nodeIdentifier, None, pep, recipient | |
692 ) | |
689 | 693 |
690 def supportsAutoCreate(self): | 694 def supportsAutoCreate(self): |
691 return True | 695 return True |
692 | 696 |
693 def supportsInstantNodes(self): | 697 def supportsInstantNodes(self): |
746 return d | 750 return d |
747 | 751 |
748 def getNodeConfiguration(self, nodeIdentifier, pep, recipient): | 752 def getNodeConfiguration(self, nodeIdentifier, pep, recipient): |
749 if not nodeIdentifier: | 753 if not nodeIdentifier: |
750 return defer.fail(error.NoRootNode()) | 754 return defer.fail(error.NoRootNode()) |
755 | |
756 if ((nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS | |
757 or nodeIdentifier.startswith(const.PPS_SUBSCRIBERS_PREFIX))): | |
758 return defer.succeed({const.OPT_ACCESS_MODEL: const.VAL_AMODEL_OPEN}) | |
751 | 759 |
752 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 760 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
753 d.addCallback(lambda node: node.getConfiguration()) | 761 d.addCallback(lambda node: node.getConfiguration()) |
754 | 762 |
755 return d | 763 return d |
1065 maxItems, | 1073 maxItems, |
1066 ext_data | 1074 ext_data |
1067 ) | 1075 ) |
1068 return ids | 1076 return ids |
1069 | 1077 |
1070 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, | 1078 async def getItems( |
1071 itemIdentifiers=None, ext_data=None): | 1079 self, |
1072 d = defer.ensureDeferred( | 1080 nodeIdentifier: str, |
1073 self.getItemsData( | 1081 requestor: jid.JID, |
1074 nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data | 1082 recipient: jid.JID, |
1075 ) | 1083 maxItems: Optional[int] = None, |
1084 itemIdentifiers: Optional[List[str]] = None, | |
1085 ext_data: Optional[dict] = None | |
1086 ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: | |
1087 items_data, rsm_response = await self.getItemsData( | |
1088 nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data | |
1076 ) | 1089 ) |
1077 d.addCallback(lambda items_data: [item_data.item for item_data in items_data]) | 1090 return [item_data.item for item_data in items_data], rsm_response |
1078 return d | |
1079 | 1091 |
1080 async def getOwnerRoster(self, node, owners=None): | 1092 async def getOwnerRoster(self, node, owners=None): |
1081 # FIXME: roster of publisher, not owner, must be used | 1093 # FIXME: roster of publisher, not owner, must be used |
1082 if owners is None: | 1094 if owners is None: |
1083 owners = await node.getOwners() | 1095 owners = await node.getOwners() |
1097 owner_jid = owner_jid.full(), | 1109 owner_jid = owner_jid.full(), |
1098 msg = e)) | 1110 msg = e)) |
1099 return | 1111 return |
1100 return roster | 1112 return roster |
1101 | 1113 |
1102 async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None, | 1114 async def getItemsData( |
1103 itemIdentifiers=None, ext_data=None): | 1115 self, |
1116 nodeIdentifier: str, | |
1117 requestor: jid.JID, | |
1118 recipient: jid.JID, | |
1119 maxItems: Optional[int] = None, | |
1120 itemIdentifiers: Optional[List[str]] = None, | |
1121 ext_data: Optional[dict] = None | |
1122 ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: | |
1104 """like getItems but return the whole ItemData""" | 1123 """like getItems but return the whole ItemData""" |
1105 if maxItems == 0: | 1124 if maxItems == 0: |
1106 log.msg("WARNING: maxItems=0 on items retrieval") | 1125 log.msg("WARNING: maxItems=0 on items retrieval") |
1107 return [] | 1126 return [], None |
1108 | 1127 |
1109 if ext_data is None: | 1128 if ext_data is None: |
1110 ext_data = {} | 1129 ext_data = {} |
1130 | |
1131 if nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS: | |
1132 return await self.getPublicSubscriptions( | |
1133 requestor, maxItems, itemIdentifiers, ext_data, | |
1134 ext_data.pop("pep", False), recipient | |
1135 ) | |
1136 elif nodeIdentifier.startswith(f"{const.NS_PPS_SUBSCRIBERS}/"): | |
1137 target_node = nodeIdentifier[len(const.NS_PPS_SUBSCRIBERS)+1:] | |
1138 return await self.getPublicNodeSubscriptions( | |
1139 target_node, requestor, maxItems, itemIdentifiers, ext_data, | |
1140 ext_data.pop("pep", False), recipient | |
1141 ) | |
1142 | |
1111 node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) | 1143 node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) |
1112 try: | 1144 try: |
1113 affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor) | 1145 affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor) |
1114 except error.NotLeafNodeError: | 1146 except error.NotLeafNodeError: |
1115 return [] | 1147 return [], None |
1116 | 1148 |
1117 # at this point node access is checked | 1149 # at this point node access is checked |
1118 | 1150 |
1119 if owner: | 1151 if owner: |
1120 # requestor_groups is only used in restricted access | 1152 # requestor_groups is only used in restricted access |
1153 | 1185 |
1154 schema = node.getSchema() | 1186 schema = node.getSchema() |
1155 if schema is not None: | 1187 if schema is not None: |
1156 self.filterItemsWithSchema(items_data, schema, owner) | 1188 self.filterItemsWithSchema(items_data, schema, owner) |
1157 | 1189 |
1158 await self._items_rsm( | 1190 return await self._items_rsm( |
1159 items_data, node, requestor_groups, owner, itemIdentifiers, ext_data) | 1191 items_data, node, requestor_groups, owner, itemIdentifiers, ext_data |
1160 return items_data | 1192 ) |
1161 | 1193 |
1162 def _setCount(self, value, response): | 1194 def _setCount(self, value, response): |
1163 response.count = value | 1195 response.count = value |
1164 | 1196 |
1165 def _setIndex(self, value, response, adjust): | 1197 def _setIndex(self, value, response, adjust): |
1178 # An other optimisation would be to look for index first and use it as offset | 1210 # An other optimisation would be to look for index first and use it as offset |
1179 try: | 1211 try: |
1180 rsm_request = ext_data['rsm'] | 1212 rsm_request = ext_data['rsm'] |
1181 except KeyError: | 1213 except KeyError: |
1182 # No RSM in this request, nothing to do | 1214 # No RSM in this request, nothing to do |
1183 return items_data | 1215 return items_data, None |
1184 | 1216 |
1185 if itemIdentifiers: | 1217 if itemIdentifiers: |
1186 log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part") | 1218 log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part") |
1187 return items_data | 1219 return items_data, None |
1188 | 1220 |
1189 response = rsm.RSMResponse() | 1221 rsm_response = rsm.RSMResponse() |
1190 | 1222 |
1191 d_count = node.getItemsCount(authorized_groups, owner, ext_data) | 1223 d_count = node.getItemsCount(authorized_groups, owner, ext_data) |
1192 d_count.addCallback(self._setCount, response) | 1224 d_count.addCallback(self._setCount, rsm_response) |
1193 d_list = [d_count] | 1225 d_list = [d_count] |
1194 | 1226 |
1195 if items_data: | 1227 if items_data: |
1196 response.first = items_data[0].item['id'] | 1228 rsm_response.first = items_data[0].item['id'] |
1197 response.last = items_data[-1].item['id'] | 1229 rsm_response.last = items_data[-1].item['id'] |
1198 | 1230 |
1199 # index handling | 1231 # index handling |
1200 if rsm_request.index is not None: | 1232 if rsm_request.index is not None: |
1201 response.index = rsm_request.index | 1233 rsm_response.index = rsm_request.index |
1202 elif rsm_request.before: | 1234 elif rsm_request.before: |
1203 # The last page case (before == '') is managed in render method | 1235 # The last page case (before == '') is managed in render method |
1204 d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data) | 1236 d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data) |
1205 d_index.addCallback(self._setIndex, response, -len(items_data)) | 1237 d_index.addCallback(self._setIndex, rsm_response, -len(items_data)) |
1206 d_list.append(d_index) | 1238 d_list.append(d_index) |
1207 elif rsm_request.after is not None: | 1239 elif rsm_request.after is not None: |
1208 d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data) | 1240 d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data) |
1209 d_index.addCallback(self._setIndex, response, 1) | 1241 d_index.addCallback(self._setIndex, rsm_response, 1) |
1210 d_list.append(d_index) | 1242 d_list.append(d_index) |
1211 else: | 1243 else: |
1212 # the first page was requested | 1244 # the first page was requested |
1213 response.index = 0 | 1245 rsm_response.index = 0 |
1214 | 1246 |
1215 | 1247 |
1216 await defer.DeferredList(d_list) | 1248 await defer.DeferredList(d_list) |
1217 | 1249 |
1218 if rsm_request.before == '': | 1250 if rsm_request.before == '': |
1219 # the last page was requested | 1251 # the last page was requested |
1220 response.index = response.count - len(items_data) | 1252 rsm_response.index = rsm_response.count - len(items_data) |
1221 | 1253 |
1222 items_data.append(container.ItemData(response.toElement())) | 1254 return items_data, rsm_response |
1223 | 1255 |
1224 return items_data | 1256 def addEltFromSubDict( |
1257 self, | |
1258 parent_elt: domish.Element, | |
1259 from_jid: Optional[jid.JID], | |
1260 sub_dict: dict[str, str], | |
1261 namespace: Optional[str] = None, | |
1262 ) -> None: | |
1263 """Generate <subscription> element from storage.getAllSubscriptions's dict | |
1264 | |
1265 @param parent_elt: element where the new subscription element must be added | |
1266 @param sub_dict: subscription data as returned by storage.getAllSubscriptions | |
1267 @param namespace: if not None, namespace to use for <subscription> element | |
1268 @param service_attribute: name of the attribute to use for the subscribed service | |
1269 """ | |
1270 subscription_elt = parent_elt.addElement( | |
1271 "subscription" if namespace is None else (namespace, "subscription") | |
1272 ) | |
1273 if from_jid is not None: | |
1274 subscription_elt["jid"] = from_jid.userhost() | |
1275 if sub_dict["node"] is not None: | |
1276 if sub_dict["pep"] is not None: | |
1277 subscription_elt["service"] = sub_dict["pep"] | |
1278 else: | |
1279 subscription_elt["service"] = self.jid.full() | |
1280 subscription_elt["node"] = sub_dict["node"] | |
1281 else: | |
1282 subscription_elt["service"] = sub_dict["ext_service"] | |
1283 subscription_elt["node"] = sub_dict["ext_node"] | |
1284 subscription_elt["subscription"] = sub_dict["state"] | |
1285 | |
1286 async def getPublicSubscriptions( | |
1287 self, | |
1288 requestor: jid.JID, | |
1289 maxItems: Optional[int], | |
1290 itemIdentifiers: Optional[List[str]], | |
1291 ext_data: dict, | |
1292 pep: bool, | |
1293 recipient: jid.JID | |
1294 ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: | |
1295 | |
1296 if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"): | |
1297 raise NotImplementedError( | |
1298 "item identifiers, RSM and MAM are not implemented yet" | |
1299 ) | |
1300 | |
1301 if not pep: | |
1302 return [], None | |
1303 | |
1304 subs = await self.storage.getAllSubscriptions(recipient, True) | |
1305 items_data = [] | |
1306 for sub in subs: | |
1307 if sub["state"] != "subscribed": | |
1308 continue | |
1309 item = domish.Element((pubsub.NS_PUBSUB, "item")) | |
1310 item["id"] = sub["id"] | |
1311 self.addEltFromSubDict(item, None, sub, const.NS_PPS) | |
1312 items_data.append(container.ItemData(item)) | |
1313 | |
1314 return items_data, None | |
1315 | |
1316 async def getPublicNodeSubscriptions( | |
1317 self, | |
1318 nodeIdentifier: str, | |
1319 requestor: jid.JID, | |
1320 maxItems: Optional[int], | |
1321 itemIdentifiers: Optional[List[str]], | |
1322 ext_data: dict, | |
1323 pep: bool, | |
1324 recipient: jid.JID | |
1325 ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: | |
1326 | |
1327 if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"): | |
1328 raise NotImplementedError( | |
1329 "item identifiers, RSM and MAM are not implemented yet" | |
1330 ) | |
1331 | |
1332 node = await self.storage.getNode(nodeIdentifier, pep, recipient) | |
1333 | |
1334 subs = await node.getSubscriptions(public=True) | |
1335 items_data = [] | |
1336 for sub in subs: | |
1337 item = domish.Element((pubsub.NS_PUBSUB, "item")) | |
1338 item["id"] = sub.id | |
1339 subscriber_elt = item.addElement((const.NS_PPS, "subscriber")) | |
1340 subscriber_elt["jid"] = sub.subscriber.full() | |
1341 items_data.append(container.ItemData(item)) | |
1342 | |
1343 return items_data, None | |
1225 | 1344 |
1226 async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): | 1345 async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): |
1227 node = await self.storage.getNode(nodeIdentifier, pep, recipient) | 1346 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
1228 node, affiliation = await _getAffiliation(node, requestor) | 1347 node, affiliation = await _getAffiliation(node, requestor) |
1229 | 1348 |
1822 )) | 1941 )) |
1823 d.addErrback(self._publish_errb, request) | 1942 d.addErrback(self._publish_errb, request) |
1824 return d.addErrback(self._mapErrors) | 1943 return d.addErrback(self._mapErrors) |
1825 | 1944 |
1826 def subscribe(self, request): | 1945 def subscribe(self, request): |
1827 d = self.backend.subscribe(request.nodeIdentifier, | 1946 d = defer.ensureDeferred( |
1828 request.subscriber, | 1947 self.backend.subscribe( |
1829 request.sender, | 1948 request.nodeIdentifier, |
1830 self._isPep(request), | 1949 request.subscriber, |
1831 request.recipient) | 1950 request.sender, |
1951 request.options, | |
1952 self._isPep(request), | |
1953 request.recipient | |
1954 ) | |
1955 ) | |
1832 return d.addErrback(self._mapErrors) | 1956 return d.addErrback(self._mapErrors) |
1833 | 1957 |
1834 def unsubscribe(self, request): | 1958 def unsubscribe(self, request): |
1835 d = self.backend.unsubscribe(request.nodeIdentifier, | 1959 d = defer.ensureDeferred( |
1960 self.backend.unsubscribe(request.nodeIdentifier, | |
1836 request.subscriber, | 1961 request.subscriber, |
1837 request.sender, | 1962 request.sender, |
1838 self._isPep(request), | 1963 self._isPep(request), |
1839 request.recipient) | 1964 request.recipient) |
1965 ) | |
1840 return d.addErrback(self._mapErrors) | 1966 return d.addErrback(self._mapErrors) |
1841 | 1967 |
1842 def subscriptions(self, request): | 1968 def subscriptions(self, request): |
1843 d = self.backend.getSubscriptions(request.sender, | 1969 d = self.backend.getSubscriptions(request.sender, |
1844 request.nodeIdentifier, | 1970 request.nodeIdentifier, |
1931 try: | 2057 try: |
1932 ext_data['pep'] = request.delegated | 2058 ext_data['pep'] = request.delegated |
1933 except AttributeError: | 2059 except AttributeError: |
1934 pass | 2060 pass |
1935 ext_data['order_by'] = request.orderBy or [] | 2061 ext_data['order_by'] = request.orderBy or [] |
1936 d = self.backend.getItems(request.nodeIdentifier, | 2062 d = defer.ensureDeferred( |
1937 request.sender, | 2063 self.backend.getItems( |
1938 request.recipient, | 2064 request.nodeIdentifier, |
1939 request.maxItems, | 2065 request.sender, |
1940 request.itemIdentifiers, | 2066 request.recipient, |
1941 ext_data) | 2067 request.maxItems, |
2068 request.itemIdentifiers, | |
2069 ext_data | |
2070 ) | |
2071 ) | |
1942 return d.addErrback(self._mapErrors) | 2072 return d.addErrback(self._mapErrors) |
1943 | 2073 |
1944 def retract(self, request): | 2074 def retract(self, request): |
1945 d = defer.ensureDeferred( | 2075 d = defer.ensureDeferred( |
1946 self.backend.retractItem(request.nodeIdentifier, | 2076 self.backend.retractItem(request.nodeIdentifier, |
1985 return [ | 2115 return [ |
1986 disco.DiscoFeature(rsm.NS_RSM), | 2116 disco.DiscoFeature(rsm.NS_RSM), |
1987 # cf. https://xmpp.org/extensions/xep-0060.html#subscriber-retrieve-returnsome | 2117 # cf. https://xmpp.org/extensions/xep-0060.html#subscriber-retrieve-returnsome |
1988 disco.DiscoFeature(const.NS_PUBSUB_RSM), | 2118 disco.DiscoFeature(const.NS_PUBSUB_RSM), |
1989 disco.DiscoFeature(pubsub.NS_ORDER_BY), | 2119 disco.DiscoFeature(pubsub.NS_ORDER_BY), |
1990 disco.DiscoFeature(const.NS_FDP) | 2120 disco.DiscoFeature(const.NS_FDP), |
2121 disco.DiscoFeature(const.NS_PPS) | |
1991 ] | 2122 ] |
1992 | 2123 |
1993 def getDiscoItems(self, requestor, service, nodeIdentifier=''): | 2124 def getDiscoItems(self, requestor, service, nodeIdentifier=''): |
1994 return [] | 2125 return [] |