changeset 183:c21b986cff30

Implement HTTP client to gateway and implement functional tests with it.
author Ralph Meijer <ralphm@ik.nu>
date Fri, 11 Apr 2008 14:41:16 +0000
parents 4aa29b1a8c67
children bd88658dbca3
files idavoll/backend.py idavoll/gateway.py idavoll/tap_http.py idavoll/test/test_gateway.py
diffstat 4 files changed, 310 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/idavoll/backend.py	Fri Apr 11 13:48:12 2008 +0000
+++ b/idavoll/backend.py	Fri Apr 11 14:41:16 2008 +0000
@@ -9,9 +9,9 @@
 
 from twisted.application import service
 from twisted.python import components
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.words.protocols.jabber.error import StanzaError
-from twisted.words.xish import utility
+from twisted.words.xish import domish, utility
 
 from wokkel.iwokkel import IDisco, IPubSubService
 from wokkel.pubsub import PubSubService, PubSubError
@@ -182,14 +182,36 @@
         return node_id, result
 
     def _send_last_published(self, node, subscriber):
-        def notify_item(items):
-            if not items:
+        class StringParser(object):
+            def __init__(self):
+                self.elementStream = domish.elementStream()
+                self.elementStream.DocumentStartEvent = self.docStart
+                self.elementStream.ElementEvent = self.elem
+                self.elementStream.DocumentEndEvent = self.docEnd
+
+            def docStart(self, elem):
+                self.document = elem
+
+            def elem(self, elem):
+                self.document.addChild(elem)
+
+            def docEnd(self):
+                pass
+
+            def parse(self, string):
+                self.elementStream.parse(string)
+                return self.document
+
+        def notify_item(result):
+            if not result:
                 return
 
-            self.dispatch({'items': items,
-                           'node_id': node.id,
-                           'subscriber': subscriber},
-                          '//event/pubsub/notify')
+            items = [domish.SerializedXML(item) for item in result]
+
+            reactor.callLater(0, self.dispatch, {'items': items,
+                                                 'node_id': node.id,
+                                                 'subscriber': subscriber},
+                                                 '//event/pubsub/notify')
 
         config = node.get_configuration()
         if config.get("pubsub#send_last_published_item", 'never') != 'on_sub':
--- a/idavoll/gateway.py	Fri Apr 11 13:48:12 2008 +0000
+++ b/idavoll/gateway.py	Fri Apr 11 14:41:16 2008 +0000
@@ -1,15 +1,25 @@
+# -*- test-case-name: idavoll.test.test_gateway -*-
+#
 # Copyright (c) 2003-2008 Ralph Meijer
 # See LICENSE for details.
 
+"""
+Web resources and client for interacting with pubsub services.
+"""
+
 import cgi
 from time import gmtime, strftime
+import urllib
+import urlparse
 
 import simplejson
 
 from twisted.application import service
 from twisted.internet import defer, reactor
+from twisted.python import log
 from twisted.web import client
 from twisted.web2 import http, http_headers, resource, responsecode
+from twisted.web2 import channel, server
 from twisted.web2.stream import readStream
 from twisted.words.protocols.jabber.jid import JID
 from twisted.words.protocols.jabber.error import StanzaError
@@ -21,7 +31,8 @@
 from idavoll import error
 
 NS_ATOM = 'http://www.w3.org/2005/Atom'
-
+MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
+MIME_JSON = 'application/json'
 
 class RemoteSubscriptionService(service.Service, PubSubClient):
 
@@ -142,10 +153,11 @@
             headers['Event'] = eventType
 
         def postNotification(callbackURI):
-            return client.getPage(str(callbackURI),
-                                  method='POST',
-                                  postdata=postdata,
-                                  headers=headers)
+            d = client.getPage(str(callbackURI),
+                                   method='POST',
+                                   postdata=postdata,
+                                   headers=headers)
+            d.addErrback(log.err)
 
         for callbackURI in callbacks:
             reactor.callLater(0, postNotification, callbackURI)
@@ -328,10 +340,6 @@
                         http_headers.generateContentType(ctype)))
 
     def parseXMLPayload(self, stream):
-        if not stream:
-            print "Stream is empty", repr(stream)
-        elif not stream.length:
-            print "Stream length is", repr(stream.length)
         p = WebStreamParser()
         return p.parse(stream)
 
@@ -466,9 +474,119 @@
 
     def render(self, request):
         def responseFromNodes(nodeIdentifiers):
-            import pprint
-            return http.Response(responsecode.OK, stream=pprint.pformat(nodeIdentifiers))
+            return http.Response(responsecode.OK,
+                                 stream=simplejson.dumps(nodeIdentifiers))
 
         d = self.service.get_nodes()
         d.addCallback(responseFromNodes)
         return d
+
+
+def getPageWithFactory(url, contextFactory=None, *args, **kwargs):
+    """Download a web page.
+
+    Download a page. Return the factory that holds a deferred, which will
+    callback with a page (as a string) or errback with a description of the
+    error.
+
+    See HTTPClientFactory to see what extra args can be passed.
+    """
+
+    scheme, host, port, path = client._parse(url)
+    factory = client.HTTPClientFactory(url, *args, **kwargs)
+    factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200()
+
+    if scheme == 'https':
+        from twisted.internet import ssl
+        if contextFactory is None:
+            contextFactory = ssl.ClientContextFactory()
+        reactor.connectSSL(host, port, factory, contextFactory)
+    else:
+        reactor.connectTCP(host, port, factory)
+    return factory
+
+
+class CallbackResource(resource.Resource):
+    """
+    Web resource for retrieving gateway notifications.
+    """
+
+    def __init__(self, callback):
+        self.callback = callback
+
+    http_GET = None
+
+    def http_POST(self, request):
+        p = WebStreamParser()
+        d = p.parse(request.stream)
+        d.addCallback(self.callback, request.headers)
+        d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT))
+        return d
+
+class GatewayClient(service.Service):
+    """
+    Service that provides client access to the HTTP Gateway into Idavoll.
+    """
+
+    agent = "Idavoll HTTP Gateway Client"
+
+    def __init__(self, baseURI, callbackHost=None, callbackPort=None):
+        self.baseURI = baseURI
+        self.callbackHost = callbackHost or 'localhost'
+        self.callbackPort = callbackPort or 8087
+        root = resource.Resource()
+        root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs))
+        self.site = server.Site(root)
+
+    def startService(self):
+        self.port = reactor.listenTCP(self.callbackPort,
+                                      channel.HTTPFactory(self.site))
+
+    def stopService(self):
+        return self.port.stopListening()
+
+    def _makeURI(self, verb, query=None):
+        uriComponents = urlparse.urlparse(self.baseURI)
+        uri = urlparse.urlunparse((uriComponents[0],
+                                   uriComponents[1],
+                                   uriComponents[2] + verb,
+                                   '',
+                                   query and urllib.urlencode(query) or '',
+                                   ''))
+        return uri
+
+    def callback(self, data, headers):
+        pass
+
+    def create(self):
+        f = getPageWithFactory(self._makeURI('create'),
+                    method='POST',
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+    def publish(self, entry, xmppURI=None):
+        query = xmppURI and {'uri': xmppURI}
+
+        f = getPageWithFactory(self._makeURI('publish', query),
+                    method='POST',
+                    postdata=entry.toXml().encode('utf-8'),
+                    headers={'Content-Type': MIME_ATOM_ENTRY},
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+    def listNodes(self):
+        f = getPageWithFactory(self._makeURI('list'),
+                    method='GET',
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+    def subscribe(self, xmppURI):
+        params = {'uri': xmppURI,
+                  'callback': 'http://%s:%s/callback' % (self.callbackHost,
+                                                         self.callbackPort)}
+        f = getPageWithFactory(self._makeURI('subscribe'),
+                    method='POST',
+                    postdata=simplejson.dumps(params),
+                    headers={'Content-Type': MIME_JSON},
+                    agent=self.agent)
+        return f.deferred
--- a/idavoll/tap_http.py	Fri Apr 11 13:48:12 2008 +0000
+++ b/idavoll/tap_http.py	Fri Apr 11 14:41:16 2008 +0000
@@ -49,9 +49,9 @@
                                                config['jid'])
     root.child_publish = gateway.PublishResource(bs, config['jid'],
                                                  config['jid'])
+    root.child_list = gateway.ListResource(bs)
     root.child_subscribe = gateway.SubscribeResource(ss)
     root.child_unsubscribe = gateway.UnsubscribeResource(ss)
-    root.child_list = gateway.ListResource(ss)
 
     site = server.Site(root)
     w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/idavoll/test/test_gateway.py	Fri Apr 11 14:41:16 2008 +0000
@@ -0,0 +1,150 @@
+# Copyright (c) 2003-2008 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Tests for L{idavoll.gateway}.
+
+Note that some tests are functional tests that require a running idavoll
+service.
+"""
+
+from twisted.internet import defer
+from twisted.trial import unittest
+from twisted.web import error
+from twisted.words.xish import domish
+
+from idavoll import gateway
+
+AGENT = "Idavoll Test Script"
+NS_ATOM = "http://www.w3.org/2005/Atom"
+
+entry = domish.Element((NS_ATOM, 'entry'))
+entry.addElement("id", content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a")
+entry.addElement("title", content="Atom-Powered Robots Run Amok")
+entry.addElement("author").addElement("name", content="John Doe")
+entry.addElement("content", content="Some text.")
+
+#baseURI = "http://pubsub-test.ik.nu/"
+baseURI = "http://localhost:8086/"
+componentJID = "test.ik.nu"
+
+class GatewayTest(unittest.TestCase):
+
+    def setUp(self):
+        self.client = gateway.GatewayClient(baseURI)
+        self.client.startService()
+
+    def tearDown(self):
+        self.client.stopService()
+
+    def test_create(self):
+
+        def cb(response):
+            self.assertIn('uri', response)
+
+        d = self.client.create()
+        d.addCallback(cb)
+        return d
+
+    def test_publish(self):
+
+        def cb(response):
+            self.assertIn('uri', response)
+
+        d = self.client.publish(entry)
+        d.addCallback(cb)
+        return d
+
+    def test_publishExistingNode(self):
+
+        def cb2(response, xmppURI):
+            self.assertEquals(xmppURI, response['uri'])
+
+        def cb1(response):
+            xmppURI = response['uri']
+            d = self.client.publish(entry, xmppURI)
+            d.addCallback(cb2, xmppURI)
+            return d
+
+        d = self.client.create()
+        d.addCallback(cb1)
+        return d
+
+    def test_publishNonExisting(self):
+        def cb(err):
+            self.assertEqual('404', err.status)
+
+        d = self.client.publish(entry, 'xmpp:%s?node=test' % componentJID)
+        self.assertFailure(d, error.Error)
+        d.addCallback(cb)
+        return d
+
+    def test_list(self):
+        d = self.client.listNodes()
+        return d
+
+    def test_subscribe(self):
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.subscribe(xmppURI)
+            return d
+
+        d = self.client.create()
+        d.addCallback(cb)
+        return d
+
+    def test_subscribeGetNotification(self):
+
+        def onNotification(data, headers):
+            self.client.deferred.callback(None)
+
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.subscribe(xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = self.client.publish(entry, xmppURI)
+            return d
+
+
+        self.client.callback = onNotification
+        self.client.deferred = defer.Deferred()
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return defer.gatherResults([d, self.client.deferred])
+
+    def test_subscribeGetDelayedNotification(self):
+
+        def onNotification(data, headers):
+            self.client.deferred.callback(None)
+
+        def cb(response):
+            xmppURI = response['uri']
+            self.assertNot(self.client.deferred.called)
+            d = self.client.publish(entry, xmppURI)
+            d.addCallback(lambda _: xmppURI)
+            return d
+
+        def cb2(xmppURI):
+            d = self.client.subscribe(xmppURI)
+            return d
+
+
+        self.client.callback = onNotification
+        self.client.deferred = defer.Deferred()
+        d = self.client.create()
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return defer.gatherResults([d, self.client.deferred])
+
+    def test_subscribeNonExisting(self):
+        def cb(err):
+            self.assertEqual('404', err.status)
+
+        d = self.client.subscribe('xmpp:%s?node=test' % componentJID)
+        self.assertFailure(d, error.Error)
+        d.addCallback(cb)
+        return d