Mercurial > libervia-pubsub
diff idavoll/gateway.py @ 204:b4bf0a5ce50d
Implement storage facilities for the HTTP gateway.
Author: ralphm.
Fixes #12.
One of the storage facilities is PostgreSQL based, providing persistence.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Wed, 16 Jul 2008 06:38:32 +0000 |
parents | 2c46e6664680 |
children | 274a45d2a5ab |
line wrap: on
line diff
--- a/idavoll/gateway.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/gateway.py Wed Jul 16 06:38:32 2008 +0000 @@ -352,29 +352,34 @@ to with the received items in notifications. """ - def __init__(self, jid): + def __init__(self, jid, storage): self.jid = jid - - - def startService(self): - self.callbacks = {} + self.storage = storage def trapNotFound(self, failure): failure.trap(StanzaError) - if not failure.value.condition == 'item-not-found': - raise failure - raise error.NodeNotFound + + if failure.value.condition == 'item-not-found': + raise error.NodeNotFound() + else: + return failure def subscribeCallback(self, jid, nodeIdentifier, callback): + """ + Subscribe a callback URI. - def newCallbackList(result): - callbackList = set() - self.callbacks[jid, nodeIdentifier] = callbackList - return callbackList + This registers a callback URI to be called when a notification is + received for the given node. - def callbackForLastItem(items, callback): + If this is the first callback registered for this node, the gateway + will subscribe to the node. Otherwise, the most recently published item + for this node is retrieved and, if present, the newly registered + callback will be called with that item. + """ + + def callbackForLastItem(items): atomEntries = extractAtomEntries(items) if not atomEntries: @@ -383,32 +388,38 @@ self._postTo([callback], jid, nodeIdentifier, atomEntries[0], 'application/atom+xml;type=entry') - try: - callbackList = self.callbacks[jid, nodeIdentifier] - except KeyError: - d = self.subscribe(jid, nodeIdentifier, self.jid) - d.addCallback(newCallbackList) - else: - d = self.items(jid, nodeIdentifier, 1) - d.addCallback(callbackForLastItem, callback) - d.addCallback(lambda _: callbackList) + def subscribeOrItems(hasCallbacks): + if hasCallbacks: + d = self.items(jid, nodeIdentifier, 1) + d.addCallback(callbackForLastItem) + else: + d = self.subscribe(jid, nodeIdentifier, self.jid) - d.addCallback(lambda callbackList: callbackList.add(callback)) - d.addErrback(self.trapNotFound) + d.addErrback(self.trapNotFound) + return d + + d = self.storage.hasCallbacks(jid, nodeIdentifier) + d.addCallback(subscribeOrItems) + d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier, + callback)) return d def unsubscribeCallback(self, jid, nodeIdentifier, callback): - try: - callbackList = self.callbacks[jid, nodeIdentifier] - callbackList.remove(callback) - except KeyError: - return defer.fail(error.NotSubscribed()) + """ + Unsubscribe a callback. + + If this was the last registered callback for this node, the + gateway will unsubscribe from node. + """ - if not callbackList: - self.unsubscribe(jid, nodeIdentifier, self.jid) + def cb(last): + if last: + return self.unsubscribe(jid, nodeIdentifier, self.jid) - return defer.succeed(None) + d = self.storage.removeCallback(jid, nodeIdentifier, callback) + d.addCallback(cb) + return d def itemsReceived(self, event): @@ -446,6 +457,10 @@ def _postTo(self, callbacks, service, nodeIdentifier, payload=None, contentType=None, eventType=None): + + if not callbacks: + return + postdata = None nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) headers = {'Referer': nodeURI.encode('utf-8'), @@ -469,16 +484,22 @@ for callbackURI in callbacks: reactor.callLater(0, postNotification, callbackURI) + def callCallbacks(self, service, nodeIdentifier, payload=None, contentType=None, eventType=None): - try: - callbacks = self.callbacks[service, nodeIdentifier] - except KeyError: - return + + def eb(failure): + failure.trap(error.NoCallbacks) - self._postTo(callbacks, service, nodeIdentifier, payload, contentType, - eventType) + # No callbacks were registered for this node. Unsubscribe. + d = self.unsubscribe(service, nodeIdentifier, self.jid) + return d + d = self.storage.getCallbacks(service, nodeIdentifier) + d.addCallback(self._postTo, service, nodeIdentifier, payload, + contentType, eventType) + d.addErrback(eb) + d.addErrback(log.err)