changeset 177:faf1c9bc2612

Add HTTP gateway in a separate plugin. Author: ralphm Fixes #8.
author Ralph Meijer <ralphm@ik.nu>
date Thu, 10 Apr 2008 11:18:29 +0000
parents 17fc5dd77158
children 07114105885a
files idavoll/gateway.py idavoll/tap.py idavoll/tap_http.py twisted/plugins/idavoll_http.py
diffstat 4 files changed, 561 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/idavoll/gateway.py	Thu Apr 10 11:18:29 2008 +0000
@@ -0,0 +1,474 @@
+# Copyright (c) 2003-2008 Ralph Meijer
+# See LICENSE for details.
+
+import cgi
+from time import gmtime, strftime
+
+import simplejson
+
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web import client
+from twisted.web2 import http, http_headers, resource, responsecode
+from twisted.web2.stream import readStream
+from twisted.words.protocols.jabber.jid import JID
+from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.xish import domish
+
+from wokkel.pubsub import Item
+from wokkel.pubsub import PubSubClient
+
+from idavoll import error
+
+NS_ATOM = 'http://www.w3.org/2005/Atom'
+
+
+class RemoteSubscriptionService(service.Service, PubSubClient):
+
+    def __init__(self, jid):
+        self.jid = jid
+
+    def startService(self):
+        self.callbacks = {}
+
+    def trapNotFound(self, failure):
+        failure.trap(StanzaError)
+        if not failure.value.condition == 'item-not-found':
+            raise failure
+        raise error.NodeNotFound
+
+    def subscribeCallback(self, jid, nodeIdentifier, callback):
+
+        def newCallbackList(result):
+            callbackList = set()
+            self.callbacks[jid, nodeIdentifier] = callbackList
+            return callbackList
+
+        try:
+            callbackList = self.callbacks[jid, nodeIdentifier]
+        except KeyError:
+            d = self.subscribe(jid, nodeIdentifier, self.jid)
+            d.addCallback(newCallbackList)
+        else:
+            d = defer.succeed(callbackList)
+
+        d.addCallback(lambda callbackList: callbackList.add(callback))
+        d.addErrback(self.trapNotFound)
+        return d
+
+    def unsubscribeCallback(self, jid, nodeIdentifier, callback):
+        try:
+            callbackList = self.callbacks[jid, nodeIdentifier]
+            callbackList.remove(callback)
+        except KeyError:
+            return defer.fail(error.NotSubscribed())
+
+        if not callbackList:
+            self.unsubscribe(jid, nodeIdentifier, self.jid)
+
+        return defer.succeed(None)
+
+    def itemsReceived(self, recipient, service, nodeIdentifier, items):
+        """
+        Fire up HTTP client to do callback
+        """
+
+        # Collect atom entries
+        atomEntries = []
+        for item in items:
+            # ignore non-items (i.e. retractions)
+            if item.name != 'item':
+                continue
+
+            atomEntry = None
+            for element in item.elements():
+                # extract the first element that is an atom entry
+                if element.uri == NS_ATOM and element.name == 'entry':
+                    atomEntry = element
+                    break
+
+            if atomEntry:
+                atomEntries.append(atomEntry)
+
+        # Don't notify if there are no atom entries
+        if not atomEntries:
+            return
+
+        if len(atomEntries) == 1:
+            contentType = 'application/atom+xml;type=entry'
+            payload = atomEntries[0]
+        else:
+            contentType = 'application/atom+xml;type=feed'
+            nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
+            now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
+
+            # Collect the received entries in a feed
+            payload = domish.Element((NS_ATOM, 'feed'))
+            payload.addElement('title', content='Received item collection')
+            payload.addElement('id', content=nodeURI)
+            payload.addElement('updated', content=now)
+            for atomEntry in atomEntries:
+                payload.addChild(atomEntry)
+
+        self.callCallbacks(recipient, service, nodeIdentifier, payload,
+                           contentType)
+
+    def deleteReceived(self, recipient, service, nodeIdentifier):
+        """
+        Fire up HTTP client to do callback
+        """
+
+        self.callCallbacks(recipient, service, nodeIdentifier,
+                           eventType='DELETED')
+
+    def callCallbacks(self, recipient, service, nodeIdentifier,
+                            payload=None, contentType=None, eventType=None):
+        try:
+            callbacks = self.callbacks[service, nodeIdentifier]
+        except KeyError:
+            return
+
+        postdata = None
+        nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
+        headers = {'Referer': nodeURI.encode('utf-8'),
+                   'PubSub-Service': service.full().encode('utf-8')}
+
+        if payload:
+            postdata = payload.toXml().encode('utf-8')
+            if contentType:
+                headers['Content-Type'] = "%s;charset=utf-8" % contentType
+
+        if eventType:
+            headers['Event'] = eventType
+
+        def postNotification(callbackURI):
+            return client.getPage(str(callbackURI),
+                                  method='POST',
+                                  postdata=postdata,
+                                  headers=headers)
+
+        for callbackURI in callbacks:
+            reactor.callLater(0, postNotification, callbackURI)
+
+
+class WebStreamParser(object):
+    def __init__(self):
+        self.elementStream = domish.elementStream()
+        self.elementStream.DocumentStartEvent = self.docStart
+        self.elementStream.ElementEvent = self.elem
+        self.elementStream.DocumentEndEvent = self.docEnd
+        self.done = False
+
+    def docStart(self, elem):
+        self.document = elem
+
+    def elem(self, elem):
+        self.document.addChild(elem)
+
+    def docEnd(self):
+        self.done = True
+
+    def parse(self, stream):
+        def endOfStream(result):
+            if not self.done:
+                raise Exception("No more stuff?")
+            else:
+                return self.document
+
+        d = readStream(stream, self.elementStream.parse)
+        d.addCallback(endOfStream)
+        return d
+
+
+class XMPPURIParseError(ValueError):
+    """
+    Raised when a given XMPP URI couldn't be properly parsed.
+    """
+
+
+def getServiceAndNode(uri):
+    """
+    Given an XMPP URI, extract the publish subscribe service JID and node ID.
+    """
+
+    try:
+        scheme, rest = uri.split(':', 1)
+    except ValueError:
+        raise XMPPURIParseError("No URI scheme component")
+
+    if scheme != 'xmpp':
+        raise XMPPURIParseError("Unknown URI scheme")
+
+    if rest.startswith("//"):
+        raise XMPPURIParseError("Unexpected URI authority component")
+
+    try:
+        entity, query = rest.split('?', 1)
+    except ValueError:
+        raise XMPPURIParseError("No URI query component")
+
+    if not entity:
+        raise XMPPURIParseError("Empty URI path component")
+
+    try:
+        jid = JID(entity)
+    except Exception, e:
+        raise XMPPURIParseError("Invalid JID: %s" % e.message)
+
+    params = cgi.parse_qs(query)
+
+    try:
+        nodeIdentifier = params['node'][0]
+    except (KeyError, ValueError):
+        raise XMPPURIParseError("No node in query component of URI")
+
+    return jid, nodeIdentifier
+
+
+class CreateResource(resource.Resource):
+    """
+    A resource to create a publish-subscribe node.
+    """
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+    http_GET = None
+
+    def http_POST(self, request):
+        """
+        Respond to a POST request to create a new node.
+        """
+
+        def toResponse(nodeIdentifier):
+            uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier)
+            stream = simplejson.dumps({'uri': uri})
+            return http.Response(responsecode.OK, stream=stream)
+
+        d = self.backend.create_node(None, self.owner)
+        d.addCallback(toResponse)
+        return d
+
+
+class DeleteResource(resource.Resource):
+    """
+    A resource to create a publish-subscribe node.
+    """
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+    http_GET = None
+
+    def http_POST(self, request):
+        """
+        Respond to a POST request to create a new node.
+        """
+
+        def respond(result):
+            return http.Response(responsecode.NO_CONTENT)
+
+        def getNode():
+            if request.args.get('uri'):
+                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
+                return defer.succeed(nodeIdentifier)
+            else:
+                raise http.HTTPError(http.Response(responsecode.BAD_REQUEST,
+                                                   "No URI given"))
+
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return http.StatusResponse(responsecode.NOT_FOUND,
+                                       "Node not found")
+
+        def trapXMPPURIParseError(failure):
+            failure.trap(XMPPURIParseError)
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "Malformed XMPP URI: %s" % failure.value.message)
+
+        d = getNode()
+        d.addCallback(self.backend.delete_node, self.owner)
+        d.addCallback(respond)
+        d.addErrback(trapNotFound)
+        d.addErrback(trapXMPPURIParseError)
+        return d
+
+
+class PublishResource(resource.Resource):
+    """
+    A resource to publish to a publish-subscribe node.
+    """
+
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+    http_GET = None
+
+    def checkMediaType(self, request):
+        ctype = request.headers.getHeader('content-type')
+
+        if not ctype:
+            raise http.HTTPError(
+                http.StatusResponse(
+                    responsecode.BAD_REQUEST,
+                    "No specified Media Type"))
+
+        if (ctype.mediaType != 'application' or
+            ctype.mediaSubtype != 'atom+xml' or
+            ctype.params.get('type') != 'entry' or
+            ctype.params.get('charset', 'utf-8') != 'utf-8'):
+            raise http.HTTPError(
+                http.StatusResponse(
+                    responsecode.UNSUPPORTED_MEDIA_TYPE,
+                    "Unsupported Media Type: %s" %
+                        http_headers.generateContentType(ctype)))
+
+    def parseXMLPayload(self, stream):
+        if not stream:
+            print "Stream is empty", repr(stream)
+        elif not stream.length:
+            print "Stream length is", repr(stream.length)
+        p = WebStreamParser()
+        return p.parse(stream)
+
+    def http_POST(self, request):
+        """
+        Respond to a POST request to create a new item.
+        """
+
+        def toResponse(nodeIdentifier):
+            uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier)
+            stream = simplejson.dumps({'uri': uri})
+            return http.Response(responsecode.OK, stream=stream)
+
+        def gotNode(nodeIdentifier, payload):
+            item = Item(id='current', payload=payload)
+            d = self.backend.publish(nodeIdentifier, [item], self.owner)
+            d.addCallback(lambda _: nodeIdentifier)
+            return d
+
+        def getNode():
+            if request.args.get('uri'):
+                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
+                return defer.succeed(nodeIdentifier)
+            else:
+                return self.backend.create_node(None, self.owner)
+
+        def doPublish(payload):
+            d = getNode()
+            d.addCallback(gotNode, payload)
+            return d
+
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return http.StatusResponse(responsecode.NOT_FOUND,
+                                       "Node not found")
+
+        def trapXMPPURIParseError(failure):
+            failure.trap(XMPPURIParseError)
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "Malformed XMPP URI: %s" % failure.value.message)
+
+        self.checkMediaType(request)
+        d = self.parseXMLPayload(request.stream)
+        d.addCallback(doPublish)
+        d.addCallback(toResponse)
+        d.addErrback(trapNotFound)
+        d.addErrback(trapXMPPURIParseError)
+        return d
+
+
+class SubscribeBaseResource(resource.Resource):
+    """
+    Base resource for remote pubsub node subscription and unsubscription.
+
+    This resource accepts POST request with a JSON document that holds
+    a dictionary with the keys C{uri} and C{callback} that respectively map
+    to the XMPP URI of the publish-subscribe node and the callback URI.
+
+    This class should be inherited with L{serviceMethod} overridden.
+
+    @cvar serviceMethod: The name of the method to be called with
+                         the JID of the pubsub service, the node identifier
+                         and the callback URI as received in the HTTP POST
+                         request to this resource.
+    """
+    serviceMethod = None
+
+    def __init__(self, service):
+        self.service = service
+        self.params = None
+
+    http_GET = None
+
+    def http_POST(self, request):
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return http.StatusResponse(responsecode.NOT_FOUND,
+                                       "Node not found")
+
+        def respond(result):
+            return http.Response(responsecode.NO_CONTENT)
+
+        def gotRequest(result):
+            uri = self.params['uri']
+            callback = self.params['callback']
+
+            jid, nodeIdentifier = getServiceAndNode(uri)
+            method = getattr(self.service, self.serviceMethod)
+            d = method(jid, nodeIdentifier, callback)
+            return d
+
+        def storeParams(data):
+            self.params = simplejson.loads(data)
+
+        def trapXMPPURIParseError(failure):
+            failure.trap(XMPPURIParseError)
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "Malformed XMPP URI: %s" % failure.value.message)
+
+        d = readStream(request.stream, storeParams)
+        d.addCallback(gotRequest)
+        d.addCallback(respond)
+        d.addErrback(trapNotFound)
+        d.addErrback(trapXMPPURIParseError)
+        return d
+
+
+class SubscribeResource(SubscribeBaseResource):
+    """
+    Resource to subscribe to a remote publish-subscribe node.
+
+    The passed C{uri} is the XMPP URI of the node to subscribe to and the
+    C{callback} is the callback URI. Upon receiving notifications from the
+    node, a POST request will be perfomed on the callback URI.
+    """
+    serviceMethod = 'subscribeCallback'
+
+
+class UnsubscribeResource(SubscribeBaseResource):
+    """
+    Resource to unsubscribe from a remote publish-subscribe node.
+
+    The passed C{uri} is the XMPP URI of the node to unsubscribe from and the
+    C{callback} is the callback URI that was registered for it.
+    """
+    serviceMethod = 'unsubscribeCallback'
+
+
+class ListResource(resource.Resource):
+    def __init__(self, service):
+        self.service = service
+
+    def render(self, request):
+        def responseFromNodes(nodeIdentifiers):
+            import pprint
+            return http.Response(responsecode.OK, stream=pprint.pformat(nodeIdentifiers))
+
+        d = self.service.get_nodes()
+        d.addCallback(responseFromNodes)
+        return d
--- a/idavoll/tap.py	Wed Apr 09 13:15:39 2008 +0000
+++ b/idavoll/tap.py	Thu Apr 10 11:18:29 2008 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2003-2007 Ralph Meijer
+# Copyright (c) 2003-2008 Ralph Meijer
 # See LICENSE for details.
 
 from twisted.application import service
@@ -37,6 +37,8 @@
         if self['backend'] not in ['pgsql', 'memory']:
             raise usage.UsageError, "Unknown backend!"
 
+        self['jid'] = JID(self['jid'])
+
 def makeService(config):
     s = service.MultiService()
 
@@ -54,12 +56,14 @@
         st = Storage()
 
     bs = BackendService(st)
+    bs.setName('backend')
     bs.setServiceParent(s)
 
     # Set up XMPP server-side component with publish-subscribe capabilities
 
     cs = Component(config["rhost"], int(config["rport"]),
-                   config["jid"], config["secret"])
+                   config["jid"].full(), config["secret"])
+    cs.setName('component')
     cs.setServiceParent(s)
 
     cs.factory.maxDelay = 900
@@ -74,6 +78,6 @@
     ps = IPubSubService(bs)
     ps.setHandlerParent(cs)
     ps.hideNodes = config["hide-nodes"]
-    ps.serviceJID = JID(config["jid"])
+    ps.serviceJID = config["jid"]
 
     return s
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/idavoll/tap_http.py	Thu Apr 10 11:18:29 2008 +0000
@@ -0,0 +1,70 @@
+# Copyright (c) 2003-2008 Ralph Meijer
+# See LICENSE for details.
+
+from twisted.application import internet, strports
+from twisted.conch import manhole, manhole_ssh
+from twisted.cred import portal, checkers
+from twisted.web2 import channel, resource, server
+
+from idavoll import gateway, tap
+from idavoll.gateway import RemoteSubscriptionService
+
+class Options(tap.Options):
+    optParameters = [
+            ('webport', None, '8086', 'Web port'),
+    ]
+
+
+def getManholeFactory(namespace, **passwords):
+    def getManHole(_):
+        return manhole.Manhole(namespace)
+
+    realm = manhole_ssh.TerminalRealm()
+    realm.chainedProtocolFactory.protocolFactory = getManHole
+    p = portal.Portal(realm)
+    p.registerChecker(
+            checkers.InMemoryUsernamePasswordDatabaseDontUse(**passwords))
+    f = manhole_ssh.ConchFactory(p)
+    return f
+
+
+def makeService(config):
+    s = tap.makeService(config)
+
+    bs = s.getServiceNamed('backend')
+    cs = s.getServiceNamed('component')
+
+    # Set up XMPP service for subscribing to remote nodes
+
+    ss = RemoteSubscriptionService(config['jid'])
+    ss.setHandlerParent(cs)
+    ss.startService()
+
+    # Set up web service that exposes the backend using REST
+
+    root = resource.Resource()
+    root.child_create = gateway.CreateResource(bs, config['jid'],
+                                               config['jid'])
+    root.child_delete = gateway.DeleteResource(bs, config['jid'],
+                                               config['jid'])
+    root.child_publish = gateway.PublishResource(bs, config['jid'],
+                                                 config['jid'])
+    root.child_subscribe = gateway.SubscribeResource(ss)
+    root.child_unsubscribe = gateway.UnsubscribeResource(ss)
+    root.child_list = gateway.ListResource(ss)
+
+    site = server.Site(root)
+    w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site))
+    w.setServiceParent(s)
+
+    # Set up a manhole
+
+    namespace = {'service': s,
+                 'component': cs,
+                 'backend': bs}
+    f = getManholeFactory(namespace, admin='admin')
+    manholeService = strports.service('2222', f)
+    manholeService.setServiceParent(s)
+
+    return s
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/twisted/plugins/idavoll_http.py	Thu Apr 10 11:18:29 2008 +0000
@@ -0,0 +1,10 @@
+# Copyright (c) 2003-2008 Ralph Meijer
+# See LICENSE for details.
+
+from twisted.scripts.mktap import _tapHelper
+
+Idavoll = _tapHelper(
+        "Idavoll HTTP",
+        "idavoll.tap_http",
+        "Jabber Publish-Subscribe Service Component with HTTP gateway",
+        "idavoll-http")