Mercurial > libervia-pubsub
diff idavoll/gateway.py @ 183:c21b986cff30
Implement HTTP client to gateway and implement functional tests with it.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Fri, 11 Apr 2008 14:41:16 +0000 |
parents | faf1c9bc2612 |
children | 9038908dc2f5 |
line wrap: on
line diff
--- a/idavoll/gateway.py Fri Apr 11 13:48:12 2008 +0000 +++ b/idavoll/gateway.py Fri Apr 11 14:41:16 2008 +0000 @@ -1,15 +1,25 @@ +# -*- test-case-name: idavoll.test.test_gateway -*- +# # Copyright (c) 2003-2008 Ralph Meijer # See LICENSE for details. +""" +Web resources and client for interacting with pubsub services. +""" + import cgi from time import gmtime, strftime +import urllib +import urlparse import simplejson from twisted.application import service from twisted.internet import defer, reactor +from twisted.python import log from twisted.web import client from twisted.web2 import http, http_headers, resource, responsecode +from twisted.web2 import channel, server from twisted.web2.stream import readStream from twisted.words.protocols.jabber.jid import JID from twisted.words.protocols.jabber.error import StanzaError @@ -21,7 +31,8 @@ from idavoll import error NS_ATOM = 'http://www.w3.org/2005/Atom' - +MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' +MIME_JSON = 'application/json' class RemoteSubscriptionService(service.Service, PubSubClient): @@ -142,10 +153,11 @@ headers['Event'] = eventType def postNotification(callbackURI): - return client.getPage(str(callbackURI), - method='POST', - postdata=postdata, - headers=headers) + d = client.getPage(str(callbackURI), + method='POST', + postdata=postdata, + headers=headers) + d.addErrback(log.err) for callbackURI in callbacks: reactor.callLater(0, postNotification, callbackURI) @@ -328,10 +340,6 @@ http_headers.generateContentType(ctype))) def parseXMLPayload(self, stream): - if not stream: - print "Stream is empty", repr(stream) - elif not stream.length: - print "Stream length is", repr(stream.length) p = WebStreamParser() return p.parse(stream) @@ -466,9 +474,119 @@ def render(self, request): def responseFromNodes(nodeIdentifiers): - import pprint - return http.Response(responsecode.OK, stream=pprint.pformat(nodeIdentifiers)) + return http.Response(responsecode.OK, + stream=simplejson.dumps(nodeIdentifiers)) d = self.service.get_nodes() d.addCallback(responseFromNodes) return d + + +def getPageWithFactory(url, contextFactory=None, *args, **kwargs): + """Download a web page. + + Download a page. Return the factory that holds a deferred, which will + callback with a page (as a string) or errback with a description of the + error. + + See HTTPClientFactory to see what extra args can be passed. + """ + + scheme, host, port, path = client._parse(url) + factory = client.HTTPClientFactory(url, *args, **kwargs) + factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200() + + if scheme == 'https': + from twisted.internet import ssl + if contextFactory is None: + contextFactory = ssl.ClientContextFactory() + reactor.connectSSL(host, port, factory, contextFactory) + else: + reactor.connectTCP(host, port, factory) + return factory + + +class CallbackResource(resource.Resource): + """ + Web resource for retrieving gateway notifications. + """ + + def __init__(self, callback): + self.callback = callback + + http_GET = None + + def http_POST(self, request): + p = WebStreamParser() + d = p.parse(request.stream) + d.addCallback(self.callback, request.headers) + d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) + return d + +class GatewayClient(service.Service): + """ + Service that provides client access to the HTTP Gateway into Idavoll. + """ + + agent = "Idavoll HTTP Gateway Client" + + def __init__(self, baseURI, callbackHost=None, callbackPort=None): + self.baseURI = baseURI + self.callbackHost = callbackHost or 'localhost' + self.callbackPort = callbackPort or 8087 + root = resource.Resource() + root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) + self.site = server.Site(root) + + def startService(self): + self.port = reactor.listenTCP(self.callbackPort, + channel.HTTPFactory(self.site)) + + def stopService(self): + return self.port.stopListening() + + def _makeURI(self, verb, query=None): + uriComponents = urlparse.urlparse(self.baseURI) + uri = urlparse.urlunparse((uriComponents[0], + uriComponents[1], + uriComponents[2] + verb, + '', + query and urllib.urlencode(query) or '', + '')) + return uri + + def callback(self, data, headers): + pass + + def create(self): + f = getPageWithFactory(self._makeURI('create'), + method='POST', + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + def publish(self, entry, xmppURI=None): + query = xmppURI and {'uri': xmppURI} + + f = getPageWithFactory(self._makeURI('publish', query), + method='POST', + postdata=entry.toXml().encode('utf-8'), + headers={'Content-Type': MIME_ATOM_ENTRY}, + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + def listNodes(self): + f = getPageWithFactory(self._makeURI('list'), + method='GET', + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + def subscribe(self, xmppURI): + params = {'uri': xmppURI, + 'callback': 'http://%s:%s/callback' % (self.callbackHost, + self.callbackPort)} + f = getPageWithFactory(self._makeURI('subscribe'), + method='POST', + postdata=simplejson.dumps(params), + headers={'Content-Type': MIME_JSON}, + agent=self.agent) + return f.deferred