Mercurial > libervia-pubsub
comparison idavoll/gateway.py @ 204:b4bf0a5ce50d
Implement storage facilities for the HTTP gateway.
Author: ralphm.
Fixes #12.
One of the storage facilities is PostgreSQL based, providing persistence.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Wed, 16 Jul 2008 06:38:32 +0000 |
parents | 2c46e6664680 |
children | 274a45d2a5ab |
comparison
equal
deleted
inserted
replaced
203:2c46e6664680 | 204:b4bf0a5ce50d |
---|---|
350 | 350 |
351 Subscriptions are created with a callback HTTP URI that is POSTed | 351 Subscriptions are created with a callback HTTP URI that is POSTed |
352 to with the received items in notifications. | 352 to with the received items in notifications. |
353 """ | 353 """ |
354 | 354 |
355 def __init__(self, jid): | 355 def __init__(self, jid, storage): |
356 self.jid = jid | 356 self.jid = jid |
357 | 357 self.storage = storage |
358 | |
359 def startService(self): | |
360 self.callbacks = {} | |
361 | 358 |
362 | 359 |
363 def trapNotFound(self, failure): | 360 def trapNotFound(self, failure): |
364 failure.trap(StanzaError) | 361 failure.trap(StanzaError) |
365 if not failure.value.condition == 'item-not-found': | 362 |
366 raise failure | 363 if failure.value.condition == 'item-not-found': |
367 raise error.NodeNotFound | 364 raise error.NodeNotFound() |
365 else: | |
366 return failure | |
368 | 367 |
369 | 368 |
370 def subscribeCallback(self, jid, nodeIdentifier, callback): | 369 def subscribeCallback(self, jid, nodeIdentifier, callback): |
371 | 370 """ |
372 def newCallbackList(result): | 371 Subscribe a callback URI. |
373 callbackList = set() | 372 |
374 self.callbacks[jid, nodeIdentifier] = callbackList | 373 This registers a callback URI to be called when a notification is |
375 return callbackList | 374 received for the given node. |
376 | 375 |
377 def callbackForLastItem(items, callback): | 376 If this is the first callback registered for this node, the gateway |
377 will subscribe to the node. Otherwise, the most recently published item | |
378 for this node is retrieved and, if present, the newly registered | |
379 callback will be called with that item. | |
380 """ | |
381 | |
382 def callbackForLastItem(items): | |
378 atomEntries = extractAtomEntries(items) | 383 atomEntries = extractAtomEntries(items) |
379 | 384 |
380 if not atomEntries: | 385 if not atomEntries: |
381 return | 386 return |
382 | 387 |
383 self._postTo([callback], jid, nodeIdentifier, atomEntries[0], | 388 self._postTo([callback], jid, nodeIdentifier, atomEntries[0], |
384 'application/atom+xml;type=entry') | 389 'application/atom+xml;type=entry') |
385 | 390 |
386 try: | 391 def subscribeOrItems(hasCallbacks): |
387 callbackList = self.callbacks[jid, nodeIdentifier] | 392 if hasCallbacks: |
388 except KeyError: | 393 d = self.items(jid, nodeIdentifier, 1) |
389 d = self.subscribe(jid, nodeIdentifier, self.jid) | 394 d.addCallback(callbackForLastItem) |
390 d.addCallback(newCallbackList) | 395 else: |
391 else: | 396 d = self.subscribe(jid, nodeIdentifier, self.jid) |
392 d = self.items(jid, nodeIdentifier, 1) | 397 |
393 d.addCallback(callbackForLastItem, callback) | 398 d.addErrback(self.trapNotFound) |
394 d.addCallback(lambda _: callbackList) | 399 return d |
395 | 400 |
396 d.addCallback(lambda callbackList: callbackList.add(callback)) | 401 d = self.storage.hasCallbacks(jid, nodeIdentifier) |
397 d.addErrback(self.trapNotFound) | 402 d.addCallback(subscribeOrItems) |
403 d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier, | |
404 callback)) | |
398 return d | 405 return d |
399 | 406 |
400 | 407 |
401 def unsubscribeCallback(self, jid, nodeIdentifier, callback): | 408 def unsubscribeCallback(self, jid, nodeIdentifier, callback): |
402 try: | 409 """ |
403 callbackList = self.callbacks[jid, nodeIdentifier] | 410 Unsubscribe a callback. |
404 callbackList.remove(callback) | 411 |
405 except KeyError: | 412 If this was the last registered callback for this node, the |
406 return defer.fail(error.NotSubscribed()) | 413 gateway will unsubscribe from node. |
407 | 414 """ |
408 if not callbackList: | 415 |
409 self.unsubscribe(jid, nodeIdentifier, self.jid) | 416 def cb(last): |
410 | 417 if last: |
411 return defer.succeed(None) | 418 return self.unsubscribe(jid, nodeIdentifier, self.jid) |
419 | |
420 d = self.storage.removeCallback(jid, nodeIdentifier, callback) | |
421 d.addCallback(cb) | |
422 return d | |
412 | 423 |
413 | 424 |
414 def itemsReceived(self, event): | 425 def itemsReceived(self, event): |
415 """ | 426 """ |
416 Fire up HTTP client to do callback | 427 Fire up HTTP client to do callback |
444 eventType='DELETED') | 455 eventType='DELETED') |
445 | 456 |
446 | 457 |
447 def _postTo(self, callbacks, service, nodeIdentifier, | 458 def _postTo(self, callbacks, service, nodeIdentifier, |
448 payload=None, contentType=None, eventType=None): | 459 payload=None, contentType=None, eventType=None): |
460 | |
461 if not callbacks: | |
462 return | |
463 | |
449 postdata = None | 464 postdata = None |
450 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) | 465 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) |
451 headers = {'Referer': nodeURI.encode('utf-8'), | 466 headers = {'Referer': nodeURI.encode('utf-8'), |
452 'PubSub-Service': service.full().encode('utf-8')} | 467 'PubSub-Service': service.full().encode('utf-8')} |
453 | 468 |
467 d.addErrback(log.err) | 482 d.addErrback(log.err) |
468 | 483 |
469 for callbackURI in callbacks: | 484 for callbackURI in callbacks: |
470 reactor.callLater(0, postNotification, callbackURI) | 485 reactor.callLater(0, postNotification, callbackURI) |
471 | 486 |
487 | |
472 def callCallbacks(self, service, nodeIdentifier, | 488 def callCallbacks(self, service, nodeIdentifier, |
473 payload=None, contentType=None, eventType=None): | 489 payload=None, contentType=None, eventType=None): |
474 try: | 490 |
475 callbacks = self.callbacks[service, nodeIdentifier] | 491 def eb(failure): |
476 except KeyError: | 492 failure.trap(error.NoCallbacks) |
477 return | 493 |
478 | 494 # No callbacks were registered for this node. Unsubscribe. |
479 self._postTo(callbacks, service, nodeIdentifier, payload, contentType, | 495 d = self.unsubscribe(service, nodeIdentifier, self.jid) |
480 eventType) | 496 return d |
481 | 497 |
498 d = self.storage.getCallbacks(service, nodeIdentifier) | |
499 d.addCallback(self._postTo, service, nodeIdentifier, payload, | |
500 contentType, eventType) | |
501 d.addErrback(eb) | |
502 d.addErrback(log.err) | |
482 | 503 |
483 | 504 |
484 | 505 |
485 class RemoteSubscribeBaseResource(resource.Resource): | 506 class RemoteSubscribeBaseResource(resource.Resource): |
486 """ | 507 """ |