Mercurial > libervia-pubsub
diff sat_pubsub/gateway.py @ 273:6ba0d6def7f5
Use twisted.web instead of web2, initial tests.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Sun, 20 Jan 2013 13:38:41 +0100 |
parents | d55620ceafed |
children | 6641ea7990ee |
line wrap: on
line diff
--- a/sat_pubsub/gateway.py Tue Sep 09 08:09:26 2014 +0200 +++ b/sat_pubsub/gateway.py Sun Jan 20 13:38:41 2013 +0100 @@ -56,8 +56,9 @@ Web resources and client for interacting with pubsub services. """ -import cgi +import mimetools from time import gmtime, strftime +from StringIO import StringIO import urllib import urlparse @@ -66,22 +67,22 @@ from twisted.application import service from twisted.internet import defer, reactor from twisted.python import log -from twisted.web import client -from twisted.web2 import http, http_headers, resource, responsecode -from twisted.web2 import channel, server -from twisted.web2.stream import readStream +from twisted.web import client, http, resource, server +from twisted.web.error import Error from twisted.words.protocols.jabber.jid import JID from twisted.words.protocols.jabber.error import StanzaError from twisted.words.xish import domish +from wokkel.generic import parseXml from wokkel.pubsub import Item from wokkel.pubsub import PubSubClient from sat_pubsub import error NS_ATOM = 'http://www.w3.org/2005/Atom' -MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' -MIME_JSON = 'application/json' +MIME_ATOM_ENTRY = b'application/atom+xml;type=entry' +MIME_ATOM_FEED = b'application/atom+xml;type=feed' +MIME_JSON = b'application/json' class XMPPURIParseError(ValueError): """ @@ -109,7 +110,7 @@ try: entity, query = rest.split('?', 1) except ValueError: - raise XMPPURIParseError("No URI query component") + entity, query = rest, '' if not entity: raise XMPPURIParseError("Empty URI path component") @@ -119,7 +120,7 @@ except Exception, e: raise XMPPURIParseError("Invalid JID: %s" % e) - params = cgi.parse_qs(query) + params = urlparse.parse_qs(query) try: nodeIdentifier = params['node'][0] @@ -138,37 +139,46 @@ -class WebStreamParser(object): - def __init__(self): - self.elementStream = domish.elementStream() - self.elementStream.DocumentStartEvent = self.docStart - self.elementStream.ElementEvent = self.elem - self.elementStream.DocumentEndEvent = self.docEnd - self.done = False +def _parseContentType(header): + """ + Parse a Content-Type header value to a L{mimetools.Message}. + L{mimetools.Message} parses a Content-Type header and makes the + components available with its C{getmaintype}, C{getsubtype}, C{gettype}, + C{getplist} and C{getparam} methods. + """ + return mimetools.Message(StringIO(b'Content-Type: ' + header)) - def docStart(self, elem): - self.document = elem - def elem(self, elem): - self.document.addChild(elem) - - - def docEnd(self): - self.done = True - +def _asyncResponse(render): + """ + """ + def wrapped(self, request): + def eb(failure): + if failure.check(Error): + err = failure.value + else: + log.err(failure) + err = Error(500) + request.setResponseCode(err.status, err.message) + return err.response - def parse(self, stream): - def endOfStream(result): - if not self.done: - raise Exception("No more stuff?") - else: - return self.document + def finish(result): + if result is server.NOT_DONE_YET: + return + + if result: + request.write(result) + request.finish() - d = readStream(stream, self.elementStream.parse) - d.addCallback(endOfStream) - return d + d = defer.maybeDeferred(render, self, request) + d.addErrback(eb) + d.addCallback(finish) + + return server.NOT_DONE_YET + + return wrapped @@ -185,17 +195,18 @@ http_GET = None - def http_POST(self, request): + @_asyncResponse + def render_POST(self, request): """ Respond to a POST request to create a new node. """ def toResponse(nodeIdentifier): uri = getXMPPURI(self.serviceJID, nodeIdentifier) - stream = simplejson.dumps({'uri': uri}) - contentType = http_headers.MimeType.fromString(MIME_JSON) - return http.Response(responsecode.OK, stream=stream, - headers={'Content-Type': contentType}) + body = simplejson.dumps({'uri': uri}) + request.setHeader(b'Content-Type', MIME_JSON) + return body + d = self.backend.createNode(None, self.owner) d.addCallback(toResponse) return d @@ -212,51 +223,41 @@ self.owner = owner - http_GET = None + render_GET = None - def http_POST(self, request): + @_asyncResponse + def render_POST(self, request): """ Respond to a POST request to create a new node. """ - - def gotStream(_): - if request.args.get('uri'): - jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) - return defer.succeed(nodeIdentifier) - else: - raise http.HTTPError(http.Response(responsecode.BAD_REQUEST, - "No URI given")) - - def doDelete(nodeIdentifier, data): - if data: - params = simplejson.loads(''.join(data)) - redirectURI = params.get('redirect_uri') - else: - redirectURI = None - - return self.backend.deleteNode(nodeIdentifier, self.owner, - redirectURI) - - def respond(result): - return http.Response(responsecode.NO_CONTENT) - + def toResponse(result): + request.setResponseCode(http.NO_CONTENT) def trapNotFound(failure): failure.trap(error.NodeNotFound) - return http.StatusResponse(responsecode.NOT_FOUND, - "Node not found") + raise Error(http.NOT_FOUND, "Node not found") def trapXMPPURIParseError(failure): failure.trap(XMPPURIParseError) - return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value) + raise Error(http.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) + + if not request.args.get('uri'): + raise Error(http.BAD_REQUEST, "No URI given") + + jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) - data = [] - d = readStream(request.stream, data.append) - d.addCallback(gotStream) - d.addCallback(doDelete, data) - d.addCallback(respond) + data = request.content.read() + if data: + params = simplejson.loads(data) + redirectURI = params.get('redirect_uri', None) + else: + redirectURI = None + + d = self.backend.deleteNode(nodeIdentifier, self.owner, + redirectURI) + d.addCallback(toResponse) d.addErrback(trapNotFound) d.addErrback(trapXMPPURIParseError) return d @@ -274,45 +275,37 @@ self.owner = owner - http_GET = None + render_GET = None def checkMediaType(self, request): - ctype = request.headers.getHeader('content-type') + ctype = request.getHeader(b'content-type') if not ctype: - raise http.HTTPError( - http.StatusResponse( - responsecode.BAD_REQUEST, - "No specified Media Type")) + request.setResponseCode(http.BAD_REQUEST) + + raise Error(http.BAD_REQUEST, b"No specified Media Type") - if (ctype.mediaType != 'application' or - ctype.mediaSubtype != 'atom+xml' or - ctype.params.get('type') != 'entry' or - ctype.params.get('charset', 'utf-8') != 'utf-8'): - raise http.HTTPError( - http.StatusResponse( - responsecode.UNSUPPORTED_MEDIA_TYPE, - "Unsupported Media Type: %s" % - http_headers.generateContentType(ctype))) + message = _parseContentType(ctype) + if (message.maintype != b'application' or + message.subtype != b'atom+xml' or + message.getparam(b'type') != b'entry' or + (message.getparam(b'charset') or b'utf-8') != b'utf-8'): + raise Error(http.UNSUPPORTED_MEDIA_TYPE, + b"Unsupported Media Type: %s" % ctype) - def parseXMLPayload(self, stream): - p = WebStreamParser() - return p.parse(stream) - - - def http_POST(self, request): + @_asyncResponse + def render_POST(self, request): """ Respond to a POST request to create a new item. """ def toResponse(nodeIdentifier): uri = getXMPPURI(self.serviceJID, nodeIdentifier) - stream = simplejson.dumps({'uri': uri}) - contentType = http_headers.MimeType.fromString(MIME_JSON) - return http.Response(responsecode.OK, stream=stream, - headers={'Content-Type': contentType}) + body = simplejson.dumps({'uri': uri}) + request.setHeader(b'Content-Type', MIME_JSON) + return body def gotNode(nodeIdentifier, payload): item = Item(id='current', payload=payload) @@ -327,24 +320,19 @@ else: return self.backend.createNode(None, self.owner) - def doPublish(payload): - d = getNode() - d.addCallback(gotNode, payload) - return d - def trapNotFound(failure): failure.trap(error.NodeNotFound) - return http.StatusResponse(responsecode.NOT_FOUND, - "Node not found") + raise Error(http.NOT_FOUND, "Node not found") def trapXMPPURIParseError(failure): failure.trap(XMPPURIParseError) - return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value) + raise Error(http.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) self.checkMediaType(request) - d = self.parseXMLPayload(request.stream) - d.addCallback(doPublish) + payload = parseXml(request.content.read()) + d = getNode() + d.addCallback(gotNode, payload) d.addCallback(toResponse) d.addErrback(trapNotFound) d.addErrback(trapXMPPURIParseError) @@ -357,12 +345,12 @@ self.service = service - def render(self, request): + @_asyncResponse + def render_GET(self, request): def responseFromNodes(nodeIdentifiers): - stream = simplejson.dumps(nodeIdentifiers) - contentType = http_headers.MimeType.fromString(MIME_JSON) - return http.Response(responsecode.OK, stream=stream, - headers={'Content-Type': contentType}) + body = simplejson.dumps(nodeIdentifiers) + request.setHeader(b'Content-Type', MIME_JSON) + return body d = self.service.getNodes() d.addCallback(responseFromNodes) @@ -377,7 +365,7 @@ Extract atom entries from a list of publish-subscribe items. @param items: List of L{domish.Element}s that represent publish-subscribe - items. + items. @type items: C{list} """ @@ -461,7 +449,7 @@ return self._postTo([callback], jid, nodeIdentifier, atomEntries[0], - 'application/atom+xml;type=entry') + MIME_ATOM_ENTRY) def subscribeOrItems(hasCallbacks): if hasCallbacks: @@ -514,10 +502,10 @@ return if len(atomEntries) == 1: - contentType = 'application/atom+xml;type=entry' + contentType = MIME_ATOM_ENTRY payload = atomEntries[0] else: - contentType = 'application/atom+xml;type=feed' + contentType = MIME_ATOM_FEED payload = constructFeed(service, nodeIdentifier, atomEntries, title='Received item collection') @@ -614,11 +602,11 @@ serviceMethod = None errorMap = { error.NodeNotFound: - (responsecode.FORBIDDEN, "Node not found"), + (http.FORBIDDEN, "Node not found"), error.NotSubscribed: - (responsecode.FORBIDDEN, "No such subscription found"), + (http.FORBIDDEN, "No such subscription found"), error.SubscriptionExists: - (responsecode.FORBIDDEN, "Subscription already exists"), + (http.FORBIDDEN, "Subscription already exists"), } def __init__(self, service): @@ -626,38 +614,35 @@ self.params = None - http_GET = None + render_GET = None - def http_POST(self, request): + @_asyncResponse + def render_POST(self, request): def trapNotFound(failure): err = failure.trap(*self.errorMap.keys()) - code, msg = self.errorMap[err] - return http.StatusResponse(code, msg) - - def respond(result): - return http.Response(responsecode.NO_CONTENT) + status, message = self.errorMap[err] + raise Error(status, message) - def gotRequest(result): - uri = self.params['uri'] - callback = self.params['callback'] - - jid, nodeIdentifier = getServiceAndNode(uri) - method = getattr(self.service, self.serviceMethod) - d = method(jid, nodeIdentifier, callback) - return d - - def storeParams(data): - self.params = simplejson.loads(data) + def toResponse(result): + request.setResponseCode(http.NO_CONTENT) + return b'' def trapXMPPURIParseError(failure): failure.trap(XMPPURIParseError) - return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value) + raise Error(http.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) + + data = request.content.read() + self.params = simplejson.loads(data) - d = readStream(request.stream, storeParams) - d.addCallback(gotRequest) - d.addCallback(respond) + uri = self.params['uri'] + callback = self.params['callback'] + + jid, nodeIdentifier = getServiceAndNode(uri) + method = getattr(self.service, self.serviceMethod) + d = method(jid, nodeIdentifier, callback) + d.addCallback(toResponse) d.addErrback(trapNotFound) d.addErrback(trapXMPPURIParseError) return d @@ -696,46 +681,45 @@ self.service = service - def render(self, request): + @_asyncResponse + def render_GET(self, request): try: maxItems = int(request.args.get('max_items', [0])[0]) or None except ValueError: - return http.StatusResponse(responsecode.BAD_REQUEST, - "The argument max_items has an invalid value.") + raise Error(http.BAD_REQUEST, + "The argument max_items has an invalid value.") try: uri = request.args['uri'][0] except KeyError: - return http.StatusResponse(responsecode.BAD_REQUEST, - "No URI for the remote node provided.") + raise Error(http.BAD_REQUEST, + "No URI for the remote node provided.") try: jid, nodeIdentifier = getServiceAndNode(uri) except XMPPURIParseError: - return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % uri) + raise Error(http.BAD_REQUEST, + "Malformed XMPP URI: %s" % uri) - def respond(items): - """Create a feed out the retrieved items.""" - contentType = http_headers.MimeType('application', - 'atom+xml', - {'type': 'feed'}) + def toResponse(items): + """ + Create a feed out the retrieved items. + """ atomEntries = extractAtomEntries(items) feed = constructFeed(jid, nodeIdentifier, atomEntries, "Retrieved item collection") - payload = feed.toXml().encode('utf-8') - return http.Response(responsecode.OK, stream=payload, - headers={'Content-Type': contentType}) + body = feed.toXml().encode('utf-8') + request.setHeader(b'Content-Type', MIME_ATOM_FEED) + return body def trapNotFound(failure): failure.trap(StanzaError) if not failure.value.condition == 'item-not-found': raise failure - return http.StatusResponse(responsecode.NOT_FOUND, - "Node not found") + raise Error(http.NOT_FOUND, "Node not found") d = self.service.items(jid, nodeIdentifier, maxItems) - d.addCallback(respond) + d.addCallback(toResponse) d.addErrback(trapNotFound) return d @@ -780,15 +764,17 @@ http_GET = None - def http_POST(self, request): - p = WebStreamParser() - if not request.headers.hasHeader('Event'): - d = p.parse(request.stream) + def render_POST(self, request): + if request.requestHeaders.hasHeader(b'Event'): + payload = None else: - d = defer.succeed(None) - d.addCallback(self.callback, request.headers) - d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) - return d + payload = parseXml(request.content.read()) + + self.callback(payload, request.requestHeaders) + + request.setResponseCode(http.NO_CONTENT) + return b'' + @@ -804,13 +790,14 @@ self.callbackHost = callbackHost or 'localhost' self.callbackPort = callbackPort or 8087 root = resource.Resource() - root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) + root.putChild('callback', CallbackResource( + lambda *args, **kwargs: self.callback(*args, **kwargs))) self.site = server.Site(root) def startService(self): self.port = reactor.listenTCP(self.callbackPort, - channel.HTTPFactory(self.site)) + self.site) def stopService(self):