comparison sat_pubsub/backend.py @ 455:0b5233981671

backend: fix `delete` notification + add `purge` notification
author Goffi <goffi@goffi.org>
date Mon, 02 Aug 2021 21:58:17 +0200
parents 1a179ad10125
children cebcb7f56889
comparison
equal deleted inserted replaced
454:7f1394bb96db 455:0b5233981671
220 } 220 }
221 221
222 def __init__(self, storage, config): 222 def __init__(self, storage, config):
223 utility.EventDispatcher.__init__(self) 223 utility.EventDispatcher.__init__(self)
224 self.storage = storage 224 self.storage = storage
225 self._callbackList = []
226 self.config = config 225 self.config = config
227 self.admins = config['admins_jids_list'] 226 self.admins = config['admins_jids_list']
228 d = self.storage.getFTSLanguages() 227 d = self.storage.getFTSLanguages()
229 d.addCallbacks(self._getFTSLanguagesCb, self._getFTSLanguagesEb) 228 d.addCallbacks(self._getFTSLanguagesCb, self._getFTSLanguagesEb)
230 229
594 def registerPublishNotifier(self, observerfn, *args, **kwargs): 593 def registerPublishNotifier(self, observerfn, *args, **kwargs):
595 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) 594 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
596 595
597 def registerRetractNotifier(self, observerfn, *args, **kwargs): 596 def registerRetractNotifier(self, observerfn, *args, **kwargs):
598 self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs) 597 self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs)
598
599 def registerDeleteNotifier(self, observerfn, *args, **kwargs):
600 self.addObserver('//event/pubsub/delete', observerfn, *args, **kwargs)
601
602 def registerPurgeNotifier(self, observerfn, *args, **kwargs):
603 self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs)
599 604
600 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): 605 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
601 subscriberEntity = subscriber.userhostJID() 606 subscriberEntity = subscriber.userhostJID()
602 if subscriberEntity != requestor.userhostJID(): 607 if subscriberEntity != requestor.userhostJID():
603 return defer.fail(error.Forbidden()) 608 return defer.fail(error.Forbidden())
1240 'node': node, 1245 'node': node,
1241 'pep': pep, 1246 'pep': pep,
1242 'recipient': recipient}, 1247 'recipient': recipient},
1243 '//event/pubsub/retract') 1248 '//event/pubsub/retract')
1244 1249
1245 def purgeNode(self, nodeIdentifier, requestor, pep, recipient): 1250 async def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
1246 d = self.storage.getNode(nodeIdentifier, pep, recipient) 1251 node = await self.storage.getNode(nodeIdentifier, pep, recipient)
1247 d.addCallback(_getAffiliation, requestor) 1252 node, affiliation = await _getAffiliation(node, requestor)
1248 d.addCallback(self._doPurge, requestor)
1249 return d
1250
1251 def _doPurge(self, result, requestor):
1252 node, affiliation = result
1253 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] 1253 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
1254 1254
1255 if affiliation != 'owner' and not self.isAdmin(requestor): 1255 if affiliation != 'owner' and not self.isAdmin(requestor):
1256 raise error.Forbidden() 1256 raise error.Forbidden()
1257 1257
1258 if not persistItems: 1258 if not persistItems:
1259 raise error.NodeNotPersistent() 1259 raise error.NodeNotPersistent()
1260 1260
1261 d = node.purge() 1261 await node.purge()
1262 d.addCallback(self._doNotifyPurge, node.nodeIdentifier) 1262
1263 return d 1263 subscribers = await self.getSubscribers(nodeIdentifier, pep, recipient)
1264 1264
1265 def _doNotifyPurge(self, result, nodeIdentifier): 1265 # now we can send notifications
1266 self.dispatch(nodeIdentifier, '//event/pubsub/purge') 1266 self.dispatch(
1267 1267 {
1268 def registerPreDelete(self, preDeleteFn): 1268 'node': node,
1269 self._callbackList.append(preDeleteFn) 1269 'pep': pep,
1270 'recipient': recipient,
1271 'subscribers': subscribers,
1272 },
1273 '//event/pubsub/purge'
1274 )
1270 1275
1271 def getSubscribers(self, nodeIdentifier, pep, recipient): 1276 def getSubscribers(self, nodeIdentifier, pep, recipient):
1272 def cb(subscriptions): 1277 def cb(subscriptions):
1273 return [subscription.subscriber for subscription in subscriptions] 1278 return [subscription.subscriber for subscription in subscriptions]
1274 1279
1275 d = self.storage.getNode(nodeIdentifier, pep, recipient) 1280 d = self.storage.getNode(nodeIdentifier, pep, recipient)
1276 d.addCallback(lambda node: node.getSubscriptions('subscribed')) 1281 d.addCallback(lambda node: node.getSubscriptions('subscribed'))
1277 d.addCallback(cb) 1282 d.addCallback(cb)
1278 return d 1283 return d
1279 1284
1280 async def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None): 1285 async def deleteNode(
1286 self,
1287 nodeIdentifier: str,
1288 requestor: jid.JID,
1289 pep: bool,
1290 recipient: jid.JID,
1291 redirectURI: str = None
1292 ) -> None:
1281 node = await self.storage.getNode(nodeIdentifier, pep, recipient) 1293 node = await self.storage.getNode(nodeIdentifier, pep, recipient)
1282 node, affiliation = await _getAffiliation(node, requestor) 1294 node, affiliation = await _getAffiliation(node, requestor)
1283 1295
1284 if affiliation != 'owner' and not self.isAdmin(requestor): 1296 if affiliation != 'owner' and not self.isAdmin(requestor):
1285 raise error.Forbidden() 1297 raise error.Forbidden()
1286 1298
1287 data = { 1299 # we have to get subscribers (for notifications) before the node is deleted
1288 'node': node, 1300 subscribers = await self.getSubscribers(nodeIdentifier, pep, recipient)
1289 'redirectURI': redirectURI
1290 }
1291
1292 d = defer.DeferredList([cb(data, pep, recipient)
1293 for cb in self._callbackList],
1294 consumeErrors=1)
1295 result = await d
1296 dl = []
1297 for succeeded, r in result:
1298 if succeeded and r:
1299 dl.extend(r)
1300 1301
1301 await self.storage.deleteNodeByDbId(node.nodeDbId) 1302 await self.storage.deleteNodeByDbId(node.nodeDbId)
1302 1303
1303 if node.nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX): 1304 if node.nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX):
1304 # we need to delete the associated schema 1305 # we need to delete the associated schema
1305 submitted_node = await self._getFDPSubmittedNode( 1306 submitted_node = await self._getFDPSubmittedNode(
1306 node.nodeIdentifier, pep, recipient) 1307 node.nodeIdentifier, pep, recipient)
1307 if submitted_node is not None: 1308 if submitted_node is not None:
1308 await submitted_node.setSchema(None) 1309 await submitted_node.setSchema(None)
1309 1310
1310 for d in dl: 1311 # now we can send notifications
1311 d.callback(None) 1312 self.dispatch(
1313 {
1314 'node': node,
1315 'pep': pep,
1316 'recipient': recipient,
1317 'redirectURI': redirectURI,
1318 'subscribers': subscribers,
1319 },
1320 '//event/pubsub/delete'
1321 )
1312 1322
1313 1323
1314 class PubSubResourceFromBackend(pubsub.PubSubResource): 1324 class PubSubResourceFromBackend(pubsub.PubSubResource):
1315 """ 1325 """
1316 Adapts a backend to an xmpp publish-subscribe service. 1326 Adapts a backend to an xmpp publish-subscribe service.
1374 1384
1375 self.__class__.discoIdentity.name = backend.config["service_name"] 1385 self.__class__.discoIdentity.name = backend.config["service_name"]
1376 1386
1377 self.backend.registerPublishNotifier(self._notifyPublish) 1387 self.backend.registerPublishNotifier(self._notifyPublish)
1378 self.backend.registerRetractNotifier(self._notifyRetract) 1388 self.backend.registerRetractNotifier(self._notifyRetract)
1379 self.backend.registerPreDelete(self._preDelete) 1389 self.backend.registerDeleteNotifier(self._notifyDelete)
1390 self.backend.registerPurgeNotifier(self._notifyPurge)
1380 1391
1381 if self.backend.supportsAutoCreate(): 1392 if self.backend.supportsAutoCreate():
1382 self.features.append("auto-create") 1393 self.features.append("auto-create")
1383 1394
1384 if self.backend.supportsPublishOptions(): 1395 if self.backend.supportsPublishOptions():
1565 if allowed_items: 1576 if allowed_items:
1566 notifications_filtered.append((subscriber, subscriptions, allowed_items)) 1577 notifications_filtered.append((subscriber, subscriptions, allowed_items))
1567 1578
1568 defer.returnValue((owners, notifications_filtered)) 1579 defer.returnValue((owners, notifications_filtered))
1569 1580
1570 def _preDelete(self, data, pep, recipient): 1581 async def _aNotifyDelete(self, data):
1571 nodeIdentifier = data['node'].nodeIdentifier 1582 nodeIdentifier = data['node'].nodeIdentifier
1583 pep = data['pep']
1584 recipient = data['recipient']
1572 redirectURI = data.get('redirectURI', None) 1585 redirectURI = data.get('redirectURI', None)
1573 d = self.backend.getSubscribers(nodeIdentifier, pep, recipient) 1586 subscribers = data['subscribers']
1574 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( 1587 if pep:
1575 self.serviceJID, 1588 self.backend.privilege.notifyDelete(
1576 nodeIdentifier, 1589 recipient,
1577 subscribers, 1590 nodeIdentifier,
1578 redirectURI)) 1591 subscribers,
1579 return d 1592 redirectURI
1593 )
1594 else:
1595 self.pubsubService.notifyDelete(
1596 self.serviceJID,
1597 nodeIdentifier,
1598 subscribers,
1599 redirectURI
1600 )
1601
1602 def _notifyDelete(self, data):
1603 d = defer.ensureDeferred(self._aNotifyDelete(data))
1604 d.addErrback(log.err)
1605
1606 async def _aNotifyPurge(self, data):
1607 nodeIdentifier = data['node'].nodeIdentifier
1608 pep = data['pep']
1609 recipient = data['recipient']
1610 subscribers = data['subscribers']
1611 if pep:
1612 self.backend.privilege.notifyPurge(
1613 recipient,
1614 nodeIdentifier,
1615 subscribers,
1616 )
1617 else:
1618 self.pubsubService.notifyPurge(
1619 self.serviceJID,
1620 nodeIdentifier,
1621 subscribers,
1622 )
1623
1624 def _notifyPurge(self, data):
1625 d = defer.ensureDeferred(self._aNotifyPurge(data))
1626 d.addErrback(log.err)
1580 1627
1581 def _mapErrors(self, failure): 1628 def _mapErrors(self, failure):
1582 e = failure.trap(*list(self._errorMap.keys())) 1629 e = failure.trap(*list(self._errorMap.keys()))
1583 1630
1584 condition, pubsubCondition, feature = self._errorMap[e] 1631 condition, pubsubCondition, feature = self._errorMap[e]
1824 request.recipient) 1871 request.recipient)
1825 ) 1872 )
1826 return d.addErrback(self._mapErrors) 1873 return d.addErrback(self._mapErrors)
1827 1874
1828 def purge(self, request): 1875 def purge(self, request):
1829 d = self.backend.purgeNode(request.nodeIdentifier, 1876 d = defer.ensureDeferred(
1877 self.backend.purgeNode(request.nodeIdentifier,
1830 request.sender, 1878 request.sender,
1831 self._isPep(request), 1879 self._isPep(request),
1832 request.recipient) 1880 request.recipient)
1881 )
1833 return d.addErrback(self._mapErrors) 1882 return d.addErrback(self._mapErrors)
1834 1883
1835 def delete(self, request): 1884 def delete(self, request):
1836 d = defer.ensureDeferred( 1885 d = defer.ensureDeferred(
1837 self.backend.deleteNode(request.nodeIdentifier, 1886 self.backend.deleteNode(request.nodeIdentifier,