Mercurial > libervia-pubsub
diff sat_pubsub/gateway.py @ 232:923281d4c5bc
renamed idavoll directory to sat_pubsub
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 17 May 2012 12:48:14 +0200 |
parents | idavoll/gateway.py@55b45c7dccb4 |
children | 564ae55219e1 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/gateway.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,868 @@ +# -*- test-case-name: idavoll.test.test_gateway -*- +# +# Copyright (c) 2003-2009 Ralph Meijer +# See LICENSE for details. + +""" +Web resources and client for interacting with pubsub services. +""" + +import cgi +from time import gmtime, strftime +import urllib +import urlparse + +import simplejson + +from twisted.application import service +from twisted.internet import defer, reactor +from twisted.python import log +from twisted.web import client +from twisted.web2 import http, http_headers, resource, responsecode +from twisted.web2 import channel, server +from twisted.web2.stream import readStream +from twisted.words.protocols.jabber.jid import JID +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.xish import domish + +from wokkel.pubsub import Item +from wokkel.pubsub import PubSubClient + +from idavoll import error + +NS_ATOM = 'http://www.w3.org/2005/Atom' +MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' +MIME_JSON = 'application/json' + +class XMPPURIParseError(ValueError): + """ + Raised when a given XMPP URI couldn't be properly parsed. + """ + + + +def getServiceAndNode(uri): + """ + Given an XMPP URI, extract the publish subscribe service JID and node ID. + """ + + try: + scheme, rest = uri.split(':', 1) + except ValueError: + raise XMPPURIParseError("No URI scheme component") + + if scheme != 'xmpp': + raise XMPPURIParseError("Unknown URI scheme") + + if rest.startswith("//"): + raise XMPPURIParseError("Unexpected URI authority component") + + try: + entity, query = rest.split('?', 1) + except ValueError: + raise XMPPURIParseError("No URI query component") + + if not entity: + raise XMPPURIParseError("Empty URI path component") + + try: + service = JID(entity) + except Exception, e: + raise XMPPURIParseError("Invalid JID: %s" % e) + + params = cgi.parse_qs(query) + + try: + nodeIdentifier = params['node'][0] + except (KeyError, ValueError): + nodeIdentifier = '' + + return service, nodeIdentifier + + + +def getXMPPURI(service, nodeIdentifier): + """ + Construct an XMPP URI from a service JID and node identifier. + """ + return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '') + + + +class WebStreamParser(object): + def __init__(self): + self.elementStream = domish.elementStream() + self.elementStream.DocumentStartEvent = self.docStart + self.elementStream.ElementEvent = self.elem + self.elementStream.DocumentEndEvent = self.docEnd + self.done = False + + + def docStart(self, elem): + self.document = elem + + + def elem(self, elem): + self.document.addChild(elem) + + + def docEnd(self): + self.done = True + + + def parse(self, stream): + def endOfStream(result): + if not self.done: + raise Exception("No more stuff?") + else: + return self.document + + d = readStream(stream, self.elementStream.parse) + d.addCallback(endOfStream) + return d + + + +class CreateResource(resource.Resource): + """ + A resource to create a publish-subscribe node. + """ + def __init__(self, backend, serviceJID, owner): + self.backend = backend + self.serviceJID = serviceJID + self.owner = owner + + + http_GET = None + + + def http_POST(self, request): + """ + Respond to a POST request to create a new node. + """ + + def toResponse(nodeIdentifier): + uri = getXMPPURI(self.serviceJID, nodeIdentifier) + stream = simplejson.dumps({'uri': uri}) + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) + d = self.backend.createNode(None, self.owner) + d.addCallback(toResponse) + return d + + + +class DeleteResource(resource.Resource): + """ + A resource to create a publish-subscribe node. + """ + def __init__(self, backend, serviceJID, owner): + self.backend = backend + self.serviceJID = serviceJID + self.owner = owner + + + http_GET = None + + + def http_POST(self, request): + """ + Respond to a POST request to create a new node. + """ + + def gotStream(_): + if request.args.get('uri'): + jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) + return defer.succeed(nodeIdentifier) + else: + raise http.HTTPError(http.Response(responsecode.BAD_REQUEST, + "No URI given")) + + def doDelete(nodeIdentifier, data): + if data: + params = simplejson.loads(''.join(data)) + redirectURI = params.get('redirect_uri') + else: + redirectURI = None + + return self.backend.deleteNode(nodeIdentifier, self.owner, + redirectURI) + + def respond(result): + return http.Response(responsecode.NO_CONTENT) + + + def trapNotFound(failure): + failure.trap(error.NodeNotFound) + return http.StatusResponse(responsecode.NOT_FOUND, + "Node not found") + + def trapXMPPURIParseError(failure): + failure.trap(XMPPURIParseError) + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) + + data = [] + d = readStream(request.stream, data.append) + d.addCallback(gotStream) + d.addCallback(doDelete, data) + d.addCallback(respond) + d.addErrback(trapNotFound) + d.addErrback(trapXMPPURIParseError) + return d + + + +class PublishResource(resource.Resource): + """ + A resource to publish to a publish-subscribe node. + """ + + def __init__(self, backend, serviceJID, owner): + self.backend = backend + self.serviceJID = serviceJID + self.owner = owner + + + http_GET = None + + + def checkMediaType(self, request): + ctype = request.headers.getHeader('content-type') + + if not ctype: + raise http.HTTPError( + http.StatusResponse( + responsecode.BAD_REQUEST, + "No specified Media Type")) + + if (ctype.mediaType != 'application' or + ctype.mediaSubtype != 'atom+xml' or + ctype.params.get('type') != 'entry' or + ctype.params.get('charset', 'utf-8') != 'utf-8'): + raise http.HTTPError( + http.StatusResponse( + responsecode.UNSUPPORTED_MEDIA_TYPE, + "Unsupported Media Type: %s" % + http_headers.generateContentType(ctype))) + + + def parseXMLPayload(self, stream): + p = WebStreamParser() + return p.parse(stream) + + + def http_POST(self, request): + """ + Respond to a POST request to create a new item. + """ + + def toResponse(nodeIdentifier): + uri = getXMPPURI(self.serviceJID, nodeIdentifier) + stream = simplejson.dumps({'uri': uri}) + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) + + def gotNode(nodeIdentifier, payload): + item = Item(id='current', payload=payload) + d = self.backend.publish(nodeIdentifier, [item], self.owner) + d.addCallback(lambda _: nodeIdentifier) + return d + + def getNode(): + if request.args.get('uri'): + jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) + return defer.succeed(nodeIdentifier) + else: + return self.backend.createNode(None, self.owner) + + def doPublish(payload): + d = getNode() + d.addCallback(gotNode, payload) + return d + + def trapNotFound(failure): + failure.trap(error.NodeNotFound) + return http.StatusResponse(responsecode.NOT_FOUND, + "Node not found") + + def trapXMPPURIParseError(failure): + failure.trap(XMPPURIParseError) + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) + + self.checkMediaType(request) + d = self.parseXMLPayload(request.stream) + d.addCallback(doPublish) + d.addCallback(toResponse) + d.addErrback(trapNotFound) + d.addErrback(trapXMPPURIParseError) + return d + + + +class ListResource(resource.Resource): + def __init__(self, service): + self.service = service + + + def render(self, request): + def responseFromNodes(nodeIdentifiers): + stream = simplejson.dumps(nodeIdentifiers) + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) + + d = self.service.getNodes() + d.addCallback(responseFromNodes) + return d + + + +# Service for subscribing to remote XMPP Pubsub nodes and web resources + +def extractAtomEntries(items): + """ + Extract atom entries from a list of publish-subscribe items. + + @param items: List of L{domish.Element}s that represent publish-subscribe + items. + @type items: C{list} + """ + + atomEntries = [] + + for item in items: + # ignore non-items (i.e. retractions) + if item.name != 'item': + continue + + atomEntry = None + for element in item.elements(): + # extract the first element that is an atom entry + if element.uri == NS_ATOM and element.name == 'entry': + atomEntry = element + break + + if atomEntry: + atomEntries.append(atomEntry) + + return atomEntries + + + +def constructFeed(service, nodeIdentifier, entries, title): + nodeURI = getXMPPURI(service, nodeIdentifier) + now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) + + # Collect the received entries in a feed + feed = domish.Element((NS_ATOM, 'feed')) + feed.addElement('title', content=title) + feed.addElement('id', content=nodeURI) + feed.addElement('updated', content=now) + + for entry in entries: + feed.addChild(entry) + + return feed + + + +class RemoteSubscriptionService(service.Service, PubSubClient): + """ + Service for subscribing to remote XMPP Publish-Subscribe nodes. + + Subscriptions are created with a callback HTTP URI that is POSTed + to with the received items in notifications. + """ + + def __init__(self, jid, storage): + self.jid = jid + self.storage = storage + + + def trapNotFound(self, failure): + failure.trap(StanzaError) + + if failure.value.condition == 'item-not-found': + raise error.NodeNotFound() + else: + return failure + + + def subscribeCallback(self, jid, nodeIdentifier, callback): + """ + Subscribe a callback URI. + + This registers a callback URI to be called when a notification is + received for the given node. + + If this is the first callback registered for this node, the gateway + will subscribe to the node. Otherwise, the most recently published item + for this node is retrieved and, if present, the newly registered + callback will be called with that item. + """ + + def callbackForLastItem(items): + atomEntries = extractAtomEntries(items) + + if not atomEntries: + return + + self._postTo([callback], jid, nodeIdentifier, atomEntries[0], + 'application/atom+xml;type=entry') + + def subscribeOrItems(hasCallbacks): + if hasCallbacks: + if not nodeIdentifier: + return None + d = self.items(jid, nodeIdentifier, 1) + d.addCallback(callbackForLastItem) + else: + d = self.subscribe(jid, nodeIdentifier, self.jid) + + d.addErrback(self.trapNotFound) + return d + + d = self.storage.hasCallbacks(jid, nodeIdentifier) + d.addCallback(subscribeOrItems) + d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier, + callback)) + return d + + + def unsubscribeCallback(self, jid, nodeIdentifier, callback): + """ + Unsubscribe a callback. + + If this was the last registered callback for this node, the + gateway will unsubscribe from node. + """ + + def cb(last): + if last: + return self.unsubscribe(jid, nodeIdentifier, self.jid) + + d = self.storage.removeCallback(jid, nodeIdentifier, callback) + d.addCallback(cb) + return d + + + def itemsReceived(self, event): + """ + Fire up HTTP client to do callback + """ + + atomEntries = extractAtomEntries(event.items) + service = event.sender + nodeIdentifier = event.nodeIdentifier + headers = event.headers + + # Don't notify if there are no atom entries + if not atomEntries: + return + + if len(atomEntries) == 1: + contentType = 'application/atom+xml;type=entry' + payload = atomEntries[0] + else: + contentType = 'application/atom+xml;type=feed' + payload = constructFeed(service, nodeIdentifier, atomEntries, + title='Received item collection') + + self.callCallbacks(service, nodeIdentifier, payload, contentType) + + if 'Collection' in headers: + for collection in headers['Collection']: + nodeIdentifier = collection or '' + self.callCallbacks(service, nodeIdentifier, payload, + contentType) + + + def deleteReceived(self, event): + """ + Fire up HTTP client to do callback + """ + + service = event.sender + nodeIdentifier = event.nodeIdentifier + redirectURI = event.redirectURI + self.callCallbacks(service, nodeIdentifier, eventType='DELETED', + redirectURI=redirectURI) + + + def _postTo(self, callbacks, service, nodeIdentifier, + payload=None, contentType=None, eventType=None, + redirectURI=None): + + if not callbacks: + return + + postdata = None + nodeURI = getXMPPURI(service, nodeIdentifier) + headers = {'Referer': nodeURI.encode('utf-8'), + 'PubSub-Service': service.full().encode('utf-8')} + + if payload: + postdata = payload.toXml().encode('utf-8') + if contentType: + headers['Content-Type'] = "%s;charset=utf-8" % contentType + + if eventType: + headers['Event'] = eventType + + if redirectURI: + headers['Link'] = '<%s>; rel=alternate' % ( + redirectURI.encode('utf-8'), + ) + + def postNotification(callbackURI): + f = getPageWithFactory(str(callbackURI), + method='POST', + postdata=postdata, + headers=headers) + d = f.deferred + d.addErrback(log.err) + + for callbackURI in callbacks: + reactor.callLater(0, postNotification, callbackURI) + + + def callCallbacks(self, service, nodeIdentifier, + payload=None, contentType=None, eventType=None, + redirectURI=None): + + def eb(failure): + failure.trap(error.NoCallbacks) + + # No callbacks were registered for this node. Unsubscribe? + + d = self.storage.getCallbacks(service, nodeIdentifier) + d.addCallback(self._postTo, service, nodeIdentifier, payload, + contentType, eventType, redirectURI) + d.addErrback(eb) + d.addErrback(log.err) + + + +class RemoteSubscribeBaseResource(resource.Resource): + """ + Base resource for remote pubsub node subscription and unsubscription. + + This resource accepts POST request with a JSON document that holds + a dictionary with the keys C{uri} and C{callback} that respectively map + to the XMPP URI of the publish-subscribe node and the callback URI. + + This class should be inherited with L{serviceMethod} overridden. + + @cvar serviceMethod: The name of the method to be called with + the JID of the pubsub service, the node identifier + and the callback URI as received in the HTTP POST + request to this resource. + """ + serviceMethod = None + errorMap = { + error.NodeNotFound: + (responsecode.FORBIDDEN, "Node not found"), + error.NotSubscribed: + (responsecode.FORBIDDEN, "No such subscription found"), + error.SubscriptionExists: + (responsecode.FORBIDDEN, "Subscription already exists"), + } + + def __init__(self, service): + self.service = service + self.params = None + + + http_GET = None + + + def http_POST(self, request): + def trapNotFound(failure): + err = failure.trap(*self.errorMap.keys()) + code, msg = self.errorMap[err] + return http.StatusResponse(code, msg) + + def respond(result): + return http.Response(responsecode.NO_CONTENT) + + def gotRequest(result): + uri = self.params['uri'] + callback = self.params['callback'] + + jid, nodeIdentifier = getServiceAndNode(uri) + method = getattr(self.service, self.serviceMethod) + d = method(jid, nodeIdentifier, callback) + return d + + def storeParams(data): + self.params = simplejson.loads(data) + + def trapXMPPURIParseError(failure): + failure.trap(XMPPURIParseError) + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) + + d = readStream(request.stream, storeParams) + d.addCallback(gotRequest) + d.addCallback(respond) + d.addErrback(trapNotFound) + d.addErrback(trapXMPPURIParseError) + return d + + + +class RemoteSubscribeResource(RemoteSubscribeBaseResource): + """ + Resource to subscribe to a remote publish-subscribe node. + + The passed C{uri} is the XMPP URI of the node to subscribe to and the + C{callback} is the callback URI. Upon receiving notifications from the + node, a POST request will be perfomed on the callback URI. + """ + serviceMethod = 'subscribeCallback' + + + +class RemoteUnsubscribeResource(RemoteSubscribeBaseResource): + """ + Resource to unsubscribe from a remote publish-subscribe node. + + The passed C{uri} is the XMPP URI of the node to unsubscribe from and the + C{callback} is the callback URI that was registered for it. + """ + serviceMethod = 'unsubscribeCallback' + + + +class RemoteItemsResource(resource.Resource): + """ + Resource for retrieving items from a remote pubsub node. + """ + + def __init__(self, service): + self.service = service + + + def render(self, request): + try: + maxItems = int(request.args.get('max_items', [0])[0]) or None + except ValueError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "The argument max_items has an invalid value.") + + try: + uri = request.args['uri'][0] + except KeyError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "No URI for the remote node provided.") + + try: + jid, nodeIdentifier = getServiceAndNode(uri) + except XMPPURIParseError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % uri) + + def respond(items): + """Create a feed out the retrieved items.""" + contentType = http_headers.MimeType('application', + 'atom+xml', + {'type': 'feed'}) + atomEntries = extractAtomEntries(items) + feed = constructFeed(jid, nodeIdentifier, atomEntries, + "Retrieved item collection") + payload = feed.toXml().encode('utf-8') + return http.Response(responsecode.OK, stream=payload, + headers={'Content-Type': contentType}) + + def trapNotFound(failure): + failure.trap(StanzaError) + if not failure.value.condition == 'item-not-found': + raise failure + return http.StatusResponse(responsecode.NOT_FOUND, + "Node not found") + + d = self.service.items(jid, nodeIdentifier, maxItems) + d.addCallback(respond) + d.addErrback(trapNotFound) + return d + + + +# Client side code to interact with a service as provided above + +def getPageWithFactory(url, contextFactory=None, *args, **kwargs): + """Download a web page. + + Download a page. Return the factory that holds a deferred, which will + callback with a page (as a string) or errback with a description of the + error. + + See HTTPClientFactory to see what extra args can be passed. + """ + + scheme, host, port, path = client._parse(url) + factory = client.HTTPClientFactory(url, *args, **kwargs) + factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200() + + if scheme == 'https': + from twisted.internet import ssl + if contextFactory is None: + contextFactory = ssl.ClientContextFactory() + reactor.connectSSL(host, port, factory, contextFactory) + else: + reactor.connectTCP(host, port, factory) + return factory + + + +class CallbackResource(resource.Resource): + """ + Web resource for retrieving gateway notifications. + """ + + def __init__(self, callback): + self.callback = callback + + + http_GET = None + + + def http_POST(self, request): + p = WebStreamParser() + if not request.headers.hasHeader('Event'): + d = p.parse(request.stream) + else: + d = defer.succeed(None) + d.addCallback(self.callback, request.headers) + d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) + return d + + + +class GatewayClient(service.Service): + """ + Service that provides client access to the HTTP Gateway into Idavoll. + """ + + agent = "Idavoll HTTP Gateway Client" + + def __init__(self, baseURI, callbackHost=None, callbackPort=None): + self.baseURI = baseURI + self.callbackHost = callbackHost or 'localhost' + self.callbackPort = callbackPort or 8087 + root = resource.Resource() + root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) + self.site = server.Site(root) + + + def startService(self): + self.port = reactor.listenTCP(self.callbackPort, + channel.HTTPFactory(self.site)) + + + def stopService(self): + return self.port.stopListening() + + + def _makeURI(self, verb, query=None): + uriComponents = urlparse.urlparse(self.baseURI) + uri = urlparse.urlunparse((uriComponents[0], + uriComponents[1], + uriComponents[2] + verb, + '', + query and urllib.urlencode(query) or '', + '')) + return uri + + + def callback(self, data, headers): + pass + + + def ping(self): + f = getPageWithFactory(self._makeURI(''), + method='HEAD', + agent=self.agent) + return f.deferred + + + def create(self): + f = getPageWithFactory(self._makeURI('create'), + method='POST', + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + + def delete(self, xmppURI, redirectURI=None): + query = {'uri': xmppURI} + + if redirectURI: + params = {'redirect_uri': redirectURI} + postdata = simplejson.dumps(params) + headers = {'Content-Type': MIME_JSON} + else: + postdata = None + headers = None + + f = getPageWithFactory(self._makeURI('delete', query), + method='POST', + postdata=postdata, + headers=headers, + agent=self.agent) + return f.deferred + + + def publish(self, entry, xmppURI=None): + query = xmppURI and {'uri': xmppURI} + + f = getPageWithFactory(self._makeURI('publish', query), + method='POST', + postdata=entry.toXml().encode('utf-8'), + headers={'Content-Type': MIME_ATOM_ENTRY}, + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + + def listNodes(self): + f = getPageWithFactory(self._makeURI('list'), + method='GET', + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + + def subscribe(self, xmppURI): + params = {'uri': xmppURI, + 'callback': 'http://%s:%s/callback' % (self.callbackHost, + self.callbackPort)} + f = getPageWithFactory(self._makeURI('subscribe'), + method='POST', + postdata=simplejson.dumps(params), + headers={'Content-Type': MIME_JSON}, + agent=self.agent) + return f.deferred + + + def unsubscribe(self, xmppURI): + params = {'uri': xmppURI, + 'callback': 'http://%s:%s/callback' % (self.callbackHost, + self.callbackPort)} + f = getPageWithFactory(self._makeURI('unsubscribe'), + method='POST', + postdata=simplejson.dumps(params), + headers={'Content-Type': MIME_JSON}, + agent=self.agent) + return f.deferred + + + def items(self, xmppURI, maxItems=None): + query = {'uri': xmppURI} + if maxItems: + query['max_items'] = int(maxItems) + f = getPageWithFactory(self._makeURI('items', query), + method='GET', + agent=self.agent) + return f.deferred