Mercurial > libervia-pubsub
comparison sat_pubsub/backend.py @ 431:5e8b8ef5c862
implementation of XEP-0346 (Form Discovery and Publishing):
The former non-standard node schema has been replaced by XEP-0346, which uses 2 nodes (one
for the schema/template and one for submitted values).
The implementation is an adaptation of the former one, and data validation is still done
even if this is not currently specified in any XEP.
When the template node is modified, the change is reflected in the node schema.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 11 Dec 2020 17:19:00 +0100 |
parents | 5a0ada3b61ca |
children | 454f61a32427 |
comparison
equal
deleted
inserted
replaced
430:5a0ada3b61ca | 431:5e8b8ef5c862 |
---|---|
60 publish-subscribe protocol. | 60 publish-subscribe protocol. |
61 """ | 61 """ |
62 | 62 |
63 import copy | 63 import copy |
64 import uuid | 64 import uuid |
65 from typing import Optional | |
65 | 66 |
66 from zope.interface import implementer | 67 from zope.interface import implementer |
67 | 68 |
68 from twisted.application import service | 69 from twisted.application import service |
69 from twisted.python import components, log | 70 from twisted.python import components, log |
70 from twisted.internet import defer, reactor | 71 from twisted.internet import defer, reactor |
71 from twisted.words.protocols.jabber.error import StanzaError | 72 from twisted.words.protocols.jabber.error import StanzaError |
73 from twisted.words.protocols.jabber import jid | |
72 from twisted.words.xish import domish, utility | 74 from twisted.words.xish import domish, utility |
73 | 75 |
74 from wokkel import disco | 76 from wokkel import disco |
75 from wokkel import data_form | 77 from wokkel import data_form |
76 from wokkel import rsm | 78 from wokkel import rsm |
418 | 420 |
419 d = node.getItemsPublishers(itemIdentifiers) | 421 d = node.getItemsPublishers(itemIdentifiers) |
420 d.addCallback(doCheck) | 422 d.addCallback(doCheck) |
421 return d | 423 return d |
422 | 424 |
425 def _getFDPSubmittedNode( | |
426 self, | |
427 nodeIdentifier: str, | |
428 pep: bool, | |
429 recipient: jid.JID, | |
430 ) -> Optional[defer.Deferred]: | |
431 """Get submitted forms node for Form Discovery and Publishing | |
432 | |
433 @param nodeIdentifier: template node (must start with const.FDP_TEMPLATE_PREFIX) | |
434 @return: node or None if the node doesn't exist | |
435 """ | |
436 app_ns = nodeIdentifier[len(const.FDP_TEMPLATE_PREFIX):] | |
437 submitted_node_id = f"{const.FDP_SUBMITTED_PREFIX}{app_ns}" | |
438 try: | |
439 return self.storage.getNode(submitted_node_id, pep, recipient) | |
440 except error.NodeNotFound: | |
441 return None | |
442 | |
423 async def publish(self, nodeIdentifier, items, requestor, options, pep, recipient): | 443 async def publish(self, nodeIdentifier, items, requestor, options, pep, recipient): |
444 if len(items) == 0: | |
445 raise pubsub.BadRequest(text="no item to publish") | |
424 node = await self.storage.getNode(nodeIdentifier, pep, recipient) | 446 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
425 affiliation, node = await self._checkAuth(node, requestor) | 447 affiliation, node = await self._checkAuth(node, requestor) |
426 | 448 |
427 if node.nodeType == 'collection': | 449 if node.nodeType == 'collection': |
428 raise error.NoPublishing() | 450 raise error.NoPublishing() |
508 else: | 530 else: |
509 # we don't want a publisher to overwrite the item | 531 # we don't want a publisher to overwrite the item |
510 # of an other publisher | 532 # of an other publisher |
511 await self._checkOverwrite(node, itemIdentifiers, requestor) | 533 await self._checkOverwrite(node, itemIdentifiers, requestor) |
512 | 534 |
535 if node.nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX): | |
536 schema_item = items_data[-1].item | |
537 try: | |
538 schema = next(schema_item.elements(data_form.NS_X_DATA, 'x')) | |
539 except StopIteration: | |
540 raise pubsub.BadRequest(text="Data form is missing in FDP request") | |
541 submitted_node = await self._getFDPSubmittedNode( | |
542 node.nodeIdentifier, pep, recipient) | |
543 if submitted_node is not None: | |
544 await submitted_node.setSchema(schema) | |
545 | |
513 # TODO: check conflict and recalculate max id if serial_ids is set | 546 # TODO: check conflict and recalculate max id if serial_ids is set |
514 await node.storeItems(items_data, requestor) | 547 await node.storeItems(items_data, requestor) |
515 | 548 |
516 self._doNotify(node, items_data, deliverPayloads, pep, recipient) | 549 self._doNotify(node, items_data, deliverPayloads, pep, recipient) |
517 return ret_payload | 550 return ret_payload |
626 config = node.getConfiguration() | 659 config = node.getConfiguration() |
627 sendLastPublished = config.get('pubsub#send_last_published_item', | 660 sendLastPublished = config.get('pubsub#send_last_published_item', |
628 'never') | 661 'never') |
629 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': | 662 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': |
630 entity = subscription.subscriber.userhostJID() | 663 entity = subscription.subscriber.userhostJID() |
631 d = self.getItemsData(node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep}) | 664 d = defer.ensureDeferred( |
665 self.getItemsData( | |
666 node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep} | |
667 ) | |
668 ) | |
632 d.addCallback(notifyItem) | 669 d.addCallback(notifyItem) |
633 d.addErrback(log.err) | 670 d.addErrback(log.err) |
634 | 671 |
635 return subscription | 672 return subscription |
636 | 673 |
732 "number of items on this node. Please increase \"max_items\" " | 769 "number of items on this node. Please increase \"max_items\" " |
733 "or delete items from this node") | 770 "or delete items from this node") |
734 | 771 |
735 return await node.setConfiguration(options) | 772 return await node.setConfiguration(options) |
736 | 773 |
737 def getNodeSchema(self, nodeIdentifier, pep, recipient): | |
738 if not nodeIdentifier: | |
739 return defer.fail(error.NoRootNode()) | |
740 | |
741 d = self.storage.getNode(nodeIdentifier, pep, recipient) | |
742 d.addCallback(lambda node: node.getSchema()) | |
743 | |
744 return d | |
745 | |
746 def setNodeSchema(self, nodeIdentifier, schema, requestor, pep, recipient): | |
747 """set or remove Schema of a node | |
748 | |
749 @param nodeIdentifier(unicode): identifier of the pubsub node | |
750 @param schema(domish.Element, None): schema to set | |
751 None to remove schema | |
752 @param requestor(jid.JID): entity doing the request | |
753 @param pep(bool): True if it's a PEP request | |
754 @param recipient(jid.JID, None): recipient of the PEP request | |
755 """ | |
756 if not nodeIdentifier: | |
757 return defer.fail(error.NoRootNode()) | |
758 | |
759 d = self.storage.getNode(nodeIdentifier, pep, recipient) | |
760 d.addCallback(_getAffiliation, requestor) | |
761 d.addCallback(self._doSetNodeSchema, requestor, schema) | |
762 return d | |
763 | |
764 def _doSetNodeSchema(self, result, requestor, schema): | |
765 node, affiliation = result | |
766 | |
767 if affiliation != 'owner' and not self.isAdmin(requestor): | |
768 raise error.Forbidden() | |
769 | |
770 return node.setSchema(schema) | |
771 | |
772 def getAffiliations(self, entity, nodeIdentifier, pep, recipient): | 774 def getAffiliations(self, entity, nodeIdentifier, pep, recipient): |
773 return self.storage.getAffiliations(entity, nodeIdentifier, pep, recipient) | 775 return self.storage.getAffiliations(entity, nodeIdentifier, pep, recipient) |
774 | 776 |
775 def getAffiliationsOwner(self, nodeIdentifier, requestor, pep, recipient): | 777 def getAffiliationsOwner(self, nodeIdentifier, requestor, pep, recipient): |
776 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 778 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
868 d.addCallback(lambda _: None) | 870 d.addCallback(lambda _: None) |
869 d.addErrback(self.unwrapFirstError) | 871 d.addErrback(self.unwrapFirstError) |
870 return d | 872 return d |
871 | 873 |
872 def filterItemsWithSchema(self, items_data, schema, owner): | 874 def filterItemsWithSchema(self, items_data, schema, owner): |
873 """check schema restriction and remove fields/items if they don't comply | 875 """Check schema restriction and remove fields/items if they don't comply |
874 | 876 |
875 @param items_data(list[ItemData]): items to filter | 877 @param items_data(list[ItemData]): items to filter |
876 items in this list will be modified | 878 items in this list will be modified |
877 @param schema(domish.Element): node schema | 879 @param schema(domish.Element): node schema |
878 @param owner(bool): True is requestor is a owner of the node | 880 @param owner(bool): True is requestor is a owner of the node |
1022 ext_data) | 1024 ext_data) |
1023 defer.returnValue(ids) | 1025 defer.returnValue(ids) |
1024 | 1026 |
1025 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, | 1027 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, |
1026 itemIdentifiers=None, ext_data=None): | 1028 itemIdentifiers=None, ext_data=None): |
1027 d = self.getItemsData(nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data) | 1029 d = defer.ensureDeferred( |
1030 self.getItemsData( | |
1031 nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data | |
1032 ) | |
1033 ) | |
1028 d.addCallback(lambda items_data: [item_data.item for item_data in items_data]) | 1034 d.addCallback(lambda items_data: [item_data.item for item_data in items_data]) |
1029 return d | 1035 return d |
1030 | 1036 |
1031 @defer.inlineCallbacks | 1037 @defer.inlineCallbacks |
1032 def getOwnerRoster(self, node, owners=None): | 1038 def getOwnerRoster(self, node, owners=None): |
1047 owner_jid = owner_jid.full(), | 1053 owner_jid = owner_jid.full(), |
1048 msg = e)) | 1054 msg = e)) |
1049 return | 1055 return |
1050 defer.returnValue(roster) | 1056 defer.returnValue(roster) |
1051 | 1057 |
1052 @defer.inlineCallbacks | 1058 async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None, |
1053 def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None, | |
1054 itemIdentifiers=None, ext_data=None): | 1059 itemIdentifiers=None, ext_data=None): |
1055 """like getItems but return the whole ItemData""" | 1060 """like getItems but return the whole ItemData""" |
1056 if maxItems == 0: | 1061 if maxItems == 0: |
1057 log.msg("WARNING: maxItems=0 on items retrieval") | 1062 log.msg("WARNING: maxItems=0 on items retrieval") |
1058 defer.returnValue([]) | 1063 return [] |
1059 | 1064 |
1060 if ext_data is None: | 1065 if ext_data is None: |
1061 ext_data = {} | 1066 ext_data = {} |
1062 node = yield self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) | 1067 node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) |
1063 try: | 1068 try: |
1064 affiliation, owner, roster, access_model = yield self.checkNodeAccess(node, requestor) | 1069 affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor) |
1065 except error.NotLeafNodeError: | 1070 except error.NotLeafNodeError: |
1066 defer.returnValue([]) | 1071 return [] |
1067 | 1072 |
1068 # at this point node access is checked | 1073 # at this point node access is checked |
1069 | 1074 |
1070 if owner: | 1075 if owner: |
1071 # requestor_groups is only used in restricted access | 1076 # requestor_groups is only used in restricted access |
1072 requestor_groups = None | 1077 requestor_groups = None |
1073 else: | 1078 else: |
1074 if roster is None: | 1079 if roster is None: |
1075 # FIXME: publisher roster should be used, not owner | 1080 # FIXME: publisher roster should be used, not owner |
1076 roster = yield self.getOwnerRoster(node) | 1081 roster = await self.getOwnerRoster(node) |
1077 if roster is None: | 1082 if roster is None: |
1078 roster = {} | 1083 roster = {} |
1079 roster_item = roster.get(requestor.userhostJID()) | 1084 roster_item = roster.get(requestor.userhostJID()) |
1080 requestor_groups = tuple(roster_item.groups) if roster_item else tuple() | 1085 requestor_groups = tuple(roster_item.groups) if roster_item else tuple() |
1081 | 1086 |
1082 if itemIdentifiers: | 1087 if itemIdentifiers: |
1083 items_data = yield node.getItemsById(requestor_groups, owner, itemIdentifiers) | 1088 items_data = await node.getItemsById(requestor_groups, owner, itemIdentifiers) |
1084 else: | 1089 else: |
1085 items_data = yield node.getItems(requestor_groups, owner, maxItems, ext_data) | 1090 items_data = await node.getItems(requestor_groups, owner, maxItems, ext_data) |
1086 | 1091 |
1087 if owner: | 1092 if owner: |
1088 # Add item config data form to items with roster access model | 1093 # Add item config data form to items with roster access model |
1089 for item_data in items_data: | 1094 for item_data in items_data: |
1090 if item_data.access_model == const.VAL_AMODEL_OPEN: | 1095 if item_data.access_model == const.VAL_AMODEL_OPEN: |
1104 | 1109 |
1105 schema = node.getSchema() | 1110 schema = node.getSchema() |
1106 if schema is not None: | 1111 if schema is not None: |
1107 self.filterItemsWithSchema(items_data, schema, owner) | 1112 self.filterItemsWithSchema(items_data, schema, owner) |
1108 | 1113 |
1109 yield self._items_rsm(items_data, node, requestor_groups, owner, itemIdentifiers, ext_data) | 1114 await self._items_rsm( |
1110 defer.returnValue(items_data) | 1115 items_data, node, requestor_groups, owner, itemIdentifiers, ext_data) |
1116 return items_data | |
1111 | 1117 |
1112 def _setCount(self, value, response): | 1118 def _setCount(self, value, response): |
1113 response.count = value | 1119 response.count = value |
1114 | 1120 |
1115 def _setIndex(self, value, response, adjust): | 1121 def _setIndex(self, value, response, adjust): |
1119 @param response(RSMResponse): response instance to fill | 1125 @param response(RSMResponse): response instance to fill |
1120 @param adjust(int): adjustement term (i.e. difference between reference index and first item of the result) | 1126 @param adjust(int): adjustement term (i.e. difference between reference index and first item of the result) |
1121 """ | 1127 """ |
1122 response.index = value + adjust | 1128 response.index = value + adjust |
1123 | 1129 |
1124 def _items_rsm(self, items_data, node, authorized_groups, owner, | 1130 async def _items_rsm(self, items_data, node, authorized_groups, owner, |
1125 itemIdentifiers, ext_data): | 1131 itemIdentifiers, ext_data): |
1126 # FIXME: move this to a separate module | 1132 # FIXME: move this to a separate module |
1127 # TODO: Index can be optimized by keeping a cache of the last RSM request | 1133 # TODO: Index can be optimized by keeping a cache of the last RSM request |
1128 # An other optimisation would be to look for index first and use it as offset | 1134 # An other optimisation would be to look for index first and use it as offset |
1129 try: | 1135 try: |
1160 d_list.append(d_index) | 1166 d_list.append(d_index) |
1161 else: | 1167 else: |
1162 # the first page was requested | 1168 # the first page was requested |
1163 response.index = 0 | 1169 response.index = 0 |
1164 | 1170 |
1165 def render(result): | 1171 |
1166 if rsm_request.before == '': | 1172 await defer.DeferredList(d_list) |
1167 # the last page was requested | 1173 |
1168 response.index = response.count - len(items_data) | 1174 if rsm_request.before == '': |
1169 items_data.append(container.ItemData(response.toElement())) | 1175 # the last page was requested |
1170 return items_data | 1176 response.index = response.count - len(items_data) |
1171 | 1177 |
1172 return defer.DeferredList(d_list).addCallback(render) | 1178 items_data.append(container.ItemData(response.toElement())) |
1173 | 1179 |
1174 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): | 1180 return items_data |
1175 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 1181 |
1176 d.addCallback(_getAffiliation, requestor) | 1182 async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): |
1177 d.addCallback(self._doRetract, itemIdentifiers, requestor, notify, pep, recipient) | 1183 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
1178 return d | 1184 node, affiliation = await _getAffiliation(node, requestor) |
1179 | 1185 |
1180 def _doRetract(self, result, itemIdentifiers, requestor, notify, pep, recipient): | |
1181 node, affiliation = result | |
1182 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] | 1186 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] |
1183 | 1187 |
1184 if not persistItems: | 1188 if not persistItems: |
1185 raise error.NodeNotPersistent() | 1189 raise error.NodeNotPersistent() |
1186 | 1190 |
1187 # we need to get the items before removing them, for the notifications | 1191 # we need to get the items before removing them, for the notifications |
1188 | 1192 |
1189 def removeItems(items_data): | 1193 if affiliation not in ['owner', 'publisher']: |
1190 """Remove the items and keep only actually removed ones in items_data""" | 1194 # the requestor doesn't have right to retract on the whole node |
1191 d = node.removeItems(itemIdentifiers) | 1195 # we check if he is a publisher for all items he wants to retract |
1192 d.addCallback(lambda removed: [item_data for item_data in items_data if item_data.item["id"] in removed]) | 1196 # and forbid the retraction else. |
1193 return d | 1197 publishers_map = await node.getItemsPublishers(itemIdentifiers) |
1194 | |
1195 def checkPublishers(publishers_map): | |
1196 """Called when requestor is neither owner neither publisher of the Node | |
1197 | |
1198 We check that requestor is publisher of all the items he wants to retract | |
1199 and raise error.Forbidden if it is not the case | |
1200 """ | |
1201 # TODO: the behaviour should be configurable (per node ?) | 1198 # TODO: the behaviour should be configurable (per node ?) |
1202 if (any((requestor.userhostJID() != publisher.userhostJID() | 1199 if (any((requestor.userhostJID() != publisher.userhostJID() |
1203 for publisher in publishers_map.values())) | 1200 for publisher in publishers_map.values())) |
1204 and not self.isAdmin(requestor) | 1201 and not self.isAdmin(requestor) |
1205 ): | 1202 ): |
1206 raise error.Forbidden() | 1203 raise error.Forbidden() |
1207 | 1204 |
1208 if affiliation in ['owner', 'publisher']: | 1205 items_data = await node.getItemsById(None, True, itemIdentifiers) |
1209 # the requestor is owner or publisher of the node | 1206 # Remove the items and keep only actually removed ones in items_data |
1210 # he can retract what he wants | 1207 removed = await node.removeItems(itemIdentifiers) |
1211 d = defer.succeed(None) | 1208 retracted_items = [ |
1212 else: | 1209 item_data for item_data in items_data if item_data.item["id"] in removed |
1213 # the requestor doesn't have right to retract on the whole node | 1210 ] |
1214 # we check if he is a publisher for all items he wants to retract | 1211 if nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX): |
1215 # and forbid the retraction else. | 1212 app_ns = nodeIdentifier[len(const.FDP_TEMPLATE_PREFIX):] |
1216 d = node.getItemsPublishers(itemIdentifiers) | 1213 submitted_node_id = f"{const.FDP_SUBMITTED_PREFIX}{app_ns}" |
1217 d.addCallback(checkPublishers) | 1214 submitted_node = await self.storage.getNode(submitted_node_id, pep, recipient) |
1218 d.addCallback(lambda dummy: node.getItemsById(None, True, itemIdentifiers)) | 1215 schema_items = await node.getItems([], True, maxItems=1) |
1219 d.addCallback(removeItems) | 1216 if not schema_items: |
1217 # no more schema, we need to remove it from submitted node | |
1218 submitted_node = await self._getFDPSubmittedNode( | |
1219 nodeIdentifier, pep, recipient) | |
1220 if submitted_node is not None: | |
1221 submitted_node.setSchema(None) | |
1222 else: | |
1223 # not all items have been removed from the FDP template node, we check | |
1224 # if the last one correspond to current submitted node schema, and if not, | |
1225 # we update it. | |
1226 current_schema = next(schema_items[0].item.elements( | |
1227 data_form.NS_X_DATA, 'x')) | |
1228 if current_schema == node.schema: | |
1229 submitted_node = await self._getFDPSubmittedNode( | |
1230 nodeIdentifier, pep, recipient) | |
1231 if submitted_node is not None: | |
1232 submitted_node.setSchema(current_schema) | |
1220 | 1233 |
1221 if notify: | 1234 if notify: |
1222 d.addCallback(self._doNotifyRetraction, node, pep, recipient) | 1235 await self._doNotifyRetraction(retracted_items, node, pep, recipient) |
1223 return d | 1236 |
1237 return retracted_items | |
1224 | 1238 |
1225 def _doNotifyRetraction(self, items_data, node, pep, recipient): | 1239 def _doNotifyRetraction(self, items_data, node, pep, recipient): |
1226 self.dispatch({'items_data': items_data, | 1240 self.dispatch({'items_data': items_data, |
1227 'node': node, | 1241 'node': node, |
1228 'pep': pep, | 1242 'pep': pep, |
1262 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 1276 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
1263 d.addCallback(lambda node: node.getSubscriptions('subscribed')) | 1277 d.addCallback(lambda node: node.getSubscriptions('subscribed')) |
1264 d.addCallback(cb) | 1278 d.addCallback(cb) |
1265 return d | 1279 return d |
1266 | 1280 |
1267 def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None): | 1281 async def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None): |
1268 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 1282 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
1269 d.addCallback(_getAffiliation, requestor) | 1283 node, affiliation = await _getAffiliation(node, requestor) |
1270 d.addCallback(self._doPreDelete, requestor, redirectURI, pep, recipient) | |
1271 return d | |
1272 | |
1273 def _doPreDelete(self, result, requestor, redirectURI, pep, recipient): | |
1274 node, affiliation = result | |
1275 | 1284 |
1276 if affiliation != 'owner' and not self.isAdmin(requestor): | 1285 if affiliation != 'owner' and not self.isAdmin(requestor): |
1277 raise error.Forbidden() | 1286 raise error.Forbidden() |
1278 | 1287 |
1279 data = {'node': node, | 1288 data = { |
1280 'redirectURI': redirectURI} | 1289 'node': node, |
1290 'redirectURI': redirectURI | |
1291 } | |
1281 | 1292 |
1282 d = defer.DeferredList([cb(data, pep, recipient) | 1293 d = defer.DeferredList([cb(data, pep, recipient) |
1283 for cb in self._callbackList], | 1294 for cb in self._callbackList], |
1284 consumeErrors=1) | 1295 consumeErrors=1) |
1285 d.addCallback(self._doDelete, node.nodeDbId) | 1296 result = await d |
1286 | |
1287 def _doDelete(self, result, nodeDbId): | |
1288 dl = [] | 1297 dl = [] |
1289 for succeeded, r in result: | 1298 for succeeded, r in result: |
1290 if succeeded and r: | 1299 if succeeded and r: |
1291 dl.extend(r) | 1300 dl.extend(r) |
1292 | 1301 |
1293 d = self.storage.deleteNodeByDbId(nodeDbId) | 1302 await self.storage.deleteNodeByDbId(node.nodeDbId) |
1294 d.addCallback(self._doNotifyDelete, dl) | 1303 |
1295 | 1304 if node.nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX): |
1296 return d | 1305 # we need to delete the associated schema |
1297 | 1306 submitted_node = await self._getFDPSubmittedNode( |
1298 def _doNotifyDelete(self, result, dl): | 1307 node.nodeIdentifier, pep, recipient) |
1308 if submitted_node is not None: | |
1309 await submitted_node.setSchema(None) | |
1310 | |
1299 for d in dl: | 1311 for d in dl: |
1300 d.callback(None) | 1312 d.callback(None) |
1301 | 1313 |
1302 | 1314 |
1303 class PubSubResourceFromBackend(pubsub.PubSubResource): | 1315 class PubSubResourceFromBackend(pubsub.PubSubResource): |
1800 request.itemIdentifiers, | 1812 request.itemIdentifiers, |
1801 ext_data) | 1813 ext_data) |
1802 return d.addErrback(self._mapErrors) | 1814 return d.addErrback(self._mapErrors) |
1803 | 1815 |
1804 def retract(self, request): | 1816 def retract(self, request): |
1805 d = self.backend.retractItem(request.nodeIdentifier, | 1817 d = defer.ensureDeferred( |
1818 self.backend.retractItem(request.nodeIdentifier, | |
1806 request.itemIdentifiers, | 1819 request.itemIdentifiers, |
1807 request.sender, | 1820 request.sender, |
1808 request.notify, | 1821 request.notify, |
1809 self._isPep(request), | 1822 self._isPep(request), |
1810 request.recipient) | 1823 request.recipient) |
1824 ) | |
1811 return d.addErrback(self._mapErrors) | 1825 return d.addErrback(self._mapErrors) |
1812 | 1826 |
1813 def purge(self, request): | 1827 def purge(self, request): |
1814 d = self.backend.purgeNode(request.nodeIdentifier, | 1828 d = self.backend.purgeNode(request.nodeIdentifier, |
1815 request.sender, | 1829 request.sender, |
1816 self._isPep(request), | 1830 self._isPep(request), |
1817 request.recipient) | 1831 request.recipient) |
1818 return d.addErrback(self._mapErrors) | 1832 return d.addErrback(self._mapErrors) |
1819 | 1833 |
1820 def delete(self, request): | 1834 def delete(self, request): |
1821 d = self.backend.deleteNode(request.nodeIdentifier, | 1835 d = defer.ensureDeferred( |
1836 self.backend.deleteNode(request.nodeIdentifier, | |
1822 request.sender, | 1837 request.sender, |
1823 self._isPep(request), | 1838 self._isPep(request), |
1824 request.recipient) | 1839 request.recipient) |
1840 ) | |
1825 return d.addErrback(self._mapErrors) | 1841 return d.addErrback(self._mapErrors) |
1826 | 1842 |
1827 components.registerAdapter(PubSubResourceFromBackend, | 1843 components.registerAdapter(PubSubResourceFromBackend, |
1828 iidavoll.IBackendService, | 1844 iidavoll.IBackendService, |
1829 iwokkel.IPubSubResource) | 1845 iwokkel.IPubSubResource) |
1834 class ExtraDiscoHandler(XMPPHandler): | 1850 class ExtraDiscoHandler(XMPPHandler): |
1835 # see comment in twisted/plugins/pubsub.py | 1851 # see comment in twisted/plugins/pubsub.py |
1836 # FIXME: upstream must be fixed so we can use custom (non pubsub#) disco features | 1852 # FIXME: upstream must be fixed so we can use custom (non pubsub#) disco features |
1837 | 1853 |
1838 def getDiscoInfo(self, requestor, service, nodeIdentifier=''): | 1854 def getDiscoInfo(self, requestor, service, nodeIdentifier=''): |
1839 return [disco.DiscoFeature(pubsub.NS_ORDER_BY)] | 1855 return [disco.DiscoFeature(pubsub.NS_ORDER_BY), disco.DiscoFeature(const.NS_FDP)] |
1840 | 1856 |
1841 def getDiscoItems(self, requestor, service, nodeIdentifier=''): | 1857 def getDiscoItems(self, requestor, service, nodeIdentifier=''): |
1842 return [] | 1858 return [] |