# HG changeset patch # User Ralph Meijer # Date 1112978020 0 # Node ID e7cfe05bc1d2774552d1b104ebec8d9d6aeda50f # Parent dfef919aaf1b18db22a12e6698eccce97a275c81 Initial revision. diff -r dfef919aaf1b -r e7cfe05bc1d2 idavoll/test/__init__.py diff -r dfef919aaf1b -r e7cfe05bc1d2 idavoll/test/test_storage.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/idavoll/test/test_storage.py Fri Apr 08 16:33:40 2005 +0000 @@ -0,0 +1,120 @@ +from twisted.trial import unittest +from twisted.trial.assertions import * +from twisted.words.protocols.jabber import jid + +from idavoll import storage + +OWNER = jid.JID('owner@example.com') +SUBSCRIBER = jid.JID('subscriber@example.com/Home') + +class StorageTests: + + def testGetNode(self): + return self.s.get_node('pre-existing') + + def testGetNonExistingNode(self): + d = self.s.get_node('non-existing') + assertFailure(d, storage.NodeNotFound) + return d + + def testGetNodeIDs(self): + def cb(node_ids): + assertIn('pre-existing', node_ids) + assertNotIn('non-existing', node_ids) + + return self.s.get_node_ids().addCallback(cb) + + def testCreateExistingNode(self): + d = self.s.create_node('pre-existing', OWNER) + assertFailure(d, storage.NodeExists) + return d + + def testCreateNode(self): + def cb(void): + d = self.s.get_node('new 1') + return d + + d = self.s.create_node('new 1', OWNER) + d.addCallback(cb) + return d + + def testDeleteNonExistingNode(self): + d = self.s.delete_node('non-existing') + assertFailure(d, storage.NodeNotFound) + return d + + def testDeleteNode(self): + def cb(void): + d = self.s.get_node('to-be-deleted') + assertFailure(d, storage.NodeNotFound) + return d + + d = self.s.delete_node('to-be-deleted') + d.addCallback(cb) + return d + + def testGetAffiliations(self): + def cb(affiliations): + assertIn(('pre-existing', 'owner'), affiliations) + + d = self.s.get_affiliations(OWNER) + d.addCallback(cb) + return d + + def testGetSubscriptions(self): + def cb(subscriptions): + assertIn(('pre-existing', SUBSCRIBER, 'subscribed'), subscriptions) + + d = self.s.get_subscriptions(SUBSCRIBER) + d.addCallback(cb) + return d + +class MemoryStorageStorageTestCase(unittest.TestCase, StorageTests): + + def setUpClass(self): + from idavoll.memory_storage import Storage, LeafNode, Subscription + self.s = Storage() + self.s._nodes['pre-existing'] = LeafNode('pre-existing', OWNER, None) + self.s._nodes['to-be-deleted'] = LeafNode('to-be-deleted', OWNER, None) + self.s._nodes['pre-existing']._subscriptions[SUBSCRIBER.full()] = \ + Subscription('subscribed') + +class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests): + def setUpClass(self): + from idavoll.pgsql_storage import Storage + self.s = Storage('ralphm', 'pubsub_test') + self.s._dbpool.start() + return self.s._dbpool.runInteraction(self.init) + + def tearDownClass(self): + return self.s._dbpool.runInteraction(self.cleandb) + + def init(self, cursor): + self.cleandb(cursor) + cursor.execute("""INSERT INTO nodes (node) VALUES ('pre-existing')""") + cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-deleted')""") + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + OWNER.userhost().encode('utf-8')) + cursor.execute("""INSERT INTO affiliations + (node_id, entity_id, affiliation) + SELECT nodes.id, entities.id, 'owner' + FROM nodes, entities + WHERE node='pre-existing' AND jid=%s""", + OWNER.userhost().encode('utf-8')) + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + SUBSCRIBER.userhost().encode('utf-8')) + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, subscription) + SELECT nodes.id, entities.id, %s, 'subscribed' + FROM nodes, entities + WHERE node='pre-existing' AND jid=%s""", + (SUBSCRIBER.resource.encode('utf-8'), + SUBSCRIBER.userhost().encode('utf-8'))) + + def cleandb(self, cursor): + cursor.execute("""DELETE FROM nodes WHERE node in + ('pre-existing', 'new 1', 'new 2', 'new 3')""") + cursor.execute("""DELETE FROM entities WHERE jid=%s""", + OWNER.userhost().encode('utf-8')) + cursor.execute("""DELETE FROM entities WHERE jid=%s""", + SUBSCRIBER.userhost().encode('utf-8'))