diff sat_pubsub/gateway.py @ 273:6ba0d6def7f5

Use twisted.web instead of web2, initial tests.
author Ralph Meijer <ralphm@ik.nu>
date Sun, 20 Jan 2013 13:38:41 +0100
parents d55620ceafed
children 6641ea7990ee
line wrap: on
line diff
--- a/sat_pubsub/gateway.py	Tue Sep 09 08:09:26 2014 +0200
+++ b/sat_pubsub/gateway.py	Sun Jan 20 13:38:41 2013 +0100
@@ -56,8 +56,9 @@
 Web resources and client for interacting with pubsub services.
 """
 
-import cgi
+import mimetools
 from time import gmtime, strftime
+from StringIO import StringIO
 import urllib
 import urlparse
 
@@ -66,22 +67,22 @@
 from twisted.application import service
 from twisted.internet import defer, reactor
 from twisted.python import log
-from twisted.web import client
-from twisted.web2 import http, http_headers, resource, responsecode
-from twisted.web2 import channel, server
-from twisted.web2.stream import readStream
+from twisted.web import client, http, resource, server
+from twisted.web.error import Error
 from twisted.words.protocols.jabber.jid import JID
 from twisted.words.protocols.jabber.error import StanzaError
 from twisted.words.xish import domish
 
+from wokkel.generic import parseXml
 from wokkel.pubsub import Item
 from wokkel.pubsub import PubSubClient
 
 from sat_pubsub import error
 
 NS_ATOM = 'http://www.w3.org/2005/Atom'
-MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
-MIME_JSON = 'application/json'
+MIME_ATOM_ENTRY = b'application/atom+xml;type=entry'
+MIME_ATOM_FEED = b'application/atom+xml;type=feed'
+MIME_JSON = b'application/json'
 
 class XMPPURIParseError(ValueError):
     """
@@ -109,7 +110,7 @@
     try:
         entity, query = rest.split('?', 1)
     except ValueError:
-        raise XMPPURIParseError("No URI query component")
+        entity, query = rest, ''
 
     if not entity:
         raise XMPPURIParseError("Empty URI path component")
@@ -119,7 +120,7 @@
     except Exception, e:
         raise XMPPURIParseError("Invalid JID: %s" % e)
 
-    params = cgi.parse_qs(query)
+    params = urlparse.parse_qs(query)
 
     try:
         nodeIdentifier = params['node'][0]
@@ -138,37 +139,46 @@
 
 
 
-class WebStreamParser(object):
-    def __init__(self):
-        self.elementStream = domish.elementStream()
-        self.elementStream.DocumentStartEvent = self.docStart
-        self.elementStream.ElementEvent = self.elem
-        self.elementStream.DocumentEndEvent = self.docEnd
-        self.done = False
+def _parseContentType(header):
+    """
+    Parse a Content-Type header value to a L{mimetools.Message}.
 
+    L{mimetools.Message} parses a Content-Type header and makes the
+    components available with its C{getmaintype}, C{getsubtype}, C{gettype},
+    C{getplist} and C{getparam} methods.
+    """
+    return mimetools.Message(StringIO(b'Content-Type: ' + header))
 
-    def docStart(self, elem):
-        self.document = elem
 
 
-    def elem(self, elem):
-        self.document.addChild(elem)
-
-
-    def docEnd(self):
-        self.done = True
-
+def _asyncResponse(render):
+    """
+    """
+    def wrapped(self, request):
+        def eb(failure):
+            if failure.check(Error):
+                err = failure.value
+            else:
+                log.err(failure)
+                err = Error(500)
+            request.setResponseCode(err.status, err.message)
+            return err.response
 
-    def parse(self, stream):
-        def endOfStream(result):
-            if not self.done:
-                raise Exception("No more stuff?")
-            else:
-                return self.document
+        def finish(result):
+            if result is server.NOT_DONE_YET:
+                return
+
+            if result:
+                request.write(result)
+            request.finish()
 
-        d = readStream(stream, self.elementStream.parse)
-        d.addCallback(endOfStream)
-        return d
+        d = defer.maybeDeferred(render, self, request)
+        d.addErrback(eb)
+        d.addCallback(finish)
+
+        return server.NOT_DONE_YET
+
+    return wrapped
 
 
 
@@ -185,17 +195,18 @@
     http_GET = None
 
 
-    def http_POST(self, request):
+    @_asyncResponse
+    def render_POST(self, request):
         """
         Respond to a POST request to create a new node.
         """
 
         def toResponse(nodeIdentifier):
             uri = getXMPPURI(self.serviceJID, nodeIdentifier)
-            stream = simplejson.dumps({'uri': uri})
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
+            body = simplejson.dumps({'uri': uri})
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
+
         d = self.backend.createNode(None, self.owner)
         d.addCallback(toResponse)
         return d
@@ -212,51 +223,41 @@
         self.owner = owner
 
 
-    http_GET = None
+    render_GET = None
 
 
-    def http_POST(self, request):
+    @_asyncResponse
+    def render_POST(self, request):
         """
         Respond to a POST request to create a new node.
         """
-
-        def gotStream(_):
-            if request.args.get('uri'):
-                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
-                return defer.succeed(nodeIdentifier)
-            else:
-                raise http.HTTPError(http.Response(responsecode.BAD_REQUEST,
-                                                   "No URI given"))
-
-        def doDelete(nodeIdentifier, data):
-            if data:
-                params = simplejson.loads(''.join(data))
-                redirectURI = params.get('redirect_uri')
-            else:
-                redirectURI = None
-
-            return self.backend.deleteNode(nodeIdentifier, self.owner,
-                                           redirectURI)
-
-        def respond(result):
-            return http.Response(responsecode.NO_CONTENT)
-
+        def toResponse(result):
+            request.setResponseCode(http.NO_CONTENT)
 
         def trapNotFound(failure):
             failure.trap(error.NodeNotFound)
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
+            raise Error(http.NOT_FOUND, "Node not found")
 
         def trapXMPPURIParseError(failure):
             failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % failure.value)
+
+        if not request.args.get('uri'):
+            raise Error(http.BAD_REQUEST, "No URI given")
+
+        jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
 
-        data = []
-        d = readStream(request.stream, data.append)
-        d.addCallback(gotStream)
-        d.addCallback(doDelete, data)
-        d.addCallback(respond)
+        data = request.content.read()
+        if data:
+            params = simplejson.loads(data)
+            redirectURI = params.get('redirect_uri', None)
+        else:
+            redirectURI = None
+
+        d = self.backend.deleteNode(nodeIdentifier, self.owner,
+                                    redirectURI)
+        d.addCallback(toResponse)
         d.addErrback(trapNotFound)
         d.addErrback(trapXMPPURIParseError)
         return d
@@ -274,45 +275,37 @@
         self.owner = owner
 
 
-    http_GET = None
+    render_GET = None
 
 
     def checkMediaType(self, request):
-        ctype = request.headers.getHeader('content-type')
+        ctype = request.getHeader(b'content-type')
 
         if not ctype:
-            raise http.HTTPError(
-                http.StatusResponse(
-                    responsecode.BAD_REQUEST,
-                    "No specified Media Type"))
+            request.setResponseCode(http.BAD_REQUEST)
+
+            raise Error(http.BAD_REQUEST, b"No specified Media Type")
 
-        if (ctype.mediaType != 'application' or
-            ctype.mediaSubtype != 'atom+xml' or
-            ctype.params.get('type') != 'entry' or
-            ctype.params.get('charset', 'utf-8') != 'utf-8'):
-            raise http.HTTPError(
-                http.StatusResponse(
-                    responsecode.UNSUPPORTED_MEDIA_TYPE,
-                    "Unsupported Media Type: %s" %
-                        http_headers.generateContentType(ctype)))
+        message = _parseContentType(ctype)
+        if (message.maintype != b'application' or
+            message.subtype != b'atom+xml' or
+            message.getparam(b'type') != b'entry' or
+            (message.getparam(b'charset') or b'utf-8') != b'utf-8'):
+            raise Error(http.UNSUPPORTED_MEDIA_TYPE,
+                              b"Unsupported Media Type: %s" % ctype)
 
 
-    def parseXMLPayload(self, stream):
-        p = WebStreamParser()
-        return p.parse(stream)
-
-
-    def http_POST(self, request):
+    @_asyncResponse
+    def render_POST(self, request):
         """
         Respond to a POST request to create a new item.
         """
 
         def toResponse(nodeIdentifier):
             uri = getXMPPURI(self.serviceJID, nodeIdentifier)
-            stream = simplejson.dumps({'uri': uri})
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
+            body = simplejson.dumps({'uri': uri})
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
 
         def gotNode(nodeIdentifier, payload):
             item = Item(id='current', payload=payload)
@@ -327,24 +320,19 @@
             else:
                 return self.backend.createNode(None, self.owner)
 
-        def doPublish(payload):
-            d = getNode()
-            d.addCallback(gotNode, payload)
-            return d
-
         def trapNotFound(failure):
             failure.trap(error.NodeNotFound)
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
+            raise Error(http.NOT_FOUND, "Node not found")
 
         def trapXMPPURIParseError(failure):
             failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % failure.value)
 
         self.checkMediaType(request)
-        d = self.parseXMLPayload(request.stream)
-        d.addCallback(doPublish)
+        payload = parseXml(request.content.read())
+        d = getNode()
+        d.addCallback(gotNode, payload)
         d.addCallback(toResponse)
         d.addErrback(trapNotFound)
         d.addErrback(trapXMPPURIParseError)
@@ -357,12 +345,12 @@
         self.service = service
 
 
-    def render(self, request):
+    @_asyncResponse
+    def render_GET(self, request):
         def responseFromNodes(nodeIdentifiers):
-            stream = simplejson.dumps(nodeIdentifiers)
-            contentType = http_headers.MimeType.fromString(MIME_JSON)
-            return http.Response(responsecode.OK, stream=stream,
-                                 headers={'Content-Type': contentType})
+            body = simplejson.dumps(nodeIdentifiers)
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
 
         d = self.service.getNodes()
         d.addCallback(responseFromNodes)
@@ -377,7 +365,7 @@
     Extract atom entries from a list of publish-subscribe items.
 
     @param items: List of L{domish.Element}s that represent publish-subscribe
-                  items.
+        items.
     @type items: C{list}
     """
 
@@ -461,7 +449,7 @@
                 return
 
             self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
-                         'application/atom+xml;type=entry')
+                         MIME_ATOM_ENTRY)
 
         def subscribeOrItems(hasCallbacks):
             if hasCallbacks:
@@ -514,10 +502,10 @@
             return
 
         if len(atomEntries) == 1:
-            contentType = 'application/atom+xml;type=entry'
+            contentType = MIME_ATOM_ENTRY
             payload = atomEntries[0]
         else:
-            contentType = 'application/atom+xml;type=feed'
+            contentType = MIME_ATOM_FEED
             payload = constructFeed(service, nodeIdentifier, atomEntries,
                                     title='Received item collection')
 
@@ -614,11 +602,11 @@
     serviceMethod = None
     errorMap = {
             error.NodeNotFound:
-                (responsecode.FORBIDDEN, "Node not found"),
+                (http.FORBIDDEN, "Node not found"),
             error.NotSubscribed:
-                (responsecode.FORBIDDEN, "No such subscription found"),
+                (http.FORBIDDEN, "No such subscription found"),
             error.SubscriptionExists:
-                (responsecode.FORBIDDEN, "Subscription already exists"),
+                (http.FORBIDDEN, "Subscription already exists"),
     }
 
     def __init__(self, service):
@@ -626,38 +614,35 @@
         self.params = None
 
 
-    http_GET = None
+    render_GET = None
 
 
-    def http_POST(self, request):
+    @_asyncResponse
+    def render_POST(self, request):
         def trapNotFound(failure):
             err = failure.trap(*self.errorMap.keys())
-            code, msg = self.errorMap[err]
-            return http.StatusResponse(code, msg)
-
-        def respond(result):
-            return http.Response(responsecode.NO_CONTENT)
+            status, message = self.errorMap[err]
+            raise Error(status, message)
 
-        def gotRequest(result):
-            uri = self.params['uri']
-            callback = self.params['callback']
-
-            jid, nodeIdentifier = getServiceAndNode(uri)
-            method = getattr(self.service, self.serviceMethod)
-            d = method(jid, nodeIdentifier, callback)
-            return d
-
-        def storeParams(data):
-            self.params = simplejson.loads(data)
+        def toResponse(result):
+            request.setResponseCode(http.NO_CONTENT)
+            return b''
 
         def trapXMPPURIParseError(failure):
             failure.trap(XMPPURIParseError)
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % failure.value)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % failure.value)
+
+        data = request.content.read()
+        self.params = simplejson.loads(data)
 
-        d = readStream(request.stream, storeParams)
-        d.addCallback(gotRequest)
-        d.addCallback(respond)
+        uri = self.params['uri']
+        callback = self.params['callback']
+
+        jid, nodeIdentifier = getServiceAndNode(uri)
+        method = getattr(self.service, self.serviceMethod)
+        d = method(jid, nodeIdentifier, callback)
+        d.addCallback(toResponse)
         d.addErrback(trapNotFound)
         d.addErrback(trapXMPPURIParseError)
         return d
@@ -696,46 +681,45 @@
         self.service = service
 
 
-    def render(self, request):
+    @_asyncResponse
+    def render_GET(self, request):
         try:
             maxItems = int(request.args.get('max_items', [0])[0]) or None
         except ValueError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "The argument max_items has an invalid value.")
+            raise Error(http.BAD_REQUEST,
+                        "The argument max_items has an invalid value.")
 
         try:
             uri = request.args['uri'][0]
         except KeyError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "No URI for the remote node provided.")
+            raise Error(http.BAD_REQUEST,
+                        "No URI for the remote node provided.")
 
         try:
             jid, nodeIdentifier = getServiceAndNode(uri)
         except XMPPURIParseError:
-            return http.StatusResponse(responsecode.BAD_REQUEST,
-                    "Malformed XMPP URI: %s" % uri)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % uri)
 
-        def respond(items):
-            """Create a feed out the retrieved items."""
-            contentType = http_headers.MimeType('application',
-                                                'atom+xml',
-                                                {'type': 'feed'})
+        def toResponse(items):
+            """
+            Create a feed out the retrieved items.
+            """
             atomEntries = extractAtomEntries(items)
             feed = constructFeed(jid, nodeIdentifier, atomEntries,
                                     "Retrieved item collection")
-            payload = feed.toXml().encode('utf-8')
-            return http.Response(responsecode.OK, stream=payload,
-                                 headers={'Content-Type': contentType})
+            body = feed.toXml().encode('utf-8')
+            request.setHeader(b'Content-Type', MIME_ATOM_FEED)
+            return body
 
         def trapNotFound(failure):
             failure.trap(StanzaError)
             if not failure.value.condition == 'item-not-found':
                 raise failure
-            return http.StatusResponse(responsecode.NOT_FOUND,
-                                       "Node not found")
+            raise Error(http.NOT_FOUND, "Node not found")
 
         d = self.service.items(jid, nodeIdentifier, maxItems)
-        d.addCallback(respond)
+        d.addCallback(toResponse)
         d.addErrback(trapNotFound)
         return d
 
@@ -780,15 +764,17 @@
     http_GET = None
 
 
-    def http_POST(self, request):
-        p = WebStreamParser()
-        if not request.headers.hasHeader('Event'):
-            d = p.parse(request.stream)
+    def render_POST(self, request):
+        if request.requestHeaders.hasHeader(b'Event'):
+            payload = None
         else:
-            d = defer.succeed(None)
-        d.addCallback(self.callback, request.headers)
-        d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT))
-        return d
+            payload = parseXml(request.content.read())
+
+        self.callback(payload, request.requestHeaders)
+
+        request.setResponseCode(http.NO_CONTENT)
+        return b''
+
 
 
 
@@ -804,13 +790,14 @@
         self.callbackHost = callbackHost or 'localhost'
         self.callbackPort = callbackPort or 8087
         root = resource.Resource()
-        root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs))
+        root.putChild('callback', CallbackResource(
+                lambda *args, **kwargs: self.callback(*args, **kwargs)))
         self.site = server.Site(root)
 
 
     def startService(self):
         self.port = reactor.listenTCP(self.callbackPort,
-                                      channel.HTTPFactory(self.site))
+                                      self.site)
 
 
     def stopService(self):