Mercurial > libervia-pubsub
changeset 187:69cdd8c6a431
Make sure second subscribers through HTTP also get a notification of the
last item.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Thu, 17 Apr 2008 16:02:22 +0000 |
parents | 365fd3e4daf8 |
children | a5d267289e92 |
files | idavoll/gateway.py idavoll/test/test_backend.py idavoll/test/test_gateway.py |
diffstat | 3 files changed, 76 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/idavoll/gateway.py Wed Apr 16 14:14:27 2008 +0000 +++ b/idavoll/gateway.py Thu Apr 17 16:02:22 2008 +0000 @@ -370,13 +370,24 @@ self.callbacks[jid, nodeIdentifier] = callbackList return callbackList + def callbackForLastItem(items, callback): + atomEntries = extractAtomEntries(items) + + if not atomEntries: + return + + self._postTo([callback], jid, nodeIdentifier, atomEntries[0], + 'application/atom+xml;type=entry') + try: callbackList = self.callbacks[jid, nodeIdentifier] except KeyError: d = self.subscribe(jid, nodeIdentifier, self.jid) d.addCallback(newCallbackList) else: - d = defer.succeed(callbackList) + d = self.items(jid, nodeIdentifier, 1) + d.addCallback(callbackForLastItem, callback) + d.addCallback(lambda _: callbackList) d.addCallback(lambda callbackList: callbackList.add(callback)) d.addErrback(self.trapNotFound) @@ -415,8 +426,7 @@ payload = constructFeed(service, nodeIdentifier, atomEntries, title='Received item collection') - self.callCallbacks(recipient, service, nodeIdentifier, payload, - contentType) + self.callCallbacks(service, nodeIdentifier, payload, contentType) def deleteReceived(self, recipient, service, nodeIdentifier): @@ -424,17 +434,11 @@ Fire up HTTP client to do callback """ - self.callCallbacks(recipient, service, nodeIdentifier, - eventType='DELETED') + self.callCallbacks(service, nodeIdentifier, eventType='DELETED') - def callCallbacks(self, recipient, service, nodeIdentifier, - payload=None, contentType=None, eventType=None): - try: - callbacks = self.callbacks[service, nodeIdentifier] - except KeyError: - return - + def _postTo(self, callbacks, service, nodeIdentifier, + payload=None, contentType=None, eventType=None): postdata = None nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) headers = {'Referer': nodeURI.encode('utf-8'), @@ -458,6 +462,17 @@ for callbackURI in callbacks: reactor.callLater(0, postNotification, callbackURI) + def callCallbacks(self, service, nodeIdentifier, + payload=None, contentType=None, eventType=None): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + return + + self._postTo(callbacks, service, nodeIdentifier, payload, contentType, + eventType) + + class RemoteSubscribeBaseResource(resource.Resource):
--- a/idavoll/test/test_backend.py Wed Apr 16 14:14:27 2008 +0000 +++ b/idavoll/test/test_backend.py Thu Apr 17 16:02:22 2008 +0000 @@ -132,7 +132,6 @@ return defer.succeed(testNode()) def cb(data): - print [ITEM] == data['items'] self.assertEquals('node', data['node_id']) self.assertEquals([ITEM], data['items']) self.assertEquals(OWNER, data['subscriber'])
--- a/idavoll/test/test_gateway.py Wed Apr 16 14:14:27 2008 +0000 +++ b/idavoll/test/test_gateway.py Thu Apr 17 16:02:22 2008 +0000 @@ -29,6 +29,7 @@ componentJID = "test.ik.nu" class GatewayTest(unittest.TestCase): + timeout = 2 def setUp(self): self.client = gateway.GatewayClient(baseURI) @@ -140,6 +141,54 @@ d.addCallback(cb2) return defer.gatherResults([d, self.client.deferred]) + def test_subscribeGetDelayedNotification2(self): + """ + Test that subscribing as second results in a notification being sent. + """ + + def onNotification1(data, headers): + client1.deferred.callback(None) + client1.stopService() + + def onNotification2(data, headers): + client2.deferred.callback(None) + client2.stopService() + + def cb(response): + xmppURI = response['uri'] + self.assertNot(client1.deferred.called) + self.assertNot(client2.deferred.called) + d = self.client.publish(entry, xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = client1.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb3(xmppURI): + d = client2.subscribe(xmppURI) + return d + + client1 = gateway.GatewayClient(baseURI, callbackPort=8088) + client1.startService() + client1.callback = onNotification1 + client1.deferred = defer.Deferred() + client2 = gateway.GatewayClient(baseURI, callbackPort=8089) + client2.startService() + client2.callback = onNotification2 + client2.deferred = defer.Deferred() + + + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + d.addCallback(cb3) + dl = defer.gatherResults([d, client1.deferred, client2.deferred]) + return dl + + def test_subscribeNonExisting(self): def cb(err): self.assertEqual('404', err.status)