# HG changeset patch # User Ralph Meijer # Date 1216190312 0 # Node ID b4bf0a5ce50dd320468b5bd4b4a63ba8cf0ccc7e # Parent 2c46e6664680bffd773126fa8bb8a35515e715d5 Implement storage facilities for the HTTP gateway. Author: ralphm. Fixes #12. One of the storage facilities is PostgreSQL based, providing persistence. diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/error.py --- a/idavoll/error.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/error.py Wed Jul 16 06:38:32 2008 +0000 @@ -16,14 +16,19 @@ pass + class NotSubscribed(Error): """ Entity is not subscribed to this node. """ + class SubscriptionExists(Error): - pass + """ + There already exists a subscription to this node. + """ + class Forbidden(Error): @@ -56,3 +61,10 @@ class NoRootNode(Error): pass + + + +class NoCallbacks(Error): + """ + There are no callbacks for this node. + """ diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/gateway.py --- a/idavoll/gateway.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/gateway.py Wed Jul 16 06:38:32 2008 +0000 @@ -352,29 +352,34 @@ to with the received items in notifications. """ - def __init__(self, jid): + def __init__(self, jid, storage): self.jid = jid - - - def startService(self): - self.callbacks = {} + self.storage = storage def trapNotFound(self, failure): failure.trap(StanzaError) - if not failure.value.condition == 'item-not-found': - raise failure - raise error.NodeNotFound + + if failure.value.condition == 'item-not-found': + raise error.NodeNotFound() + else: + return failure def subscribeCallback(self, jid, nodeIdentifier, callback): + """ + Subscribe a callback URI. - def newCallbackList(result): - callbackList = set() - self.callbacks[jid, nodeIdentifier] = callbackList - return callbackList + This registers a callback URI to be called when a notification is + received for the given node. - def callbackForLastItem(items, callback): + If this is the first callback registered for this node, the gateway + will subscribe to the node. Otherwise, the most recently published item + for this node is retrieved and, if present, the newly registered + callback will be called with that item. + """ + + def callbackForLastItem(items): atomEntries = extractAtomEntries(items) if not atomEntries: @@ -383,32 +388,38 @@ self._postTo([callback], jid, nodeIdentifier, atomEntries[0], 'application/atom+xml;type=entry') - try: - callbackList = self.callbacks[jid, nodeIdentifier] - except KeyError: - d = self.subscribe(jid, nodeIdentifier, self.jid) - d.addCallback(newCallbackList) - else: - d = self.items(jid, nodeIdentifier, 1) - d.addCallback(callbackForLastItem, callback) - d.addCallback(lambda _: callbackList) + def subscribeOrItems(hasCallbacks): + if hasCallbacks: + d = self.items(jid, nodeIdentifier, 1) + d.addCallback(callbackForLastItem) + else: + d = self.subscribe(jid, nodeIdentifier, self.jid) - d.addCallback(lambda callbackList: callbackList.add(callback)) - d.addErrback(self.trapNotFound) + d.addErrback(self.trapNotFound) + return d + + d = self.storage.hasCallbacks(jid, nodeIdentifier) + d.addCallback(subscribeOrItems) + d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier, + callback)) return d def unsubscribeCallback(self, jid, nodeIdentifier, callback): - try: - callbackList = self.callbacks[jid, nodeIdentifier] - callbackList.remove(callback) - except KeyError: - return defer.fail(error.NotSubscribed()) + """ + Unsubscribe a callback. + + If this was the last registered callback for this node, the + gateway will unsubscribe from node. + """ - if not callbackList: - self.unsubscribe(jid, nodeIdentifier, self.jid) + def cb(last): + if last: + return self.unsubscribe(jid, nodeIdentifier, self.jid) - return defer.succeed(None) + d = self.storage.removeCallback(jid, nodeIdentifier, callback) + d.addCallback(cb) + return d def itemsReceived(self, event): @@ -446,6 +457,10 @@ def _postTo(self, callbacks, service, nodeIdentifier, payload=None, contentType=None, eventType=None): + + if not callbacks: + return + postdata = None nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) headers = {'Referer': nodeURI.encode('utf-8'), @@ -469,16 +484,22 @@ for callbackURI in callbacks: reactor.callLater(0, postNotification, callbackURI) + def callCallbacks(self, service, nodeIdentifier, payload=None, contentType=None, eventType=None): - try: - callbacks = self.callbacks[service, nodeIdentifier] - except KeyError: - return + + def eb(failure): + failure.trap(error.NoCallbacks) - self._postTo(callbacks, service, nodeIdentifier, payload, contentType, - eventType) + # No callbacks were registered for this node. Unsubscribe. + d = self.unsubscribe(service, nodeIdentifier, self.jid) + return d + d = self.storage.getCallbacks(service, nodeIdentifier) + d.addCallback(self._postTo, service, nodeIdentifier, payload, + contentType, eventType) + d.addErrback(eb) + d.addErrback(log.err) diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/iidavoll.py --- a/idavoll/iidavoll.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/iidavoll.py Wed Jul 16 06:38:32 2008 +0000 @@ -466,3 +466,66 @@ @return: deferred that fires when the node has been purged. """ + + + +class IGatewayStorage(Interface): + + def addCallback(service, nodeIdentifier, callback): + """ + Register a callback URI. + + The registered HTTP callback URI will have an Atom Entry documented + POSTed to it upon receiving a notification for the given pubsub node. + + @param service: The XMPP entity that holds the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the publish-subscribe node. + @type nodeIdentifier: C{unicode}. + @param callback: The callback URI to be registered. + @type callback: C{str}. + @rtype: L{Deferred} + """ + + def removeCallback(service, nodeIdentifier, callback): + """ + Remove a registered callback URI. + + The returned deferred will fire with a boolean that signals wether or + not this was the last callback unregistered for this node. + + @param service: The XMPP entity that holds the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the publish-subscribe node. + @type nodeIdentifier: C{unicode}. + @param callback: The callback URI to be unregistered. + @type callback: C{str}. + @rtype: L{Deferred} + """ + + def getCallbacks(service, nodeIdentifier): + """ + Get the callbacks registered for this node. + + Returns a deferred that fires with the set of HTTP callback URIs + registered for this node. + + @param service: The XMPP entity that holds the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the publish-subscribe node. + @type nodeIdentifier: C{unicode}. + @rtype: L{Deferred} + """ + + + def hasCallbacks(service, nodeIdentifier): + """ + Return wether there are callbacks registered for a node. + + @param service: The XMPP entity that holds the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the publish-subscribe node. + @type nodeIdentifier: C{unicode}. + @returns: Deferred that fires with a boolean. + @rtype: L{Deferred} + """ diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/memory_storage.py --- a/idavoll/memory_storage.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/memory_storage.py Wed Jul 16 06:38:32 2008 +0000 @@ -266,3 +266,53 @@ def __init__(self, state): self.state = state + + + +class GatewayStorage(object): + """ + Memory based storage facility for the XMPP-HTTP gateway. + """ + + def __init__(self): + self.callbacks = {} + + + def addCallback(self, service, nodeIdentifier, callback): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + callbacks = set([callback]) + self.callbacks[service, nodeIdentifier] = callbacks + else: + callbacks.add(callback) + pass + + print self.callbacks + return defer.succeed(None) + + + def removeCallback(self, service, nodeIdentifier, callback): + try: + callbacks = self.callbacks[service, nodeIdentifier] + callbacks.remove(callback) + except KeyError: + return defer.fail(error.NotSubscribed()) + else: + if not callbacks: + del self.callbacks[service, nodeIdentifier] + + return defer.succeed(not callbacks) + + + def getCallbacks(self, service, nodeIdentifier): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + return defer.fail(error.NoCallbacks()) + else: + return defer.succeed(callbacks) + + + def hasCallbacks(self, service, nodeIdentifier): + return defer.succeed((service, nodeIdentifier) in self.callbacks) diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/pgsql_storage.py --- a/idavoll/pgsql_storage.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/pgsql_storage.py Wed Jul 16 06:38:32 2008 +0000 @@ -4,10 +4,7 @@ import copy from zope.interface import implements - -from twisted.enterprise import adbapi from twisted.words.protocols.jabber import jid - from wokkel.generic import parseXml from idavoll import error, iidavoll @@ -16,20 +13,13 @@ implements(iidavoll.IStorage) - def __init__(self, user, database, password=None, host=None, port=None): - self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', - user=user, - password=password, - database=database, - host=host, - port=port, - cp_reconnect=True, - client_encoding='utf-8' - ) + + def __init__(self, dbpool): + self.dbpool = dbpool def getNode(self, nodeIdentifier): - return self._dbpool.runInteraction(self._getNode, nodeIdentifier) + return self.dbpool.runInteraction(self._getNode, nodeIdentifier) def _getNode(self, cursor, nodeIdentifier): @@ -48,18 +38,18 @@ raise error.NodeNotFound() else: node = LeafNode(nodeIdentifier, configuration) - node._dbpool = self._dbpool + node.dbpool = self.dbpool return node def getNodeIds(self): - d = self._dbpool.runQuery("""SELECT node from nodes""") + d = self.dbpool.runQuery("""SELECT node from nodes""") d.addCallback(lambda results: [r[0] for r in results]) return d def createNode(self, nodeIdentifier, owner, config=None): - return self._dbpool.runInteraction(self._createNode, nodeIdentifier, + return self.dbpool.runInteraction(self._createNode, nodeIdentifier, owner) @@ -88,7 +78,7 @@ def deleteNode(self, nodeIdentifier): - return self._dbpool.runInteraction(self._deleteNode, nodeIdentifier) + return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) def _deleteNode(self, cursor, nodeIdentifier): @@ -100,7 +90,7 @@ def getAffiliations(self, entity): - d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities + d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities JOIN affiliations ON (affiliations.entity_id=entities.id) JOIN nodes ON @@ -112,7 +102,7 @@ def getSubscriptions(self, entity): - d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription + d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription FROM entities JOIN subscriptions ON (subscriptions.entity_id=entities.id) JOIN nodes ON @@ -162,7 +152,7 @@ if option in config: config[option] = options[option] - d = self._dbpool.runInteraction(self._setConfiguration, config) + d = self.dbpool.runInteraction(self._setConfiguration, config) d.addCallback(self._setCachedConfiguration, config) return d @@ -189,7 +179,7 @@ def getAffiliation(self, entity): - return self._dbpool.runInteraction(self._getAffiliation, entity) + return self.dbpool.runInteraction(self._getAffiliation, entity) def _getAffiliation(self, cursor, entity): @@ -208,7 +198,7 @@ def getSubscription(self, subscriber): - return self._dbpool.runInteraction(self._getSubscription, subscriber) + return self.dbpool.runInteraction(self._getSubscription, subscriber) def _getSubscription(self, cursor, subscriber): @@ -232,7 +222,7 @@ def addSubscription(self, subscriber, state): - return self._dbpool.runInteraction(self._addSubscription, subscriber, + return self.dbpool.runInteraction(self._addSubscription, subscriber, state) @@ -264,7 +254,7 @@ def removeSubscription(self, subscriber): - return self._dbpool.runInteraction(self._removeSubscription, + return self.dbpool.runInteraction(self._removeSubscription, subscriber) @@ -288,7 +278,7 @@ def getSubscribers(self): - d = self._dbpool.runInteraction(self._getSubscribers) + d = self.dbpool.runInteraction(self._getSubscribers) d.addCallback(self._convertToJIDs) return d @@ -309,7 +299,7 @@ def isSubscribed(self, entity): - return self._dbpool.runInteraction(self._isSubscribed, entity) + return self.dbpool.runInteraction(self._isSubscribed, entity) def _isSubscribed(self, cursor, entity): @@ -329,7 +319,7 @@ def getAffiliations(self): - return self._dbpool.runInteraction(self._getAffiliations) + return self.dbpool.runInteraction(self._getAffiliations) def _getAffiliations(self, cursor): @@ -353,7 +343,7 @@ nodeType = 'leaf' def storeItems(self, items, publisher): - return self._dbpool.runInteraction(self._storeItems, items, publisher) + return self.dbpool.runInteraction(self._storeItems, items, publisher) def _storeItems(self, cursor, items, publisher): @@ -384,7 +374,7 @@ def removeItems(self, itemIdentifiers): - return self._dbpool.runInteraction(self._removeItems, itemIdentifiers) + return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) def _removeItems(self, cursor, itemIdentifiers): @@ -406,7 +396,7 @@ def getItems(self, maxItems=None): - return self._dbpool.runInteraction(self._getItems, maxItems) + return self.dbpool.runInteraction(self._getItems, maxItems) def _getItems(self, cursor, maxItems): @@ -426,7 +416,7 @@ def getItemsById(self, itemIdentifiers): - return self._dbpool.runInteraction(self._getItemsById, itemIdentifiers) + return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers) def _getItemsById(self, cursor, itemIdentifiers): @@ -445,7 +435,7 @@ def purge(self): - return self._dbpool.runInteraction(self._purge) + return self.dbpool.runInteraction(self._purge) def _purge(self, cursor): @@ -460,3 +450,84 @@ class LeafNode(Node, LeafNodeMixin): implements(iidavoll.ILeafNode) + + + +class GatewayStorage(object): + """ + Memory based storage facility for the XMPP-HTTP gateway. + """ + + def __init__(self, dbpool): + self.dbpool = dbpool + + + def _countCallbacks(self, cursor, service, nodeIdentifier): + """ + Count number of callbacks registered for a node. + """ + cursor.execute("""SELECT count(*) FROM callbacks + WHERE service=%s and node=%s""", + service.full(), + nodeIdentifier) + results = cursor.fetchall() + return results[0][0] + + + def addCallback(self, service, nodeIdentifier, callback): + def interaction(cursor): + cursor.execute("""SELECT 1 FROM callbacks + WHERE service=%s and node=%s and uri=%s""", + service.full(), + nodeIdentifier, + callback) + if cursor.fetchall(): + raise error.SubscriptionExists() + + cursor.execute("""INSERT INTO callbacks + (service, node, uri) VALUES + (%s, %s, %s)""", + service.full(), + nodeIdentifier, + callback) + + return self.dbpool.runInteraction(interaction) + + + def removeCallback(self, service, nodeIdentifier, callback): + def interaction(cursor): + cursor.execute("""DELETE FROM callbacks + WHERE service=%s and node=%s and uri=%s""", + service.full(), + nodeIdentifier, + callback) + + if cursor.rowcount != 1: + raise error.NotSubscribed() + + last = not self._countCallbacks(cursor, service, nodeIdentifier) + return last + + return self.dbpool.runInteraction(interaction) + + def getCallbacks(self, service, nodeIdentifier): + def interaction(cursor): + cursor.execute("""SELECT uri FROM callbacks + WHERE service=%s and node=%s""", + service.full(), + nodeIdentifier) + results = cursor.fetchall() + + if not results: + raise error.NoCallbacks() + + return [result[0] for result in results] + + return self.dbpool.runInteraction(interaction) + + + def hasCallbacks(self, service, nodeIdentifier): + def interaction(cursor): + return bool(self._countCallbacks(cursor, service, nodeIdentifier)) + + return self.dbpool.runInteraction(interaction) diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/tap.py --- a/idavoll/tap.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/tap.py Wed Jul 16 06:38:32 2008 +0000 @@ -45,12 +45,18 @@ # Create backend service with storage if config['backend'] == 'pgsql': + from twisted.enterprise import adbapi from idavoll.pgsql_storage import Storage - st = Storage(user=config['dbuser'], - database=config['dbname'], - password=config['dbpass'], - host=config['dbhost'], - port=config['dbport']) + dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', + user=config['dbuser'], + password=config['dbuser'], + database=config['dbname'], + host=config['dbpass'], + port=config['dbport'], + cp_reconnect=True, + client_encoding='utf-8', + ) + st = Storage(dbpool) elif config['backend'] == 'memory': from idavoll.memory_storage import Storage st = Storage() diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/tap_http.py --- a/idavoll/tap_http.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/tap_http.py Wed Jul 16 06:38:32 2008 +0000 @@ -36,7 +36,14 @@ # Set up XMPP service for subscribing to remote nodes - ss = RemoteSubscriptionService(config['jid']) + if config['backend'] == 'pgsql': + from idavoll.pgsql_storage import GatewayStorage + gst = GatewayStorage(bs.storage.dbpool) + elif config['backend'] == 'memory': + from idavoll.memory_storage import GatewayStorage + gst = GatewayStorage() + + ss = RemoteSubscriptionService(config['jid'], gst) ss.setHandlerParent(cs) ss.startService() diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/test/test_gateway.py --- a/idavoll/test/test_gateway.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/test/test_gateway.py Wed Jul 16 06:38:32 2008 +0000 @@ -35,7 +35,7 @@ self.client.startService() def tearDown(self): - self.client.stopService() + return self.client.stopService() def test_create(self): @@ -93,6 +93,7 @@ d.addCallback(cb) return d + def test_subscribeGetNotification(self): def onNotification(data, headers): @@ -116,6 +117,50 @@ d.addCallback(cb2) return defer.gatherResults([d, self.client.deferred]) + + def test_subscribeTwiceGetNotification(self): + + def onNotification1(data, headers): + d = client1.stopService() + d.chainDeferred(client1.deferred) + + def onNotification2(data, headers): + d = client2.stopService() + d.chainDeferred(client2.deferred) + + def cb(response): + xmppURI = response['uri'] + d = client1.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = client2.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb3(xmppURI): + d = self.client.publish(entry, xmppURI) + return d + + + client1 = gateway.GatewayClient(baseURI, callbackPort=8088) + client1.startService() + client1.callback = onNotification1 + client1.deferred = defer.Deferred() + client2 = gateway.GatewayClient(baseURI, callbackPort=8089) + client2.startService() + client2.callback = onNotification2 + client2.deferred = defer.Deferred() + + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + d.addCallback(cb3) + dl = defer.gatherResults([d, client1.deferred, client2.deferred]) + return dl + + def test_subscribeGetDelayedNotification(self): def onNotification(data, headers): @@ -179,7 +224,6 @@ client2.callback = onNotification2 client2.deferred = defer.Deferred() - d = self.client.create() d.addCallback(cb) d.addCallback(cb2) diff -r 2c46e6664680 -r b4bf0a5ce50d idavoll/test/test_storage.py --- a/idavoll/test/test_storage.py Mon Jul 14 09:16:16 2008 +0000 +++ b/idavoll/test/test_storage.py Wed Jul 16 06:38:32 2008 +0000 @@ -414,22 +414,24 @@ class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests): - def _callSuperSetUp(self, void): - return StorageTests.setUp(self) - def setUp(self): from idavoll.pgsql_storage import Storage - self.s = Storage('ralphm', 'pubsub_test') - self.s._dbpool.start() - d = self.s._dbpool.runInteraction(self.init) - d.addCallback(self._callSuperSetUp) + from twisted.enterprise import adbapi + self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', + database='pubsub_test', + cp_reconnect=True, + client_encoding='utf-8', + ) + self.s = Storage(self.dbpool) + self.dbpool.start() + d = self.dbpool.runInteraction(self.init) + d.addCallback(lambda _: StorageTests.setUp(self)) return d - def tearDownClass(self): - #return self.s._dbpool.runInteraction(self.cleandb) - pass + def tearDown(self): + return self.dbpool.runInteraction(self.cleandb) def init(self, cursor):