diff sat_pubsub/backend.py @ 455:0b5233981671

backend: fix `delete` notification + add `purge` notification
author Goffi <goffi@goffi.org>
date Mon, 02 Aug 2021 21:58:17 +0200
parents 1a179ad10125
children cebcb7f56889
line wrap: on
line diff
--- a/sat_pubsub/backend.py	Mon Aug 02 21:56:43 2021 +0200
+++ b/sat_pubsub/backend.py	Mon Aug 02 21:58:17 2021 +0200
@@ -222,7 +222,6 @@
     def __init__(self, storage, config):
         utility.EventDispatcher.__init__(self)
         self.storage = storage
-        self._callbackList = []
         self.config = config
         self.admins = config['admins_jids_list']
         d = self.storage.getFTSLanguages()
@@ -597,6 +596,12 @@
     def registerRetractNotifier(self, observerfn, *args, **kwargs):
         self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs)
 
+    def registerDeleteNotifier(self, observerfn, *args, **kwargs):
+        self.addObserver('//event/pubsub/delete', observerfn, *args, **kwargs)
+
+    def registerPurgeNotifier(self, observerfn, *args, **kwargs):
+        self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs)
+
     def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
         subscriberEntity = subscriber.userhostJID()
         if subscriberEntity != requestor.userhostJID():
@@ -1242,14 +1247,9 @@
                        'recipient': recipient},
                       '//event/pubsub/retract')
 
-    def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
-        d = self.storage.getNode(nodeIdentifier, pep, recipient)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doPurge, requestor)
-        return d
-
-    def _doPurge(self, result, requestor):
-        node, affiliation = result
+    async def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
+        node = await self.storage.getNode(nodeIdentifier, pep, recipient)
+        node, affiliation = await _getAffiliation(node, requestor)
         persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
 
         if affiliation != 'owner' and not self.isAdmin(requestor):
@@ -1258,15 +1258,20 @@
         if not persistItems:
             raise error.NodeNotPersistent()
 
-        d = node.purge()
-        d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
-        return d
+        await node.purge()
+
+        subscribers = await self.getSubscribers(nodeIdentifier, pep, recipient)
 
-    def _doNotifyPurge(self, result, nodeIdentifier):
-        self.dispatch(nodeIdentifier, '//event/pubsub/purge')
-
-    def registerPreDelete(self, preDeleteFn):
-        self._callbackList.append(preDeleteFn)
+        # now we can send notifications
+        self.dispatch(
+            {
+                'node': node,
+                'pep': pep,
+                'recipient': recipient,
+                'subscribers': subscribers,
+            },
+            '//event/pubsub/purge'
+        )
 
     def getSubscribers(self, nodeIdentifier, pep, recipient):
         def cb(subscriptions):
@@ -1277,26 +1282,22 @@
         d.addCallback(cb)
         return d
 
-    async def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
+    async def deleteNode(
+        self,
+        nodeIdentifier: str,
+        requestor: jid.JID,
+        pep: bool,
+        recipient: jid.JID,
+        redirectURI: str = None
+    ) -> None:
         node = await self.storage.getNode(nodeIdentifier, pep, recipient)
         node, affiliation = await _getAffiliation(node, requestor)
 
         if affiliation != 'owner' and not self.isAdmin(requestor):
             raise error.Forbidden()
 
-        data = {
-            'node': node,
-            'redirectURI': redirectURI
-        }
-
-        d = defer.DeferredList([cb(data, pep, recipient)
-                                for cb in self._callbackList],
-                               consumeErrors=1)
-        result = await d
-        dl = []
-        for succeeded, r in result:
-            if succeeded and r:
-                dl.extend(r)
+        # we have to get subscribers (for notifications) before the node is deleted
+        subscribers = await self.getSubscribers(nodeIdentifier, pep, recipient)
 
         await self.storage.deleteNodeByDbId(node.nodeDbId)
 
@@ -1307,8 +1308,17 @@
             if submitted_node is not None:
                 await submitted_node.setSchema(None)
 
-        for d in dl:
-            d.callback(None)
+        # now we can send notifications
+        self.dispatch(
+            {
+                'node': node,
+                'pep': pep,
+                'recipient': recipient,
+                'redirectURI': redirectURI,
+                'subscribers': subscribers,
+            },
+            '//event/pubsub/delete'
+        )
 
 
 class PubSubResourceFromBackend(pubsub.PubSubResource):
@@ -1376,7 +1386,8 @@
 
         self.backend.registerPublishNotifier(self._notifyPublish)
         self.backend.registerRetractNotifier(self._notifyRetract)
-        self.backend.registerPreDelete(self._preDelete)
+        self.backend.registerDeleteNotifier(self._notifyDelete)
+        self.backend.registerPurgeNotifier(self._notifyPurge)
 
         if self.backend.supportsAutoCreate():
             self.features.append("auto-create")
@@ -1567,16 +1578,52 @@
 
         defer.returnValue((owners, notifications_filtered))
 
-    def _preDelete(self, data, pep, recipient):
+    async def _aNotifyDelete(self, data):
         nodeIdentifier = data['node'].nodeIdentifier
+        pep = data['pep']
+        recipient = data['recipient']
         redirectURI = data.get('redirectURI', None)
-        d = self.backend.getSubscribers(nodeIdentifier, pep, recipient)
-        d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
-                                                self.serviceJID,
-                                                nodeIdentifier,
-                                                subscribers,
-                                                redirectURI))
-        return d
+        subscribers = data['subscribers']
+        if pep:
+            self.backend.privilege.notifyDelete(
+                recipient,
+                nodeIdentifier,
+                subscribers,
+                redirectURI
+            )
+        else:
+            self.pubsubService.notifyDelete(
+                self.serviceJID,
+                nodeIdentifier,
+                subscribers,
+                redirectURI
+            )
+
+    def _notifyDelete(self, data):
+        d = defer.ensureDeferred(self._aNotifyDelete(data))
+        d.addErrback(log.err)
+
+    async def _aNotifyPurge(self, data):
+        nodeIdentifier = data['node'].nodeIdentifier
+        pep = data['pep']
+        recipient = data['recipient']
+        subscribers = data['subscribers']
+        if pep:
+            self.backend.privilege.notifyPurge(
+                recipient,
+                nodeIdentifier,
+                subscribers,
+            )
+        else:
+            self.pubsubService.notifyPurge(
+                self.serviceJID,
+                nodeIdentifier,
+                subscribers,
+            )
+
+    def _notifyPurge(self, data):
+        d = defer.ensureDeferred(self._aNotifyPurge(data))
+        d.addErrback(log.err)
 
     def _mapErrors(self, failure):
         e = failure.trap(*list(self._errorMap.keys()))
@@ -1826,10 +1873,12 @@
         return d.addErrback(self._mapErrors)
 
     def purge(self, request):
-        d = self.backend.purgeNode(request.nodeIdentifier,
+        d = defer.ensureDeferred(
+            self.backend.purgeNode(request.nodeIdentifier,
                                    request.sender,
                                    self._isPep(request),
                                    request.recipient)
+        )
         return d.addErrback(self._mapErrors)
 
     def delete(self, request):