changeset 206:274a45d2a5ab

Implement root collection that includes all leaf nodes.
author Ralph Meijer <ralphm@ik.nu>
date Mon, 04 Aug 2008 13:47:10 +0000
parents e6b710bf2b24
children 43a4d0d6c076
files db/gateway.sql db/pubsub.sql db/to_idavoll_0.8.sql idavoll/backend.py idavoll/error.py idavoll/gateway.py idavoll/iidavoll.py idavoll/memory_storage.py idavoll/pgsql_storage.py idavoll/tap_http.py idavoll/test/test_backend.py idavoll/test/test_gateway.py idavoll/test/test_storage.py setup.py
diffstat 14 files changed, 896 insertions(+), 396 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/db/gateway.sql	Mon Aug 04 13:47:10 2008 +0000
@@ -0,0 +1,7 @@
+create table callbacks (
+    service text not null,
+    node text not null,
+    uri text not null,
+    PRIMARY KEY (service, node, uri)
+);
+
--- a/db/pubsub.sql	Mon Aug 04 07:10:45 2008 +0000
+++ b/db/pubsub.sql	Mon Aug 04 13:47:10 2008 +0000
@@ -1,42 +1,49 @@
-create table entities (
-    id serial primary key,
-    jid text not null unique
+CREATE TABLE entities (
+    entity_id serial PRIMARY KEY,
+    jid text NOT NULL UNIQUE
 );
 
-create table nodes (
-    id serial primary key,
-    node text not null unique,
-    persistent boolean not null default true,
-    deliver_payload boolean not null default true,
-    send_last_published_item text not null default 'on_sub'
-        check (send_last_published_item in ('never', 'on_sub'))
+CREATE TABLE nodes (
+    node_id serial PRIMARY KEY,
+    node text NOT NULL UNIQUE,
+    node_type text NOT NULL DEFAULT 'leaf'
+        CHECK (node_type IN ('leaf', 'collection')),
+    persist_items boolean,
+    deliver_payloads boolean NOT NULL DEFAULT TRUE,
+    send_last_published_item text NOT NULL DEFAULT 'on_sub'
+        CHECK (send_last_published_item IN ('never', 'on_sub'))
 );
 
-create table affiliations (
-    id serial primary key,
-    entity_id integer not null references entities on delete cascade,
-    node_id integer not null references nodes on delete cascade,
-    affiliation text not null
-        check (affiliation in ('outcast', 'publisher', 'owner')),
-    unique (entity_id, node_id)
+INSERT INTO nodes (node, node_type) values ('', 'collection');
+
+CREATE TABLE affiliations (
+    affiliation_id serial PRIMARY KEY,
+    entity_id integer NOT NULL REFERENCES entities ON DELETE CASCADE,
+    node_id integer NOT NULL references nodes ON DELETE CASCADE,
+    affiliation text NOT NULL
+        CHECK (affiliation IN ('outcast', 'publisher', 'owner')),
+    UNIQUE (entity_id, node_id)
 );
 
-create table subscriptions (
-    id serial primary key,
-    entity_id integer not null references entities on delete cascade,
+CREATE TABLE subscriptions (
+    subscription_id serial PRIMARY KEY,
+    entity_id integer NOT NULL REFERENCES entities ON DELETE CASCADE,
     resource text,
-    node_id integer not null references nodes on delete cascade,
-    subscription text not null default 'subscribed' check
-        (subscription in ('subscribed', 'pending', 'unconfigured')),
-    unique (entity_id, resource, node_id)
-);
+    node_id integer NOT NULL REFERENCES nodes ON delete CASCADE,
+    state text NOT NULL DEFAULT 'subscribed'
+    	CHECK (state IN ('subscribed', 'pending', 'unconfigured')),
+    subscription_type text
+    	CHECK (subscription_type IN (NULL, 'items', 'nodes')),
+    subscription_depth text
+    	CHECK (subscription_depth IN (NULL, '1', 'all')),
+    UNIQUE (entity_id, resource, node_id));
 
-create table items (
-    id serial primary key,
-    node_id integer not null references nodes on delete cascade,
-    item text not null,
-    publisher text not null,
+CREATE TABLE items (
+    item_id serial PRIMARY KEY,
+    node_id integer NOT NULL REFERENCES nodes ON DELETE CASCADE,
+    item text NOT NULL,
+    publisher text NOT NULL,
     data text,
-    date timestamp with time zone not null default now(),
-    unique (node_id, item)
+    date timestamp with time zone NOT NULL DEFAULT now(),
+    UNIQUE (node_id, item)
 );
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/db/to_idavoll_0.8.sql	Mon Aug 04 13:47:10 2008 +0000
@@ -0,0 +1,25 @@
+ALTER TABLE affiliations RENAME id TO affiliation_id;
+
+ALTER TABLE entities RENAME id TO entity_id;
+
+ALTER TABLE items RENAME id TO item_id;
+
+ALTER TABLE nodes RENAME id TO node_id;
+ALTER TABLE nodes RENAME persistent to persist_items;
+ALTER TABLE nodes RENAME deliver_payload to deliver_payloads;
+ALTER TABLE nodes ADD COLUMN node_type text;
+ALTER TABLE nodes ALTER COLUMN node_type SET DEFAULT 'leaf';
+UPDATE nodes SET node_type = 'leaf';
+ALTER TABLE nodes ALTER COLUMN node_type SET NOT NULL;
+ALTER TABLE nodes ADD CHECK (node_type IN ('leaf', 'collection'));
+ALTER TABLE nodes ALTER COLUMN persistent DROP NOT NULL;
+ALTER TABLE nodes ALTER COLUMN persistent DROP DEFAULT;
+
+ALTER TABLE subscriptions RENAME id TO subscription_id;
+ALTER TABLE subscriptions RENAME subscription TO state;
+ALTER TABLE subscriptions ADD COLUMN subscription_type text
+    	CHECK (subscription_type IN (NULL, 'items', 'nodes'));
+ALTER TABLE subscriptions ADD COLUMN subscription_depth text
+    	CHECK (subscription_depth IN (NULL, '1', 'all'));
+
+INSERT INTO nodes (node, node_type) values ('', 'collection');
--- a/idavoll/backend.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/backend.py	Mon Aug 04 13:47:10 2008 +0000
@@ -18,7 +18,7 @@
 from zope.interface import implements
 
 from twisted.application import service
-from twisted.python import components
+from twisted.python import components, log
 from twisted.internet import defer, reactor
 from twisted.words.protocols.jabber.error import StanzaError
 from twisted.words.xish import domish, utility
@@ -27,7 +27,7 @@
 from wokkel.pubsub import PubSubService, PubSubError
 
 from idavoll import error, iidavoll
-from idavoll.iidavoll import IBackendService
+from idavoll.iidavoll import IBackendService, ILeafNode
 
 def _getAffiliation(node, entity):
     d = node.getAffiliation(entity)
@@ -40,35 +40,45 @@
     """
     Generic publish-subscribe backend service.
 
-    @cvar options: Node configuration form as a mapping from the field
-                   name to a dictionary that holds the field's type,
-                   label and possible options to choose from.
-    @type options: C{dict}.
+    @cvar nodeOptions: Node configuration form as a mapping from the field
+                       name to a dictionary that holds the field's type, label
+                       and possible options to choose from.
+    @type nodeOptions: C{dict}.
     @cvar defaultConfig: The default node configuration.
     """
 
     implements(iidavoll.IBackendService)
 
-    options = {"pubsub#persist_items":
-                  {"type": "boolean",
-                   "label": "Persist items to storage"},
-               "pubsub#deliver_payloads":
-                  {"type": "boolean",
-                   "label": "Deliver payloads with event notifications"},
-               "pubsub#send_last_published_item":
-                  {"type": "list-single",
-                   "label": "When to send the last published item",
-                   "options": {
-                       "never": "Never",
-                       "on_sub": "When a new subscription is processed",
-                       }
-                  },
-              }
+    nodeOptions = {
+            "pubsub#persist_items":
+                {"type": "boolean",
+                 "label": "Persist items to storage"},
+            "pubsub#deliver_payloads":
+                {"type": "boolean",
+                 "label": "Deliver payloads with event notifications"},
+            "pubsub#send_last_published_item":
+                {"type": "list-single",
+                 "label": "When to send the last published item",
+                 "options": {
+                     "never": "Never",
+                     "on_sub": "When a new subscription is processed"}
+                },
+            }
 
-    defaultConfig = {"pubsub#persist_items": True,
-                     "pubsub#deliver_payloads": True,
-                     "pubsub#send_last_published_item": 'on_sub',
-                    }
+    subscriptionOptions = {
+            "pubsub#subscription_type":
+                {"type": "list-single",
+                 "options": {
+                     "items": "Receive notification of new items only",
+                     "nodes": "Receive notification of new nodes only"}
+                },
+            "pubsub#subscription_depth":
+                {"type": "list-single",
+                 "options": {
+                     "1": "Receive notification from direct child nodes only",
+                     "all": "Receive notification from all descendent nodes"}
+                },
+            }
 
     def __init__(self, storage):
         utility.EventDispatcher.__init__(self)
@@ -108,9 +118,9 @@
     def _makeMetaData(self, metaData):
         options = []
         for key, value in metaData.iteritems():
-            if self.options.has_key(key):
+            if key in self.nodeOptions:
                 option = {"var": key}
-                option.update(self.options[key])
+                option.update(self.nodeOptions[key])
                 option["value"] = value
                 options.append(option)
 
@@ -136,6 +146,9 @@
 
 
     def _doPublish(self, node, items, requestor):
+        if node.nodeType == 'collection':
+            raise error.NoPublishing()
+
         configuration = node.getConfiguration()
         persistItems = configuration["pubsub#persist_items"]
         deliverPayloads = configuration["pubsub#deliver_payloads"]
@@ -147,6 +160,8 @@
 
         if persistItems or deliverPayloads:
             for item in items:
+                item.uri = None
+                item.defaultUri = None
                 if not item.getAttribute("id"):
                     item["id"] = str(uuid.uuid4())
 
@@ -169,18 +184,36 @@
                       '//event/pubsub/notify')
 
 
-    def getNotificationList(self, nodeIdentifier, items):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.getSubscribers())
-        d.addCallback(self._magicFilter, nodeIdentifier, items)
-        return d
+    def getNotifications(self, nodeIdentifier, items):
+
+        def toNotifications(subscriptions, nodeIdentifier, items):
+            subsBySubscriber = {}
+            for subscription in subscriptions:
+                if subscription.options.get('pubsub#subscription_type',
+                                            'items') == 'items':
+                    subs = subsBySubscriber.setdefault(subscription.subscriber,
+                                                       set())
+                    subs.add(subscription)
+
+            notifications = [(subscriber, subscriptions, items)
+                             for subscriber, subscriptions
+                             in subsBySubscriber.iteritems()]
 
+            return notifications
 
-    def _magicFilter(self, subscribers, nodeIdentifier, items):
-        list = []
-        for subscriber in subscribers:
-            list.append((subscriber, items))
-        return list
+        def rootNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return []
+
+        d1 = self.storage.getNode(nodeIdentifier)
+        d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d2 = self.storage.getNode('')
+        d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d2.addErrback(rootNotFound)
+        d = defer.gatherResults([d1, d2])
+        d.addCallback(lambda result: result[0] + result[1])
+        d.addCallback(toNotifications, nodeIdentifier, items)
+        return d
 
 
     def registerNotifier(self, observerfn, *args, **kwargs):
@@ -204,41 +237,42 @@
         if affiliation == 'outcast':
             raise error.Forbidden()
 
-        d = node.addSubscription(subscriber, 'subscribed')
-        d.addCallback(lambda _: self._sendLastPublished(node, subscriber))
-        d.addCallback(lambda _: 'subscribed')
-        d.addErrback(self._getSubscription, node, subscriber)
-        d.addCallback(self._returnSubscription, node.nodeIdentifier)
+        def trapExists(failure):
+            failure.trap(error.SubscriptionExists)
+            return False
+
+        def cb(sendLast):
+            d = node.getSubscription(subscriber)
+            if sendLast:
+                d.addCallback(self._sendLastPublished, node)
+            return d
+
+        d = node.addSubscription(subscriber, 'subscribed', {})
+        d.addCallbacks(lambda _: True, trapExists)
+        d.addCallback(cb)
         return d
 
 
-    def _getSubscription(self, failure, node, subscriber):
-        failure.trap(error.SubscriptionExists)
-        return node.getSubscription(subscriber)
-
-
-    def _returnSubscription(self, result, nodeIdentifier):
-        return nodeIdentifier, result
-
-
-    def _sendLastPublished(self, node, subscriber):
+    def _sendLastPublished(self, subscription, node):
 
         def notifyItem(items):
-            if not items:
-                return
-
-            reactor.callLater(0, self.dispatch,
-                                 {'items': items,
-                                  'nodeIdentifier': node.nodeIdentifier,
-                                  'subscriber': subscriber},
-                                 '//event/pubsub/notify')
+            if items:
+                reactor.callLater(0, self.dispatch,
+                                     {'items': items,
+                                      'nodeIdentifier': node.nodeIdentifier,
+                                      'subscription': subscription},
+                                     '//event/pubsub/notify')
 
         config = node.getConfiguration()
-        if config.get("pubsub#send_last_published_item", 'never') != 'on_sub':
-            return
+        sendLastPublished = config.get('pubsub#send_last_published_item',
+                                       'never')
+        if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
+            entity = subscription.subscriber.userhostJID()
+            d = self.getItems(node.nodeIdentifier, entity, 1)
+            d.addCallback(notifyItem)
+            d.addErrback(log.err)
 
-        d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1)
-        d.addCallback(notifyItem)
+        return subscription
 
 
     def unsubscribe(self, nodeIdentifier, subscriber, requestor):
@@ -261,13 +295,18 @@
     def createNode(self, nodeIdentifier, requestor):
         if not nodeIdentifier:
             nodeIdentifier = 'generic/%s' % uuid.uuid4()
-        d = self.storage.createNode(nodeIdentifier, requestor)
+
+        nodeType = 'leaf'
+        config = self.storage.getDefaultConfiguration(nodeType)
+        config['pubsub#node_type'] = nodeType
+
+        d = self.storage.createNode(nodeIdentifier, requestor, config)
         d.addCallback(lambda _: nodeIdentifier)
         return d
 
 
-    def getDefaultConfiguration(self):
-        d = defer.succeed(self.defaultConfig)
+    def getDefaultConfiguration(self, nodeType):
+        d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
         return d
 
 
@@ -277,6 +316,7 @@
 
         d = self.storage.getNode(nodeIdentifier)
         d.addCallback(lambda node: node.getConfiguration())
+
         return d
 
 
@@ -314,6 +354,9 @@
     def _doGetItems(self, result, maxItems, itemIdentifiers):
         node, affiliation = result
 
+        if not ILeafNode.providedBy(node):
+            return []
+
         if affiliation == 'outcast':
             raise error.Forbidden()
 
@@ -382,8 +425,12 @@
 
 
     def getSubscribers(self, nodeIdentifier):
+        def cb(subscriptions):
+            return [subscription.subscriber for subscription in subscriptions]
+
         d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.getSubscribers())
+        d.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d.addCallback(cb)
         return d
 
 
@@ -447,6 +494,12 @@
                                   'unsupported',
                                   'persistent-node'),
         error.NoRootNode: ('bad-request', None, None),
+        error.NoCollections: ('feature-not-implemented',
+                              'unsupported',
+                              'collections'),
+        error.NoPublishing: ('feature-not-implemented',
+                             'unsupported',
+                             'publish'),
     }
 
     def __init__(self, backend):
@@ -497,10 +550,12 @@
     def _notify(self, data):
         items = data['items']
         nodeIdentifier = data['nodeIdentifier']
-        if 'subscriber' not in data:
-            d = self.backend.getNotificationList(nodeIdentifier, items)
+        if 'subscription' not in data:
+            d = self.backend.getNotifications(nodeIdentifier, items)
         else:
-            d = defer.succeed([(data['subscriber'], items)])
+            subscription = data['subscription']
+            d = defer.succeed([(subscription.subscriber, [subscription],
+                                items)])
         d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID,
                                                                nodeIdentifier,
                                                                notifications))
@@ -539,11 +594,16 @@
             info['meta-data'] = result
             return info
 
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return info
+
         d = defer.succeed(nodeIdentifier)
         d.addCallback(self.backend.getNodeType)
         d.addCallback(saveType)
         d.addCallback(self.backend.getNodeMetaData)
         d.addCallback(saveMetaData)
+        d.addErrback(trapNotFound)
         d.addErrback(self._mapErrors)
         return d
 
@@ -586,11 +646,11 @@
 
 
     def getConfigurationOptions(self):
-        return self.backend.options
+        return self.backend.nodeOptions
 
 
-    def getDefaultConfiguration(self, requestor, service):
-        d = self.backend.getDefaultConfiguration()
+    def getDefaultConfiguration(self, requestor, service, nodeType):
+        d = self.backend.getDefaultConfiguration(nodeType)
         return d.addErrback(self._mapErrors)
 
 
--- a/idavoll/error.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/error.py	Mon Aug 04 13:47:10 2008 +0000
@@ -8,10 +8,12 @@
         return self.msg
 
 
+
 class NodeNotFound(Error):
     pass
 
 
+
 class NodeExists(Error):
     pass
 
@@ -35,30 +37,37 @@
     pass
 
 
+
 class ItemForbidden(Error):
     pass
 
 
+
 class ItemRequired(Error):
     pass
 
 
+
 class NoInstantNodes(Error):
     pass
 
 
+
 class InvalidConfigurationOption(Error):
     msg = 'Invalid configuration option'
 
 
+
 class InvalidConfigurationValue(Error):
     msg = 'Bad configuration value'
 
 
+
 class NodeNotPersistent(Error):
     pass
 
 
+
 class NoRootNode(Error):
     pass
 
@@ -68,3 +77,15 @@
     """
     There are no callbacks for this node.
     """
+
+
+
+class NoCollections(Error):
+    pass
+
+
+
+class NoPublishing(Error):
+    """
+    This node does not support publishing.
+    """
--- a/idavoll/gateway.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/gateway.py	Mon Aug 04 13:47:10 2008 +0000
@@ -66,7 +66,7 @@
         raise XMPPURIParseError("Empty URI path component")
 
     try:
-        jid = JID(entity)
+        service = JID(entity)
     except Exception, e:
         raise XMPPURIParseError("Invalid JID: %s" % e.message)
 
@@ -75,9 +75,17 @@
     try:
         nodeIdentifier = params['node'][0]
     except (KeyError, ValueError):
-        raise XMPPURIParseError("No node in query component of URI")
+        nodeIdentifier = ''
+
+    return service, nodeIdentifier
+
+
 
-    return jid, nodeIdentifier
+def getXMPPURI(service, nodeIdentifier):
+    """
+    Construct an XMPP URI from a service JID and node identifier.
+    """
+    return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '')
 
 
 
@@ -134,10 +142,11 @@
         """
 
         def toResponse(nodeIdentifier):
-            uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier)
+            uri = getXMPPURI(self.serviceJID, nodeIdentifier)
             stream = simplejson.dumps({'uri': uri})
-            return http.Response(responsecode.OK, stream=stream)
-
+            contentType = http_headers.MimeType.fromString(MIME_JSON)
+            return http.Response(responsecode.OK, stream=stream,
+                                 headers={'Content-Type': contentType})
         d = self.backend.createNode(None, self.owner)
         d.addCallback(toResponse)
         return d
@@ -237,9 +246,11 @@
         """
 
         def toResponse(nodeIdentifier):
-            uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier)
+            uri = getXMPPURI(self.serviceJID, nodeIdentifier)
             stream = simplejson.dumps({'uri': uri})
-            return http.Response(responsecode.OK, stream=stream)
+            contentType = http_headers.MimeType.fromString(MIME_JSON)
+            return http.Response(responsecode.OK, stream=stream,
+                                 headers={'Content-Type': contentType})
 
         def gotNode(nodeIdentifier, payload):
             item = Item(id='current', payload=payload)
@@ -286,8 +297,10 @@
 
     def render(self, request):
         def responseFromNodes(nodeIdentifiers):
-            return http.Response(responsecode.OK,
-                                 stream=simplejson.dumps(nodeIdentifiers))
+            stream = simplejson.dumps(nodeIdentifiers)
+            contentType = http_headers.MimeType.fromString(MIME_JSON)
+            return http.Response(responsecode.OK, stream=stream,
+                                 headers={'Content-Type': contentType})
 
         d = self.service.getNodes()
         d.addCallback(responseFromNodes)
@@ -328,7 +341,7 @@
 
 
 def constructFeed(service, nodeIdentifier, entries, title):
-    nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
+    nodeURI = getXMPPURI(service, nodeIdentifier)
     now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
 
     # Collect the received entries in a feed
@@ -428,6 +441,9 @@
         """
 
         atomEntries = extractAtomEntries(event.items)
+        service = event.sender
+        nodeIdentifier = event.nodeIdentifier
+        headers = event.headers
 
         # Don't notify if there are no atom entries
         if not atomEntries:
@@ -438,12 +454,16 @@
             payload = atomEntries[0]
         else:
             contentType = 'application/atom+xml;type=feed'
-            payload = constructFeed(event.sender, event.nodeIdentifier,
-                                    atomEntries,
+            payload = constructFeed(service, nodeIdentifier, atomEntries,
                                     title='Received item collection')
 
-        self.callCallbacks(event.sender, event.nodeIdentifier, payload,
-                           contentType)
+        self.callCallbacks(service, nodeIdentifier, payload, contentType)
+
+        if 'Collection' in headers:
+            for collection in headers['Collection']:
+                nodeIdentifier = collection or ''
+                self.callCallbacks(service, nodeIdentifier, payload,
+                                   contentType)
 
 
     def deleteReceived(self, event):
@@ -451,8 +471,9 @@
         Fire up HTTP client to do callback
         """
 
-        self.callCallbacks(event.sender, event.nodeIdentifier,
-                           eventType='DELETED')
+        service = event.sender
+        nodeIdentifier = event.nodeIdentifier
+        self.callCallbacks(service, nodeIdentifier, eventType='DELETED')
 
 
     def _postTo(self, callbacks, service, nodeIdentifier,
@@ -462,7 +483,7 @@
             return
 
         postdata = None
-        nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
+        nodeURI = getXMPPURI(service, nodeIdentifier)
         headers = {'Referer': nodeURI.encode('utf-8'),
                    'PubSub-Service': service.full().encode('utf-8')}
 
@@ -491,9 +512,7 @@
         def eb(failure):
             failure.trap(error.NoCallbacks)
 
-            # No callbacks were registered for this node. Unsubscribe.
-            d = self.unsubscribe(service, nodeIdentifier, self.jid)
-            return d
+            # No callbacks were registered for this node. Unsubscribe?
 
         d = self.storage.getCallbacks(service, nodeIdentifier)
         d.addCallback(self._postTo, service, nodeIdentifier, payload,
@@ -524,6 +543,8 @@
                 (responsecode.FORBIDDEN, "Node not found"),
             error.NotSubscribed:
                 (responsecode.FORBIDDEN, "No such subscription found"),
+            error.SubscriptionExists:
+                (responsecode.FORBIDDEN, "Subscription already exists"),
     }
 
     def __init__(self, service):
@@ -734,6 +755,13 @@
         pass
 
 
+    def ping(self):
+        f = getPageWithFactory(self._makeURI(''),
+                               method='HEAD',
+                               agent=self.agent)
+        return f.deferred
+
+
     def create(self):
         f = getPageWithFactory(self._makeURI('create'),
                     method='POST',
--- a/idavoll/iidavoll.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/iidavoll.py	Mon Aug 04 13:47:10 2008 +0000
@@ -13,7 +13,7 @@
 
     def __init__(storage):
         """
-        @param storage: L{storage} object.
+        @param storage: Object providing L{IStorage}.
         """
 
 
@@ -164,9 +164,38 @@
         """ Register callback which is called for notification. """
 
 
-    def getNotificationList(nodeIdentifier, items):
+    def getNotifications(nodeIdentifier, items):
         """
-        Get list of entities to notify.
+        Get notification list.
+
+        This method is called to discover which entities should receive
+        notifications for the given items that have just been published to the
+        given node.
+
+        The notification list contains tuples (subscriber, subscriptions,
+        items) to result in one notification per tuple: the given subscriptions
+        yielded the given items to be notified to this subscriber.  This
+        structure is needed allow for letting the subscriber know which
+        subscriptions yielded which notifications, while catering for
+        collection nodes and content-based subscriptions.
+
+        To minimize the amount of notifications per entity, implementers
+        should take care that if all items in C{items} were yielded
+        by the same set of subscriptions, exactly one tuple is for this
+        subscriber is returned, so that the subscriber would get exactly one
+        notification. Alternatively, one tuple per subscription combination.
+
+        @param nodeIdentifier: The identifier of the node the items were
+                               published to.
+        @type nodeIdentifier: C{unicode}.
+        @param items: The list of published items as
+                      L{Element<twisted.words.xish.domish.Element>}s.
+        @type items: C{list}
+        @return: The notification list as tuples of
+                 (L{JID<twisted.words.protocols.jabber.jid.JID>},
+                  C{list} of L{Subscription<wokkel.pubsub.Subscription>},
+                  C{list} of L{Element<twisted.words.xish.domish.Element>}.
+        @rtype: C{list}
         """
 
 
@@ -197,8 +226,8 @@
         Get Node.
 
         @param nodeIdentifier: NodeID of the desired node.
-        @type nodeIdentifier: L{str}
-        @return: deferred that returns a L{Node} object.
+        @type nodeIdentifier: C{str}
+        @return: deferred that returns a L{INode} providing object.
         """
 
 
@@ -206,22 +235,26 @@
         """
         Return all NodeIDs.
 
-        @return: deferred that returns a list of NodeIDs (L{str}).
+        @return: deferred that returns a list of NodeIDs (C{unicode}).
         """
 
 
-    def createNode(nodeIdentifier, owner, config=None):
+    def createNode(nodeIdentifier, owner, config):
         """
         Create new node.
 
         The implementation should make sure, the passed owner JID is stripped
-        of the resource (e.g. using C{owner.userhostJID()}).
+        of the resource (e.g. using C{owner.userhostJID()}). The passed config
+        is expected to have values for the fields returned by
+        L{getDefaultConfiguration}, as well as a value for
+        C{'pubsub#node_type'}.
 
         @param nodeIdentifier: NodeID of the new node.
-        @type nodeIdentifier: L{str}
+        @type nodeIdentifier: C{unicode}
         @param owner: JID of the new nodes's owner.
-        @type owner: L{jid.JID}
-        @param config: Configuration
+        @type owner: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param config: Node configuration.
+        @type config: C{dict}
         @return: deferred that fires on creation.
         """
 
@@ -231,7 +264,7 @@
         Delete a node.
 
         @param nodeIdentifier: NodeID of the new node.
-        @type nodeIdentifier: L{str}
+        @type nodeIdentifier: C{unicode}
         @return: deferred that fires on deletion.
         """
 
@@ -244,11 +277,11 @@
         of the resource (e.g. using C{owner.userhostJID()}).
 
         @param entity: JID of the entity.
-        @type entity: L{jid.JID}
-        @return: deferred that returns a L{list} of tuples of the form
+        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that returns a C{list} of tuples of the form
                  C{(nodeIdentifier, affiliation)}, where C{nodeIdentifier} is
-                 of the type L{str} and C{affiliation} is one of C{'owner'},
-                 C{'publisher'} and C{'outcast'}.
+                 of the type L{unicode} and C{affiliation} is one of
+                 C{'owner'}, C{'publisher'} and C{'outcast'}.
         """
 
 
@@ -260,12 +293,26 @@
         of the resource (e.g. using C{owner.userhostJID()}).
 
         @param entity: JID of the entity.
-        @type entity: L{jid.JID}
-        @return: deferred that returns a L{list} of tuples of the form
+        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that returns a C{list} of tuples of the form
                  C{(nodeIdentifier, subscriber, state)}, where
-                 C{nodeIdentifier} is of the type L{str}, C{subscriber} of the
-                 type {jid.JID}, and C{state} is C{'subscribed'} or
-                 C{'pending'}.
+                 C{nodeIdentifier} is of the type C{unicode}, C{subscriber} of
+                 the type J{JID<twisted.words.protocols.jabber.jid.JID>}, and
+                 C{state} is C{'subscribed'}, C{'pending'} or
+                 C{'unconfigured'}.
+        """
+
+
+    def getDefaultConfiguration(nodeType):
+        """
+        Get the default configuration for the given node type.
+
+        @param nodeType: Either C{'leaf'} or C{'collection'}.
+        @type nodeType: C{str}
+        @return: The default configuration.
+        @rtype: C{dict}.
+        @raises: L{idavoll.error.NoCollections} if collections are not
+                 supported.
         """
 
 
@@ -295,7 +342,7 @@
         The configuration must at least have two options:
         C{pubsub#persist_items}, and C{pubsub#deliver_payloads}.
 
-        @return: L{dict} of configuration options.
+        @return: C{dict} of configuration options.
         """
 
 
@@ -306,7 +353,7 @@
         The meta data must be a superset of the configuration options, and
         also at least should have a C{pubsub#node_type} entry.
 
-        @return: L{dict} of meta data.
+        @return: C{dict} of meta data.
         """
 
 
@@ -328,7 +375,7 @@
         Get affiliation of entity with this node.
 
         @param entity: JID of entity.
-        @type entity: L{jid.JID}
+        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
         @return: deferred that returns C{'owner'}, C{'publisher'}, C{'outcast'}
                  or C{None}.
         """
@@ -339,20 +386,37 @@
         Get subscription to this node of subscriber.
 
         @param subscriber: JID of the new subscriptions' entity.
-        @type subscriber: L{jid.JID}
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
         @return: deferred that returns the subscription state (C{'subscribed'},
                  C{'pending'} or C{None}).
         """
 
 
-    def addSubscription(subscriber, state):
+    def getSubscriptions(state=None):
+        """
+        Get list of subscriptions to this node.
+
+        The optional C{state} argument filters the subscriptions to their
+        state.
+
+        @param state: Subscription state filter. One of C{'subscribed'},
+                      C{'pending'}, C{'unconfigured'}.
+        @type state: C{str}
+        @return: a deferred that returns a C{list} of
+                 L{wokkel.pubsub.Subscription}s.
+        """
+
+
+    def addSubscription(subscriber, state, config):
         """
         Add new subscription to this node with given state.
 
         @param subscriber: JID of the new subscriptions' entity.
-        @type subscriber: L{jid.JID}
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
         @param state: C{'subscribed'} or C{'pending'}
-        @type state: L{str}
+        @type state: C{str}
+        @param config: Subscription configuration.
+        @param config: C{dict}
         @return: deferred that fires on subscription.
         """
 
@@ -362,22 +426,11 @@
         Remove subscription to this node.
 
         @param subscriber: JID of the subscriptions' entity.
-        @type subscriber: L{jid.JID}
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
         @return: deferred that fires on removal.
         """
 
 
-    def getSubscribers():
-        """
-        Get list of subscribers to this node.
-
-        Retrieves the list of entities that have a subscription to this
-        node. That is, having the state C{'subscribed'}.
-
-        @return: a deferred that returns a L{list} of L{jid.JID}s.
-        """
-
-
     def isSubscribed(entity):
         """
         Returns whether entity has any subscription to this node.
@@ -386,8 +439,8 @@
         C{'subscribed'} for any subscription that matches the bare JID.
 
         @param subscriber: bare JID of the subscriptions' entity.
-        @type subscriber: L{jid.JID}
-        @return: deferred that returns a L{bool}.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that returns a C{bool}.
         """
 
 
@@ -395,8 +448,9 @@
         """
         Get affiliations of entities with this node.
 
-        @return: deferred that returns a L{list} of tuples (jid, affiliation),
-        where jid is a L(jid.JID) and affiliation is one of C{'owner'},
+        @return: deferred that returns a C{list} of tuples (jid, affiliation),
+                 where jid is a L(JID<twisted.words.protocols.jabber.jid.JID>)
+                 and affiliation is one of C{'owner'},
         C{'publisher'}, C{'outcast'}.
         """
 
@@ -415,9 +469,9 @@
                       L{domish} representation of the XML fragment as defined
                       for C{<item/>} in the
                       C{http://jabber.org/protocol/pubsub} namespace.
-        @type items: L{list} of {domish.Element}
+        @type items: C{list} of {domish.Element}
         @param publisher: JID of the publishing entity.
-        @type publisher: L{jid.JID}
+        @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>}
         @return: deferred that fires upon success.
         """
 
@@ -426,8 +480,8 @@
         """
         Remove items by id.
 
-        @param itemIdentifiers: L{list} of item ids.
-        @return: deferred that fires with a L{list} of ids of the items that
+        @param itemIdentifiers: C{list} of item ids.
+        @return: deferred that fires with a C{list} of ids of the items that
                  were deleted
         """
 
@@ -443,7 +497,7 @@
 
         @param maxItems: if given, a natural number (>0) that limits the
                           returned number of items.
-        @return: deferred that fires with a L{list} of found items.
+        @return: deferred that fires with a C{list} of found items.
         """
 
 
@@ -455,8 +509,8 @@
         represent the XML of the item as it was published, including the
         item wrapper with item id.
 
-        @param itemIdentifiers: L{list} of item ids.
-        @return: deferred that fires with a L{list} of found items.
+        @param itemIdentifiers: C{list} of item ids.
+        @return: deferred that fires with a C{list} of found items.
         """
 
 
--- a/idavoll/memory_storage.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/memory_storage.py	Mon Aug 04 13:47:10 2008 +0000
@@ -6,19 +6,30 @@
 from twisted.internet import defer
 from twisted.words.protocols.jabber import jid
 
-from idavoll import error, iidavoll
+from wokkel.pubsub import Subscription
 
-defaultConfig = {"pubsub#persist_items": True,
-                  "pubsub#deliver_payloads": True,
-                  "pubsub#send_last_published_item": 'on_sub',
-                  "pubsub#node_type": "leaf"}
+from idavoll import error, iidavoll
 
 class Storage:
 
     implements(iidavoll.IStorage)
 
+    defaultConfig = {
+            'leaf': {
+                "pubsub#persist_items": True,
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+            },
+            'collection': {
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+            }
+    }
+
     def __init__(self):
-        self._nodes = {}
+        rootNode = CollectionNode('', jid.JID('localhost'),
+                                  copy.copy(self.defaultConfig['collection']))
+        self._nodes = {'': rootNode}
 
 
     def getNode(self, nodeIdentifier):
@@ -34,15 +45,12 @@
         return defer.succeed(self._nodes.keys())
 
 
-    def createNode(self, nodeIdentifier, owner, config=None):
+    def createNode(self, nodeIdentifier, owner, config):
         if nodeIdentifier in self._nodes:
             return defer.fail(error.NodeExists())
 
-        if not config:
-            config = copy.copy(defaultConfig)
-
         if config['pubsub#node_type'] != 'leaf':
-            raise NotImplementedError
+            raise error.NoCollections()
 
         node = LeafNode(nodeIdentifier, owner, config)
         self._nodes[nodeIdentifier] = node
@@ -72,12 +80,17 @@
             for subscriber, subscription in node._subscriptions.iteritems():
                 subscriber = jid.internJID(subscriber)
                 if subscriber.userhostJID() == entity.userhostJID():
-                    subscriptions.append((node.nodeIdentifier, subscriber,
-                                          subscription.state))
+                    subscriptions.append(subscription)
 
         return defer.succeed(subscriptions)
 
 
+    def getDefaultConfiguration(self, nodeType):
+        if nodeType == 'collection':
+            raise error.NoCollections()
+
+        return self.defaultConfig[nodeType]
+
 
 class Node:
 
@@ -120,17 +133,25 @@
         try:
             subscription = self._subscriptions[subscriber.full()]
         except KeyError:
-            state = None
+            return defer.succeed(None)
         else:
-            state = subscription.state
-        return defer.succeed(state)
+            return defer.succeed(subscription)
 
 
-    def addSubscription(self, subscriber, state):
+    def getSubscriptions(self, state=None):
+        return defer.succeed(
+                [subscription
+                 for subscription in self._subscriptions.itervalues()
+                 if state is None or subscription.state == state])
+
+
+
+    def addSubscription(self, subscriber, state, options):
         if self._subscriptions.get(subscriber.full()):
             return defer.fail(error.SubscriptionExists())
 
-        subscription = Subscription(state)
+        subscription = Subscription(self.nodeIdentifier, subscriber, state,
+                                    options)
         self._subscriptions[subscriber.full()] = subscription
         return defer.succeed(None)
 
@@ -144,14 +165,6 @@
         return defer.succeed(None)
 
 
-    def getSubscribers(self):
-        subscribers = [jid.internJID(subscriber) for subscriber, subscription
-                       in self._subscriptions.iteritems()
-                       if subscription.state == 'subscribed']
-
-        return defer.succeed(subscribers)
-
-
     def isSubscribed(self, entity):
         for subscriber, subscription in self._subscriptions.iteritems():
             if jid.internJID(subscriber).userhost() == entity.userhost() and \
@@ -187,11 +200,14 @@
 
 
 
-class LeafNodeMixin:
+class LeafNode(Node):
+
+    implements(iidavoll.ILeafNode)
 
     nodeType = 'leaf'
 
-    def __init__(self):
+    def __init__(self, nodeIdentifier, owner, config):
+        Node.__init__(self, nodeIdentifier, owner, config)
         self._items = {}
         self._itemlist = []
 
@@ -251,21 +267,8 @@
         return defer.succeed(None)
 
 
-
-class LeafNode(Node, LeafNodeMixin):
-
-    implements(iidavoll.ILeafNode)
-
-    def __init__(self, nodeIdentifier, owner, config):
-        Node.__init__(self, nodeIdentifier, owner, config)
-        LeafNodeMixin.__init__(self)
-
-
-
-class Subscription:
-
-    def __init__(self, state):
-        self.state = state
+class CollectionNode(Node):
+    nodeType = 'collection'
 
 
 
--- a/idavoll/pgsql_storage.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/pgsql_storage.py	Mon Aug 04 13:47:10 2008 +0000
@@ -4,8 +4,12 @@
 import copy
 
 from zope.interface import implements
+
+from twisted.enterprise import adbapi
 from twisted.words.protocols.jabber import jid
-from wokkel.generic import parseXml
+
+from wokkel.generic import parseXml, stripNamespace
+from wokkel.pubsub import Subscription
 
 from idavoll import error, iidavoll
 
@@ -13,6 +17,17 @@
 
     implements(iidavoll.IStorage)
 
+    defaultConfig = {
+            'leaf': {
+                "pubsub#persist_items": True,
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+            },
+            'collection': {
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+            }
+    }
 
     def __init__(self, dbpool):
         self.dbpool = dbpool
@@ -24,22 +39,36 @@
 
     def _getNode(self, cursor, nodeIdentifier):
         configuration = {}
-        cursor.execute("""SELECT persistent, deliver_payload,
+        cursor.execute("""SELECT node_type,
+                                 persist_items,
+                                 deliver_payloads,
                                  send_last_published_item
                           FROM nodes
                           WHERE node=%s""",
                        (nodeIdentifier,))
-        try:
-            (configuration["pubsub#persist_items"],
-             configuration["pubsub#deliver_payloads"],
-             configuration["pubsub#send_last_published_item"]) = \
-            cursor.fetchone()
-        except TypeError:
+        row = cursor.fetchone()
+
+        if not row:
             raise error.NodeNotFound()
-        else:
+
+        if row.node_type == 'leaf':
+            configuration = {
+                    'pubsub#persist_items': row.persist_items,
+                    'pubsub#deliver_payloads': row.deliver_payloads,
+                    'pubsub#send_last_published_item':
+                        row.send_last_published_item}
             node = LeafNode(nodeIdentifier, configuration)
             node.dbpool = self.dbpool
             return node
+        elif row.node_type == 'collection':
+            configuration = {
+                    'pubsub#deliver_payloads': row.deliver_payloads,
+                    'pubsub#send_last_published_item':
+                        row.send_last_published_item}
+            node = CollectionNode(nodeIdentifier, configuration)
+            node.dbpool = self.dbpool
+            return node
+
 
 
     def getNodeIds(self):
@@ -48,16 +77,27 @@
         return d
 
 
-    def createNode(self, nodeIdentifier, owner, config=None):
+    def createNode(self, nodeIdentifier, owner, config):
         return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
-                                           owner)
+                                           owner, config)
 
 
-    def _createNode(self, cursor, nodeIdentifier, owner):
+    def _createNode(self, cursor, nodeIdentifier, owner, config):
+        if config['pubsub#node_type'] != 'leaf':
+            raise error.NoCollections()
+
         owner = owner.userhost()
         try:
-            cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""",
-                           (nodeIdentifier))
+            cursor.execute("""INSERT INTO nodes
+                              (node, node_type, persist_items,
+                               deliver_payloads, send_last_published_item)
+                              VALUES
+                              (%s, 'leaf', %s, %s, %s)""",
+                           (nodeIdentifier,
+                            config['pubsub#persist_items'],
+                            config['pubsub#deliver_payloads'],
+                            config['pubsub#send_last_published_item'])
+                           )
         except cursor._pool.dbapi.OperationalError:
             raise error.NodeExists()
 
@@ -70,10 +110,11 @@
 
         cursor.execute("""INSERT INTO affiliations
                           (node_id, entity_id, affiliation)
-                          SELECT n.id, e.id, 'owner' FROM
-                          (SELECT id FROM nodes WHERE node=%s) AS n
+                          SELECT node_id, entity_id, 'owner' FROM
+                          (SELECT node_id FROM nodes WHERE node=%s) as n
                           CROSS JOIN
-                          (SELECT id FROM entities WHERE jid=%s) AS e""",
+                          (SELECT entity_id FROM entities
+                                            WHERE jid=%s) as e""",
                        (nodeIdentifier, owner))
 
 
@@ -91,10 +132,8 @@
 
     def getAffiliations(self, entity):
         d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
-                                        JOIN affiliations ON
-                                        (affiliations.entity_id=entities.id)
-                                        JOIN nodes ON
-                                        (nodes.id=affiliations.node_id)
+                                        NATURAL JOIN affiliations
+                                        NATURAL JOIN nodes
                                         WHERE jid=%s""",
                                      (entity.userhost(),))
         d.addCallback(lambda results: [tuple(r) for r in results])
@@ -102,22 +141,27 @@
 
 
     def getSubscriptions(self, entity):
-        d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription
-                                     FROM entities JOIN subscriptions ON
-                                     (subscriptions.entity_id=entities.id)
-                                     JOIN nodes ON
-                                     (nodes.id=subscriptions.node_id)
+        def toSubscriptions(rows):
+            subscriptions = []
+            for row in rows:
+                subscriber = jid.internJID('%s/%s' % (row.jid,
+                                                      row.resource))
+                subscription = Subscription(row.node, subscriber, row.state)
+                subscriptions.append(subscription)
+            return subscriptions
+
+        d = self.dbpool.runQuery("""SELECT node, jid, resource, state
+                                     FROM entities
+                                     NATURAL JOIN subscriptions
+                                     NATURAL JOIN nodes
                                      WHERE jid=%s""",
                                   (entity.userhost(),))
-        d.addCallback(self._convertSubscriptionJIDs)
+        d.addCallback(toSubscriptions)
         return d
 
 
-    def _convertSubscriptionJIDs(self, subscriptions):
-        return [(node,
-                 jid.internJID('%s/%s' % (subscriber, resource)),
-                 subscription)
-                for node, subscriber, resource, subscription in subscriptions]
+    def getDefaultConfiguration(self, nodeType):
+        return self.defaultConfig[nodeType]
 
 
 
@@ -131,7 +175,7 @@
 
 
     def _checkNodeExists(self, cursor):
-        cursor.execute("""SELECT id FROM nodes WHERE node=%s""",
+        cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""",
                        (self.nodeIdentifier))
         if not cursor.fetchone():
             raise error.NodeNotFound()
@@ -159,7 +203,8 @@
 
     def _setConfiguration(self, cursor, config):
         self._checkNodeExists(cursor)
-        cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s,
+        cursor.execute("""UPDATE nodes SET persist_items=%s,
+                                           deliver_payloads=%s,
                                            send_last_published_item=%s
                           WHERE node=%s""",
                        (config["pubsub#persist_items"],
@@ -185,8 +230,8 @@
     def _getAffiliation(self, cursor, entity):
         self._checkNodeExists(cursor)
         cursor.execute("""SELECT affiliation FROM affiliations
-                          JOIN nodes ON (node_id=nodes.id)
-                          JOIN entities ON (entity_id=entities.id)
+                          NATURAL JOIN nodes
+                          NATURAL JOIN entities
                           WHERE node=%s AND jid=%s""",
                        (self.nodeIdentifier,
                         entity.userhost()))
@@ -207,31 +252,72 @@
         userhost = subscriber.userhost()
         resource = subscriber.resource or ''
 
-        cursor.execute("""SELECT subscription FROM subscriptions
-                          JOIN nodes ON (nodes.id=subscriptions.node_id)
-                          JOIN entities ON
-                               (entities.id=subscriptions.entity_id)
+        cursor.execute("""SELECT state FROM subscriptions
+                          NATURAL JOIN nodes
+                          NATURAL JOIN entities
                           WHERE node=%s AND jid=%s AND resource=%s""",
                        (self.nodeIdentifier,
                         userhost,
                         resource))
-        try:
-            return cursor.fetchone()[0]
-        except TypeError:
+        row = cursor.fetchone()
+        if not row:
             return None
+        else:
+            return Subscription(self.nodeIdentifier, subscriber, row.state)
+
+
+    def getSubscriptions(self, state=None):
+        return self.dbpool.runInteraction(self._getSubscriptions, state)
 
 
-    def addSubscription(self, subscriber, state):
-        return self.dbpool.runInteraction(self._addSubscription, subscriber,
-                                          state)
+    def _getSubscriptions(self, cursor, state):
+        self._checkNodeExists(cursor)
+
+        query = """SELECT jid, resource, state,
+                          subscription_type, subscription_depth
+                   FROM subscriptions
+                   NATURAL JOIN nodes
+                   NATURAL JOIN entities
+                   WHERE node=%s""";
+        values = [self.nodeIdentifier]
+
+        if state:
+            query += " AND state=%s"
+            values.append(state)
+
+        cursor.execute(query, values);
+        rows = cursor.fetchall()
+
+        subscriptions = []
+        for row in rows:
+            subscriber = jid.JID('%s/%s' % (row.jid, row.resource))
+
+            options = {}
+            if row.subscription_type:
+                options['pubsub#subscription_type'] = row.subscription_type;
+            if row.subscription_depth:
+                options['pubsub#subscription_depth'] = row.subscription_depth;
+
+            subscriptions.append(Subscription(self.nodeIdentifier, subscriber,
+                                              row.state, options))
+
+        return subscriptions
 
 
-    def _addSubscription(self, cursor, subscriber, state):
+    def addSubscription(self, subscriber, state, config):
+        return self.dbpool.runInteraction(self._addSubscription, subscriber,
+                                          state, config)
+
+
+    def _addSubscription(self, cursor, subscriber, state, config):
         self._checkNodeExists(cursor)
 
         userhost = subscriber.userhost()
         resource = subscriber.resource or ''
 
+        subscription_type = config.get('pubsub#subscription_type')
+        subscription_depth = config.get('pubsub#subscription_depth')
+
         try:
             cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                            (userhost))
@@ -240,13 +326,18 @@
 
         try:
             cursor.execute("""INSERT INTO subscriptions
-                              (node_id, entity_id, resource, subscription)
-                              SELECT n.id, e.id, %s, %s FROM
-                              (SELECT id FROM nodes WHERE node=%s) AS n
+                              (node_id, entity_id, resource, state,
+                               subscription_type, subscription_depth)
+                              SELECT node_id, entity_id, %s, %s, %s, %s FROM
+                              (SELECT node_id FROM nodes
+                                              WHERE node=%s) as n
                               CROSS JOIN
-                              (SELECT id FROM entities WHERE jid=%s) AS e""",
+                              (SELECT entity_id FROM entities
+                                                WHERE jid=%s) as e""",
                            (resource,
                             state,
+                            subscription_type,
+                            subscription_depth,
                             self.nodeIdentifier,
                             userhost))
         except cursor._pool.dbapi.OperationalError:
@@ -265,9 +356,11 @@
         resource = subscriber.resource or ''
 
         cursor.execute("""DELETE FROM subscriptions WHERE
-                          node_id=(SELECT id FROM nodes WHERE node=%s) AND
-                          entity_id=(SELECT id FROM entities WHERE jid=%s)
-                          AND resource=%s""",
+                          node_id=(SELECT node_id FROM nodes
+                                                  WHERE node=%s) AND
+                          entity_id=(SELECT entity_id FROM entities
+                                                      WHERE jid=%s) AND
+                          resource=%s""",
                        (self.nodeIdentifier,
                         userhost,
                         resource))
@@ -277,27 +370,6 @@
         return None
 
 
-    def getSubscribers(self):
-        d = self.dbpool.runInteraction(self._getSubscribers)
-        d.addCallback(self._convertToJIDs)
-        return d
-
-
-    def _getSubscribers(self, cursor):
-        self._checkNodeExists(cursor)
-        cursor.execute("""SELECT jid, resource FROM subscriptions
-                          JOIN nodes ON (node_id=nodes.id)
-                          JOIN entities ON (entity_id=entities.id)
-                          WHERE node=%s AND
-                          subscription='subscribed'""",
-                       (self.nodeIdentifier,))
-        return cursor.fetchall()
-
-
-    def _convertToJIDs(self, list):
-        return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list]
-
-
     def isSubscribed(self, entity):
         return self.dbpool.runInteraction(self._isSubscribed, entity)
 
@@ -306,12 +378,10 @@
         self._checkNodeExists(cursor)
 
         cursor.execute("""SELECT 1 FROM entities
-                          JOIN subscriptions ON
-                          (entities.id=subscriptions.entity_id)
-                          JOIN nodes ON
-                          (nodes.id=subscriptions.node_id)
+                          NATURAL JOIN subscriptions
+                          NATURAL JOIN nodes
                           WHERE entities.jid=%s
-                          AND node=%s AND subscription='subscribed'""",
+                          AND node=%s AND state='subscribed'""",
                        (entity.userhost(),
                        self.nodeIdentifier))
 
@@ -326,10 +396,8 @@
         self._checkNodeExists(cursor)
 
         cursor.execute("""SELECT jid, affiliation FROM nodes
-                          JOIN affiliations ON
-                            (nodes.id = affiliations.node_id)
-                          JOIN entities ON
-                            (affiliations.entity_id = entities.id)
+                          NATURAL JOIN affiliations
+                          NATURAL JOIN entities
                           WHERE node=%s""",
                        self.nodeIdentifier)
         result = cursor.fetchall()
@@ -338,7 +406,9 @@
 
 
 
-class LeafNodeMixin:
+class LeafNode(Node):
+
+    implements(iidavoll.ILeafNode)
 
     nodeType = 'leaf'
 
@@ -356,7 +426,7 @@
         data = item.toXml()
         cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
                           FROM nodes
-                          WHERE nodes.id = items.node_id AND
+                          WHERE nodes.node_id = items.node_id AND
                                 nodes.node = %s and items.item=%s""",
                        (publisher.full(),
                         data,
@@ -366,7 +436,8 @@
             return
 
         cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
-                          SELECT id, %s, %s, %s FROM nodes WHERE node=%s""",
+                          SELECT node_id, %s, %s, %s FROM nodes
+                                                     WHERE node=%s""",
                        (item["id"],
                         publisher.full(),
                         data,
@@ -384,7 +455,8 @@
 
         for itemIdentifier in itemIdentifiers:
             cursor.execute("""DELETE FROM items WHERE
-                              node_id=(SELECT id FROM nodes WHERE node=%s) AND
+                              node_id=(SELECT node_id FROM nodes
+                                                      WHERE node=%s) AND
                               item=%s""",
                            (self.nodeIdentifier,
                             itemIdentifier))
@@ -401,8 +473,8 @@
 
     def _getItems(self, cursor, maxItems):
         self._checkNodeExists(cursor)
-        query = """SELECT data FROM nodes JOIN items ON
-                   (nodes.id=items.node_id)
+        query = """SELECT data FROM nodes
+                   NATURAL JOIN items
                    WHERE node=%s ORDER BY date DESC"""
         if maxItems:
             cursor.execute(query + " LIMIT %s",
@@ -412,7 +484,8 @@
             cursor.execute(query, (self.nodeIdentifier))
 
         result = cursor.fetchall()
-        return [parseXml(r[0]) for r in result]
+        items = [stripNamespace(parseXml(r[0])) for r in result]
+        return items
 
 
     def getItemsById(self, itemIdentifiers):
@@ -423,8 +496,8 @@
         self._checkNodeExists(cursor)
         items = []
         for itemIdentifier in itemIdentifiers:
-            cursor.execute("""SELECT data FROM nodes JOIN items ON
-                              (nodes.id=items.node_id)
+            cursor.execute("""SELECT data FROM nodes
+                              NATURAL JOIN items
                               WHERE node=%s AND item=%s""",
                            (self.nodeIdentifier,
                             itemIdentifier))
@@ -442,14 +515,13 @@
         self._checkNodeExists(cursor)
 
         cursor.execute("""DELETE FROM items WHERE
-                          node_id=(SELECT id FROM nodes WHERE node=%s)""",
+                          node_id=(SELECT node_id FROM nodes WHERE node=%s)""",
                        (self.nodeIdentifier,))
 
 
+class CollectionNode(Node):
 
-class LeafNode(Node, LeafNodeMixin):
-
-    implements(iidavoll.ILeafNode)
+    nodeType = 'collection'
 
 
 
@@ -482,7 +554,7 @@
                            nodeIdentifier,
                            callback)
             if cursor.fetchall():
-                raise error.SubscriptionExists()
+                return
 
             cursor.execute("""INSERT INTO callbacks
                               (service, node, uri) VALUES
--- a/idavoll/tap_http.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/tap_http.py	Mon Aug 04 13:47:10 2008 +0000
@@ -1,10 +1,11 @@
 # Copyright (c) 2003-2008 Ralph Meijer
 # See LICENSE for details.
 
-from twisted.application import internet, strports
+from twisted.application import internet, service, strports
 from twisted.conch import manhole, manhole_ssh
 from twisted.cred import portal, checkers
-from twisted.web2 import channel, resource, server
+from twisted.web2 import channel, log, resource, server
+from twisted.web2.tap import Web2Service
 
 from idavoll import gateway, tap
 from idavoll.gateway import RemoteSubscriptionService
@@ -15,6 +16,7 @@
     ]
 
 
+
 def getManholeFactory(namespace, **passwords):
     def getManHole(_):
         return manhole.Manhole(namespace)
@@ -28,6 +30,7 @@
     return f
 
 
+
 def makeService(config):
     s = tap.makeService(config)
 
@@ -67,13 +70,22 @@
 
     site = server.Site(root)
     w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site))
+
+    if config["verbose"]:
+        logObserver = log.DefaultCommonAccessLoggingObserver()
+        w2s = Web2Service(logObserver)
+        w.setServiceParent(w2s)
+        w = w2s
+
     w.setServiceParent(s)
 
     # Set up a manhole
 
     namespace = {'service': s,
                  'component': cs,
-                 'backend': bs}
+                 'backend': bs,
+                 'root': root}
+
     f = getManholeFactory(namespace, admin='admin')
     manholeService = strports.service('2222', f)
     manholeService.setServiceParent(s)
--- a/idavoll/test/test_backend.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/test/test_backend.py	Mon Aug 04 13:47:10 2008 +0000
@@ -5,6 +5,9 @@
 Tests for L{idavoll.backend}.
 """
 
+from zope.interface import implements
+from zope.interface.verify import verifyObject
+
 from twisted.internet import defer
 from twisted.trial import unittest
 from twisted.words.protocols.jabber import jid
@@ -12,22 +15,28 @@
 
 from wokkel import pubsub
 
-from idavoll import backend, error
+from idavoll import backend, error, iidavoll
 
 OWNER = jid.JID('owner@example.com')
 NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
 
 class BackendTest(unittest.TestCase):
+
+    def test_interfaceIBackend(self):
+        self.assertTrue(verifyObject(iidavoll.IBackendService,
+                                     backend.BackendService(None)))
+
+
     def test_deleteNode(self):
-        class testNode:
+        class TestNode:
             nodeIdentifier = 'to-be-deleted'
             def getAffiliation(self, entity):
                 if entity is OWNER:
                     return defer.succeed('owner')
 
-        class testStorage:
+        class TestStorage:
             def getNode(self, nodeIdentifier):
-                return defer.succeed(testNode())
+                return defer.succeed(TestNode())
 
             def deleteNode(self, nodeIdentifier):
                 if nodeIdentifier in ['to-be-deleted']:
@@ -44,7 +53,7 @@
             self.assertTrue(self.preDeleteCalled)
             self.assertTrue(self.storage.deleteCalled)
 
-        self.storage = testStorage()
+        self.storage = TestStorage()
         self.backend = backend.BackendService(self.storage)
         self.storage.backend = self.backend
 
@@ -61,12 +70,15 @@
         """
         Test creation of a node without a given node identifier.
         """
-        class testStorage:
-            def createNode(self, nodeIdentifier, requestor):
+        class TestStorage:
+            def getDefaultConfiguration(self, nodeType):
+                return {}
+
+            def createNode(self, nodeIdentifier, requestor, config):
                 self.nodeIdentifier = nodeIdentifier
                 return defer.succeed(None)
 
-        self.storage = testStorage()
+        self.storage = TestStorage()
         self.backend = backend.BackendService(self.storage)
         self.storage.backend = self.backend
 
@@ -78,6 +90,112 @@
         d.addCallback(checkID)
         return d
 
+    class NodeStore:
+        """
+        I just store nodes to pose as an L{IStorage} implementation.
+        """
+        def __init__(self, nodes):
+            self.nodes = nodes
+
+        def getNode(self, nodeIdentifier):
+            try:
+                return defer.succeed(self.nodes[nodeIdentifier])
+            except KeyError:
+                return defer.fail(error.NodeNotFound())
+
+
+    def test_getNotifications(self):
+        """
+        Ensure subscribers show up in the notification list.
+        """
+        item = pubsub.Item()
+        sub = pubsub.Subscription('test', OWNER, 'subscribed')
+
+        class TestNode:
+            def getSubscriptions(self, state=None):
+                return [sub]
+
+        def cb(result):
+            self.assertEquals(1, len(result))
+            subscriber, subscriptions, items = result[-1]
+
+            self.assertEquals(OWNER, subscriber)
+            self.assertEquals(set([sub]), subscriptions)
+            self.assertEquals([item], items)
+
+        self.storage = self.NodeStore({'test': TestNode()})
+        self.backend = backend.BackendService(self.storage)
+        d = self.backend.getNotifications('test', [item])
+        d.addCallback(cb)
+        return d
+
+    def test_getNotificationsRoot(self):
+        """
+        Ensure subscribers to the root node show up in the notification list
+        for leaf nodes.
+
+        This assumes a flat node relationship model with exactly one collection
+        node: the root node. Each leaf node is automatically a child node
+        of the root node.
+        """
+        item = pubsub.Item()
+        subRoot = pubsub.Subscription('', OWNER, 'subscribed')
+
+        class TestNode:
+            def getSubscriptions(self, state=None):
+                return []
+
+        class TestRootNode:
+            def getSubscriptions(self, state=None):
+                return [subRoot]
+
+        def cb(result):
+            self.assertEquals(1, len(result))
+            subscriber, subscriptions, items = result[-1]
+            self.assertEquals(OWNER, subscriber)
+            self.assertEquals(set([subRoot]), subscriptions)
+            self.assertEquals([item], items)
+
+        self.storage = self.NodeStore({'test': TestNode(),
+                                       '': TestRootNode()})
+        self.backend = backend.BackendService(self.storage)
+        d = self.backend.getNotifications('test', [item])
+        d.addCallback(cb)
+        return d
+
+
+    def test_getNotificationsMultipleNodes(self):
+        """
+        Ensure that entities that subscribe to a leaf node as well as the
+        root node get exactly one notification.
+        """
+        item = pubsub.Item()
+        sub = pubsub.Subscription('test', OWNER, 'subscribed')
+        subRoot = pubsub.Subscription('', OWNER, 'subscribed')
+
+        class TestNode:
+            def getSubscriptions(self, state=None):
+                return [sub]
+
+        class TestRootNode:
+            def getSubscriptions(self, state=None):
+                return [subRoot]
+
+        def cb(result):
+            self.assertEquals(1, len(result))
+            subscriber, subscriptions, items = result[-1]
+
+            self.assertEquals(OWNER, subscriber)
+            self.assertEquals(set([sub, subRoot]), subscriptions)
+            self.assertEquals([item], items)
+
+        self.storage = self.NodeStore({'test': TestNode(),
+                                       '': TestRootNode()})
+        self.backend = backend.BackendService(self.storage)
+        d = self.backend.getNotifications('test', [item])
+        d.addCallback(cb)
+        return d
+
 
     def test_getDefaultConfiguration(self):
         """
@@ -85,12 +203,18 @@
         a deferred that fires a dictionary with configuration values.
         """
 
+        class TestStorage:
+            def getDefaultConfiguration(self, nodeType):
+                return {
+                    "pubsub#persist_items": True,
+                    "pubsub#deliver_payloads": True}
+
         def cb(options):
             self.assertIn("pubsub#persist_items", options)
             self.assertEqual(True, options["pubsub#persist_items"])
 
-        self.backend = backend.BackendService(None)
-        d = self.backend.getDefaultConfiguration()
+        self.backend = backend.BackendService(TestStorage())
+        d = self.backend.getDefaultConfiguration('leaf')
         d.addCallback(cb)
         return d
 
@@ -164,7 +288,8 @@
         """
         Test publish request with an item without a node identifier.
         """
-        class testNode:
+        class TestNode:
+            nodeType = 'leaf'
             nodeIdentifier = 'node'
             def getAffiliation(self, entity):
                 if entity is OWNER:
@@ -173,14 +298,14 @@
                 return {'pubsub#deliver_payloads': True,
                         'pubsub#persist_items': False}
 
-        class testStorage:
+        class TestStorage:
             def getNode(self, nodeIdentifier):
-                return defer.succeed(testNode())
+                return defer.succeed(TestNode())
 
         def checkID(notification):
             self.assertNotIdentical(None, notification['items'][0]['id'])
 
-        self.storage = testStorage()
+        self.storage = TestStorage()
         self.backend = backend.BackendService(self.storage)
         self.storage.backend = self.backend
 
@@ -197,8 +322,10 @@
         """
         ITEM = "<item xmlns='%s' id='1'/>" % NS_PUBSUB
 
-        class testNode:
+        class TestNode:
+            implements(iidavoll.ILeafNode)
             nodeIdentifier = 'node'
+            nodeType = 'leaf'
             def getAffiliation(self, entity):
                 if entity is OWNER:
                     return defer.succeed('owner')
@@ -208,19 +335,23 @@
                         'pubsub#send_last_published_item': 'on_sub'}
             def getItems(self, maxItems):
                 return [ITEM]
-            def addSubscription(self, subscriber, state):
+            def addSubscription(self, subscriber, state, options):
+                self.subscription = pubsub.Subscription('node', subscriber,
+                                                        state, options)
                 return defer.succeed(None)
+            def getSubscription(self, subscriber):
+                return defer.succeed(self.subscription)
 
-        class testStorage:
+        class TestStorage:
             def getNode(self, nodeIdentifier):
-                return defer.succeed(testNode())
+                return defer.succeed(TestNode())
 
         def cb(data):
             self.assertEquals('node', data['nodeIdentifier'])
             self.assertEquals([ITEM], data['items'])
-            self.assertEquals(OWNER, data['subscriber'])
+            self.assertEquals(OWNER, data['subscription'].subscriber)
 
-        self.storage = testStorage()
+        self.storage = TestStorage()
         self.backend = backend.BackendService(self.storage)
         self.storage.backend = self.backend
 
@@ -311,7 +442,7 @@
 
     def test_getConfigurationOptions(self):
         class TestBackend(BaseTestBackend):
-            options = {
+            nodeOptions = {
                     "pubsub#persist_items":
                         {"type": "boolean",
                          "label": "Persist items to storage"},
@@ -327,7 +458,7 @@
 
     def test_getDefaultConfiguration(self):
         class TestBackend(BaseTestBackend):
-            def getDefaultConfiguration(self):
+            def getDefaultConfiguration(self, nodeType):
                 options = {"pubsub#persist_items": True,
                            "pubsub#deliver_payloads": True,
                            "pubsub#send_last_published_item": 'on_sub',
@@ -338,6 +469,6 @@
             self.assertEquals(True, options["pubsub#persist_items"])
 
         s = backend.PubSubServiceFromBackend(TestBackend())
-        d = s.getDefaultConfiguration(OWNER, 'test.example.org')
+        d = s.getDefaultConfiguration(OWNER, 'test.example.org', 'leaf')
         d.addCallback(cb)
         return d
--- a/idavoll/test/test_gateway.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/test/test_gateway.py	Mon Aug 04 13:47:10 2008 +0000
@@ -18,11 +18,12 @@
 AGENT = "Idavoll Test Script"
 NS_ATOM = "http://www.w3.org/2005/Atom"
 
-entry = domish.Element((NS_ATOM, 'entry'))
-entry.addElement("id", content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a")
-entry.addElement("title", content="Atom-Powered Robots Run Amok")
-entry.addElement("author").addElement("name", content="John Doe")
-entry.addElement("content", content="Some text.")
+TEST_ENTRY = domish.Element((NS_ATOM, 'entry'))
+TEST_ENTRY.addElement("id",
+                      content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a")
+TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok")
+TEST_ENTRY.addElement("author").addElement("name", content="John Doe")
+TEST_ENTRY.addElement("content", content="Some text.")
 
 baseURI = "http://localhost:8086/"
 componentJID = "pubsub"
@@ -33,10 +34,27 @@
     def setUp(self):
         self.client = gateway.GatewayClient(baseURI)
         self.client.startService()
+        self.addCleanup(self.client.stopService)
+
+        def trapConnectionRefused(failure):
+            from twisted.internet.error import ConnectionRefusedError
+            failure.trap(ConnectionRefusedError)
+            raise unittest.SkipTest("Gateway to test against is not available")
+
+        def trapNotFound(failure):
+            from twisted.web.error import Error
+            failure.trap(Error)
+
+        d = self.client.ping()
+        d.addErrback(trapConnectionRefused)
+        d.addErrback(trapNotFound)
+        return d
+
 
     def tearDown(self):
         return self.client.stopService()
 
+
     def test_create(self):
 
         def cb(response):
@@ -51,7 +69,7 @@
         def cb(response):
             self.assertIn('uri', response)
 
-        d = self.client.publish(entry)
+        d = self.client.publish(TEST_ENTRY)
         d.addCallback(cb)
         return d
 
@@ -62,7 +80,7 @@
 
         def cb1(response):
             xmppURI = response['uri']
-            d = self.client.publish(entry, xmppURI)
+            d = self.client.publish(TEST_ENTRY, xmppURI)
             d.addCallback(cb2, xmppURI)
             return d
 
@@ -74,7 +92,7 @@
         def cb(err):
             self.assertEqual('404', err.status)
 
-        d = self.client.publish(entry, 'xmpp:%s?node=test' % componentJID)
+        d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % componentJID)
         self.assertFailure(d, error.Error)
         d.addCallback(cb)
         return d
@@ -93,7 +111,6 @@
         d.addCallback(cb)
         return d
 
-
     def test_subscribeGetNotification(self):
 
         def onNotification(data, headers):
@@ -106,7 +123,7 @@
             return d
 
         def cb2(xmppURI):
-            d = self.client.publish(entry, xmppURI)
+            d = self.client.publish(TEST_ENTRY, xmppURI)
             return d
 
 
@@ -140,7 +157,7 @@
             return d
 
         def cb3(xmppURI):
-            d = self.client.publish(entry, xmppURI)
+            d = self.client.publish(TEST_ENTRY, xmppURI)
             return d
 
 
@@ -169,7 +186,7 @@
         def cb(response):
             xmppURI = response['uri']
             self.assertNot(self.client.deferred.called)
-            d = self.client.publish(entry, xmppURI)
+            d = self.client.publish(TEST_ENTRY, xmppURI)
             d.addCallback(lambda _: xmppURI)
             return d
 
@@ -202,7 +219,7 @@
             xmppURI = response['uri']
             self.assertNot(client1.deferred.called)
             self.assertNot(client2.deferred.called)
-            d = self.client.publish(entry, xmppURI)
+            d = self.client.publish(TEST_ENTRY, xmppURI)
             d.addCallback(lambda _: xmppURI)
             return d
 
@@ -224,6 +241,7 @@
         client2.callback = onNotification2
         client2.deferred = defer.Deferred()
 
+
         d = self.client.create()
         d.addCallback(cb)
         d.addCallback(cb2)
@@ -242,6 +260,32 @@
         return d
 
 
+    def test_subscribeRootGetNotification(self):
+
+        def onNotification(data, headers):
+            self.client.deferred.callback(None)
+
+        def cb(response):
+            xmppURI = response['uri']
+            jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI)
+            rootNode = gateway.getXMPPURI(jid, '')
+
+            d = self.client.subscribe(rootNode)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            return self.client.publish(TEST_ENTRY, xmppURI)
+
+
+        self.client.callback = onNotification
+        self.client.deferred = defer.Deferred()
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return defer.gatherResults([d, self.client.deferred])
+
+
     def test_unsubscribeNonExisting(self):
         def cb(err):
             self.assertEqual('403', err.status)
@@ -258,7 +302,7 @@
             d = self.client.items(xmppURI)
             return d
 
-        d = self.client.publish(entry)
+        d = self.client.publish(TEST_ENTRY)
         d.addCallback(cb)
         return d
 
@@ -269,6 +313,6 @@
             d = self.client.items(xmppURI, 2)
             return d
 
-        d = self.client.publish(entry)
+        d = self.client.publish(TEST_ENTRY)
         d.addCallback(cb)
         return d
--- a/idavoll/test/test_storage.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/idavoll/test/test_storage.py	Mon Aug 04 13:47:10 2008 +0000
@@ -5,6 +5,7 @@
 Tests for L{idavoll.memory_storage} and L{idavoll.pgsql_storage}.
 """
 
+from zope.interface.verify import verifyObject
 from twisted.trial import unittest
 from twisted.words.protocols.jabber import jid
 from twisted.internet import defer
@@ -12,7 +13,7 @@
 
 from wokkel import pubsub
 
-from idavoll import error
+from idavoll import error, iidavoll
 
 OWNER = jid.JID('owner@example.com')
 SUBSCRIBER = jid.JID('subscriber@example.com/Home')
@@ -20,17 +21,16 @@
 SUBSCRIBER_TO_BE_DELETED = jid.JID('to_be_deleted@example.com/Home')
 SUBSCRIBER_PENDING = jid.JID('pending@example.com/Home')
 PUBLISHER = jid.JID('publisher@example.com')
-ITEM = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB)
+ITEM = domish.Element((None, 'item'))
 ITEM['id'] = 'current'
 ITEM.addElement(('testns', 'test'), content=u'Test \u2083 item')
-ITEM_NEW = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB)
+ITEM_NEW = domish.Element((None, 'item'))
 ITEM_NEW['id'] = 'new'
 ITEM_NEW.addElement(('testns', 'test'), content=u'Test \u2083 item')
-ITEM_UPDATED = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB)
+ITEM_UPDATED = domish.Element((None, 'item'))
 ITEM_UPDATED['id'] = 'current'
 ITEM_UPDATED.addElement(('testns', 'test'), content=u'Test \u2084 item')
-ITEM_TO_BE_DELETED = domish.Element((pubsub.NS_PUBSUB, 'item'),
-                                    pubsub.NS_PUBSUB)
+ITEM_TO_BE_DELETED = domish.Element((None, 'item'))
 ITEM_TO_BE_DELETED['id'] = 'to-be-deleted'
 ITEM_TO_BE_DELETED.addElement(('testns', 'test'), content=u'Test \u2083 item')
 
@@ -53,6 +53,18 @@
         return d
 
 
+    def test_interfaceIStorage(self):
+        self.assertTrue(verifyObject(iidavoll.IStorage, self.s))
+
+
+    def test_interfaceINode(self):
+        self.assertTrue(verifyObject(iidavoll.INode, self.node))
+
+
+    def test_interfaceILeafNode(self):
+        self.assertTrue(verifyObject(iidavoll.ILeafNode, self.node))
+
+
     def test_getNode(self):
         return self.s.getNode('pre-existing')
 
@@ -72,7 +84,9 @@
 
 
     def test_createExistingNode(self):
-        d = self.s.createNode('pre-existing', OWNER)
+        config = self.s.getDefaultConfiguration('leaf')
+        config['pubsub#node_type'] = 'leaf'
+        d = self.s.createNode('pre-existing', OWNER, config)
         self.assertFailure(d, error.NodeExists)
         return d
 
@@ -82,7 +96,9 @@
             d = self.s.getNode('new 1')
             return d
 
-        d = self.s.createNode('new 1', OWNER)
+        config = self.s.getDefaultConfiguration('leaf')
+        config['pubsub#node_type'] = 'leaf'
+        d = self.s.createNode('new 1', OWNER, config)
         d.addCallback(cb)
         return d
 
@@ -115,7 +131,13 @@
 
     def test_getSubscriptions(self):
         def cb(subscriptions):
-            self.assertIn(('pre-existing', SUBSCRIBER, 'subscribed'), subscriptions)
+            found = False
+            for subscription in subscriptions:
+                if (subscription.nodeIdentifier == 'pre-existing' and
+                    subscription.subscriber == SUBSCRIBER and
+                    subscription.state == 'subscribed'):
+                    found = True
+            self.assertTrue(found)
 
         d = self.s.getSubscriptions(SUBSCRIBER)
         d.addCallback(cb)
@@ -192,30 +214,30 @@
         def cb1(void):
             return self.node.getSubscription(SUBSCRIBER_NEW)
 
-        def cb2(state):
-            self.assertEqual(state, 'pending')
+        def cb2(subscription):
+            self.assertEqual(subscription.state, 'pending')
 
-        d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending')
+        d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending', {})
         d.addCallback(cb1)
         d.addCallback(cb2)
         return d
 
 
     def test_addExistingSubscription(self):
-        d = self.node.addSubscription(SUBSCRIBER, 'pending')
+        d = self.node.addSubscription(SUBSCRIBER, 'pending', {})
         self.assertFailure(d, error.SubscriptionExists)
         return d
 
 
     def test_getSubscription(self):
         def cb(subscriptions):
-            self.assertEquals(subscriptions[0][1], 'subscribed')
-            self.assertEquals(subscriptions[1][1], 'pending')
-            self.assertEquals(subscriptions[2][1], None)
+            self.assertEquals(subscriptions[0].state, 'subscribed')
+            self.assertEquals(subscriptions[1].state, 'pending')
+            self.assertEquals(subscriptions[2], None)
 
-        d = defer.DeferredList([self.node.getSubscription(SUBSCRIBER),
-                                self.node.getSubscription(SUBSCRIBER_PENDING),
-                                self.node.getSubscription(OWNER)])
+        d = defer.gatherResults([self.node.getSubscription(SUBSCRIBER),
+                                 self.node.getSubscription(SUBSCRIBER_PENDING),
+                                 self.node.getSubscription(OWNER)])
         d.addCallback(cb)
         return d
 
@@ -230,13 +252,17 @@
         return d
 
 
-    def test_getSubscribers(self):
+    def test_getNodeSubscriptions(self):
+        def extractSubscribers(subscriptions):
+            return [subscription.subscriber for subscription in subscriptions]
+
         def cb(subscribers):
             self.assertIn(SUBSCRIBER, subscribers)
             self.assertNotIn(SUBSCRIBER_PENDING, subscribers)
             self.assertNotIn(OWNER, subscribers)
 
-        d = self.node.getSubscribers()
+        d = self.node.getSubscriptions('subscribed')
+        d.addCallback(extractSubscribers)
         d.addCallback(cb)
         return d
 
@@ -381,7 +407,9 @@
 
     def setUp(self):
         from idavoll.memory_storage import Storage, PublishedItem, LeafNode
-        from idavoll.memory_storage import Subscription, defaultConfig
+        from idavoll.memory_storage import Subscription
+
+        defaultConfig = Storage.defaultConfig['leaf']
 
         self.s = Storage()
         self.s._nodes['pre-existing'] = \
@@ -394,11 +422,15 @@
                 LeafNode('to-be-purged', OWNER, None)
 
         subscriptions = self.s._nodes['pre-existing']._subscriptions
-        subscriptions[SUBSCRIBER.full()] = Subscription('subscribed')
+        subscriptions[SUBSCRIBER.full()] = Subscription('pre-existing',
+                                                        SUBSCRIBER,
+                                                        'subscribed')
         subscriptions[SUBSCRIBER_TO_BE_DELETED.full()] = \
-                Subscription('subscribed')
+                Subscription('pre-existing', SUBSCRIBER_TO_BE_DELETED,
+                             'subscribed')
         subscriptions[SUBSCRIBER_PENDING.full()] = \
-                Subscription('pending')
+                Subscription('pre-existing', SUBSCRIBER_PENDING,
+                             'pending')
 
         item = PublishedItem(ITEM_TO_BE_DELETED, PUBLISHER)
         self.s._nodes['pre-existing']._items['to-be-deleted'] = item
@@ -436,7 +468,9 @@
 
     def init(self, cursor):
         self.cleandb(cursor)
-        cursor.execute("""INSERT INTO nodes (node) VALUES ('pre-existing')""")
+        cursor.execute("""INSERT INTO nodes
+                          (node, node_type, persist_items)
+                          VALUES ('pre-existing', 'leaf', TRUE)""")
         cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-deleted')""")
         cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-reconfigured')""")
         cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-purged')""")
@@ -444,15 +478,15 @@
                        OWNER.userhost())
         cursor.execute("""INSERT INTO affiliations
                           (node_id, entity_id, affiliation)
-                          SELECT nodes.id, entities.id, 'owner'
+                          SELECT node_id, entity_id, 'owner'
                           FROM nodes, entities
                           WHERE node='pre-existing' AND jid=%s""",
                        OWNER.userhost())
         cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                        SUBSCRIBER.userhost())
         cursor.execute("""INSERT INTO subscriptions
-                          (node_id, entity_id, resource, subscription)
-                          SELECT nodes.id, entities.id, %s, 'subscribed'
+                          (node_id, entity_id, resource, state)
+                          SELECT node_id, entity_id, %s, 'subscribed'
                           FROM nodes, entities
                           WHERE node='pre-existing' AND jid=%s""",
                        (SUBSCRIBER.resource,
@@ -460,8 +494,8 @@
         cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                        SUBSCRIBER_TO_BE_DELETED.userhost())
         cursor.execute("""INSERT INTO subscriptions
-                          (node_id, entity_id, resource, subscription)
-                          SELECT nodes.id, entities.id, %s, 'subscribed'
+                          (node_id, entity_id, resource, state)
+                          SELECT node_id, entity_id, %s, 'subscribed'
                           FROM nodes, entities
                           WHERE node='pre-existing' AND jid=%s""",
                        (SUBSCRIBER_TO_BE_DELETED.resource,
@@ -469,8 +503,8 @@
         cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                        SUBSCRIBER_PENDING.userhost())
         cursor.execute("""INSERT INTO subscriptions
-                          (node_id, entity_id, resource, subscription)
-                          SELECT nodes.id, entities.id, %s, 'pending'
+                          (node_id, entity_id, resource, state)
+                          SELECT node_id, entity_id, %s, 'pending'
                           FROM nodes, entities
                           WHERE node='pre-existing' AND jid=%s""",
                        (SUBSCRIBER_PENDING.resource,
@@ -479,20 +513,20 @@
                        PUBLISHER.userhost())
         cursor.execute("""INSERT INTO items
                           (node_id, publisher, item, data, date)
-                          SELECT nodes.id, %s, 'to-be-deleted', %s,
+                          SELECT node_id, %s, 'to-be-deleted', %s,
                                  now() - interval '1 day'
                           FROM nodes
                           WHERE node='pre-existing'""",
                        (PUBLISHER.userhost(),
                         ITEM_TO_BE_DELETED.toXml()))
         cursor.execute("""INSERT INTO items (node_id, publisher, item, data)
-                          SELECT nodes.id, %s, 'to-be-deleted', %s
+                          SELECT node_id, %s, 'to-be-deleted', %s
                           FROM nodes
                           WHERE node='to-be-purged'""",
                        (PUBLISHER.userhost(),
                         ITEM_TO_BE_DELETED.toXml()))
         cursor.execute("""INSERT INTO items (node_id, publisher, item, data)
-                          SELECT nodes.id, %s, 'current', %s
+                          SELECT node_id, %s, 'current', %s
                           FROM nodes
                           WHERE node='pre-existing'""",
                        (PUBLISHER.userhost(),
--- a/setup.py	Mon Aug 04 07:10:45 2008 +0000
+++ b/setup.py	Mon Aug 04 13:47:10 2008 +0000
@@ -28,6 +28,8 @@
       ],
       package_data={'twisted.plugins': ['twisted/plugins/idavoll.py',
                                         'twisted/plugins/idavoll_http.py']},
+      data_files=[('share/idavoll', ['db/pubsub.sql',
+                                     'db/to_idavoll_0.8.sql'])],
       zip_safe=False,
       install_requires=install_requires,
 )