changeset 273:6ba0d6def7f5

Use twisted.web instead of web2, initial tests.
author Ralph Meijer <ralphm@ik.nu>
date Sun, 20 Jan 2013 13:38:41 +0100
parents 558a43366c9f
children 6641ea7990ee
files sat_pubsub/gateway.py sat_pubsub/tap_http.py sat_pubsub/test/test_gateway.py
diffstat 3 files changed, 533 insertions(+), 202 deletions(-) [+]
line wrap: on
line diff
--- a/sat_pubsub/gateway.py	Tue Sep 09 08:09:26 2014 +0200
+++ b/sat_pubsub/gateway.py	Sun Jan 20 13:38:41 2013 +0100
@@ -56,8 +56,9 @@
 Web resources and client for interacting with pubsub services.
 """
 
-import cgi
+import mimetools
 from time import gmtime, strftime
+from StringIO import StringIO
 import urllib
 import urlparse
 
@@ -66,22 +67,22 @@
 from twisted.application import service
 from twisted.internet import defer, reactor
 from twisted.python import log
-from twisted.web import client
-from twisted.web2 import http, http_headers, resource, responsecode
-from twisted.web2 import channel, server
-from twisted.web2.stream import readStream
+from twisted.web import client, http, resource, server
+from twisted.web.error import Error
 from twisted.words.protocols.jabber.jid import JID
 from twisted.words.protocols.jabber.error import StanzaError
 from twisted.words.xish import domish
 
+from wokkel.generic import parseXml
 from wokkel.pubsub import Item
 from wokkel.pubsub import PubSubClient
 
 from sat_pubsub import error
 
 NS_ATOM = 'http://www.w3.org/2005/Atom'
-MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
-MIME_JSON = 'application/json'
+MIME_ATOM_ENTRY = b'application/atom+xml;type=entry'
+MIME_ATOM_FEED = b'application/atom+xml;type=feed'
+MIME_JSON = b'application/json'
 
 class XMPPURIParseError(ValueError):
     """
@@ -109,7 +110,7 @@
     try:
         entity, query = rest.split('?', 1)
     except ValueError:
-        raise XMPPURIParseError("No URI query component")
+        entity, query = rest, ''
 
     if not entity:
         raise XMPPURIParseError("Empty URI path component")
@@ -119,7 +120,7 @@
     except Exception, e:
         raise XMPPURIParseError("Invalid JID: %s" % e)
 
-    params = cgi.parse_qs(query)
+    params = urlparse.parse_qs(query)
 
     try:
         nodeIdentifier = params['node'][0]
@@ -138,37 +139,46 @@
 
 
 
-class WebStreamParser(object):
-    def __init__(self):
-        self.elementStream = domish.elementStream()
-        self.elementStream.DocumentStartEvent = self.docStart
-        self.elementStream.ElementEvent = self.elem
-        self.elementStream.DocumentEndEvent = self.docEnd
-        self.done = False
+def _parseContentType(header):
+    """
+    Parse a Content-Type header value to a L{mimetools.Message}.
 
+    L{mimetools.Message} parses a Content-Type header and makes the
+    components available with its C{getmaintype}, C{getsubtype}, C{gettype},
+    C{getplist} and C{getparam} methods.
+    """
+    return mimetools.Message(StringIO(b'Content-Type: ' + header))
 
-    def docStart(self, elem):
-        self.document = elem
 
 
-    def elem(self, elem):
-        self.document.addChild(elem)
-
-
-    def docEnd(self):
-        self.done = True
-
+def _asyncResponse(render):
+    """
+    """
+    def wrapped(self, request):
+        def eb(failure):
+            if failure.check(Error):
+                err = failure.value
+            else:
+                log.err(failure)
+                err = Error(500)
+            request.setResponseCode(err.status, err.message)
+            return err.response
 
-    def parse(self, stream):
-        def endOfStream(result):
-            if not self.done:
-                raise Exception("No more stuff?")
-            else:
-                return self.document
+        def finish(result):
+            if result is server.NOT_DONE_YET:
+                return
+
+            if result:
+                request.write(result)
+            request.finish()
 
-        d = readStream(stream, self.elementStream.parse)
-        d.addCallback(endOfStream)
-        return d
+        d = defer.maybeDeferred(render, self, request)
+        d.addErrback(eb)
+        d.addCallback(finish)
+
+        return server.NOT_DONE_YET
+
+    return wrapped
 
 
 
@@ -185,17 +195,18 @@
     http_GET = None
 
 
-    def http_POST(self, request):
+    @_asyncResponse
+    def render_POST(self, request):
         """
         Respond to a POST request to create a new node.
         """
 
         def toResponse(nodeIdentifier):
             uri = getXMPPURI(self.serviceJID, nodeIdentifier)
-            stream = simplejson.dumps({'uri': uri})
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
+            body = simplejson.dumps({'uri': uri})
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
+
         d = self.backend.createNode(None, self.owner)
         d.addCallback(toResponse)
         return d
@@ -212,51 +223,41 @@
         self.owner = owner
 
 
-    http_GET = None
+    render_GET = None
 
 
-    def http_POST(self, request):
+    @_asyncResponse
+    def render_POST(self, request):
         """
         Respond to a POST request to create a new node.
         """
-
-        def gotStream(_):
-            if request.args.get('uri'):
-                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
-                return defer.succeed(nodeIdentifier)
-            else:
-                raise http.HTTPError(http.Response(responsecode.BAD_REQUEST,
-                                                   "No URI given"))
-
-        def doDelete(nodeIdentifier, data):
-            if data:
-                params = simplejson.loads(''.join(data))
-                redirectURI = params.get('redirect_uri')
-            else:
-                redirectURI = None
-
-            return self.backend.deleteNode(nodeIdentifier, self.owner,
-                                           redirectURI)
-
-        def respond(result):
-            return http.Response(responsecode.NO_CONTENT)
-
+        def toResponse(result):
+            request.setResponseCode(http.NO_CONTENT)
 
         def trapNotFound(failure):
             failure.trap(error.NodeNotFound)
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
+            raise Error(http.NOT_FOUND, "Node not found")
 
         def trapXMPPURIParseError(failure):
             failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % failure.value)
+
+        if not request.args.get('uri'):
+            raise Error(http.BAD_REQUEST, "No URI given")
+
+        jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
 
-        data = []
-        d = readStream(request.stream, data.append)
-        d.addCallback(gotStream)
-        d.addCallback(doDelete, data)
-        d.addCallback(respond)
+        data = request.content.read()
+        if data:
+            params = simplejson.loads(data)
+            redirectURI = params.get('redirect_uri', None)
+        else:
+            redirectURI = None
+
+        d = self.backend.deleteNode(nodeIdentifier, self.owner,
+                                    redirectURI)
+        d.addCallback(toResponse)
         d.addErrback(trapNotFound)
         d.addErrback(trapXMPPURIParseError)
         return d
@@ -274,45 +275,37 @@
         self.owner = owner
 
 
-    http_GET = None
+    render_GET = None
 
 
     def checkMediaType(self, request):
-        ctype = request.headers.getHeader('content-type')
+        ctype = request.getHeader(b'content-type')
 
         if not ctype:
-            raise http.HTTPError(
-                http.StatusResponse(
-                    responsecode.BAD_REQUEST,
-                    "No specified Media Type"))
+            request.setResponseCode(http.BAD_REQUEST)
+
+            raise Error(http.BAD_REQUEST, b"No specified Media Type")
 
-        if (ctype.mediaType != 'application' or
-            ctype.mediaSubtype != 'atom+xml' or
-            ctype.params.get('type') != 'entry' or
-            ctype.params.get('charset', 'utf-8') != 'utf-8'):
-            raise http.HTTPError(
-                http.StatusResponse(
-                    responsecode.UNSUPPORTED_MEDIA_TYPE,
-                    "Unsupported Media Type: %s" %
-                        http_headers.generateContentType(ctype)))
+        message = _parseContentType(ctype)
+        if (message.maintype != b'application' or
+            message.subtype != b'atom+xml' or
+            message.getparam(b'type') != b'entry' or
+            (message.getparam(b'charset') or b'utf-8') != b'utf-8'):
+            raise Error(http.UNSUPPORTED_MEDIA_TYPE,
+                              b"Unsupported Media Type: %s" % ctype)
 
 
-    def parseXMLPayload(self, stream):
-        p = WebStreamParser()
-        return p.parse(stream)
-
-
-    def http_POST(self, request):
+    @_asyncResponse
+    def render_POST(self, request):
         """
         Respond to a POST request to create a new item.
         """
 
         def toResponse(nodeIdentifier):
             uri = getXMPPURI(self.serviceJID, nodeIdentifier)
-            stream = simplejson.dumps({'uri': uri})
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
+            body = simplejson.dumps({'uri': uri})
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
 
         def gotNode(nodeIdentifier, payload):
             item = Item(id='current', payload=payload)
@@ -327,24 +320,19 @@
             else:
                 return self.backend.createNode(None, self.owner)
 
-        def doPublish(payload):
-            d = getNode()
-            d.addCallback(gotNode, payload)
-            return d
-
         def trapNotFound(failure):
             failure.trap(error.NodeNotFound)
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
+            raise Error(http.NOT_FOUND, "Node not found")
 
         def trapXMPPURIParseError(failure):
             failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % failure.value)
 
         self.checkMediaType(request)
-        d = self.parseXMLPayload(request.stream)
-        d.addCallback(doPublish)
+        payload = parseXml(request.content.read())
+        d = getNode()
+        d.addCallback(gotNode, payload)
         d.addCallback(toResponse)
         d.addErrback(trapNotFound)
         d.addErrback(trapXMPPURIParseError)
@@ -357,12 +345,12 @@
         self.service = service
 
 
-    def render(self, request):
+    @_asyncResponse
+    def render_GET(self, request):
         def responseFromNodes(nodeIdentifiers):
-            stream = simplejson.dumps(nodeIdentifiers)
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
+            body = simplejson.dumps(nodeIdentifiers)
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
 
         d = self.service.getNodes()
         d.addCallback(responseFromNodes)
@@ -377,7 +365,7 @@
     Extract atom entries from a list of publish-subscribe items.
 
     @param items: List of L{domish.Element}s that represent publish-subscribe
-                  items.
+        items.
     @type items: C{list}
     """
 
@@ -461,7 +449,7 @@
                 return
 
             self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
-                         'application/atom+xml;type=entry')
+                         MIME_ATOM_ENTRY)
 
         def subscribeOrItems(hasCallbacks):
             if hasCallbacks:
@@ -514,10 +502,10 @@
             return
 
         if len(atomEntries) == 1:
-            contentType = 'application/atom+xml;type=entry'
+            contentType = MIME_ATOM_ENTRY
             payload = atomEntries[0]
         else:
-            contentType = 'application/atom+xml;type=feed'
+            contentType = MIME_ATOM_FEED
             payload = constructFeed(service, nodeIdentifier, atomEntries,
                                     title='Received item collection')
 
@@ -614,11 +602,11 @@
     serviceMethod = None
     errorMap = {
             error.NodeNotFound:
-                (responsecode.FORBIDDEN, "Node not found"),
+                (http.FORBIDDEN, "Node not found"),
             error.NotSubscribed:
-                (responsecode.FORBIDDEN, "No such subscription found"),
+                (http.FORBIDDEN, "No such subscription found"),
             error.SubscriptionExists:
-                (responsecode.FORBIDDEN, "Subscription already exists"),
+                (http.FORBIDDEN, "Subscription already exists"),
     }
 
     def __init__(self, service):
@@ -626,38 +614,35 @@
         self.params = None
 
 
-    http_GET = None
+    render_GET = None
 
 
-    def http_POST(self, request):
+    @_asyncResponse
+    def render_POST(self, request):
         def trapNotFound(failure):
             err = failure.trap(*self.errorMap.keys())
-            code, msg = self.errorMap[err]
-            return http.StatusResponse(code, msg)
-
-        def respond(result):
-            return http.Response(responsecode.NO_CONTENT)
+            status, message = self.errorMap[err]
+            raise Error(status, message)
 
-        def gotRequest(result):
-            uri = self.params['uri']
-            callback = self.params['callback']
-
-            jid, nodeIdentifier = getServiceAndNode(uri)
-            method = getattr(self.service, self.serviceMethod)
-            d = method(jid, nodeIdentifier, callback)
-            return d
-
-        def storeParams(data):
-            self.params = simplejson.loads(data)
+        def toResponse(result):
+            request.setResponseCode(http.NO_CONTENT)
+            return b''
 
         def trapXMPPURIParseError(failure):
             failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % failure.value)
+
+        data = request.content.read()
+        self.params = simplejson.loads(data)
 
-        d = readStream(request.stream, storeParams)
-        d.addCallback(gotRequest)
-        d.addCallback(respond)
+        uri = self.params['uri']
+        callback = self.params['callback']
+
+        jid, nodeIdentifier = getServiceAndNode(uri)
+        method = getattr(self.service, self.serviceMethod)
+        d = method(jid, nodeIdentifier, callback)
+        d.addCallback(toResponse)
         d.addErrback(trapNotFound)
         d.addErrback(trapXMPPURIParseError)
         return d
@@ -696,46 +681,45 @@
         self.service = service
 
 
-    def render(self, request):
+    @_asyncResponse
+    def render_GET(self, request):
         try:
             maxItems = int(request.args.get('max_items', [0])[0]) or None
         except ValueError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "The argument max_items has an invalid value.")
+            raise Error(http.BAD_REQUEST,
+                        "The argument max_items has an invalid value.")
 
         try:
             uri = request.args['uri'][0]
         except KeyError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "No URI for the remote node provided.")
+            raise Error(http.BAD_REQUEST,
+                        "No URI for the remote node provided.")
 
         try:
             jid, nodeIdentifier = getServiceAndNode(uri)
         except XMPPURIParseError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % uri)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % uri)
 
-        def respond(items):
-            """Create a feed out the retrieved items."""
-            contentType = http_headers.MimeType('application',
-                                                'atom+xml',
-                                                {'type': 'feed'})
+        def toResponse(items):
+            """
+            Create a feed out the retrieved items.
+            """
             atomEntries = extractAtomEntries(items)
             feed = constructFeed(jid, nodeIdentifier, atomEntries,
                                     "Retrieved item collection")
-            payload = feed.toXml().encode('utf-8')
-            return http.Response(responsecode.OK, stream=payload,
-                                 headers={'Content-Type': contentType})
+            body = feed.toXml().encode('utf-8')
+            request.setHeader(b'Content-Type', MIME_ATOM_FEED)
+            return body
 
         def trapNotFound(failure):
             failure.trap(StanzaError)
             if not failure.value.condition == 'item-not-found':
                 raise failure
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
+            raise Error(http.NOT_FOUND, "Node not found")
 
         d = self.service.items(jid, nodeIdentifier, maxItems)
-        d.addCallback(respond)
+        d.addCallback(toResponse)
         d.addErrback(trapNotFound)
         return d
 
@@ -780,15 +764,17 @@
     http_GET = None
 
 
-    def http_POST(self, request):
-        p = WebStreamParser()
-        if not request.headers.hasHeader('Event'):
-            d = p.parse(request.stream)
+    def render_POST(self, request):
+        if request.requestHeaders.hasHeader(b'Event'):
+            payload = None
         else:
-            d = defer.succeed(None)
-        d.addCallback(self.callback, request.headers)
-        d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT))
-        return d
+            payload = parseXml(request.content.read())
+
+        self.callback(payload, request.requestHeaders)
+
+        request.setResponseCode(http.NO_CONTENT)
+        return b''
+
 
 
 
@@ -804,13 +790,14 @@
         self.callbackHost = callbackHost or 'localhost'
         self.callbackPort = callbackPort or 8087
         root = resource.Resource()
-        root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs))
+        root.putChild('callback', CallbackResource(
+                lambda *args, **kwargs: self.callback(*args, **kwargs)))
         self.site = server.Site(root)
 
 
     def startService(self):
         self.port = reactor.listenTCP(self.callbackPort,
-                                      channel.HTTPFactory(self.site))
+                                      self.site)
 
 
     def stopService(self):
--- a/sat_pubsub/tap_http.py	Tue Sep 09 08:09:26 2014 +0200
+++ b/sat_pubsub/tap_http.py	Sun Jan 20 13:38:41 2013 +0100
@@ -52,11 +52,10 @@
 
 """
 
-from twisted.application import internet, service, strports
+from twisted.application import internet, strports
 from twisted.conch import manhole, manhole_ssh
 from twisted.cred import portal, checkers
-from twisted.web2 import channel, log, resource, server
-from twisted.web2.tap import Web2Service
+from twisted.web import resource, server
 
 from sat_pubsub import gateway, tap
 from sat_pubsub.gateway import RemoteSubscriptionService
@@ -106,31 +105,21 @@
     root = resource.Resource()
 
     # Set up resources that exposes the backend
-    root.child_create = gateway.CreateResource(bs, config['jid'],
-                                               config['jid'])
-    root.child_delete = gateway.DeleteResource(bs, config['jid'],
-                                               config['jid'])
-    root.child_publish = gateway.PublishResource(bs, config['jid'],
-                                                 config['jid'])
-    root.child_list = gateway.ListResource(bs)
+    root.putChild('create', gateway.CreateResource(bs, config['jid'],
+                                                   config['jid']))
+    root.putChild('delete', gateway.DeleteResource(bs, config['jid'],
+                                                   config['jid']))
+    root.putChild('publish', gateway.PublishResource(bs, config['jid'],
+                                                     config['jid']))
+    root.putChild('list', gateway.ListResource(bs))
 
     # Set up resources for accessing remote pubsub nodes.
-    root.child_subscribe = gateway.RemoteSubscribeResource(ss)
-    root.child_unsubscribe = gateway.RemoteUnsubscribeResource(ss)
-    root.child_items = gateway.RemoteItemsResource(ss)
-
-    if config["verbose"]:
-        root = log.LogWrapperResource(root)
+    root.putChild('subscribe', gateway.RemoteSubscribeResource(ss))
+    root.putChild('unsubscribe', gateway.RemoteUnsubscribeResource(ss))
+    root.putChild('items', gateway.RemoteItemsResource(ss))
 
     site = server.Site(root)
-    w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site))
-
-    if config["verbose"]:
-        logObserver = log.DefaultCommonAccessLoggingObserver()
-        w2s = Web2Service(logObserver)
-        w.setServiceParent(w2s)
-        w = w2s
-
+    w = internet.TCPServer(int(config['webport']), site)
     w.setServiceParent(s)
 
     # Set up a manhole
--- a/sat_pubsub/test/test_gateway.py	Tue Sep 09 08:09:26 2014 +0200
+++ b/sat_pubsub/test/test_gateway.py	Sun Jan 20 13:38:41 2013 +0100
@@ -59,12 +59,20 @@
 service.
 """
 
+from StringIO import StringIO
+
+import simplejson
+
 from twisted.internet import defer
 from twisted.trial import unittest
-from twisted.web import error
+from twisted.web import error, http, http_headers, server
+from twisted.web.test import requesthelper
 from twisted.words.xish import domish
+from twisted.words.protocols.jabber.jid import JID
 
 from sat_pubsub import gateway
+from sat_pubsub.backend import BackendService
+from sat_pubsub.memory_storage import Storage
 
 AGENT = "Idavoll Test Script"
 NS_ATOM = "http://www.w3.org/2005/Atom"
@@ -77,7 +85,350 @@
 TEST_ENTRY.addElement("content", content="Some text.")
 
 baseURI = "http://localhost:8086/"
-componentJID = "pubsub"
+component = "pubsub"
+componentJID = JID(component)
+ownerJID = JID('owner@example.org')
+
+def _render(resource, request):
+    result = resource.render(request)
+    if isinstance(result, str):
+        request.write(result)
+        request.finish()
+        return defer.succeed(None)
+    elif result is server.NOT_DONE_YET:
+        if request.finished:
+            return defer.succeed(None)
+        else:
+            return request.notifyFinish()
+    else:
+        raise ValueError("Unexpected return value: %r" % (result,))
+
+
+class DummyRequest(requesthelper.DummyRequest):
+
+    def __init__(self, *args, **kwargs):
+        requesthelper.DummyRequest.__init__(self, *args, **kwargs)
+        self.requestHeaders = http_headers.Headers()
+
+
+
+class GetServiceAndNodeTest(unittest.TestCase):
+    """
+    Tests for {gateway.getServiceAndNode}.
+    """
+
+    def test_basic(self):
+        """
+        getServiceAndNode parses an XMPP URI with node parameter.
+        """
+        uri = b'xmpp:pubsub.example.org?;node=test'
+        service, nodeIdentifier = gateway.getServiceAndNode(uri)
+        self.assertEqual(JID(u'pubsub.example.org'), service)
+        self.assertEqual(u'test', nodeIdentifier)
+
+
+    def test_schemeEmpty(self):
+        """
+        If the URI scheme is empty, an exception is raised.
+        """
+        uri = b'pubsub.example.org'
+        self.assertRaises(gateway.XMPPURIParseError,
+                          gateway.getServiceAndNode, uri)
+
+
+    def test_schemeNotXMPP(self):
+        """
+        If the URI scheme is not 'xmpp', an exception is raised.
+        """
+        uri = b'mailto:test@example.org'
+        self.assertRaises(gateway.XMPPURIParseError,
+                          gateway.getServiceAndNode, uri)
+
+
+    def test_authorityPresent(self):
+        """
+        If the URI has an authority component, an exception is raised.
+        """
+        uri = b'xmpp://pubsub.example.org/'
+        self.assertRaises(gateway.XMPPURIParseError,
+                          gateway.getServiceAndNode, uri)
+
+
+    def test_queryEmpty(self):
+        """
+        If there is no query component, the nodeIdentifier is empty.
+        """
+        uri = b'xmpp:pubsub.example.org'
+        service, nodeIdentifier = gateway.getServiceAndNode(uri)
+
+        self.assertEqual(JID(u'pubsub.example.org'), service)
+        self.assertEqual(u'', nodeIdentifier)
+
+
+    def test_jidInvalid(self):
+        """
+        If the JID from the path component is invalid, an exception is raised.
+        """
+        uri = b'xmpp:@@pubsub.example.org?;node=test'
+        self.assertRaises(gateway.XMPPURIParseError,
+                          gateway.getServiceAndNode, uri)
+
+
+    def test_pathEmpty(self):
+        """
+        If there is no path component, an exception is raised.
+        """
+        uri = b'xmpp:?node=test'
+        self.assertRaises(gateway.XMPPURIParseError,
+                          gateway.getServiceAndNode, uri)
+
+
+    def test_nodeAbsent(self):
+        """
+        If the node parameter is missing, the nodeIdentifier is empty.
+        """
+        uri = b'xmpp:pubsub.example.org?'
+        service, nodeIdentifier = gateway.getServiceAndNode(uri)
+
+        self.assertEqual(JID(u'pubsub.example.org'), service)
+        self.assertEqual(u'', nodeIdentifier)
+
+
+
+class GetXMPPURITest(unittest.TestCase):
+    """
+    Tests for L{gateway.getXMPPURITest}.
+    """
+
+    def test_basic(self):
+        uri = gateway.getXMPPURI(JID(u'pubsub.example.org'), u'test')
+        self.assertEqual('xmpp:pubsub.example.org?;node=test', uri)
+
+
+class CreateResourceTest(unittest.TestCase):
+    """
+    Tests for L{gateway.CreateResource}.
+    """
+
+    def setUp(self):
+        self.backend = BackendService(Storage())
+        self.resource = gateway.CreateResource(self.backend, componentJID,
+                                               ownerJID)
+
+
+    def test_get(self):
+        """
+        The method GET is not supported.
+        """
+        request = DummyRequest([b''])
+        self.assertRaises(error.UnsupportedMethod,
+                          _render, self.resource, request)
+
+
+    def test_post(self):
+        """
+        Upon a POST, a new node is created and the URI returned.
+        """
+        request = DummyRequest([b''])
+        request.method = 'POST'
+
+        def gotNodes(nodeIdentifiers, uri):
+            service, nodeIdentifier = gateway.getServiceAndNode(uri)
+            self.assertIn(nodeIdentifier, nodeIdentifiers)
+
+        def rendered(result):
+            self.assertEqual('application/json',
+                             request.outgoingHeaders['content-type'])
+            payload = simplejson.loads(b''.join(request.written))
+            self.assertIn('uri', payload)
+            d = self.backend.getNodes()
+            d.addCallback(gotNodes, payload['uri'])
+            return d
+
+        d = _render(self.resource, request)
+        d.addCallback(rendered)
+        return d
+
+
+
+class DeleteResourceTest(unittest.TestCase):
+    """
+    Tests for L{gateway.DeleteResource}.
+    """
+
+    def setUp(self):
+        self.backend = BackendService(Storage())
+        self.resource = gateway.DeleteResource(self.backend, componentJID,
+                                               ownerJID)
+
+
+    def test_get(self):
+        """
+        The method GET is not supported.
+        """
+        request = DummyRequest([b''])
+        self.assertRaises(error.UnsupportedMethod,
+                          _render, self.resource, request)
+
+
+    def test_post(self):
+        """
+        Upon a POST, a new node is created and the URI returned.
+        """
+        request = DummyRequest([b''])
+        request.method = b'POST'
+
+        def rendered(result):
+            self.assertEqual(http.NO_CONTENT, request.responseCode)
+
+        def nodeCreated(nodeIdentifier):
+            uri = gateway.getXMPPURI(componentJID, nodeIdentifier)
+            request.args[b'uri'] = [uri]
+            request.content = StringIO(b'')
+
+            return _render(self.resource, request)
+
+        d = self.backend.createNode(u'test', ownerJID)
+        d.addCallback(nodeCreated)
+        d.addCallback(rendered)
+        return d
+
+
+    def test_postWithRedirect(self):
+        """
+        Upon a POST, a new node is created and the URI returned.
+        """
+        request = DummyRequest([b''])
+        request.method = b'POST'
+        otherNodeURI = b'xmpp:pubsub.example.org?node=other'
+
+        def rendered(result):
+            self.assertEqual(http.NO_CONTENT, request.responseCode)
+            self.assertEqual(1, len(deletes))
+            nodeIdentifier, owner, redirectURI = deletes[-1]
+            self.assertEqual(otherNodeURI, redirectURI)
+
+        def nodeCreated(nodeIdentifier):
+            uri = gateway.getXMPPURI(componentJID, nodeIdentifier)
+            request.args[b'uri'] = [uri]
+            payload = {b'redirect_uri': otherNodeURI}
+            body = simplejson.dumps(payload)
+            request.content = StringIO(body)
+            return _render(self.resource, request)
+
+        def deleteNode(nodeIdentifier, owner, redirectURI):
+            deletes.append((nodeIdentifier, owner, redirectURI))
+            return defer.succeed(nodeIdentifier)
+
+        deletes = []
+        self.patch(self.backend, 'deleteNode', deleteNode)
+        d = self.backend.createNode(u'test', ownerJID)
+        d.addCallback(nodeCreated)
+        d.addCallback(rendered)
+        return d
+
+
+    def test_postUnknownNode(self):
+        """
+        If the node to be deleted is unknown, 404 Not Found is returned.
+        """
+        request = DummyRequest([b''])
+        request.method = b'POST'
+
+        def rendered(result):
+            self.assertEqual(http.NOT_FOUND, request.responseCode)
+
+        uri = gateway.getXMPPURI(componentJID, u'unknown')
+        request.args[b'uri'] = [uri]
+        request.content = StringIO(b'')
+
+        d = _render(self.resource, request)
+        d.addCallback(rendered)
+        return d
+
+
+    def test_postURIMissing(self):
+        """
+        If no URI is passed, 400 Bad Request is returned.
+        """
+        request = DummyRequest([b''])
+        request.method = b'POST'
+
+        def rendered(result):
+            self.assertEqual(http.BAD_REQUEST, request.responseCode)
+
+        request.content = StringIO(b'')
+
+        d = _render(self.resource, request)
+        d.addCallback(rendered)
+        return d
+
+
+class CallbackResourceTest(unittest.TestCase):
+    """
+    Tests for L{gateway.CallbackResource}.
+    """
+
+    def setUp(self):
+        self.callbackEvents = []
+        self.resource = gateway.CallbackResource(self._callback)
+
+
+    def _callback(self, payload, headers):
+        self.callbackEvents.append((payload, headers))
+
+
+    def test_get(self):
+        """
+        The method GET is not supported.
+        """
+        request = DummyRequest([b''])
+        self.assertRaises(error.UnsupportedMethod,
+                          _render, self.resource, request)
+
+
+    def test_post(self):
+        """
+        The body posted is passed to the callback.
+        """
+        request = DummyRequest([b''])
+        request.method = 'POST'
+        request.content = StringIO(b'<root><child/></root>')
+
+        def rendered(result):
+            self.assertEqual(1, len(self.callbackEvents))
+            payload, headers = self.callbackEvents[-1]
+            self.assertEqual('root', payload.name)
+
+            self.assertEqual(http.NO_CONTENT, request.responseCode)
+            self.assertFalse(b''.join(request.written))
+
+        d = _render(self.resource, request)
+        d.addCallback(rendered)
+        return d
+
+
+    def test_postEvent(self):
+        """
+        If the Event header is set, the payload is empty and the header passed.
+        """
+        request = DummyRequest([b''])
+        request.method = 'POST'
+        request.requestHeaders.addRawHeader(b'Event', b'DELETE')
+        request.content = StringIO(b'')
+
+        def rendered(result):
+            self.assertEqual(1, len(self.callbackEvents))
+            payload, headers = self.callbackEvents[-1]
+            self.assertIdentical(None, payload)
+            self.assertEqual(['DELETE'], headers.getRawHeaders(b'Event'))
+            self.assertFalse(b''.join(request.written))
+
+        d = _render(self.resource, request)
+        d.addCallback(rendered)
+        return d
+
+
 
 class GatewayTest(unittest.TestCase):
     timeout = 2
@@ -143,7 +494,7 @@
         def cb(err):
             self.assertEqual('404', err.status)
 
-        d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % componentJID)
+        d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % component)
         self.assertFailure(d, error.Error)
         d.addCallback(cb)
         return d
@@ -161,7 +512,7 @@
     def test_deleteWithRedirect(self):
         def cb(response):
             xmppURI = response['uri']
-            redirectURI = 'xmpp:%s?node=test' % componentJID
+            redirectURI = 'xmpp:%s?node=test' % component
             d = self.client.delete(xmppURI, redirectURI)
             return d
 
@@ -198,7 +549,7 @@
         return defer.gatherResults([d, self.client.deferred])
 
     def test_deleteNotificationWithRedirect(self):
-        redirectURI = 'xmpp:%s?node=test' % componentJID
+        redirectURI = 'xmpp:%s?node=test' % component
 
         def onNotification(data, headers):
             try:
@@ -385,7 +736,7 @@
         def cb(err):
             self.assertEqual('403', err.status)
 
-        d = self.client.subscribe('xmpp:%s?node=test' % componentJID)
+        d = self.client.subscribe('xmpp:%s?node=test' % component)
         self.assertFailure(d, error.Error)
         d.addCallback(cb)
         return d
@@ -393,6 +744,9 @@
 
     def test_subscribeRootGetNotification(self):
 
+        def clean(rootNode):
+            return self.client.unsubscribe(rootNode)
+
         def onNotification(data, headers):
             self.client.deferred.callback(None)
 
@@ -402,6 +756,7 @@
             rootNode = gateway.getXMPPURI(jid, '')
 
             d = self.client.subscribe(rootNode)
+            d.addCallback(lambda _: self.addCleanup(clean, rootNode))
             d.addCallback(lambda _: xmppURI)
             return d
 
@@ -421,7 +776,7 @@
         def cb(err):
             self.assertEqual('403', err.status)
 
-        d = self.client.unsubscribe('xmpp:%s?node=test' % componentJID)
+        d = self.client.unsubscribe('xmpp:%s?node=test' % component)
         self.assertFailure(d, error.Error)
         d.addCallback(cb)
         return d