comparison idavoll/gateway.py @ 204:b4bf0a5ce50d

Implement storage facilities for the HTTP gateway. Author: ralphm. Fixes #12. One of the storage facilities is PostgreSQL-based, providing persistence.
author Ralph Meijer <ralphm@ik.nu>
date Wed, 16 Jul 2008 06:38:32 +0000
parents 2c46e6664680
children 274a45d2a5ab
comparison
equal deleted inserted replaced
203:2c46e6664680 204:b4bf0a5ce50d
350 350
351 Subscriptions are created with a callback HTTP URI that is POSTed 351 Subscriptions are created with a callback HTTP URI that is POSTed
352 to with the received items in notifications. 352 to with the received items in notifications.
353 """ 353 """
354 354
355 def __init__(self, jid): 355 def __init__(self, jid, storage):
356 self.jid = jid 356 self.jid = jid
357 357 self.storage = storage
358
359 def startService(self):
360 self.callbacks = {}
361 358
362 359
363 def trapNotFound(self, failure): 360 def trapNotFound(self, failure):
364 failure.trap(StanzaError) 361 failure.trap(StanzaError)
365 if not failure.value.condition == 'item-not-found': 362
366 raise failure 363 if failure.value.condition == 'item-not-found':
367 raise error.NodeNotFound 364 raise error.NodeNotFound()
365 else:
366 return failure
368 367
369 368
370 def subscribeCallback(self, jid, nodeIdentifier, callback): 369 def subscribeCallback(self, jid, nodeIdentifier, callback):
371 370 """
372 def newCallbackList(result): 371 Subscribe a callback URI.
373 callbackList = set() 372
374 self.callbacks[jid, nodeIdentifier] = callbackList 373 This registers a callback URI to be called when a notification is
375 return callbackList 374 received for the given node.
376 375
377 def callbackForLastItem(items, callback): 376 If this is the first callback registered for this node, the gateway
377 will subscribe to the node. Otherwise, the most recently published item
378 for this node is retrieved and, if present, the newly registered
379 callback will be called with that item.
380 """
381
382 def callbackForLastItem(items):
378 atomEntries = extractAtomEntries(items) 383 atomEntries = extractAtomEntries(items)
379 384
380 if not atomEntries: 385 if not atomEntries:
381 return 386 return
382 387
383 self._postTo([callback], jid, nodeIdentifier, atomEntries[0], 388 self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
384 'application/atom+xml;type=entry') 389 'application/atom+xml;type=entry')
385 390
386 try: 391 def subscribeOrItems(hasCallbacks):
387 callbackList = self.callbacks[jid, nodeIdentifier] 392 if hasCallbacks:
388 except KeyError: 393 d = self.items(jid, nodeIdentifier, 1)
389 d = self.subscribe(jid, nodeIdentifier, self.jid) 394 d.addCallback(callbackForLastItem)
390 d.addCallback(newCallbackList) 395 else:
391 else: 396 d = self.subscribe(jid, nodeIdentifier, self.jid)
392 d = self.items(jid, nodeIdentifier, 1) 397
393 d.addCallback(callbackForLastItem, callback) 398 d.addErrback(self.trapNotFound)
394 d.addCallback(lambda _: callbackList) 399 return d
395 400
396 d.addCallback(lambda callbackList: callbackList.add(callback)) 401 d = self.storage.hasCallbacks(jid, nodeIdentifier)
397 d.addErrback(self.trapNotFound) 402 d.addCallback(subscribeOrItems)
403 d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier,
404 callback))
398 return d 405 return d
399 406
400 407
401 def unsubscribeCallback(self, jid, nodeIdentifier, callback): 408 def unsubscribeCallback(self, jid, nodeIdentifier, callback):
402 try: 409 """
403 callbackList = self.callbacks[jid, nodeIdentifier] 410 Unsubscribe a callback.
404 callbackList.remove(callback) 411
405 except KeyError: 412 If this was the last registered callback for this node, the
406 return defer.fail(error.NotSubscribed()) 413 gateway will unsubscribe from node.
407 414 """
408 if not callbackList: 415
409 self.unsubscribe(jid, nodeIdentifier, self.jid) 416 def cb(last):
410 417 if last:
411 return defer.succeed(None) 418 return self.unsubscribe(jid, nodeIdentifier, self.jid)
419
420 d = self.storage.removeCallback(jid, nodeIdentifier, callback)
421 d.addCallback(cb)
422 return d
412 423
413 424
414 def itemsReceived(self, event): 425 def itemsReceived(self, event):
415 """ 426 """
416 Fire up HTTP client to do callback 427 Fire up HTTP client to do callback
444 eventType='DELETED') 455 eventType='DELETED')
445 456
446 457
447 def _postTo(self, callbacks, service, nodeIdentifier, 458 def _postTo(self, callbacks, service, nodeIdentifier,
448 payload=None, contentType=None, eventType=None): 459 payload=None, contentType=None, eventType=None):
460
461 if not callbacks:
462 return
463
449 postdata = None 464 postdata = None
450 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) 465 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
451 headers = {'Referer': nodeURI.encode('utf-8'), 466 headers = {'Referer': nodeURI.encode('utf-8'),
452 'PubSub-Service': service.full().encode('utf-8')} 467 'PubSub-Service': service.full().encode('utf-8')}
453 468
467 d.addErrback(log.err) 482 d.addErrback(log.err)
468 483
469 for callbackURI in callbacks: 484 for callbackURI in callbacks:
470 reactor.callLater(0, postNotification, callbackURI) 485 reactor.callLater(0, postNotification, callbackURI)
471 486
487
472 def callCallbacks(self, service, nodeIdentifier, 488 def callCallbacks(self, service, nodeIdentifier,
473 payload=None, contentType=None, eventType=None): 489 payload=None, contentType=None, eventType=None):
474 try: 490
475 callbacks = self.callbacks[service, nodeIdentifier] 491 def eb(failure):
476 except KeyError: 492 failure.trap(error.NoCallbacks)
477 return 493
478 494 # No callbacks were registered for this node. Unsubscribe.
479 self._postTo(callbacks, service, nodeIdentifier, payload, contentType, 495 d = self.unsubscribe(service, nodeIdentifier, self.jid)
480 eventType) 496 return d
481 497
498 d = self.storage.getCallbacks(service, nodeIdentifier)
499 d.addCallback(self._postTo, service, nodeIdentifier, payload,
500 contentType, eventType)
501 d.addErrback(eb)
502 d.addErrback(log.err)
482 503
483 504
484 505
485 class RemoteSubscribeBaseResource(resource.Resource): 506 class RemoteSubscribeBaseResource(resource.Resource):
486 """ 507 """