diff idavoll/gateway.py @ 204:b4bf0a5ce50d

Implement storage facilities for the HTTP gateway. Author: ralphm. Fixes #12. One of the storage facilities is PostgreSQL based, providing persistence.
author Ralph Meijer <ralphm@ik.nu>
date Wed, 16 Jul 2008 06:38:32 +0000
parents 2c46e6664680
children 274a45d2a5ab
line wrap: on
line diff
--- a/idavoll/gateway.py	Mon Jul 14 09:16:16 2008 +0000
+++ b/idavoll/gateway.py	Wed Jul 16 06:38:32 2008 +0000
@@ -352,29 +352,34 @@
     to with the received items in notifications.
     """
 
-    def __init__(self, jid):
+    def __init__(self, jid, storage):
         self.jid = jid
-
-
-    def startService(self):
-        self.callbacks = {}
+        self.storage = storage
 
 
     def trapNotFound(self, failure):
         failure.trap(StanzaError)
-        if not failure.value.condition == 'item-not-found':
-            raise failure
-        raise error.NodeNotFound
+
+        if failure.value.condition == 'item-not-found':
+            raise error.NodeNotFound()
+        else:
+            return failure
 
 
     def subscribeCallback(self, jid, nodeIdentifier, callback):
+        """
+        Subscribe a callback URI.
 
-        def newCallbackList(result):
-            callbackList = set()
-            self.callbacks[jid, nodeIdentifier] = callbackList
-            return callbackList
+        This registers a callback URI to be called when a notification is
+        received for the given node.
 
-        def callbackForLastItem(items, callback):
+        If this is the first callback registered for this node, the gateway
+        will subscribe to the node. Otherwise, the most recently published item
+        for this node is retrieved and, if present, the newly registered
+        callback will be called with that item.
+        """
+
+        def callbackForLastItem(items):
             atomEntries = extractAtomEntries(items)
 
             if not atomEntries:
@@ -383,32 +388,38 @@
             self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
                          'application/atom+xml;type=entry')
 
-        try:
-            callbackList = self.callbacks[jid, nodeIdentifier]
-        except KeyError:
-            d = self.subscribe(jid, nodeIdentifier, self.jid)
-            d.addCallback(newCallbackList)
-        else:
-            d = self.items(jid, nodeIdentifier, 1)
-            d.addCallback(callbackForLastItem, callback)
-            d.addCallback(lambda _: callbackList)
+        def subscribeOrItems(hasCallbacks):
+            if hasCallbacks:
+                d = self.items(jid, nodeIdentifier, 1)
+                d.addCallback(callbackForLastItem)
+            else:
+                d = self.subscribe(jid, nodeIdentifier, self.jid)
 
-        d.addCallback(lambda callbackList: callbackList.add(callback))
-        d.addErrback(self.trapNotFound)
+            d.addErrback(self.trapNotFound)
+            return d
+
+        d = self.storage.hasCallbacks(jid, nodeIdentifier)
+        d.addCallback(subscribeOrItems)
+        d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier,
+                                                         callback))
         return d
 
 
     def unsubscribeCallback(self, jid, nodeIdentifier, callback):
-        try:
-            callbackList = self.callbacks[jid, nodeIdentifier]
-            callbackList.remove(callback)
-        except KeyError:
-            return defer.fail(error.NotSubscribed())
+        """
+        Unsubscribe a callback.
+
+        If this was the last registered callback for this node, the
+        gateway will unsubscribe from node.
+        """
 
-        if not callbackList:
-            self.unsubscribe(jid, nodeIdentifier, self.jid)
+        def cb(last):
+            if last:
+                return self.unsubscribe(jid, nodeIdentifier, self.jid)
 
-        return defer.succeed(None)
+        d = self.storage.removeCallback(jid, nodeIdentifier, callback)
+        d.addCallback(cb)
+        return d
 
 
     def itemsReceived(self, event):
@@ -446,6 +457,10 @@
 
     def _postTo(self, callbacks, service, nodeIdentifier,
                       payload=None, contentType=None, eventType=None):
+
+        if not callbacks:
+            return
+
         postdata = None
         nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
         headers = {'Referer': nodeURI.encode('utf-8'),
@@ -469,16 +484,22 @@
         for callbackURI in callbacks:
             reactor.callLater(0, postNotification, callbackURI)
 
+
     def callCallbacks(self, service, nodeIdentifier,
                             payload=None, contentType=None, eventType=None):
-        try:
-            callbacks = self.callbacks[service, nodeIdentifier]
-        except KeyError:
-            return
+
+        def eb(failure):
+            failure.trap(error.NoCallbacks)
 
-        self._postTo(callbacks, service, nodeIdentifier, payload, contentType,
-                     eventType)
+            # No callbacks were registered for this node. Unsubscribe.
+            d = self.unsubscribe(service, nodeIdentifier, self.jid)
+            return d
 
+        d = self.storage.getCallbacks(service, nodeIdentifier)
+        d.addCallback(self._postTo, service, nodeIdentifier, payload,
+                                    contentType, eventType)
+        d.addErrback(eb)
+        d.addErrback(log.err)