view idavoll/test/test_storage.py @ 187:69cdd8c6a431

Make sure second subscribers through HTTP also get a notification of the last item.
author Ralph Meijer <ralphm@ik.nu>
date Thu, 17 Apr 2008 16:02:22 +0000
parents bc269696ef42
children e404775b12df
line wrap: on
line source

# Copyright (c) 2003-2008 Ralph Meijer
# See LICENSE for details.

"""
Tests for L{idavoll.memory_storage} and L{idavoll.pgsql_storage}.
"""

from twisted.trial import unittest
from twisted.words.protocols.jabber import jid
from twisted.internet import defer
from twisted.words.xish import domish

from wokkel import pubsub

from idavoll import error

OWNER = jid.JID('owner@example.com')
SUBSCRIBER = jid.JID('subscriber@example.com/Home')
SUBSCRIBER_NEW = jid.JID('new@example.com/Home')
SUBSCRIBER_TO_BE_DELETED = jid.JID('to_be_deleted@example.com/Home')
SUBSCRIBER_PENDING = jid.JID('pending@example.com/Home')
PUBLISHER = jid.JID('publisher@example.com')
ITEM = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB)
ITEM['id'] = 'current'
ITEM.addElement(('testns', 'test'), content=u'Test \u2083 item')
ITEM_NEW = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB)
ITEM_NEW['id'] = 'new'
ITEM_NEW.addElement(('testns', 'test'), content=u'Test \u2083 item')
ITEM_UPDATED = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB)
ITEM_UPDATED['id'] = 'current'
ITEM_UPDATED.addElement(('testns', 'test'), content=u'Test \u2084 item')
ITEM_TO_BE_DELETED = domish.Element((pubsub.NS_PUBSUB, 'item'),
                                    pubsub.NS_PUBSUB)
ITEM_TO_BE_DELETED['id'] = 'to-be-deleted'
ITEM_TO_BE_DELETED.addElement(('testns', 'test'), content=u'Test \u2083 item')

def decode(object):
    if isinstance(object, str):
        object = object.decode('utf-8')
    return object


class StorageTests:

    def _assignTestNode(self, node):
        self.node = node

    def setUp(self):
        d = self.s.get_node('pre-existing')
        d.addCallback(self._assignTestNode)
        return d

    def testGetNode(self):
        return self.s.get_node('pre-existing')

    def testGetNonExistingNode(self):
        d = self.s.get_node('non-existing')
        self.assertFailure(d, error.NodeNotFound)
        return d

    def testGetNodeIDs(self):
        def cb(node_ids):
            self.assertIn('pre-existing', node_ids)
            self.assertNotIn('non-existing', node_ids)

        return self.s.get_node_ids().addCallback(cb)

    def testCreateExistingNode(self):
        d = self.s.create_node('pre-existing', OWNER)
        self.assertFailure(d, error.NodeExists)
        return d

    def testCreateNode(self):
        def cb(void):
            d = self.s.get_node('new 1')
            return d

        d = self.s.create_node('new 1', OWNER)
        d.addCallback(cb)
        return d

    def testDeleteNonExistingNode(self):
        d = self.s.delete_node('non-existing')
        self.assertFailure(d, error.NodeNotFound)
        return d

    def testDeleteNode(self):
        def cb(void):
            d = self.s.get_node('to-be-deleted')
            self.assertFailure(d, error.NodeNotFound)
            return d

        d = self.s.delete_node('to-be-deleted')
        d.addCallback(cb)
        return d

    def testGetAffiliations(self):
        def cb(affiliations):
            self.assertIn(('pre-existing', 'owner'), affiliations)

        d = self.s.get_affiliations(OWNER)
        d.addCallback(cb)
        return d

    def testGetSubscriptions(self):
        def cb(subscriptions):
            self.assertIn(('pre-existing', SUBSCRIBER, 'subscribed'), subscriptions)

        d = self.s.get_subscriptions(SUBSCRIBER)
        d.addCallback(cb)
        return d

    # Node tests

    def testGetType(self):
        self.assertEqual(self.node.get_type(), 'leaf')

    def testGetConfiguration(self):
        config = self.node.get_configuration()
        self.assertIn('pubsub#persist_items', config.iterkeys())
        self.assertIn('pubsub#deliver_payloads', config.iterkeys())
        self.assertEqual(config['pubsub#persist_items'], True)
        self.assertEqual(config['pubsub#deliver_payloads'], True)

    def testSetConfiguration(self):
        def get_config(node):
            d = node.set_configuration({'pubsub#persist_items': False})
            d.addCallback(lambda _: node)
            return d

        def check_object_config(node):
            config = node.get_configuration()
            self.assertEqual(config['pubsub#persist_items'], False)

        def get_node(void):
            return self.s.get_node('to-be-reconfigured')

        def check_storage_config(node):
            config = node.get_configuration()
            self.assertEqual(config['pubsub#persist_items'], False)

        d = self.s.get_node('to-be-reconfigured')
        d.addCallback(get_config)
        d.addCallback(check_object_config)
        d.addCallback(get_node)
        d.addCallback(check_storage_config)
        return d

    def testGetMetaData(self):
        meta_data = self.node.get_meta_data()
        for key, value in self.node.get_configuration().iteritems():
            self.assertIn(key, meta_data.iterkeys())
            self.assertEqual(value, meta_data[key])
        self.assertIn('pubsub#node_type', meta_data.iterkeys())
        self.assertEqual(meta_data['pubsub#node_type'], 'leaf')

    def testGetAffiliation(self):
        def cb(affiliation):
            self.assertEqual(affiliation, 'owner')

        d = self.node.get_affiliation(OWNER)
        d.addCallback(cb)
        return d

    def testGetNonExistingAffiliation(self):
        def cb(affiliation):
            self.assertEqual(affiliation, None)

        d = self.node.get_affiliation(SUBSCRIBER)
        d.addCallback(cb)
        return d

    def testAddSubscription(self):
        def cb1(void):
            return self.node.get_subscription(SUBSCRIBER_NEW)

        def cb2(state):
            self.assertEqual(state, 'pending')

        d = self.node.add_subscription(SUBSCRIBER_NEW, 'pending')
        d.addCallback(cb1)
        d.addCallback(cb2)
        return d

    def testAddExistingSubscription(self):
        d = self.node.add_subscription(SUBSCRIBER, 'pending')
        self.assertFailure(d, error.SubscriptionExists)
        return d

    def testGetSubscription(self):
        def cb(subscriptions):
            self.assertEquals(subscriptions[0][1], 'subscribed')
            self.assertEquals(subscriptions[1][1], 'pending')
            self.assertEquals(subscriptions[2][1], None)

        d = defer.DeferredList([self.node.get_subscription(SUBSCRIBER),
                                self.node.get_subscription(SUBSCRIBER_PENDING),
                                self.node.get_subscription(OWNER)])
        d.addCallback(cb)
        return d

    def testRemoveSubscription(self):
        return self.node.remove_subscription(SUBSCRIBER_TO_BE_DELETED)

    def testRemoveNonExistingSubscription(self):
        d = self.node.remove_subscription(OWNER)
        self.assertFailure(d, error.NotSubscribed)
        return d

    def testGetSubscribers(self):
        def cb(subscribers):
            self.assertIn(SUBSCRIBER, subscribers)
            self.assertNotIn(SUBSCRIBER_PENDING, subscribers)
            self.assertNotIn(OWNER, subscribers)

        d = self.node.get_subscribers()
        d.addCallback(cb)
        return d

    def testIsSubscriber(self):
        def cb(subscribed):
            self.assertEquals(subscribed[0][1], True)
            self.assertEquals(subscribed[1][1], True)
            self.assertEquals(subscribed[2][1], False)
            self.assertEquals(subscribed[3][1], False)

        d = defer.DeferredList([self.node.is_subscribed(SUBSCRIBER),
                                self.node.is_subscribed(SUBSCRIBER.userhostJID()),
                                self.node.is_subscribed(SUBSCRIBER_PENDING),
                                self.node.is_subscribed(OWNER)])
        d.addCallback(cb)
        return d

    def testStoreItems(self):
        def cb1(void):
            return self.node.get_items_by_id(['new'])

        def cb2(result):
            self.assertEqual(result[0], decode(ITEM_NEW.toXml()))

        d = self.node.store_items([ITEM_NEW], PUBLISHER)
        d.addCallback(cb1)
        d.addCallback(cb2)
        return d

    def testStoreUpdatedItems(self):
        def cb1(void):
            return self.node.get_items_by_id(['current'])

        def cb2(result):
            self.assertEqual(result[0], decode(ITEM_UPDATED.toXml()))

        d = self.node.store_items([ITEM_UPDATED], PUBLISHER)
        d.addCallback(cb1)
        d.addCallback(cb2)
        return d

    def testRemoveItems(self):
        def cb1(result):
            self.assertEqual(result, ['to-be-deleted'])
            return self.node.get_items_by_id(['to-be-deleted'])

        def cb2(result):
            self.assertEqual(len(result), 0)

        d = self.node.remove_items(['to-be-deleted'])
        d.addCallback(cb1)
        d.addCallback(cb2)
        return d

    def testRemoveNonExistingItems(self):
        def cb(result):
            self.assertEqual(result, [])

        d = self.node.remove_items(['non-existing'])
        d.addCallback(cb)
        return d

    def testGetItems(self):
        def cb(result):
            self.assertIn(decode(ITEM.toXml()), result)

        d = self.node.get_items()
        d.addCallback(cb)
        return d

    def testLastItem(self):
        def cb(result):
            self.assertEqual([decode(ITEM.toXml())], result)

        d = self.node.get_items(1)
        d.addCallback(cb)
        return d

    def testGetItemsById(self):
        def cb(result):
            self.assertEqual(len(result), 1)

        d = self.node.get_items_by_id(['current'])
        d.addCallback(cb)
        return d

    def testGetNonExistingItemsById(self):
        def cb(result):
            self.assertEqual(len(result), 0)

        d = self.node.get_items_by_id(['non-existing'])
        d.addCallback(cb)
        return d

    def testPurge(self):
        def cb1(node):
            d = node.purge()
            d.addCallback(lambda _: node)
            return d

        def cb2(node):
            return node.get_items()

        def cb3(result):
            self.assertEqual([], result)

        d = self.s.get_node('to-be-purged')
        d.addCallback(cb1)
        d.addCallback(cb2)
        d.addCallback(cb3)
        return d

    def testGetNodeAffilatiations(self):
        def cb1(node):
            return node.get_affiliations()

        def cb2(affiliations):
            affiliations = dict(((a[0].full(), a[1]) for a in affiliations))
            self.assertEquals(affiliations[OWNER.full()], 'owner')

        d = self.s.get_node('pre-existing')
        d.addCallback(cb1)
        d.addCallback(cb2)
        return d


class MemoryStorageStorageTestCase(unittest.TestCase, StorageTests):

    def setUp(self):
        from idavoll.memory_storage import Storage, LeafNode, Subscription, \
                                           default_config
        self.s = Storage()
        self.s._nodes['pre-existing'] = \
                LeafNode('pre-existing', OWNER, default_config)
        self.s._nodes['to-be-deleted'] = \
                LeafNode('to-be-deleted', OWNER, None)
        self.s._nodes['to-be-reconfigured'] = \
                LeafNode('to-be-reconfigured', OWNER, default_config)
        self.s._nodes['to-be-purged'] = \
                LeafNode('to-be-purged', OWNER, None)

        subscriptions = self.s._nodes['pre-existing']._subscriptions
        subscriptions[SUBSCRIBER.full()] = Subscription('subscribed')
        subscriptions[SUBSCRIBER_TO_BE_DELETED.full()] = \
                Subscription('subscribed')
        subscriptions[SUBSCRIBER_PENDING.full()] = \
                Subscription('pending')

        item = (decode(ITEM_TO_BE_DELETED.toXml()), PUBLISHER)
        self.s._nodes['pre-existing']._items['to-be-deleted'] = item
        self.s._nodes['pre-existing']._itemlist.append(item)
        self.s._nodes['to-be-purged']._items['to-be-deleted'] = item
        self.s._nodes['to-be-purged']._itemlist.append(item)
        item = (decode(ITEM.toXml()), PUBLISHER)
        self.s._nodes['pre-existing']._items['current'] = item
        self.s._nodes['pre-existing']._itemlist.append(item)

        return StorageTests.setUp(self)


class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests):
    def _callSuperSetUp(self, void):
        return StorageTests.setUp(self)

    def setUp(self):
        from idavoll.pgsql_storage import Storage
        self.s = Storage('ralphm', 'pubsub_test')
        self.s._dbpool.start()
        d = self.s._dbpool.runInteraction(self.init)
        d.addCallback(self._callSuperSetUp)
        return d

    def tearDownClass(self):
        #return self.s._dbpool.runInteraction(self.cleandb)
        pass

    def init(self, cursor):
        self.cleandb(cursor)
        cursor.execute("""INSERT INTO nodes (node) VALUES ('pre-existing')""")
        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-deleted')""")
        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-reconfigured')""")
        cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-purged')""")
        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                       OWNER.userhost())
        cursor.execute("""INSERT INTO affiliations
                          (node_id, entity_id, affiliation)
                          SELECT nodes.id, entities.id, 'owner'
                          FROM nodes, entities
                          WHERE node='pre-existing' AND jid=%s""",
                       OWNER.userhost())
        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                       SUBSCRIBER.userhost())
        cursor.execute("""INSERT INTO subscriptions
                          (node_id, entity_id, resource, subscription)
                          SELECT nodes.id, entities.id, %s, 'subscribed'
                          FROM nodes, entities
                          WHERE node='pre-existing' AND jid=%s""",
                       (SUBSCRIBER.resource,
                        SUBSCRIBER.userhost()))
        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                       SUBSCRIBER_TO_BE_DELETED.userhost())
        cursor.execute("""INSERT INTO subscriptions
                          (node_id, entity_id, resource, subscription)
                          SELECT nodes.id, entities.id, %s, 'subscribed'
                          FROM nodes, entities
                          WHERE node='pre-existing' AND jid=%s""",
                       (SUBSCRIBER_TO_BE_DELETED.resource,
                        SUBSCRIBER_TO_BE_DELETED.userhost()))
        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                       SUBSCRIBER_PENDING.userhost())
        cursor.execute("""INSERT INTO subscriptions
                          (node_id, entity_id, resource, subscription)
                          SELECT nodes.id, entities.id, %s, 'pending'
                          FROM nodes, entities
                          WHERE node='pre-existing' AND jid=%s""",
                       (SUBSCRIBER_PENDING.resource,
                        SUBSCRIBER_PENDING.userhost()))
        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                       PUBLISHER.userhost())
        cursor.execute("""INSERT INTO items
                          (node_id, publisher, item, data, date)
                          SELECT nodes.id, %s, 'to-be-deleted', %s,
                                 now() - interval '1 day'
                          FROM nodes
                          WHERE node='pre-existing'""",
                       (PUBLISHER.userhost(),
                        ITEM_TO_BE_DELETED.toXml()))
        cursor.execute("""INSERT INTO items (node_id, publisher, item, data)
                          SELECT nodes.id, %s, 'to-be-deleted', %s
                          FROM nodes
                          WHERE node='to-be-purged'""",
                       (PUBLISHER.userhost(),
                        ITEM_TO_BE_DELETED.toXml()))
        cursor.execute("""INSERT INTO items (node_id, publisher, item, data)
                          SELECT nodes.id, %s, 'current', %s
                          FROM nodes
                          WHERE node='pre-existing'""",
                       (PUBLISHER.userhost(),
                        ITEM.toXml()))

    def cleandb(self, cursor):
        cursor.execute("""DELETE FROM nodes WHERE node in
                          ('non-existing', 'pre-existing', 'to-be-deleted',
                           'new 1', 'new 2', 'new 3', 'to-be-reconfigured',
                           'to-be-purged')""")
        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
                       OWNER.userhost())
        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
                       SUBSCRIBER.userhost())
        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
                       SUBSCRIBER_TO_BE_DELETED.userhost())
        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
                       SUBSCRIBER_PENDING.userhost())
        cursor.execute("""DELETE FROM entities WHERE jid=%s""",
                       PUBLISHER.userhost())

try:
    import pyPgSQL
    pyPgSQL
except ImportError:
    PgsqlStorageStorageTestCase.skip = "pyPgSQL not available"