Mercurial > libervia-pubsub
diff idavoll/backend.py @ 167:ef22e4150caa
Move protocol implementations (pubsub, disco, forms) to and depend on wokkel.
Author: ralphm
Fixes: #4
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Wed, 03 Oct 2007 12:41:43 +0000 |
parents | 6fe78048baf9 |
children | e2c2c2baf483 |
line wrap: on
line diff
--- a/idavoll/backend.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/backend.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,214 +1,524 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# -*- test-case-name: idavoll.test.test_backend -*- +# +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. -from zope.interface import Interface -import storage +import uuid + +from zope.interface import implements -class Error(Exception): - msg = '' +from twisted.application import service +from twisted.python import components +from twisted.internet import defer +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.xish import utility - def __str__(self): - return self.msg - -class Forbidden(Error): - pass +from wokkel.iwokkel import IDisco, IPubSubService +from wokkel.pubsub import PubSubService, PubSubError -class ItemForbidden(Error): - pass +from idavoll import error, iidavoll +from idavoll.iidavoll import IBackendService -class ItemRequired(Error): - pass - -class NoInstantNodes(Error): - pass +def _get_affiliation(node, entity): + d = node.get_affiliation(entity) + d.addCallback(lambda affiliation: (node, affiliation)) + return d -class NotSubscribed(Error): - pass +class BackendService(service.Service, utility.EventDispatcher): -class InvalidConfigurationOption(Error): - msg = 'Invalid configuration option' - -class InvalidConfigurationValue(Error): - msg = 'Bad configuration value' + implements(iidavoll.IBackendService) -class NodeNotPersistent(Error): - pass - -class NoRootNode(Error): - pass + options = {"pubsub#persist_items": + {"type": "boolean", + "label": "Persist items to storage"}, + "pubsub#deliver_payloads": + {"type": "boolean", + "label": "Deliver payloads with event notifications"}, + } -class IBackendService(Interface): - """ Interface to a backend service of a pubsub service. """ + default_config = {"pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + } - def __init__(storage): - """ - @param storage: L{storage} object. - """ + def __init__(self, storage): + utility.EventDispatcher.__init__(self) + self.storage = storage + self._callback_list = [] def supports_publisher_affiliation(self): - """ Reports if the backend supports the publisher affiliation. - - @rtype: C{bool} - """ + return True def supports_outcast_affiliation(self): - """ Reports if the backend supports the publisher affiliation. - - @rtype: C{bool} - """ + return True def supports_persistent_items(self): - """ Reports if the backend supports persistent items. - - @rtype: C{bool} - """ + return True - def get_node_type(node_id): - """ Return type of a node. - - @return: a deferred that returns either 'leaf' or 'collection' - """ + def get_node_type(self, node_id): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_type()) + return d def get_nodes(self): - """ Returns list of all nodes. + return self.storage.get_node_ids() + + def get_node_meta_data(self, node_id): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_meta_data()) + d.addCallback(self._make_meta_data) + return d - @return: a deferred that returns a C{list} of node ids. - """ + def _make_meta_data(self, meta_data): + options = [] + for key, value in meta_data.iteritems(): + if self.options.has_key(key): + option = {"var": key} + option.update(self.options[key]) + option["value"] = value + options.append(option) + + return options + + def _check_auth(self, node, requestor): + def check(affiliation, node): + if affiliation not in ['owner', 'publisher']: + raise error.Forbidden() + return node - def get_node_meta_data(node_id): - """ Return meta data for a node. + d = node.get_affiliation(requestor) + d.addCallback(check, node) + return d + + def publish(self, node_id, items, requestor): + d = self.storage.get_node(node_id) + d.addCallback(self._check_auth, requestor) + d.addCallback(self._do_publish, items, requestor) + return d + + def _do_publish(self, node, items, requestor): + configuration = node.get_configuration() + persist_items = configuration["pubsub#persist_items"] + deliver_payloads = configuration["pubsub#deliver_payloads"] - @return: a deferred that returns a C{list} of C{dict}s with the - metadata. - """ + if items and not persist_items and not deliver_payloads: + raise error.ItemForbidden() + elif not items and (persist_items or deliver_payloads): + raise error.ItemRequired() + + if persist_items or deliver_payloads: + for item in items: + if not item.getAttribute("id"): + item["id"] = uuid.generate() + + if persist_items: + d = node.store_items(items, requestor) + else: + d = defer.succeed(None) -class INodeCreationService(Interface): - """ A service for creating nodes """ + d.addCallback(self._do_notify, node.id, items, deliver_payloads) + return d + + def _do_notify(self, result, node_id, items, deliver_payloads): + if items and not deliver_payloads: + for item in items: + item.children = [] + + self.dispatch({'items': items, 'node_id': node_id}, + '//event/pubsub/notify') + + def get_notification_list(self, node_id, items): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_subscribers()) + d.addCallback(self._magic_filter, node_id, items) + return d + + def _magic_filter(self, subscribers, node_id, items): + list = [] + for subscriber in subscribers: + list.append((subscriber, items)) + return list + + def register_notifier(self, observerfn, *args, **kwargs): + self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) - def create_node(node_id, requestor): - """ Create a node. - - @return: a deferred that fires when the node has been created. - """ + def subscribe(self, node_id, subscriber, requestor): + subscriber_entity = subscriber.userhostJID() + if subscriber_entity != requestor: + return defer.fail(error.Forbidden()) + + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, subscriber_entity) + d.addCallback(self._do_subscribe, subscriber) + return d + + def _do_subscribe(self, result, subscriber): + node, affiliation = result -class INodeDeletionService(Interface): - """ A service for deleting nodes. """ + if affiliation == 'outcast': + raise error.Forbidden() + + d = node.add_subscription(subscriber, 'subscribed') + d.addCallback(lambda _: 'subscribed') + d.addErrback(self._get_subscription, node, subscriber) + d.addCallback(self._return_subscription, node.id) + return d + + def _get_subscription(self, failure, node, subscriber): + failure.trap(error.SubscriptionExists) + return node.get_subscription(subscriber) + + def _return_subscription(self, result, node_id): + return node_id, result - def register_pre_delete(pre_delete_fn): - """ Register a callback that is called just before a node deletion. - - The function C{pre_deleted_fn} is added to a list of functions - to be called just before deletion of a node. The callback - C{pre_delete_fn} is called with the C{node_id} that is about to be - deleted and should return a deferred that returns a list of deferreds - that are to be fired after deletion. The backend collects the lists - from all these callbacks before actually deleting the node in question. - After deletion all collected deferreds are fired to do post-processing. + def unsubscribe(self, node_id, subscriber, requestor): + if subscriber.userhostJID() != requestor: + return defer.fail(error.Forbidden()) + + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.remove_subscription(subscriber)) + return d + + def get_subscriptions(self, entity): + return self.storage.get_subscriptions(entity) + + def supports_instant_nodes(self): + return True + + def create_node(self, node_id, requestor): + if not node_id: + node_id = 'generic/%s' % uuid.generate() + d = self.storage.create_node(node_id, requestor) + d.addCallback(lambda _: node_id) + return d - The idea is that you want to be able to collect data from the - node before deleting it, for example to get a list of subscribers - that have to be notified after the node has been deleted. To do this, - C{pre_delete_fn} fetches the subscriber list and passes this - list to a callback attached to a deferred that it sets up. This - deferred is returned in the list of deferreds. - """ + def get_default_configuration(self): + d = defer.succeed(self.default_config) + d.addCallback(self._make_config) + return d + + def get_node_configuration(self, node_id): + if not node_id: + raise error.NoRootNode() + + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_configuration()) + + d.addCallback(self._make_config) + return d + + def _make_config(self, config): + options = [] + for key, value in self.options.iteritems(): + option = {"var": key} + option.update(value) + if config.has_key(key): + option["value"] = config[key] + options.append(option) + + return options + + def set_node_configuration(self, node_id, options, requestor): + if not node_id: + raise error.NoRootNode() - def get_subscribers(node_id): - """ Get node subscriber list. - - @return: a deferred that fires with the list of subscribers. - """ + for key, value in options.iteritems(): + if not self.options.has_key(key): + raise error.InvalidConfigurationOption() + if self.options[key]["type"] == 'boolean': + try: + options[key] = bool(int(value)) + except ValueError: + raise error.InvalidConfigurationValue() + + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_set_node_configuration, options) + return d - def delete_node(node_id, requestor): - """ Delete a node. - - @return: a deferred that fires when the node has been deleted. - """ + def _do_set_node_configuration(self, result, options): + node, affiliation = result + + if affiliation != 'owner': + raise error.Forbidden() + + return node.set_configuration(options) + + def get_affiliations(self, entity): + return self.storage.get_affiliations(entity) + + def get_items(self, node_id, requestor, max_items=None, item_ids=[]): + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_get_items, max_items, item_ids) + return d -class IPublishService(Interface): - """ A service for publishing items to a node. """ + def _do_get_items(self, result, max_items, item_ids): + node, affiliation = result + + if affiliation == 'outcast': + raise error.Forbidden() + + if item_ids: + return node.get_items_by_id(item_ids) + else: + return node.get_items(max_items) + + def retract_item(self, node_id, item_ids, requestor): + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_retract, item_ids) + return d - def publish(node_id, items, requestor): - """ Publish items to a pubsub node. - - @return: a deferred that fires when the items have been published. - """ -class INotificationService(Interface): - """ A service for notification of published items. """ + def _do_retract(self, result, item_ids): + node, affiliation = result + persist_items = node.get_configuration()["pubsub#persist_items"] + + if affiliation not in ['owner', 'publisher']: + raise error.Forbidden() - def register_notifier(observerfn, *args, **kwargs): - """ Register callback which is called for notification. """ + if not persist_items: + raise error.NodeNotPersistent() + + d = node.remove_items(item_ids) + d.addCallback(self._do_notify_retraction, node.id) + return d + + def _do_notify_retraction(self, item_ids, node_id): + self.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, + '//event/pubsub/retract') - def get_notification_list(node_id, items): - pass + def purge_node(self, node_id, requestor): + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_purge) + return d + + def _do_purge(self, result): + node, affiliation = result + persist_items = node.get_configuration()["pubsub#persist_items"] -class ISubscriptionService(Interface): - """ A service for managing subscriptions. """ + if affiliation != 'owner': + raise error.Forbidden() + + if not persist_items: + raise error.NodeNotPersistent() + + d = node.purge() + d.addCallback(self._do_notify_purge, node.id) + return d + + def _do_notify_purge(self, result, node_id): + self.dispatch(node_id, '//event/pubsub/purge') - def subscribe(node_id, subscriber, requestor): - """ Request the subscription of an entity to a pubsub node. + def register_pre_delete(self, pre_delete_fn): + self._callback_list.append(pre_delete_fn) + + def get_subscribers(self, node_id): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_subscribers()) + return d - Depending on the node's configuration and possible business rules, the - C{subscriber} is added to the list of subscriptions of the node with id - C{node_id}. The C{subscriber} might be different from the C{requestor}, - and if the C{requestor} is not allowed to subscribe this entity an - exception should be raised. + def delete_node(self, node_id, requestor): + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_pre_delete) + return d + + def _do_pre_delete(self, result): + node, affiliation = result + + if affiliation != 'owner': + raise error.Forbidden() + + d = defer.DeferredList([cb(node.id) for cb in self._callback_list], + consumeErrors=1) + d.addCallback(self._do_delete, node.id) - @return: a deferred that returns the subscription state - """ + def _do_delete(self, result, node_id): + dl = [] + for succeeded, r in result: + if succeeded and r: + dl.extend(r) + + d = self.storage.delete_node(node_id) + d.addCallback(self._do_notify_delete, dl) + + return d - def unsubscribe(node_id, subscriber, requestor): - """ Cancel the subscription of an entity to a pubsub node. + def _do_notify_delete(self, result, dl): + for d in dl: + d.callback(None) + + +class PubSubServiceFromBackend(PubSubService): + """ + Adapts a backend to an xmpp publish-subscribe service. + """ + + implements(IDisco) - The subscription of C{subscriber} is removed from the list of - subscriptions of the node with id C{node_id}. If the C{requestor} - is not allowed to unsubscribe C{subscriber}, an an exception should - be raised. + _errorMap = { + error.NodeNotFound: ('item-not-found', None, None), + error.NodeExists: ('conflict', None, None), + error.SubscriptionNotFound: ('not-authorized', + 'not-subscribed', + None), + error.Forbidden: ('forbidden', None, None), + error.ItemForbidden: ('bad-request', 'item-forbidden', None), + error.ItemRequired: ('bad-request', 'item-required', None), + error.NoInstantNodes: ('not-acceptable', + 'unsupported', + 'instant-nodes'), + error.NotSubscribed: ('not-authorized', 'not-subscribed', None), + error.InvalidConfigurationOption: ('not-acceptable', None, None), + error.InvalidConfigurationValue: ('not-acceptable', None, None), + error.NodeNotPersistent: ('feature-not-implemented', + 'unsupported', + 'persistent-node'), + error.NoRootNode: ('bad-request', None, None), + } - @return: a deferred that fires when unsubscription is complete. - """ + def __init__(self, backend): + PubSubService.__init__(self) - def get_subscriptions(entity): - """ Report the list of current subscriptions with this pubsub service. + self.backend = backend + self.hideNodes = False - Report the list of the current subscriptions with all nodes within this - pubsub service, for the C{entity}. + self.pubSubFeatures = self._getPubSubFeatures() + + self.backend.register_notifier(self._notify) - @return: a deferred that returns the list of all current subscriptions - as tuples C{(node_id, subscriber, subscription)}. - """ + def _getPubSubFeatures(self): + features = [ + "config-node", + "create-nodes", + "delete-any", + "delete-nodes", + "item-ids", + "meta-data", + "publish", + "purge-nodes", + "retract-items", + "retrieve-affiliations", + "retrieve-default", + "retrieve-items", + "retrieve-subscriptions", + "subscribe", + ] -class IAffiliationsService(Interface): - """ A service for retrieving the affiliations with this pubsub service. """ + if self.backend.supports_instant_nodes(): + features.append("instant-nodes") + + if self.backend.supports_outcast_affiliation(): + features.append("outcast-affiliation") + + if self.backend.supports_persistent_items(): + features.append("persistent-items") + + if self.backend.supports_publisher_affiliation(): + features.append("publisher-affiliation") + + return features - def get_affiliations(entity): - """ Report the list of current affiliations with this pubsub service. + def _notify(self, data): + items = data['items'] + nodeIdentifier = data['node_id'] + d = self.backend.get_notification_list(nodeIdentifier, items) + d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, + nodeIdentifier, + notifications)) + + def _mapErrors(self, failure): + e = failure.trap(*self._errorMap.keys()) + + condition, pubsubCondition, feature = self._errorMap[e] + msg = failure.value.msg - Report the list of the current affiliations with all nodes within this - pubsub service, for the C{entity}. + if pubsubCondition: + exc = PubSubError(condition, pubsubCondition, feature, msg) + else: + exc = StanzaError(condition, text=msg) + + raise exc - @return: a deferred that returns the list of all current affiliations - as tuples C{(node_id, affiliation)}. - """ + def getNodeInfo(self, requestor, service, nodeIdentifier): + info = {} + + def saveType(result): + info['type'] = result + return nodeIdentifier + + def saveMetaData(result): + info['meta-data'] = result + return info -class IRetractionService(Interface): - """ A service for retracting published items """ + d = defer.succeed(nodeIdentifier) + d.addCallback(self.backend.get_node_type) + d.addCallback(saveType) + d.addCallback(self.backend.get_node_meta_data) + d.addCallback(saveMetaData) + d.errback(self._mapErrors) + return d + + def getNodes(self, requestor, service): + d = self.backend.get_nodes() + return d.addErrback(self._mapErrors) + + def publish(self, requestor, service, nodeIdentifier, items): + d = self.backend.publish(nodeIdentifier, items, requestor) + return d.addErrback(self._mapErrors) - def retract_item(node_id, item_id, requestor): - """ Removes item in node from persistent storage """ + def subscribe(self, requestor, service, nodeIdentifier, subscriber): + d = self.backend.subscribe(nodeIdentifier, subscriber, requestor) + return d.addErrback(self._mapErrors) + + def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): + d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor) + return d.addErrback(self._mapErrors) - def purge_node(node_id, requestor): - """ Removes all items in node from persistent storage """ + def subscriptions(self, requestor, service): + d = self.backend.get_subscriptions(requestor) + return d.addErrback(self._mapErrors) + + def affiliations(self, requestor, service): + d = self.backend.get_affiliations(requestor) + return d.addErrback(self._mapErrors) -class IItemRetrievalService(Interface): - """ A service for retrieving previously published items. """ + def create(self, requestor, service, nodeIdentifier): + d = self.backend.create_node(nodeIdentifier, requestor) + return d.addErrback(self._mapErrors) + + def getDefaultConfiguration(self, requestor, service): + d = self.backend.get_default_configuration() + return d.addErrback(self._mapErrors) - def get_items(node_id, requestor, max_items=None, item_ids=[]): - """ Retrieve items from persistent storage + def getConfiguration(self, requestor, service, nodeIdentifier): + d = self.backend.get_node_configuration(nodeIdentifier) + return d.addErrback(self._mapErrors) + + def setConfiguration(self, requestor, service, nodeIdentifier, options): + d = self.backend.set_node_configuration(nodeIdentifier, options, + requestor) + return d.addErrback(self._mapErrors) - If C{max_items} is given, return the C{max_items} last published - items, else if C{item_ids} is not empty, return the items requested. - If neither is given, return all items. + def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): + d = self.backend.get_items(nodeIdentifier, requestor, maxItems, + itemIdentifiers) + return d.addErrback(self._mapErrors) + + def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): + d = self.backend.retract_item(nodeIdentifier, itemIdentifiers, + requestor) + return d.addErrback(self._mapErrors) - @return: a deferred that returns the requested items - """ + def purge(self, requestor, service, nodeIdentifier): + d = self.backend.purge_node(nodeIdentifier, requestor) + return d.addErrback(self._mapErrors) + + def delete(self, requestor, service, nodeIdentifier): + d = self.backend.delete_node(nodeIdentifier, requestor) + return d.addErrback(self._mapErrors) + +components.registerAdapter(PubSubServiceFromBackend, + IBackendService, + IPubSubService)