Mercurial > libervia-pubsub
diff sat_pubsub/test/test_gateway.py @ 232:923281d4c5bc
renamed idavoll directory to sat_pubsub
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 17 May 2012 12:48:14 +0200 |
parents | idavoll/test/test_gateway.py@bfc198af5d27 |
children | 564ae55219e1 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/test/test_gateway.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,398 @@ +# Copyright (c) 2003-2009 Ralph Meijer +# See LICENSE for details. + +""" +Tests for L{idavoll.gateway}. + +Note that some tests are functional tests that require a running idavoll +service. +""" + +from twisted.internet import defer +from twisted.trial import unittest +from twisted.web import error +from twisted.words.xish import domish + +from idavoll import gateway + +AGENT = "Idavoll Test Script" +NS_ATOM = "http://www.w3.org/2005/Atom" + +TEST_ENTRY = domish.Element((NS_ATOM, 'entry')) +TEST_ENTRY.addElement("id", + content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a") +TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok") +TEST_ENTRY.addElement("author").addElement("name", content="John Doe") +TEST_ENTRY.addElement("content", content="Some text.") + +baseURI = "http://localhost:8086/" +componentJID = "pubsub" + +class GatewayTest(unittest.TestCase): + timeout = 2 + + def setUp(self): + self.client = gateway.GatewayClient(baseURI) + self.client.startService() + self.addCleanup(self.client.stopService) + + def trapConnectionRefused(failure): + from twisted.internet.error import ConnectionRefusedError + failure.trap(ConnectionRefusedError) + raise unittest.SkipTest("Gateway to test against is not available") + + def trapNotFound(failure): + from twisted.web.error import Error + failure.trap(Error) + + d = self.client.ping() + d.addErrback(trapConnectionRefused) + d.addErrback(trapNotFound) + return d + + + def tearDown(self): + return self.client.stopService() + + + def test_create(self): + + def cb(response): + self.assertIn('uri', response) + + d = self.client.create() + d.addCallback(cb) + return d + + def test_publish(self): + + def cb(response): + self.assertIn('uri', response) + + d = self.client.publish(TEST_ENTRY) + d.addCallback(cb) + return d + + def test_publishExistingNode(self): + + def cb2(response, xmppURI): + self.assertEquals(xmppURI, response['uri']) + + def cb1(response): + xmppURI = response['uri'] + d = self.client.publish(TEST_ENTRY, xmppURI) + d.addCallback(cb2, xmppURI) + return d + + d = self.client.create() + d.addCallback(cb1) + return d + + def test_publishNonExisting(self): + def cb(err): + self.assertEqual('404', err.status) + + d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % componentJID) + self.assertFailure(d, error.Error) + d.addCallback(cb) + return d + + def test_delete(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.delete(xmppURI) + return d + + d = self.client.create() + d.addCallback(cb) + return d + + def test_deleteWithRedirect(self): + def cb(response): + xmppURI = response['uri'] + redirectURI = 'xmpp:%s?node=test' % componentJID + d = self.client.delete(xmppURI, redirectURI) + return d + + d = self.client.create() + d.addCallback(cb) + return d + + def test_deleteNotification(self): + def onNotification(data, headers): + try: + self.assertTrue(headers.hasHeader('Event')) + self.assertEquals(['DELETED'], headers.getRawHeaders('Event')) + self.assertFalse(headers.hasHeader('Link')) + except: + self.client.deferred.errback() + else: + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.delete(xmppURI) + return d + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_deleteNotificationWithRedirect(self): + redirectURI = 'xmpp:%s?node=test' % componentJID + + def onNotification(data, headers): + try: + self.assertTrue(headers.hasHeader('Event')) + self.assertEquals(['DELETED'], headers.getRawHeaders('Event')) + self.assertEquals(['<%s>; rel=alternate' % redirectURI], + headers.getRawHeaders('Link')) + except: + self.client.deferred.errback() + else: + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.delete(xmppURI, redirectURI) + return d + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_list(self): + d = self.client.listNodes() + return d + + def test_subscribe(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + return d + + d = self.client.create() + d.addCallback(cb) + return d + + def test_subscribeGetNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.publish(TEST_ENTRY, xmppURI) + return d + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + + def test_subscribeTwiceGetNotification(self): + + def onNotification1(data, headers): + d = client1.stopService() + d.chainDeferred(client1.deferred) + + def onNotification2(data, headers): + d = client2.stopService() + d.chainDeferred(client2.deferred) + + def cb(response): + xmppURI = response['uri'] + d = client1.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = client2.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb3(xmppURI): + d = self.client.publish(TEST_ENTRY, xmppURI) + return d + + + client1 = gateway.GatewayClient(baseURI, callbackPort=8088) + client1.startService() + client1.callback = onNotification1 + client1.deferred = defer.Deferred() + client2 = gateway.GatewayClient(baseURI, callbackPort=8089) + client2.startService() + client2.callback = onNotification2 + client2.deferred = defer.Deferred() + + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + d.addCallback(cb3) + dl = defer.gatherResults([d, client1.deferred, client2.deferred]) + return dl + + + def test_subscribeGetDelayedNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + self.assertNot(self.client.deferred.called) + d = self.client.publish(TEST_ENTRY, xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.subscribe(xmppURI) + return d + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_subscribeGetDelayedNotification2(self): + """ + Test that subscribing as second results in a notification being sent. + """ + + def onNotification1(data, headers): + client1.deferred.callback(None) + client1.stopService() + + def onNotification2(data, headers): + client2.deferred.callback(None) + client2.stopService() + + def cb(response): + xmppURI = response['uri'] + self.assertNot(client1.deferred.called) + self.assertNot(client2.deferred.called) + d = self.client.publish(TEST_ENTRY, xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = client1.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb3(xmppURI): + d = client2.subscribe(xmppURI) + return d + + client1 = gateway.GatewayClient(baseURI, callbackPort=8088) + client1.startService() + client1.callback = onNotification1 + client1.deferred = defer.Deferred() + client2 = gateway.GatewayClient(baseURI, callbackPort=8089) + client2.startService() + client2.callback = onNotification2 + client2.deferred = defer.Deferred() + + + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + d.addCallback(cb3) + dl = defer.gatherResults([d, client1.deferred, client2.deferred]) + return dl + + + def test_subscribeNonExisting(self): + def cb(err): + self.assertEqual('403', err.status) + + d = self.client.subscribe('xmpp:%s?node=test' % componentJID) + self.assertFailure(d, error.Error) + d.addCallback(cb) + return d + + + def test_subscribeRootGetNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI) + rootNode = gateway.getXMPPURI(jid, '') + + d = self.client.subscribe(rootNode) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + return self.client.publish(TEST_ENTRY, xmppURI) + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + + def test_unsubscribeNonExisting(self): + def cb(err): + self.assertEqual('403', err.status) + + d = self.client.unsubscribe('xmpp:%s?node=test' % componentJID) + self.assertFailure(d, error.Error) + d.addCallback(cb) + return d + + + def test_items(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.items(xmppURI) + return d + + d = self.client.publish(TEST_ENTRY) + d.addCallback(cb) + return d + + + def test_itemsMaxItems(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.items(xmppURI, 2) + return d + + d = self.client.publish(TEST_ENTRY) + d.addCallback(cb) + return d