comparison sat_pubsub/backend.py @ 232:923281d4c5bc

renamed idavoll directory to sat_pubsub
author Goffi <goffi@goffi.org>
date Thu, 17 May 2012 12:48:14 +0200
parents idavoll/backend.py@77029ecf9817
children 564ae55219e1
comparison
equal deleted inserted replaced
231:d99047cd90f9 232:923281d4c5bc
1 # -*- test-case-name: idavoll.test.test_backend -*-
2 #
3 # Copyright (c) 2003-2010 Ralph Meijer
4 # See LICENSE for details.
5
6 """
7 Generic publish-subscribe backend.
8
9 This module implements a generic publish-subscribe backend service with
10 business logic as per
11 U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>} that interacts with
12 a given storage facility. It also provides an adapter from the XMPP
13 publish-subscribe protocol.
14 """
15
16 import uuid
17
18 from zope.interface import implements
19
20 from twisted.application import service
21 from twisted.python import components, log
22 from twisted.internet import defer, reactor
23 from twisted.words.protocols.jabber.error import StanzaError
24 from twisted.words.xish import utility
25
26 from wokkel import disco
27 from wokkel.iwokkel import IPubSubResource
28 from wokkel.pubsub import PubSubResource, PubSubError
29
30 from idavoll import error, iidavoll
31 from idavoll.iidavoll import IBackendService, ILeafNode
32
33 def _getAffiliation(node, entity):
34 d = node.getAffiliation(entity)
35 d.addCallback(lambda affiliation: (node, affiliation))
36 return d
37
38
39
40 class BackendService(service.Service, utility.EventDispatcher):
41 """
42 Generic publish-subscribe backend service.
43
44 @cvar nodeOptions: Node configuration form as a mapping from the field
45 name to a dictionary that holds the field's type, label
46 and possible options to choose from.
47 @type nodeOptions: C{dict}.
48 @cvar defaultConfig: The default node configuration.
49 """
50
51 implements(iidavoll.IBackendService)
52
53 nodeOptions = {
54 "pubsub#persist_items":
55 {"type": "boolean",
56 "label": "Persist items to storage"},
57 "pubsub#deliver_payloads":
58 {"type": "boolean",
59 "label": "Deliver payloads with event notifications"},
60 "pubsub#send_last_published_item":
61 {"type": "list-single",
62 "label": "When to send the last published item",
63 "options": {
64 "never": "Never",
65 "on_sub": "When a new subscription is processed"}
66 },
67 }
68
69 subscriptionOptions = {
70 "pubsub#subscription_type":
71 {"type": "list-single",
72 "options": {
73 "items": "Receive notification of new items only",
74 "nodes": "Receive notification of new nodes only"}
75 },
76 "pubsub#subscription_depth":
77 {"type": "list-single",
78 "options": {
79 "1": "Receive notification from direct child nodes only",
80 "all": "Receive notification from all descendent nodes"}
81 },
82 }
83
84 def __init__(self, storage):
85 utility.EventDispatcher.__init__(self)
86 self.storage = storage
87 self._callbackList = []
88
89
90 def supportsPublisherAffiliation(self):
91 return True
92
93
94 def supportsOutcastAffiliation(self):
95 return True
96
97
98 def supportsPersistentItems(self):
99 return True
100
101
102 def getNodeType(self, nodeIdentifier):
103 d = self.storage.getNode(nodeIdentifier)
104 d.addCallback(lambda node: node.getType())
105 return d
106
107
108 def getNodes(self):
109 return self.storage.getNodeIds()
110
111
112 def getNodeMetaData(self, nodeIdentifier):
113 d = self.storage.getNode(nodeIdentifier)
114 d.addCallback(lambda node: node.getMetaData())
115 d.addCallback(self._makeMetaData)
116 return d
117
118
119 def _makeMetaData(self, metaData):
120 options = []
121 for key, value in metaData.iteritems():
122 if key in self.nodeOptions:
123 option = {"var": key}
124 option.update(self.nodeOptions[key])
125 option["value"] = value
126 options.append(option)
127
128 return options
129
130
131 def _checkAuth(self, node, requestor):
132 def check(affiliation, node):
133 if affiliation not in ['owner', 'publisher']:
134 raise error.Forbidden()
135 return node
136
137 d = node.getAffiliation(requestor)
138 d.addCallback(check, node)
139 return d
140
141
142 def publish(self, nodeIdentifier, items, requestor):
143 d = self.storage.getNode(nodeIdentifier)
144 d.addCallback(self._checkAuth, requestor)
145 d.addCallback(self._doPublish, items, requestor)
146 return d
147
148
149 def _doPublish(self, node, items, requestor):
150 if node.nodeType == 'collection':
151 raise error.NoPublishing()
152
153 configuration = node.getConfiguration()
154 persistItems = configuration["pubsub#persist_items"]
155 deliverPayloads = configuration["pubsub#deliver_payloads"]
156
157 if items and not persistItems and not deliverPayloads:
158 raise error.ItemForbidden()
159 elif not items and (persistItems or deliverPayloads):
160 raise error.ItemRequired()
161
162 if persistItems or deliverPayloads:
163 for item in items:
164 item.uri = None
165 item.defaultUri = None
166 if not item.getAttribute("id"):
167 item["id"] = str(uuid.uuid4())
168
169 if persistItems:
170 d = node.storeItems(items, requestor)
171 else:
172 d = defer.succeed(None)
173
174 d.addCallback(self._doNotify, node.nodeIdentifier, items,
175 deliverPayloads)
176 return d
177
178
179 def _doNotify(self, result, nodeIdentifier, items, deliverPayloads):
180 if items and not deliverPayloads:
181 for item in items:
182 item.children = []
183
184 self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier},
185 '//event/pubsub/notify')
186
187
188 def getNotifications(self, nodeIdentifier, items):
189
190 def toNotifications(subscriptions, nodeIdentifier, items):
191 subsBySubscriber = {}
192 for subscription in subscriptions:
193 if subscription.options.get('pubsub#subscription_type',
194 'items') == 'items':
195 subs = subsBySubscriber.setdefault(subscription.subscriber,
196 set())
197 subs.add(subscription)
198
199 notifications = [(subscriber, subscriptions, items)
200 for subscriber, subscriptions
201 in subsBySubscriber.iteritems()]
202
203 return notifications
204
205 def rootNotFound(failure):
206 failure.trap(error.NodeNotFound)
207 return []
208
209 d1 = self.storage.getNode(nodeIdentifier)
210 d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
211 d2 = self.storage.getNode('')
212 d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
213 d2.addErrback(rootNotFound)
214 d = defer.gatherResults([d1, d2])
215 d.addCallback(lambda result: result[0] + result[1])
216 d.addCallback(toNotifications, nodeIdentifier, items)
217 return d
218
219
220 def registerNotifier(self, observerfn, *args, **kwargs):
221 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
222
223
224 def subscribe(self, nodeIdentifier, subscriber, requestor):
225 subscriberEntity = subscriber.userhostJID()
226 if subscriberEntity != requestor.userhostJID():
227 return defer.fail(error.Forbidden())
228
229 d = self.storage.getNode(nodeIdentifier)
230 d.addCallback(_getAffiliation, subscriberEntity)
231 d.addCallback(self._doSubscribe, subscriber)
232 return d
233
234
235 def _doSubscribe(self, result, subscriber):
236 node, affiliation = result
237
238 if affiliation == 'outcast':
239 raise error.Forbidden()
240
241 def trapExists(failure):
242 failure.trap(error.SubscriptionExists)
243 return False
244
245 def cb(sendLast):
246 d = node.getSubscription(subscriber)
247 if sendLast:
248 d.addCallback(self._sendLastPublished, node)
249 return d
250
251 d = node.addSubscription(subscriber, 'subscribed', {})
252 d.addCallbacks(lambda _: True, trapExists)
253 d.addCallback(cb)
254 return d
255
256
257 def _sendLastPublished(self, subscription, node):
258
259 def notifyItem(items):
260 if items:
261 reactor.callLater(0, self.dispatch,
262 {'items': items,
263 'nodeIdentifier': node.nodeIdentifier,
264 'subscription': subscription},
265 '//event/pubsub/notify')
266
267 config = node.getConfiguration()
268 sendLastPublished = config.get('pubsub#send_last_published_item',
269 'never')
270 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
271 entity = subscription.subscriber.userhostJID()
272 d = self.getItems(node.nodeIdentifier, entity, 1)
273 d.addCallback(notifyItem)
274 d.addErrback(log.err)
275
276 return subscription
277
278
279 def unsubscribe(self, nodeIdentifier, subscriber, requestor):
280 if subscriber.userhostJID() != requestor.userhostJID():
281 return defer.fail(error.Forbidden())
282
283 d = self.storage.getNode(nodeIdentifier)
284 d.addCallback(lambda node: node.removeSubscription(subscriber))
285 return d
286
287
288 def getSubscriptions(self, entity):
289 return self.storage.getSubscriptions(entity)
290
291 def supportsAutoCreate(self):
292 return True
293
294 def supportsInstantNodes(self):
295 return True
296
297
298 def createNode(self, nodeIdentifier, requestor):
299 if not nodeIdentifier:
300 nodeIdentifier = 'generic/%s' % uuid.uuid4()
301
302 nodeType = 'leaf'
303 config = self.storage.getDefaultConfiguration(nodeType)
304 config['pubsub#node_type'] = nodeType
305
306 d = self.storage.createNode(nodeIdentifier, requestor, config)
307 d.addCallback(lambda _: nodeIdentifier)
308 return d
309
310
311 def getDefaultConfiguration(self, nodeType):
312 d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
313 return d
314
315
316 def getNodeConfiguration(self, nodeIdentifier):
317 if not nodeIdentifier:
318 return defer.fail(error.NoRootNode())
319
320 d = self.storage.getNode(nodeIdentifier)
321 d.addCallback(lambda node: node.getConfiguration())
322
323 return d
324
325
326 def setNodeConfiguration(self, nodeIdentifier, options, requestor):
327 if not nodeIdentifier:
328 return defer.fail(error.NoRootNode())
329
330 d = self.storage.getNode(nodeIdentifier)
331 d.addCallback(_getAffiliation, requestor)
332 d.addCallback(self._doSetNodeConfiguration, options)
333 return d
334
335
336 def _doSetNodeConfiguration(self, result, options):
337 node, affiliation = result
338
339 if affiliation != 'owner':
340 raise error.Forbidden()
341
342 return node.setConfiguration(options)
343
344
345 def getAffiliations(self, entity):
346 return self.storage.getAffiliations(entity)
347
348
349 def getItems(self, nodeIdentifier, requestor, maxItems=None,
350 itemIdentifiers=None):
351 d = self.storage.getNode(nodeIdentifier)
352 d.addCallback(_getAffiliation, requestor)
353 d.addCallback(self._doGetItems, maxItems, itemIdentifiers)
354 return d
355
356
357 def _doGetItems(self, result, maxItems, itemIdentifiers):
358 node, affiliation = result
359
360 if not ILeafNode.providedBy(node):
361 return []
362
363 if affiliation == 'outcast':
364 raise error.Forbidden()
365
366 if itemIdentifiers:
367 return node.getItemsById(itemIdentifiers)
368 else:
369 return node.getItems(maxItems)
370
371
372 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor):
373 d = self.storage.getNode(nodeIdentifier)
374 d.addCallback(_getAffiliation, requestor)
375 d.addCallback(self._doRetract, itemIdentifiers)
376 return d
377
378
379 def _doRetract(self, result, itemIdentifiers):
380 node, affiliation = result
381 persistItems = node.getConfiguration()["pubsub#persist_items"]
382
383 if affiliation not in ['owner', 'publisher']:
384 raise error.Forbidden()
385
386 if not persistItems:
387 raise error.NodeNotPersistent()
388
389 d = node.removeItems(itemIdentifiers)
390 d.addCallback(self._doNotifyRetraction, node.nodeIdentifier)
391 return d
392
393
394 def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier):
395 self.dispatch({'itemIdentifiers': itemIdentifiers,
396 'nodeIdentifier': nodeIdentifier },
397 '//event/pubsub/retract')
398
399
400 def purgeNode(self, nodeIdentifier, requestor):
401 d = self.storage.getNode(nodeIdentifier)
402 d.addCallback(_getAffiliation, requestor)
403 d.addCallback(self._doPurge)
404 return d
405
406
407 def _doPurge(self, result):
408 node, affiliation = result
409 persistItems = node.getConfiguration()["pubsub#persist_items"]
410
411 if affiliation != 'owner':
412 raise error.Forbidden()
413
414 if not persistItems:
415 raise error.NodeNotPersistent()
416
417 d = node.purge()
418 d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
419 return d
420
421
422 def _doNotifyPurge(self, result, nodeIdentifier):
423 self.dispatch(nodeIdentifier, '//event/pubsub/purge')
424
425
426 def registerPreDelete(self, preDeleteFn):
427 self._callbackList.append(preDeleteFn)
428
429
430 def getSubscribers(self, nodeIdentifier):
431 def cb(subscriptions):
432 return [subscription.subscriber for subscription in subscriptions]
433
434 d = self.storage.getNode(nodeIdentifier)
435 d.addCallback(lambda node: node.getSubscriptions('subscribed'))
436 d.addCallback(cb)
437 return d
438
439
440 def deleteNode(self, nodeIdentifier, requestor, redirectURI=None):
441 d = self.storage.getNode(nodeIdentifier)
442 d.addCallback(_getAffiliation, requestor)
443 d.addCallback(self._doPreDelete, redirectURI)
444 return d
445
446
447 def _doPreDelete(self, result, redirectURI):
448 node, affiliation = result
449
450 if affiliation != 'owner':
451 raise error.Forbidden()
452
453 data = {'nodeIdentifier': node.nodeIdentifier,
454 'redirectURI': redirectURI}
455
456 d = defer.DeferredList([cb(data)
457 for cb in self._callbackList],
458 consumeErrors=1)
459 d.addCallback(self._doDelete, node.nodeIdentifier)
460
461
462 def _doDelete(self, result, nodeIdentifier):
463 dl = []
464 for succeeded, r in result:
465 if succeeded and r:
466 dl.extend(r)
467
468 d = self.storage.deleteNode(nodeIdentifier)
469 d.addCallback(self._doNotifyDelete, dl)
470
471 return d
472
473
474 def _doNotifyDelete(self, result, dl):
475 for d in dl:
476 d.callback(None)
477
478
479
480 class PubSubResourceFromBackend(PubSubResource):
481 """
482 Adapts a backend to an xmpp publish-subscribe service.
483 """
484
485 features = [
486 "config-node",
487 "create-nodes",
488 "delete-any",
489 "delete-nodes",
490 "item-ids",
491 "meta-data",
492 "publish",
493 "purge-nodes",
494 "retract-items",
495 "retrieve-affiliations",
496 "retrieve-default",
497 "retrieve-items",
498 "retrieve-subscriptions",
499 "subscribe",
500 ]
501
502 discoIdentity = disco.DiscoIdentity('pubsub',
503 'service',
504 'Idavoll publish-subscribe service')
505
506 pubsubService = None
507
508 _errorMap = {
509 error.NodeNotFound: ('item-not-found', None, None),
510 error.NodeExists: ('conflict', None, None),
511 error.Forbidden: ('forbidden', None, None),
512 error.ItemForbidden: ('bad-request', 'item-forbidden', None),
513 error.ItemRequired: ('bad-request', 'item-required', None),
514 error.NoInstantNodes: ('not-acceptable',
515 'unsupported',
516 'instant-nodes'),
517 error.NotSubscribed: ('unexpected-request', 'not-subscribed', None),
518 error.InvalidConfigurationOption: ('not-acceptable', None, None),
519 error.InvalidConfigurationValue: ('not-acceptable', None, None),
520 error.NodeNotPersistent: ('feature-not-implemented',
521 'unsupported',
522 'persistent-node'),
523 error.NoRootNode: ('bad-request', None, None),
524 error.NoCollections: ('feature-not-implemented',
525 'unsupported',
526 'collections'),
527 error.NoPublishing: ('feature-not-implemented',
528 'unsupported',
529 'publish'),
530 }
531
532 def __init__(self, backend):
533 PubSubResource.__init__(self)
534
535 self.backend = backend
536 self.hideNodes = False
537
538 self.backend.registerNotifier(self._notify)
539 self.backend.registerPreDelete(self._preDelete)
540
541 if self.backend.supportsAutoCreate():
542 self.features.append("auto-create")
543
544 if self.backend.supportsInstantNodes():
545 self.features.append("instant-nodes")
546
547 if self.backend.supportsOutcastAffiliation():
548 self.features.append("outcast-affiliation")
549
550 if self.backend.supportsPersistentItems():
551 self.features.append("persistent-items")
552
553 if self.backend.supportsPublisherAffiliation():
554 self.features.append("publisher-affiliation")
555
556
557 def _notify(self, data):
558 items = data['items']
559 nodeIdentifier = data['nodeIdentifier']
560 if 'subscription' not in data:
561 d = self.backend.getNotifications(nodeIdentifier, items)
562 else:
563 subscription = data['subscription']
564 d = defer.succeed([(subscription.subscriber, [subscription],
565 items)])
566 d.addCallback(lambda notifications: self.pubsubService.notifyPublish(
567 self.serviceJID,
568 nodeIdentifier,
569 notifications))
570
571
572 def _preDelete(self, data):
573 nodeIdentifier = data['nodeIdentifier']
574 redirectURI = data.get('redirectURI', None)
575 d = self.backend.getSubscribers(nodeIdentifier)
576 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
577 self.serviceJID,
578 nodeIdentifier,
579 subscribers,
580 redirectURI))
581 return d
582
583
584 def _mapErrors(self, failure):
585 e = failure.trap(*self._errorMap.keys())
586
587 condition, pubsubCondition, feature = self._errorMap[e]
588 msg = failure.value.msg
589
590 if pubsubCondition:
591 exc = PubSubError(condition, pubsubCondition, feature, msg)
592 else:
593 exc = StanzaError(condition, text=msg)
594
595 raise exc
596
597
598 def getInfo(self, requestor, service, nodeIdentifier):
599 info = {}
600
601 def saveType(result):
602 info['type'] = result
603 return nodeIdentifier
604
605 def saveMetaData(result):
606 info['meta-data'] = result
607 return info
608
609 def trapNotFound(failure):
610 failure.trap(error.NodeNotFound)
611 return info
612
613 d = defer.succeed(nodeIdentifier)
614 d.addCallback(self.backend.getNodeType)
615 d.addCallback(saveType)
616 d.addCallback(self.backend.getNodeMetaData)
617 d.addCallback(saveMetaData)
618 d.addErrback(trapNotFound)
619 d.addErrback(self._mapErrors)
620 return d
621
622
623 def getNodes(self, requestor, service, nodeIdentifier):
624 if service.resource:
625 return defer.succeed([])
626 d = self.backend.getNodes()
627 return d.addErrback(self._mapErrors)
628
629
630 def getConfigurationOptions(self):
631 return self.backend.nodeOptions
632
633 def _publish_errb(self, failure, request):
634 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
635 d = self.backend.createNode(request.nodeIdentifier,
636 request.sender)
637 d.addCallback(lambda ignore,
638 request: self.backend.publish(request.nodeIdentifier,
639 request.items,
640 request.sender),
641 request)
642 return d
643
644 raise failure
645
646
647 def publish(self, request):
648 d = self.backend.publish(request.nodeIdentifier,
649 request.items,
650 request.sender)
651 d.addErrback(self._publish_errb, request)
652 return d.addErrback(self._mapErrors)
653
654
655 def subscribe(self, request):
656 d = self.backend.subscribe(request.nodeIdentifier,
657 request.subscriber,
658 request.sender)
659 return d.addErrback(self._mapErrors)
660
661
662 def unsubscribe(self, request):
663 d = self.backend.unsubscribe(request.nodeIdentifier,
664 request.subscriber,
665 request.sender)
666 return d.addErrback(self._mapErrors)
667
668
669 def subscriptions(self, request):
670 d = self.backend.getSubscriptions(request.sender)
671 return d.addErrback(self._mapErrors)
672
673
674 def affiliations(self, request):
675 d = self.backend.getAffiliations(request.sender)
676 return d.addErrback(self._mapErrors)
677
678
679 def create(self, request):
680 d = self.backend.createNode(request.nodeIdentifier,
681 request.sender)
682 return d.addErrback(self._mapErrors)
683
684
685 def default(self, request):
686 d = self.backend.getDefaultConfiguration(request.nodeType)
687 return d.addErrback(self._mapErrors)
688
689
690 def configureGet(self, request):
691 d = self.backend.getNodeConfiguration(request.nodeIdentifier)
692 return d.addErrback(self._mapErrors)
693
694
695 def configureSet(self, request):
696 d = self.backend.setNodeConfiguration(request.nodeIdentifier,
697 request.options,
698 request.sender)
699 return d.addErrback(self._mapErrors)
700
701
702 def items(self, request):
703 d = self.backend.getItems(request.nodeIdentifier,
704 request.sender,
705 request.maxItems,
706 request.itemIdentifiers)
707 return d.addErrback(self._mapErrors)
708
709
710 def retract(self, request):
711 d = self.backend.retractItem(request.nodeIdentifier,
712 request.itemIdentifiers,
713 request.sender)
714 return d.addErrback(self._mapErrors)
715
716
717 def purge(self, request):
718 d = self.backend.purgeNode(request.nodeIdentifier,
719 request.sender)
720 return d.addErrback(self._mapErrors)
721
722
723 def delete(self, request):
724 d = self.backend.deleteNode(request.nodeIdentifier,
725 request.sender)
726 return d.addErrback(self._mapErrors)
727
728 components.registerAdapter(PubSubResourceFromBackend,
729 IBackendService,
730 IPubSubResource)