# HG changeset patch # User Ralph Meijer # Date 1207924876 0 # Node ID c21b986cff304f8889b9d14b394b2f4268298227 # Parent 4aa29b1a8c67cbb91810e1c9a1f968bb42337ceb Implement HTTP client to gateway and implement functional tests with it. diff -r 4aa29b1a8c67 -r c21b986cff30 idavoll/backend.py --- a/idavoll/backend.py Fri Apr 11 13:48:12 2008 +0000 +++ b/idavoll/backend.py Fri Apr 11 14:41:16 2008 +0000 @@ -9,9 +9,9 @@ from twisted.application import service from twisted.python import components -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.words.protocols.jabber.error import StanzaError -from twisted.words.xish import utility +from twisted.words.xish import domish, utility from wokkel.iwokkel import IDisco, IPubSubService from wokkel.pubsub import PubSubService, PubSubError @@ -182,14 +182,36 @@ return node_id, result def _send_last_published(self, node, subscriber): - def notify_item(items): - if not items: + class StringParser(object): + def __init__(self): + self.elementStream = domish.elementStream() + self.elementStream.DocumentStartEvent = self.docStart + self.elementStream.ElementEvent = self.elem + self.elementStream.DocumentEndEvent = self.docEnd + + def docStart(self, elem): + self.document = elem + + def elem(self, elem): + self.document.addChild(elem) + + def docEnd(self): + pass + + def parse(self, string): + self.elementStream.parse(string) + return self.document + + def notify_item(result): + if not result: return - self.dispatch({'items': items, - 'node_id': node.id, - 'subscriber': subscriber}, - '//event/pubsub/notify') + items = [domish.SerializedXML(item) for item in result] + + reactor.callLater(0, self.dispatch, {'items': items, + 'node_id': node.id, + 'subscriber': subscriber}, + '//event/pubsub/notify') config = node.get_configuration() if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': diff -r 4aa29b1a8c67 -r c21b986cff30 idavoll/gateway.py --- a/idavoll/gateway.py Fri Apr 11 13:48:12 2008 +0000 +++ b/idavoll/gateway.py Fri Apr 11 14:41:16 2008 +0000 @@ -1,15 +1,25 @@ +# -*- test-case-name: idavoll.test.test_gateway -*- +# # Copyright (c) 2003-2008 Ralph Meijer # See LICENSE for details. +""" +Web resources and client for interacting with pubsub services. +""" + import cgi from time import gmtime, strftime +import urllib +import urlparse import simplejson from twisted.application import service from twisted.internet import defer, reactor +from twisted.python import log from twisted.web import client from twisted.web2 import http, http_headers, resource, responsecode +from twisted.web2 import channel, server from twisted.web2.stream import readStream from twisted.words.protocols.jabber.jid import JID from twisted.words.protocols.jabber.error import StanzaError @@ -21,7 +31,8 @@ from idavoll import error NS_ATOM = 'http://www.w3.org/2005/Atom' - +MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' +MIME_JSON = 'application/json' class RemoteSubscriptionService(service.Service, PubSubClient): @@ -142,10 +153,11 @@ headers['Event'] = eventType def postNotification(callbackURI): - return client.getPage(str(callbackURI), - method='POST', - postdata=postdata, - headers=headers) + d = client.getPage(str(callbackURI), + method='POST', + postdata=postdata, + headers=headers) + d.addErrback(log.err) for callbackURI in callbacks: reactor.callLater(0, postNotification, callbackURI) @@ -328,10 +340,6 @@ http_headers.generateContentType(ctype))) def parseXMLPayload(self, stream): - if not stream: - print "Stream is empty", repr(stream) - elif not stream.length: - print "Stream length is", repr(stream.length) p = WebStreamParser() return p.parse(stream) @@ -466,9 +474,119 @@ def render(self, request): def responseFromNodes(nodeIdentifiers): - import pprint - return http.Response(responsecode.OK, stream=pprint.pformat(nodeIdentifiers)) + return http.Response(responsecode.OK, + stream=simplejson.dumps(nodeIdentifiers)) d = self.service.get_nodes() d.addCallback(responseFromNodes) return d + + +def getPageWithFactory(url, contextFactory=None, *args, **kwargs): + """Download a web page. + + Download a page. Return the factory that holds a deferred, which will + callback with a page (as a string) or errback with a description of the + error. + + See HTTPClientFactory to see what extra args can be passed. + """ + + scheme, host, port, path = client._parse(url) + factory = client.HTTPClientFactory(url, *args, **kwargs) + factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200() + + if scheme == 'https': + from twisted.internet import ssl + if contextFactory is None: + contextFactory = ssl.ClientContextFactory() + reactor.connectSSL(host, port, factory, contextFactory) + else: + reactor.connectTCP(host, port, factory) + return factory + + +class CallbackResource(resource.Resource): + """ + Web resource for retrieving gateway notifications. + """ + + def __init__(self, callback): + self.callback = callback + + http_GET = None + + def http_POST(self, request): + p = WebStreamParser() + d = p.parse(request.stream) + d.addCallback(self.callback, request.headers) + d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) + return d + +class GatewayClient(service.Service): + """ + Service that provides client access to the HTTP Gateway into Idavoll. + """ + + agent = "Idavoll HTTP Gateway Client" + + def __init__(self, baseURI, callbackHost=None, callbackPort=None): + self.baseURI = baseURI + self.callbackHost = callbackHost or 'localhost' + self.callbackPort = callbackPort or 8087 + root = resource.Resource() + root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) + self.site = server.Site(root) + + def startService(self): + self.port = reactor.listenTCP(self.callbackPort, + channel.HTTPFactory(self.site)) + + def stopService(self): + return self.port.stopListening() + + def _makeURI(self, verb, query=None): + uriComponents = urlparse.urlparse(self.baseURI) + uri = urlparse.urlunparse((uriComponents[0], + uriComponents[1], + uriComponents[2] + verb, + '', + query and urllib.urlencode(query) or '', + '')) + return uri + + def callback(self, data, headers): + pass + + def create(self): + f = getPageWithFactory(self._makeURI('create'), + method='POST', + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + def publish(self, entry, xmppURI=None): + query = xmppURI and {'uri': xmppURI} + + f = getPageWithFactory(self._makeURI('publish', query), + method='POST', + postdata=entry.toXml().encode('utf-8'), + headers={'Content-Type': MIME_ATOM_ENTRY}, + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + def listNodes(self): + f = getPageWithFactory(self._makeURI('list'), + method='GET', + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + def subscribe(self, xmppURI): + params = {'uri': xmppURI, + 'callback': 'http://%s:%s/callback' % (self.callbackHost, + self.callbackPort)} + f = getPageWithFactory(self._makeURI('subscribe'), + method='POST', + postdata=simplejson.dumps(params), + headers={'Content-Type': MIME_JSON}, + agent=self.agent) + return f.deferred diff -r 4aa29b1a8c67 -r c21b986cff30 idavoll/tap_http.py --- a/idavoll/tap_http.py Fri Apr 11 13:48:12 2008 +0000 +++ b/idavoll/tap_http.py Fri Apr 11 14:41:16 2008 +0000 @@ -49,9 +49,9 @@ config['jid']) root.child_publish = gateway.PublishResource(bs, config['jid'], config['jid']) + root.child_list = gateway.ListResource(bs) root.child_subscribe = gateway.SubscribeResource(ss) root.child_unsubscribe = gateway.UnsubscribeResource(ss) - root.child_list = gateway.ListResource(ss) site = server.Site(root) w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site)) diff -r 4aa29b1a8c67 -r c21b986cff30 idavoll/test/test_gateway.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/idavoll/test/test_gateway.py Fri Apr 11 14:41:16 2008 +0000 @@ -0,0 +1,150 @@ +# Copyright (c) 2003-2008 Ralph Meijer +# See LICENSE for details. + +""" +Tests for L{idavoll.gateway}. + +Note that some tests are functional tests that require a running idavoll +service. +""" + +from twisted.internet import defer +from twisted.trial import unittest +from twisted.web import error +from twisted.words.xish import domish + +from idavoll import gateway + +AGENT = "Idavoll Test Script" +NS_ATOM = "http://www.w3.org/2005/Atom" + +entry = domish.Element((NS_ATOM, 'entry')) +entry.addElement("id", content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a") +entry.addElement("title", content="Atom-Powered Robots Run Amok") +entry.addElement("author").addElement("name", content="John Doe") +entry.addElement("content", content="Some text.") + +#baseURI = "http://pubsub-test.ik.nu/" +baseURI = "http://localhost:8086/" +componentJID = "test.ik.nu" + +class GatewayTest(unittest.TestCase): + + def setUp(self): + self.client = gateway.GatewayClient(baseURI) + self.client.startService() + + def tearDown(self): + self.client.stopService() + + def test_create(self): + + def cb(response): + self.assertIn('uri', response) + + d = self.client.create() + d.addCallback(cb) + return d + + def test_publish(self): + + def cb(response): + self.assertIn('uri', response) + + d = self.client.publish(entry) + d.addCallback(cb) + return d + + def test_publishExistingNode(self): + + def cb2(response, xmppURI): + self.assertEquals(xmppURI, response['uri']) + + def cb1(response): + xmppURI = response['uri'] + d = self.client.publish(entry, xmppURI) + d.addCallback(cb2, xmppURI) + return d + + d = self.client.create() + d.addCallback(cb1) + return d + + def test_publishNonExisting(self): + def cb(err): + self.assertEqual('404', err.status) + + d = self.client.publish(entry, 'xmpp:%s?node=test' % componentJID) + self.assertFailure(d, error.Error) + d.addCallback(cb) + return d + + def test_list(self): + d = self.client.listNodes() + return d + + def test_subscribe(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + return d + + d = self.client.create() + d.addCallback(cb) + return d + + def test_subscribeGetNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.publish(entry, xmppURI) + return d + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_subscribeGetDelayedNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + self.assertNot(self.client.deferred.called) + d = self.client.publish(entry, xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.subscribe(xmppURI) + return d + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_subscribeNonExisting(self): + def cb(err): + self.assertEqual('404', err.status) + + d = self.client.subscribe('xmpp:%s?node=test' % componentJID) + self.assertFailure(d, error.Error) + d.addCallback(cb) + return d