changeset 204:b4bf0a5ce50d

Implement storage facilities for the HTTP gateway. Author: ralphm. Fixes #12. One of the storage facilities is PostgreSQL based, providing persistence.
author Ralph Meijer <ralphm@ik.nu>
date Wed, 16 Jul 2008 06:38:32 +0000
parents 2c46e6664680
children e6b710bf2b24
files idavoll/error.py idavoll/gateway.py idavoll/iidavoll.py idavoll/memory_storage.py idavoll/pgsql_storage.py idavoll/tap.py idavoll/tap_http.py idavoll/test/test_gateway.py idavoll/test/test_storage.py
diffstat 9 files changed, 366 insertions(+), 90 deletions(-) [+]
line wrap: on
line diff
--- a/idavoll/error.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/error.py	Wed Jul 16 06:38:32 2008 +0000
@@ -16,14 +16,19 @@
     pass
 
 
+
 class NotSubscribed(Error):
     """
     Entity is not subscribed to this node.
     """
 
 
+
 class SubscriptionExists(Error):
-    pass
+    """
+    There already exists a subscription to this node.
+    """
+
 
 
 class Forbidden(Error):
@@ -56,3 +61,10 @@
 
 class NoRootNode(Error):
     pass
+
+
+
+class NoCallbacks(Error):
+    """
+    There are no callbacks for this node.
+    """
--- a/idavoll/gateway.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/gateway.py	Wed Jul 16 06:38:32 2008 +0000
@@ -352,29 +352,34 @@
     to with the received items in notifications.
     """
 
-    def __init__(self, jid):
+    def __init__(self, jid, storage):
         self.jid = jid
-
-
-    def startService(self):
-        self.callbacks = {}
+        self.storage = storage
 
 
     def trapNotFound(self, failure):
         failure.trap(StanzaError)
-        if not failure.value.condition == 'item-not-found':
-            raise failure
-        raise error.NodeNotFound
+
+        if failure.value.condition == 'item-not-found':
+            raise error.NodeNotFound()
+        else:
+            return failure
 
 
     def subscribeCallback(self, jid, nodeIdentifier, callback):
+        """
+        Subscribe a callback URI.
 
-        def newCallbackList(result):
-            callbackList = set()
-            self.callbacks[jid, nodeIdentifier] = callbackList
-            return callbackList
+        This registers a callback URI to be called when a notification is
+        received for the given node.
 
-        def callbackForLastItem(items, callback):
+        If this is the first callback registered for this node, the gateway
+        will subscribe to the node. Otherwise, the most recently published item
+        for this node is retrieved and, if present, the newly registered
+        callback will be called with that item.
+        """
+
+        def callbackForLastItem(items):
             atomEntries = extractAtomEntries(items)
 
             if not atomEntries:
@@ -383,32 +388,38 @@
             self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
                          'application/atom+xml;type=entry')
 
-        try:
-            callbackList = self.callbacks[jid, nodeIdentifier]
-        except KeyError:
-            d = self.subscribe(jid, nodeIdentifier, self.jid)
-            d.addCallback(newCallbackList)
-        else:
-            d = self.items(jid, nodeIdentifier, 1)
-            d.addCallback(callbackForLastItem, callback)
-            d.addCallback(lambda _: callbackList)
+        def subscribeOrItems(hasCallbacks):
+            if hasCallbacks:
+                d = self.items(jid, nodeIdentifier, 1)
+                d.addCallback(callbackForLastItem)
+            else:
+                d = self.subscribe(jid, nodeIdentifier, self.jid)
 
-        d.addCallback(lambda callbackList: callbackList.add(callback))
-        d.addErrback(self.trapNotFound)
+            d.addErrback(self.trapNotFound)
+            return d
+
+        d = self.storage.hasCallbacks(jid, nodeIdentifier)
+        d.addCallback(subscribeOrItems)
+        d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier,
+                                                         callback))
         return d
 
 
     def unsubscribeCallback(self, jid, nodeIdentifier, callback):
-        try:
-            callbackList = self.callbacks[jid, nodeIdentifier]
-            callbackList.remove(callback)
-        except KeyError:
-            return defer.fail(error.NotSubscribed())
+        """
+        Unsubscribe a callback.
+
+        If this was the last registered callback for this node, the
+        gateway will unsubscribe from node.
+        """
 
-        if not callbackList:
-            self.unsubscribe(jid, nodeIdentifier, self.jid)
+        def cb(last):
+            if last:
+                return self.unsubscribe(jid, nodeIdentifier, self.jid)
 
-        return defer.succeed(None)
+        d = self.storage.removeCallback(jid, nodeIdentifier, callback)
+        d.addCallback(cb)
+        return d
 
 
     def itemsReceived(self, event):
@@ -446,6 +457,10 @@
 
     def _postTo(self, callbacks, service, nodeIdentifier,
                       payload=None, contentType=None, eventType=None):
+
+        if not callbacks:
+            return
+
         postdata = None
         nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
         headers = {'Referer': nodeURI.encode('utf-8'),
@@ -469,16 +484,22 @@
         for callbackURI in callbacks:
             reactor.callLater(0, postNotification, callbackURI)
 
+
     def callCallbacks(self, service, nodeIdentifier,
                             payload=None, contentType=None, eventType=None):
-        try:
-            callbacks = self.callbacks[service, nodeIdentifier]
-        except KeyError:
-            return
+
+        def eb(failure):
+            failure.trap(error.NoCallbacks)
 
-        self._postTo(callbacks, service, nodeIdentifier, payload, contentType,
-                     eventType)
+            # No callbacks were registered for this node. Unsubscribe.
+            d = self.unsubscribe(service, nodeIdentifier, self.jid)
+            return d
 
+        d = self.storage.getCallbacks(service, nodeIdentifier)
+        d.addCallback(self._postTo, service, nodeIdentifier, payload,
+                                    contentType, eventType)
+        d.addErrback(eb)
+        d.addErrback(log.err)
 
 
 
--- a/idavoll/iidavoll.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/iidavoll.py	Wed Jul 16 06:38:32 2008 +0000
@@ -466,3 +466,66 @@
 
         @return: deferred that fires when the node has been purged.
         """
+
+
+
+class IGatewayStorage(Interface):
+
+    def addCallback(service, nodeIdentifier, callback):
+        """
+        Register a callback URI.
+
+        The registered HTTP callback URI will have an Atom Entry documented
+        POSTed to it upon receiving a notification for the given pubsub node.
+
+        @param service: The XMPP entity that holds the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the publish-subscribe node.
+        @type nodeIdentifier: C{unicode}.
+        @param callback: The callback URI to be registered.
+        @type callback: C{str}.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
+
+    def removeCallback(service, nodeIdentifier, callback):
+        """
+        Remove a registered callback URI.
+
+        The returned deferred will fire with a boolean that signals wether or
+        not this was the last callback unregistered for this node.
+
+        @param service: The XMPP entity that holds the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the publish-subscribe node.
+        @type nodeIdentifier: C{unicode}.
+        @param callback: The callback URI to be unregistered.
+        @type callback: C{str}.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
+
+    def getCallbacks(service, nodeIdentifier):
+        """
+        Get the callbacks registered for this node.
+
+        Returns a deferred that fires with the set of HTTP callback URIs
+        registered for this node.
+
+        @param service: The XMPP entity that holds the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the publish-subscribe node.
+        @type nodeIdentifier: C{unicode}.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
+
+
+    def hasCallbacks(service, nodeIdentifier):
+        """
+        Return wether there are callbacks registered for a node.
+
+        @param service: The XMPP entity that holds the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the publish-subscribe node.
+        @type nodeIdentifier: C{unicode}.
+        @returns: Deferred that fires with a boolean.
+        @rtype: L{Deferred<twisted.internet.defer.Deferred>}
+        """
--- a/idavoll/memory_storage.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/memory_storage.py	Wed Jul 16 06:38:32 2008 +0000
@@ -266,3 +266,53 @@
 
     def __init__(self, state):
         self.state = state
+
+
+
+class GatewayStorage(object):
+    """
+    Memory based storage facility for the XMPP-HTTP gateway.
+    """
+
+    def __init__(self):
+        self.callbacks = {}
+
+
+    def addCallback(self, service, nodeIdentifier, callback):
+        try:
+            callbacks = self.callbacks[service, nodeIdentifier]
+        except KeyError:
+            callbacks = set([callback])
+            self.callbacks[service, nodeIdentifier] = callbacks
+        else:
+            callbacks.add(callback)
+            pass
+
+        print self.callbacks
+        return defer.succeed(None)
+
+
+    def removeCallback(self, service, nodeIdentifier, callback):
+        try:
+            callbacks = self.callbacks[service, nodeIdentifier]
+            callbacks.remove(callback)
+        except KeyError:
+            return defer.fail(error.NotSubscribed())
+        else:
+            if not callbacks:
+                del self.callbacks[service, nodeIdentifier]
+
+            return defer.succeed(not callbacks)
+
+
+    def getCallbacks(self, service, nodeIdentifier):
+        try:
+            callbacks = self.callbacks[service, nodeIdentifier]
+        except KeyError:
+            return defer.fail(error.NoCallbacks())
+        else:
+            return defer.succeed(callbacks)
+
+
+    def hasCallbacks(self, service, nodeIdentifier):
+        return defer.succeed((service, nodeIdentifier) in self.callbacks)
--- a/idavoll/pgsql_storage.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/pgsql_storage.py	Wed Jul 16 06:38:32 2008 +0000
@@ -4,10 +4,7 @@
 import copy
 
 from zope.interface import implements
-
-from twisted.enterprise import adbapi
 from twisted.words.protocols.jabber import jid
-
 from wokkel.generic import parseXml
 
 from idavoll import error, iidavoll
@@ -16,20 +13,13 @@
 
     implements(iidavoll.IStorage)
 
-    def __init__(self, user, database, password=None, host=None, port=None):
-        self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL',
-                                             user=user,
-                                             password=password,
-                                             database=database,
-                                             host=host,
-                                             port=port,
-                                             cp_reconnect=True,
-                                             client_encoding='utf-8'
-                                             )
+
+    def __init__(self, dbpool):
+        self.dbpool = dbpool
 
 
     def getNode(self, nodeIdentifier):
-        return self._dbpool.runInteraction(self._getNode, nodeIdentifier)
+        return self.dbpool.runInteraction(self._getNode, nodeIdentifier)
 
 
     def _getNode(self, cursor, nodeIdentifier):
@@ -48,18 +38,18 @@
             raise error.NodeNotFound()
         else:
             node = LeafNode(nodeIdentifier, configuration)
-            node._dbpool = self._dbpool
+            node.dbpool = self.dbpool
             return node
 
 
     def getNodeIds(self):
-        d = self._dbpool.runQuery("""SELECT node from nodes""")
+        d = self.dbpool.runQuery("""SELECT node from nodes""")
         d.addCallback(lambda results: [r[0] for r in results])
         return d
 
 
     def createNode(self, nodeIdentifier, owner, config=None):
-        return self._dbpool.runInteraction(self._createNode, nodeIdentifier,
+        return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
                                            owner)
 
 
@@ -88,7 +78,7 @@
 
 
     def deleteNode(self, nodeIdentifier):
-        return self._dbpool.runInteraction(self._deleteNode, nodeIdentifier)
+        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier)
 
 
     def _deleteNode(self, cursor, nodeIdentifier):
@@ -100,7 +90,7 @@
 
 
     def getAffiliations(self, entity):
-        d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities
+        d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
                                         JOIN affiliations ON
                                         (affiliations.entity_id=entities.id)
                                         JOIN nodes ON
@@ -112,7 +102,7 @@
 
 
     def getSubscriptions(self, entity):
-        d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription
+        d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription
                                      FROM entities JOIN subscriptions ON
                                      (subscriptions.entity_id=entities.id)
                                      JOIN nodes ON
@@ -162,7 +152,7 @@
             if option in config:
                 config[option] = options[option]
 
-        d = self._dbpool.runInteraction(self._setConfiguration, config)
+        d = self.dbpool.runInteraction(self._setConfiguration, config)
         d.addCallback(self._setCachedConfiguration, config)
         return d
 
@@ -189,7 +179,7 @@
 
 
     def getAffiliation(self, entity):
-        return self._dbpool.runInteraction(self._getAffiliation, entity)
+        return self.dbpool.runInteraction(self._getAffiliation, entity)
 
 
     def _getAffiliation(self, cursor, entity):
@@ -208,7 +198,7 @@
 
 
     def getSubscription(self, subscriber):
-        return self._dbpool.runInteraction(self._getSubscription, subscriber)
+        return self.dbpool.runInteraction(self._getSubscription, subscriber)
 
 
     def _getSubscription(self, cursor, subscriber):
@@ -232,7 +222,7 @@
 
 
     def addSubscription(self, subscriber, state):
-        return self._dbpool.runInteraction(self._addSubscription, subscriber,
+        return self.dbpool.runInteraction(self._addSubscription, subscriber,
                                           state)
 
 
@@ -264,7 +254,7 @@
 
 
     def removeSubscription(self, subscriber):
-        return self._dbpool.runInteraction(self._removeSubscription,
+        return self.dbpool.runInteraction(self._removeSubscription,
                                            subscriber)
 
 
@@ -288,7 +278,7 @@
 
 
     def getSubscribers(self):
-        d = self._dbpool.runInteraction(self._getSubscribers)
+        d = self.dbpool.runInteraction(self._getSubscribers)
         d.addCallback(self._convertToJIDs)
         return d
 
@@ -309,7 +299,7 @@
 
 
     def isSubscribed(self, entity):
-        return self._dbpool.runInteraction(self._isSubscribed, entity)
+        return self.dbpool.runInteraction(self._isSubscribed, entity)
 
 
     def _isSubscribed(self, cursor, entity):
@@ -329,7 +319,7 @@
 
 
     def getAffiliations(self):
-        return self._dbpool.runInteraction(self._getAffiliations)
+        return self.dbpool.runInteraction(self._getAffiliations)
 
 
     def _getAffiliations(self, cursor):
@@ -353,7 +343,7 @@
     nodeType = 'leaf'
 
     def storeItems(self, items, publisher):
-        return self._dbpool.runInteraction(self._storeItems, items, publisher)
+        return self.dbpool.runInteraction(self._storeItems, items, publisher)
 
 
     def _storeItems(self, cursor, items, publisher):
@@ -384,7 +374,7 @@
 
 
     def removeItems(self, itemIdentifiers):
-        return self._dbpool.runInteraction(self._removeItems, itemIdentifiers)
+        return self.dbpool.runInteraction(self._removeItems, itemIdentifiers)
 
 
     def _removeItems(self, cursor, itemIdentifiers):
@@ -406,7 +396,7 @@
 
 
     def getItems(self, maxItems=None):
-        return self._dbpool.runInteraction(self._getItems, maxItems)
+        return self.dbpool.runInteraction(self._getItems, maxItems)
 
 
     def _getItems(self, cursor, maxItems):
@@ -426,7 +416,7 @@
 
 
     def getItemsById(self, itemIdentifiers):
-        return self._dbpool.runInteraction(self._getItemsById, itemIdentifiers)
+        return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers)
 
 
     def _getItemsById(self, cursor, itemIdentifiers):
@@ -445,7 +435,7 @@
 
 
     def purge(self):
-        return self._dbpool.runInteraction(self._purge)
+        return self.dbpool.runInteraction(self._purge)
 
 
     def _purge(self, cursor):
@@ -460,3 +450,84 @@
 class LeafNode(Node, LeafNodeMixin):
 
     implements(iidavoll.ILeafNode)
+
+
+
+class GatewayStorage(object):
+    """
+    Memory based storage facility for the XMPP-HTTP gateway.
+    """
+
+    def __init__(self, dbpool):
+        self.dbpool = dbpool
+
+
+    def _countCallbacks(self, cursor, service, nodeIdentifier):
+        """
+        Count number of callbacks registered for a node.
+        """
+        cursor.execute("""SELECT count(*) FROM callbacks
+                          WHERE service=%s and node=%s""",
+                       service.full(),
+                       nodeIdentifier)
+        results = cursor.fetchall()
+        return results[0][0]
+
+
+    def addCallback(self, service, nodeIdentifier, callback):
+        def interaction(cursor):
+            cursor.execute("""SELECT 1 FROM callbacks
+                              WHERE service=%s and node=%s and uri=%s""",
+                           service.full(),
+                           nodeIdentifier,
+                           callback)
+            if cursor.fetchall():
+                raise error.SubscriptionExists()
+
+            cursor.execute("""INSERT INTO callbacks
+                              (service, node, uri) VALUES
+                              (%s, %s, %s)""",
+                           service.full(),
+                           nodeIdentifier,
+                           callback)
+
+        return self.dbpool.runInteraction(interaction)
+
+
+    def removeCallback(self, service, nodeIdentifier, callback):
+        def interaction(cursor):
+            cursor.execute("""DELETE FROM callbacks
+                              WHERE service=%s and node=%s and uri=%s""",
+                           service.full(),
+                           nodeIdentifier,
+                           callback)
+
+            if cursor.rowcount != 1:
+                raise error.NotSubscribed()
+
+            last = not self._countCallbacks(cursor, service, nodeIdentifier)
+            return last
+
+        return self.dbpool.runInteraction(interaction)
+
+    def getCallbacks(self, service, nodeIdentifier):
+        def interaction(cursor):
+            cursor.execute("""SELECT uri FROM callbacks
+                              WHERE service=%s and node=%s""",
+                           service.full(),
+                           nodeIdentifier)
+            results = cursor.fetchall()
+
+            if not results:
+                raise error.NoCallbacks()
+
+            return [result[0] for result in results]
+
+        return self.dbpool.runInteraction(interaction)
+
+
+    def hasCallbacks(self, service, nodeIdentifier):
+        def interaction(cursor):
+            return bool(self._countCallbacks(cursor, service, nodeIdentifier))
+
+        return self.dbpool.runInteraction(interaction)
--- a/idavoll/tap.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/tap.py	Wed Jul 16 06:38:32 2008 +0000
@@ -45,12 +45,18 @@
     # Create backend service with storage
 
     if config['backend'] == 'pgsql':
+        from twisted.enterprise import adbapi
         from idavoll.pgsql_storage import Storage
-        st = Storage(user=config['dbuser'],
-                     database=config['dbname'],
-                     password=config['dbpass'],
-                     host=config['dbhost'],
-                     port=config['dbport'])
+        dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL',
+                                       user=config['dbuser'],
+                                       password=config['dbuser'],
+                                       database=config['dbname'],
+                                       host=config['dbpass'],
+                                       port=config['dbport'],
+                                       cp_reconnect=True,
+                                       client_encoding='utf-8',
+                                       )
+        st = Storage(dbpool)
     elif config['backend'] == 'memory':
         from idavoll.memory_storage import Storage
         st = Storage()
--- a/idavoll/tap_http.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/tap_http.py	Wed Jul 16 06:38:32 2008 +0000
@@ -36,7 +36,14 @@
 
     # Set up XMPP service for subscribing to remote nodes
 
-    ss = RemoteSubscriptionService(config['jid'])
+    if config['backend'] == 'pgsql':
+        from idavoll.pgsql_storage import GatewayStorage
+        gst = GatewayStorage(bs.storage.dbpool)
+    elif config['backend'] == 'memory':
+        from idavoll.memory_storage import GatewayStorage
+        gst = GatewayStorage()
+
+    ss = RemoteSubscriptionService(config['jid'], gst)
     ss.setHandlerParent(cs)
     ss.startService()
 
--- a/idavoll/test/test_gateway.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/test/test_gateway.py	Wed Jul 16 06:38:32 2008 +0000
@@ -35,7 +35,7 @@
         self.client.startService()
 
     def tearDown(self):
-        self.client.stopService()
+        return self.client.stopService()
 
     def test_create(self):
 
@@ -93,6 +93,7 @@
         d.addCallback(cb)
         return d
 
+
     def test_subscribeGetNotification(self):
 
         def onNotification(data, headers):
@@ -116,6 +117,50 @@
         d.addCallback(cb2)
         return defer.gatherResults([d, self.client.deferred])
 
+
+    def test_subscribeTwiceGetNotification(self):
+
+        def onNotification1(data, headers):
+            d = client1.stopService()
+            d.chainDeferred(client1.deferred)
+
+        def onNotification2(data, headers):
+            d = client2.stopService()
+            d.chainDeferred(client2.deferred)
+
+        def cb(response):
+            xmppURI = response['uri']
+            d = client1.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = client2.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb3(xmppURI):
+            d = self.client.publish(entry, xmppURI)
+            return d
+
+
+        client1 = gateway.GatewayClient(baseURI, callbackPort=8088)
+        client1.startService()
+        client1.callback = onNotification1
+        client1.deferred = defer.Deferred()
+        client2 = gateway.GatewayClient(baseURI, callbackPort=8089)
+        client2.startService()
+        client2.callback = onNotification2
+        client2.deferred = defer.Deferred()
+
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        d.addCallback(cb3)
+        dl = defer.gatherResults([d, client1.deferred, client2.deferred])
+        return dl
+
+
     def test_subscribeGetDelayedNotification(self):
 
         def onNotification(data, headers):
@@ -179,7 +224,6 @@
         client2.callback = onNotification2
         client2.deferred = defer.Deferred()
 
-
         d = self.client.create()
         d.addCallback(cb)
         d.addCallback(cb2)
--- a/idavoll/test/test_storage.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/test/test_storage.py	Wed Jul 16 06:38:32 2008 +0000
@@ -414,22 +414,24 @@
 
 
 class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests):
-    def _callSuperSetUp(self, void):
-        return StorageTests.setUp(self)
-
 
     def setUp(self):
         from idavoll.pgsql_storage import Storage
-        self.s = Storage('ralphm', 'pubsub_test')
-        self.s._dbpool.start()
-        d = self.s._dbpool.runInteraction(self.init)
-        d.addCallback(self._callSuperSetUp)
+        from twisted.enterprise import adbapi
+        self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL',
+                                            database='pubsub_test',
+                                            cp_reconnect=True,
+                                            client_encoding='utf-8',
+                                            )
+        self.s = Storage(self.dbpool)
+        self.dbpool.start()
+        d = self.dbpool.runInteraction(self.init)
+        d.addCallback(lambda _: StorageTests.setUp(self))
         return d
 
 
-    def tearDownClass(self):
-        #return self.s._dbpool.runInteraction(self.cleandb)
-        pass
+    def tearDown(self):
+        return self.dbpool.runInteraction(self.cleandb)
 
 
     def init(self, cursor):