changeset 232:923281d4c5bc

renamed idavoll directory to sat_pubsub
author Goffi <goffi@goffi.org>
date Thu, 17 May 2012 12:48:14 +0200
parents d99047cd90f9
children 564ae55219e1
files idavoll/__init__.py idavoll/backend.py idavoll/error.py idavoll/gateway.py idavoll/iidavoll.py idavoll/memory_storage.py idavoll/pgsql_storage.py idavoll/tap.py idavoll/tap_http.py idavoll/test/__init__.py idavoll/test/test_backend.py idavoll/test/test_gateway.py idavoll/test/test_storage.py sat_pubsub/__init__.py sat_pubsub/backend.py sat_pubsub/error.py sat_pubsub/gateway.py sat_pubsub/iidavoll.py sat_pubsub/memory_storage.py sat_pubsub/pgsql_storage.py sat_pubsub/tap.py sat_pubsub/tap_http.py sat_pubsub/test/__init__.py sat_pubsub/test/test_backend.py sat_pubsub/test/test_gateway.py sat_pubsub/test/test_storage.py
diffstat 26 files changed, 5002 insertions(+), 5002 deletions(-) [+]
line wrap: on
line diff
--- a/idavoll/__init__.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,8 +0,0 @@
-# Copyright (c) 2003-2007 Ralph Meijer
-# See LICENSE for details.
-
-"""
-Idavoll, a generic XMPP publish-subscribe service.
-"""
-
-__version__ = '0.9.1' 
--- a/idavoll/backend.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,730 +0,0 @@
-# -*- test-case-name: idavoll.test.test_backend -*-
-#
-# Copyright (c) 2003-2010 Ralph Meijer
-# See LICENSE for details.
-
-"""
-Generic publish-subscribe backend.
-
-This module implements a generic publish-subscribe backend service with
-business logic as per
-U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>} that interacts with
-a given storage facility. It also provides an adapter from the XMPP
-publish-subscribe protocol.
-"""
-
-import uuid
-
-from zope.interface import implements
-
-from twisted.application import service
-from twisted.python import components, log
-from twisted.internet import defer, reactor
-from twisted.words.protocols.jabber.error import StanzaError
-from twisted.words.xish import utility
-
-from wokkel import disco
-from wokkel.iwokkel import IPubSubResource
-from wokkel.pubsub import PubSubResource, PubSubError
-
-from idavoll import error, iidavoll
-from idavoll.iidavoll import IBackendService, ILeafNode
-
-def _getAffiliation(node, entity):
-    d = node.getAffiliation(entity)
-    d.addCallback(lambda affiliation: (node, affiliation))
-    return d
-
-
-
-class BackendService(service.Service, utility.EventDispatcher):
-    """
-    Generic publish-subscribe backend service.
-
-    @cvar nodeOptions: Node configuration form as a mapping from the field
-                       name to a dictionary that holds the field's type, label
-                       and possible options to choose from.
-    @type nodeOptions: C{dict}.
-    @cvar defaultConfig: The default node configuration.
-    """
-
-    implements(iidavoll.IBackendService)
-
-    nodeOptions = {
-            "pubsub#persist_items":
-                {"type": "boolean",
-                 "label": "Persist items to storage"},
-            "pubsub#deliver_payloads":
-                {"type": "boolean",
-                 "label": "Deliver payloads with event notifications"},
-            "pubsub#send_last_published_item":
-                {"type": "list-single",
-                 "label": "When to send the last published item",
-                 "options": {
-                     "never": "Never",
-                     "on_sub": "When a new subscription is processed"}
-                },
-            }
-
-    subscriptionOptions = {
-            "pubsub#subscription_type":
-                {"type": "list-single",
-                 "options": {
-                     "items": "Receive notification of new items only",
-                     "nodes": "Receive notification of new nodes only"}
-                },
-            "pubsub#subscription_depth":
-                {"type": "list-single",
-                 "options": {
-                     "1": "Receive notification from direct child nodes only",
-                     "all": "Receive notification from all descendent nodes"}
-                },
-            }
-
-    def __init__(self, storage):
-        utility.EventDispatcher.__init__(self)
-        self.storage = storage
-        self._callbackList = []
-
-
-    def supportsPublisherAffiliation(self):
-        return True
-
-
-    def supportsOutcastAffiliation(self):
-        return True
-
-
-    def supportsPersistentItems(self):
-        return True
-
-
-    def getNodeType(self, nodeIdentifier):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.getType())
-        return d
-
-
-    def getNodes(self):
-        return self.storage.getNodeIds()
-
-
-    def getNodeMetaData(self, nodeIdentifier):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.getMetaData())
-        d.addCallback(self._makeMetaData)
-        return d
-
-
-    def _makeMetaData(self, metaData):
-        options = []
-        for key, value in metaData.iteritems():
-            if key in self.nodeOptions:
-                option = {"var": key}
-                option.update(self.nodeOptions[key])
-                option["value"] = value
-                options.append(option)
-
-        return options
-
-
-    def _checkAuth(self, node, requestor):
-        def check(affiliation, node):
-            if affiliation not in ['owner', 'publisher']:
-                raise error.Forbidden()
-            return node
-
-        d = node.getAffiliation(requestor)
-        d.addCallback(check, node)
-        return d
-
-
-    def publish(self, nodeIdentifier, items, requestor):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(self._checkAuth, requestor)
-        d.addCallback(self._doPublish, items, requestor)
-        return d
-
-
-    def _doPublish(self, node, items, requestor):
-        if node.nodeType == 'collection':
-            raise error.NoPublishing()
-
-        configuration = node.getConfiguration()
-        persistItems = configuration["pubsub#persist_items"]
-        deliverPayloads = configuration["pubsub#deliver_payloads"]
-
-        if items and not persistItems and not deliverPayloads:
-            raise error.ItemForbidden()
-        elif not items and (persistItems or deliverPayloads):
-            raise error.ItemRequired()
-
-        if persistItems or deliverPayloads:
-            for item in items:
-                item.uri = None
-                item.defaultUri = None
-                if not item.getAttribute("id"):
-                    item["id"] = str(uuid.uuid4())
-
-        if persistItems:
-            d = node.storeItems(items, requestor)
-        else:
-            d = defer.succeed(None)
-
-        d.addCallback(self._doNotify, node.nodeIdentifier, items,
-                      deliverPayloads)
-        return d
-
-
-    def _doNotify(self, result, nodeIdentifier, items, deliverPayloads):
-        if items and not deliverPayloads:
-            for item in items:
-                item.children = []
-
-        self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier},
-                      '//event/pubsub/notify')
-
-
-    def getNotifications(self, nodeIdentifier, items):
-
-        def toNotifications(subscriptions, nodeIdentifier, items):
-            subsBySubscriber = {}
-            for subscription in subscriptions:
-                if subscription.options.get('pubsub#subscription_type',
-                                            'items') == 'items':
-                    subs = subsBySubscriber.setdefault(subscription.subscriber,
-                                                       set())
-                    subs.add(subscription)
-
-            notifications = [(subscriber, subscriptions, items)
-                             for subscriber, subscriptions
-                             in subsBySubscriber.iteritems()]
-
-            return notifications
-
-        def rootNotFound(failure):
-            failure.trap(error.NodeNotFound)
-            return []
-
-        d1 = self.storage.getNode(nodeIdentifier)
-        d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
-        d2 = self.storage.getNode('')
-        d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
-        d2.addErrback(rootNotFound)
-        d = defer.gatherResults([d1, d2])
-        d.addCallback(lambda result: result[0] + result[1])
-        d.addCallback(toNotifications, nodeIdentifier, items)
-        return d
-
-
-    def registerNotifier(self, observerfn, *args, **kwargs):
-        self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
-
-
-    def subscribe(self, nodeIdentifier, subscriber, requestor):
-        subscriberEntity = subscriber.userhostJID()
-        if subscriberEntity != requestor.userhostJID():
-            return defer.fail(error.Forbidden())
-
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(_getAffiliation, subscriberEntity)
-        d.addCallback(self._doSubscribe, subscriber)
-        return d
-
-
-    def _doSubscribe(self, result, subscriber):
-        node, affiliation = result
-
-        if affiliation == 'outcast':
-            raise error.Forbidden()
-
-        def trapExists(failure):
-            failure.trap(error.SubscriptionExists)
-            return False
-
-        def cb(sendLast):
-            d = node.getSubscription(subscriber)
-            if sendLast:
-                d.addCallback(self._sendLastPublished, node)
-            return d
-
-        d = node.addSubscription(subscriber, 'subscribed', {})
-        d.addCallbacks(lambda _: True, trapExists)
-        d.addCallback(cb)
-        return d
-
-
-    def _sendLastPublished(self, subscription, node):
-
-        def notifyItem(items):
-            if items:
-                reactor.callLater(0, self.dispatch,
-                                     {'items': items,
-                                      'nodeIdentifier': node.nodeIdentifier,
-                                      'subscription': subscription},
-                                     '//event/pubsub/notify')
-
-        config = node.getConfiguration()
-        sendLastPublished = config.get('pubsub#send_last_published_item',
-                                       'never')
-        if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
-            entity = subscription.subscriber.userhostJID()
-            d = self.getItems(node.nodeIdentifier, entity, 1)
-            d.addCallback(notifyItem)
-            d.addErrback(log.err)
-
-        return subscription
-
-
-    def unsubscribe(self, nodeIdentifier, subscriber, requestor):
-        if subscriber.userhostJID() != requestor.userhostJID():
-            return defer.fail(error.Forbidden())
-
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.removeSubscription(subscriber))
-        return d
-
-
-    def getSubscriptions(self, entity):
-        return self.storage.getSubscriptions(entity)
-
-    def supportsAutoCreate(self):
-        return True
-
-    def supportsInstantNodes(self):
-        return True
-
-
-    def createNode(self, nodeIdentifier, requestor):
-        if not nodeIdentifier:
-            nodeIdentifier = 'generic/%s' % uuid.uuid4()
-
-        nodeType = 'leaf'
-        config = self.storage.getDefaultConfiguration(nodeType)
-        config['pubsub#node_type'] = nodeType
-
-        d = self.storage.createNode(nodeIdentifier, requestor, config)
-        d.addCallback(lambda _: nodeIdentifier)
-        return d
-
-
-    def getDefaultConfiguration(self, nodeType):
-        d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
-        return d
-
-
-    def getNodeConfiguration(self, nodeIdentifier):
-        if not nodeIdentifier:
-            return defer.fail(error.NoRootNode())
-
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.getConfiguration())
-
-        return d
-
-
-    def setNodeConfiguration(self, nodeIdentifier, options, requestor):
-        if not nodeIdentifier:
-            return defer.fail(error.NoRootNode())
-
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doSetNodeConfiguration, options)
-        return d
-
-
-    def _doSetNodeConfiguration(self, result, options):
-        node, affiliation = result
-
-        if affiliation != 'owner':
-            raise error.Forbidden()
-
-        return node.setConfiguration(options)
-
-
-    def getAffiliations(self, entity):
-        return self.storage.getAffiliations(entity)
-
-
-    def getItems(self, nodeIdentifier, requestor, maxItems=None,
-                       itemIdentifiers=None):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doGetItems, maxItems, itemIdentifiers)
-        return d
-
-
-    def _doGetItems(self, result, maxItems, itemIdentifiers):
-        node, affiliation = result
-
-        if not ILeafNode.providedBy(node):
-            return []
-
-        if affiliation == 'outcast':
-            raise error.Forbidden()
-
-        if itemIdentifiers:
-            return node.getItemsById(itemIdentifiers)
-        else:
-            return node.getItems(maxItems)
-
-
-    def retractItem(self, nodeIdentifier, itemIdentifiers, requestor):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doRetract, itemIdentifiers)
-        return d
-
-
-    def _doRetract(self, result, itemIdentifiers):
-        node, affiliation = result
-        persistItems = node.getConfiguration()["pubsub#persist_items"]
-
-        if affiliation not in ['owner', 'publisher']:
-            raise error.Forbidden()
-
-        if not persistItems:
-            raise error.NodeNotPersistent()
-
-        d = node.removeItems(itemIdentifiers)
-        d.addCallback(self._doNotifyRetraction, node.nodeIdentifier)
-        return d
-
-
-    def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier):
-        self.dispatch({'itemIdentifiers': itemIdentifiers,
-                       'nodeIdentifier': nodeIdentifier },
-                      '//event/pubsub/retract')
-
-
-    def purgeNode(self, nodeIdentifier, requestor):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doPurge)
-        return d
-
-
-    def _doPurge(self, result):
-        node, affiliation = result
-        persistItems = node.getConfiguration()["pubsub#persist_items"]
-
-        if affiliation != 'owner':
-            raise error.Forbidden()
-
-        if not persistItems:
-            raise error.NodeNotPersistent()
-
-        d = node.purge()
-        d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
-        return d
-
-
-    def _doNotifyPurge(self, result, nodeIdentifier):
-        self.dispatch(nodeIdentifier, '//event/pubsub/purge')
-
-
-    def registerPreDelete(self, preDeleteFn):
-        self._callbackList.append(preDeleteFn)
-
-
-    def getSubscribers(self, nodeIdentifier):
-        def cb(subscriptions):
-            return [subscription.subscriber for subscription in subscriptions]
-
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(lambda node: node.getSubscriptions('subscribed'))
-        d.addCallback(cb)
-        return d
-
-
-    def deleteNode(self, nodeIdentifier, requestor, redirectURI=None):
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doPreDelete, redirectURI)
-        return d
-
-
-    def _doPreDelete(self, result, redirectURI):
-        node, affiliation = result
-
-        if affiliation != 'owner':
-            raise error.Forbidden()
-
-        data = {'nodeIdentifier': node.nodeIdentifier,
-                'redirectURI': redirectURI}
-
-        d = defer.DeferredList([cb(data)
-                                for cb in self._callbackList],
-                               consumeErrors=1)
-        d.addCallback(self._doDelete, node.nodeIdentifier)
-
-
-    def _doDelete(self, result, nodeIdentifier):
-        dl = []
-        for succeeded, r in result:
-            if succeeded and r:
-                dl.extend(r)
-
-        d = self.storage.deleteNode(nodeIdentifier)
-        d.addCallback(self._doNotifyDelete, dl)
-
-        return d
-
-
-    def _doNotifyDelete(self, result, dl):
-        for d in dl:
-            d.callback(None)
-
-
-
-class PubSubResourceFromBackend(PubSubResource):
-    """
-    Adapts a backend to an xmpp publish-subscribe service.
-    """
-
-    features = [
-        "config-node",
-        "create-nodes",
-        "delete-any",
-        "delete-nodes",
-        "item-ids",
-        "meta-data",
-        "publish",
-        "purge-nodes",
-        "retract-items",
-        "retrieve-affiliations",
-        "retrieve-default",
-        "retrieve-items",
-        "retrieve-subscriptions",
-        "subscribe",
-    ]
-
-    discoIdentity = disco.DiscoIdentity('pubsub',
-                                        'service',
-                                        'Idavoll publish-subscribe service')
-
-    pubsubService = None
-
-    _errorMap = {
-        error.NodeNotFound: ('item-not-found', None, None),
-        error.NodeExists: ('conflict', None, None),
-        error.Forbidden: ('forbidden', None, None),
-        error.ItemForbidden: ('bad-request', 'item-forbidden', None),
-        error.ItemRequired: ('bad-request', 'item-required', None),
-        error.NoInstantNodes: ('not-acceptable',
-                               'unsupported',
-                               'instant-nodes'),
-        error.NotSubscribed: ('unexpected-request', 'not-subscribed', None),
-        error.InvalidConfigurationOption: ('not-acceptable', None, None),
-        error.InvalidConfigurationValue: ('not-acceptable', None, None),
-        error.NodeNotPersistent: ('feature-not-implemented',
-                                  'unsupported',
-                                  'persistent-node'),
-        error.NoRootNode: ('bad-request', None, None),
-        error.NoCollections: ('feature-not-implemented',
-                              'unsupported',
-                              'collections'),
-        error.NoPublishing: ('feature-not-implemented',
-                             'unsupported',
-                             'publish'),
-    }
-
-    def __init__(self, backend):
-        PubSubResource.__init__(self)
-
-        self.backend = backend
-        self.hideNodes = False
-
-        self.backend.registerNotifier(self._notify)
-        self.backend.registerPreDelete(self._preDelete)
-
-        if self.backend.supportsAutoCreate():
-            self.features.append("auto-create")
-
-        if self.backend.supportsInstantNodes():
-            self.features.append("instant-nodes")
-
-        if self.backend.supportsOutcastAffiliation():
-            self.features.append("outcast-affiliation")
-
-        if self.backend.supportsPersistentItems():
-            self.features.append("persistent-items")
-
-        if self.backend.supportsPublisherAffiliation():
-            self.features.append("publisher-affiliation")
-
-
-    def _notify(self, data):
-        items = data['items']
-        nodeIdentifier = data['nodeIdentifier']
-        if 'subscription' not in data:
-            d = self.backend.getNotifications(nodeIdentifier, items)
-        else:
-            subscription = data['subscription']
-            d = defer.succeed([(subscription.subscriber, [subscription],
-                                items)])
-        d.addCallback(lambda notifications: self.pubsubService.notifyPublish(
-                                                self.serviceJID,
-                                                nodeIdentifier,
-                                                notifications))
-
-
-    def _preDelete(self, data):
-        nodeIdentifier = data['nodeIdentifier']
-        redirectURI = data.get('redirectURI', None)
-        d = self.backend.getSubscribers(nodeIdentifier)
-        d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
-                                                self.serviceJID,
-                                                nodeIdentifier,
-                                                subscribers,
-                                                redirectURI))
-        return d
-
-
-    def _mapErrors(self, failure):
-        e = failure.trap(*self._errorMap.keys())
-
-        condition, pubsubCondition, feature = self._errorMap[e]
-        msg = failure.value.msg
-
-        if pubsubCondition:
-            exc = PubSubError(condition, pubsubCondition, feature, msg)
-        else:
-            exc = StanzaError(condition, text=msg)
-
-        raise exc
-
-
-    def getInfo(self, requestor, service, nodeIdentifier):
-        info = {}
-
-        def saveType(result):
-            info['type'] = result
-            return nodeIdentifier
-
-        def saveMetaData(result):
-            info['meta-data'] = result
-            return info
-
-        def trapNotFound(failure):
-            failure.trap(error.NodeNotFound)
-            return info
-
-        d = defer.succeed(nodeIdentifier)
-        d.addCallback(self.backend.getNodeType)
-        d.addCallback(saveType)
-        d.addCallback(self.backend.getNodeMetaData)
-        d.addCallback(saveMetaData)
-        d.addErrback(trapNotFound)
-        d.addErrback(self._mapErrors)
-        return d
-
-
-    def getNodes(self, requestor, service, nodeIdentifier):
-        if service.resource:
-            return defer.succeed([])
-        d = self.backend.getNodes()
-        return d.addErrback(self._mapErrors)
-
-
-    def getConfigurationOptions(self):
-        return self.backend.nodeOptions
-
-    def _publish_errb(self, failure, request):
-        if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
-            d = self.backend.createNode(request.nodeIdentifier,
-                                        request.sender)
-            d.addCallback(lambda ignore,
-                                 request: self.backend.publish(request.nodeIdentifier,
-                                                               request.items,
-                                                               request.sender),
-                          request)
-            return d
-
-        raise failure
-
-
-    def publish(self, request):
-        d = self.backend.publish(request.nodeIdentifier,
-                                 request.items,
-                                 request.sender)
-        d.addErrback(self._publish_errb, request)
-        return d.addErrback(self._mapErrors)
-
-
-    def subscribe(self, request):
-        d = self.backend.subscribe(request.nodeIdentifier,
-                                   request.subscriber,
-                                   request.sender)
-        return d.addErrback(self._mapErrors)
-
-
-    def unsubscribe(self, request):
-        d = self.backend.unsubscribe(request.nodeIdentifier,
-                                     request.subscriber,
-                                     request.sender)
-        return d.addErrback(self._mapErrors)
-
-
-    def subscriptions(self, request):
-        d = self.backend.getSubscriptions(request.sender)
-        return d.addErrback(self._mapErrors)
-
-
-    def affiliations(self, request):
-        d = self.backend.getAffiliations(request.sender)
-        return d.addErrback(self._mapErrors)
-
-
-    def create(self, request):
-        d = self.backend.createNode(request.nodeIdentifier,
-                                    request.sender)
-        return d.addErrback(self._mapErrors)
-
-
-    def default(self, request):
-        d = self.backend.getDefaultConfiguration(request.nodeType)
-        return d.addErrback(self._mapErrors)
-
-
-    def configureGet(self, request):
-        d = self.backend.getNodeConfiguration(request.nodeIdentifier)
-        return d.addErrback(self._mapErrors)
-
-
-    def configureSet(self, request):
-        d = self.backend.setNodeConfiguration(request.nodeIdentifier,
-                                              request.options,
-                                              request.sender)
-        return d.addErrback(self._mapErrors)
-
-
-    def items(self, request):
-        d = self.backend.getItems(request.nodeIdentifier,
-                                  request.sender,
-                                  request.maxItems,
-                                  request.itemIdentifiers)
-        return d.addErrback(self._mapErrors)
-
-
-    def retract(self, request):
-        d = self.backend.retractItem(request.nodeIdentifier,
-                                     request.itemIdentifiers,
-                                     request.sender)
-        return d.addErrback(self._mapErrors)
-
-
-    def purge(self, request):
-        d = self.backend.purgeNode(request.nodeIdentifier,
-                                   request.sender)
-        return d.addErrback(self._mapErrors)
-
-
-    def delete(self, request):
-        d = self.backend.deleteNode(request.nodeIdentifier,
-                                    request.sender)
-        return d.addErrback(self._mapErrors)
-
-components.registerAdapter(PubSubResourceFromBackend,
-                           IBackendService,
-                           IPubSubResource)
--- a/idavoll/error.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,95 +0,0 @@
-# Copyright (c) 2003-2008 Ralph Meijer
-# See LICENSE for details.
-
-class Error(Exception):
-    msg = ''
-
-    def __init__(self, msg=None):
-        self.msg = msg or self.msg
-
-
-    def __str__(self):
-        return self.msg
-
-
-
-class NodeNotFound(Error):
-    pass
-
-
-
-class NodeExists(Error):
-    pass
-
-
-
-class NotSubscribed(Error):
-    """
-    Entity is not subscribed to this node.
-    """
-
-
-
-class SubscriptionExists(Error):
-    """
-    There already exists a subscription to this node.
-    """
-
-
-
-class Forbidden(Error):
-    pass
-
-
-
-class ItemForbidden(Error):
-    pass
-
-
-
-class ItemRequired(Error):
-    pass
-
-
-
-class NoInstantNodes(Error):
-    pass
-
-
-
-class InvalidConfigurationOption(Error):
-    msg = 'Invalid configuration option'
-
-
-
-class InvalidConfigurationValue(Error):
-    msg = 'Bad configuration value'
-
-
-
-class NodeNotPersistent(Error):
-    pass
-
-
-
-class NoRootNode(Error):
-    pass
-
-
-
-class NoCallbacks(Error):
-    """
-    There are no callbacks for this node.
-    """
-
-
-
-class NoCollections(Error):
-    pass
-
-
-
-class NoPublishing(Error):
-    """
-    This node does not support publishing.
-    """
--- a/idavoll/gateway.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,868 +0,0 @@
-# -*- test-case-name: idavoll.test.test_gateway -*-
-#
-# Copyright (c) 2003-2009 Ralph Meijer
-# See LICENSE for details.
-
-"""
-Web resources and client for interacting with pubsub services.
-"""
-
-import cgi
-from time import gmtime, strftime
-import urllib
-import urlparse
-
-import simplejson
-
-from twisted.application import service
-from twisted.internet import defer, reactor
-from twisted.python import log
-from twisted.web import client
-from twisted.web2 import http, http_headers, resource, responsecode
-from twisted.web2 import channel, server
-from twisted.web2.stream import readStream
-from twisted.words.protocols.jabber.jid import JID
-from twisted.words.protocols.jabber.error import StanzaError
-from twisted.words.xish import domish
-
-from wokkel.pubsub import Item
-from wokkel.pubsub import PubSubClient
-
-from idavoll import error
-
-NS_ATOM = 'http://www.w3.org/2005/Atom'
-MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
-MIME_JSON = 'application/json'
-
-class XMPPURIParseError(ValueError):
-    """
-    Raised when a given XMPP URI couldn't be properly parsed.
-    """
-
-
-
-def getServiceAndNode(uri):
-    """
-    Given an XMPP URI, extract the publish subscribe service JID and node ID.
-    """
-
-    try:
-        scheme, rest = uri.split(':', 1)
-    except ValueError:
-        raise XMPPURIParseError("No URI scheme component")
-
-    if scheme != 'xmpp':
-        raise XMPPURIParseError("Unknown URI scheme")
-
-    if rest.startswith("//"):
-        raise XMPPURIParseError("Unexpected URI authority component")
-
-    try:
-        entity, query = rest.split('?', 1)
-    except ValueError:
-        raise XMPPURIParseError("No URI query component")
-
-    if not entity:
-        raise XMPPURIParseError("Empty URI path component")
-
-    try:
-        service = JID(entity)
-    except Exception, e:
-        raise XMPPURIParseError("Invalid JID: %s" % e)
-
-    params = cgi.parse_qs(query)
-
-    try:
-        nodeIdentifier = params['node'][0]
-    except (KeyError, ValueError):
-        nodeIdentifier = ''
-
-    return service, nodeIdentifier
-
-
-
-def getXMPPURI(service, nodeIdentifier):
-    """
-    Construct an XMPP URI from a service JID and node identifier.
-    """
-    return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '')
-
-
-
-class WebStreamParser(object):
-    def __init__(self):
-        self.elementStream = domish.elementStream()
-        self.elementStream.DocumentStartEvent = self.docStart
-        self.elementStream.ElementEvent = self.elem
-        self.elementStream.DocumentEndEvent = self.docEnd
-        self.done = False
-
-
-    def docStart(self, elem):
-        self.document = elem
-
-
-    def elem(self, elem):
-        self.document.addChild(elem)
-
-
-    def docEnd(self):
-        self.done = True
-
-
-    def parse(self, stream):
-        def endOfStream(result):
-            if not self.done:
-                raise Exception("No more stuff?")
-            else:
-                return self.document
-
-        d = readStream(stream, self.elementStream.parse)
-        d.addCallback(endOfStream)
-        return d
-
-
-
-class CreateResource(resource.Resource):
-    """
-    A resource to create a publish-subscribe node.
-    """
-    def __init__(self, backend, serviceJID, owner):
-        self.backend = backend
-        self.serviceJID = serviceJID
-        self.owner = owner
-
-
-    http_GET = None
-
-
-    def http_POST(self, request):
-        """
-        Respond to a POST request to create a new node.
-        """
-
-        def toResponse(nodeIdentifier):
-            uri = getXMPPURI(self.serviceJID, nodeIdentifier)
-            stream = simplejson.dumps({'uri': uri})
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
-        d = self.backend.createNode(None, self.owner)
-        d.addCallback(toResponse)
-        return d
-
-
-
-class DeleteResource(resource.Resource):
-    """
-    A resource to create a publish-subscribe node.
-    """
-    def __init__(self, backend, serviceJID, owner):
-        self.backend = backend
-        self.serviceJID = serviceJID
-        self.owner = owner
-
-
-    http_GET = None
-
-
-    def http_POST(self, request):
-        """
-        Respond to a POST request to create a new node.
-        """
-
-        def gotStream(_):
-            if request.args.get('uri'):
-                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
-                return defer.succeed(nodeIdentifier)
-            else:
-                raise http.HTTPError(http.Response(responsecode.BAD_REQUEST,
-                                                   "No URI given"))
-
-        def doDelete(nodeIdentifier, data):
-            if data:
-                params = simplejson.loads(''.join(data))
-                redirectURI = params.get('redirect_uri')
-            else:
-                redirectURI = None
-
-            return self.backend.deleteNode(nodeIdentifier, self.owner,
-                                           redirectURI)
-
-        def respond(result):
-            return http.Response(responsecode.NO_CONTENT)
-
-
-        def trapNotFound(failure):
-            failure.trap(error.NodeNotFound)
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
-
-        def trapXMPPURIParseError(failure):
-            failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
-
-        data = []
-        d = readStream(request.stream, data.append)
-        d.addCallback(gotStream)
-        d.addCallback(doDelete, data)
-        d.addCallback(respond)
-        d.addErrback(trapNotFound)
-        d.addErrback(trapXMPPURIParseError)
-        return d
-
-
-
-class PublishResource(resource.Resource):
-    """
-    A resource to publish to a publish-subscribe node.
-    """
-
-    def __init__(self, backend, serviceJID, owner):
-        self.backend = backend
-        self.serviceJID = serviceJID
-        self.owner = owner
-
-
-    http_GET = None
-
-
-    def checkMediaType(self, request):
-        ctype = request.headers.getHeader('content-type')
-
-        if not ctype:
-            raise http.HTTPError(
-                http.StatusResponse(
-                    responsecode.BAD_REQUEST,
-                    "No specified Media Type"))
-
-        if (ctype.mediaType != 'application' or
-            ctype.mediaSubtype != 'atom+xml' or
-            ctype.params.get('type') != 'entry' or
-            ctype.params.get('charset', 'utf-8') != 'utf-8'):
-            raise http.HTTPError(
-                http.StatusResponse(
-                    responsecode.UNSUPPORTED_MEDIA_TYPE,
-                    "Unsupported Media Type: %s" %
-                        http_headers.generateContentType(ctype)))
-
-
-    def parseXMLPayload(self, stream):
-        p = WebStreamParser()
-        return p.parse(stream)
-
-
-    def http_POST(self, request):
-        """
-        Respond to a POST request to create a new item.
-        """
-
-        def toResponse(nodeIdentifier):
-            uri = getXMPPURI(self.serviceJID, nodeIdentifier)
-            stream = simplejson.dumps({'uri': uri})
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
-
-        def gotNode(nodeIdentifier, payload):
-            item = Item(id='current', payload=payload)
-            d = self.backend.publish(nodeIdentifier, [item], self.owner)
-            d.addCallback(lambda _: nodeIdentifier)
-            return d
-
-        def getNode():
-            if request.args.get('uri'):
-                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
-                return defer.succeed(nodeIdentifier)
-            else:
-                return self.backend.createNode(None, self.owner)
-
-        def doPublish(payload):
-            d = getNode()
-            d.addCallback(gotNode, payload)
-            return d
-
-        def trapNotFound(failure):
-            failure.trap(error.NodeNotFound)
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
-
-        def trapXMPPURIParseError(failure):
-            failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
-
-        self.checkMediaType(request)
-        d = self.parseXMLPayload(request.stream)
-        d.addCallback(doPublish)
-        d.addCallback(toResponse)
-        d.addErrback(trapNotFound)
-        d.addErrback(trapXMPPURIParseError)
-        return d
-
-
-
-class ListResource(resource.Resource):
-    def __init__(self, service):
-        self.service = service
-
-
-    def render(self, request):
-        def responseFromNodes(nodeIdentifiers):
-            stream = simplejson.dumps(nodeIdentifiers)
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
-
-        d = self.service.getNodes()
-        d.addCallback(responseFromNodes)
-        return d
-
-
-
-# Service for subscribing to remote XMPP Pubsub nodes and web resources
-
-def extractAtomEntries(items):
-    """
-    Extract atom entries from a list of publish-subscribe items.
-
-    @param items: List of L{domish.Element}s that represent publish-subscribe
-                  items.
-    @type items: C{list}
-    """
-
-    atomEntries = []
-
-    for item in items:
-        # ignore non-items (i.e. retractions)
-        if item.name != 'item':
-            continue
-
-        atomEntry = None
-        for element in item.elements():
-            # extract the first element that is an atom entry
-            if element.uri == NS_ATOM and element.name == 'entry':
-                atomEntry = element
-                break
-
-        if atomEntry:
-            atomEntries.append(atomEntry)
-
-    return atomEntries
-
-
-
-def constructFeed(service, nodeIdentifier, entries, title):
-    nodeURI = getXMPPURI(service, nodeIdentifier)
-    now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
-
-    # Collect the received entries in a feed
-    feed = domish.Element((NS_ATOM, 'feed'))
-    feed.addElement('title', content=title)
-    feed.addElement('id', content=nodeURI)
-    feed.addElement('updated', content=now)
-
-    for entry in entries:
-        feed.addChild(entry)
-
-    return feed
-
-
-
-class RemoteSubscriptionService(service.Service, PubSubClient):
-    """
-    Service for subscribing to remote XMPP Publish-Subscribe nodes.
-
-    Subscriptions are created with a callback HTTP URI that is POSTed
-    to with the received items in notifications.
-    """
-
-    def __init__(self, jid, storage):
-        self.jid = jid
-        self.storage = storage
-
-
-    def trapNotFound(self, failure):
-        failure.trap(StanzaError)
-
-        if failure.value.condition == 'item-not-found':
-            raise error.NodeNotFound()
-        else:
-            return failure
-
-
-    def subscribeCallback(self, jid, nodeIdentifier, callback):
-        """
-        Subscribe a callback URI.
-
-        This registers a callback URI to be called when a notification is
-        received for the given node.
-
-        If this is the first callback registered for this node, the gateway
-        will subscribe to the node. Otherwise, the most recently published item
-        for this node is retrieved and, if present, the newly registered
-        callback will be called with that item.
-        """
-
-        def callbackForLastItem(items):
-            atomEntries = extractAtomEntries(items)
-
-            if not atomEntries:
-                return
-
-            self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
-                         'application/atom+xml;type=entry')
-
-        def subscribeOrItems(hasCallbacks):
-            if hasCallbacks:
-                if not nodeIdentifier:
-                    return None
-                d = self.items(jid, nodeIdentifier, 1)
-                d.addCallback(callbackForLastItem)
-            else:
-                d = self.subscribe(jid, nodeIdentifier, self.jid)
-
-            d.addErrback(self.trapNotFound)
-            return d
-
-        d = self.storage.hasCallbacks(jid, nodeIdentifier)
-        d.addCallback(subscribeOrItems)
-        d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier,
-                                                         callback))
-        return d
-
-
-    def unsubscribeCallback(self, jid, nodeIdentifier, callback):
-        """
-        Unsubscribe a callback.
-
-        If this was the last registered callback for this node, the
-        gateway will unsubscribe from node.
-        """
-
-        def cb(last):
-            if last:
-                return self.unsubscribe(jid, nodeIdentifier, self.jid)
-
-        d = self.storage.removeCallback(jid, nodeIdentifier, callback)
-        d.addCallback(cb)
-        return d
-
-
-    def itemsReceived(self, event):
-        """
-        Fire up HTTP client to do callback
-        """
-
-        atomEntries = extractAtomEntries(event.items)
-        service = event.sender
-        nodeIdentifier = event.nodeIdentifier
-        headers = event.headers
-
-        # Don't notify if there are no atom entries
-        if not atomEntries:
-            return
-
-        if len(atomEntries) == 1:
-            contentType = 'application/atom+xml;type=entry'
-            payload = atomEntries[0]
-        else:
-            contentType = 'application/atom+xml;type=feed'
-            payload = constructFeed(service, nodeIdentifier, atomEntries,
-                                    title='Received item collection')
-
-        self.callCallbacks(service, nodeIdentifier, payload, contentType)
-
-        if 'Collection' in headers:
-            for collection in headers['Collection']:
-                nodeIdentifier = collection or ''
-                self.callCallbacks(service, nodeIdentifier, payload,
-                                   contentType)
-
-
-    def deleteReceived(self, event):
-        """
-        Fire up HTTP client to do callback
-        """
-
-        service = event.sender
-        nodeIdentifier = event.nodeIdentifier
-        redirectURI = event.redirectURI
-        self.callCallbacks(service, nodeIdentifier, eventType='DELETED',
-                           redirectURI=redirectURI)
-
-
-    def _postTo(self, callbacks, service, nodeIdentifier,
-                      payload=None, contentType=None, eventType=None,
-                      redirectURI=None):
-
-        if not callbacks:
-            return
-
-        postdata = None
-        nodeURI = getXMPPURI(service, nodeIdentifier)
-        headers = {'Referer': nodeURI.encode('utf-8'),
-                   'PubSub-Service': service.full().encode('utf-8')}
-
-        if payload:
-            postdata = payload.toXml().encode('utf-8')
-            if contentType:
-                headers['Content-Type'] = "%s;charset=utf-8" % contentType
-
-        if eventType:
-            headers['Event'] = eventType
-
-        if redirectURI:
-            headers['Link'] = '<%s>; rel=alternate' % (
-                              redirectURI.encode('utf-8'),
-                              )
-
-        def postNotification(callbackURI):
-            f = getPageWithFactory(str(callbackURI),
-                                   method='POST',
-                                   postdata=postdata,
-                                   headers=headers)
-            d = f.deferred
-            d.addErrback(log.err)
-
-        for callbackURI in callbacks:
-            reactor.callLater(0, postNotification, callbackURI)
-
-
-    def callCallbacks(self, service, nodeIdentifier,
-                            payload=None, contentType=None, eventType=None,
-                            redirectURI=None):
-
-        def eb(failure):
-            failure.trap(error.NoCallbacks)
-
-            # No callbacks were registered for this node. Unsubscribe?
-
-        d = self.storage.getCallbacks(service, nodeIdentifier)
-        d.addCallback(self._postTo, service, nodeIdentifier, payload,
-                                    contentType, eventType, redirectURI)
-        d.addErrback(eb)
-        d.addErrback(log.err)
-
-
-
-class RemoteSubscribeBaseResource(resource.Resource):
-    """
-    Base resource for remote pubsub node subscription and unsubscription.
-
-    This resource accepts POST request with a JSON document that holds
-    a dictionary with the keys C{uri} and C{callback} that respectively map
-    to the XMPP URI of the publish-subscribe node and the callback URI.
-
-    This class should be inherited with L{serviceMethod} overridden.
-
-    @cvar serviceMethod: The name of the method to be called with
-                         the JID of the pubsub service, the node identifier
-                         and the callback URI as received in the HTTP POST
-                         request to this resource.
-    """
-    serviceMethod = None
-    errorMap = {
-            error.NodeNotFound:
-                (responsecode.FORBIDDEN, "Node not found"),
-            error.NotSubscribed:
-                (responsecode.FORBIDDEN, "No such subscription found"),
-            error.SubscriptionExists:
-                (responsecode.FORBIDDEN, "Subscription already exists"),
-    }
-
-    def __init__(self, service):
-        self.service = service
-        self.params = None
-
-
-    http_GET = None
-
-
-    def http_POST(self, request):
-        def trapNotFound(failure):
-            err = failure.trap(*self.errorMap.keys())
-            code, msg = self.errorMap[err]
-            return http.StatusResponse(code, msg)
-
-        def respond(result):
-            return http.Response(responsecode.NO_CONTENT)
-
-        def gotRequest(result):
-            uri = self.params['uri']
-            callback = self.params['callback']
-
-            jid, nodeIdentifier = getServiceAndNode(uri)
-            method = getattr(self.service, self.serviceMethod)
-            d = method(jid, nodeIdentifier, callback)
-            return d
-
-        def storeParams(data):
-            self.params = simplejson.loads(data)
-
-        def trapXMPPURIParseError(failure):
-            failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
-
-        d = readStream(request.stream, storeParams)
-        d.addCallback(gotRequest)
-        d.addCallback(respond)
-        d.addErrback(trapNotFound)
-        d.addErrback(trapXMPPURIParseError)
-        return d
-
-
-
-class RemoteSubscribeResource(RemoteSubscribeBaseResource):
-    """
-    Resource to subscribe to a remote publish-subscribe node.
-
-    The passed C{uri} is the XMPP URI of the node to subscribe to and the
-    C{callback} is the callback URI. Upon receiving notifications from the
-    node, a POST request will be perfomed on the callback URI.
-    """
-    serviceMethod = 'subscribeCallback'
-
-
-
-class RemoteUnsubscribeResource(RemoteSubscribeBaseResource):
-    """
-    Resource to unsubscribe from a remote publish-subscribe node.
-
-    The passed C{uri} is the XMPP URI of the node to unsubscribe from and the
-    C{callback} is the callback URI that was registered for it.
-    """
-    serviceMethod = 'unsubscribeCallback'
-
-
-
-class RemoteItemsResource(resource.Resource):
-    """
-    Resource for retrieving items from a remote pubsub node.
-    """
-
-    def __init__(self, service):
-        self.service = service
-
-
-    def render(self, request):
-        try:
-            maxItems = int(request.args.get('max_items', [0])[0]) or None
-        except ValueError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "The argument max_items has an invalid value.")
-
-        try:
-            uri = request.args['uri'][0]
-        except KeyError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "No URI for the remote node provided.")
-
-        try:
-            jid, nodeIdentifier = getServiceAndNode(uri)
-        except XMPPURIParseError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % uri)
-
-        def respond(items):
-            """Create a feed out the retrieved items."""
-            contentType = http_headers.MimeType('application',
-                                                'atom+xml',
-                                                {'type': 'feed'})
-            atomEntries = extractAtomEntries(items)
-            feed = constructFeed(jid, nodeIdentifier, atomEntries,
-                                    "Retrieved item collection")
-            payload = feed.toXml().encode('utf-8')
-            return http.Response(responsecode.OK, stream=payload,
-                                 headers={'Content-Type': contentType})
-
-        def trapNotFound(failure):
-            failure.trap(StanzaError)
-            if not failure.value.condition == 'item-not-found':
-                raise failure
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
-
-        d = self.service.items(jid, nodeIdentifier, maxItems)
-        d.addCallback(respond)
-        d.addErrback(trapNotFound)
-        return d
-
-
-
-# Client side code to interact with a service as provided above
-
-def getPageWithFactory(url, contextFactory=None, *args, **kwargs):
-    """Download a web page.
-
-    Download a page. Return the factory that holds a deferred, which will
-    callback with a page (as a string) or errback with a description of the
-    error.
-
-    See HTTPClientFactory to see what extra args can be passed.
-    """
-
-    scheme, host, port, path = client._parse(url)
-    factory = client.HTTPClientFactory(url, *args, **kwargs)
-    factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200()
-
-    if scheme == 'https':
-        from twisted.internet import ssl
-        if contextFactory is None:
-            contextFactory = ssl.ClientContextFactory()
-        reactor.connectSSL(host, port, factory, contextFactory)
-    else:
-        reactor.connectTCP(host, port, factory)
-    return factory
-
-
-
-class CallbackResource(resource.Resource):
-    """
-    Web resource for retrieving gateway notifications.
-    """
-
-    def __init__(self, callback):
-        self.callback = callback
-
-
-    http_GET = None
-
-
-    def http_POST(self, request):
-        p = WebStreamParser()
-        if not request.headers.hasHeader('Event'):
-            d = p.parse(request.stream)
-        else:
-            d = defer.succeed(None)
-        d.addCallback(self.callback, request.headers)
-        d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT))
-        return d
-
-
-
-class GatewayClient(service.Service):
-    """
-    Service that provides client access to the HTTP Gateway into Idavoll.
-    """
-
-    agent = "Idavoll HTTP Gateway Client"
-
-    def __init__(self, baseURI, callbackHost=None, callbackPort=None):
-        self.baseURI = baseURI
-        self.callbackHost = callbackHost or 'localhost'
-        self.callbackPort = callbackPort or 8087
-        root = resource.Resource()
-        root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs))
-        self.site = server.Site(root)
-
-
-    def startService(self):
-        self.port = reactor.listenTCP(self.callbackPort,
-                                      channel.HTTPFactory(self.site))
-
-
-    def stopService(self):
-        return self.port.stopListening()
-
-
-    def _makeURI(self, verb, query=None):
-        uriComponents = urlparse.urlparse(self.baseURI)
-        uri = urlparse.urlunparse((uriComponents[0],
-                                   uriComponents[1],
-                                   uriComponents[2] + verb,
-                                   '',
-                                   query and urllib.urlencode(query) or '',
-                                   ''))
-        return uri
-
-
-    def callback(self, data, headers):
-        pass
-
-
-    def ping(self):
-        f = getPageWithFactory(self._makeURI(''),
-                               method='HEAD',
-                               agent=self.agent)
-        return f.deferred
-
-
-    def create(self):
-        f = getPageWithFactory(self._makeURI('create'),
-                    method='POST',
-                    agent=self.agent)
-        return f.deferred.addCallback(simplejson.loads)
-
-
-    def delete(self, xmppURI, redirectURI=None):
-        query = {'uri': xmppURI}
-
-        if redirectURI:
-            params = {'redirect_uri': redirectURI}
-            postdata = simplejson.dumps(params)
-            headers = {'Content-Type': MIME_JSON}
-        else:
-            postdata = None
-            headers = None
-
-        f = getPageWithFactory(self._makeURI('delete', query),
-                    method='POST',
-                    postdata=postdata,
-                    headers=headers,
-                    agent=self.agent)
-        return f.deferred
-
-
-    def publish(self, entry, xmppURI=None):
-        query = xmppURI and {'uri': xmppURI}
-
-        f = getPageWithFactory(self._makeURI('publish', query),
-                    method='POST',
-                    postdata=entry.toXml().encode('utf-8'),
-                    headers={'Content-Type': MIME_ATOM_ENTRY},
-                    agent=self.agent)
-        return f.deferred.addCallback(simplejson.loads)
-
-
-    def listNodes(self):
-        f = getPageWithFactory(self._makeURI('list'),
-                    method='GET',
-                    agent=self.agent)
-        return f.deferred.addCallback(simplejson.loads)
-
-
-    def subscribe(self, xmppURI):
-        params = {'uri': xmppURI,
-                  'callback': 'http://%s:%s/callback' % (self.callbackHost,
-                                                         self.callbackPort)}
-        f = getPageWithFactory(self._makeURI('subscribe'),
-                    method='POST',
-                    postdata=simplejson.dumps(params),
-                    headers={'Content-Type': MIME_JSON},
-                    agent=self.agent)
-        return f.deferred
-
-
-    def unsubscribe(self, xmppURI):
-        params = {'uri': xmppURI,
-                  'callback': 'http://%s:%s/callback' % (self.callbackHost,
-                                                         self.callbackPort)}
-        f = getPageWithFactory(self._makeURI('unsubscribe'),
-                    method='POST',
-                    postdata=simplejson.dumps(params),
-                    headers={'Content-Type': MIME_JSON},
-                    agent=self.agent)
-        return f.deferred
-
-
-    def items(self, xmppURI, maxItems=None):
-        query = {'uri': xmppURI}
-        if maxItems:
-             query['max_items'] = int(maxItems)
-        f = getPageWithFactory(self._makeURI('items', query),
-                    method='GET',
-                    agent=self.agent)
-        return f.deferred
--- a/idavoll/iidavoll.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,585 +0,0 @@
-# Copyright (c) 2003-2008 Ralph Meijer
-# See LICENSE for details.
-
-"""
-Interfaces for idavoll.
-"""
-
-from zope.interface import Attribute, Interface
-
-class IBackendService(Interface):
-    """ Interface to a backend service of a pubsub service. """
-
-
-    def __init__(storage):
-        """
-        @param storage: Object providing L{IStorage}.
-        """
-
-
-    def supportsPublisherAffiliation():
-        """ Reports if the backend supports the publisher affiliation.
-
-        @rtype: C{bool}
-        """
-
-
-    def supportsOutcastAffiliation():
-        """ Reports if the backend supports the publisher affiliation.
-
-        @rtype: C{bool}
-        """
-
-
-    def supportsPersistentItems():
-        """ Reports if the backend supports persistent items.
-
-        @rtype: C{bool}
-        """
-
-
-    def getNodeType(nodeIdentifier):
-        """ Return type of a node.
-
-        @return: a deferred that returns either 'leaf' or 'collection'
-        """
-
-
-    def getNodes():
-        """ Returns list of all nodes.
-
-        @return: a deferred that returns a C{list} of node ids.
-        """
-
-
-    def getNodeMetaData(nodeIdentifier):
-        """ Return meta data for a node.
-
-        @return: a deferred that returns a C{list} of C{dict}s with the
-                 metadata.
-        """
-
-
-    def createNode(nodeIdentifier, requestor):
-        """ Create a node.
-
-        @return: a deferred that fires when the node has been created.
-        """
-
-
-    def registerPreDelete(preDeleteFn):
-        """ Register a callback that is called just before a node deletion.
-
-        The function C{preDeletedFn} is added to a list of functions to be
-        called just before deletion of a node. The callback C{preDeleteFn} is
-        called with the C{nodeIdentifier} that is about to be deleted and
-        should return a deferred that returns a list of deferreds that are to
-        be fired after deletion. The backend collects the lists from all these
-        callbacks before actually deleting the node in question.  After
-        deletion all collected deferreds are fired to do post-processing.
-
-        The idea is that you want to be able to collect data from the node
-        before deleting it, for example to get a list of subscribers that have
-        to be notified after the node has been deleted. To do this,
-        C{preDeleteFn} fetches the subscriber list and passes this list to a
-        callback attached to a deferred that it sets up. This deferred is
-        returned in the list of deferreds.
-        """
-
-
-    def deleteNode(nodeIdentifier, requestor):
-        """ Delete a node.
-
-        @return: a deferred that fires when the node has been deleted.
-        """
-
-
-    def purgeNode(nodeIdentifier, requestor):
-        """ Removes all items in node from persistent storage """
-
-
-    def subscribe(nodeIdentifier, subscriber, requestor):
-        """ Request the subscription of an entity to a pubsub node.
-
-        Depending on the node's configuration and possible business rules, the
-        C{subscriber} is added to the list of subscriptions of the node with id
-        C{nodeIdentifier}. The C{subscriber} might be different from the
-        C{requestor}, and if the C{requestor} is not allowed to subscribe this
-        entity an exception should be raised.
-
-        @return: a deferred that returns the subscription state
-        """
-
-
-    def unsubscribe(nodeIdentifier, subscriber, requestor):
-        """ Cancel the subscription of an entity to a pubsub node.
-
-        The subscription of C{subscriber} is removed from the list of
-        subscriptions of the node with id C{nodeIdentifier}. If the
-        C{requestor} is not allowed to unsubscribe C{subscriber}, an an
-        exception should be raised.
-
-        @return: a deferred that fires when unsubscription is complete.
-        """
-
-
-    def getSubscribers(nodeIdentifier):
-        """ Get node subscriber list.
-
-        @return: a deferred that fires with the list of subscribers.
-        """
-
-
-    def getSubscriptions(entity):
-        """ Report the list of current subscriptions with this pubsub service.
-
-        Report the list of the current subscriptions with all nodes within this
-        pubsub service, for the C{entity}.
-
-        @return: a deferred that returns the list of all current subscriptions
-                 as tuples C{(nodeIdentifier, subscriber, subscription)}.
-        """
-
-
-    def getAffiliations(entity):
-        """ Report the list of current affiliations with this pubsub service.
-
-        Report the list of the current affiliations with all nodes within this
-        pubsub service, for the C{entity}.
-
-        @return: a deferred that returns the list of all current affiliations
-                 as tuples C{(nodeIdentifier, affiliation)}.
-        """
-
-
-    def publish(nodeIdentifier, items, requestor):
-        """ Publish items to a pubsub node.
-
-        @return: a deferred that fires when the items have been published.
-        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
-        """
-
-
-    def registerNotifier(observerfn, *args, **kwargs):
-        """ Register callback which is called for notification. """
-
-
-    def getNotifications(nodeIdentifier, items):
-        """
-        Get notification list.
-
-        This method is called to discover which entities should receive
-        notifications for the given items that have just been published to the
-        given node.
-
-        The notification list contains tuples (subscriber, subscriptions,
-        items) to result in one notification per tuple: the given subscriptions
-        yielded the given items to be notified to this subscriber.  This
-        structure is needed allow for letting the subscriber know which
-        subscriptions yielded which notifications, while catering for
-        collection nodes and content-based subscriptions.
-
-        To minimize the amount of notifications per entity, implementers
-        should take care that if all items in C{items} were yielded
-        by the same set of subscriptions, exactly one tuple is for this
-        subscriber is returned, so that the subscriber would get exactly one
-        notification. Alternatively, one tuple per subscription combination.
-
-        @param nodeIdentifier: The identifier of the node the items were
-                               published to.
-        @type nodeIdentifier: C{unicode}.
-        @param items: The list of published items as
-                      L{Element<twisted.words.xish.domish.Element>}s.
-        @type items: C{list}
-        @return: The notification list as tuples of
-                 (L{JID<twisted.words.protocols.jabber.jid.JID>},
-                  C{list} of L{Subscription<wokkel.pubsub.Subscription>},
-                  C{list} of L{Element<twisted.words.xish.domish.Element>}.
-        @rtype: C{list}
-        """
-
-
-    def getItems(nodeIdentifier, requestor, maxItems=None, itemIdentifiers=[]):
-        """ Retrieve items from persistent storage
-
-        If C{maxItems} is given, return the C{maxItems} last published
-        items, else if C{itemIdentifiers} is not empty, return the items
-        requested.  If neither is given, return all items.
-
-        @return: a deferred that returns the requested items
-        """
-
-
-    def retractItem(nodeIdentifier, itemIdentifier, requestor):
-        """ Removes item in node from persistent storage """
-
-
-
-class IStorage(Interface):
-    """
-    Storage interface.
-    """
-
-
-    def getNode(nodeIdentifier):
-        """
-        Get Node.
-
-        @param nodeIdentifier: NodeID of the desired node.
-        @type nodeIdentifier: C{str}
-        @return: deferred that returns a L{INode} providing object.
-        """
-
-
-    def getNodeIds():
-        """
-        Return all NodeIDs.
-
-        @return: deferred that returns a list of NodeIDs (C{unicode}).
-        """
-
-
-    def createNode(nodeIdentifier, owner, config):
-        """
-        Create new node.
-
-        The implementation should make sure, the passed owner JID is stripped
-        of the resource (e.g. using C{owner.userhostJID()}). The passed config
-        is expected to have values for the fields returned by
-        L{getDefaultConfiguration}, as well as a value for
-        C{'pubsub#node_type'}.
-
-        @param nodeIdentifier: NodeID of the new node.
-        @type nodeIdentifier: C{unicode}
-        @param owner: JID of the new nodes's owner.
-        @type owner: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @param config: Node configuration.
-        @type config: C{dict}
-        @return: deferred that fires on creation.
-        """
-
-
-    def deleteNode(nodeIdentifier):
-        """
-        Delete a node.
-
-        @param nodeIdentifier: NodeID of the new node.
-        @type nodeIdentifier: C{unicode}
-        @return: deferred that fires on deletion.
-        """
-
-
-    def getAffiliations(entity):
-        """
-        Get all affiliations for entity.
-
-        The implementation should make sure, the passed owner JID is stripped
-        of the resource (e.g. using C{owner.userhostJID()}).
-
-        @param entity: JID of the entity.
-        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @return: deferred that returns a C{list} of tuples of the form
-                 C{(nodeIdentifier, affiliation)}, where C{nodeIdentifier} is
-                 of the type L{unicode} and C{affiliation} is one of
-                 C{'owner'}, C{'publisher'} and C{'outcast'}.
-        """
-
-
-    def getSubscriptions(entity):
-        """
-        Get all subscriptions for an entity.
-
-        The implementation should make sure, the passed owner JID is stripped
-        of the resource (e.g. using C{owner.userhostJID()}).
-
-        @param entity: JID of the entity.
-        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @return: deferred that returns a C{list} of tuples of the form
-                 C{(nodeIdentifier, subscriber, state)}, where
-                 C{nodeIdentifier} is of the type C{unicode}, C{subscriber} of
-                 the type J{JID<twisted.words.protocols.jabber.jid.JID>}, and
-                 C{state} is C{'subscribed'}, C{'pending'} or
-                 C{'unconfigured'}.
-        """
-
-
-    def getDefaultConfiguration(nodeType):
-        """
-        Get the default configuration for the given node type.
-
-        @param nodeType: Either C{'leaf'} or C{'collection'}.
-        @type nodeType: C{str}
-        @return: The default configuration.
-        @rtype: C{dict}.
-        @raises: L{idavoll.error.NoCollections} if collections are not
-                 supported.
-        """
-
-
-
-class INode(Interface):
-    """
-    Interface to the class of objects that represent nodes.
-    """
-
-    nodeType = Attribute("""The type of this node. One of {'leaf'},
-                           {'collection'}.""")
-    nodeIdentifier = Attribute("""The node identifer of this node""")
-
-
-    def getType():
-        """
-        Get node's type.
-
-        @return: C{'leaf'} or C{'collection'}.
-        """
-
-
-    def getConfiguration():
-        """
-        Get node's configuration.
-
-        The configuration must at least have two options:
-        C{pubsub#persist_items}, and C{pubsub#deliver_payloads}.
-
-        @return: C{dict} of configuration options.
-        """
-
-
-    def getMetaData():
-        """
-        Get node's meta data.
-
-        The meta data must be a superset of the configuration options, and
-        also at least should have a C{pubsub#node_type} entry.
-
-        @return: C{dict} of meta data.
-        """
-
-
-    def setConfiguration(options):
-        """
-        Set node's configuration.
-
-        The elements of {options} will set the new values for those
-        configuration items. This means that only changing items have to
-        be given.
-
-        @param options: a dictionary of configuration options.
-        @returns: a deferred that fires upon success.
-        """
-
-
-    def getAffiliation(entity):
-        """
-        Get affiliation of entity with this node.
-
-        @param entity: JID of entity.
-        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @return: deferred that returns C{'owner'}, C{'publisher'}, C{'outcast'}
-                 or C{None}.
-        """
-
-
-    def getSubscription(subscriber):
-        """
-        Get subscription to this node of subscriber.
-
-        @param subscriber: JID of the new subscriptions' entity.
-        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @return: deferred that returns the subscription state (C{'subscribed'},
-                 C{'pending'} or C{None}).
-        """
-
-
-    def getSubscriptions(state=None):
-        """
-        Get list of subscriptions to this node.
-
-        The optional C{state} argument filters the subscriptions to their
-        state.
-
-        @param state: Subscription state filter. One of C{'subscribed'},
-                      C{'pending'}, C{'unconfigured'}.
-        @type state: C{str}
-        @return: a deferred that returns a C{list} of
-                 L{wokkel.pubsub.Subscription}s.
-        """
-
-
-    def addSubscription(subscriber, state, config):
-        """
-        Add new subscription to this node with given state.
-
-        @param subscriber: JID of the new subscriptions' entity.
-        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @param state: C{'subscribed'} or C{'pending'}
-        @type state: C{str}
-        @param config: Subscription configuration.
-        @param config: C{dict}
-        @return: deferred that fires on subscription.
-        """
-
-
-    def removeSubscription(subscriber):
-        """
-        Remove subscription to this node.
-
-        @param subscriber: JID of the subscriptions' entity.
-        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @return: deferred that fires on removal.
-        """
-
-
-    def isSubscribed(entity):
-        """
-        Returns whether entity has any subscription to this node.
-
-        Only returns C{True} when the subscription state (if present) is
-        C{'subscribed'} for any subscription that matches the bare JID.
-
-        @param subscriber: bare JID of the subscriptions' entity.
-        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @return: deferred that returns a C{bool}.
-        """
-
-
-    def getAffiliations():
-        """
-        Get affiliations of entities with this node.
-
-        @return: deferred that returns a C{list} of tuples (jid, affiliation),
-                 where jid is a L(JID<twisted.words.protocols.jabber.jid.JID>)
-                 and affiliation is one of C{'owner'},
-        C{'publisher'}, C{'outcast'}.
-        """
-
-
-
-class ILeafNode(Interface):
-    """
-    Interface to the class of objects that represent leaf nodes.
-    """
-
-    def storeItems(items, publisher):
-        """
-        Store items in persistent storage for later retrieval.
-
-        @param items: The list of items to be stored. Each item is the
-                      L{domish} representation of the XML fragment as defined
-                      for C{<item/>} in the
-                      C{http://jabber.org/protocol/pubsub} namespace.
-        @type items: C{list} of {domish.Element}
-        @param publisher: JID of the publishing entity.
-        @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @return: deferred that fires upon success.
-        """
-
-
-    def removeItems(itemIdentifiers):
-        """
-        Remove items by id.
-
-        @param itemIdentifiers: C{list} of item ids.
-        @return: deferred that fires with a C{list} of ids of the items that
-                 were deleted
-        """
-
-
-    def getItems(maxItems=None):
-        """
-        Get items.
-
-        If C{maxItems} is not given, all items in the node are returned,
-        just like C{getItemsById}. Otherwise, C{maxItems} limits
-        the returned items to a maximum of that number of most recently
-        published items.
-
-        @param maxItems: if given, a natural number (>0) that limits the
-                          returned number of items.
-        @return: deferred that fires with a C{list} of found items.
-        """
-
-
-    def getItemsById(itemIdentifiers):
-        """
-        Get items by item id.
-
-        Each item in the returned list is a unicode string that
-        represent the XML of the item as it was published, including the
-        item wrapper with item id.
-
-        @param itemIdentifiers: C{list} of item ids.
-        @return: deferred that fires with a C{list} of found items.
-        """
-
-
-    def purge():
-        """
-        Purge node of all items in persistent storage.
-
-        @return: deferred that fires when the node has been purged.
-        """
-
-
-
-class IGatewayStorage(Interface):
-
-    def addCallback(service, nodeIdentifier, callback):
-        """
-        Register a callback URI.
-
-        The registered HTTP callback URI will have an Atom Entry documented
-        POSTed to it upon receiving a notification for the given pubsub node.
-
-        @param service: The XMPP entity that holds the node.
-        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @param nodeIdentifier: The identifier of the publish-subscribe node.
-        @type nodeIdentifier: C{unicode}.
-        @param callback: The callback URI to be registered.
-        @type callback: C{str}.
-        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
-        """
-
-    def removeCallback(service, nodeIdentifier, callback):
-        """
-        Remove a registered callback URI.
-
-        The returned deferred will fire with a boolean that signals wether or
-        not this was the last callback unregistered for this node.
-
-        @param service: The XMPP entity that holds the node.
-        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @param nodeIdentifier: The identifier of the publish-subscribe node.
-        @type nodeIdentifier: C{unicode}.
-        @param callback: The callback URI to be unregistered.
-        @type callback: C{str}.
-        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
-        """
-
-    def getCallbacks(service, nodeIdentifier):
-        """
-        Get the callbacks registered for this node.
-
-        Returns a deferred that fires with the set of HTTP callback URIs
-        registered for this node.
-
-        @param service: The XMPP entity that holds the node.
-        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @param nodeIdentifier: The identifier of the publish-subscribe node.
-        @type nodeIdentifier: C{unicode}.
-        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
-        """
-
-
-    def hasCallbacks(service, nodeIdentifier):
-        """
-        Return wether there are callbacks registered for a node.
-
-        @param service: The XMPP entity that holds the node.
-        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
-        @param nodeIdentifier: The identifier of the publish-subscribe node.
-        @type nodeIdentifier: C{unicode}.
-        @returns: Deferred that fires with a boolean.
-        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
-        """
--- a/idavoll/memory_storage.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,320 +0,0 @@
-# Copyright (c) 2003-2010 Ralph Meijer
-# See LICENSE for details.
-
-import copy
-from zope.interface import implements
-from twisted.internet import defer
-from twisted.words.protocols.jabber import jid
-
-from wokkel.pubsub import Subscription
-
-from idavoll import error, iidavoll
-
-class Storage:
-
-    implements(iidavoll.IStorage)
-
-    defaultConfig = {
-            'leaf': {
-                "pubsub#persist_items": True,
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
-            },
-            'collection': {
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
-            }
-    }
-
-    def __init__(self):
-        rootNode = CollectionNode('', jid.JID('localhost'),
-                                  copy.copy(self.defaultConfig['collection']))
-        self._nodes = {'': rootNode}
-
-
-    def getNode(self, nodeIdentifier):
-        try:
-            node = self._nodes[nodeIdentifier]
-        except KeyError:
-            return defer.fail(error.NodeNotFound())
-
-        return defer.succeed(node)
-
-
-    def getNodeIds(self):
-        return defer.succeed(self._nodes.keys())
-
-
-    def createNode(self, nodeIdentifier, owner, config):
-        if nodeIdentifier in self._nodes:
-            return defer.fail(error.NodeExists())
-
-        if config['pubsub#node_type'] != 'leaf':
-            raise error.NoCollections()
-
-        node = LeafNode(nodeIdentifier, owner, config)
-        self._nodes[nodeIdentifier] = node
-
-        return defer.succeed(None)
-
-
-    def deleteNode(self, nodeIdentifier):
-        try:
-            del self._nodes[nodeIdentifier]
-        except KeyError:
-            return defer.fail(error.NodeNotFound())
-
-        return defer.succeed(None)
-
-
-    def getAffiliations(self, entity):
-        entity = entity.userhost()
-        return defer.succeed([(node.nodeIdentifier, node._affiliations[entity])
-                              for name, node in self._nodes.iteritems()
-                              if entity in node._affiliations])
-
-
-    def getSubscriptions(self, entity):
-        subscriptions = []
-        for node in self._nodes.itervalues():
-            for subscriber, subscription in node._subscriptions.iteritems():
-                subscriber = jid.internJID(subscriber)
-                if subscriber.userhostJID() == entity.userhostJID():
-                    subscriptions.append(subscription)
-
-        return defer.succeed(subscriptions)
-
-
-    def getDefaultConfiguration(self, nodeType):
-        if nodeType == 'collection':
-            raise error.NoCollections()
-
-        return self.defaultConfig[nodeType]
-
-
-class Node:
-
-    implements(iidavoll.INode)
-
-    def __init__(self, nodeIdentifier, owner, config):
-        self.nodeIdentifier = nodeIdentifier
-        self._affiliations = {owner.userhost(): 'owner'}
-        self._subscriptions = {}
-        self._config = copy.copy(config)
-
-
-    def getType(self):
-        return self.nodeType
-
-
-    def getConfiguration(self):
-        return self._config
-
-
-    def getMetaData(self):
-        config = copy.copy(self._config)
-        config["pubsub#node_type"] = self.nodeType
-        return config
-
-
-    def setConfiguration(self, options):
-        for option in options:
-            if option in self._config:
-                self._config[option] = options[option]
-
-        return defer.succeed(None)
-
-
-    def getAffiliation(self, entity):
-        return defer.succeed(self._affiliations.get(entity.userhost()))
-
-
-    def getSubscription(self, subscriber):
-        try:
-            subscription = self._subscriptions[subscriber.full()]
-        except KeyError:
-            return defer.succeed(None)
-        else:
-            return defer.succeed(subscription)
-
-
-    def getSubscriptions(self, state=None):
-        return defer.succeed(
-                [subscription
-                 for subscription in self._subscriptions.itervalues()
-                 if state is None or subscription.state == state])
-
-
-
-    def addSubscription(self, subscriber, state, options):
-        if self._subscriptions.get(subscriber.full()):
-            return defer.fail(error.SubscriptionExists())
-
-        subscription = Subscription(self.nodeIdentifier, subscriber, state,
-                                    options)
-        self._subscriptions[subscriber.full()] = subscription
-        return defer.succeed(None)
-
-
-    def removeSubscription(self, subscriber):
-        try:
-            del self._subscriptions[subscriber.full()]
-        except KeyError:
-            return defer.fail(error.NotSubscribed())
-
-        return defer.succeed(None)
-
-
-    def isSubscribed(self, entity):
-        for subscriber, subscription in self._subscriptions.iteritems():
-            if jid.internJID(subscriber).userhost() == entity.userhost() and \
-                    subscription.state == 'subscribed':
-                return defer.succeed(True)
-
-        return defer.succeed(False)
-
-
-    def getAffiliations(self):
-        affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation
-                       in self._affiliations.iteritems()]
-
-        return defer.succeed(affiliations)
-
-
-
-class PublishedItem(object):
-    """
-    A published item.
-
-    This represent an item as it was published by an entity.
-
-    @ivar element: The DOM representation of the item that was published.
-    @type element: L{Element<twisted.words.xish.domish.Element>}
-    @ivar publisher: The entity that published the item.
-    @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>}
-    """
-
-    def __init__(self, element, publisher):
-        self.element = element
-        self.publisher = publisher
-
-
-
-class LeafNode(Node):
-
-    implements(iidavoll.ILeafNode)
-
-    nodeType = 'leaf'
-
-    def __init__(self, nodeIdentifier, owner, config):
-        Node.__init__(self, nodeIdentifier, owner, config)
-        self._items = {}
-        self._itemlist = []
-
-
-    def storeItems(self, items, publisher):
-        for element in items:
-            item = PublishedItem(element, publisher)
-            itemIdentifier = element["id"]
-            if itemIdentifier in self._items:
-                self._itemlist.remove(self._items[itemIdentifier])
-            self._items[itemIdentifier] = item
-            self._itemlist.append(item)
-
-        return defer.succeed(None)
-
-
-    def removeItems(self, itemIdentifiers):
-        deleted = []
-
-        for itemIdentifier in itemIdentifiers:
-            try:
-                item = self._items[itemIdentifier]
-            except KeyError:
-                pass
-            else:
-                self._itemlist.remove(item)
-                del self._items[itemIdentifier]
-                deleted.append(itemIdentifier)
-
-        return defer.succeed(deleted)
-
-
-    def getItems(self, maxItems=None):
-        if maxItems:
-            itemList = self._itemlist[-maxItems:]
-        else:
-            itemList = self._itemlist
-        return defer.succeed([item.element for item in itemList])
-
-
-    def getItemsById(self, itemIdentifiers):
-        items = []
-        for itemIdentifier in itemIdentifiers:
-            try:
-                item = self._items[itemIdentifier]
-            except KeyError:
-                pass
-            else:
-                items.append(item.element)
-        return defer.succeed(items)
-
-
-    def purge(self):
-        self._items = {}
-        self._itemlist = []
-
-        return defer.succeed(None)
-
-
-class CollectionNode(Node):
-    nodeType = 'collection'
-
-
-
-class GatewayStorage(object):
-    """
-    Memory based storage facility for the XMPP-HTTP gateway.
-    """
-
-    def __init__(self):
-        self.callbacks = {}
-
-
-    def addCallback(self, service, nodeIdentifier, callback):
-        try:
-            callbacks = self.callbacks[service, nodeIdentifier]
-        except KeyError:
-            callbacks = set([callback])
-            self.callbacks[service, nodeIdentifier] = callbacks
-        else:
-            callbacks.add(callback)
-            pass
-
-        return defer.succeed(None)
-
-
-    def removeCallback(self, service, nodeIdentifier, callback):
-        try:
-            callbacks = self.callbacks[service, nodeIdentifier]
-            callbacks.remove(callback)
-        except KeyError:
-            return defer.fail(error.NotSubscribed())
-        else:
-            if not callbacks:
-                del self.callbacks[service, nodeIdentifier]
-
-            return defer.succeed(not callbacks)
-
-
-    def getCallbacks(self, service, nodeIdentifier):
-        try:
-            callbacks = self.callbacks[service, nodeIdentifier]
-        except KeyError:
-            return defer.fail(error.NoCallbacks())
-        else:
-            return defer.succeed(callbacks)
-
-
-    def hasCallbacks(self, service, nodeIdentifier):
-        return defer.succeed((service, nodeIdentifier) in self.callbacks)
--- a/idavoll/pgsql_storage.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,605 +0,0 @@
-# Copyright (c) 2003-2008 Ralph Meijer
-# See LICENSE for details.
-
-import copy
-
-from zope.interface import implements
-
-from twisted.enterprise import adbapi
-from twisted.words.protocols.jabber import jid
-
-from wokkel.generic import parseXml, stripNamespace
-from wokkel.pubsub import Subscription
-
-from idavoll import error, iidavoll
-
-class Storage:
-
-    implements(iidavoll.IStorage)
-
-    defaultConfig = {
-            'leaf': {
-                "pubsub#persist_items": True,
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
-            },
-            'collection': {
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
-            }
-    }
-
-    def __init__(self, dbpool):
-        self.dbpool = dbpool
-
-
-    def getNode(self, nodeIdentifier):
-        return self.dbpool.runInteraction(self._getNode, nodeIdentifier)
-
-
-    def _getNode(self, cursor, nodeIdentifier):
-        configuration = {}
-        cursor.execute("""SELECT node_type,
-                                 persist_items,
-                                 deliver_payloads,
-                                 send_last_published_item
-                          FROM nodes
-                          WHERE node=%s""",
-                       (nodeIdentifier,))
-        row = cursor.fetchone()
-
-        if not row:
-            raise error.NodeNotFound()
-
-        if row[0] == 'leaf':
-            configuration = {
-                    'pubsub#persist_items': row[1],
-                    'pubsub#deliver_payloads': row[2],
-                    'pubsub#send_last_published_item':
-                        row[3]}
-            node = LeafNode(nodeIdentifier, configuration)
-            node.dbpool = self.dbpool
-            return node
-        elif row[0] == 'collection':
-            configuration = {
-                    'pubsub#deliver_payloads': row[2],
-                    'pubsub#send_last_published_item':
-                        row[3]}
-            node = CollectionNode(nodeIdentifier, configuration)
-            node.dbpool = self.dbpool
-            return node
-
-
-
-    def getNodeIds(self):
-        d = self.dbpool.runQuery("""SELECT node from nodes""")
-        d.addCallback(lambda results: [r[0] for r in results])
-        return d
-
-
-    def createNode(self, nodeIdentifier, owner, config):
-        return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
-                                           owner, config)
-
-
-    def _createNode(self, cursor, nodeIdentifier, owner, config):
-        if config['pubsub#node_type'] != 'leaf':
-            raise error.NoCollections()
-
-        owner = owner.userhost()
-        try:
-            cursor.execute("""INSERT INTO nodes
-                              (node, node_type, persist_items,
-                               deliver_payloads, send_last_published_item)
-                              VALUES
-                              (%s, 'leaf', %s, %s, %s)""",
-                           (nodeIdentifier,
-                            config['pubsub#persist_items'],
-                            config['pubsub#deliver_payloads'],
-                            config['pubsub#send_last_published_item'])
-                           )
-        except cursor._pool.dbapi.IntegrityError:
-            raise error.NodeExists()
-
-        cursor.execute("""SELECT 1 from entities where jid=%s""",
-                       (owner,))
-
-        if not cursor.fetchone():
-            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
-                           (owner,))
-
-        cursor.execute("""INSERT INTO affiliations
-                          (node_id, entity_id, affiliation)
-                          SELECT node_id, entity_id, 'owner' FROM
-                          (SELECT node_id FROM nodes WHERE node=%s) as n
-                          CROSS JOIN
-                          (SELECT entity_id FROM entities
-                                            WHERE jid=%s) as e""",
-                       (nodeIdentifier, owner))
-
-
-    def deleteNode(self, nodeIdentifier):
-        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier)
-
-
-    def _deleteNode(self, cursor, nodeIdentifier):
-        cursor.execute("""DELETE FROM nodes WHERE node=%s""",
-                       (nodeIdentifier,))
-
-        if cursor.rowcount != 1:
-            raise error.NodeNotFound()
-
-
-    def getAffiliations(self, entity):
-        d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
-                                        NATURAL JOIN affiliations
-                                        NATURAL JOIN nodes
-                                        WHERE jid=%s""",
-                                     (entity.userhost(),))
-        d.addCallback(lambda results: [tuple(r) for r in results])
-        return d
-
-
-    def getSubscriptions(self, entity):
-        def toSubscriptions(rows):
-            subscriptions = []
-            for row in rows:
-                subscriber = jid.internJID('%s/%s' % (row[1],
-                                                      row[2]))
-                subscription = Subscription(row[0], subscriber, row[3])
-                subscriptions.append(subscription)
-            return subscriptions
-
-        d = self.dbpool.runQuery("""SELECT node, jid, resource, state
-                                     FROM entities
-                                     NATURAL JOIN subscriptions
-                                     NATURAL JOIN nodes
-                                     WHERE jid=%s""",
-                                  (entity.userhost(),))
-        d.addCallback(toSubscriptions)
-        return d
-
-
-    def getDefaultConfiguration(self, nodeType):
-        return self.defaultConfig[nodeType]
-
-
-
-class Node:
-
-    implements(iidavoll.INode)
-
-    def __init__(self, nodeIdentifier, config):
-        self.nodeIdentifier = nodeIdentifier
-        self._config = config
-
-
-    def _checkNodeExists(self, cursor):
-        cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""",
-                       (self.nodeIdentifier,))
-        if not cursor.fetchone():
-            raise error.NodeNotFound()
-
-
-    def getType(self):
-        return self.nodeType
-
-
-    def getConfiguration(self):
-        return self._config
-
-
-    def setConfiguration(self, options):
-        config = copy.copy(self._config)
-
-        for option in options:
-            if option in config:
-                config[option] = options[option]
-
-        d = self.dbpool.runInteraction(self._setConfiguration, config)
-        d.addCallback(self._setCachedConfiguration, config)
-        return d
-
-
-    def _setConfiguration(self, cursor, config):
-        self._checkNodeExists(cursor)
-        cursor.execute("""UPDATE nodes SET persist_items=%s,
-                                           deliver_payloads=%s,
-                                           send_last_published_item=%s
-                          WHERE node=%s""",
-                       (config["pubsub#persist_items"],
-                        config["pubsub#deliver_payloads"],
-                        config["pubsub#send_last_published_item"],
-                        self.nodeIdentifier))
-
-
-    def _setCachedConfiguration(self, void, config):
-        self._config = config
-
-
-    def getMetaData(self):
-        config = copy.copy(self._config)
-        config["pubsub#node_type"] = self.nodeType
-        return config
-
-
-    def getAffiliation(self, entity):
-        return self.dbpool.runInteraction(self._getAffiliation, entity)
-
-
-    def _getAffiliation(self, cursor, entity):
-        self._checkNodeExists(cursor)
-        cursor.execute("""SELECT affiliation FROM affiliations
-                          NATURAL JOIN nodes
-                          NATURAL JOIN entities
-                          WHERE node=%s AND jid=%s""",
-                       (self.nodeIdentifier,
-                        entity.userhost()))
-
-        try:
-            return cursor.fetchone()[0]
-        except TypeError:
-            return None
-
-
-    def getSubscription(self, subscriber):
-        return self.dbpool.runInteraction(self._getSubscription, subscriber)
-
-
-    def _getSubscription(self, cursor, subscriber):
-        self._checkNodeExists(cursor)
-
-        userhost = subscriber.userhost()
-        resource = subscriber.resource or ''
-
-        cursor.execute("""SELECT state FROM subscriptions
-                          NATURAL JOIN nodes
-                          NATURAL JOIN entities
-                          WHERE node=%s AND jid=%s AND resource=%s""",
-                       (self.nodeIdentifier,
-                        userhost,
-                        resource))
-        row = cursor.fetchone()
-        if not row:
-            return None
-        else:
-            return Subscription(self.nodeIdentifier, subscriber, row[0])
-
-
-    def getSubscriptions(self, state=None):
-        return self.dbpool.runInteraction(self._getSubscriptions, state)
-
-
-    def _getSubscriptions(self, cursor, state):
-        self._checkNodeExists(cursor)
-
-        query = """SELECT jid, resource, state,
-                          subscription_type, subscription_depth
-                   FROM subscriptions
-                   NATURAL JOIN nodes
-                   NATURAL JOIN entities
-                   WHERE node=%s""";
-        values = [self.nodeIdentifier]
-
-        if state:
-            query += " AND state=%s"
-            values.append(state)
-
-        cursor.execute(query, values);
-        rows = cursor.fetchall()
-
-        subscriptions = []
-        for row in rows:
-            subscriber = jid.JID('%s/%s' % (row[0], row[1]))
-
-            options = {}
-            if row[3]:
-                options['pubsub#subscription_type'] = row[3];
-            if row[4]:
-                options['pubsub#subscription_depth'] = row[4];
-
-            subscriptions.append(Subscription(self.nodeIdentifier, subscriber,
-                                              row[2], options))
-
-        return subscriptions
-
-
-    def addSubscription(self, subscriber, state, config):
-        return self.dbpool.runInteraction(self._addSubscription, subscriber,
-                                          state, config)
-
-
-    def _addSubscription(self, cursor, subscriber, state, config):
-        self._checkNodeExists(cursor)
-
-        userhost = subscriber.userhost()
-        resource = subscriber.resource or ''
-
-        subscription_type = config.get('pubsub#subscription_type')
-        subscription_depth = config.get('pubsub#subscription_depth')
-
-        try:
-            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
-                           (userhost,))
-        except cursor._pool.dbapi.IntegrityError:
-            cursor._connection.rollback()
-
-        try:
-            cursor.execute("""INSERT INTO subscriptions
-                              (node_id, entity_id, resource, state,
-                               subscription_type, subscription_depth)
-                              SELECT node_id, entity_id, %s, %s, %s, %s FROM
-                              (SELECT node_id FROM nodes
-                                              WHERE node=%s) as n
-                              CROSS JOIN
-                              (SELECT entity_id FROM entities
-                                                WHERE jid=%s) as e""",
-                           (resource,
-                            state,
-                            subscription_type,
-                            subscription_depth,
-                            self.nodeIdentifier,
-                            userhost))
-        except cursor._pool.dbapi.IntegrityError:
-            raise error.SubscriptionExists()
-
-
-    def removeSubscription(self, subscriber):
-        return self.dbpool.runInteraction(self._removeSubscription,
-                                           subscriber)
-
-
-    def _removeSubscription(self, cursor, subscriber):
-        self._checkNodeExists(cursor)
-
-        userhost = subscriber.userhost()
-        resource = subscriber.resource or ''
-
-        cursor.execute("""DELETE FROM subscriptions WHERE
-                          node_id=(SELECT node_id FROM nodes
-                                                  WHERE node=%s) AND
-                          entity_id=(SELECT entity_id FROM entities
-                                                      WHERE jid=%s) AND
-                          resource=%s""",
-                       (self.nodeIdentifier,
-                        userhost,
-                        resource))
-        if cursor.rowcount != 1:
-            raise error.NotSubscribed()
-
-        return None
-
-
-    def isSubscribed(self, entity):
-        return self.dbpool.runInteraction(self._isSubscribed, entity)
-
-
-    def _isSubscribed(self, cursor, entity):
-        self._checkNodeExists(cursor)
-
-        cursor.execute("""SELECT 1 FROM entities
-                          NATURAL JOIN subscriptions
-                          NATURAL JOIN nodes
-                          WHERE entities.jid=%s
-                          AND node=%s AND state='subscribed'""",
-                       (entity.userhost(),
-                       self.nodeIdentifier))
-
-        return cursor.fetchone() is not None
-
-
-    def getAffiliations(self):
-        return self.dbpool.runInteraction(self._getAffiliations)
-
-
-    def _getAffiliations(self, cursor):
-        self._checkNodeExists(cursor)
-
-        cursor.execute("""SELECT jid, affiliation FROM nodes
-                          NATURAL JOIN affiliations
-                          NATURAL JOIN entities
-                          WHERE node=%s""",
-                       (self.nodeIdentifier,))
-        result = cursor.fetchall()
-
-        return [(jid.internJID(r[0]), r[1]) for r in result]
-
-
-
-class LeafNode(Node):
-
-    implements(iidavoll.ILeafNode)
-
-    nodeType = 'leaf'
-
-    def storeItems(self, items, publisher):
-        return self.dbpool.runInteraction(self._storeItems, items, publisher)
-
-
-    def _storeItems(self, cursor, items, publisher):
-        self._checkNodeExists(cursor)
-        for item in items:
-            self._storeItem(cursor, item, publisher)
-
-
-    def _storeItem(self, cursor, item, publisher):
-        data = item.toXml()
-        cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
-                          FROM nodes
-                          WHERE nodes.node_id = items.node_id AND
-                                nodes.node = %s and items.item=%s""",
-                       (publisher.full(),
-                        data,
-                        self.nodeIdentifier,
-                        item["id"]))
-        if cursor.rowcount == 1:
-            return
-
-        cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
-                          SELECT node_id, %s, %s, %s FROM nodes
-                                                     WHERE node=%s""",
-                       (item["id"],
-                        publisher.full(),
-                        data,
-                        self.nodeIdentifier))
-
-
-    def removeItems(self, itemIdentifiers):
-        return self.dbpool.runInteraction(self._removeItems, itemIdentifiers)
-
-
-    def _removeItems(self, cursor, itemIdentifiers):
-        self._checkNodeExists(cursor)
-
-        deleted = []
-
-        for itemIdentifier in itemIdentifiers:
-            cursor.execute("""DELETE FROM items WHERE
-                              node_id=(SELECT node_id FROM nodes
-                                                      WHERE node=%s) AND
-                              item=%s""",
-                           (self.nodeIdentifier,
-                            itemIdentifier))
-
-            if cursor.rowcount:
-                deleted.append(itemIdentifier)
-
-        return deleted
-
-
-    def getItems(self, maxItems=None):
-        return self.dbpool.runInteraction(self._getItems, maxItems)
-
-
-    def _getItems(self, cursor, maxItems):
-        self._checkNodeExists(cursor)
-        query = """SELECT data FROM nodes
-                   NATURAL JOIN items
-                   WHERE node=%s ORDER BY date DESC"""
-        if maxItems:
-            cursor.execute(query + " LIMIT %s",
-                           (self.nodeIdentifier,
-                            maxItems))
-        else:
-            cursor.execute(query, (self.nodeIdentifier,))
-
-        result = cursor.fetchall()
-        items = [stripNamespace(parseXml(r[0])) for r in result]
-        return items
-
-
-    def getItemsById(self, itemIdentifiers):
-        return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers)
-
-
-    def _getItemsById(self, cursor, itemIdentifiers):
-        self._checkNodeExists(cursor)
-        items = []
-        for itemIdentifier in itemIdentifiers:
-            cursor.execute("""SELECT data FROM nodes
-                              NATURAL JOIN items
-                              WHERE node=%s AND item=%s""",
-                           (self.nodeIdentifier,
-                            itemIdentifier))
-            result = cursor.fetchone()
-            if result:
-                items.append(parseXml(result[0]))
-        return items
-
-
-    def purge(self):
-        return self.dbpool.runInteraction(self._purge)
-
-
-    def _purge(self, cursor):
-        self._checkNodeExists(cursor)
-
-        cursor.execute("""DELETE FROM items WHERE
-                          node_id=(SELECT node_id FROM nodes WHERE node=%s)""",
-                       (self.nodeIdentifier,))
-
-
-class CollectionNode(Node):
-
-    nodeType = 'collection'
-
-
-
-class GatewayStorage(object):
-    """
-    Memory based storage facility for the XMPP-HTTP gateway.
-    """
-
-    def __init__(self, dbpool):
-        self.dbpool = dbpool
-
-
-    def _countCallbacks(self, cursor, service, nodeIdentifier):
-        """
-        Count number of callbacks registered for a node.
-        """
-        cursor.execute("""SELECT count(*) FROM callbacks
-                          WHERE service=%s and node=%s""",
-                       service.full(),
-                       nodeIdentifier)
-        results = cursor.fetchall()
-        return results[0][0]
-
-
-    def addCallback(self, service, nodeIdentifier, callback):
-        def interaction(cursor):
-            cursor.execute("""SELECT 1 FROM callbacks
-                              WHERE service=%s and node=%s and uri=%s""",
-                           service.full(),
-                           nodeIdentifier,
-                           callback)
-            if cursor.fetchall():
-                return
-
-            cursor.execute("""INSERT INTO callbacks
-                              (service, node, uri) VALUES
-                              (%s, %s, %s)""",
-                           service.full(),
-                           nodeIdentifier,
-                           callback)
-
-        return self.dbpool.runInteraction(interaction)
-
-
-    def removeCallback(self, service, nodeIdentifier, callback):
-        def interaction(cursor):
-            cursor.execute("""DELETE FROM callbacks
-                              WHERE service=%s and node=%s and uri=%s""",
-                           service.full(),
-                           nodeIdentifier,
-                           callback)
-
-            if cursor.rowcount != 1:
-                raise error.NotSubscribed()
-
-            last = not self._countCallbacks(cursor, service, nodeIdentifier)
-            return last
-
-        return self.dbpool.runInteraction(interaction)
-
-    def getCallbacks(self, service, nodeIdentifier):
-        def interaction(cursor):
-            cursor.execute("""SELECT uri FROM callbacks
-                              WHERE service=%s and node=%s""",
-                           service.full(),
-                           nodeIdentifier)
-            results = cursor.fetchall()
-
-            if not results:
-                raise error.NoCallbacks()
-
-            return [result[0] for result in results]
-
-        return self.dbpool.runInteraction(interaction)
-
-
-    def hasCallbacks(self, service, nodeIdentifier):
-        def interaction(cursor):
-            return bool(self._countCallbacks(cursor, service, nodeIdentifier))
-
-        return self.dbpool.runInteraction(interaction)
--- a/idavoll/tap.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,94 +0,0 @@
-# Copyright (c) 2003-2010 Ralph Meijer
-# See LICENSE for details.
-
-from twisted.application import service
-from twisted.python import usage
-from twisted.words.protocols.jabber.jid import JID
-
-from wokkel.component import Component
-from wokkel.disco import DiscoHandler
-from wokkel.generic import FallbackHandler, VersionHandler
-from wokkel.iwokkel import IPubSubResource
-from wokkel.pubsub import PubSubService
-
-from idavoll import __version__
-from idavoll.backend import BackendService
-
-class Options(usage.Options):
-    optParameters = [
-        ('jid', None, 'pubsub', 'JID this component will be available at'),
-        ('secret', None, 'secret', 'Jabber server component secret'),
-        ('rhost', None, '127.0.0.1', 'Jabber server host'),
-        ('rport', None, '5347', 'Jabber server port'),
-        ('backend', None, 'memory', 'Choice of storage backend'),
-        ('dbuser', None, None, 'Database user (pgsql backend)'),
-        ('dbname', None, 'pubsub', 'Database name (pgsql backend)'),
-        ('dbpass', None, None, 'Database password (pgsql backend)'),
-        ('dbhost', None, None, 'Database host (pgsql backend)'),
-        ('dbport', None, None, 'Database port (pgsql backend)'),
-    ]
-
-    optFlags = [
-        ('verbose', 'v', 'Show traffic'),
-        ('hide-nodes', None, 'Hide all nodes for disco')
-    ]
-
-    def postOptions(self):
-        if self['backend'] not in ['pgsql', 'memory']:
-            raise usage.UsageError, "Unknown backend!"
-
-        self['jid'] = JID(self['jid'])
-
-
-
-def makeService(config):
-    s = service.MultiService()
-
-    # Create backend service with storage
-
-    if config['backend'] == 'pgsql':
-        from twisted.enterprise import adbapi
-        from idavoll.pgsql_storage import Storage
-        dbpool = adbapi.ConnectionPool('psycopg2',
-                                       user=config['dbuser'],
-                                       password=config['dbpass'],
-                                       database=config['dbname'],
-                                       host=config['dbhost'],
-                                       port=config['dbport'],
-                                       cp_reconnect=True,
-                                       client_encoding='utf-8',
-                                       )
-        st = Storage(dbpool)
-    elif config['backend'] == 'memory':
-        from idavoll.memory_storage import Storage
-        st = Storage()
-
-    bs = BackendService(st)
-    bs.setName('backend')
-    bs.setServiceParent(s)
-
-    # Set up XMPP server-side component with publish-subscribe capabilities
-
-    cs = Component(config["rhost"], int(config["rport"]),
-                   config["jid"].full(), config["secret"])
-    cs.setName('component')
-    cs.setServiceParent(s)
-
-    cs.factory.maxDelay = 900
-
-    if config["verbose"]:
-        cs.logTraffic = True
-
-    FallbackHandler().setHandlerParent(cs)
-    VersionHandler('Idavoll', __version__).setHandlerParent(cs)
-    DiscoHandler().setHandlerParent(cs)
-
-    resource = IPubSubResource(bs)
-    resource.hideNodes = config["hide-nodes"]
-    resource.serviceJID = config["jid"]
-
-    ps = PubSubService(resource)
-    ps.setHandlerParent(cs)
-    resource.pubsubService = ps
-
-    return s
--- a/idavoll/tap_http.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,97 +0,0 @@
-# Copyright (c) 2003-2008 Ralph Meijer
-# See LICENSE for details.
-
-from twisted.application import internet, service, strports
-from twisted.conch import manhole, manhole_ssh
-from twisted.cred import portal, checkers
-from twisted.web2 import channel, log, resource, server
-from twisted.web2.tap import Web2Service
-
-from idavoll import gateway, tap
-from idavoll.gateway import RemoteSubscriptionService
-
-class Options(tap.Options):
-    optParameters = [
-            ('webport', None, '8086', 'Web port'),
-    ]
-
-
-
-def getManholeFactory(namespace, **passwords):
-    def getManHole(_):
-        return manhole.Manhole(namespace)
-
-    realm = manhole_ssh.TerminalRealm()
-    realm.chainedProtocolFactory.protocolFactory = getManHole
-    p = portal.Portal(realm)
-    p.registerChecker(
-            checkers.InMemoryUsernamePasswordDatabaseDontUse(**passwords))
-    f = manhole_ssh.ConchFactory(p)
-    return f
-
-
-
-def makeService(config):
-    s = tap.makeService(config)
-
-    bs = s.getServiceNamed('backend')
-    cs = s.getServiceNamed('component')
-
-    # Set up XMPP service for subscribing to remote nodes
-
-    if config['backend'] == 'pgsql':
-        from idavoll.pgsql_storage import GatewayStorage
-        gst = GatewayStorage(bs.storage.dbpool)
-    elif config['backend'] == 'memory':
-        from idavoll.memory_storage import GatewayStorage
-        gst = GatewayStorage()
-
-    ss = RemoteSubscriptionService(config['jid'], gst)
-    ss.setHandlerParent(cs)
-    ss.startService()
-
-    # Set up web service
-
-    root = resource.Resource()
-
-    # Set up resources that exposes the backend
-    root.child_create = gateway.CreateResource(bs, config['jid'],
-                                               config['jid'])
-    root.child_delete = gateway.DeleteResource(bs, config['jid'],
-                                               config['jid'])
-    root.child_publish = gateway.PublishResource(bs, config['jid'],
-                                                 config['jid'])
-    root.child_list = gateway.ListResource(bs)
-
-    # Set up resources for accessing remote pubsub nodes.
-    root.child_subscribe = gateway.RemoteSubscribeResource(ss)
-    root.child_unsubscribe = gateway.RemoteUnsubscribeResource(ss)
-    root.child_items = gateway.RemoteItemsResource(ss)
-
-    if config["verbose"]:
-        root = log.LogWrapperResource(root)
-
-    site = server.Site(root)
-    w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site))
-
-    if config["verbose"]:
-        logObserver = log.DefaultCommonAccessLoggingObserver()
-        w2s = Web2Service(logObserver)
-        w.setServiceParent(w2s)
-        w = w2s
-
-    w.setServiceParent(s)
-
-    # Set up a manhole
-
-    namespace = {'service': s,
-                 'component': cs,
-                 'backend': bs,
-                 'root': root}
-
-    f = getManholeFactory(namespace, admin='admin')
-    manholeService = strports.service('2222', f)
-    manholeService.setServiceParent(s)
-
-    return s
-
--- a/idavoll/test/__init__.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,6 +0,0 @@
-# Copyright (c) 2003-2007 Ralph Meijer
-# See LICENSE for details.
-
-"""
-Tests for L{idavoll}.
-"""
--- a/idavoll/test/test_backend.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,611 +0,0 @@
-# Copyright (c) 2003-2010 Ralph Meijer
-# See LICENSE for details.
-
-"""
-Tests for L{idavoll.backend}.
-"""
-
-from zope.interface import implements
-from zope.interface.verify import verifyObject
-
-from twisted.internet import defer
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import jid
-from twisted.words.protocols.jabber.error import StanzaError
-
-from wokkel import iwokkel, pubsub
-
-from idavoll import backend, error, iidavoll
-
-OWNER = jid.JID('owner@example.com')
-OWNER_FULL = jid.JID('owner@example.com/home')
-SERVICE = jid.JID('test.example.org')
-NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
-
-class BackendTest(unittest.TestCase):
-
-    def test_interfaceIBackend(self):
-        self.assertTrue(verifyObject(iidavoll.IBackendService,
-                                     backend.BackendService(None)))
-
-
-    def test_deleteNode(self):
-        class TestNode:
-            nodeIdentifier = 'to-be-deleted'
-            def getAffiliation(self, entity):
-                if entity.userhostJID() == OWNER:
-                    return defer.succeed('owner')
-
-        class TestStorage:
-            def __init__(self):
-                self.deleteCalled = []
-
-            def getNode(self, nodeIdentifier):
-                return defer.succeed(TestNode())
-
-            def deleteNode(self, nodeIdentifier):
-                if nodeIdentifier in ['to-be-deleted']:
-                    self.deleteCalled.append(nodeIdentifier)
-                    return defer.succeed(None)
-                else:
-                    return defer.fail(error.NodeNotFound())
-
-        def preDelete(data):
-            self.assertFalse(self.storage.deleteCalled)
-            preDeleteCalled.append(data)
-            return defer.succeed(None)
-
-        def cb(result):
-            self.assertEquals(1, len(preDeleteCalled))
-            data = preDeleteCalled[-1]
-            self.assertEquals('to-be-deleted', data['nodeIdentifier'])
-            self.assertTrue(self.storage.deleteCalled)
-
-        self.storage = TestStorage()
-        self.backend = backend.BackendService(self.storage)
-
-        preDeleteCalled = []
-
-        self.backend.registerPreDelete(preDelete)
-        d = self.backend.deleteNode('to-be-deleted', OWNER_FULL)
-        d.addCallback(cb)
-        return d
-
-
-    def test_deleteNodeRedirect(self):
-        uri = 'xmpp:%s?;node=test2' % (SERVICE.full(),)
-
-        class TestNode:
-            nodeIdentifier = 'to-be-deleted'
-            def getAffiliation(self, entity):
-                if entity.userhostJID() == OWNER:
-                    return defer.succeed('owner')
-
-        class TestStorage:
-            def __init__(self):
-                self.deleteCalled = []
-
-            def getNode(self, nodeIdentifier):
-                return defer.succeed(TestNode())
-
-            def deleteNode(self, nodeIdentifier):
-                if nodeIdentifier in ['to-be-deleted']:
-                    self.deleteCalled.append(nodeIdentifier)
-                    return defer.succeed(None)
-                else:
-                    return defer.fail(error.NodeNotFound())
-
-        def preDelete(data):
-            self.assertFalse(self.storage.deleteCalled)
-            preDeleteCalled.append(data)
-            return defer.succeed(None)
-
-        def cb(result):
-            self.assertEquals(1, len(preDeleteCalled))
-            data = preDeleteCalled[-1]
-            self.assertEquals('to-be-deleted', data['nodeIdentifier'])
-            self.assertEquals(uri, data['redirectURI'])
-            self.assertTrue(self.storage.deleteCalled)
-
-        self.storage = TestStorage()
-        self.backend = backend.BackendService(self.storage)
-
-        preDeleteCalled = []
-
-        self.backend.registerPreDelete(preDelete)
-        d = self.backend.deleteNode('to-be-deleted', OWNER, redirectURI=uri)
-        d.addCallback(cb)
-        return d
-
-
-    def test_createNodeNoID(self):
-        """
-        Test creation of a node without a given node identifier.
-        """
-        class TestStorage:
-            def getDefaultConfiguration(self, nodeType):
-                return {}
-
-            def createNode(self, nodeIdentifier, requestor, config):
-                self.nodeIdentifier = nodeIdentifier
-                return defer.succeed(None)
-
-        self.storage = TestStorage()
-        self.backend = backend.BackendService(self.storage)
-        self.storage.backend = self.backend
-
-        def checkID(nodeIdentifier):
-            self.assertNotIdentical(None, nodeIdentifier)
-            self.assertIdentical(self.storage.nodeIdentifier, nodeIdentifier)
-
-        d = self.backend.createNode(None, OWNER_FULL)
-        d.addCallback(checkID)
-        return d
-
-    class NodeStore:
-        """
-        I just store nodes to pose as an L{IStorage} implementation.
-        """
-        def __init__(self, nodes):
-            self.nodes = nodes
-
-        def getNode(self, nodeIdentifier):
-            try:
-                return defer.succeed(self.nodes[nodeIdentifier])
-            except KeyError:
-                return defer.fail(error.NodeNotFound())
-
-
-    def test_getNotifications(self):
-        """
-        Ensure subscribers show up in the notification list.
-        """
-        item = pubsub.Item()
-        sub = pubsub.Subscription('test', OWNER, 'subscribed')
-
-        class TestNode:
-            def getSubscriptions(self, state=None):
-                return [sub]
-
-        def cb(result):
-            self.assertEquals(1, len(result))
-            subscriber, subscriptions, items = result[-1]
-
-            self.assertEquals(OWNER, subscriber)
-            self.assertEquals(set([sub]), subscriptions)
-            self.assertEquals([item], items)
-
-        self.storage = self.NodeStore({'test': TestNode()})
-        self.backend = backend.BackendService(self.storage)
-        d = self.backend.getNotifications('test', [item])
-        d.addCallback(cb)
-        return d
-
-    def test_getNotificationsRoot(self):
-        """
-        Ensure subscribers to the root node show up in the notification list
-        for leaf nodes.
-
-        This assumes a flat node relationship model with exactly one collection
-        node: the root node. Each leaf node is automatically a child node
-        of the root node.
-        """
-        item = pubsub.Item()
-        subRoot = pubsub.Subscription('', OWNER, 'subscribed')
-
-        class TestNode:
-            def getSubscriptions(self, state=None):
-                return []
-
-        class TestRootNode:
-            def getSubscriptions(self, state=None):
-                return [subRoot]
-
-        def cb(result):
-            self.assertEquals(1, len(result))
-            subscriber, subscriptions, items = result[-1]
-            self.assertEquals(OWNER, subscriber)
-            self.assertEquals(set([subRoot]), subscriptions)
-            self.assertEquals([item], items)
-
-        self.storage = self.NodeStore({'test': TestNode(),
-                                       '': TestRootNode()})
-        self.backend = backend.BackendService(self.storage)
-        d = self.backend.getNotifications('test', [item])
-        d.addCallback(cb)
-        return d
-
-
-    def test_getNotificationsMultipleNodes(self):
-        """
-        Ensure that entities that subscribe to a leaf node as well as the
-        root node get exactly one notification.
-        """
-        item = pubsub.Item()
-        sub = pubsub.Subscription('test', OWNER, 'subscribed')
-        subRoot = pubsub.Subscription('', OWNER, 'subscribed')
-
-        class TestNode:
-            def getSubscriptions(self, state=None):
-                return [sub]
-
-        class TestRootNode:
-            def getSubscriptions(self, state=None):
-                return [subRoot]
-
-        def cb(result):
-            self.assertEquals(1, len(result))
-            subscriber, subscriptions, items = result[-1]
-
-            self.assertEquals(OWNER, subscriber)
-            self.assertEquals(set([sub, subRoot]), subscriptions)
-            self.assertEquals([item], items)
-
-        self.storage = self.NodeStore({'test': TestNode(),
-                                       '': TestRootNode()})
-        self.backend = backend.BackendService(self.storage)
-        d = self.backend.getNotifications('test', [item])
-        d.addCallback(cb)
-        return d
-
-
-    def test_getDefaultConfiguration(self):
-        """
-        L{backend.BackendService.getDefaultConfiguration} should return
-        a deferred that fires a dictionary with configuration values.
-        """
-
-        class TestStorage:
-            def getDefaultConfiguration(self, nodeType):
-                return {
-                    "pubsub#persist_items": True,
-                    "pubsub#deliver_payloads": True}
-
-        def cb(options):
-            self.assertIn("pubsub#persist_items", options)
-            self.assertEqual(True, options["pubsub#persist_items"])
-
-        self.backend = backend.BackendService(TestStorage())
-        d = self.backend.getDefaultConfiguration('leaf')
-        d.addCallback(cb)
-        return d
-
-
-    def test_getNodeConfiguration(self):
-        class testNode:
-            nodeIdentifier = 'node'
-            def getConfiguration(self):
-                return {'pubsub#deliver_payloads': True,
-                        'pubsub#persist_items': False}
-
-        class testStorage:
-            def getNode(self, nodeIdentifier):
-                return defer.succeed(testNode())
-
-        def cb(options):
-            self.assertIn("pubsub#deliver_payloads", options)
-            self.assertEqual(True, options["pubsub#deliver_payloads"])
-            self.assertIn("pubsub#persist_items", options)
-            self.assertEqual(False, options["pubsub#persist_items"])
-
-        self.storage = testStorage()
-        self.backend = backend.BackendService(self.storage)
-        self.storage.backend = self.backend
-
-        d = self.backend.getNodeConfiguration('node')
-        d.addCallback(cb)
-        return d
-
-
-    def test_setNodeConfiguration(self):
-        class testNode:
-            nodeIdentifier = 'node'
-            def getAffiliation(self, entity):
-                if entity.userhostJID() == OWNER:
-                    return defer.succeed('owner')
-            def setConfiguration(self, options):
-                self.options = options
-
-        class testStorage:
-            def __init__(self):
-                self.nodes = {'node': testNode()}
-            def getNode(self, nodeIdentifier):
-                return defer.succeed(self.nodes[nodeIdentifier])
-
-        def checkOptions(node):
-            options = node.options
-            self.assertIn("pubsub#deliver_payloads", options)
-            self.assertEqual(True, options["pubsub#deliver_payloads"])
-            self.assertIn("pubsub#persist_items", options)
-            self.assertEqual(False, options["pubsub#persist_items"])
-
-        def cb(result):
-            d = self.storage.getNode('node')
-            d.addCallback(checkOptions)
-            return d
-
-        self.storage = testStorage()
-        self.backend = backend.BackendService(self.storage)
-        self.storage.backend = self.backend
-
-        options = {'pubsub#deliver_payloads': True,
-                   'pubsub#persist_items': False}
-
-        d = self.backend.setNodeConfiguration('node', options, OWNER_FULL)
-        d.addCallback(cb)
-        return d
-
-
-    def test_publishNoID(self):
-        """
-        Test publish request with an item without a node identifier.
-        """
-        class TestNode:
-            nodeType = 'leaf'
-            nodeIdentifier = 'node'
-            def getAffiliation(self, entity):
-                if entity.userhostJID() == OWNER:
-                    return defer.succeed('owner')
-            def getConfiguration(self):
-                return {'pubsub#deliver_payloads': True,
-                        'pubsub#persist_items': False}
-
-        class TestStorage:
-            def getNode(self, nodeIdentifier):
-                return defer.succeed(TestNode())
-
-        def checkID(notification):
-            self.assertNotIdentical(None, notification['items'][0]['id'])
-
-        self.storage = TestStorage()
-        self.backend = backend.BackendService(self.storage)
-        self.storage.backend = self.backend
-
-        self.backend.registerNotifier(checkID)
-
-        items = [pubsub.Item()]
-        d = self.backend.publish('node', items, OWNER_FULL)
-        return d
-
-
-    def test_notifyOnSubscription(self):
-        """
-        Test notification of last published item on subscription.
-        """
-        ITEM = "<item xmlns='%s' id='1'/>" % NS_PUBSUB
-
-        class TestNode:
-            implements(iidavoll.ILeafNode)
-            nodeIdentifier = 'node'
-            nodeType = 'leaf'
-            def getAffiliation(self, entity):
-                if entity is OWNER:
-                    return defer.succeed('owner')
-            def getConfiguration(self):
-                return {'pubsub#deliver_payloads': True,
-                        'pubsub#persist_items': False,
-                        'pubsub#send_last_published_item': 'on_sub'}
-            def getItems(self, maxItems):
-                return [ITEM]
-            def addSubscription(self, subscriber, state, options):
-                self.subscription = pubsub.Subscription('node', subscriber,
-                                                        state, options)
-                return defer.succeed(None)
-            def getSubscription(self, subscriber):
-                return defer.succeed(self.subscription)
-
-        class TestStorage:
-            def getNode(self, nodeIdentifier):
-                return defer.succeed(TestNode())
-
-        def cb(data):
-            self.assertEquals('node', data['nodeIdentifier'])
-            self.assertEquals([ITEM], data['items'])
-            self.assertEquals(OWNER, data['subscription'].subscriber)
-
-        self.storage = TestStorage()
-        self.backend = backend.BackendService(self.storage)
-        self.storage.backend = self.backend
-
-        d1 = defer.Deferred()
-        d1.addCallback(cb)
-        self.backend.registerNotifier(d1.callback)
-        d2 = self.backend.subscribe('node', OWNER, OWNER_FULL)
-        return defer.gatherResults([d1, d2])
-
-    test_notifyOnSubscription.timeout = 2
-
-
-
-class BaseTestBackend(object):
-    """
-    Base class for backend stubs.
-    """
-
-    def supportsPublisherAffiliation(self):
-        return True
-
-
-    def supportsOutcastAffiliation(self):
-        return True
-
-
-    def supportsPersistentItems(self):
-        return True
-
-
-    def supportsInstantNodes(self):
-        return True
-
-
-    def registerNotifier(self, observerfn, *args, **kwargs):
-        return
-
-
-    def registerPreDelete(self, preDeleteFn):
-        return
-
-
-
-class PubSubResourceFromBackendTest(unittest.TestCase):
-
-    def test_interface(self):
-        resource = backend.PubSubResourceFromBackend(BaseTestBackend())
-        self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource))
-
-
-    def test_preDelete(self):
-        """
-        Test pre-delete sending out notifications to subscribers.
-        """
-
-        class TestBackend(BaseTestBackend):
-            preDeleteFn = None
-
-            def registerPreDelete(self, preDeleteFn):
-                self.preDeleteFn = preDeleteFn
-
-            def getSubscribers(self, nodeIdentifier):
-                return defer.succeed([OWNER])
-
-        def notifyDelete(service, nodeIdentifier, subscribers,
-                         redirectURI=None):
-            self.assertEqual(SERVICE, service)
-            self.assertEqual('test', nodeIdentifier)
-            self.assertEqual([OWNER], subscribers)
-            self.assertIdentical(None, redirectURI)
-            d1.callback(None)
-
-        d1 = defer.Deferred()
-        resource = backend.PubSubResourceFromBackend(TestBackend())
-        resource.serviceJID = SERVICE
-        resource.pubsubService = pubsub.PubSubService()
-        resource.pubsubService.notifyDelete = notifyDelete
-        self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource))
-        self.assertNotIdentical(None, resource.backend.preDeleteFn)
-        data = {'nodeIdentifier': 'test'}
-        d2 = resource.backend.preDeleteFn(data)
-        return defer.DeferredList([d1, d2], fireOnOneErrback=1)
-
-
-    def test_preDeleteRedirect(self):
-        """
-        Test pre-delete sending out notifications to subscribers.
-        """
-
-        uri = 'xmpp:%s?;node=test2' % (SERVICE.full(),)
-
-        class TestBackend(BaseTestBackend):
-            preDeleteFn = None
-
-            def registerPreDelete(self, preDeleteFn):
-                self.preDeleteFn = preDeleteFn
-
-            def getSubscribers(self, nodeIdentifier):
-                return defer.succeed([OWNER])
-
-        def notifyDelete(service, nodeIdentifier, subscribers,
-                         redirectURI=None):
-            self.assertEqual(SERVICE, service)
-            self.assertEqual('test', nodeIdentifier)
-            self.assertEqual([OWNER], subscribers)
-            self.assertEqual(uri, redirectURI)
-            d1.callback(None)
-
-        d1 = defer.Deferred()
-        resource = backend.PubSubResourceFromBackend(TestBackend())
-        resource.serviceJID = SERVICE
-        resource.pubsubService = pubsub.PubSubService()
-        resource.pubsubService.notifyDelete = notifyDelete
-        self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource))
-        self.assertNotIdentical(None, resource.backend.preDeleteFn)
-        data = {'nodeIdentifier': 'test',
-                'redirectURI': uri}
-        d2 = resource.backend.preDeleteFn(data)
-        return defer.DeferredList([d1, d2], fireOnOneErrback=1)
-
-
-    def test_unsubscribeNotSubscribed(self):
-        """
-        Test unsubscription request when not subscribed.
-        """
-
-        class TestBackend(BaseTestBackend):
-            def unsubscribe(self, nodeIdentifier, subscriber, requestor):
-                return defer.fail(error.NotSubscribed())
-
-        def cb(e):
-            self.assertEquals('unexpected-request', e.condition)
-
-        resource = backend.PubSubResourceFromBackend(TestBackend())
-        request = pubsub.PubSubRequest()
-        request.sender = OWNER
-        request.recipient = SERVICE
-        request.nodeIdentifier = 'test'
-        request.subscriber = OWNER
-        d = resource.unsubscribe(request)
-        self.assertFailure(d, StanzaError)
-        d.addCallback(cb)
-        return d
-
-
-    def test_getInfo(self):
-        """
-        Test retrieving node information.
-        """
-
-        class TestBackend(BaseTestBackend):
-            def getNodeType(self, nodeIdentifier):
-                return defer.succeed('leaf')
-
-            def getNodeMetaData(self, nodeIdentifier):
-                return defer.succeed({'pubsub#persist_items': True})
-
-        def cb(info):
-            self.assertIn('type', info)
-            self.assertEquals('leaf', info['type'])
-            self.assertIn('meta-data', info)
-            self.assertEquals({'pubsub#persist_items': True}, info['meta-data'])
-
-        resource = backend.PubSubResourceFromBackend(TestBackend())
-        d = resource.getInfo(OWNER, SERVICE, 'test')
-        d.addCallback(cb)
-        return d
-
-
-    def test_getConfigurationOptions(self):
-        class TestBackend(BaseTestBackend):
-            nodeOptions = {
-                    "pubsub#persist_items":
-                        {"type": "boolean",
-                         "label": "Persist items to storage"},
-                    "pubsub#deliver_payloads":
-                        {"type": "boolean",
-                         "label": "Deliver payloads with event notifications"}
-            }
-
-        resource = backend.PubSubResourceFromBackend(TestBackend())
-        options = resource.getConfigurationOptions()
-        self.assertIn("pubsub#persist_items", options)
-
-
-    def test_default(self):
-        class TestBackend(BaseTestBackend):
-            def getDefaultConfiguration(self, nodeType):
-                options = {"pubsub#persist_items": True,
-                           "pubsub#deliver_payloads": True,
-                           "pubsub#send_last_published_item": 'on_sub',
-                }
-                return defer.succeed(options)
-
-        def cb(options):
-            self.assertEquals(True, options["pubsub#persist_items"])
-
-        resource = backend.PubSubResourceFromBackend(TestBackend())
-        request = pubsub.PubSubRequest()
-        request.sender = OWNER
-        request.recipient = SERVICE
-        request.nodeType = 'leaf'
-        d = resource.default(request)
-        d.addCallback(cb)
-        return d
--- a/idavoll/test/test_gateway.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,398 +0,0 @@
-# Copyright (c) 2003-2009 Ralph Meijer
-# See LICENSE for details.
-
-"""
-Tests for L{idavoll.gateway}.
-
-Note that some tests are functional tests that require a running idavoll
-service.
-"""
-
-from twisted.internet import defer
-from twisted.trial import unittest
-from twisted.web import error
-from twisted.words.xish import domish
-
-from idavoll import gateway
-
-AGENT = "Idavoll Test Script"
-NS_ATOM = "http://www.w3.org/2005/Atom"
-
-TEST_ENTRY = domish.Element((NS_ATOM, 'entry'))
-TEST_ENTRY.addElement("id",
-                      content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a")
-TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok")
-TEST_ENTRY.addElement("author").addElement("name", content="John Doe")
-TEST_ENTRY.addElement("content", content="Some text.")
-
-baseURI = "http://localhost:8086/"
-componentJID = "pubsub"
-
-class GatewayTest(unittest.TestCase):
-    timeout = 2
-
-    def setUp(self):
-        self.client = gateway.GatewayClient(baseURI)
-        self.client.startService()
-        self.addCleanup(self.client.stopService)
-
-        def trapConnectionRefused(failure):
-            from twisted.internet.error import ConnectionRefusedError
-            failure.trap(ConnectionRefusedError)
-            raise unittest.SkipTest("Gateway to test against is not available")
-
-        def trapNotFound(failure):
-            from twisted.web.error import Error
-            failure.trap(Error)
-
-        d = self.client.ping()
-        d.addErrback(trapConnectionRefused)
-        d.addErrback(trapNotFound)
-        return d
-
-
-    def tearDown(self):
-        return self.client.stopService()
-
-
-    def test_create(self):
-
-        def cb(response):
-            self.assertIn('uri', response)
-
-        d = self.client.create()
-        d.addCallback(cb)
-        return d
-
-    def test_publish(self):
-
-        def cb(response):
-            self.assertIn('uri', response)
-
-        d = self.client.publish(TEST_ENTRY)
-        d.addCallback(cb)
-        return d
-
-    def test_publishExistingNode(self):
-
-        def cb2(response, xmppURI):
-            self.assertEquals(xmppURI, response['uri'])
-
-        def cb1(response):
-            xmppURI = response['uri']
-            d = self.client.publish(TEST_ENTRY, xmppURI)
-            d.addCallback(cb2, xmppURI)
-            return d
-
-        d = self.client.create()
-        d.addCallback(cb1)
-        return d
-
-    def test_publishNonExisting(self):
-        def cb(err):
-            self.assertEqual('404', err.status)
-
-        d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % componentJID)
-        self.assertFailure(d, error.Error)
-        d.addCallback(cb)
-        return d
-
-    def test_delete(self):
-        def cb(response):
-            xmppURI = response['uri']
-            d = self.client.delete(xmppURI)
-            return d
-
-        d = self.client.create()
-        d.addCallback(cb)
-        return d
-
-    def test_deleteWithRedirect(self):
-        def cb(response):
-            xmppURI = response['uri']
-            redirectURI = 'xmpp:%s?node=test' % componentJID
-            d = self.client.delete(xmppURI, redirectURI)
-            return d
-
-        d = self.client.create()
-        d.addCallback(cb)
-        return d
-
-    def test_deleteNotification(self):
-        def onNotification(data, headers):
-            try:
-                self.assertTrue(headers.hasHeader('Event'))
-                self.assertEquals(['DELETED'], headers.getRawHeaders('Event'))
-                self.assertFalse(headers.hasHeader('Link'))
-            except:
-                self.client.deferred.errback()
-            else:
-                self.client.deferred.callback(None)
-
-        def cb(response):
-            xmppURI = response['uri']
-            d = self.client.subscribe(xmppURI)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb2(xmppURI):
-            d = self.client.delete(xmppURI)
-            return d
-
-        self.client.callback = onNotification
-        self.client.deferred = defer.Deferred()
-        d = self.client.create()
-        d.addCallback(cb)
-        d.addCallback(cb2)
-        return defer.gatherResults([d, self.client.deferred])
-
-    def test_deleteNotificationWithRedirect(self):
-        redirectURI = 'xmpp:%s?node=test' % componentJID
-
-        def onNotification(data, headers):
-            try:
-                self.assertTrue(headers.hasHeader('Event'))
-                self.assertEquals(['DELETED'], headers.getRawHeaders('Event'))
-                self.assertEquals(['<%s>; rel=alternate' % redirectURI],
-                                  headers.getRawHeaders('Link'))
-            except:
-                self.client.deferred.errback()
-            else:
-                self.client.deferred.callback(None)
-
-        def cb(response):
-            xmppURI = response['uri']
-            d = self.client.subscribe(xmppURI)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb2(xmppURI):
-            d = self.client.delete(xmppURI, redirectURI)
-            return d
-
-        self.client.callback = onNotification
-        self.client.deferred = defer.Deferred()
-        d = self.client.create()
-        d.addCallback(cb)
-        d.addCallback(cb2)
-        return defer.gatherResults([d, self.client.deferred])
-
-    def test_list(self):
-        d = self.client.listNodes()
-        return d
-
-    def test_subscribe(self):
-        def cb(response):
-            xmppURI = response['uri']
-            d = self.client.subscribe(xmppURI)
-            return d
-
-        d = self.client.create()
-        d.addCallback(cb)
-        return d
-
-    def test_subscribeGetNotification(self):
-
-        def onNotification(data, headers):
-            self.client.deferred.callback(None)
-
-        def cb(response):
-            xmppURI = response['uri']
-            d = self.client.subscribe(xmppURI)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb2(xmppURI):
-            d = self.client.publish(TEST_ENTRY, xmppURI)
-            return d
-
-
-        self.client.callback = onNotification
-        self.client.deferred = defer.Deferred()
-        d = self.client.create()
-        d.addCallback(cb)
-        d.addCallback(cb2)
-        return defer.gatherResults([d, self.client.deferred])
-
-
-    def test_subscribeTwiceGetNotification(self):
-
-        def onNotification1(data, headers):
-            d = client1.stopService()
-            d.chainDeferred(client1.deferred)
-
-        def onNotification2(data, headers):
-            d = client2.stopService()
-            d.chainDeferred(client2.deferred)
-
-        def cb(response):
-            xmppURI = response['uri']
-            d = client1.subscribe(xmppURI)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb2(xmppURI):
-            d = client2.subscribe(xmppURI)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb3(xmppURI):
-            d = self.client.publish(TEST_ENTRY, xmppURI)
-            return d
-
-
-        client1 = gateway.GatewayClient(baseURI, callbackPort=8088)
-        client1.startService()
-        client1.callback = onNotification1
-        client1.deferred = defer.Deferred()
-        client2 = gateway.GatewayClient(baseURI, callbackPort=8089)
-        client2.startService()
-        client2.callback = onNotification2
-        client2.deferred = defer.Deferred()
-
-        d = self.client.create()
-        d.addCallback(cb)
-        d.addCallback(cb2)
-        d.addCallback(cb3)
-        dl = defer.gatherResults([d, client1.deferred, client2.deferred])
-        return dl
-
-
-    def test_subscribeGetDelayedNotification(self):
-
-        def onNotification(data, headers):
-            self.client.deferred.callback(None)
-
-        def cb(response):
-            xmppURI = response['uri']
-            self.assertNot(self.client.deferred.called)
-            d = self.client.publish(TEST_ENTRY, xmppURI)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb2(xmppURI):
-            d = self.client.subscribe(xmppURI)
-            return d
-
-
-        self.client.callback = onNotification
-        self.client.deferred = defer.Deferred()
-        d = self.client.create()
-        d.addCallback(cb)
-        d.addCallback(cb2)
-        return defer.gatherResults([d, self.client.deferred])
-
-    def test_subscribeGetDelayedNotification2(self):
-        """
-        Test that subscribing as second results in a notification being sent.
-        """
-
-        def onNotification1(data, headers):
-            client1.deferred.callback(None)
-            client1.stopService()
-
-        def onNotification2(data, headers):
-            client2.deferred.callback(None)
-            client2.stopService()
-
-        def cb(response):
-            xmppURI = response['uri']
-            self.assertNot(client1.deferred.called)
-            self.assertNot(client2.deferred.called)
-            d = self.client.publish(TEST_ENTRY, xmppURI)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb2(xmppURI):
-            d = client1.subscribe(xmppURI)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb3(xmppURI):
-            d = client2.subscribe(xmppURI)
-            return d
-
-        client1 = gateway.GatewayClient(baseURI, callbackPort=8088)
-        client1.startService()
-        client1.callback = onNotification1
-        client1.deferred = defer.Deferred()
-        client2 = gateway.GatewayClient(baseURI, callbackPort=8089)
-        client2.startService()
-        client2.callback = onNotification2
-        client2.deferred = defer.Deferred()
-
-
-        d = self.client.create()
-        d.addCallback(cb)
-        d.addCallback(cb2)
-        d.addCallback(cb3)
-        dl = defer.gatherResults([d, client1.deferred, client2.deferred])
-        return dl
-
-
-    def test_subscribeNonExisting(self):
-        def cb(err):
-            self.assertEqual('403', err.status)
-
-        d = self.client.subscribe('xmpp:%s?node=test' % componentJID)
-        self.assertFailure(d, error.Error)
-        d.addCallback(cb)
-        return d
-
-
-    def test_subscribeRootGetNotification(self):
-
-        def onNotification(data, headers):
-            self.client.deferred.callback(None)
-
-        def cb(response):
-            xmppURI = response['uri']
-            jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI)
-            rootNode = gateway.getXMPPURI(jid, '')
-
-            d = self.client.subscribe(rootNode)
-            d.addCallback(lambda _: xmppURI)
-            return d
-
-        def cb2(xmppURI):
-            return self.client.publish(TEST_ENTRY, xmppURI)
-
-
-        self.client.callback = onNotification
-        self.client.deferred = defer.Deferred()
-        d = self.client.create()
-        d.addCallback(cb)
-        d.addCallback(cb2)
-        return defer.gatherResults([d, self.client.deferred])
-
-
-    def test_unsubscribeNonExisting(self):
-        def cb(err):
-            self.assertEqual('403', err.status)
-
-        d = self.client.unsubscribe('xmpp:%s?node=test' % componentJID)
-        self.assertFailure(d, error.Error)
-        d.addCallback(cb)
-        return d
-
-
-    def test_items(self):
-        def cb(response):
-            xmppURI = response['uri']
-            d = self.client.items(xmppURI)
-            return d
-
-        d = self.client.publish(TEST_ENTRY)
-        d.addCallback(cb)
-        return d
-
-
-    def test_itemsMaxItems(self):
-        def cb(response):
-            xmppURI = response['uri']
-            d = self.client.items(xmppURI, 2)
-            return d
-
-        d = self.client.publish(TEST_ENTRY)
-        d.addCallback(cb)
-        return d
--- a/idavoll/test/test_storage.py	Thu May 10 23:08:08 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,585 +0,0 @@
-# Copyright (c) 2003-2010 Ralph Meijer
-# See LICENSE for details.
-
-"""
-Tests for L{idavoll.memory_storage} and L{idavoll.pgsql_storage}.
-"""
-
-from zope.interface.verify import verifyObject
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import jid
-from twisted.internet import defer
-from twisted.words.xish import domish
-
-from idavoll import error, iidavoll
-
-OWNER = jid.JID('owner@example.com/Work')
-SUBSCRIBER = jid.JID('subscriber@example.com/Home')
-SUBSCRIBER_NEW = jid.JID('new@example.com/Home')
-SUBSCRIBER_TO_BE_DELETED = jid.JID('to_be_deleted@example.com/Home')
-SUBSCRIBER_PENDING = jid.JID('pending@example.com/Home')
-PUBLISHER = jid.JID('publisher@example.com')
-ITEM = domish.Element((None, 'item'))
-ITEM['id'] = 'current'
-ITEM.addElement(('testns', 'test'), content=u'Test \u2083 item')
-ITEM_NEW = domish.Element((None, 'item'))
-ITEM_NEW['id'] = 'new'
-ITEM_NEW.addElement(('testns', 'test'), content=u'Test \u2083 item')
-ITEM_UPDATED = domish.Element((None, 'item'))
-ITEM_UPDATED['id'] = 'current'
-ITEM_UPDATED.addElement(('testns', 'test'), content=u'Test \u2084 item')
-ITEM_TO_BE_DELETED = domish.Element((None, 'item'))
-ITEM_TO_BE_DELETED['id'] = 'to-be-deleted'
-ITEM_TO_BE_DELETED.addElement(('testns', 'test'), content=u'Test \u2083 item')
-
-def decode(object):
-    if isinstance(object, str):
-        object = object.decode('utf-8')
-    return object
-
-
-
-class StorageTests:
-
-    def _assignTestNode(self, node):
-        self.node = node
-
-
-    def setUp(self):
-        d = self.s.getNode('pre-existing')
-        d.addCallback(self._assignTestNode)
-        return d
-
-
-    def test_interfaceIStorage(self):
-        self.assertTrue(verifyObject(iidavoll.IStorage, self.s))
-
-
-    def test_interfaceINode(self):
-        self.assertTrue(verifyObject(iidavoll.INode, self.node))
-
-
-    def test_interfaceILeafNode(self):
-        self.assertTrue(verifyObject(iidavoll.ILeafNode, self.node))
-
-
-    def test_getNode(self):
-        return self.s.getNode('pre-existing')
-
-
-    def test_getNonExistingNode(self):
-        d = self.s.getNode('non-existing')
-        self.assertFailure(d, error.NodeNotFound)
-        return d
-
-
-    def test_getNodeIDs(self):
-        def cb(nodeIdentifiers):
-            self.assertIn('pre-existing', nodeIdentifiers)
-            self.assertNotIn('non-existing', nodeIdentifiers)
-
-        return self.s.getNodeIds().addCallback(cb)
-
-
-    def test_createExistingNode(self):
-        config = self.s.getDefaultConfiguration('leaf')
-        config['pubsub#node_type'] = 'leaf'
-        d = self.s.createNode('pre-existing', OWNER, config)
-        self.assertFailure(d, error.NodeExists)
-        return d
-
-
-    def test_createNode(self):
-        def cb(void):
-            d = self.s.getNode('new 1')
-            return d
-
-        config = self.s.getDefaultConfiguration('leaf')
-        config['pubsub#node_type'] = 'leaf'
-        d = self.s.createNode('new 1', OWNER, config)
-        d.addCallback(cb)
-        return d
-
-
-    def test_createNodeChangingConfig(self):
-        """
-        The configuration passed to createNode must be free to be changed.
-        """
-        def cb(result):
-            node1, node2 = result
-            self.assertTrue(node1.getConfiguration()['pubsub#persist_items'])
-
-        config = {
-                "pubsub#persist_items": True,
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
-                "pubsub#node_type": 'leaf',
-                }
-
-        def unsetPersistItems(_):
-            config["pubsub#persist_items"] = False
-
-        d = defer.succeed(None)
-        d.addCallback(lambda _: self.s.createNode('new 1', OWNER, config))
-        d.addCallback(unsetPersistItems)
-        d.addCallback(lambda _: self.s.createNode('new 2', OWNER, config))
-        d.addCallback(lambda _: defer.gatherResults([
-                                    self.s.getNode('new 1'),
-                                    self.s.getNode('new 2')]))
-        d.addCallback(cb)
-        return d
-
-
-    def test_deleteNonExistingNode(self):
-        d = self.s.deleteNode('non-existing')
-        self.assertFailure(d, error.NodeNotFound)
-        return d
-
-
-    def test_deleteNode(self):
-        def cb(void):
-            d = self.s.getNode('to-be-deleted')
-            self.assertFailure(d, error.NodeNotFound)
-            return d
-
-        d = self.s.deleteNode('to-be-deleted')
-        d.addCallback(cb)
-        return d
-
-
-    def test_getAffiliations(self):
-        def cb(affiliations):
-            self.assertIn(('pre-existing', 'owner'), affiliations)
-
-        d = self.s.getAffiliations(OWNER)
-        d.addCallback(cb)
-        return d
-
-
-    def test_getSubscriptions(self):
-        def cb(subscriptions):
-            found = False
-            for subscription in subscriptions:
-                if (subscription.nodeIdentifier == 'pre-existing' and
-                    subscription.subscriber == SUBSCRIBER and
-                    subscription.state == 'subscribed'):
-                    found = True
-            self.assertTrue(found)
-
-        d = self.s.getSubscriptions(SUBSCRIBER)
-        d.addCallback(cb)
-        return d
-
-
-    # Node tests
-
-    def test_getType(self):
-        self.assertEqual(self.node.getType(), 'leaf')
-
-
-    def test_getConfiguration(self):
-        config = self.node.getConfiguration()
-        self.assertIn('pubsub#persist_items', config.iterkeys())
-        self.assertIn('pubsub#deliver_payloads', config.iterkeys())
-        self.assertEqual(config['pubsub#persist_items'], True)
-        self.assertEqual(config['pubsub#deliver_payloads'], True)
-
-
-    def test_setConfiguration(self):
-        def getConfig(node):
-            d = node.setConfiguration({'pubsub#persist_items': False})
-            d.addCallback(lambda _: node)
-            return d
-
-        def checkObjectConfig(node):
-            config = node.getConfiguration()
-            self.assertEqual(config['pubsub#persist_items'], False)
-
-        def getNode(void):
-            return self.s.getNode('to-be-reconfigured')
-
-        def checkStorageConfig(node):
-            config = node.getConfiguration()
-            self.assertEqual(config['pubsub#persist_items'], False)
-
-        d = self.s.getNode('to-be-reconfigured')
-        d.addCallback(getConfig)
-        d.addCallback(checkObjectConfig)
-        d.addCallback(getNode)
-        d.addCallback(checkStorageConfig)
-        return d
-
-
-    def test_getMetaData(self):
-        metaData = self.node.getMetaData()
-        for key, value in self.node.getConfiguration().iteritems():
-            self.assertIn(key, metaData.iterkeys())
-            self.assertEqual(value, metaData[key])
-        self.assertIn('pubsub#node_type', metaData.iterkeys())
-        self.assertEqual(metaData['pubsub#node_type'], 'leaf')
-
-
-    def test_getAffiliation(self):
-        def cb(affiliation):
-            self.assertEqual(affiliation, 'owner')
-
-        d = self.node.getAffiliation(OWNER)
-        d.addCallback(cb)
-        return d
-
-
-    def test_getNonExistingAffiliation(self):
-        def cb(affiliation):
-            self.assertEqual(affiliation, None)
-
-        d = self.node.getAffiliation(SUBSCRIBER)
-        d.addCallback(cb)
-        return d
-
-
-    def test_addSubscription(self):
-        def cb1(void):
-            return self.node.getSubscription(SUBSCRIBER_NEW)
-
-        def cb2(subscription):
-            self.assertEqual(subscription.state, 'pending')
-
-        d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending', {})
-        d.addCallback(cb1)
-        d.addCallback(cb2)
-        return d
-
-
-    def test_addExistingSubscription(self):
-        d = self.node.addSubscription(SUBSCRIBER, 'pending', {})
-        self.assertFailure(d, error.SubscriptionExists)
-        return d
-
-
-    def test_getSubscription(self):
-        def cb(subscriptions):
-            self.assertEquals(subscriptions[0].state, 'subscribed')
-            self.assertEquals(subscriptions[1].state, 'pending')
-            self.assertEquals(subscriptions[2], None)
-
-        d = defer.gatherResults([self.node.getSubscription(SUBSCRIBER),
-                                 self.node.getSubscription(SUBSCRIBER_PENDING),
-                                 self.node.getSubscription(OWNER)])
-        d.addCallback(cb)
-        return d
-
-
-    def test_removeSubscription(self):
-        return self.node.removeSubscription(SUBSCRIBER_TO_BE_DELETED)
-
-
-    def test_removeNonExistingSubscription(self):
-        d = self.node.removeSubscription(OWNER)
-        self.assertFailure(d, error.NotSubscribed)
-        return d
-
-
-    def test_getNodeSubscriptions(self):
-        def extractSubscribers(subscriptions):
-            return [subscription.subscriber for subscription in subscriptions]
-
-        def cb(subscribers):
-            self.assertIn(SUBSCRIBER, subscribers)
-            self.assertNotIn(SUBSCRIBER_PENDING, subscribers)
-            self.assertNotIn(OWNER, subscribers)
-
-        d = self.node.getSubscriptions('subscribed')
-        d.addCallback(extractSubscribers)
-        d.addCallback(cb)
-        return d
-
-
-    def test_isSubscriber(self):
-        def cb(subscribed):
-            self.assertEquals(subscribed[0][1], True)
-            self.assertEquals(subscribed[1][1], True)
-            self.assertEquals(subscribed[2][1], False)
-            self.assertEquals(subscribed[3][1], False)
-
-        d = defer.DeferredList([self.node.isSubscribed(SUBSCRIBER),
-                                self.node.isSubscribed(SUBSCRIBER.userhostJID()),
-                                self.node.isSubscribed(SUBSCRIBER_PENDING),
-                                self.node.isSubscribed(OWNER)])
-        d.addCallback(cb)
-        return d
-
-
-    def test_storeItems(self):
-        def cb1(void):
-            return self.node.getItemsById(['new'])
-
-        def cb2(result):
-            self.assertEqual(ITEM_NEW.toXml(), result[0].toXml())
-
-        d = self.node.storeItems([ITEM_NEW], PUBLISHER)
-        d.addCallback(cb1)
-        d.addCallback(cb2)
-        return d
-
-
-    def test_storeUpdatedItems(self):
-        def cb1(void):
-            return self.node.getItemsById(['current'])
-
-        def cb2(result):
-            self.assertEqual(ITEM_UPDATED.toXml(), result[0].toXml())
-
-        d = self.node.storeItems([ITEM_UPDATED], PUBLISHER)
-        d.addCallback(cb1)
-        d.addCallback(cb2)
-        return d
-
-
-    def test_removeItems(self):
-        def cb1(result):
-            self.assertEqual(['to-be-deleted'], result)
-            return self.node.getItemsById(['to-be-deleted'])
-
-        def cb2(result):
-            self.assertEqual(0, len(result))
-
-        d = self.node.removeItems(['to-be-deleted'])
-        d.addCallback(cb1)
-        d.addCallback(cb2)
-        return d
-
-
-    def test_removeNonExistingItems(self):
-        def cb(result):
-            self.assertEqual([], result)
-
-        d = self.node.removeItems(['non-existing'])
-        d.addCallback(cb)
-        return d
-
-
-    def test_getItems(self):
-        def cb(result):
-            items = [item.toXml() for item in result]
-            self.assertIn(ITEM.toXml(), items)
-
-        d = self.node.getItems()
-        d.addCallback(cb)
-        return d
-
-
-    def test_lastItem(self):
-        def cb(result):
-            self.assertEqual(1, len(result))
-            self.assertEqual(ITEM.toXml(), result[0].toXml())
-
-        d = self.node.getItems(1)
-        d.addCallback(cb)
-        return d
-
-
-    def test_getItemsById(self):
-        def cb(result):
-            self.assertEqual(1, len(result))
-
-        d = self.node.getItemsById(['current'])
-        d.addCallback(cb)
-        return d
-
-
-    def test_getNonExistingItemsById(self):
-        def cb(result):
-            self.assertEqual(0, len(result))
-
-        d = self.node.getItemsById(['non-existing'])
-        d.addCallback(cb)
-        return d
-
-
-    def test_purge(self):
-        def cb1(node):
-            d = node.purge()
-            d.addCallback(lambda _: node)
-            return d
-
-        def cb2(node):
-            return node.getItems()
-
-        def cb3(result):
-            self.assertEqual([], result)
-
-        d = self.s.getNode('to-be-purged')
-        d.addCallback(cb1)
-        d.addCallback(cb2)
-        d.addCallback(cb3)
-        return d
-
-
-    def test_getNodeAffilatiations(self):
-        def cb1(node):
-            return node.getAffiliations()
-
-        def cb2(affiliations):
-            affiliations = dict(((a[0].full(), a[1]) for a in affiliations))
-            self.assertEquals(affiliations[OWNER.userhost()], 'owner')
-
-        d = self.s.getNode('pre-existing')
-        d.addCallback(cb1)
-        d.addCallback(cb2)
-        return d
-
-
-
-class MemoryStorageStorageTestCase(unittest.TestCase, StorageTests):
-
-    def setUp(self):
-        from idavoll.memory_storage import Storage, PublishedItem, LeafNode
-        from idavoll.memory_storage import Subscription
-
-        defaultConfig = Storage.defaultConfig['leaf']
-
-        self.s = Storage()
-        self.s._nodes['pre-existing'] = \
-                LeafNode('pre-existing', OWNER, defaultConfig)
-        self.s._nodes['to-be-deleted'] = \
-                LeafNode('to-be-deleted', OWNER, None)
-        self.s._nodes['to-be-reconfigured'] = \
-                LeafNode('to-be-reconfigured', OWNER, defaultConfig)
-        self.s._nodes['to-be-purged'] = \
-                LeafNode('to-be-purged', OWNER, None)
-
-        subscriptions = self.s._nodes['pre-existing']._subscriptions
-        subscriptions[SUBSCRIBER.full()] = Subscription('pre-existing',
-                                                        SUBSCRIBER,
-                                                        'subscribed')
-        subscriptions[SUBSCRIBER_TO_BE_DELETED.full()] = \
-                Subscription('pre-existing', SUBSCRIBER_TO_BE_DELETED,
-                             'subscribed')
-        subscriptions[SUBSCRIBER_PENDING.full()] = \
-                Subscription('pre-existing', SUBSCRIBER_PENDING,
-                             'pending')
-
-        item = PublishedItem(ITEM_TO_BE_DELETED, PUBLISHER)
-        self.s._nodes['pre-existing']._items['to-be-deleted'] = item
-        self.s._nodes['pre-existing']._itemlist.append(item)
-        self.s._nodes['to-be-purged']._items['to-be-deleted'] = item
-        self.s._nodes['to-be-purged']._itemlist.append(item)
-        item = PublishedItem(ITEM, PUBLISHER)
-        self.s._nodes['pre-existing']._items['current'] = item
-        self.s._nodes['pre-existing']._itemlist.append(item)
-
-        return StorageTests.setUp(self)
-
-
-
-class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests):
-
-    dbpool = None
-
-    def setUp(self):
-        from idavoll.pgsql_storage import Storage
-        from twisted.enterprise import adbapi
-        if self.dbpool is None:
-            self.__class__.dbpool = adbapi.ConnectionPool('psycopg2',
-                                            database='pubsub_test',
-                                            cp_reconnect=True,
-                                            client_encoding='utf-8',
-                                            )
-        self.s = Storage(self.dbpool)
-        self.dbpool.start()
-        d = self.dbpool.runInteraction(self.init)
-        d.addCallback(lambda _: StorageTests.setUp(self))
-        return d
-
-
-    def tearDown(self):
-        return self.dbpool.runInteraction(self.cleandb)
-
-
-    def init(self, cursor):
-        self.cleandb(cursor)
-        cursor.execute("""INSERT INTO nodes
-                          (node, node_type, persist_items)
-                          VALUES ('pre-existing', 'leaf', TRUE)""")
-        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-deleted')""")
-        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-reconfigured')""")
-        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-purged')""")
-        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
-                       (OWNER.userhost(),))
-        cursor.execute("""INSERT INTO affiliations
-                          (node_id, entity_id, affiliation)
-                          SELECT node_id, entity_id, 'owner'
-                          FROM nodes, entities
-                          WHERE node='pre-existing' AND jid=%s""",
-                       (OWNER.userhost(),))
-        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
-                       (SUBSCRIBER.userhost(),))
-        cursor.execute("""INSERT INTO subscriptions
-                          (node_id, entity_id, resource, state)
-                          SELECT node_id, entity_id, %s, 'subscribed'
-                          FROM nodes, entities
-                          WHERE node='pre-existing' AND jid=%s""",
-                       (SUBSCRIBER.resource,
-                        SUBSCRIBER.userhost()))
-        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
-                       (SUBSCRIBER_TO_BE_DELETED.userhost(),))
-        cursor.execute("""INSERT INTO subscriptions
-                          (node_id, entity_id, resource, state)
-                          SELECT node_id, entity_id, %s, 'subscribed'
-                          FROM nodes, entities
-                          WHERE node='pre-existing' AND jid=%s""",
-                       (SUBSCRIBER_TO_BE_DELETED.resource,
-                        SUBSCRIBER_TO_BE_DELETED.userhost()))
-        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
-                       (SUBSCRIBER_PENDING.userhost(),))
-        cursor.execute("""INSERT INTO subscriptions
-                          (node_id, entity_id, resource, state)
-                          SELECT node_id, entity_id, %s, 'pending'
-                          FROM nodes, entities
-                          WHERE node='pre-existing' AND jid=%s""",
-                       (SUBSCRIBER_PENDING.resource,
-                        SUBSCRIBER_PENDING.userhost()))
-        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
-                       (PUBLISHER.userhost(),))
-        cursor.execute("""INSERT INTO items
-                          (node_id, publisher, item, data, date)
-                          SELECT node_id, %s, 'to-be-deleted', %s,
-                                 now() - interval '1 day'
-                          FROM nodes
-                          WHERE node='pre-existing'""",
-                       (PUBLISHER.userhost(),
-                        ITEM_TO_BE_DELETED.toXml()))
-        cursor.execute("""INSERT INTO items (node_id, publisher, item, data)
-                          SELECT node_id, %s, 'to-be-deleted', %s
-                          FROM nodes
-                          WHERE node='to-be-purged'""",
-                       (PUBLISHER.userhost(),
-                        ITEM_TO_BE_DELETED.toXml()))
-        cursor.execute("""INSERT INTO items (node_id, publisher, item, data)
-                          SELECT node_id, %s, 'current', %s
-                          FROM nodes
-                          WHERE node='pre-existing'""",
-                       (PUBLISHER.userhost(),
-                        ITEM.toXml()))
-
-
-    def cleandb(self, cursor):
-        cursor.execute("""DELETE FROM nodes WHERE node in
-                          ('non-existing', 'pre-existing', 'to-be-deleted',
-                           'new 1', 'new 2', 'new 3', 'to-be-reconfigured',
-                           'to-be-purged')""")
-        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
-                       (OWNER.userhost(),))
-        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
-                       (SUBSCRIBER.userhost(),))
-        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
-                       (SUBSCRIBER_TO_BE_DELETED.userhost(),))
-        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
-                       (SUBSCRIBER_PENDING.userhost(),))
-        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
-                       (PUBLISHER.userhost(),))
-
-try:
-    import psycopg2
-except ImportError:
-    PgsqlStorageStorageTestCase.skip = "Psycopg2 not available"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/__init__.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,8 @@
+# Copyright (c) 2003-2007 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Idavoll, a generic XMPP publish-subscribe service.
+"""
+
+__version__ = '0.9.1' 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/backend.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,730 @@
+# -*- test-case-name: idavoll.test.test_backend -*-
+#
+# Copyright (c) 2003-2010 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Generic publish-subscribe backend.
+
+This module implements a generic publish-subscribe backend service with
+business logic as per
+U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>} that interacts with
+a given storage facility. It also provides an adapter from the XMPP
+publish-subscribe protocol.
+"""
+
+import uuid
+
+from zope.interface import implements
+
+from twisted.application import service
+from twisted.python import components, log
+from twisted.internet import defer, reactor
+from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.xish import utility
+
+from wokkel import disco
+from wokkel.iwokkel import IPubSubResource
+from wokkel.pubsub import PubSubResource, PubSubError
+
+from idavoll import error, iidavoll
+from idavoll.iidavoll import IBackendService, ILeafNode
+
+def _getAffiliation(node, entity):
+    d = node.getAffiliation(entity)
+    d.addCallback(lambda affiliation: (node, affiliation))
+    return d
+
+
+
+class BackendService(service.Service, utility.EventDispatcher):
+    """
+    Generic publish-subscribe backend service.
+
+    @cvar nodeOptions: Node configuration form as a mapping from the field
+                       name to a dictionary that holds the field's type, label
+                       and possible options to choose from.
+    @type nodeOptions: C{dict}.
+    @cvar defaultConfig: The default node configuration.
+    """
+
+    implements(iidavoll.IBackendService)
+
+    nodeOptions = {
+            "pubsub#persist_items":
+                {"type": "boolean",
+                 "label": "Persist items to storage"},
+            "pubsub#deliver_payloads":
+                {"type": "boolean",
+                 "label": "Deliver payloads with event notifications"},
+            "pubsub#send_last_published_item":
+                {"type": "list-single",
+                 "label": "When to send the last published item",
+                 "options": {
+                     "never": "Never",
+                     "on_sub": "When a new subscription is processed"}
+                },
+            }
+
+    subscriptionOptions = {
+            "pubsub#subscription_type":
+                {"type": "list-single",
+                 "options": {
+                     "items": "Receive notification of new items only",
+                     "nodes": "Receive notification of new nodes only"}
+                },
+            "pubsub#subscription_depth":
+                {"type": "list-single",
+                 "options": {
+                     "1": "Receive notification from direct child nodes only",
+                     "all": "Receive notification from all descendent nodes"}
+                },
+            }
+
+    def __init__(self, storage):
+        utility.EventDispatcher.__init__(self)
+        self.storage = storage
+        self._callbackList = []
+
+
+    def supportsPublisherAffiliation(self):
+        return True
+
+
+    def supportsOutcastAffiliation(self):
+        return True
+
+
+    def supportsPersistentItems(self):
+        return True
+
+
+    def getNodeType(self, nodeIdentifier):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.getType())
+        return d
+
+
+    def getNodes(self):
+        return self.storage.getNodeIds()
+
+
+    def getNodeMetaData(self, nodeIdentifier):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.getMetaData())
+        d.addCallback(self._makeMetaData)
+        return d
+
+
+    def _makeMetaData(self, metaData):
+        options = []
+        for key, value in metaData.iteritems():
+            if key in self.nodeOptions:
+                option = {"var": key}
+                option.update(self.nodeOptions[key])
+                option["value"] = value
+                options.append(option)
+
+        return options
+
+
+    def _checkAuth(self, node, requestor):
+        def check(affiliation, node):
+            if affiliation not in ['owner', 'publisher']:
+                raise error.Forbidden()
+            return node
+
+        d = node.getAffiliation(requestor)
+        d.addCallback(check, node)
+        return d
+
+
+    def publish(self, nodeIdentifier, items, requestor):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(self._checkAuth, requestor)
+        d.addCallback(self._doPublish, items, requestor)
+        return d
+
+
+    def _doPublish(self, node, items, requestor):
+        if node.nodeType == 'collection':
+            raise error.NoPublishing()
+
+        configuration = node.getConfiguration()
+        persistItems = configuration["pubsub#persist_items"]
+        deliverPayloads = configuration["pubsub#deliver_payloads"]
+
+        if items and not persistItems and not deliverPayloads:
+            raise error.ItemForbidden()
+        elif not items and (persistItems or deliverPayloads):
+            raise error.ItemRequired()
+
+        if persistItems or deliverPayloads:
+            for item in items:
+                item.uri = None
+                item.defaultUri = None
+                if not item.getAttribute("id"):
+                    item["id"] = str(uuid.uuid4())
+
+        if persistItems:
+            d = node.storeItems(items, requestor)
+        else:
+            d = defer.succeed(None)
+
+        d.addCallback(self._doNotify, node.nodeIdentifier, items,
+                      deliverPayloads)
+        return d
+
+
+    def _doNotify(self, result, nodeIdentifier, items, deliverPayloads):
+        if items and not deliverPayloads:
+            for item in items:
+                item.children = []
+
+        self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier},
+                      '//event/pubsub/notify')
+
+
+    def getNotifications(self, nodeIdentifier, items):
+
+        def toNotifications(subscriptions, nodeIdentifier, items):
+            subsBySubscriber = {}
+            for subscription in subscriptions:
+                if subscription.options.get('pubsub#subscription_type',
+                                            'items') == 'items':
+                    subs = subsBySubscriber.setdefault(subscription.subscriber,
+                                                       set())
+                    subs.add(subscription)
+
+            notifications = [(subscriber, subscriptions, items)
+                             for subscriber, subscriptions
+                             in subsBySubscriber.iteritems()]
+
+            return notifications
+
+        def rootNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return []
+
+        d1 = self.storage.getNode(nodeIdentifier)
+        d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d2 = self.storage.getNode('')
+        d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d2.addErrback(rootNotFound)
+        d = defer.gatherResults([d1, d2])
+        d.addCallback(lambda result: result[0] + result[1])
+        d.addCallback(toNotifications, nodeIdentifier, items)
+        return d
+
+
+    def registerNotifier(self, observerfn, *args, **kwargs):
+        self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
+
+
+    def subscribe(self, nodeIdentifier, subscriber, requestor):
+        subscriberEntity = subscriber.userhostJID()
+        if subscriberEntity != requestor.userhostJID():
+            return defer.fail(error.Forbidden())
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, subscriberEntity)
+        d.addCallback(self._doSubscribe, subscriber)
+        return d
+
+
+    def _doSubscribe(self, result, subscriber):
+        node, affiliation = result
+
+        if affiliation == 'outcast':
+            raise error.Forbidden()
+
+        def trapExists(failure):
+            failure.trap(error.SubscriptionExists)
+            return False
+
+        def cb(sendLast):
+            d = node.getSubscription(subscriber)
+            if sendLast:
+                d.addCallback(self._sendLastPublished, node)
+            return d
+
+        d = node.addSubscription(subscriber, 'subscribed', {})
+        d.addCallbacks(lambda _: True, trapExists)
+        d.addCallback(cb)
+        return d
+
+
+    def _sendLastPublished(self, subscription, node):
+
+        def notifyItem(items):
+            if items:
+                reactor.callLater(0, self.dispatch,
+                                     {'items': items,
+                                      'nodeIdentifier': node.nodeIdentifier,
+                                      'subscription': subscription},
+                                     '//event/pubsub/notify')
+
+        config = node.getConfiguration()
+        sendLastPublished = config.get('pubsub#send_last_published_item',
+                                       'never')
+        if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
+            entity = subscription.subscriber.userhostJID()
+            d = self.getItems(node.nodeIdentifier, entity, 1)
+            d.addCallback(notifyItem)
+            d.addErrback(log.err)
+
+        return subscription
+
+
+    def unsubscribe(self, nodeIdentifier, subscriber, requestor):
+        if subscriber.userhostJID() != requestor.userhostJID():
+            return defer.fail(error.Forbidden())
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.removeSubscription(subscriber))
+        return d
+
+
+    def getSubscriptions(self, entity):
+        return self.storage.getSubscriptions(entity)
+
+    def supportsAutoCreate(self):
+        return True
+
+    def supportsInstantNodes(self):
+        return True
+
+
+    def createNode(self, nodeIdentifier, requestor):
+        if not nodeIdentifier:
+            nodeIdentifier = 'generic/%s' % uuid.uuid4()
+
+        nodeType = 'leaf'
+        config = self.storage.getDefaultConfiguration(nodeType)
+        config['pubsub#node_type'] = nodeType
+
+        d = self.storage.createNode(nodeIdentifier, requestor, config)
+        d.addCallback(lambda _: nodeIdentifier)
+        return d
+
+
+    def getDefaultConfiguration(self, nodeType):
+        d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
+        return d
+
+
+    def getNodeConfiguration(self, nodeIdentifier):
+        if not nodeIdentifier:
+            return defer.fail(error.NoRootNode())
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.getConfiguration())
+
+        return d
+
+
+    def setNodeConfiguration(self, nodeIdentifier, options, requestor):
+        if not nodeIdentifier:
+            return defer.fail(error.NoRootNode())
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doSetNodeConfiguration, options)
+        return d
+
+
+    def _doSetNodeConfiguration(self, result, options):
+        node, affiliation = result
+
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        return node.setConfiguration(options)
+
+
+    def getAffiliations(self, entity):
+        return self.storage.getAffiliations(entity)
+
+
+    def getItems(self, nodeIdentifier, requestor, maxItems=None,
+                       itemIdentifiers=None):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doGetItems, maxItems, itemIdentifiers)
+        return d
+
+
+    def _doGetItems(self, result, maxItems, itemIdentifiers):
+        node, affiliation = result
+
+        if not ILeafNode.providedBy(node):
+            return []
+
+        if affiliation == 'outcast':
+            raise error.Forbidden()
+
+        if itemIdentifiers:
+            return node.getItemsById(itemIdentifiers)
+        else:
+            return node.getItems(maxItems)
+
+
+    def retractItem(self, nodeIdentifier, itemIdentifiers, requestor):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doRetract, itemIdentifiers)
+        return d
+
+
+    def _doRetract(self, result, itemIdentifiers):
+        node, affiliation = result
+        persistItems = node.getConfiguration()["pubsub#persist_items"]
+
+        if affiliation not in ['owner', 'publisher']:
+            raise error.Forbidden()
+
+        if not persistItems:
+            raise error.NodeNotPersistent()
+
+        d = node.removeItems(itemIdentifiers)
+        d.addCallback(self._doNotifyRetraction, node.nodeIdentifier)
+        return d
+
+
+    def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier):
+        self.dispatch({'itemIdentifiers': itemIdentifiers,
+                       'nodeIdentifier': nodeIdentifier },
+                      '//event/pubsub/retract')
+
+
+    def purgeNode(self, nodeIdentifier, requestor):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doPurge)
+        return d
+
+
+    def _doPurge(self, result):
+        node, affiliation = result
+        persistItems = node.getConfiguration()["pubsub#persist_items"]
+
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        if not persistItems:
+            raise error.NodeNotPersistent()
+
+        d = node.purge()
+        d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
+        return d
+
+
+    def _doNotifyPurge(self, result, nodeIdentifier):
+        self.dispatch(nodeIdentifier, '//event/pubsub/purge')
+
+
+    def registerPreDelete(self, preDeleteFn):
+        self._callbackList.append(preDeleteFn)
+
+
+    def getSubscribers(self, nodeIdentifier):
+        def cb(subscriptions):
+            return [subscription.subscriber for subscription in subscriptions]
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d.addCallback(cb)
+        return d
+
+
+    def deleteNode(self, nodeIdentifier, requestor, redirectURI=None):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doPreDelete, redirectURI)
+        return d
+
+
+    def _doPreDelete(self, result, redirectURI):
+        node, affiliation = result
+
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        data = {'nodeIdentifier': node.nodeIdentifier,
+                'redirectURI': redirectURI}
+
+        d = defer.DeferredList([cb(data)
+                                for cb in self._callbackList],
+                               consumeErrors=1)
+        d.addCallback(self._doDelete, node.nodeIdentifier)
+
+
+    def _doDelete(self, result, nodeIdentifier):
+        dl = []
+        for succeeded, r in result:
+            if succeeded and r:
+                dl.extend(r)
+
+        d = self.storage.deleteNode(nodeIdentifier)
+        d.addCallback(self._doNotifyDelete, dl)
+
+        return d
+
+
+    def _doNotifyDelete(self, result, dl):
+        for d in dl:
+            d.callback(None)
+
+
+
+class PubSubResourceFromBackend(PubSubResource):
+    """
+    Adapts a backend to an xmpp publish-subscribe service.
+    """
+
+    features = [
+        "config-node",
+        "create-nodes",
+        "delete-any",
+        "delete-nodes",
+        "item-ids",
+        "meta-data",
+        "publish",
+        "purge-nodes",
+        "retract-items",
+        "retrieve-affiliations",
+        "retrieve-default",
+        "retrieve-items",
+        "retrieve-subscriptions",
+        "subscribe",
+    ]
+
+    discoIdentity = disco.DiscoIdentity('pubsub',
+                                        'service',
+                                        'Idavoll publish-subscribe service')
+
+    pubsubService = None
+
+    _errorMap = {
+        error.NodeNotFound: ('item-not-found', None, None),
+        error.NodeExists: ('conflict', None, None),
+        error.Forbidden: ('forbidden', None, None),
+        error.ItemForbidden: ('bad-request', 'item-forbidden', None),
+        error.ItemRequired: ('bad-request', 'item-required', None),
+        error.NoInstantNodes: ('not-acceptable',
+                               'unsupported',
+                               'instant-nodes'),
+        error.NotSubscribed: ('unexpected-request', 'not-subscribed', None),
+        error.InvalidConfigurationOption: ('not-acceptable', None, None),
+        error.InvalidConfigurationValue: ('not-acceptable', None, None),
+        error.NodeNotPersistent: ('feature-not-implemented',
+                                  'unsupported',
+                                  'persistent-node'),
+        error.NoRootNode: ('bad-request', None, None),
+        error.NoCollections: ('feature-not-implemented',
+                              'unsupported',
+                              'collections'),
+        error.NoPublishing: ('feature-not-implemented',
+                             'unsupported',
+                             'publish'),
+    }
+
+    def __init__(self, backend):
+        PubSubResource.__init__(self)
+
+        self.backend = backend
+        self.hideNodes = False
+
+        self.backend.registerNotifier(self._notify)
+        self.backend.registerPreDelete(self._preDelete)
+
+        if self.backend.supportsAutoCreate():
+            self.features.append("auto-create")
+
+        if self.backend.supportsInstantNodes():
+            self.features.append("instant-nodes")
+
+        if self.backend.supportsOutcastAffiliation():
+            self.features.append("outcast-affiliation")
+
+        if self.backend.supportsPersistentItems():
+            self.features.append("persistent-items")
+
+        if self.backend.supportsPublisherAffiliation():
+            self.features.append("publisher-affiliation")
+
+
+    def _notify(self, data):
+        items = data['items']
+        nodeIdentifier = data['nodeIdentifier']
+        if 'subscription' not in data:
+            d = self.backend.getNotifications(nodeIdentifier, items)
+        else:
+            subscription = data['subscription']
+            d = defer.succeed([(subscription.subscriber, [subscription],
+                                items)])
+        d.addCallback(lambda notifications: self.pubsubService.notifyPublish(
+                                                self.serviceJID,
+                                                nodeIdentifier,
+                                                notifications))
+
+
+    def _preDelete(self, data):
+        nodeIdentifier = data['nodeIdentifier']
+        redirectURI = data.get('redirectURI', None)
+        d = self.backend.getSubscribers(nodeIdentifier)
+        d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
+                                                self.serviceJID,
+                                                nodeIdentifier,
+                                                subscribers,
+                                                redirectURI))
+        return d
+
+
+    def _mapErrors(self, failure):
+        e = failure.trap(*self._errorMap.keys())
+
+        condition, pubsubCondition, feature = self._errorMap[e]
+        msg = failure.value.msg
+
+        if pubsubCondition:
+            exc = PubSubError(condition, pubsubCondition, feature, msg)
+        else:
+            exc = StanzaError(condition, text=msg)
+
+        raise exc
+
+
+    def getInfo(self, requestor, service, nodeIdentifier):
+        info = {}
+
+        def saveType(result):
+            info['type'] = result
+            return nodeIdentifier
+
+        def saveMetaData(result):
+            info['meta-data'] = result
+            return info
+
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return info
+
+        d = defer.succeed(nodeIdentifier)
+        d.addCallback(self.backend.getNodeType)
+        d.addCallback(saveType)
+        d.addCallback(self.backend.getNodeMetaData)
+        d.addCallback(saveMetaData)
+        d.addErrback(trapNotFound)
+        d.addErrback(self._mapErrors)
+        return d
+
+
+    def getNodes(self, requestor, service, nodeIdentifier):
+        if service.resource:
+            return defer.succeed([])
+        d = self.backend.getNodes()
+        return d.addErrback(self._mapErrors)
+
+
+    def getConfigurationOptions(self):
+        return self.backend.nodeOptions
+
+    def _publish_errb(self, failure, request):
+        if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
+            d = self.backend.createNode(request.nodeIdentifier,
+                                        request.sender)
+            d.addCallback(lambda ignore,
+                                 request: self.backend.publish(request.nodeIdentifier,
+                                                               request.items,
+                                                               request.sender),
+                          request)
+            return d
+
+        raise failure
+
+
+    def publish(self, request):
+        d = self.backend.publish(request.nodeIdentifier,
+                                 request.items,
+                                 request.sender)
+        d.addErrback(self._publish_errb, request)
+        return d.addErrback(self._mapErrors)
+
+
+    def subscribe(self, request):
+        d = self.backend.subscribe(request.nodeIdentifier,
+                                   request.subscriber,
+                                   request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def unsubscribe(self, request):
+        d = self.backend.unsubscribe(request.nodeIdentifier,
+                                     request.subscriber,
+                                     request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def subscriptions(self, request):
+        d = self.backend.getSubscriptions(request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def affiliations(self, request):
+        d = self.backend.getAffiliations(request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def create(self, request):
+        d = self.backend.createNode(request.nodeIdentifier,
+                                    request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def default(self, request):
+        d = self.backend.getDefaultConfiguration(request.nodeType)
+        return d.addErrback(self._mapErrors)
+
+
+    def configureGet(self, request):
+        d = self.backend.getNodeConfiguration(request.nodeIdentifier)
+        return d.addErrback(self._mapErrors)
+
+
+    def configureSet(self, request):
+        d = self.backend.setNodeConfiguration(request.nodeIdentifier,
+                                              request.options,
+                                              request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def items(self, request):
+        d = self.backend.getItems(request.nodeIdentifier,
+                                  request.sender,
+                                  request.maxItems,
+                                  request.itemIdentifiers)
+        return d.addErrback(self._mapErrors)
+
+
+    def retract(self, request):
+        d = self.backend.retractItem(request.nodeIdentifier,
+                                     request.itemIdentifiers,
+                                     request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def purge(self, request):
+        d = self.backend.purgeNode(request.nodeIdentifier,
+                                   request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def delete(self, request):
+        d = self.backend.deleteNode(request.nodeIdentifier,
+                                    request.sender)
+        return d.addErrback(self._mapErrors)
+
+components.registerAdapter(PubSubResourceFromBackend,
+                           IBackendService,
+                           IPubSubResource)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/error.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,95 @@
+# Copyright (c) 2003-2008 Ralph Meijer
+# See LICENSE for details.
+
+class Error(Exception):
+    msg = ''
+
+    def __init__(self, msg=None):
+        self.msg = msg or self.msg
+
+
+    def __str__(self):
+        return self.msg
+
+
+
+class NodeNotFound(Error):
+    pass
+
+
+
+class NodeExists(Error):
+    pass
+
+
+
+class NotSubscribed(Error):
+    """
+    Entity is not subscribed to this node.
+    """
+
+
+
+class SubscriptionExists(Error):
+    """
+    There already exists a subscription to this node.
+    """
+
+
+
+class Forbidden(Error):
+    pass
+
+
+
+class ItemForbidden(Error):
+    pass
+
+
+
+class ItemRequired(Error):
+    pass
+
+
+
+class NoInstantNodes(Error):
+    pass
+
+
+
+class InvalidConfigurationOption(Error):
+    msg = 'Invalid configuration option'
+
+
+
+class InvalidConfigurationValue(Error):
+    msg = 'Bad configuration value'
+
+
+
+class NodeNotPersistent(Error):
+    pass
+
+
+
+class NoRootNode(Error):
+    pass
+
+
+
+class NoCallbacks(Error):
+    """
+    There are no callbacks for this node.
+    """
+
+
+
+class NoCollections(Error):
+    pass
+
+
+
+class NoPublishing(Error):
+    """
+    This node does not support publishing.
+    """
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/gateway.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,868 @@
+# -*- test-case-name: idavoll.test.test_gateway -*-
+#
+# Copyright (c) 2003-2009 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Web resources and client for interacting with pubsub services.
+"""
+
+import cgi
+from time import gmtime, strftime
+import urllib
+import urlparse
+
+import simplejson
+
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.python import log
+from twisted.web import client
+from twisted.web2 import http, http_headers, resource, responsecode
+from twisted.web2 import channel, server
+from twisted.web2.stream import readStream
+from twisted.words.protocols.jabber.jid import JID
+from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.xish import domish
+
+from wokkel.pubsub import Item
+from wokkel.pubsub import PubSubClient
+
+from idavoll import error
+
+NS_ATOM = 'http://www.w3.org/2005/Atom'
+MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
+MIME_JSON = 'application/json'
+
+class XMPPURIParseError(ValueError):
+    """
+    Raised when a given XMPP URI couldn't be properly parsed.
+    """
+
+
+
+def getServiceAndNode(uri):
+    """
+    Given an XMPP URI, extract the publish subscribe service JID and node ID.
+    """
+
+    try:
+        scheme, rest = uri.split(':', 1)
+    except ValueError:
+        raise XMPPURIParseError("No URI scheme component")
+
+    if scheme != 'xmpp':
+        raise XMPPURIParseError("Unknown URI scheme")
+
+    if rest.startswith("//"):
+        raise XMPPURIParseError("Unexpected URI authority component")
+
+    try:
+        entity, query = rest.split('?', 1)
+    except ValueError:
+        raise XMPPURIParseError("No URI query component")
+
+    if not entity:
+        raise XMPPURIParseError("Empty URI path component")
+
+    try:
+        service = JID(entity)
+    except Exception, e:
+        raise XMPPURIParseError("Invalid JID: %s" % e)
+
+    params = cgi.parse_qs(query)
+
+    try:
+        nodeIdentifier = params['node'][0]
+    except (KeyError, ValueError):
+        nodeIdentifier = ''
+
+    return service, nodeIdentifier
+
+
+
+def getXMPPURI(service, nodeIdentifier):
+    """
+    Construct an XMPP URI from a service JID and node identifier.
+    """
+    return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '')
+
+
+
+class WebStreamParser(object):
+    def __init__(self):
+        self.elementStream = domish.elementStream()
+        self.elementStream.DocumentStartEvent = self.docStart
+        self.elementStream.ElementEvent = self.elem
+        self.elementStream.DocumentEndEvent = self.docEnd
+        self.done = False
+
+
+    def docStart(self, elem):
+        self.document = elem
+
+
+    def elem(self, elem):
+        self.document.addChild(elem)
+
+
+    def docEnd(self):
+        self.done = True
+
+
+    def parse(self, stream):
+        def endOfStream(result):
+            if not self.done:
+                raise Exception("No more stuff?")
+            else:
+                return self.document
+
+        d = readStream(stream, self.elementStream.parse)
+        d.addCallback(endOfStream)
+        return d
+
+
+
+class CreateResource(resource.Resource):
+    """
+    A resource to create a publish-subscribe node.
+    """
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+
+    http_GET = None
+
+
+    def http_POST(self, request):
+        """
+        Respond to a POST request to create a new node.
+        """
+
+        def toResponse(nodeIdentifier):
+            uri = getXMPPURI(self.serviceJID, nodeIdentifier)
+            stream = simplejson.dumps({'uri': uri})
+            contentType = http_headers.MimeType.fromString(MIME_JSON)
+            return http.Response(responsecode.OK, stream=stream,
+                                 headers={'Content-Type': contentType})
+        d = self.backend.createNode(None, self.owner)
+        d.addCallback(toResponse)
+        return d
+
+
+
+class DeleteResource(resource.Resource):
+    """
+    A resource to create a publish-subscribe node.
+    """
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+
+    http_GET = None
+
+
+    def http_POST(self, request):
+        """
+        Respond to a POST request to create a new node.
+        """
+
+        def gotStream(_):
+            if request.args.get('uri'):
+                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
+                return defer.succeed(nodeIdentifier)
+            else:
+                raise http.HTTPError(http.Response(responsecode.BAD_REQUEST,
+                                                   "No URI given"))
+
+        def doDelete(nodeIdentifier, data):
+            if data:
+                params = simplejson.loads(''.join(data))
+                redirectURI = params.get('redirect_uri')
+            else:
+                redirectURI = None
+
+            return self.backend.deleteNode(nodeIdentifier, self.owner,
+                                           redirectURI)
+
+        def respond(result):
+            return http.Response(responsecode.NO_CONTENT)
+
+
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return http.StatusResponse(responsecode.NOT_FOUND,
+                                       "Node not found")
+
+        def trapXMPPURIParseError(failure):
+            failure.trap(XMPPURIParseError)
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "Malformed XMPP URI: %s" % failure.value)
+
+        data = []
+        d = readStream(request.stream, data.append)
+        d.addCallback(gotStream)
+        d.addCallback(doDelete, data)
+        d.addCallback(respond)
+        d.addErrback(trapNotFound)
+        d.addErrback(trapXMPPURIParseError)
+        return d
+
+
+
+class PublishResource(resource.Resource):
+    """
+    A resource to publish to a publish-subscribe node.
+    """
+
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+
+    http_GET = None
+
+
+    def checkMediaType(self, request):
+        ctype = request.headers.getHeader('content-type')
+
+        if not ctype:
+            raise http.HTTPError(
+                http.StatusResponse(
+                    responsecode.BAD_REQUEST,
+                    "No specified Media Type"))
+
+        if (ctype.mediaType != 'application' or
+            ctype.mediaSubtype != 'atom+xml' or
+            ctype.params.get('type') != 'entry' or
+            ctype.params.get('charset', 'utf-8') != 'utf-8'):
+            raise http.HTTPError(
+                http.StatusResponse(
+                    responsecode.UNSUPPORTED_MEDIA_TYPE,
+                    "Unsupported Media Type: %s" %
+                        http_headers.generateContentType(ctype)))
+
+
+    def parseXMLPayload(self, stream):
+        p = WebStreamParser()
+        return p.parse(stream)
+
+
+    def http_POST(self, request):
+        """
+        Respond to a POST request to create a new item.
+        """
+
+        def toResponse(nodeIdentifier):
+            uri = getXMPPURI(self.serviceJID, nodeIdentifier)
+            stream = simplejson.dumps({'uri': uri})
+            contentType = http_headers.MimeType.fromString(MIME_JSON)
+            return http.Response(responsecode.OK, stream=stream,
+                                 headers={'Content-Type': contentType})
+
+        def gotNode(nodeIdentifier, payload):
+            item = Item(id='current', payload=payload)
+            d = self.backend.publish(nodeIdentifier, [item], self.owner)
+            d.addCallback(lambda _: nodeIdentifier)
+            return d
+
+        def getNode():
+            if request.args.get('uri'):
+                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
+                return defer.succeed(nodeIdentifier)
+            else:
+                return self.backend.createNode(None, self.owner)
+
+        def doPublish(payload):
+            d = getNode()
+            d.addCallback(gotNode, payload)
+            return d
+
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return http.StatusResponse(responsecode.NOT_FOUND,
+                                       "Node not found")
+
+        def trapXMPPURIParseError(failure):
+            failure.trap(XMPPURIParseError)
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "Malformed XMPP URI: %s" % failure.value)
+
+        self.checkMediaType(request)
+        d = self.parseXMLPayload(request.stream)
+        d.addCallback(doPublish)
+        d.addCallback(toResponse)
+        d.addErrback(trapNotFound)
+        d.addErrback(trapXMPPURIParseError)
+        return d
+
+
+
+class ListResource(resource.Resource):
+    def __init__(self, service):
+        self.service = service
+
+
+    def render(self, request):
+        def responseFromNodes(nodeIdentifiers):
+            stream = simplejson.dumps(nodeIdentifiers)
+            contentType = http_headers.MimeType.fromString(MIME_JSON)
+            return http.Response(responsecode.OK, stream=stream,
+                                 headers={'Content-Type': contentType})
+
+        d = self.service.getNodes()
+        d.addCallback(responseFromNodes)
+        return d
+
+
+
+# Service for subscribing to remote XMPP Pubsub nodes and web resources
+
+def extractAtomEntries(items):
+    """
+    Extract atom entries from a list of publish-subscribe items.
+
+    @param items: List of L{domish.Element}s that represent publish-subscribe
+                  items.
+    @type items: C{list}
+    """
+
+    atomEntries = []
+
+    for item in items:
+        # ignore non-items (i.e. retractions)
+        if item.name != 'item':
+            continue
+
+        atomEntry = None
+        for element in item.elements():
+            # extract the first element that is an atom entry
+            if element.uri == NS_ATOM and element.name == 'entry':
+                atomEntry = element
+                break
+
+        if atomEntry:
+            atomEntries.append(atomEntry)
+
+    return atomEntries
+
+
+
+def constructFeed(service, nodeIdentifier, entries, title):
+    nodeURI = getXMPPURI(service, nodeIdentifier)
+    now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
+
+    # Collect the received entries in a feed
+    feed = domish.Element((NS_ATOM, 'feed'))
+    feed.addElement('title', content=title)
+    feed.addElement('id', content=nodeURI)
+    feed.addElement('updated', content=now)
+
+    for entry in entries:
+        feed.addChild(entry)
+
+    return feed
+
+
+
+class RemoteSubscriptionService(service.Service, PubSubClient):
+    """
+    Service for subscribing to remote XMPP Publish-Subscribe nodes.
+
+    Subscriptions are created with a callback HTTP URI that is POSTed
+    to with the received items in notifications.
+    """
+
+    def __init__(self, jid, storage):
+        self.jid = jid
+        self.storage = storage
+
+
+    def trapNotFound(self, failure):
+        failure.trap(StanzaError)
+
+        if failure.value.condition == 'item-not-found':
+            raise error.NodeNotFound()
+        else:
+            return failure
+
+
+    def subscribeCallback(self, jid, nodeIdentifier, callback):
+        """
+        Subscribe a callback URI.
+
+        This registers a callback URI to be called when a notification is
+        received for the given node.
+
+        If this is the first callback registered for this node, the gateway
+        will subscribe to the node. Otherwise, the most recently published item
+        for this node is retrieved and, if present, the newly registered
+        callback will be called with that item.
+        """
+
+        def callbackForLastItem(items):
+            atomEntries = extractAtomEntries(items)
+
+            if not atomEntries:
+                return
+
+            self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
+                         'application/atom+xml;type=entry')
+
+        def subscribeOrItems(hasCallbacks):
+            if hasCallbacks:
+                if not nodeIdentifier:
+                    return None
+                d = self.items(jid, nodeIdentifier, 1)
+                d.addCallback(callbackForLastItem)
+            else:
+                d = self.subscribe(jid, nodeIdentifier, self.jid)
+
+            d.addErrback(self.trapNotFound)
+            return d
+
+        d = self.storage.hasCallbacks(jid, nodeIdentifier)
+        d.addCallback(subscribeOrItems)
+        d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier,
+                                                         callback))
+        return d
+
+
+    def unsubscribeCallback(self, jid, nodeIdentifier, callback):
+        """
+        Unsubscribe a callback.
+
+        If this was the last registered callback for this node, the
+        gateway will unsubscribe from node.
+        """
+
+        def cb(last):
+            if last:
+                return self.unsubscribe(jid, nodeIdentifier, self.jid)
+
+        d = self.storage.removeCallback(jid, nodeIdentifier, callback)
+        d.addCallback(cb)
+        return d
+
+
+    def itemsReceived(self, event):
+        """
+        Fire up HTTP client to do callback
+        """
+
+        atomEntries = extractAtomEntries(event.items)
+        service = event.sender
+        nodeIdentifier = event.nodeIdentifier
+        headers = event.headers
+
+        # Don't notify if there are no atom entries
+        if not atomEntries:
+            return
+
+        if len(atomEntries) == 1:
+            contentType = 'application/atom+xml;type=entry'
+            payload = atomEntries[0]
+        else:
+            contentType = 'application/atom+xml;type=feed'
+            payload = constructFeed(service, nodeIdentifier, atomEntries,
+                                    title='Received item collection')
+
+        self.callCallbacks(service, nodeIdentifier, payload, contentType)
+
+        if 'Collection' in headers:
+            for collection in headers['Collection']:
+                nodeIdentifier = collection or ''
+                self.callCallbacks(service, nodeIdentifier, payload,
+                                   contentType)
+
+
+    def deleteReceived(self, event):
+        """
+        Fire up HTTP client to do callback
+        """
+
+        service = event.sender
+        nodeIdentifier = event.nodeIdentifier
+        redirectURI = event.redirectURI
+        self.callCallbacks(service, nodeIdentifier, eventType='DELETED',
+                           redirectURI=redirectURI)
+
+
+    def _postTo(self, callbacks, service, nodeIdentifier,
+                      payload=None, contentType=None, eventType=None,
+                      redirectURI=None):
+
+        if not callbacks:
+            return
+
+        postdata = None
+        nodeURI = getXMPPURI(service, nodeIdentifier)
+        headers = {'Referer': nodeURI.encode('utf-8'),
+                   'PubSub-Service': service.full().encode('utf-8')}
+
+        if payload:
+            postdata = payload.toXml().encode('utf-8')
+            if contentType:
+                headers['Content-Type'] = "%s;charset=utf-8" % contentType
+
+        if eventType:
+            headers['Event'] = eventType
+
+        if redirectURI:
+            headers['Link'] = '<%s>; rel=alternate' % (
+                              redirectURI.encode('utf-8'),
+                              )
+
+        def postNotification(callbackURI):
+            f = getPageWithFactory(str(callbackURI),
+                                   method='POST',
+                                   postdata=postdata,
+                                   headers=headers)
+            d = f.deferred
+            d.addErrback(log.err)
+
+        for callbackURI in callbacks:
+            reactor.callLater(0, postNotification, callbackURI)
+
+
+    def callCallbacks(self, service, nodeIdentifier,
+                            payload=None, contentType=None, eventType=None,
+                            redirectURI=None):
+
+        def eb(failure):
+            failure.trap(error.NoCallbacks)
+
+            # No callbacks were registered for this node. Unsubscribe?
+
+        d = self.storage.getCallbacks(service, nodeIdentifier)
+        d.addCallback(self._postTo, service, nodeIdentifier, payload,
+                                    contentType, eventType, redirectURI)
+        d.addErrback(eb)
+        d.addErrback(log.err)
+
+
+
+class RemoteSubscribeBaseResource(resource.Resource):
+    """
+    Base resource for remote pubsub node subscription and unsubscription.
+
+    This resource accepts POST request with a JSON document that holds
+    a dictionary with the keys C{uri} and C{callback} that respectively map
+    to the XMPP URI of the publish-subscribe node and the callback URI.
+
+    This class should be inherited with L{serviceMethod} overridden.
+
+    @cvar serviceMethod: The name of the method to be called with
+                         the JID of the pubsub service, the node identifier
+                         and the callback URI as received in the HTTP POST
+                         request to this resource.
+    """
+    serviceMethod = None
+    errorMap = {
+            error.NodeNotFound:
+                (responsecode.FORBIDDEN, "Node not found"),
+            error.NotSubscribed:
+                (responsecode.FORBIDDEN, "No such subscription found"),
+            error.SubscriptionExists:
+                (responsecode.FORBIDDEN, "Subscription already exists"),
+    }
+
+    def __init__(self, service):
+        self.service = service
+        self.params = None
+
+
+    http_GET = None
+
+
+    def http_POST(self, request):
+        def trapNotFound(failure):
+            err = failure.trap(*self.errorMap.keys())
+            code, msg = self.errorMap[err]
+            return http.StatusResponse(code, msg)
+
+        def respond(result):
+            return http.Response(responsecode.NO_CONTENT)
+
+        def gotRequest(result):
+            uri = self.params['uri']
+            callback = self.params['callback']
+
+            jid, nodeIdentifier = getServiceAndNode(uri)
+            method = getattr(self.service, self.serviceMethod)
+            d = method(jid, nodeIdentifier, callback)
+            return d
+
+        def storeParams(data):
+            self.params = simplejson.loads(data)
+
+        def trapXMPPURIParseError(failure):
+            failure.trap(XMPPURIParseError)
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "Malformed XMPP URI: %s" % failure.value)
+
+        d = readStream(request.stream, storeParams)
+        d.addCallback(gotRequest)
+        d.addCallback(respond)
+        d.addErrback(trapNotFound)
+        d.addErrback(trapXMPPURIParseError)
+        return d
+
+
+
+class RemoteSubscribeResource(RemoteSubscribeBaseResource):
+    """
+    Resource to subscribe to a remote publish-subscribe node.
+
+    The passed C{uri} is the XMPP URI of the node to subscribe to and the
+    C{callback} is the callback URI. Upon receiving notifications from the
+    node, a POST request will be perfomed on the callback URI.
+    """
+    serviceMethod = 'subscribeCallback'
+
+
+
+class RemoteUnsubscribeResource(RemoteSubscribeBaseResource):
+    """
+    Resource to unsubscribe from a remote publish-subscribe node.
+
+    The passed C{uri} is the XMPP URI of the node to unsubscribe from and the
+    C{callback} is the callback URI that was registered for it.
+    """
+    serviceMethod = 'unsubscribeCallback'
+
+
+
+class RemoteItemsResource(resource.Resource):
+    """
+    Resource for retrieving items from a remote pubsub node.
+    """
+
+    def __init__(self, service):
+        self.service = service
+
+
+    def render(self, request):
+        try:
+            maxItems = int(request.args.get('max_items', [0])[0]) or None
+        except ValueError:
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "The argument max_items has an invalid value.")
+
+        try:
+            uri = request.args['uri'][0]
+        except KeyError:
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "No URI for the remote node provided.")
+
+        try:
+            jid, nodeIdentifier = getServiceAndNode(uri)
+        except XMPPURIParseError:
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "Malformed XMPP URI: %s" % uri)
+
+        def respond(items):
+            """Create a feed out the retrieved items."""
+            contentType = http_headers.MimeType('application',
+                                                'atom+xml',
+                                                {'type': 'feed'})
+            atomEntries = extractAtomEntries(items)
+            feed = constructFeed(jid, nodeIdentifier, atomEntries,
+                                    "Retrieved item collection")
+            payload = feed.toXml().encode('utf-8')
+            return http.Response(responsecode.OK, stream=payload,
+                                 headers={'Content-Type': contentType})
+
+        def trapNotFound(failure):
+            failure.trap(StanzaError)
+            if not failure.value.condition == 'item-not-found':
+                raise failure
+            return http.StatusResponse(responsecode.NOT_FOUND,
+                                       "Node not found")
+
+        d = self.service.items(jid, nodeIdentifier, maxItems)
+        d.addCallback(respond)
+        d.addErrback(trapNotFound)
+        return d
+
+
+
+# Client side code to interact with a service as provided above
+
+def getPageWithFactory(url, contextFactory=None, *args, **kwargs):
+    """Download a web page.
+
+    Download a page. Return the factory that holds a deferred, which will
+    callback with a page (as a string) or errback with a description of the
+    error.
+
+    See HTTPClientFactory to see what extra args can be passed.
+    """
+
+    scheme, host, port, path = client._parse(url)
+    factory = client.HTTPClientFactory(url, *args, **kwargs)
+    factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200()
+
+    if scheme == 'https':
+        from twisted.internet import ssl
+        if contextFactory is None:
+            contextFactory = ssl.ClientContextFactory()
+        reactor.connectSSL(host, port, factory, contextFactory)
+    else:
+        reactor.connectTCP(host, port, factory)
+    return factory
+
+
+
+class CallbackResource(resource.Resource):
+    """
+    Web resource for retrieving gateway notifications.
+    """
+
+    def __init__(self, callback):
+        self.callback = callback
+
+
+    http_GET = None
+
+
+    def http_POST(self, request):
+        p = WebStreamParser()
+        if not request.headers.hasHeader('Event'):
+            d = p.parse(request.stream)
+        else:
+            d = defer.succeed(None)
+        d.addCallback(self.callback, request.headers)
+        d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT))
+        return d
+
+
+
+class GatewayClient(service.Service):
+    """
+    Service that provides client access to the HTTP Gateway into Idavoll.
+    """
+
+    agent = "Idavoll HTTP Gateway Client"
+
+    def __init__(self, baseURI, callbackHost=None, callbackPort=None):
+        self.baseURI = baseURI
+        self.callbackHost = callbackHost or 'localhost'
+        self.callbackPort = callbackPort or 8087
+        root = resource.Resource()
+        root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs))
+        self.site = server.Site(root)
+
+
+    def startService(self):
+        self.port = reactor.listenTCP(self.callbackPort,
+                                      channel.HTTPFactory(self.site))
+
+
+    def stopService(self):
+        return self.port.stopListening()
+
+
+    def _makeURI(self, verb, query=None):
+        uriComponents = urlparse.urlparse(self.baseURI)
+        uri = urlparse.urlunparse((uriComponents[0],
+                                   uriComponents[1],
+                                   uriComponents[2] + verb,
+                                   '',
+                                   query and urllib.urlencode(query) or '',
+                                   ''))
+        return uri
+
+
+    def callback(self, data, headers):
+        pass
+
+
+    def ping(self):
+        f = getPageWithFactory(self._makeURI(''),
+                               method='HEAD',
+                               agent=self.agent)
+        return f.deferred
+
+
+    def create(self):
+        f = getPageWithFactory(self._makeURI('create'),
+                    method='POST',
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+
+    def delete(self, xmppURI, redirectURI=None):
+        query = {'uri': xmppURI}
+
+        if redirectURI:
+            params = {'redirect_uri': redirectURI}
+            postdata = simplejson.dumps(params)
+            headers = {'Content-Type': MIME_JSON}
+        else:
+            postdata = None
+            headers = None
+
+        f = getPageWithFactory(self._makeURI('delete', query),
+                    method='POST',
+                    postdata=postdata,
+                    headers=headers,
+                    agent=self.agent)
+        return f.deferred
+
+
+    def publish(self, entry, xmppURI=None):
+        query = xmppURI and {'uri': xmppURI}
+
+        f = getPageWithFactory(self._makeURI('publish', query),
+                    method='POST',
+                    postdata=entry.toXml().encode('utf-8'),
+                    headers={'Content-Type': MIME_ATOM_ENTRY},
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+
+    def listNodes(self):
+        f = getPageWithFactory(self._makeURI('list'),
+                    method='GET',
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+
+    def subscribe(self, xmppURI):
+        params = {'uri': xmppURI,
+                  'callback': 'http://%s:%s/callback' % (self.callbackHost,
+                                                         self.callbackPort)}
+        f = getPageWithFactory(self._makeURI('subscribe'),
+                    method='POST',
+                    postdata=simplejson.dumps(params),
+                    headers={'Content-Type': MIME_JSON},
+                    agent=self.agent)
+        return f.deferred
+
+
+    def unsubscribe(self, xmppURI):
+        params = {'uri': xmppURI,
+                  'callback': 'http://%s:%s/callback' % (self.callbackHost,
+                                                         self.callbackPort)}
+        f = getPageWithFactory(self._makeURI('unsubscribe'),
+                    method='POST',
+                    postdata=simplejson.dumps(params),
+                    headers={'Content-Type': MIME_JSON},
+                    agent=self.agent)
+        return f.deferred
+
+
+    def items(self, xmppURI, maxItems=None):
+        query = {'uri': xmppURI}
+        if maxItems:
+             query['max_items'] = int(maxItems)
+        f = getPageWithFactory(self._makeURI('items', query),
+                    method='GET',
+                    agent=self.agent)
+        return f.deferred
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/iidavoll.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,585 @@
+# Copyright (c) 2003-2008 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Interfaces for idavoll.
+"""
+
+from zope.interface import Attribute, Interface
+
+class IBackendService(Interface):
+    """ Interface to a backend service of a pubsub service. """
+
+
+    def __init__(storage):
+        """
+        @param storage: Object providing L{IStorage}.
+        """
+
+
+    def supportsPublisherAffiliation():
+        """ Reports if the backend supports the publisher affiliation.
+
+        @rtype: C{bool}
+        """
+
+
+    def supportsOutcastAffiliation():
+        """ Reports if the backend supports the publisher affiliation.
+
+        @rtype: C{bool}
+        """
+
+
+    def supportsPersistentItems():
+        """ Reports if the backend supports persistent items.
+
+        @rtype: C{bool}
+        """
+
+
+    def getNodeType(nodeIdentifier):
+        """ Return type of a node.
+
+        @return: a deferred that returns either 'leaf' or 'collection'
+        """
+
+
+    def getNodes():
+        """ Returns list of all nodes.
+
+        @return: a deferred that returns a C{list} of node ids.
+        """
+
+
+    def getNodeMetaData(nodeIdentifier):
+        """ Return meta data for a node.
+
+        @return: a deferred that returns a C{list} of C{dict}s with the
+                 metadata.
+        """
+
+
+    def createNode(nodeIdentifier, requestor):
+        """ Create a node.
+
+        @return: a deferred that fires when the node has been created.
+        """
+
+
+    def registerPreDelete(preDeleteFn):
+        """ Register a callback that is called just before a node deletion.
+
+        The function C{preDeletedFn} is added to a list of functions to be
+        called just before deletion of a node. The callback C{preDeleteFn} is
+        called with the C{nodeIdentifier} that is about to be deleted and
+        should return a deferred that returns a list of deferreds that are to
+        be fired after deletion. The backend collects the lists from all these
+        callbacks before actually deleting the node in question.  After
+        deletion all collected deferreds are fired to do post-processing.
+
+        The idea is that you want to be able to collect data from the node
+        before deleting it, for example to get a list of subscribers that have
+        to be notified after the node has been deleted. To do this,
+        C{preDeleteFn} fetches the subscriber list and passes this list to a
+        callback attached to a deferred that it sets up. This deferred is
+        returned in the list of deferreds.
+        """
+
+
+    def deleteNode(nodeIdentifier, requestor):
+        """ Delete a node.
+
+        @return: a deferred that fires when the node has been deleted.
+        """
+
+
+    def purgeNode(nodeIdentifier, requestor):
+        """ Removes all items in node from persistent storage """
+
+
+    def subscribe(nodeIdentifier, subscriber, requestor):
+        """ Request the subscription of an entity to a pubsub node.
+
+        Depending on the node's configuration and possible business rules, the
+        C{subscriber} is added to the list of subscriptions of the node with id
+        C{nodeIdentifier}. The C{subscriber} might be different from the
+        C{requestor}, and if the C{requestor} is not allowed to subscribe this
+        entity an exception should be raised.
+
+        @return: a deferred that returns the subscription state
+        """
+
+
+    def unsubscribe(nodeIdentifier, subscriber, requestor):
+        """ Cancel the subscription of an entity to a pubsub node.
+
+        The subscription of C{subscriber} is removed from the list of
+        subscriptions of the node with id C{nodeIdentifier}. If the
+        C{requestor} is not allowed to unsubscribe C{subscriber}, an an
+        exception should be raised.
+
+        @return: a deferred that fires when unsubscription is complete.
+        """
+
+
+    def getSubscribers(nodeIdentifier):
+        """ Get node subscriber list.
+
+        @return: a deferred that fires with the list of subscribers.
+        """
+
+
+    def getSubscriptions(entity):
+        """ Report the list of current subscriptions with this pubsub service.
+
+        Report the list of the current subscriptions with all nodes within this
+        pubsub service, for the C{entity}.
+
+        @return: a deferred that returns the list of all current subscriptions
+                 as tuples C{(nodeIdentifier, subscriber, subscription)}.
+        """
+
+
+    def getAffiliations(entity):
+        """ Report the list of current affiliations with this pubsub service.
+
+        Report the list of the current affiliations with all nodes within this
+        pubsub service, for the C{entity}.
+
+        @return: a deferred that returns the list of all current affiliations
+                 as tuples C{(nodeIdentifier, affiliation)}.
+        """
+
+
+    def publish(nodeIdentifier, items, requestor):
+        """ Publish items to a pubsub node.
+
+        @return: a deferred that fires when the items have been published.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
+
+
+    def registerNotifier(observerfn, *args, **kwargs):
+        """ Register callback which is called for notification. """
+
+
+    def getNotifications(nodeIdentifier, items):
+        """
+        Get notification list.
+
+        This method is called to discover which entities should receive
+        notifications for the given items that have just been published to the
+        given node.
+
+        The notification list contains tuples (subscriber, subscriptions,
+        items) to result in one notification per tuple: the given subscriptions
+        yielded the given items to be notified to this subscriber.  This
+        structure is needed allow for letting the subscriber know which
+        subscriptions yielded which notifications, while catering for
+        collection nodes and content-based subscriptions.
+
+        To minimize the amount of notifications per entity, implementers
+        should take care that if all items in C{items} were yielded
+        by the same set of subscriptions, exactly one tuple is for this
+        subscriber is returned, so that the subscriber would get exactly one
+        notification. Alternatively, one tuple per subscription combination.
+
+        @param nodeIdentifier: The identifier of the node the items were
+                               published to.
+        @type nodeIdentifier: C{unicode}.
+        @param items: The list of published items as
+                      L{Element<twisted.words.xish.domish.Element>}s.
+        @type items: C{list}
+        @return: The notification list as tuples of
+                 (L{JID<twisted.words.protocols.jabber.jid.JID>},
+                  C{list} of L{Subscription<wokkel.pubsub.Subscription>},
+                  C{list} of L{Element<twisted.words.xish.domish.Element>}.
+        @rtype: C{list}
+        """
+
+
+    def getItems(nodeIdentifier, requestor, maxItems=None, itemIdentifiers=[]):
+        """ Retrieve items from persistent storage
+
+        If C{maxItems} is given, return the C{maxItems} last published
+        items, else if C{itemIdentifiers} is not empty, return the items
+        requested.  If neither is given, return all items.
+
+        @return: a deferred that returns the requested items
+        """
+
+
+    def retractItem(nodeIdentifier, itemIdentifier, requestor):
+        """ Removes item in node from persistent storage """
+
+
+
+class IStorage(Interface):
+    """
+    Storage interface.
+    """
+
+
+    def getNode(nodeIdentifier):
+        """
+        Get Node.
+
+        @param nodeIdentifier: NodeID of the desired node.
+        @type nodeIdentifier: C{str}
+        @return: deferred that returns a L{INode} providing object.
+        """
+
+
+    def getNodeIds():
+        """
+        Return all NodeIDs.
+
+        @return: deferred that returns a list of NodeIDs (C{unicode}).
+        """
+
+
+    def createNode(nodeIdentifier, owner, config):
+        """
+        Create new node.
+
+        The implementation should make sure, the passed owner JID is stripped
+        of the resource (e.g. using C{owner.userhostJID()}). The passed config
+        is expected to have values for the fields returned by
+        L{getDefaultConfiguration}, as well as a value for
+        C{'pubsub#node_type'}.
+
+        @param nodeIdentifier: NodeID of the new node.
+        @type nodeIdentifier: C{unicode}
+        @param owner: JID of the new nodes's owner.
+        @type owner: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param config: Node configuration.
+        @type config: C{dict}
+        @return: deferred that fires on creation.
+        """
+
+
+    def deleteNode(nodeIdentifier):
+        """
+        Delete a node.
+
+        @param nodeIdentifier: NodeID of the new node.
+        @type nodeIdentifier: C{unicode}
+        @return: deferred that fires on deletion.
+        """
+
+
+    def getAffiliations(entity):
+        """
+        Get all affiliations for entity.
+
+        The implementation should make sure, the passed owner JID is stripped
+        of the resource (e.g. using C{owner.userhostJID()}).
+
+        @param entity: JID of the entity.
+        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that returns a C{list} of tuples of the form
+                 C{(nodeIdentifier, affiliation)}, where C{nodeIdentifier} is
+                 of the type L{unicode} and C{affiliation} is one of
+                 C{'owner'}, C{'publisher'} and C{'outcast'}.
+        """
+
+
+    def getSubscriptions(entity):
+        """
+        Get all subscriptions for an entity.
+
+        The implementation should make sure, the passed owner JID is stripped
+        of the resource (e.g. using C{owner.userhostJID()}).
+
+        @param entity: JID of the entity.
+        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that returns a C{list} of tuples of the form
+                 C{(nodeIdentifier, subscriber, state)}, where
+                 C{nodeIdentifier} is of the type C{unicode}, C{subscriber} of
+                 the type J{JID<twisted.words.protocols.jabber.jid.JID>}, and
+                 C{state} is C{'subscribed'}, C{'pending'} or
+                 C{'unconfigured'}.
+        """
+
+
+    def getDefaultConfiguration(nodeType):
+        """
+        Get the default configuration for the given node type.
+
+        @param nodeType: Either C{'leaf'} or C{'collection'}.
+        @type nodeType: C{str}
+        @return: The default configuration.
+        @rtype: C{dict}.
+        @raises: L{idavoll.error.NoCollections} if collections are not
+                 supported.
+        """
+
+
+
+class INode(Interface):
+    """
+    Interface to the class of objects that represent nodes.
+    """
+
+    nodeType = Attribute("""The type of this node. One of {'leaf'},
+                           {'collection'}.""")
+    nodeIdentifier = Attribute("""The node identifer of this node""")
+
+
+    def getType():
+        """
+        Get node's type.
+
+        @return: C{'leaf'} or C{'collection'}.
+        """
+
+
+    def getConfiguration():
+        """
+        Get node's configuration.
+
+        The configuration must at least have two options:
+        C{pubsub#persist_items}, and C{pubsub#deliver_payloads}.
+
+        @return: C{dict} of configuration options.
+        """
+
+
+    def getMetaData():
+        """
+        Get node's meta data.
+
+        The meta data must be a superset of the configuration options, and
+        also at least should have a C{pubsub#node_type} entry.
+
+        @return: C{dict} of meta data.
+        """
+
+
+    def setConfiguration(options):
+        """
+        Set node's configuration.
+
+        The elements of {options} will set the new values for those
+        configuration items. This means that only changing items have to
+        be given.
+
+        @param options: a dictionary of configuration options.
+        @returns: a deferred that fires upon success.
+        """
+
+
+    def getAffiliation(entity):
+        """
+        Get affiliation of entity with this node.
+
+        @param entity: JID of entity.
+        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that returns C{'owner'}, C{'publisher'}, C{'outcast'}
+                 or C{None}.
+        """
+
+
+    def getSubscription(subscriber):
+        """
+        Get subscription to this node of subscriber.
+
+        @param subscriber: JID of the new subscriptions' entity.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that returns the subscription state (C{'subscribed'},
+                 C{'pending'} or C{None}).
+        """
+
+
+    def getSubscriptions(state=None):
+        """
+        Get list of subscriptions to this node.
+
+        The optional C{state} argument filters the subscriptions to their
+        state.
+
+        @param state: Subscription state filter. One of C{'subscribed'},
+                      C{'pending'}, C{'unconfigured'}.
+        @type state: C{str}
+        @return: a deferred that returns a C{list} of
+                 L{wokkel.pubsub.Subscription}s.
+        """
+
+
+    def addSubscription(subscriber, state, config):
+        """
+        Add new subscription to this node with given state.
+
+        @param subscriber: JID of the new subscriptions' entity.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param state: C{'subscribed'} or C{'pending'}
+        @type state: C{str}
+        @param config: Subscription configuration.
+        @param config: C{dict}
+        @return: deferred that fires on subscription.
+        """
+
+
+    def removeSubscription(subscriber):
+        """
+        Remove subscription to this node.
+
+        @param subscriber: JID of the subscriptions' entity.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that fires on removal.
+        """
+
+
+    def isSubscribed(entity):
+        """
+        Returns whether entity has any subscription to this node.
+
+        Only returns C{True} when the subscription state (if present) is
+        C{'subscribed'} for any subscription that matches the bare JID.
+
+        @param subscriber: bare JID of the subscriptions' entity.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that returns a C{bool}.
+        """
+
+
+    def getAffiliations():
+        """
+        Get affiliations of entities with this node.
+
+        @return: deferred that returns a C{list} of tuples (jid, affiliation),
+                 where jid is a L(JID<twisted.words.protocols.jabber.jid.JID>)
+                 and affiliation is one of C{'owner'},
+        C{'publisher'}, C{'outcast'}.
+        """
+
+
+
+class ILeafNode(Interface):
+    """
+    Interface to the class of objects that represent leaf nodes.
+    """
+
+    def storeItems(items, publisher):
+        """
+        Store items in persistent storage for later retrieval.
+
+        @param items: The list of items to be stored. Each item is the
+                      L{domish} representation of the XML fragment as defined
+                      for C{<item/>} in the
+                      C{http://jabber.org/protocol/pubsub} namespace.
+        @type items: C{list} of {domish.Element}
+        @param publisher: JID of the publishing entity.
+        @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @return: deferred that fires upon success.
+        """
+
+
+    def removeItems(itemIdentifiers):
+        """
+        Remove items by id.
+
+        @param itemIdentifiers: C{list} of item ids.
+        @return: deferred that fires with a C{list} of ids of the items that
+                 were deleted
+        """
+
+
+    def getItems(maxItems=None):
+        """
+        Get items.
+
+        If C{maxItems} is not given, all items in the node are returned,
+        just like C{getItemsById}. Otherwise, C{maxItems} limits
+        the returned items to a maximum of that number of most recently
+        published items.
+
+        @param maxItems: if given, a natural number (>0) that limits the
+                          returned number of items.
+        @return: deferred that fires with a C{list} of found items.
+        """
+
+
+    def getItemsById(itemIdentifiers):
+        """
+        Get items by item id.
+
+        Each item in the returned list is a unicode string that
+        represent the XML of the item as it was published, including the
+        item wrapper with item id.
+
+        @param itemIdentifiers: C{list} of item ids.
+        @return: deferred that fires with a C{list} of found items.
+        """
+
+
+    def purge():
+        """
+        Purge node of all items in persistent storage.
+
+        @return: deferred that fires when the node has been purged.
+        """
+
+
+
+class IGatewayStorage(Interface):
+
+    def addCallback(service, nodeIdentifier, callback):
+        """
+        Register a callback URI.
+
+        The registered HTTP callback URI will have an Atom Entry documented
+        POSTed to it upon receiving a notification for the given pubsub node.
+
+        @param service: The XMPP entity that holds the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the publish-subscribe node.
+        @type nodeIdentifier: C{unicode}.
+        @param callback: The callback URI to be registered.
+        @type callback: C{str}.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
+
+    def removeCallback(service, nodeIdentifier, callback):
+        """
+        Remove a registered callback URI.
+
+        The returned deferred will fire with a boolean that signals wether or
+        not this was the last callback unregistered for this node.
+
+        @param service: The XMPP entity that holds the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the publish-subscribe node.
+        @type nodeIdentifier: C{unicode}.
+        @param callback: The callback URI to be unregistered.
+        @type callback: C{str}.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
+
+    def getCallbacks(service, nodeIdentifier):
+        """
+        Get the callbacks registered for this node.
+
+        Returns a deferred that fires with the set of HTTP callback URIs
+        registered for this node.
+
+        @param service: The XMPP entity that holds the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the publish-subscribe node.
+        @type nodeIdentifier: C{unicode}.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
+
+
+    def hasCallbacks(service, nodeIdentifier):
+        """
+        Return wether there are callbacks registered for a node.
+
+        @param service: The XMPP entity that holds the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the publish-subscribe node.
+        @type nodeIdentifier: C{unicode}.
+        @returns: Deferred that fires with a boolean.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/memory_storage.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,320 @@
+# Copyright (c) 2003-2010 Ralph Meijer
+# See LICENSE for details.
+
+import copy
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+
+from wokkel.pubsub import Subscription
+
+from idavoll import error, iidavoll
+
+class Storage:
+
+    implements(iidavoll.IStorage)
+
+    defaultConfig = {
+            'leaf': {
+                "pubsub#persist_items": True,
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+            },
+            'collection': {
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+            }
+    }
+
+    def __init__(self):
+        rootNode = CollectionNode('', jid.JID('localhost'),
+                                  copy.copy(self.defaultConfig['collection']))
+        self._nodes = {'': rootNode}
+
+
+    def getNode(self, nodeIdentifier):
+        try:
+            node = self._nodes[nodeIdentifier]
+        except KeyError:
+            return defer.fail(error.NodeNotFound())
+
+        return defer.succeed(node)
+
+
+    def getNodeIds(self):
+        return defer.succeed(self._nodes.keys())
+
+
+    def createNode(self, nodeIdentifier, owner, config):
+        if nodeIdentifier in self._nodes:
+            return defer.fail(error.NodeExists())
+
+        if config['pubsub#node_type'] != 'leaf':
+            raise error.NoCollections()
+
+        node = LeafNode(nodeIdentifier, owner, config)
+        self._nodes[nodeIdentifier] = node
+
+        return defer.succeed(None)
+
+
+    def deleteNode(self, nodeIdentifier):
+        try:
+            del self._nodes[nodeIdentifier]
+        except KeyError:
+            return defer.fail(error.NodeNotFound())
+
+        return defer.succeed(None)
+
+
+    def getAffiliations(self, entity):
+        entity = entity.userhost()
+        return defer.succeed([(node.nodeIdentifier, node._affiliations[entity])
+                              for name, node in self._nodes.iteritems()
+                              if entity in node._affiliations])
+
+
+    def getSubscriptions(self, entity):
+        subscriptions = []
+        for node in self._nodes.itervalues():
+            for subscriber, subscription in node._subscriptions.iteritems():
+                subscriber = jid.internJID(subscriber)
+                if subscriber.userhostJID() == entity.userhostJID():
+                    subscriptions.append(subscription)
+
+        return defer.succeed(subscriptions)
+
+
+    def getDefaultConfiguration(self, nodeType):
+        if nodeType == 'collection':
+            raise error.NoCollections()
+
+        return self.defaultConfig[nodeType]
+
+
+class Node:
+
+    implements(iidavoll.INode)
+
+    def __init__(self, nodeIdentifier, owner, config):
+        self.nodeIdentifier = nodeIdentifier
+        self._affiliations = {owner.userhost(): 'owner'}
+        self._subscriptions = {}
+        self._config = copy.copy(config)
+
+
+    def getType(self):
+        return self.nodeType
+
+
+    def getConfiguration(self):
+        return self._config
+
+
+    def getMetaData(self):
+        config = copy.copy(self._config)
+        config["pubsub#node_type"] = self.nodeType
+        return config
+
+
+    def setConfiguration(self, options):
+        for option in options:
+            if option in self._config:
+                self._config[option] = options[option]
+
+        return defer.succeed(None)
+
+
+    def getAffiliation(self, entity):
+        return defer.succeed(self._affiliations.get(entity.userhost()))
+
+
+    def getSubscription(self, subscriber):
+        try:
+            subscription = self._subscriptions[subscriber.full()]
+        except KeyError:
+            return defer.succeed(None)
+        else:
+            return defer.succeed(subscription)
+
+
+    def getSubscriptions(self, state=None):
+        return defer.succeed(
+                [subscription
+                 for subscription in self._subscriptions.itervalues()
+                 if state is None or subscription.state == state])
+
+
+
+    def addSubscription(self, subscriber, state, options):
+        if self._subscriptions.get(subscriber.full()):
+            return defer.fail(error.SubscriptionExists())
+
+        subscription = Subscription(self.nodeIdentifier, subscriber, state,
+                                    options)
+        self._subscriptions[subscriber.full()] = subscription
+        return defer.succeed(None)
+
+
+    def removeSubscription(self, subscriber):
+        try:
+            del self._subscriptions[subscriber.full()]
+        except KeyError:
+            return defer.fail(error.NotSubscribed())
+
+        return defer.succeed(None)
+
+
+    def isSubscribed(self, entity):
+        for subscriber, subscription in self._subscriptions.iteritems():
+            if jid.internJID(subscriber).userhost() == entity.userhost() and \
+                    subscription.state == 'subscribed':
+                return defer.succeed(True)
+
+        return defer.succeed(False)
+
+
+    def getAffiliations(self):
+        affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation
+                       in self._affiliations.iteritems()]
+
+        return defer.succeed(affiliations)
+
+
+
+class PublishedItem(object):
+    """
+    A published item.
+
+    This represent an item as it was published by an entity.
+
+    @ivar element: The DOM representation of the item that was published.
+    @type element: L{Element<twisted.words.xish.domish.Element>}
+    @ivar publisher: The entity that published the item.
+    @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>}
+    """
+
+    def __init__(self, element, publisher):
+        self.element = element
+        self.publisher = publisher
+
+
+
+class LeafNode(Node):
+
+    implements(iidavoll.ILeafNode)
+
+    nodeType = 'leaf'
+
+    def __init__(self, nodeIdentifier, owner, config):
+        Node.__init__(self, nodeIdentifier, owner, config)
+        self._items = {}
+        self._itemlist = []
+
+
+    def storeItems(self, items, publisher):
+        for element in items:
+            item = PublishedItem(element, publisher)
+            itemIdentifier = element["id"]
+            if itemIdentifier in self._items:
+                self._itemlist.remove(self._items[itemIdentifier])
+            self._items[itemIdentifier] = item
+            self._itemlist.append(item)
+
+        return defer.succeed(None)
+
+
+    def removeItems(self, itemIdentifiers):
+        deleted = []
+
+        for itemIdentifier in itemIdentifiers:
+            try:
+                item = self._items[itemIdentifier]
+            except KeyError:
+                pass
+            else:
+                self._itemlist.remove(item)
+                del self._items[itemIdentifier]
+                deleted.append(itemIdentifier)
+
+        return defer.succeed(deleted)
+
+
+    def getItems(self, maxItems=None):
+        if maxItems:
+            itemList = self._itemlist[-maxItems:]
+        else:
+            itemList = self._itemlist
+        return defer.succeed([item.element for item in itemList])
+
+
+    def getItemsById(self, itemIdentifiers):
+        items = []
+        for itemIdentifier in itemIdentifiers:
+            try:
+                item = self._items[itemIdentifier]
+            except KeyError:
+                pass
+            else:
+                items.append(item.element)
+        return defer.succeed(items)
+
+
+    def purge(self):
+        self._items = {}
+        self._itemlist = []
+
+        return defer.succeed(None)
+
+
+class CollectionNode(Node):
+    nodeType = 'collection'
+
+
+
+class GatewayStorage(object):
+    """
+    Memory based storage facility for the XMPP-HTTP gateway.
+    """
+
+    def __init__(self):
+        self.callbacks = {}
+
+
+    def addCallback(self, service, nodeIdentifier, callback):
+        try:
+            callbacks = self.callbacks[service, nodeIdentifier]
+        except KeyError:
+            callbacks = set([callback])
+            self.callbacks[service, nodeIdentifier] = callbacks
+        else:
+            callbacks.add(callback)
+            pass
+
+        return defer.succeed(None)
+
+
+    def removeCallback(self, service, nodeIdentifier, callback):
+        try:
+            callbacks = self.callbacks[service, nodeIdentifier]
+            callbacks.remove(callback)
+        except KeyError:
+            return defer.fail(error.NotSubscribed())
+        else:
+            if not callbacks:
+                del self.callbacks[service, nodeIdentifier]
+
+            return defer.succeed(not callbacks)
+
+
+    def getCallbacks(self, service, nodeIdentifier):
+        try:
+            callbacks = self.callbacks[service, nodeIdentifier]
+        except KeyError:
+            return defer.fail(error.NoCallbacks())
+        else:
+            return defer.succeed(callbacks)
+
+
+    def hasCallbacks(self, service, nodeIdentifier):
+        return defer.succeed((service, nodeIdentifier) in self.callbacks)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/pgsql_storage.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,605 @@
+# Copyright (c) 2003-2008 Ralph Meijer
+# See LICENSE for details.
+
+import copy
+
+from zope.interface import implements
+
+from twisted.enterprise import adbapi
+from twisted.words.protocols.jabber import jid
+
+from wokkel.generic import parseXml, stripNamespace
+from wokkel.pubsub import Subscription
+
+from idavoll import error, iidavoll
+
+class Storage:
+
+    implements(iidavoll.IStorage)
+
+    defaultConfig = {
+            'leaf': {
+                "pubsub#persist_items": True,
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+            },
+            'collection': {
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+            }
+    }
+
+    def __init__(self, dbpool):
+        self.dbpool = dbpool
+
+
+    def getNode(self, nodeIdentifier):
+        return self.dbpool.runInteraction(self._getNode, nodeIdentifier)
+
+
+    def _getNode(self, cursor, nodeIdentifier):
+        configuration = {}
+        cursor.execute("""SELECT node_type,
+                                 persist_items,
+                                 deliver_payloads,
+                                 send_last_published_item
+                          FROM nodes
+                          WHERE node=%s""",
+                       (nodeIdentifier,))
+        row = cursor.fetchone()
+
+        if not row:
+            raise error.NodeNotFound()
+
+        if row[0] == 'leaf':
+            configuration = {
+                    'pubsub#persist_items': row[1],
+                    'pubsub#deliver_payloads': row[2],
+                    'pubsub#send_last_published_item':
+                        row[3]}
+            node = LeafNode(nodeIdentifier, configuration)
+            node.dbpool = self.dbpool
+            return node
+        elif row[0] == 'collection':
+            configuration = {
+                    'pubsub#deliver_payloads': row[2],
+                    'pubsub#send_last_published_item':
+                        row[3]}
+            node = CollectionNode(nodeIdentifier, configuration)
+            node.dbpool = self.dbpool
+            return node
+
+
+
+    def getNodeIds(self):
+        d = self.dbpool.runQuery("""SELECT node from nodes""")
+        d.addCallback(lambda results: [r[0] for r in results])
+        return d
+
+
+    def createNode(self, nodeIdentifier, owner, config):
+        return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
+                                           owner, config)
+
+
+    def _createNode(self, cursor, nodeIdentifier, owner, config):
+        if config['pubsub#node_type'] != 'leaf':
+            raise error.NoCollections()
+
+        owner = owner.userhost()
+        try:
+            cursor.execute("""INSERT INTO nodes
+                              (node, node_type, persist_items,
+                               deliver_payloads, send_last_published_item)
+                              VALUES
+                              (%s, 'leaf', %s, %s, %s)""",
+                           (nodeIdentifier,
+                            config['pubsub#persist_items'],
+                            config['pubsub#deliver_payloads'],
+                            config['pubsub#send_last_published_item'])
+                           )
+        except cursor._pool.dbapi.IntegrityError:
+            raise error.NodeExists()
+
+        cursor.execute("""SELECT 1 from entities where jid=%s""",
+                       (owner,))
+
+        if not cursor.fetchone():
+            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                           (owner,))
+
+        cursor.execute("""INSERT INTO affiliations
+                          (node_id, entity_id, affiliation)
+                          SELECT node_id, entity_id, 'owner' FROM
+                          (SELECT node_id FROM nodes WHERE node=%s) as n
+                          CROSS JOIN
+                          (SELECT entity_id FROM entities
+                                            WHERE jid=%s) as e""",
+                       (nodeIdentifier, owner))
+
+
+    def deleteNode(self, nodeIdentifier):
+        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier)
+
+
+    def _deleteNode(self, cursor, nodeIdentifier):
+        cursor.execute("""DELETE FROM nodes WHERE node=%s""",
+                       (nodeIdentifier,))
+
+        if cursor.rowcount != 1:
+            raise error.NodeNotFound()
+
+
+    def getAffiliations(self, entity):
+        d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
+                                        NATURAL JOIN affiliations
+                                        NATURAL JOIN nodes
+                                        WHERE jid=%s""",
+                                     (entity.userhost(),))
+        d.addCallback(lambda results: [tuple(r) for r in results])
+        return d
+
+
+    def getSubscriptions(self, entity):
+        def toSubscriptions(rows):
+            subscriptions = []
+            for row in rows:
+                subscriber = jid.internJID('%s/%s' % (row[1],
+                                                      row[2]))
+                subscription = Subscription(row[0], subscriber, row[3])
+                subscriptions.append(subscription)
+            return subscriptions
+
+        d = self.dbpool.runQuery("""SELECT node, jid, resource, state
+                                     FROM entities
+                                     NATURAL JOIN subscriptions
+                                     NATURAL JOIN nodes
+                                     WHERE jid=%s""",
+                                  (entity.userhost(),))
+        d.addCallback(toSubscriptions)
+        return d
+
+
+    def getDefaultConfiguration(self, nodeType):
+        return self.defaultConfig[nodeType]
+
+
+
+class Node:
+
+    implements(iidavoll.INode)
+
+    def __init__(self, nodeIdentifier, config):
+        self.nodeIdentifier = nodeIdentifier
+        self._config = config
+
+
+    def _checkNodeExists(self, cursor):
+        cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""",
+                       (self.nodeIdentifier,))
+        if not cursor.fetchone():
+            raise error.NodeNotFound()
+
+
+    def getType(self):
+        return self.nodeType
+
+
+    def getConfiguration(self):
+        return self._config
+
+
+    def setConfiguration(self, options):
+        config = copy.copy(self._config)
+
+        for option in options:
+            if option in config:
+                config[option] = options[option]
+
+        d = self.dbpool.runInteraction(self._setConfiguration, config)
+        d.addCallback(self._setCachedConfiguration, config)
+        return d
+
+
+    def _setConfiguration(self, cursor, config):
+        self._checkNodeExists(cursor)
+        cursor.execute("""UPDATE nodes SET persist_items=%s,
+                                           deliver_payloads=%s,
+                                           send_last_published_item=%s
+                          WHERE node=%s""",
+                       (config["pubsub#persist_items"],
+                        config["pubsub#deliver_payloads"],
+                        config["pubsub#send_last_published_item"],
+                        self.nodeIdentifier))
+
+
+    def _setCachedConfiguration(self, void, config):
+        self._config = config
+
+
+    def getMetaData(self):
+        config = copy.copy(self._config)
+        config["pubsub#node_type"] = self.nodeType
+        return config
+
+
+    def getAffiliation(self, entity):
+        return self.dbpool.runInteraction(self._getAffiliation, entity)
+
+
+    def _getAffiliation(self, cursor, entity):
+        self._checkNodeExists(cursor)
+        cursor.execute("""SELECT affiliation FROM affiliations
+                          NATURAL JOIN nodes
+                          NATURAL JOIN entities
+                          WHERE node=%s AND jid=%s""",
+                       (self.nodeIdentifier,
+                        entity.userhost()))
+
+        try:
+            return cursor.fetchone()[0]
+        except TypeError:
+            return None
+
+
+    def getSubscription(self, subscriber):
+        return self.dbpool.runInteraction(self._getSubscription, subscriber)
+
+
+    def _getSubscription(self, cursor, subscriber):
+        self._checkNodeExists(cursor)
+
+        userhost = subscriber.userhost()
+        resource = subscriber.resource or ''
+
+        cursor.execute("""SELECT state FROM subscriptions
+                          NATURAL JOIN nodes
+                          NATURAL JOIN entities
+                          WHERE node=%s AND jid=%s AND resource=%s""",
+                       (self.nodeIdentifier,
+                        userhost,
+                        resource))
+        row = cursor.fetchone()
+        if not row:
+            return None
+        else:
+            return Subscription(self.nodeIdentifier, subscriber, row[0])
+
+
+    def getSubscriptions(self, state=None):
+        return self.dbpool.runInteraction(self._getSubscriptions, state)
+
+
+    def _getSubscriptions(self, cursor, state):
+        self._checkNodeExists(cursor)
+
+        query = """SELECT jid, resource, state,
+                          subscription_type, subscription_depth
+                   FROM subscriptions
+                   NATURAL JOIN nodes
+                   NATURAL JOIN entities
+                   WHERE node=%s""";
+        values = [self.nodeIdentifier]
+
+        if state:
+            query += " AND state=%s"
+            values.append(state)
+
+        cursor.execute(query, values);
+        rows = cursor.fetchall()
+
+        subscriptions = []
+        for row in rows:
+            subscriber = jid.JID('%s/%s' % (row[0], row[1]))
+
+            options = {}
+            if row[3]:
+                options['pubsub#subscription_type'] = row[3];
+            if row[4]:
+                options['pubsub#subscription_depth'] = row[4];
+
+            subscriptions.append(Subscription(self.nodeIdentifier, subscriber,
+                                              row[2], options))
+
+        return subscriptions
+
+
+    def addSubscription(self, subscriber, state, config):
+        return self.dbpool.runInteraction(self._addSubscription, subscriber,
+                                          state, config)
+
+
+    def _addSubscription(self, cursor, subscriber, state, config):
+        self._checkNodeExists(cursor)
+
+        userhost = subscriber.userhost()
+        resource = subscriber.resource or ''
+
+        subscription_type = config.get('pubsub#subscription_type')
+        subscription_depth = config.get('pubsub#subscription_depth')
+
+        try:
+            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                           (userhost,))
+        except cursor._pool.dbapi.IntegrityError:
+            cursor._connection.rollback()
+
+        try:
+            cursor.execute("""INSERT INTO subscriptions
+                              (node_id, entity_id, resource, state,
+                               subscription_type, subscription_depth)
+                              SELECT node_id, entity_id, %s, %s, %s, %s FROM
+                              (SELECT node_id FROM nodes
+                                              WHERE node=%s) as n
+                              CROSS JOIN
+                              (SELECT entity_id FROM entities
+                                                WHERE jid=%s) as e""",
+                           (resource,
+                            state,
+                            subscription_type,
+                            subscription_depth,
+                            self.nodeIdentifier,
+                            userhost))
+        except cursor._pool.dbapi.IntegrityError:
+            raise error.SubscriptionExists()
+
+
+    def removeSubscription(self, subscriber):
+        return self.dbpool.runInteraction(self._removeSubscription,
+                                           subscriber)
+
+
+    def _removeSubscription(self, cursor, subscriber):
+        self._checkNodeExists(cursor)
+
+        userhost = subscriber.userhost()
+        resource = subscriber.resource or ''
+
+        cursor.execute("""DELETE FROM subscriptions WHERE
+                          node_id=(SELECT node_id FROM nodes
+                                                  WHERE node=%s) AND
+                          entity_id=(SELECT entity_id FROM entities
+                                                      WHERE jid=%s) AND
+                          resource=%s""",
+                       (self.nodeIdentifier,
+                        userhost,
+                        resource))
+        if cursor.rowcount != 1:
+            raise error.NotSubscribed()
+
+        return None
+
+
+    def isSubscribed(self, entity):
+        return self.dbpool.runInteraction(self._isSubscribed, entity)
+
+
+    def _isSubscribed(self, cursor, entity):
+        self._checkNodeExists(cursor)
+
+        cursor.execute("""SELECT 1 FROM entities
+                          NATURAL JOIN subscriptions
+                          NATURAL JOIN nodes
+                          WHERE entities.jid=%s
+                          AND node=%s AND state='subscribed'""",
+                       (entity.userhost(),
+                       self.nodeIdentifier))
+
+        return cursor.fetchone() is not None
+
+
+    def getAffiliations(self):
+        return self.dbpool.runInteraction(self._getAffiliations)
+
+
+    def _getAffiliations(self, cursor):
+        self._checkNodeExists(cursor)
+
+        cursor.execute("""SELECT jid, affiliation FROM nodes
+                          NATURAL JOIN affiliations
+                          NATURAL JOIN entities
+                          WHERE node=%s""",
+                       (self.nodeIdentifier,))
+        result = cursor.fetchall()
+
+        return [(jid.internJID(r[0]), r[1]) for r in result]
+
+
+
+class LeafNode(Node):
+
+    implements(iidavoll.ILeafNode)
+
+    nodeType = 'leaf'
+
+    def storeItems(self, items, publisher):
+        return self.dbpool.runInteraction(self._storeItems, items, publisher)
+
+
+    def _storeItems(self, cursor, items, publisher):
+        self._checkNodeExists(cursor)
+        for item in items:
+            self._storeItem(cursor, item, publisher)
+
+
+    def _storeItem(self, cursor, item, publisher):
+        data = item.toXml()
+        cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
+                          FROM nodes
+                          WHERE nodes.node_id = items.node_id AND
+                                nodes.node = %s and items.item=%s""",
+                       (publisher.full(),
+                        data,
+                        self.nodeIdentifier,
+                        item["id"]))
+        if cursor.rowcount == 1:
+            return
+
+        cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
+                          SELECT node_id, %s, %s, %s FROM nodes
+                                                     WHERE node=%s""",
+                       (item["id"],
+                        publisher.full(),
+                        data,
+                        self.nodeIdentifier))
+
+
+    def removeItems(self, itemIdentifiers):
+        return self.dbpool.runInteraction(self._removeItems, itemIdentifiers)
+
+
+    def _removeItems(self, cursor, itemIdentifiers):
+        self._checkNodeExists(cursor)
+
+        deleted = []
+
+        for itemIdentifier in itemIdentifiers:
+            cursor.execute("""DELETE FROM items WHERE
+                              node_id=(SELECT node_id FROM nodes
+                                                      WHERE node=%s) AND
+                              item=%s""",
+                           (self.nodeIdentifier,
+                            itemIdentifier))
+
+            if cursor.rowcount:
+                deleted.append(itemIdentifier)
+
+        return deleted
+
+
+    def getItems(self, maxItems=None):
+        return self.dbpool.runInteraction(self._getItems, maxItems)
+
+
+    def _getItems(self, cursor, maxItems):
+        self._checkNodeExists(cursor)
+        query = """SELECT data FROM nodes
+                   NATURAL JOIN items
+                   WHERE node=%s ORDER BY date DESC"""
+        if maxItems:
+            cursor.execute(query + " LIMIT %s",
+                           (self.nodeIdentifier,
+                            maxItems))
+        else:
+            cursor.execute(query, (self.nodeIdentifier,))
+
+        result = cursor.fetchall()
+        items = [stripNamespace(parseXml(r[0])) for r in result]
+        return items
+
+
+    def getItemsById(self, itemIdentifiers):
+        return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers)
+
+
+    def _getItemsById(self, cursor, itemIdentifiers):
+        self._checkNodeExists(cursor)
+        items = []
+        for itemIdentifier in itemIdentifiers:
+            cursor.execute("""SELECT data FROM nodes
+                              NATURAL JOIN items
+                              WHERE node=%s AND item=%s""",
+                           (self.nodeIdentifier,
+                            itemIdentifier))
+            result = cursor.fetchone()
+            if result:
+                items.append(parseXml(result[0]))
+        return items
+
+
+    def purge(self):
+        return self.dbpool.runInteraction(self._purge)
+
+
+    def _purge(self, cursor):
+        self._checkNodeExists(cursor)
+
+        cursor.execute("""DELETE FROM items WHERE
+                          node_id=(SELECT node_id FROM nodes WHERE node=%s)""",
+                       (self.nodeIdentifier,))
+
+
+class CollectionNode(Node):
+
+    nodeType = 'collection'
+
+
+
+class GatewayStorage(object):
+    """
+    Memory based storage facility for the XMPP-HTTP gateway.
+    """
+
+    def __init__(self, dbpool):
+        self.dbpool = dbpool
+
+
+    def _countCallbacks(self, cursor, service, nodeIdentifier):
+        """
+        Count number of callbacks registered for a node.
+        """
+        cursor.execute("""SELECT count(*) FROM callbacks
+                          WHERE service=%s and node=%s""",
+                       service.full(),
+                       nodeIdentifier)
+        results = cursor.fetchall()
+        return results[0][0]
+
+
+    def addCallback(self, service, nodeIdentifier, callback):
+        def interaction(cursor):
+            cursor.execute("""SELECT 1 FROM callbacks
+                              WHERE service=%s and node=%s and uri=%s""",
+                           service.full(),
+                           nodeIdentifier,
+                           callback)
+            if cursor.fetchall():
+                return
+
+            cursor.execute("""INSERT INTO callbacks
+                              (service, node, uri) VALUES
+                              (%s, %s, %s)""",
+                           service.full(),
+                           nodeIdentifier,
+                           callback)
+
+        return self.dbpool.runInteraction(interaction)
+
+
+    def removeCallback(self, service, nodeIdentifier, callback):
+        def interaction(cursor):
+            cursor.execute("""DELETE FROM callbacks
+                              WHERE service=%s and node=%s and uri=%s""",
+                           service.full(),
+                           nodeIdentifier,
+                           callback)
+
+            if cursor.rowcount != 1:
+                raise error.NotSubscribed()
+
+            last = not self._countCallbacks(cursor, service, nodeIdentifier)
+            return last
+
+        return self.dbpool.runInteraction(interaction)
+
+    def getCallbacks(self, service, nodeIdentifier):
+        def interaction(cursor):
+            cursor.execute("""SELECT uri FROM callbacks
+                              WHERE service=%s and node=%s""",
+                           service.full(),
+                           nodeIdentifier)
+            results = cursor.fetchall()
+
+            if not results:
+                raise error.NoCallbacks()
+
+            return [result[0] for result in results]
+
+        return self.dbpool.runInteraction(interaction)
+
+
+    def hasCallbacks(self, service, nodeIdentifier):
+        def interaction(cursor):
+            return bool(self._countCallbacks(cursor, service, nodeIdentifier))
+
+        return self.dbpool.runInteraction(interaction)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/tap.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,94 @@
+# Copyright (c) 2003-2010 Ralph Meijer
+# See LICENSE for details.
+
+from twisted.application import service
+from twisted.python import usage
+from twisted.words.protocols.jabber.jid import JID
+
+from wokkel.component import Component
+from wokkel.disco import DiscoHandler
+from wokkel.generic import FallbackHandler, VersionHandler
+from wokkel.iwokkel import IPubSubResource
+from wokkel.pubsub import PubSubService
+
+from idavoll import __version__
+from idavoll.backend import BackendService
+
+class Options(usage.Options):
+    optParameters = [
+        ('jid', None, 'pubsub', 'JID this component will be available at'),
+        ('secret', None, 'secret', 'Jabber server component secret'),
+        ('rhost', None, '127.0.0.1', 'Jabber server host'),
+        ('rport', None, '5347', 'Jabber server port'),
+        ('backend', None, 'memory', 'Choice of storage backend'),
+        ('dbuser', None, None, 'Database user (pgsql backend)'),
+        ('dbname', None, 'pubsub', 'Database name (pgsql backend)'),
+        ('dbpass', None, None, 'Database password (pgsql backend)'),
+        ('dbhost', None, None, 'Database host (pgsql backend)'),
+        ('dbport', None, None, 'Database port (pgsql backend)'),
+    ]
+
+    optFlags = [
+        ('verbose', 'v', 'Show traffic'),
+        ('hide-nodes', None, 'Hide all nodes for disco')
+    ]
+
+    def postOptions(self):
+        if self['backend'] not in ['pgsql', 'memory']:
+            raise usage.UsageError, "Unknown backend!"
+
+        self['jid'] = JID(self['jid'])
+
+
+
+def makeService(config):
+    s = service.MultiService()
+
+    # Create backend service with storage
+
+    if config['backend'] == 'pgsql':
+        from twisted.enterprise import adbapi
+        from idavoll.pgsql_storage import Storage
+        dbpool = adbapi.ConnectionPool('psycopg2',
+                                       user=config['dbuser'],
+                                       password=config['dbpass'],
+                                       database=config['dbname'],
+                                       host=config['dbhost'],
+                                       port=config['dbport'],
+                                       cp_reconnect=True,
+                                       client_encoding='utf-8',
+                                       )
+        st = Storage(dbpool)
+    elif config['backend'] == 'memory':
+        from idavoll.memory_storage import Storage
+        st = Storage()
+
+    bs = BackendService(st)
+    bs.setName('backend')
+    bs.setServiceParent(s)
+
+    # Set up XMPP server-side component with publish-subscribe capabilities
+
+    cs = Component(config["rhost"], int(config["rport"]),
+                   config["jid"].full(), config["secret"])
+    cs.setName('component')
+    cs.setServiceParent(s)
+
+    cs.factory.maxDelay = 900
+
+    if config["verbose"]:
+        cs.logTraffic = True
+
+    FallbackHandler().setHandlerParent(cs)
+    VersionHandler('Idavoll', __version__).setHandlerParent(cs)
+    DiscoHandler().setHandlerParent(cs)
+
+    resource = IPubSubResource(bs)
+    resource.hideNodes = config["hide-nodes"]
+    resource.serviceJID = config["jid"]
+
+    ps = PubSubService(resource)
+    ps.setHandlerParent(cs)
+    resource.pubsubService = ps
+
+    return s
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/tap_http.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,97 @@
+# Copyright (c) 2003-2008 Ralph Meijer
+# See LICENSE for details.
+
+from twisted.application import internet, service, strports
+from twisted.conch import manhole, manhole_ssh
+from twisted.cred import portal, checkers
+from twisted.web2 import channel, log, resource, server
+from twisted.web2.tap import Web2Service
+
+from idavoll import gateway, tap
+from idavoll.gateway import RemoteSubscriptionService
+
+class Options(tap.Options):
+    optParameters = [
+            ('webport', None, '8086', 'Web port'),
+    ]
+
+
+
+def getManholeFactory(namespace, **passwords):
+    def getManHole(_):
+        return manhole.Manhole(namespace)
+
+    realm = manhole_ssh.TerminalRealm()
+    realm.chainedProtocolFactory.protocolFactory = getManHole
+    p = portal.Portal(realm)
+    p.registerChecker(
+            checkers.InMemoryUsernamePasswordDatabaseDontUse(**passwords))
+    f = manhole_ssh.ConchFactory(p)
+    return f
+
+
+
+def makeService(config):
+    s = tap.makeService(config)
+
+    bs = s.getServiceNamed('backend')
+    cs = s.getServiceNamed('component')
+
+    # Set up XMPP service for subscribing to remote nodes
+
+    if config['backend'] == 'pgsql':
+        from idavoll.pgsql_storage import GatewayStorage
+        gst = GatewayStorage(bs.storage.dbpool)
+    elif config['backend'] == 'memory':
+        from idavoll.memory_storage import GatewayStorage
+        gst = GatewayStorage()
+
+    ss = RemoteSubscriptionService(config['jid'], gst)
+    ss.setHandlerParent(cs)
+    ss.startService()
+
+    # Set up web service
+
+    root = resource.Resource()
+
+    # Set up resources that exposes the backend
+    root.child_create = gateway.CreateResource(bs, config['jid'],
+                                               config['jid'])
+    root.child_delete = gateway.DeleteResource(bs, config['jid'],
+                                               config['jid'])
+    root.child_publish = gateway.PublishResource(bs, config['jid'],
+                                                 config['jid'])
+    root.child_list = gateway.ListResource(bs)
+
+    # Set up resources for accessing remote pubsub nodes.
+    root.child_subscribe = gateway.RemoteSubscribeResource(ss)
+    root.child_unsubscribe = gateway.RemoteUnsubscribeResource(ss)
+    root.child_items = gateway.RemoteItemsResource(ss)
+
+    if config["verbose"]:
+        root = log.LogWrapperResource(root)
+
+    site = server.Site(root)
+    w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site))
+
+    if config["verbose"]:
+        logObserver = log.DefaultCommonAccessLoggingObserver()
+        w2s = Web2Service(logObserver)
+        w.setServiceParent(w2s)
+        w = w2s
+
+    w.setServiceParent(s)
+
+    # Set up a manhole
+
+    namespace = {'service': s,
+                 'component': cs,
+                 'backend': bs,
+                 'root': root}
+
+    f = getManholeFactory(namespace, admin='admin')
+    manholeService = strports.service('2222', f)
+    manholeService.setServiceParent(s)
+
+    return s
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/test/__init__.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,6 @@
+# Copyright (c) 2003-2007 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Tests for L{idavoll}.
+"""
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/test/test_backend.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,611 @@
+# Copyright (c) 2003-2010 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Tests for L{idavoll.backend}.
+"""
+
+from zope.interface import implements
+from zope.interface.verify import verifyObject
+
+from twisted.internet import defer
+from twisted.trial import unittest
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.error import StanzaError
+
+from wokkel import iwokkel, pubsub
+
+from idavoll import backend, error, iidavoll
+
+OWNER = jid.JID('owner@example.com')
+OWNER_FULL = jid.JID('owner@example.com/home')
+SERVICE = jid.JID('test.example.org')
+NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
+
+class BackendTest(unittest.TestCase):
+
+    def test_interfaceIBackend(self):
+        self.assertTrue(verifyObject(iidavoll.IBackendService,
+                                     backend.BackendService(None)))
+
+
+    def test_deleteNode(self):
+        class TestNode:
+            nodeIdentifier = 'to-be-deleted'
+            def getAffiliation(self, entity):
+                if entity.userhostJID() == OWNER:
+                    return defer.succeed('owner')
+
+        class TestStorage:
+            def __init__(self):
+                self.deleteCalled = []
+
+            def getNode(self, nodeIdentifier):
+                return defer.succeed(TestNode())
+
+            def deleteNode(self, nodeIdentifier):
+                if nodeIdentifier in ['to-be-deleted']:
+                    self.deleteCalled.append(nodeIdentifier)
+                    return defer.succeed(None)
+                else:
+                    return defer.fail(error.NodeNotFound())
+
+        def preDelete(data):
+            self.assertFalse(self.storage.deleteCalled)
+            preDeleteCalled.append(data)
+            return defer.succeed(None)
+
+        def cb(result):
+            self.assertEquals(1, len(preDeleteCalled))
+            data = preDeleteCalled[-1]
+            self.assertEquals('to-be-deleted', data['nodeIdentifier'])
+            self.assertTrue(self.storage.deleteCalled)
+
+        self.storage = TestStorage()
+        self.backend = backend.BackendService(self.storage)
+
+        preDeleteCalled = []
+
+        self.backend.registerPreDelete(preDelete)
+        d = self.backend.deleteNode('to-be-deleted', OWNER_FULL)
+        d.addCallback(cb)
+        return d
+
+
+    def test_deleteNodeRedirect(self):
+        uri = 'xmpp:%s?;node=test2' % (SERVICE.full(),)
+
+        class TestNode:
+            nodeIdentifier = 'to-be-deleted'
+            def getAffiliation(self, entity):
+                if entity.userhostJID() == OWNER:
+                    return defer.succeed('owner')
+
+        class TestStorage:
+            def __init__(self):
+                self.deleteCalled = []
+
+            def getNode(self, nodeIdentifier):
+                return defer.succeed(TestNode())
+
+            def deleteNode(self, nodeIdentifier):
+                if nodeIdentifier in ['to-be-deleted']:
+                    self.deleteCalled.append(nodeIdentifier)
+                    return defer.succeed(None)
+                else:
+                    return defer.fail(error.NodeNotFound())
+
+        def preDelete(data):
+            self.assertFalse(self.storage.deleteCalled)
+            preDeleteCalled.append(data)
+            return defer.succeed(None)
+
+        def cb(result):
+            self.assertEquals(1, len(preDeleteCalled))
+            data = preDeleteCalled[-1]
+            self.assertEquals('to-be-deleted', data['nodeIdentifier'])
+            self.assertEquals(uri, data['redirectURI'])
+            self.assertTrue(self.storage.deleteCalled)
+
+        self.storage = TestStorage()
+        self.backend = backend.BackendService(self.storage)
+
+        preDeleteCalled = []
+
+        self.backend.registerPreDelete(preDelete)
+        d = self.backend.deleteNode('to-be-deleted', OWNER, redirectURI=uri)
+        d.addCallback(cb)
+        return d
+
+
+    def test_createNodeNoID(self):
+        """
+        Test creation of a node without a given node identifier.
+        """
+        class TestStorage:
+            def getDefaultConfiguration(self, nodeType):
+                return {}
+
+            def createNode(self, nodeIdentifier, requestor, config):
+                self.nodeIdentifier = nodeIdentifier
+                return defer.succeed(None)
+
+        self.storage = TestStorage()
+        self.backend = backend.BackendService(self.storage)
+        self.storage.backend = self.backend
+
+        def checkID(nodeIdentifier):
+            self.assertNotIdentical(None, nodeIdentifier)
+            self.assertIdentical(self.storage.nodeIdentifier, nodeIdentifier)
+
+        d = self.backend.createNode(None, OWNER_FULL)
+        d.addCallback(checkID)
+        return d
+
+    class NodeStore:
+        """
+        I just store nodes to pose as an L{IStorage} implementation.
+        """
+        def __init__(self, nodes):
+            self.nodes = nodes
+
+        def getNode(self, nodeIdentifier):
+            try:
+                return defer.succeed(self.nodes[nodeIdentifier])
+            except KeyError:
+                return defer.fail(error.NodeNotFound())
+
+
+    def test_getNotifications(self):
+        """
+        Ensure subscribers show up in the notification list.
+        """
+        item = pubsub.Item()
+        sub = pubsub.Subscription('test', OWNER, 'subscribed')
+
+        class TestNode:
+            def getSubscriptions(self, state=None):
+                return [sub]
+
+        def cb(result):
+            self.assertEquals(1, len(result))
+            subscriber, subscriptions, items = result[-1]
+
+            self.assertEquals(OWNER, subscriber)
+            self.assertEquals(set([sub]), subscriptions)
+            self.assertEquals([item], items)
+
+        self.storage = self.NodeStore({'test': TestNode()})
+        self.backend = backend.BackendService(self.storage)
+        d = self.backend.getNotifications('test', [item])
+        d.addCallback(cb)
+        return d
+
+    def test_getNotificationsRoot(self):
+        """
+        Ensure subscribers to the root node show up in the notification list
+        for leaf nodes.
+
+        This assumes a flat node relationship model with exactly one collection
+        node: the root node. Each leaf node is automatically a child node
+        of the root node.
+        """
+        item = pubsub.Item()
+        subRoot = pubsub.Subscription('', OWNER, 'subscribed')
+
+        class TestNode:
+            def getSubscriptions(self, state=None):
+                return []
+
+        class TestRootNode:
+            def getSubscriptions(self, state=None):
+                return [subRoot]
+
+        def cb(result):
+            self.assertEquals(1, len(result))
+            subscriber, subscriptions, items = result[-1]
+            self.assertEquals(OWNER, subscriber)
+            self.assertEquals(set([subRoot]), subscriptions)
+            self.assertEquals([item], items)
+
+        self.storage = self.NodeStore({'test': TestNode(),
+                                       '': TestRootNode()})
+        self.backend = backend.BackendService(self.storage)
+        d = self.backend.getNotifications('test', [item])
+        d.addCallback(cb)
+        return d
+
+
+    def test_getNotificationsMultipleNodes(self):
+        """
+        Ensure that entities that subscribe to a leaf node as well as the
+        root node get exactly one notification.
+        """
+        item = pubsub.Item()
+        sub = pubsub.Subscription('test', OWNER, 'subscribed')
+        subRoot = pubsub.Subscription('', OWNER, 'subscribed')
+
+        class TestNode:
+            def getSubscriptions(self, state=None):
+                return [sub]
+
+        class TestRootNode:
+            def getSubscriptions(self, state=None):
+                return [subRoot]
+
+        def cb(result):
+            self.assertEquals(1, len(result))
+            subscriber, subscriptions, items = result[-1]
+
+            self.assertEquals(OWNER, subscriber)
+            self.assertEquals(set([sub, subRoot]), subscriptions)
+            self.assertEquals([item], items)
+
+        self.storage = self.NodeStore({'test': TestNode(),
+                                       '': TestRootNode()})
+        self.backend = backend.BackendService(self.storage)
+        d = self.backend.getNotifications('test', [item])
+        d.addCallback(cb)
+        return d
+
+
+    def test_getDefaultConfiguration(self):
+        """
+        L{backend.BackendService.getDefaultConfiguration} should return
+        a deferred that fires a dictionary with configuration values.
+        """
+
+        class TestStorage:
+            def getDefaultConfiguration(self, nodeType):
+                return {
+                    "pubsub#persist_items": True,
+                    "pubsub#deliver_payloads": True}
+
+        def cb(options):
+            self.assertIn("pubsub#persist_items", options)
+            self.assertEqual(True, options["pubsub#persist_items"])
+
+        self.backend = backend.BackendService(TestStorage())
+        d = self.backend.getDefaultConfiguration('leaf')
+        d.addCallback(cb)
+        return d
+
+
+    def test_getNodeConfiguration(self):
+        class testNode:
+            nodeIdentifier = 'node'
+            def getConfiguration(self):
+                return {'pubsub#deliver_payloads': True,
+                        'pubsub#persist_items': False}
+
+        class testStorage:
+            def getNode(self, nodeIdentifier):
+                return defer.succeed(testNode())
+
+        def cb(options):
+            self.assertIn("pubsub#deliver_payloads", options)
+            self.assertEqual(True, options["pubsub#deliver_payloads"])
+            self.assertIn("pubsub#persist_items", options)
+            self.assertEqual(False, options["pubsub#persist_items"])
+
+        self.storage = testStorage()
+        self.backend = backend.BackendService(self.storage)
+        self.storage.backend = self.backend
+
+        d = self.backend.getNodeConfiguration('node')
+        d.addCallback(cb)
+        return d
+
+
+    def test_setNodeConfiguration(self):
+        class testNode:
+            nodeIdentifier = 'node'
+            def getAffiliation(self, entity):
+                if entity.userhostJID() == OWNER:
+                    return defer.succeed('owner')
+            def setConfiguration(self, options):
+                self.options = options
+
+        class testStorage:
+            def __init__(self):
+                self.nodes = {'node': testNode()}
+            def getNode(self, nodeIdentifier):
+                return defer.succeed(self.nodes[nodeIdentifier])
+
+        def checkOptions(node):
+            options = node.options
+            self.assertIn("pubsub#deliver_payloads", options)
+            self.assertEqual(True, options["pubsub#deliver_payloads"])
+            self.assertIn("pubsub#persist_items", options)
+            self.assertEqual(False, options["pubsub#persist_items"])
+
+        def cb(result):
+            d = self.storage.getNode('node')
+            d.addCallback(checkOptions)
+            return d
+
+        self.storage = testStorage()
+        self.backend = backend.BackendService(self.storage)
+        self.storage.backend = self.backend
+
+        options = {'pubsub#deliver_payloads': True,
+                   'pubsub#persist_items': False}
+
+        d = self.backend.setNodeConfiguration('node', options, OWNER_FULL)
+        d.addCallback(cb)
+        return d
+
+
+    def test_publishNoID(self):
+        """
+        Test publish request with an item without a node identifier.
+        """
+        class TestNode:
+            nodeType = 'leaf'
+            nodeIdentifier = 'node'
+            def getAffiliation(self, entity):
+                if entity.userhostJID() == OWNER:
+                    return defer.succeed('owner')
+            def getConfiguration(self):
+                return {'pubsub#deliver_payloads': True,
+                        'pubsub#persist_items': False}
+
+        class TestStorage:
+            def getNode(self, nodeIdentifier):
+                return defer.succeed(TestNode())
+
+        def checkID(notification):
+            self.assertNotIdentical(None, notification['items'][0]['id'])
+
+        self.storage = TestStorage()
+        self.backend = backend.BackendService(self.storage)
+        self.storage.backend = self.backend
+
+        self.backend.registerNotifier(checkID)
+
+        items = [pubsub.Item()]
+        d = self.backend.publish('node', items, OWNER_FULL)
+        return d
+
+
+    def test_notifyOnSubscription(self):
+        """
+        Test notification of last published item on subscription.
+        """
+        ITEM = "<item xmlns='%s' id='1'/>" % NS_PUBSUB
+
+        class TestNode:
+            implements(iidavoll.ILeafNode)
+            nodeIdentifier = 'node'
+            nodeType = 'leaf'
+            def getAffiliation(self, entity):
+                if entity is OWNER:
+                    return defer.succeed('owner')
+            def getConfiguration(self):
+                return {'pubsub#deliver_payloads': True,
+                        'pubsub#persist_items': False,
+                        'pubsub#send_last_published_item': 'on_sub'}
+            def getItems(self, maxItems):
+                return [ITEM]
+            def addSubscription(self, subscriber, state, options):
+                self.subscription = pubsub.Subscription('node', subscriber,
+                                                        state, options)
+                return defer.succeed(None)
+            def getSubscription(self, subscriber):
+                return defer.succeed(self.subscription)
+
+        class TestStorage:
+            def getNode(self, nodeIdentifier):
+                return defer.succeed(TestNode())
+
+        def cb(data):
+            self.assertEquals('node', data['nodeIdentifier'])
+            self.assertEquals([ITEM], data['items'])
+            self.assertEquals(OWNER, data['subscription'].subscriber)
+
+        self.storage = TestStorage()
+        self.backend = backend.BackendService(self.storage)
+        self.storage.backend = self.backend
+
+        d1 = defer.Deferred()
+        d1.addCallback(cb)
+        self.backend.registerNotifier(d1.callback)
+        d2 = self.backend.subscribe('node', OWNER, OWNER_FULL)
+        return defer.gatherResults([d1, d2])
+
+    test_notifyOnSubscription.timeout = 2
+
+
+
+class BaseTestBackend(object):
+    """
+    Base class for backend stubs.
+    """
+
+    def supportsPublisherAffiliation(self):
+        return True
+
+
+    def supportsOutcastAffiliation(self):
+        return True
+
+
+    def supportsPersistentItems(self):
+        return True
+
+
+    def supportsInstantNodes(self):
+        return True
+
+
+    def registerNotifier(self, observerfn, *args, **kwargs):
+        return
+
+
+    def registerPreDelete(self, preDeleteFn):
+        return
+
+
+
+class PubSubResourceFromBackendTest(unittest.TestCase):
+
+    def test_interface(self):
+        resource = backend.PubSubResourceFromBackend(BaseTestBackend())
+        self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource))
+
+
+    def test_preDelete(self):
+        """
+        Test pre-delete sending out notifications to subscribers.
+        """
+
+        class TestBackend(BaseTestBackend):
+            preDeleteFn = None
+
+            def registerPreDelete(self, preDeleteFn):
+                self.preDeleteFn = preDeleteFn
+
+            def getSubscribers(self, nodeIdentifier):
+                return defer.succeed([OWNER])
+
+        def notifyDelete(service, nodeIdentifier, subscribers,
+                         redirectURI=None):
+            self.assertEqual(SERVICE, service)
+            self.assertEqual('test', nodeIdentifier)
+            self.assertEqual([OWNER], subscribers)
+            self.assertIdentical(None, redirectURI)
+            d1.callback(None)
+
+        d1 = defer.Deferred()
+        resource = backend.PubSubResourceFromBackend(TestBackend())
+        resource.serviceJID = SERVICE
+        resource.pubsubService = pubsub.PubSubService()
+        resource.pubsubService.notifyDelete = notifyDelete
+        self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource))
+        self.assertNotIdentical(None, resource.backend.preDeleteFn)
+        data = {'nodeIdentifier': 'test'}
+        d2 = resource.backend.preDeleteFn(data)
+        return defer.DeferredList([d1, d2], fireOnOneErrback=1)
+
+
+    def test_preDeleteRedirect(self):
+        """
+        Test pre-delete sending out notifications to subscribers.
+        """
+
+        uri = 'xmpp:%s?;node=test2' % (SERVICE.full(),)
+
+        class TestBackend(BaseTestBackend):
+            preDeleteFn = None
+
+            def registerPreDelete(self, preDeleteFn):
+                self.preDeleteFn = preDeleteFn
+
+            def getSubscribers(self, nodeIdentifier):
+                return defer.succeed([OWNER])
+
+        def notifyDelete(service, nodeIdentifier, subscribers,
+                         redirectURI=None):
+            self.assertEqual(SERVICE, service)
+            self.assertEqual('test', nodeIdentifier)
+            self.assertEqual([OWNER], subscribers)
+            self.assertEqual(uri, redirectURI)
+            d1.callback(None)
+
+        d1 = defer.Deferred()
+        resource = backend.PubSubResourceFromBackend(TestBackend())
+        resource.serviceJID = SERVICE
+        resource.pubsubService = pubsub.PubSubService()
+        resource.pubsubService.notifyDelete = notifyDelete
+        self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource))
+        self.assertNotIdentical(None, resource.backend.preDeleteFn)
+        data = {'nodeIdentifier': 'test',
+                'redirectURI': uri}
+        d2 = resource.backend.preDeleteFn(data)
+        return defer.DeferredList([d1, d2], fireOnOneErrback=1)
+
+
+    def test_unsubscribeNotSubscribed(self):
+        """
+        Test unsubscription request when not subscribed.
+        """
+
+        class TestBackend(BaseTestBackend):
+            def unsubscribe(self, nodeIdentifier, subscriber, requestor):
+                return defer.fail(error.NotSubscribed())
+
+        def cb(e):
+            self.assertEquals('unexpected-request', e.condition)
+
+        resource = backend.PubSubResourceFromBackend(TestBackend())
+        request = pubsub.PubSubRequest()
+        request.sender = OWNER
+        request.recipient = SERVICE
+        request.nodeIdentifier = 'test'
+        request.subscriber = OWNER
+        d = resource.unsubscribe(request)
+        self.assertFailure(d, StanzaError)
+        d.addCallback(cb)
+        return d
+
+
+    def test_getInfo(self):
+        """
+        Test retrieving node information.
+        """
+
+        class TestBackend(BaseTestBackend):
+            def getNodeType(self, nodeIdentifier):
+                return defer.succeed('leaf')
+
+            def getNodeMetaData(self, nodeIdentifier):
+                return defer.succeed({'pubsub#persist_items': True})
+
+        def cb(info):
+            self.assertIn('type', info)
+            self.assertEquals('leaf', info['type'])
+            self.assertIn('meta-data', info)
+            self.assertEquals({'pubsub#persist_items': True}, info['meta-data'])
+
+        resource = backend.PubSubResourceFromBackend(TestBackend())
+        d = resource.getInfo(OWNER, SERVICE, 'test')
+        d.addCallback(cb)
+        return d
+
+
+    def test_getConfigurationOptions(self):
+        class TestBackend(BaseTestBackend):
+            nodeOptions = {
+                    "pubsub#persist_items":
+                        {"type": "boolean",
+                         "label": "Persist items to storage"},
+                    "pubsub#deliver_payloads":
+                        {"type": "boolean",
+                         "label": "Deliver payloads with event notifications"}
+            }
+
+        resource = backend.PubSubResourceFromBackend(TestBackend())
+        options = resource.getConfigurationOptions()
+        self.assertIn("pubsub#persist_items", options)
+
+
+    def test_default(self):
+        class TestBackend(BaseTestBackend):
+            def getDefaultConfiguration(self, nodeType):
+                options = {"pubsub#persist_items": True,
+                           "pubsub#deliver_payloads": True,
+                           "pubsub#send_last_published_item": 'on_sub',
+                }
+                return defer.succeed(options)
+
+        def cb(options):
+            self.assertEquals(True, options["pubsub#persist_items"])
+
+        resource = backend.PubSubResourceFromBackend(TestBackend())
+        request = pubsub.PubSubRequest()
+        request.sender = OWNER
+        request.recipient = SERVICE
+        request.nodeType = 'leaf'
+        d = resource.default(request)
+        d.addCallback(cb)
+        return d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/test/test_gateway.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,398 @@
+# Copyright (c) 2003-2009 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Tests for L{idavoll.gateway}.
+
+Note that some tests are functional tests that require a running idavoll
+service.
+"""
+
+from twisted.internet import defer
+from twisted.trial import unittest
+from twisted.web import error
+from twisted.words.xish import domish
+
+from idavoll import gateway
+
+AGENT = "Idavoll Test Script"
+NS_ATOM = "http://www.w3.org/2005/Atom"
+
+TEST_ENTRY = domish.Element((NS_ATOM, 'entry'))
+TEST_ENTRY.addElement("id",
+                      content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a")
+TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok")
+TEST_ENTRY.addElement("author").addElement("name", content="John Doe")
+TEST_ENTRY.addElement("content", content="Some text.")
+
+baseURI = "http://localhost:8086/"
+componentJID = "pubsub"
+
+class GatewayTest(unittest.TestCase):
+    timeout = 2
+
+    def setUp(self):
+        self.client = gateway.GatewayClient(baseURI)
+        self.client.startService()
+        self.addCleanup(self.client.stopService)
+
+        def trapConnectionRefused(failure):
+            from twisted.internet.error import ConnectionRefusedError
+            failure.trap(ConnectionRefusedError)
+            raise unittest.SkipTest("Gateway to test against is not available")
+
+        def trapNotFound(failure):
+            from twisted.web.error import Error
+            failure.trap(Error)
+
+        d = self.client.ping()
+        d.addErrback(trapConnectionRefused)
+        d.addErrback(trapNotFound)
+        return d
+
+
+    def tearDown(self):
+        return self.client.stopService()
+
+
+    def test_create(self):
+
+        def cb(response):
+            self.assertIn('uri', response)
+
+        d = self.client.create()
+        d.addCallback(cb)
+        return d
+
+    def test_publish(self):
+
+        def cb(response):
+            self.assertIn('uri', response)
+
+        d = self.client.publish(TEST_ENTRY)
+        d.addCallback(cb)
+        return d
+
+    def test_publishExistingNode(self):
+
+        def cb2(response, xmppURI):
+            self.assertEquals(xmppURI, response['uri'])
+
+        def cb1(response):
+            xmppURI = response['uri']
+            d = self.client.publish(TEST_ENTRY, xmppURI)
+            d.addCallback(cb2, xmppURI)
+            return d
+
+        d = self.client.create()
+        d.addCallback(cb1)
+        return d
+
+    def test_publishNonExisting(self):
+        def cb(err):
+            self.assertEqual('404', err.status)
+
+        d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % componentJID)
+        self.assertFailure(d, error.Error)
+        d.addCallback(cb)
+        return d
+
+    def test_delete(self):
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.delete(xmppURI)
+            return d
+
+        d = self.client.create()
+        d.addCallback(cb)
+        return d
+
+    def test_deleteWithRedirect(self):
+        def cb(response):
+            xmppURI = response['uri']
+            redirectURI = 'xmpp:%s?node=test' % componentJID
+            d = self.client.delete(xmppURI, redirectURI)
+            return d
+
+        d = self.client.create()
+        d.addCallback(cb)
+        return d
+
+    def test_deleteNotification(self):
+        def onNotification(data, headers):
+            try:
+                self.assertTrue(headers.hasHeader('Event'))
+                self.assertEquals(['DELETED'], headers.getRawHeaders('Event'))
+                self.assertFalse(headers.hasHeader('Link'))
+            except:
+                self.client.deferred.errback()
+            else:
+                self.client.deferred.callback(None)
+
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = self.client.delete(xmppURI)
+            return d
+
+        self.client.callback = onNotification
+        self.client.deferred = defer.Deferred()
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return defer.gatherResults([d, self.client.deferred])
+
+    def test_deleteNotificationWithRedirect(self):
+        redirectURI = 'xmpp:%s?node=test' % componentJID
+
+        def onNotification(data, headers):
+            try:
+                self.assertTrue(headers.hasHeader('Event'))
+                self.assertEquals(['DELETED'], headers.getRawHeaders('Event'))
+                self.assertEquals(['<%s>; rel=alternate' % redirectURI],
+                                  headers.getRawHeaders('Link'))
+            except:
+                self.client.deferred.errback()
+            else:
+                self.client.deferred.callback(None)
+
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = self.client.delete(xmppURI, redirectURI)
+            return d
+
+        self.client.callback = onNotification
+        self.client.deferred = defer.Deferred()
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return defer.gatherResults([d, self.client.deferred])
+
+    def test_list(self):
+        d = self.client.listNodes()
+        return d
+
+    def test_subscribe(self):
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.subscribe(xmppURI)
+            return d
+
+        d = self.client.create()
+        d.addCallback(cb)
+        return d
+
+    def test_subscribeGetNotification(self):
+
+        def onNotification(data, headers):
+            self.client.deferred.callback(None)
+
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = self.client.publish(TEST_ENTRY, xmppURI)
+            return d
+
+
+        self.client.callback = onNotification
+        self.client.deferred = defer.Deferred()
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return defer.gatherResults([d, self.client.deferred])
+
+
+    def test_subscribeTwiceGetNotification(self):
+
+        def onNotification1(data, headers):
+            d = client1.stopService()
+            d.chainDeferred(client1.deferred)
+
+        def onNotification2(data, headers):
+            d = client2.stopService()
+            d.chainDeferred(client2.deferred)
+
+        def cb(response):
+            xmppURI = response['uri']
+            d = client1.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = client2.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb3(xmppURI):
+            d = self.client.publish(TEST_ENTRY, xmppURI)
+            return d
+
+
+        client1 = gateway.GatewayClient(baseURI, callbackPort=8088)
+        client1.startService()
+        client1.callback = onNotification1
+        client1.deferred = defer.Deferred()
+        client2 = gateway.GatewayClient(baseURI, callbackPort=8089)
+        client2.startService()
+        client2.callback = onNotification2
+        client2.deferred = defer.Deferred()
+
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        d.addCallback(cb3)
+        dl = defer.gatherResults([d, client1.deferred, client2.deferred])
+        return dl
+
+
+    def test_subscribeGetDelayedNotification(self):
+
+        def onNotification(data, headers):
+            self.client.deferred.callback(None)
+
+        def cb(response):
+            xmppURI = response['uri']
+            self.assertNot(self.client.deferred.called)
+            d = self.client.publish(TEST_ENTRY, xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = self.client.subscribe(xmppURI)
+            return d
+
+
+        self.client.callback = onNotification
+        self.client.deferred = defer.Deferred()
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return defer.gatherResults([d, self.client.deferred])
+
+    def test_subscribeGetDelayedNotification2(self):
+        """
+        Test that subscribing as second results in a notification being sent.
+        """
+
+        def onNotification1(data, headers):
+            client1.deferred.callback(None)
+            client1.stopService()
+
+        def onNotification2(data, headers):
+            client2.deferred.callback(None)
+            client2.stopService()
+
+        def cb(response):
+            xmppURI = response['uri']
+            self.assertNot(client1.deferred.called)
+            self.assertNot(client2.deferred.called)
+            d = self.client.publish(TEST_ENTRY, xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = client1.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb3(xmppURI):
+            d = client2.subscribe(xmppURI)
+            return d
+
+        client1 = gateway.GatewayClient(baseURI, callbackPort=8088)
+        client1.startService()
+        client1.callback = onNotification1
+        client1.deferred = defer.Deferred()
+        client2 = gateway.GatewayClient(baseURI, callbackPort=8089)
+        client2.startService()
+        client2.callback = onNotification2
+        client2.deferred = defer.Deferred()
+
+
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        d.addCallback(cb3)
+        dl = defer.gatherResults([d, client1.deferred, client2.deferred])
+        return dl
+
+
+    def test_subscribeNonExisting(self):
+        def cb(err):
+            self.assertEqual('403', err.status)
+
+        d = self.client.subscribe('xmpp:%s?node=test' % componentJID)
+        self.assertFailure(d, error.Error)
+        d.addCallback(cb)
+        return d
+
+
+    def test_subscribeRootGetNotification(self):
+
+        def onNotification(data, headers):
+            self.client.deferred.callback(None)
+
+        def cb(response):
+            xmppURI = response['uri']
+            jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI)
+            rootNode = gateway.getXMPPURI(jid, '')
+
+            d = self.client.subscribe(rootNode)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            return self.client.publish(TEST_ENTRY, xmppURI)
+
+
+        self.client.callback = onNotification
+        self.client.deferred = defer.Deferred()
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return defer.gatherResults([d, self.client.deferred])
+
+
+    def test_unsubscribeNonExisting(self):
+        def cb(err):
+            self.assertEqual('403', err.status)
+
+        d = self.client.unsubscribe('xmpp:%s?node=test' % componentJID)
+        self.assertFailure(d, error.Error)
+        d.addCallback(cb)
+        return d
+
+
+    def test_items(self):
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.items(xmppURI)
+            return d
+
+        d = self.client.publish(TEST_ENTRY)
+        d.addCallback(cb)
+        return d
+
+
+    def test_itemsMaxItems(self):
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.items(xmppURI, 2)
+            return d
+
+        d = self.client.publish(TEST_ENTRY)
+        d.addCallback(cb)
+        return d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/test/test_storage.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,585 @@
+# Copyright (c) 2003-2010 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Tests for L{idavoll.memory_storage} and L{idavoll.pgsql_storage}.
+"""
+
+from zope.interface.verify import verifyObject
+from twisted.trial import unittest
+from twisted.words.protocols.jabber import jid
+from twisted.internet import defer
+from twisted.words.xish import domish
+
+from idavoll import error, iidavoll
+
+OWNER = jid.JID('owner@example.com/Work')
+SUBSCRIBER = jid.JID('subscriber@example.com/Home')
+SUBSCRIBER_NEW = jid.JID('new@example.com/Home')
+SUBSCRIBER_TO_BE_DELETED = jid.JID('to_be_deleted@example.com/Home')
+SUBSCRIBER_PENDING = jid.JID('pending@example.com/Home')
+PUBLISHER = jid.JID('publisher@example.com')
+ITEM = domish.Element((None, 'item'))
+ITEM['id'] = 'current'
+ITEM.addElement(('testns', 'test'), content=u'Test \u2083 item')
+ITEM_NEW = domish.Element((None, 'item'))
+ITEM_NEW['id'] = 'new'
+ITEM_NEW.addElement(('testns', 'test'), content=u'Test \u2083 item')
+ITEM_UPDATED = domish.Element((None, 'item'))
+ITEM_UPDATED['id'] = 'current'
+ITEM_UPDATED.addElement(('testns', 'test'), content=u'Test \u2084 item')
+ITEM_TO_BE_DELETED = domish.Element((None, 'item'))
+ITEM_TO_BE_DELETED['id'] = 'to-be-deleted'
+ITEM_TO_BE_DELETED.addElement(('testns', 'test'), content=u'Test \u2083 item')
+
+def decode(object):
+    if isinstance(object, str):
+        object = object.decode('utf-8')
+    return object
+
+
+
+class StorageTests:
+
+    def _assignTestNode(self, node):
+        self.node = node
+
+
+    def setUp(self):
+        d = self.s.getNode('pre-existing')
+        d.addCallback(self._assignTestNode)
+        return d
+
+
+    def test_interfaceIStorage(self):
+        self.assertTrue(verifyObject(iidavoll.IStorage, self.s))
+
+
+    def test_interfaceINode(self):
+        self.assertTrue(verifyObject(iidavoll.INode, self.node))
+
+
+    def test_interfaceILeafNode(self):
+        self.assertTrue(verifyObject(iidavoll.ILeafNode, self.node))
+
+
+    def test_getNode(self):
+        return self.s.getNode('pre-existing')
+
+
+    def test_getNonExistingNode(self):
+        d = self.s.getNode('non-existing')
+        self.assertFailure(d, error.NodeNotFound)
+        return d
+
+
+    def test_getNodeIDs(self):
+        def cb(nodeIdentifiers):
+            self.assertIn('pre-existing', nodeIdentifiers)
+            self.assertNotIn('non-existing', nodeIdentifiers)
+
+        return self.s.getNodeIds().addCallback(cb)
+
+
+    def test_createExistingNode(self):
+        config = self.s.getDefaultConfiguration('leaf')
+        config['pubsub#node_type'] = 'leaf'
+        d = self.s.createNode('pre-existing', OWNER, config)
+        self.assertFailure(d, error.NodeExists)
+        return d
+
+
+    def test_createNode(self):
+        def cb(void):
+            d = self.s.getNode('new 1')
+            return d
+
+        config = self.s.getDefaultConfiguration('leaf')
+        config['pubsub#node_type'] = 'leaf'
+        d = self.s.createNode('new 1', OWNER, config)
+        d.addCallback(cb)
+        return d
+
+
+    def test_createNodeChangingConfig(self):
+        """
+        The configuration passed to createNode must be free to be changed.
+        """
+        def cb(result):
+            node1, node2 = result
+            self.assertTrue(node1.getConfiguration()['pubsub#persist_items'])
+
+        config = {
+                "pubsub#persist_items": True,
+                "pubsub#deliver_payloads": True,
+                "pubsub#send_last_published_item": 'on_sub',
+                "pubsub#node_type": 'leaf',
+                }
+
+        def unsetPersistItems(_):
+            config["pubsub#persist_items"] = False
+
+        d = defer.succeed(None)
+        d.addCallback(lambda _: self.s.createNode('new 1', OWNER, config))
+        d.addCallback(unsetPersistItems)
+        d.addCallback(lambda _: self.s.createNode('new 2', OWNER, config))
+        d.addCallback(lambda _: defer.gatherResults([
+                                    self.s.getNode('new 1'),
+                                    self.s.getNode('new 2')]))
+        d.addCallback(cb)
+        return d
+
+
+    def test_deleteNonExistingNode(self):
+        d = self.s.deleteNode('non-existing')
+        self.assertFailure(d, error.NodeNotFound)
+        return d
+
+
+    def test_deleteNode(self):
+        def cb(void):
+            d = self.s.getNode('to-be-deleted')
+            self.assertFailure(d, error.NodeNotFound)
+            return d
+
+        d = self.s.deleteNode('to-be-deleted')
+        d.addCallback(cb)
+        return d
+
+
+    def test_getAffiliations(self):
+        def cb(affiliations):
+            self.assertIn(('pre-existing', 'owner'), affiliations)
+
+        d = self.s.getAffiliations(OWNER)
+        d.addCallback(cb)
+        return d
+
+
+    def test_getSubscriptions(self):
+        def cb(subscriptions):
+            found = False
+            for subscription in subscriptions:
+                if (subscription.nodeIdentifier == 'pre-existing' and
+                    subscription.subscriber == SUBSCRIBER and
+                    subscription.state == 'subscribed'):
+                    found = True
+            self.assertTrue(found)
+
+        d = self.s.getSubscriptions(SUBSCRIBER)
+        d.addCallback(cb)
+        return d
+
+
+    # Node tests
+
+    def test_getType(self):
+        self.assertEqual(self.node.getType(), 'leaf')
+
+
+    def test_getConfiguration(self):
+        config = self.node.getConfiguration()
+        self.assertIn('pubsub#persist_items', config.iterkeys())
+        self.assertIn('pubsub#deliver_payloads', config.iterkeys())
+        self.assertEqual(config['pubsub#persist_items'], True)
+        self.assertEqual(config['pubsub#deliver_payloads'], True)
+
+
+    def test_setConfiguration(self):
+        def getConfig(node):
+            d = node.setConfiguration({'pubsub#persist_items': False})
+            d.addCallback(lambda _: node)
+            return d
+
+        def checkObjectConfig(node):
+            config = node.getConfiguration()
+            self.assertEqual(config['pubsub#persist_items'], False)
+
+        def getNode(void):
+            return self.s.getNode('to-be-reconfigured')
+
+        def checkStorageConfig(node):
+            config = node.getConfiguration()
+            self.assertEqual(config['pubsub#persist_items'], False)
+
+        d = self.s.getNode('to-be-reconfigured')
+        d.addCallback(getConfig)
+        d.addCallback(checkObjectConfig)
+        d.addCallback(getNode)
+        d.addCallback(checkStorageConfig)
+        return d
+
+
+    def test_getMetaData(self):
+        metaData = self.node.getMetaData()
+        for key, value in self.node.getConfiguration().iteritems():
+            self.assertIn(key, metaData.iterkeys())
+            self.assertEqual(value, metaData[key])
+        self.assertIn('pubsub#node_type', metaData.iterkeys())
+        self.assertEqual(metaData['pubsub#node_type'], 'leaf')
+
+
+    def test_getAffiliation(self):
+        def cb(affiliation):
+            self.assertEqual(affiliation, 'owner')
+
+        d = self.node.getAffiliation(OWNER)
+        d.addCallback(cb)
+        return d
+
+
+    def test_getNonExistingAffiliation(self):
+        def cb(affiliation):
+            self.assertEqual(affiliation, None)
+
+        d = self.node.getAffiliation(SUBSCRIBER)
+        d.addCallback(cb)
+        return d
+
+
+    def test_addSubscription(self):
+        def cb1(void):
+            return self.node.getSubscription(SUBSCRIBER_NEW)
+
+        def cb2(subscription):
+            self.assertEqual(subscription.state, 'pending')
+
+        d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending', {})
+        d.addCallback(cb1)
+        d.addCallback(cb2)
+        return d
+
+
+    def test_addExistingSubscription(self):
+        d = self.node.addSubscription(SUBSCRIBER, 'pending', {})
+        self.assertFailure(d, error.SubscriptionExists)
+        return d
+
+
+    def test_getSubscription(self):
+        def cb(subscriptions):
+            self.assertEquals(subscriptions[0].state, 'subscribed')
+            self.assertEquals(subscriptions[1].state, 'pending')
+            self.assertEquals(subscriptions[2], None)
+
+        d = defer.gatherResults([self.node.getSubscription(SUBSCRIBER),
+                                 self.node.getSubscription(SUBSCRIBER_PENDING),
+                                 self.node.getSubscription(OWNER)])
+        d.addCallback(cb)
+        return d
+
+
+    def test_removeSubscription(self):
+        return self.node.removeSubscription(SUBSCRIBER_TO_BE_DELETED)
+
+
+    def test_removeNonExistingSubscription(self):
+        d = self.node.removeSubscription(OWNER)
+        self.assertFailure(d, error.NotSubscribed)
+        return d
+
+
+    def test_getNodeSubscriptions(self):
+        def extractSubscribers(subscriptions):
+            return [subscription.subscriber for subscription in subscriptions]
+
+        def cb(subscribers):
+            self.assertIn(SUBSCRIBER, subscribers)
+            self.assertNotIn(SUBSCRIBER_PENDING, subscribers)
+            self.assertNotIn(OWNER, subscribers)
+
+        d = self.node.getSubscriptions('subscribed')
+        d.addCallback(extractSubscribers)
+        d.addCallback(cb)
+        return d
+
+
+    def test_isSubscriber(self):
+        def cb(subscribed):
+            self.assertEquals(subscribed[0][1], True)
+            self.assertEquals(subscribed[1][1], True)
+            self.assertEquals(subscribed[2][1], False)
+            self.assertEquals(subscribed[3][1], False)
+
+        d = defer.DeferredList([self.node.isSubscribed(SUBSCRIBER),
+                                self.node.isSubscribed(SUBSCRIBER.userhostJID()),
+                                self.node.isSubscribed(SUBSCRIBER_PENDING),
+                                self.node.isSubscribed(OWNER)])
+        d.addCallback(cb)
+        return d
+
+
+    def test_storeItems(self):
+        def cb1(void):
+            return self.node.getItemsById(['new'])
+
+        def cb2(result):
+            self.assertEqual(ITEM_NEW.toXml(), result[0].toXml())
+
+        d = self.node.storeItems([ITEM_NEW], PUBLISHER)
+        d.addCallback(cb1)
+        d.addCallback(cb2)
+        return d
+
+
+    def test_storeUpdatedItems(self):
+        def cb1(void):
+            return self.node.getItemsById(['current'])
+
+        def cb2(result):
+            self.assertEqual(ITEM_UPDATED.toXml(), result[0].toXml())
+
+        d = self.node.storeItems([ITEM_UPDATED], PUBLISHER)
+        d.addCallback(cb1)
+        d.addCallback(cb2)
+        return d
+
+
+    def test_removeItems(self):
+        def cb1(result):
+            self.assertEqual(['to-be-deleted'], result)
+            return self.node.getItemsById(['to-be-deleted'])
+
+        def cb2(result):
+            self.assertEqual(0, len(result))
+
+        d = self.node.removeItems(['to-be-deleted'])
+        d.addCallback(cb1)
+        d.addCallback(cb2)
+        return d
+
+
+    def test_removeNonExistingItems(self):
+        def cb(result):
+            self.assertEqual([], result)
+
+        d = self.node.removeItems(['non-existing'])
+        d.addCallback(cb)
+        return d
+
+
+    def test_getItems(self):
+        def cb(result):
+            items = [item.toXml() for item in result]
+            self.assertIn(ITEM.toXml(), items)
+
+        d = self.node.getItems()
+        d.addCallback(cb)
+        return d
+
+
+    def test_lastItem(self):
+        def cb(result):
+            self.assertEqual(1, len(result))
+            self.assertEqual(ITEM.toXml(), result[0].toXml())
+
+        d = self.node.getItems(1)
+        d.addCallback(cb)
+        return d
+
+
+    def test_getItemsById(self):
+        def cb(result):
+            self.assertEqual(1, len(result))
+
+        d = self.node.getItemsById(['current'])
+        d.addCallback(cb)
+        return d
+
+
+    def test_getNonExistingItemsById(self):
+        def cb(result):
+            self.assertEqual(0, len(result))
+
+        d = self.node.getItemsById(['non-existing'])
+        d.addCallback(cb)
+        return d
+
+
+    def test_purge(self):
+        def cb1(node):
+            d = node.purge()
+            d.addCallback(lambda _: node)
+            return d
+
+        def cb2(node):
+            return node.getItems()
+
+        def cb3(result):
+            self.assertEqual([], result)
+
+        d = self.s.getNode('to-be-purged')
+        d.addCallback(cb1)
+        d.addCallback(cb2)
+        d.addCallback(cb3)
+        return d
+
+
+    def test_getNodeAffilatiations(self):
+        def cb1(node):
+            return node.getAffiliations()
+
+        def cb2(affiliations):
+            affiliations = dict(((a[0].full(), a[1]) for a in affiliations))
+            self.assertEquals(affiliations[OWNER.userhost()], 'owner')
+
+        d = self.s.getNode('pre-existing')
+        d.addCallback(cb1)
+        d.addCallback(cb2)
+        return d
+
+
+
+class MemoryStorageStorageTestCase(unittest.TestCase, StorageTests):
+
+    def setUp(self):
+        from idavoll.memory_storage import Storage, PublishedItem, LeafNode
+        from idavoll.memory_storage import Subscription
+
+        defaultConfig = Storage.defaultConfig['leaf']
+
+        self.s = Storage()
+        self.s._nodes['pre-existing'] = \
+                LeafNode('pre-existing', OWNER, defaultConfig)
+        self.s._nodes['to-be-deleted'] = \
+                LeafNode('to-be-deleted', OWNER, None)
+        self.s._nodes['to-be-reconfigured'] = \
+                LeafNode('to-be-reconfigured', OWNER, defaultConfig)
+        self.s._nodes['to-be-purged'] = \
+                LeafNode('to-be-purged', OWNER, None)
+
+        subscriptions = self.s._nodes['pre-existing']._subscriptions
+        subscriptions[SUBSCRIBER.full()] = Subscription('pre-existing',
+                                                        SUBSCRIBER,
+                                                        'subscribed')
+        subscriptions[SUBSCRIBER_TO_BE_DELETED.full()] = \
+                Subscription('pre-existing', SUBSCRIBER_TO_BE_DELETED,
+                             'subscribed')
+        subscriptions[SUBSCRIBER_PENDING.full()] = \
+                Subscription('pre-existing', SUBSCRIBER_PENDING,
+                             'pending')
+
+        item = PublishedItem(ITEM_TO_BE_DELETED, PUBLISHER)
+        self.s._nodes['pre-existing']._items['to-be-deleted'] = item
+        self.s._nodes['pre-existing']._itemlist.append(item)
+        self.s._nodes['to-be-purged']._items['to-be-deleted'] = item
+        self.s._nodes['to-be-purged']._itemlist.append(item)
+        item = PublishedItem(ITEM, PUBLISHER)
+        self.s._nodes['pre-existing']._items['current'] = item
+        self.s._nodes['pre-existing']._itemlist.append(item)
+
+        return StorageTests.setUp(self)
+
+
+
+class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests):
+
+    dbpool = None
+
+    def setUp(self):
+        from idavoll.pgsql_storage import Storage
+        from twisted.enterprise import adbapi
+        if self.dbpool is None:
+            self.__class__.dbpool = adbapi.ConnectionPool('psycopg2',
+                                            database='pubsub_test',
+                                            cp_reconnect=True,
+                                            client_encoding='utf-8',
+                                            )
+        self.s = Storage(self.dbpool)
+        self.dbpool.start()
+        d = self.dbpool.runInteraction(self.init)
+        d.addCallback(lambda _: StorageTests.setUp(self))
+        return d
+
+
+    def tearDown(self):
+        return self.dbpool.runInteraction(self.cleandb)
+
+
+    def init(self, cursor):
+        self.cleandb(cursor)
+        cursor.execute("""INSERT INTO nodes
+                          (node, node_type, persist_items)
+                          VALUES ('pre-existing', 'leaf', TRUE)""")
+        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-deleted')""")
+        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-reconfigured')""")
+        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-purged')""")
+        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                       (OWNER.userhost(),))
+        cursor.execute("""INSERT INTO affiliations
+                          (node_id, entity_id, affiliation)
+                          SELECT node_id, entity_id, 'owner'
+                          FROM nodes, entities
+                          WHERE node='pre-existing' AND jid=%s""",
+                       (OWNER.userhost(),))
+        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                       (SUBSCRIBER.userhost(),))
+        cursor.execute("""INSERT INTO subscriptions
+                          (node_id, entity_id, resource, state)
+                          SELECT node_id, entity_id, %s, 'subscribed'
+                          FROM nodes, entities
+                          WHERE node='pre-existing' AND jid=%s""",
+                       (SUBSCRIBER.resource,
+                        SUBSCRIBER.userhost()))
+        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                       (SUBSCRIBER_TO_BE_DELETED.userhost(),))
+        cursor.execute("""INSERT INTO subscriptions
+                          (node_id, entity_id, resource, state)
+                          SELECT node_id, entity_id, %s, 'subscribed'
+                          FROM nodes, entities
+                          WHERE node='pre-existing' AND jid=%s""",
+                       (SUBSCRIBER_TO_BE_DELETED.resource,
+                        SUBSCRIBER_TO_BE_DELETED.userhost()))
+        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                       (SUBSCRIBER_PENDING.userhost(),))
+        cursor.execute("""INSERT INTO subscriptions
+                          (node_id, entity_id, resource, state)
+                          SELECT node_id, entity_id, %s, 'pending'
+                          FROM nodes, entities
+                          WHERE node='pre-existing' AND jid=%s""",
+                       (SUBSCRIBER_PENDING.resource,
+                        SUBSCRIBER_PENDING.userhost()))
+        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                       (PUBLISHER.userhost(),))
+        cursor.execute("""INSERT INTO items
+                          (node_id, publisher, item, data, date)
+                          SELECT node_id, %s, 'to-be-deleted', %s,
+                                 now() - interval '1 day'
+                          FROM nodes
+                          WHERE node='pre-existing'""",
+                       (PUBLISHER.userhost(),
+                        ITEM_TO_BE_DELETED.toXml()))
+        cursor.execute("""INSERT INTO items (node_id, publisher, item, data)
+                          SELECT node_id, %s, 'to-be-deleted', %s
+                          FROM nodes
+                          WHERE node='to-be-purged'""",
+                       (PUBLISHER.userhost(),
+                        ITEM_TO_BE_DELETED.toXml()))
+        cursor.execute("""INSERT INTO items (node_id, publisher, item, data)
+                          SELECT node_id, %s, 'current', %s
+                          FROM nodes
+                          WHERE node='pre-existing'""",
+                       (PUBLISHER.userhost(),
+                        ITEM.toXml()))
+
+
+    def cleandb(self, cursor):
+        cursor.execute("""DELETE FROM nodes WHERE node in
+                          ('non-existing', 'pre-existing', 'to-be-deleted',
+                           'new 1', 'new 2', 'new 3', 'to-be-reconfigured',
+                           'to-be-purged')""")
+        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
+                       (OWNER.userhost(),))
+        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
+                       (SUBSCRIBER.userhost(),))
+        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
+                       (SUBSCRIBER_TO_BE_DELETED.userhost(),))
+        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
+                       (SUBSCRIBER_PENDING.userhost(),))
+        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
+                       (PUBLISHER.userhost(),))
+
+try:
+    import psycopg2
+except ImportError:
+    PgsqlStorageStorageTestCase.skip = "Psycopg2 not available"