changeset 185:9038908dc2f5

Add gateway support for retrieving items from a node. Reorder gateway module.
author Ralph Meijer <ralphm@ik.nu>
date Tue, 15 Apr 2008 17:32:56 +0000
parents bd88658dbca3
children 365fd3e4daf8
files idavoll/gateway.py idavoll/tap_http.py idavoll/test/test_gateway.py
diffstat 3 files changed, 326 insertions(+), 170 deletions(-) [+]
line wrap: on
line diff
--- a/idavoll/gateway.py	Fri Apr 11 14:48:32 2008 +0000
+++ b/idavoll/gateway.py	Tue Apr 15 17:32:56 2008 +0000
@@ -34,170 +34,13 @@
 MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
 MIME_JSON = 'application/json'
 
-class RemoteSubscriptionService(service.Service, PubSubClient):
-
-    def __init__(self, jid):
-        self.jid = jid
-
-    def startService(self):
-        self.callbacks = {}
-
-    def trapNotFound(self, failure):
-        failure.trap(StanzaError)
-        if not failure.value.condition == 'item-not-found':
-            raise failure
-        raise error.NodeNotFound
-
-    def subscribeCallback(self, jid, nodeIdentifier, callback):
-
-        def newCallbackList(result):
-            callbackList = set()
-            self.callbacks[jid, nodeIdentifier] = callbackList
-            return callbackList
-
-        try:
-            callbackList = self.callbacks[jid, nodeIdentifier]
-        except KeyError:
-            d = self.subscribe(jid, nodeIdentifier, self.jid)
-            d.addCallback(newCallbackList)
-        else:
-            d = defer.succeed(callbackList)
-
-        d.addCallback(lambda callbackList: callbackList.add(callback))
-        d.addErrback(self.trapNotFound)
-        return d
-
-    def unsubscribeCallback(self, jid, nodeIdentifier, callback):
-        try:
-            callbackList = self.callbacks[jid, nodeIdentifier]
-            callbackList.remove(callback)
-        except KeyError:
-            return defer.fail(error.NotSubscribed())
-
-        if not callbackList:
-            self.unsubscribe(jid, nodeIdentifier, self.jid)
-
-        return defer.succeed(None)
-
-    def itemsReceived(self, recipient, service, nodeIdentifier, items):
-        """
-        Fire up HTTP client to do callback
-        """
-
-        # Collect atom entries
-        atomEntries = []
-        for item in items:
-            # ignore non-items (i.e. retractions)
-            if item.name != 'item':
-                continue
-
-            atomEntry = None
-            for element in item.elements():
-                # extract the first element that is an atom entry
-                if element.uri == NS_ATOM and element.name == 'entry':
-                    atomEntry = element
-                    break
-
-            if atomEntry:
-                atomEntries.append(atomEntry)
-
-        # Don't notify if there are no atom entries
-        if not atomEntries:
-            return
-
-        if len(atomEntries) == 1:
-            contentType = 'application/atom+xml;type=entry'
-            payload = atomEntries[0]
-        else:
-            contentType = 'application/atom+xml;type=feed'
-            nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
-            now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
-
-            # Collect the received entries in a feed
-            payload = domish.Element((NS_ATOM, 'feed'))
-            payload.addElement('title', content='Received item collection')
-            payload.addElement('id', content=nodeURI)
-            payload.addElement('updated', content=now)
-            for atomEntry in atomEntries:
-                payload.addChild(atomEntry)
-
-        self.callCallbacks(recipient, service, nodeIdentifier, payload,
-                           contentType)
-
-    def deleteReceived(self, recipient, service, nodeIdentifier):
-        """
-        Fire up HTTP client to do callback
-        """
-
-        self.callCallbacks(recipient, service, nodeIdentifier,
-                           eventType='DELETED')
-
-    def callCallbacks(self, recipient, service, nodeIdentifier,
-                            payload=None, contentType=None, eventType=None):
-        try:
-            callbacks = self.callbacks[service, nodeIdentifier]
-        except KeyError:
-            return
-
-        postdata = None
-        nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
-        headers = {'Referer': nodeURI.encode('utf-8'),
-                   'PubSub-Service': service.full().encode('utf-8')}
-
-        if payload:
-            postdata = payload.toXml().encode('utf-8')
-            if contentType:
-                headers['Content-Type'] = "%s;charset=utf-8" % contentType
-
-        if eventType:
-            headers['Event'] = eventType
-
-        def postNotification(callbackURI):
-            d = client.getPage(str(callbackURI),
-                                   method='POST',
-                                   postdata=postdata,
-                                   headers=headers)
-            d.addErrback(log.err)
-
-        for callbackURI in callbacks:
-            reactor.callLater(0, postNotification, callbackURI)
-
-
-class WebStreamParser(object):
-    def __init__(self):
-        self.elementStream = domish.elementStream()
-        self.elementStream.DocumentStartEvent = self.docStart
-        self.elementStream.ElementEvent = self.elem
-        self.elementStream.DocumentEndEvent = self.docEnd
-        self.done = False
-
-    def docStart(self, elem):
-        self.document = elem
-
-    def elem(self, elem):
-        self.document.addChild(elem)
-
-    def docEnd(self):
-        self.done = True
-
-    def parse(self, stream):
-        def endOfStream(result):
-            if not self.done:
-                raise Exception("No more stuff?")
-            else:
-                return self.document
-
-        d = readStream(stream, self.elementStream.parse)
-        d.addCallback(endOfStream)
-        return d
-
-
 class XMPPURIParseError(ValueError):
     """
     Raised when a given XMPP URI couldn't be properly parsed.
     """
 
 
+
 def getServiceAndNode(uri):
     """
     Given an XMPP URI, extract the publish subscribe service JID and node ID.
@@ -237,6 +80,41 @@
     return jid, nodeIdentifier
 
 
+
+class WebStreamParser(object):
+    def __init__(self):
+        self.elementStream = domish.elementStream()
+        self.elementStream.DocumentStartEvent = self.docStart
+        self.elementStream.ElementEvent = self.elem
+        self.elementStream.DocumentEndEvent = self.docEnd
+        self.done = False
+
+
+    def docStart(self, elem):
+        self.document = elem
+
+
+    def elem(self, elem):
+        self.document.addChild(elem)
+
+
+    def docEnd(self):
+        self.done = True
+
+
+    def parse(self, stream):
+        def endOfStream(result):
+            if not self.done:
+                raise Exception("No more stuff?")
+            else:
+                return self.document
+
+        d = readStream(stream, self.elementStream.parse)
+        d.addCallback(endOfStream)
+        return d
+
+
+
 class CreateResource(resource.Resource):
     """
     A resource to create a publish-subscribe node.
@@ -246,8 +124,10 @@
         self.serviceJID = serviceJID
         self.owner = owner
 
+
     http_GET = None
 
+
     def http_POST(self, request):
         """
         Respond to a POST request to create a new node.
@@ -263,6 +143,7 @@
         return d
 
 
+
 class DeleteResource(resource.Resource):
     """
     A resource to create a publish-subscribe node.
@@ -272,8 +153,10 @@
         self.serviceJID = serviceJID
         self.owner = owner
 
+
     http_GET = None
 
+
     def http_POST(self, request):
         """
         Respond to a POST request to create a new node.
@@ -308,6 +191,7 @@
         return d
 
 
+
 class PublishResource(resource.Resource):
     """
     A resource to publish to a publish-subscribe node.
@@ -318,8 +202,10 @@
         self.serviceJID = serviceJID
         self.owner = owner
 
+
     http_GET = None
 
+
     def checkMediaType(self, request):
         ctype = request.headers.getHeader('content-type')
 
@@ -339,10 +225,12 @@
                     "Unsupported Media Type: %s" %
                         http_headers.generateContentType(ctype)))
 
+
     def parseXMLPayload(self, stream):
         p = WebStreamParser()
         return p.parse(stream)
 
+
     def http_POST(self, request):
         """
         Respond to a POST request to create a new item.
@@ -390,7 +278,189 @@
         return d
 
 
-class SubscribeBaseResource(resource.Resource):
+
+class ListResource(resource.Resource):
+    def __init__(self, service):
+        self.service = service
+
+
+    def render(self, request):
+        def responseFromNodes(nodeIdentifiers):
+            return http.Response(responsecode.OK,
+                                 stream=simplejson.dumps(nodeIdentifiers))
+
+        d = self.service.get_nodes()
+        d.addCallback(responseFromNodes)
+        return d
+
+
+
+# Service for subscribing to remote XMPP Pubsub nodes and web resources
+
+def extractAtomEntries(items):
+    """
+    Extract atom entries from a list of publish-subscribe items.
+
+    @param items: List of L{domish.Element}s that represent publish-subscribe
+                  items.
+    @type items: C{list}
+    """
+
+    atomEntries = []
+
+    for item in items:
+        # ignore non-items (i.e. retractions)
+        if item.name != 'item':
+            continue
+
+        atomEntry = None
+        for element in item.elements():
+            # extract the first element that is an atom entry
+            if element.uri == NS_ATOM and element.name == 'entry':
+                atomEntry = element
+                break
+
+        if atomEntry:
+            atomEntries.append(atomEntry)
+
+    return atomEntries
+
+def constructFeed(service, nodeIdentifier, entries, title):
+    nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
+    now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
+
+    # Collect the received entries in a feed
+    feed = domish.Element((NS_ATOM, 'feed'))
+    feed.addElement('title', content=title)
+    feed.addElement('id', content=nodeURI)
+    feed.addElement('updated', content=now)
+
+    for entry in entries:
+        feed.addChild(entry)
+
+    return feed
+
+class RemoteSubscriptionService(service.Service, PubSubClient):
+    """
+    Service for subscribing to remote XMPP Publish-Subscribe nodes.
+
+    Subscriptions are created with a callback HTTP URI that is POSTed
+    to with the received items in notifications.
+    """
+
+    def __init__(self, jid):
+        self.jid = jid
+
+
+    def startService(self):
+        self.callbacks = {}
+
+
+    def trapNotFound(self, failure):
+        failure.trap(StanzaError)
+        if not failure.value.condition == 'item-not-found':
+            raise failure
+        raise error.NodeNotFound
+
+
+    def subscribeCallback(self, jid, nodeIdentifier, callback):
+
+        def newCallbackList(result):
+            callbackList = set()
+            self.callbacks[jid, nodeIdentifier] = callbackList
+            return callbackList
+
+        try:
+            callbackList = self.callbacks[jid, nodeIdentifier]
+        except KeyError:
+            d = self.subscribe(jid, nodeIdentifier, self.jid)
+            d.addCallback(newCallbackList)
+        else:
+            d = defer.succeed(callbackList)
+
+        d.addCallback(lambda callbackList: callbackList.add(callback))
+        d.addErrback(self.trapNotFound)
+        return d
+
+
+    def unsubscribeCallback(self, jid, nodeIdentifier, callback):
+        try:
+            callbackList = self.callbacks[jid, nodeIdentifier]
+            callbackList.remove(callback)
+        except KeyError:
+            return defer.fail(error.NotSubscribed())
+
+        if not callbackList:
+            self.unsubscribe(jid, nodeIdentifier, self.jid)
+
+        return defer.succeed(None)
+
+
+    def itemsReceived(self, recipient, service, nodeIdentifier, items):
+        """
+        Fire up HTTP client to do callback
+        """
+
+        atomEntries = extractAtomEntries(items)
+
+        # Don't notify if there are no atom entries
+        if not atomEntries:
+            return
+
+        if len(atomEntries) == 1:
+            contentType = 'application/atom+xml;type=entry'
+            payload = atomEntries[0]
+        else:
+            contentType = 'application/atom+xml;type=feed'
+            payload = constructFeed(service, nodeIdentifier, atomEntries,
+                                    title='Received item collection')
+
+        self.callCallbacks(recipient, service, nodeIdentifier, payload,
+                           contentType)
+
+
+    def deleteReceived(self, recipient, service, nodeIdentifier):
+        """
+        Fire up HTTP client to do callback
+        """
+
+        self.callCallbacks(recipient, service, nodeIdentifier,
+                           eventType='DELETED')
+
+
+    def callCallbacks(self, recipient, service, nodeIdentifier,
+                            payload=None, contentType=None, eventType=None):
+        try:
+            callbacks = self.callbacks[service, nodeIdentifier]
+        except KeyError:
+            return
+
+        postdata = None
+        nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
+        headers = {'Referer': nodeURI.encode('utf-8'),
+                   'PubSub-Service': service.full().encode('utf-8')}
+
+        if payload:
+            postdata = payload.toXml().encode('utf-8')
+            if contentType:
+                headers['Content-Type'] = "%s;charset=utf-8" % contentType
+
+        if eventType:
+            headers['Event'] = eventType
+
+        def postNotification(callbackURI):
+            d = client.getPage(str(callbackURI),
+                                   method='POST',
+                                   postdata=postdata,
+                                   headers=headers)
+            d.addErrback(log.err)
+
+        for callbackURI in callbacks:
+            reactor.callLater(0, postNotification, callbackURI)
+
+
+
+class RemoteSubscribeBaseResource(resource.Resource):
     """
     Base resource for remote pubsub node subscription and unsubscription.
 
@@ -411,8 +481,10 @@
         self.service = service
         self.params = None
 
+
     http_GET = None
 
+
     def http_POST(self, request):
         def trapNotFound(failure):
             failure.trap(error.NodeNotFound)
@@ -447,7 +519,8 @@
         return d
 
 
-class SubscribeResource(SubscribeBaseResource):
+
+class RemoteSubscribeResource(RemoteSubscribeBaseResource):
     """
     Resource to subscribe to a remote publish-subscribe node.
 
@@ -458,7 +531,8 @@
     serviceMethod = 'subscribeCallback'
 
 
-class UnsubscribeResource(SubscribeBaseResource):
+
+class RemoteUnsubscribeResource(RemoteSubscribeBaseResource):
     """
     Resource to unsubscribe from a remote publish-subscribe node.
 
@@ -468,20 +542,60 @@
     serviceMethod = 'unsubscribeCallback'
 
 
-class ListResource(resource.Resource):
+
+class RemoteItemsResource(resource.Resource):
+    """
+    Resource for retrieving items from a remote pubsub node.
+    """
+
     def __init__(self, service):
         self.service = service
 
     def render(self, request):
-        def responseFromNodes(nodeIdentifiers):
-            return http.Response(responsecode.OK,
-                                 stream=simplejson.dumps(nodeIdentifiers))
+        try:
+            maxItems = int(request.args.get('max_items', [0])[0]) or None
+        except ValueError:
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "The argument max_items has an invalid value.")
+
+        try:
+            jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
+        except KeyError:
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "No URI for the remote node provided.")
+        except XMPPURIParseError:
+            return http.StatusResponse(responsecode.BAD_REQUEST,
+                    "Malformed XMPP URI: %s" % failure.value.message)
+
 
-        d = self.service.get_nodes()
-        d.addCallback(responseFromNodes)
+        def respond(items):
+            """Create a feed out the retrieved items."""
+            contentType = http_headers.MimeType('application',
+                                                'atom+xml',
+                                                {'type': 'feed'})
+            atomEntries = extractAtomEntries(items)
+            feed = constructFeed(jid, nodeIdentifier, atomEntries,
+                                    "Retrieved item collection")
+            payload = feed.toXml().encode('utf-8')
+            return http.Response(responsecode.OK, stream=payload,
+                                 headers={'Content-Type': contentType})
+
+        def trapNotFound(failure):
+            failure.trap(StanzaError)
+            if not failure.value.condition == 'item-not-found':
+                raise failure
+            return http.StatusResponse(responsecode.NOT_FOUND,
+                                       "Node not found")
+
+        d = self.service.items(jid, nodeIdentifier)
+        d.addCallback(respond)
+        d.addErrback(trapNotFound)
         return d
 
 
+
+# Client side code to interact with a service as provided above
+
 def getPageWithFactory(url, contextFactory=None, *args, **kwargs):
     """Download a web page.
 
@@ -506,6 +620,7 @@
     return factory
 
 
+
 class CallbackResource(resource.Resource):
     """
     Web resource for retrieving gateway notifications.
@@ -514,8 +629,10 @@
     def __init__(self, callback):
         self.callback = callback
 
+
     http_GET = None
 
+
     def http_POST(self, request):
         p = WebStreamParser()
         d = p.parse(request.stream)
@@ -523,6 +640,8 @@
         d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT))
         return d
 
+
+
 class GatewayClient(service.Service):
     """
     Service that provides client access to the HTTP Gateway into Idavoll.
@@ -538,13 +657,16 @@
         root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs))
         self.site = server.Site(root)
 
+
     def startService(self):
         self.port = reactor.listenTCP(self.callbackPort,
                                       channel.HTTPFactory(self.site))
 
+
     def stopService(self):
         return self.port.stopListening()
 
+
     def _makeURI(self, verb, query=None):
         uriComponents = urlparse.urlparse(self.baseURI)
         uri = urlparse.urlunparse((uriComponents[0],
@@ -555,15 +677,18 @@
                                    ''))
         return uri
 
+
     def callback(self, data, headers):
         pass
 
+
     def create(self):
         f = getPageWithFactory(self._makeURI('create'),
                     method='POST',
                     agent=self.agent)
         return f.deferred.addCallback(simplejson.loads)
 
+
     def publish(self, entry, xmppURI=None):
         query = xmppURI and {'uri': xmppURI}
 
@@ -574,12 +699,14 @@
                     agent=self.agent)
         return f.deferred.addCallback(simplejson.loads)
 
+
     def listNodes(self):
         f = getPageWithFactory(self._makeURI('list'),
                     method='GET',
                     agent=self.agent)
         return f.deferred.addCallback(simplejson.loads)
 
+
     def subscribe(self, xmppURI):
         params = {'uri': xmppURI,
                   'callback': 'http://%s:%s/callback' % (self.callbackHost,
@@ -590,3 +717,13 @@
                     headers={'Content-Type': MIME_JSON},
                     agent=self.agent)
         return f.deferred
+
+
+    def items(self, xmppURI, maxItems=None):
+        query = {'uri': xmppURI}
+        if maxItems:
+             query['maxItems'] = int(maxItems)
+        f = getPageWithFactory(self._makeURI('items', query),
+                    method='GET',
+                    agent=self.agent)
+        return f.deferred
--- a/idavoll/tap_http.py	Fri Apr 11 14:48:32 2008 +0000
+++ b/idavoll/tap_http.py	Tue Apr 15 17:32:56 2008 +0000
@@ -40,9 +40,11 @@
     ss.setHandlerParent(cs)
     ss.startService()
 
-    # Set up web service that exposes the backend using REST
+    # Set up web service
 
     root = resource.Resource()
+
+    # Set up resources that exposes the backend
     root.child_create = gateway.CreateResource(bs, config['jid'],
                                                config['jid'])
     root.child_delete = gateway.DeleteResource(bs, config['jid'],
@@ -50,8 +52,11 @@
     root.child_publish = gateway.PublishResource(bs, config['jid'],
                                                  config['jid'])
     root.child_list = gateway.ListResource(bs)
-    root.child_subscribe = gateway.SubscribeResource(ss)
-    root.child_unsubscribe = gateway.UnsubscribeResource(ss)
+
+    # Set up resources for accessing remote pubsub nodes.
+    root.child_subscribe = gateway.RemoteSubscribeResource(ss)
+    root.child_unsubscribe = gateway.RemoteUnsubscribeResource(ss)
+    root.child_items = gateway.RemoteItemsResource(ss)
 
     site = server.Site(root)
     w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site))
--- a/idavoll/test/test_gateway.py	Fri Apr 11 14:48:32 2008 +0000
+++ b/idavoll/test/test_gateway.py	Tue Apr 15 17:32:56 2008 +0000
@@ -148,3 +148,17 @@
         self.assertFailure(d, error.Error)
         d.addCallback(cb)
         return d
+
+    def test_items(self):
+        def cb(response):
+            xmppURI = response['uri']
+            d = self.client.items(xmppURI)
+            return d
+
+        def cb2(result):
+            return
+
+        d = self.client.publish(entry)
+        d.addCallback(cb)
+        d.addCallback(cb2)
+        return d