Mercurial > libervia-pubsub
view idavoll/generic_backend.py @ 158:b2149e448465 idavoll-0.5.0
update tags
author | convert-repo |
---|---|
date | Thu, 18 Jun 2009 11:52:01 +0000 |
parents | 5191ba7c4df8 |
children | 6fe78048baf9 |
line wrap: on
line source
# Copyright (c) 2003-2006 Ralph Meijer # See LICENSE for details. import sha import time import uuid from twisted.words.protocols.jabber import jid from twisted.words.xish import utility from twisted.application import service from twisted.internet import defer from zope.interface import implements import backend, storage def _get_affiliation(node, entity): d = node.get_affiliation(entity) d.addCallback(lambda affiliation: (node, affiliation)) return d class BackendService(service.MultiService, utility.EventDispatcher): implements(backend.IBackendService) options = {"pubsub#persist_items": {"type": "boolean", "label": "Persist items to storage"}, "pubsub#deliver_payloads": {"type": "boolean", "label": "Deliver payloads with event notifications"}, } default_config = {"pubsub#persist_items": True, "pubsub#deliver_payloads": True, } def __init__(self, storage): service.MultiService.__init__(self) utility.EventDispatcher.__init__(self) self.storage = storage def supports_publisher_affiliation(self): return True def supports_outcast_affiliation(self): return True def supports_persistent_items(self): return True def get_node_type(self, node_id): d = self.storage.get_node(node_id) d.addCallback(lambda node: node.get_type()) return d def get_nodes(self): return self.storage.get_node_ids() def get_node_meta_data(self, node_id): d = self.storage.get_node(node_id) d.addCallback(lambda node: node.get_meta_data()) d.addCallback(self._make_meta_data) return d def _make_meta_data(self, meta_data): options = [] for key, value in meta_data.iteritems(): if self.options.has_key(key): option = {"var": key} option.update(self.options[key]) option["value"] = value options.append(option) return options class PublishService(service.Service): implements(backend.IPublishService) def _check_auth(self, node, requestor): def check(affiliation, node): if affiliation not in ['owner', 'publisher']: raise backend.NotAuthorized return node d = node.get_affiliation(requestor) d.addCallback(check, node) return d def publish(self, node_id, items, requestor): d = self.parent.storage.get_node(node_id) d.addCallback(self._check_auth, requestor) d.addCallback(self._do_publish, items, requestor) return d def _do_publish(self, node, items, requestor): configuration = node.get_configuration() persist_items = configuration["pubsub#persist_items"] deliver_payloads = configuration["pubsub#deliver_payloads"] if items and not persist_items and not deliver_payloads: raise backend.NoPayloadAllowed elif not items and (persist_items or deliver_payloads): raise backend.PayloadExpected if persist_items or deliver_payloads: for item in items: if not item.getAttribute("id"): item["id"] = uuid.generate() if persist_items: d = node.store_items(items, requestor) else: d = defer.succeed(None) d.addCallback(self._do_notify, node.id, items, deliver_payloads) return d def _do_notify(self, result, node_id, items, deliver_payloads): if items and not deliver_payloads: for item in items: item.children = [] self.parent.dispatch({ 'items': items, 'node_id': node_id }, '//event/pubsub/notify') class NotificationService(service.Service): implements(backend.INotificationService) def get_notification_list(self, node_id, items): d = self.parent.storage.get_node(node_id) d.addCallback(lambda node: node.get_subscribers()) d.addCallback(self._magic_filter, node_id, items) return d def _magic_filter(self, subscribers, node_id, items): list = [] for subscriber in subscribers: list.append((subscriber, items)) return list def register_notifier(self, observerfn, *args, **kwargs): self.parent.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) class SubscriptionService(service.Service): implements(backend.ISubscriptionService) def subscribe(self, node_id, subscriber, requestor): subscriber_entity = subscriber.userhostJID() if subscriber_entity != requestor: return defer.fail(backend.NotAuthorized()) d = self.parent.storage.get_node(node_id) d.addCallback(_get_affiliation, subscriber_entity) d.addCallback(self._do_subscribe, subscriber) return d def _do_subscribe(self, result, subscriber): node, affiliation = result if affiliation == 'outcast': raise backend.NotAuthorized d = node.add_subscription(subscriber, 'subscribed') d.addCallback(lambda _: 'subscribed') d.addErrback(self._get_subscription, node, subscriber) d.addCallback(self._return_subscription, node.id) return d def _get_subscription(self, failure, node, subscriber): failure.trap(storage.SubscriptionExists) return node.get_subscription(subscriber) def _return_subscription(self, result, node_id): return node_id, result def unsubscribe(self, node_id, subscriber, requestor): if subscriber.userhostJID() != requestor: raise backend.NotAuthorized d = self.parent.storage.get_node(node_id) d.addCallback(lambda node: node.remove_subscription(subscriber)) return d def get_subscriptions(self, entity): return self.parent.storage.get_subscriptions(entity) class NodeCreationService(service.Service): implements(backend.INodeCreationService) def supports_instant_nodes(self): return True def create_node(self, node_id, requestor): if not node_id: node_id = 'generic/%s' % uuid.generate() d = self.parent.storage.create_node(node_id, requestor) d.addCallback(lambda _: node_id) return d def get_node_configuration(self, node_id): if node_id: d = self.parent.storage.get_node(node_id) d.addCallback(lambda node: node.get_configuration()) else: # XXX: this is disabled in pubsub.py d = defer.succeed(self.parent.default_config) d.addCallback(self._make_config) return d def _make_config(self, config): options = [] for key, value in self.parent.options.iteritems(): option = {"var": key} option.update(value) if config.has_key(key): option["value"] = config[key] options.append(option) return options def set_node_configuration(self, node_id, options, requestor): for key, value in options.iteritems(): if not self.parent.options.has_key(key): raise backend.InvalidConfigurationOption if self.parent.options[key]["type"] == 'boolean': try: options[key] = bool(int(value)) except ValueError: raise backend.InvalidConfigurationValue d = self.parent.storage.get_node(node_id) d.addCallback(_get_affiliation, requestor) d.addCallback(self._do_set_node_configuration, options) return d def _do_set_node_configuration(self, result, options): node, affiliation = result if affiliation != 'owner': raise backend.NotAuthorized return node.set_configuration(options) class AffiliationsService(service.Service): implements(backend.IAffiliationsService) def get_affiliations(self, entity): return self.parent.storage.get_affiliations(entity) class ItemRetrievalService(service.Service): implements(backend.IItemRetrievalService) def get_items(self, node_id, requestor, max_items=None, item_ids=[]): d = self.parent.storage.get_node(node_id) d.addCallback(self._is_subscribed, requestor) d.addCallback(self._do_get_items, max_items, item_ids) return d def _is_subscribed(self, node, subscriber): d = node.is_subscribed(subscriber) d.addCallback(lambda subscribed: (node, subscribed)) return d def _do_get_items(self, result, max_items, item_ids): node, subscribed = result if not subscribed: raise backend.NotAuthorized if item_ids: return node.get_items_by_id(item_ids) else: return node.get_items(max_items) class RetractionService(service.Service): implements(backend.IRetractionService) def retract_item(self, node_id, item_ids, requestor): d = self.parent.storage.get_node(node_id) d.addCallback(_get_affiliation, requestor) d.addCallback(self._do_retract, item_ids) return d def _do_retract(self, result, item_ids): node, affiliation = result persist_items = node.get_configuration()["pubsub#persist_items"] if affiliation not in ['owner', 'publisher']: raise backend.NotAuthorized if not persist_items: raise backend.NodeNotPersistent d = node.remove_items(item_ids) d.addCallback(self._do_notify_retraction, node.id) return d def _do_notify_retraction(self, item_ids, node_id): self.parent.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, '//event/pubsub/retract') def purge_node(self, node_id, requestor): d = self.parent.storage.get_node(node_id) d.addCallback(_get_affiliation, requestor) d.addCallback(self._do_purge) return d def _do_purge(self, result): node, affiliation = result persist_items = node.get_configuration()["pubsub#persist_items"] if affiliation != 'owner': raise backend.NotAuthorized if not persist_items: raise backend.NodeNotPersistent d = node.purge() d.addCallback(self._do_notify_purge, node.id) return d def _do_notify_purge(self, result, node_id): self.parent.dispatch(node_id, '//event/pubsub/purge') class NodeDeletionService(service.Service): implements(backend.INodeDeletionService) def __init__(self): self._callback_list = [] def register_pre_delete(self, pre_delete_fn): self._callback_list.append(pre_delete_fn) def get_subscribers(self, node_id): d = self.parent.storage.get_node(node_id) d.addCallback(lambda node: node.get_subscribers()) return d def delete_node(self, node_id, requestor): d = self.parent.storage.get_node(node_id) d.addCallback(_get_affiliation, requestor) d.addCallback(self._do_pre_delete) return d def _do_pre_delete(self, result): node, affiliation = result if affiliation != 'owner': raise backend.NotAuthorized d = defer.DeferredList([cb(node.id) for cb in self._callback_list], consumeErrors=1) d.addCallback(self._do_delete, node.id) def _do_delete(self, result, node_id): dl = [] for succeeded, r in result: if succeeded and r: dl.extend(r) d = self.parent.storage.delete_node(node_id) d.addCallback(self._do_notify_delete, dl) return d def _do_notify_delete(self, result, dl): for d in dl: d.callback(None)