Mercurial > libervia-pubsub
comparison sat_pubsub/backend.py @ 455:0b5233981671
backend: fix `delete` notification + add `purge` notification
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Aug 2021 21:58:17 +0200 |
parents | 1a179ad10125 |
children | cebcb7f56889 |
comparison
equal
deleted
inserted
replaced
454:7f1394bb96db | 455:0b5233981671 |
---|---|
220 } | 220 } |
221 | 221 |
222 def __init__(self, storage, config): | 222 def __init__(self, storage, config): |
223 utility.EventDispatcher.__init__(self) | 223 utility.EventDispatcher.__init__(self) |
224 self.storage = storage | 224 self.storage = storage |
225 self._callbackList = [] | |
226 self.config = config | 225 self.config = config |
227 self.admins = config['admins_jids_list'] | 226 self.admins = config['admins_jids_list'] |
228 d = self.storage.getFTSLanguages() | 227 d = self.storage.getFTSLanguages() |
229 d.addCallbacks(self._getFTSLanguagesCb, self._getFTSLanguagesEb) | 228 d.addCallbacks(self._getFTSLanguagesCb, self._getFTSLanguagesEb) |
230 | 229 |
594 def registerPublishNotifier(self, observerfn, *args, **kwargs): | 593 def registerPublishNotifier(self, observerfn, *args, **kwargs): |
595 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) | 594 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) |
596 | 595 |
597 def registerRetractNotifier(self, observerfn, *args, **kwargs): | 596 def registerRetractNotifier(self, observerfn, *args, **kwargs): |
598 self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs) | 597 self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs) |
598 | |
599 def registerDeleteNotifier(self, observerfn, *args, **kwargs): | |
600 self.addObserver('//event/pubsub/delete', observerfn, *args, **kwargs) | |
601 | |
602 def registerPurgeNotifier(self, observerfn, *args, **kwargs): | |
603 self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs) | |
599 | 604 |
600 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): | 605 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): |
601 subscriberEntity = subscriber.userhostJID() | 606 subscriberEntity = subscriber.userhostJID() |
602 if subscriberEntity != requestor.userhostJID(): | 607 if subscriberEntity != requestor.userhostJID(): |
603 return defer.fail(error.Forbidden()) | 608 return defer.fail(error.Forbidden()) |
1240 'node': node, | 1245 'node': node, |
1241 'pep': pep, | 1246 'pep': pep, |
1242 'recipient': recipient}, | 1247 'recipient': recipient}, |
1243 '//event/pubsub/retract') | 1248 '//event/pubsub/retract') |
1244 | 1249 |
1245 def purgeNode(self, nodeIdentifier, requestor, pep, recipient): | 1250 async def purgeNode(self, nodeIdentifier, requestor, pep, recipient): |
1246 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 1251 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
1247 d.addCallback(_getAffiliation, requestor) | 1252 node, affiliation = await _getAffiliation(node, requestor) |
1248 d.addCallback(self._doPurge, requestor) | |
1249 return d | |
1250 | |
1251 def _doPurge(self, result, requestor): | |
1252 node, affiliation = result | |
1253 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] | 1253 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] |
1254 | 1254 |
1255 if affiliation != 'owner' and not self.isAdmin(requestor): | 1255 if affiliation != 'owner' and not self.isAdmin(requestor): |
1256 raise error.Forbidden() | 1256 raise error.Forbidden() |
1257 | 1257 |
1258 if not persistItems: | 1258 if not persistItems: |
1259 raise error.NodeNotPersistent() | 1259 raise error.NodeNotPersistent() |
1260 | 1260 |
1261 d = node.purge() | 1261 await node.purge() |
1262 d.addCallback(self._doNotifyPurge, node.nodeIdentifier) | 1262 |
1263 return d | 1263 subscribers = await self.getSubscribers(nodeIdentifier, pep, recipient) |
1264 | 1264 |
1265 def _doNotifyPurge(self, result, nodeIdentifier): | 1265 # now we can send notifications |
1266 self.dispatch(nodeIdentifier, '//event/pubsub/purge') | 1266 self.dispatch( |
1267 | 1267 { |
1268 def registerPreDelete(self, preDeleteFn): | 1268 'node': node, |
1269 self._callbackList.append(preDeleteFn) | 1269 'pep': pep, |
1270 'recipient': recipient, | |
1271 'subscribers': subscribers, | |
1272 }, | |
1273 '//event/pubsub/purge' | |
1274 ) | |
1270 | 1275 |
1271 def getSubscribers(self, nodeIdentifier, pep, recipient): | 1276 def getSubscribers(self, nodeIdentifier, pep, recipient): |
1272 def cb(subscriptions): | 1277 def cb(subscriptions): |
1273 return [subscription.subscriber for subscription in subscriptions] | 1278 return [subscription.subscriber for subscription in subscriptions] |
1274 | 1279 |
1275 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 1280 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
1276 d.addCallback(lambda node: node.getSubscriptions('subscribed')) | 1281 d.addCallback(lambda node: node.getSubscriptions('subscribed')) |
1277 d.addCallback(cb) | 1282 d.addCallback(cb) |
1278 return d | 1283 return d |
1279 | 1284 |
1280 async def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None): | 1285 async def deleteNode( |
1286 self, | |
1287 nodeIdentifier: str, | |
1288 requestor: jid.JID, | |
1289 pep: bool, | |
1290 recipient: jid.JID, | |
1291 redirectURI: str = None | |
1292 ) -> None: | |
1281 node = await self.storage.getNode(nodeIdentifier, pep, recipient) | 1293 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
1282 node, affiliation = await _getAffiliation(node, requestor) | 1294 node, affiliation = await _getAffiliation(node, requestor) |
1283 | 1295 |
1284 if affiliation != 'owner' and not self.isAdmin(requestor): | 1296 if affiliation != 'owner' and not self.isAdmin(requestor): |
1285 raise error.Forbidden() | 1297 raise error.Forbidden() |
1286 | 1298 |
1287 data = { | 1299 # we have to get subscribers (for notifications) before the node is deleted |
1288 'node': node, | 1300 subscribers = await self.getSubscribers(nodeIdentifier, pep, recipient) |
1289 'redirectURI': redirectURI | |
1290 } | |
1291 | |
1292 d = defer.DeferredList([cb(data, pep, recipient) | |
1293 for cb in self._callbackList], | |
1294 consumeErrors=1) | |
1295 result = await d | |
1296 dl = [] | |
1297 for succeeded, r in result: | |
1298 if succeeded and r: | |
1299 dl.extend(r) | |
1300 | 1301 |
1301 await self.storage.deleteNodeByDbId(node.nodeDbId) | 1302 await self.storage.deleteNodeByDbId(node.nodeDbId) |
1302 | 1303 |
1303 if node.nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX): | 1304 if node.nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX): |
1304 # we need to delete the associated schema | 1305 # we need to delete the associated schema |
1305 submitted_node = await self._getFDPSubmittedNode( | 1306 submitted_node = await self._getFDPSubmittedNode( |
1306 node.nodeIdentifier, pep, recipient) | 1307 node.nodeIdentifier, pep, recipient) |
1307 if submitted_node is not None: | 1308 if submitted_node is not None: |
1308 await submitted_node.setSchema(None) | 1309 await submitted_node.setSchema(None) |
1309 | 1310 |
1310 for d in dl: | 1311 # now we can send notifications |
1311 d.callback(None) | 1312 self.dispatch( |
1313 { | |
1314 'node': node, | |
1315 'pep': pep, | |
1316 'recipient': recipient, | |
1317 'redirectURI': redirectURI, | |
1318 'subscribers': subscribers, | |
1319 }, | |
1320 '//event/pubsub/delete' | |
1321 ) | |
1312 | 1322 |
1313 | 1323 |
1314 class PubSubResourceFromBackend(pubsub.PubSubResource): | 1324 class PubSubResourceFromBackend(pubsub.PubSubResource): |
1315 """ | 1325 """ |
1316 Adapts a backend to an xmpp publish-subscribe service. | 1326 Adapts a backend to an xmpp publish-subscribe service. |
1374 | 1384 |
1375 self.__class__.discoIdentity.name = backend.config["service_name"] | 1385 self.__class__.discoIdentity.name = backend.config["service_name"] |
1376 | 1386 |
1377 self.backend.registerPublishNotifier(self._notifyPublish) | 1387 self.backend.registerPublishNotifier(self._notifyPublish) |
1378 self.backend.registerRetractNotifier(self._notifyRetract) | 1388 self.backend.registerRetractNotifier(self._notifyRetract) |
1379 self.backend.registerPreDelete(self._preDelete) | 1389 self.backend.registerDeleteNotifier(self._notifyDelete) |
1390 self.backend.registerPurgeNotifier(self._notifyPurge) | |
1380 | 1391 |
1381 if self.backend.supportsAutoCreate(): | 1392 if self.backend.supportsAutoCreate(): |
1382 self.features.append("auto-create") | 1393 self.features.append("auto-create") |
1383 | 1394 |
1384 if self.backend.supportsPublishOptions(): | 1395 if self.backend.supportsPublishOptions(): |
1565 if allowed_items: | 1576 if allowed_items: |
1566 notifications_filtered.append((subscriber, subscriptions, allowed_items)) | 1577 notifications_filtered.append((subscriber, subscriptions, allowed_items)) |
1567 | 1578 |
1568 defer.returnValue((owners, notifications_filtered)) | 1579 defer.returnValue((owners, notifications_filtered)) |
1569 | 1580 |
1570 def _preDelete(self, data, pep, recipient): | 1581 async def _aNotifyDelete(self, data): |
1571 nodeIdentifier = data['node'].nodeIdentifier | 1582 nodeIdentifier = data['node'].nodeIdentifier |
1583 pep = data['pep'] | |
1584 recipient = data['recipient'] | |
1572 redirectURI = data.get('redirectURI', None) | 1585 redirectURI = data.get('redirectURI', None) |
1573 d = self.backend.getSubscribers(nodeIdentifier, pep, recipient) | 1586 subscribers = data['subscribers'] |
1574 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( | 1587 if pep: |
1575 self.serviceJID, | 1588 self.backend.privilege.notifyDelete( |
1576 nodeIdentifier, | 1589 recipient, |
1577 subscribers, | 1590 nodeIdentifier, |
1578 redirectURI)) | 1591 subscribers, |
1579 return d | 1592 redirectURI |
1593 ) | |
1594 else: | |
1595 self.pubsubService.notifyDelete( | |
1596 self.serviceJID, | |
1597 nodeIdentifier, | |
1598 subscribers, | |
1599 redirectURI | |
1600 ) | |
1601 | |
1602 def _notifyDelete(self, data): | |
1603 d = defer.ensureDeferred(self._aNotifyDelete(data)) | |
1604 d.addErrback(log.err) | |
1605 | |
1606 async def _aNotifyPurge(self, data): | |
1607 nodeIdentifier = data['node'].nodeIdentifier | |
1608 pep = data['pep'] | |
1609 recipient = data['recipient'] | |
1610 subscribers = data['subscribers'] | |
1611 if pep: | |
1612 self.backend.privilege.notifyPurge( | |
1613 recipient, | |
1614 nodeIdentifier, | |
1615 subscribers, | |
1616 ) | |
1617 else: | |
1618 self.pubsubService.notifyPurge( | |
1619 self.serviceJID, | |
1620 nodeIdentifier, | |
1621 subscribers, | |
1622 ) | |
1623 | |
1624 def _notifyPurge(self, data): | |
1625 d = defer.ensureDeferred(self._aNotifyPurge(data)) | |
1626 d.addErrback(log.err) | |
1580 | 1627 |
1581 def _mapErrors(self, failure): | 1628 def _mapErrors(self, failure): |
1582 e = failure.trap(*list(self._errorMap.keys())) | 1629 e = failure.trap(*list(self._errorMap.keys())) |
1583 | 1630 |
1584 condition, pubsubCondition, feature = self._errorMap[e] | 1631 condition, pubsubCondition, feature = self._errorMap[e] |
1824 request.recipient) | 1871 request.recipient) |
1825 ) | 1872 ) |
1826 return d.addErrback(self._mapErrors) | 1873 return d.addErrback(self._mapErrors) |
1827 | 1874 |
1828 def purge(self, request): | 1875 def purge(self, request): |
1829 d = self.backend.purgeNode(request.nodeIdentifier, | 1876 d = defer.ensureDeferred( |
1877 self.backend.purgeNode(request.nodeIdentifier, | |
1830 request.sender, | 1878 request.sender, |
1831 self._isPep(request), | 1879 self._isPep(request), |
1832 request.recipient) | 1880 request.recipient) |
1881 ) | |
1833 return d.addErrback(self._mapErrors) | 1882 return d.addErrback(self._mapErrors) |
1834 | 1883 |
1835 def delete(self, request): | 1884 def delete(self, request): |
1836 d = defer.ensureDeferred( | 1885 d = defer.ensureDeferred( |
1837 self.backend.deleteNode(request.nodeIdentifier, | 1886 self.backend.deleteNode(request.nodeIdentifier, |