diff idavoll/backend.py @ 206:274a45d2a5ab

Implement root collection that includes all leaf nodes.
author Ralph Meijer <ralphm@ik.nu>
date Mon, 04 Aug 2008 13:47:10 +0000
parents e6b710bf2b24
children 7f3ffb7a1a9e
line wrap: on
line diff
--- a/idavoll/backend.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/backend.py	Mon Aug 04 13:47:10 2008 +0000
@@ -18,7 +18,7 @@
 from zope.interface import implements
 
 from twisted.application import service
-from twisted.python import components
+from twisted.python import components, log
 from twisted.internet import defer, reactor
 from twisted.words.protocols.jabber.error import StanzaError
 from twisted.words.xish import domish, utility
@@ -27,7 +27,7 @@
 from wokkel.pubsub import PubSubService, PubSubError
 
 from idavoll import error, iidavoll
-from idavoll.iidavoll import IBackendService
+from idavoll.iidavoll import IBackendService, ILeafNode
 
 def _getAffiliation(node, entity):
     d = node.getAffiliation(entity)
@@ -40,35 +40,45 @@
     """
     Generic publish-subscribe backend service.
 
-    @cvar options: Node configuration form as a mapping from the field
-                   name to a dictionary that holds the field's type,
-                   label and possible options to choose from.
-    @type options: C{dict}.
+    @cvar nodeOptions: Node configuration form as a mapping from the field
+                       name to a dictionary that holds the field's type, label
+                       and possible options to choose from.
+    @type nodeOptions: C{dict}.
     @cvar defaultConfig: The default node configuration.
     """
 
     implements(iidavoll.IBackendService)
 
-    options = {"pubsub#persist_items":
-                  {"type": "boolean",
-                   "label": "Persist items to storage"},
-               "pubsub#deliver_payloads":
-                  {"type": "boolean",
-                   "label": "Deliver payloads with event notifications"},
-               "pubsub#send_last_published_item":
-                  {"type": "list-single",
-                   "label": "When to send the last published item",
-                   "options": {
-                       "never": "Never",
-                       "on_sub": "When a new subscription is processed",
-                       }
-                  },
-              }
+    nodeOptions = {
+            "pubsub#persist_items":
+                {"type": "boolean",
+                 "label": "Persist items to storage"},
+            "pubsub#deliver_payloads":
+                {"type": "boolean",
+                 "label": "Deliver payloads with event notifications"},
+            "pubsub#send_last_published_item":
+                {"type": "list-single",
+                 "label": "When to send the last published item",
+                 "options": {
+                     "never": "Never",
+                     "on_sub": "When a new subscription is processed"}
+                },
+            }
 
-    defaultConfig = {"pubsub#persist_items": True,
-                     "pubsub#deliver_payloads": True,
-                     "pubsub#send_last_published_item": 'on_sub',
-                    }
+    subscriptionOptions = {
+            "pubsub#subscription_type":
+                {"type": "list-single",
+                 "options": {
+                     "items": "Receive notification of new items only",
+                     "nodes": "Receive notification of new nodes only"}
+                },
+            "pubsub#subscription_depth":
+                {"type": "list-single",
+                 "options": {
+                     "1": "Receive notification from direct child nodes only",
+                     "all": "Receive notification from all descendent nodes"}
+                },
+            }
 
     def __init__(self, storage):
         utility.EventDispatcher.__init__(self)
@@ -108,9 +118,9 @@
     def _makeMetaData(self, metaData):
         options = []
         for key, value in metaData.iteritems():
-            if self.options.has_key(key):
+            if key in self.nodeOptions:
                 option = {"var": key}
-                option.update(self.options[key])
+                option.update(self.nodeOptions[key])
                 option["value"] = value
                 options.append(option)
 
@@ -136,6 +146,9 @@
 
 
     def _doPublish(self, node, items, requestor):
+        if node.nodeType == 'collection':
+            raise error.NoPublishing()
+
         configuration = node.getConfiguration()
         persistItems = configuration["pubsub#persist_items"]
         deliverPayloads = configuration["pubsub#deliver_payloads"]
@@ -147,6 +160,8 @@
 
         if persistItems or deliverPayloads:
             for item in items:
+                item.uri = None
+                item.defaultUri = None
                 if not item.getAttribute("id"):
                     item["id"] = str(uuid.uuid4())
 
@@ -169,18 +184,36 @@
                       '//event/pubsub/notify')
 
 
-    def getNotificationList(self, nodeIdentifier, items):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.getSubscribers())
-        d.addCallback(self._magicFilter, nodeIdentifier, items)
-        return d
+    def getNotifications(self, nodeIdentifier, items):
+
+        def toNotifications(subscriptions, nodeIdentifier, items):
+            subsBySubscriber = {}
+            for subscription in subscriptions:
+                if subscription.options.get('pubsub#subscription_type',
+                                            'items') == 'items':
+                    subs = subsBySubscriber.setdefault(subscription.subscriber,
+                                                       set())
+                    subs.add(subscription)
+
+            notifications = [(subscriber, subscriptions, items)
+                             for subscriber, subscriptions
+                             in subsBySubscriber.iteritems()]
 
+            return notifications
 
-    def _magicFilter(self, subscribers, nodeIdentifier, items):
-        list = []
-        for subscriber in subscribers:
-            list.append((subscriber, items))
-        return list
+        def rootNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return []
+
+        d1 = self.storage.getNode(nodeIdentifier)
+        d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d2 = self.storage.getNode('')
+        d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d2.addErrback(rootNotFound)
+        d = defer.gatherResults([d1, d2])
+        d.addCallback(lambda result: result[0] + result[1])
+        d.addCallback(toNotifications, nodeIdentifier, items)
+        return d
 
 
     def registerNotifier(self, observerfn, *args, **kwargs):
@@ -204,41 +237,42 @@
         if affiliation == 'outcast':
             raise error.Forbidden()
 
-        d = node.addSubscription(subscriber, 'subscribed')
-        d.addCallback(lambda _: self._sendLastPublished(node, subscriber))
-        d.addCallback(lambda _: 'subscribed')
-        d.addErrback(self._getSubscription, node, subscriber)
-        d.addCallback(self._returnSubscription, node.nodeIdentifier)
+        def trapExists(failure):
+            failure.trap(error.SubscriptionExists)
+            return False
+
+        def cb(sendLast):
+            d = node.getSubscription(subscriber)
+            if sendLast:
+                d.addCallback(self._sendLastPublished, node)
+            return d
+
+        d = node.addSubscription(subscriber, 'subscribed', {})
+        d.addCallbacks(lambda _: True, trapExists)
+        d.addCallback(cb)
         return d
 
 
-    def _getSubscription(self, failure, node, subscriber):
-        failure.trap(error.SubscriptionExists)
-        return node.getSubscription(subscriber)
-
-
-    def _returnSubscription(self, result, nodeIdentifier):
-        return nodeIdentifier, result
-
-
-    def _sendLastPublished(self, node, subscriber):
+    def _sendLastPublished(self, subscription, node):
 
         def notifyItem(items):
-            if not items:
-                return
-
-            reactor.callLater(0, self.dispatch,
-                                 {'items': items,
-                                  'nodeIdentifier': node.nodeIdentifier,
-                                  'subscriber': subscriber},
-                                 '//event/pubsub/notify')
+            if items:
+                reactor.callLater(0, self.dispatch,
+                                     {'items': items,
+                                      'nodeIdentifier': node.nodeIdentifier,
+                                      'subscription': subscription},
+                                     '//event/pubsub/notify')
 
         config = node.getConfiguration()
-        if config.get("pubsub#send_last_published_item", 'never') != 'on_sub':
-            return
+        sendLastPublished = config.get('pubsub#send_last_published_item',
+                                       'never')
+        if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
+            entity = subscription.subscriber.userhostJID()
+            d = self.getItems(node.nodeIdentifier, entity, 1)
+            d.addCallback(notifyItem)
+            d.addErrback(log.err)
 
-        d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1)
-        d.addCallback(notifyItem)
+        return subscription
 
 
     def unsubscribe(self, nodeIdentifier, subscriber, requestor):
@@ -261,13 +295,18 @@
     def createNode(self, nodeIdentifier, requestor):
         if not nodeIdentifier:
             nodeIdentifier = 'generic/%s' % uuid.uuid4()
-        d = self.storage.createNode(nodeIdentifier, requestor)
+
+        nodeType = 'leaf'
+        config = self.storage.getDefaultConfiguration(nodeType)
+        config['pubsub#node_type'] = nodeType
+
+        d = self.storage.createNode(nodeIdentifier, requestor, config)
         d.addCallback(lambda _: nodeIdentifier)
         return d
 
 
-    def getDefaultConfiguration(self):
-        d = defer.succeed(self.defaultConfig)
+    def getDefaultConfiguration(self, nodeType):
+        d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
         return d
 
 
@@ -277,6 +316,7 @@
 
         d = self.storage.getNode(nodeIdentifier)
         d.addCallback(lambda node: node.getConfiguration())
+
         return d
 
 
@@ -314,6 +354,9 @@
     def _doGetItems(self, result, maxItems, itemIdentifiers):
         node, affiliation = result
 
+        if not ILeafNode.providedBy(node):
+            return []
+
         if affiliation == 'outcast':
             raise error.Forbidden()
 
@@ -382,8 +425,12 @@
 
 
     def getSubscribers(self, nodeIdentifier):
+        def cb(subscriptions):
+            return [subscription.subscriber for subscription in subscriptions]
+
         d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.getSubscribers())
+        d.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d.addCallback(cb)
         return d
 
 
@@ -447,6 +494,12 @@
                                   'unsupported',
                                   'persistent-node'),
         error.NoRootNode: ('bad-request', None, None),
+        error.NoCollections: ('feature-not-implemented',
+                              'unsupported',
+                              'collections'),
+        error.NoPublishing: ('feature-not-implemented',
+                             'unsupported',
+                             'publish'),
     }
 
     def __init__(self, backend):
@@ -497,10 +550,12 @@
     def _notify(self, data):
         items = data['items']
         nodeIdentifier = data['nodeIdentifier']
-        if 'subscriber' not in data:
-            d = self.backend.getNotificationList(nodeIdentifier, items)
+        if 'subscription' not in data:
+            d = self.backend.getNotifications(nodeIdentifier, items)
         else:
-            d = defer.succeed([(data['subscriber'], items)])
+            subscription = data['subscription']
+            d = defer.succeed([(subscription.subscriber, [subscription],
+                                items)])
         d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID,
                                                                nodeIdentifier,
                                                                notifications))
@@ -539,11 +594,16 @@
             info['meta-data'] = result
             return info
 
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return info
+
         d = defer.succeed(nodeIdentifier)
         d.addCallback(self.backend.getNodeType)
         d.addCallback(saveType)
         d.addCallback(self.backend.getNodeMetaData)
         d.addCallback(saveMetaData)
+        d.addErrback(trapNotFound)
         d.addErrback(self._mapErrors)
         return d
 
@@ -586,11 +646,11 @@
 
 
     def getConfigurationOptions(self):
-        return self.backend.options
+        return self.backend.nodeOptions
 
 
-    def getDefaultConfiguration(self, requestor, service):
-        d = self.backend.getDefaultConfiguration()
+    def getDefaultConfiguration(self, requestor, service, nodeType):
+        d = self.backend.getDefaultConfiguration(nodeType)
         return d.addErrback(self._mapErrors)