Mercurial > libervia-pubsub
diff sat_pubsub/backend.py @ 455:0b5233981671
backend: fix `delete` notification + add `purge` notification
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Aug 2021 21:58:17 +0200 |
parents | 1a179ad10125 |
children | cebcb7f56889 |
line wrap: on
line diff
--- a/sat_pubsub/backend.py Mon Aug 02 21:56:43 2021 +0200 +++ b/sat_pubsub/backend.py Mon Aug 02 21:58:17 2021 +0200 @@ -222,7 +222,6 @@ def __init__(self, storage, config): utility.EventDispatcher.__init__(self) self.storage = storage - self._callbackList = [] self.config = config self.admins = config['admins_jids_list'] d = self.storage.getFTSLanguages() @@ -597,6 +596,12 @@ def registerRetractNotifier(self, observerfn, *args, **kwargs): self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs) + def registerDeleteNotifier(self, observerfn, *args, **kwargs): + self.addObserver('//event/pubsub/delete', observerfn, *args, **kwargs) + + def registerPurgeNotifier(self, observerfn, *args, **kwargs): + self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs) + def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): subscriberEntity = subscriber.userhostJID() if subscriberEntity != requestor.userhostJID(): @@ -1242,14 +1247,9 @@ 'recipient': recipient}, '//event/pubsub/retract') - def purgeNode(self, nodeIdentifier, requestor, pep, recipient): - d = self.storage.getNode(nodeIdentifier, pep, recipient) - d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doPurge, requestor) - return d - - def _doPurge(self, result, requestor): - node, affiliation = result + async def purgeNode(self, nodeIdentifier, requestor, pep, recipient): + node = await self.storage.getNode(nodeIdentifier, pep, recipient) + node, affiliation = await _getAffiliation(node, requestor) persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] if affiliation != 'owner' and not self.isAdmin(requestor): @@ -1258,15 +1258,20 @@ if not persistItems: raise error.NodeNotPersistent() - d = node.purge() - d.addCallback(self._doNotifyPurge, node.nodeIdentifier) - return d + await node.purge() + + subscribers = await self.getSubscribers(nodeIdentifier, pep, recipient) - def _doNotifyPurge(self, result, nodeIdentifier): - self.dispatch(nodeIdentifier, '//event/pubsub/purge') - - def registerPreDelete(self, preDeleteFn): - self._callbackList.append(preDeleteFn) + # now we can send notifications + self.dispatch( + { + 'node': node, + 'pep': pep, + 'recipient': recipient, + 'subscribers': subscribers, + }, + '//event/pubsub/purge' + ) def getSubscribers(self, nodeIdentifier, pep, recipient): def cb(subscriptions): @@ -1277,26 +1282,22 @@ d.addCallback(cb) return d - async def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None): + async def deleteNode( + self, + nodeIdentifier: str, + requestor: jid.JID, + pep: bool, + recipient: jid.JID, + redirectURI: str = None + ) -> None: node = await self.storage.getNode(nodeIdentifier, pep, recipient) node, affiliation = await _getAffiliation(node, requestor) if affiliation != 'owner' and not self.isAdmin(requestor): raise error.Forbidden() - data = { - 'node': node, - 'redirectURI': redirectURI - } - - d = defer.DeferredList([cb(data, pep, recipient) - for cb in self._callbackList], - consumeErrors=1) - result = await d - dl = [] - for succeeded, r in result: - if succeeded and r: - dl.extend(r) + # we have to get subscribers (for notifications) before the node is deleted + subscribers = await self.getSubscribers(nodeIdentifier, pep, recipient) await self.storage.deleteNodeByDbId(node.nodeDbId) @@ -1307,8 +1308,17 @@ if submitted_node is not None: await submitted_node.setSchema(None) - for d in dl: - d.callback(None) + # now we can send notifications + self.dispatch( + { + 'node': node, + 'pep': pep, + 'recipient': recipient, + 'redirectURI': redirectURI, + 'subscribers': subscribers, + }, + '//event/pubsub/delete' + ) class PubSubResourceFromBackend(pubsub.PubSubResource): @@ -1376,7 +1386,8 @@ self.backend.registerPublishNotifier(self._notifyPublish) self.backend.registerRetractNotifier(self._notifyRetract) - self.backend.registerPreDelete(self._preDelete) + self.backend.registerDeleteNotifier(self._notifyDelete) + self.backend.registerPurgeNotifier(self._notifyPurge) if self.backend.supportsAutoCreate(): self.features.append("auto-create") @@ -1567,16 +1578,52 @@ defer.returnValue((owners, notifications_filtered)) - def _preDelete(self, data, pep, recipient): + async def _aNotifyDelete(self, data): nodeIdentifier = data['node'].nodeIdentifier + pep = data['pep'] + recipient = data['recipient'] redirectURI = data.get('redirectURI', None) - d = self.backend.getSubscribers(nodeIdentifier, pep, recipient) - d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( - self.serviceJID, - nodeIdentifier, - subscribers, - redirectURI)) - return d + subscribers = data['subscribers'] + if pep: + self.backend.privilege.notifyDelete( + recipient, + nodeIdentifier, + subscribers, + redirectURI + ) + else: + self.pubsubService.notifyDelete( + self.serviceJID, + nodeIdentifier, + subscribers, + redirectURI + ) + + def _notifyDelete(self, data): + d = defer.ensureDeferred(self._aNotifyDelete(data)) + d.addErrback(log.err) + + async def _aNotifyPurge(self, data): + nodeIdentifier = data['node'].nodeIdentifier + pep = data['pep'] + recipient = data['recipient'] + subscribers = data['subscribers'] + if pep: + self.backend.privilege.notifyPurge( + recipient, + nodeIdentifier, + subscribers, + ) + else: + self.pubsubService.notifyPurge( + self.serviceJID, + nodeIdentifier, + subscribers, + ) + + def _notifyPurge(self, data): + d = defer.ensureDeferred(self._aNotifyPurge(data)) + d.addErrback(log.err) def _mapErrors(self, failure): e = failure.trap(*list(self._errorMap.keys())) @@ -1826,10 +1873,12 @@ return d.addErrback(self._mapErrors) def purge(self, request): - d = self.backend.purgeNode(request.nodeIdentifier, + d = defer.ensureDeferred( + self.backend.purgeNode(request.nodeIdentifier, request.sender, self._isPep(request), request.recipient) + ) return d.addErrback(self._mapErrors) def delete(self, request):