Mercurial > libervia-pubsub
changeset 108:1c18759d2afb
Moved two errors to storage.py.
Moved generic backend implementation to generic_backend.py.
Added two configuration errors.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Fri, 08 Apr 2005 10:16:08 +0000 |
parents | d252d793f0ed |
children | 9fb8f0867d02 |
files | idavoll/backend.py |
diffstat | 1 files changed, 47 insertions(+), 391 deletions(-) [+] |
line wrap: on
line diff
--- a/idavoll/backend.py Fri Apr 08 10:15:02 2005 +0000 +++ b/idavoll/backend.py Fri Apr 08 10:16:08 2005 +0000 @@ -1,10 +1,5 @@ -from twisted.words.protocols.jabber import jid -from twisted.application import service -from twisted.xish import utility -from twisted.internet import defer -from zope.interface import Interface, implements -import sha -import time +from zope.interface import Interface +import storage class Error(Exception): msg = '' @@ -12,9 +7,6 @@ def __str__(self): return self.msg -class NodeNotFound(Error): - msg = 'Node not found' - class NotAuthorized(Error): pass @@ -27,22 +19,61 @@ class NoInstantNodes(Error): pass -class NodeExists(Error): - pass - class NotImplemented(Error): pass class NotSubscribed(Error): pass +class InvalidConfigurationOption(Error): + msg = 'Invalid configuration option' + +class InvalidConfigurationValue(Error): + msg = 'Bad configuration value' + class IBackendService(Interface): """ Interface to a backend service of a pubsub service. """ - def get_supported_affiliations(self): - """ Reports the list of supported affiliation types. + def __init__(self, storage): + """ + @param storage: L{storage} object. + """ + + def supports_publisher_affiliation(self): + """ Reports if the backend supports the publisher affiliation. + + @rtype: C{bool} + """ + + def supports_outcast_affiliation(self): + """ Reports if the backend supports the publisher affiliation. + + @rtype: C{bool} + """ + + def supports_persistent_items(self): + """ Reports if the backend supports persistent items. - @return: a list of supported affiliation types. + @rtype: C{bool} + """ + + def get_node_type(self, node_id): + """ Return type of a node. + + @return: a deferred that returns either 'leaf' or 'collection' + """ + + def get_nodes(self): + """ Returns list of all nodes. + + @return: a deferred that returns a C{list} of node ids. + """ + + def get_node_meta_data(self, node_id): + """ Return meta data for a node. + + @return: a deferred that returns a C{list} of C{dict}s with the + metadata. """ class INodeCreationService(Interface): @@ -166,378 +197,3 @@ @return: a deferred that returns the requested items """ - -class BackendService(service.MultiService, utility.EventDispatcher): - - implements(IBackendService) - - options = {"pubsub#persist_items": - {"type": "boolean", - "label": "Persist items to storage"}, - "pubsub#deliver_payloads": - {"type": "boolean", - "label": "Deliver payloads with event notifications"}, - } - - default_config = {"pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - } - - def __init__(self, storage): - service.MultiService.__init__(self) - utility.EventDispatcher.__init__(self) - self.storage = storage - - def supports_publisher_affiliation(self): - return True - - def supports_outcast_affiliation(self): - return True - - def supports_persistent_items(self): - return True - - def get_node_type(self, node_id): - return self.storage.get_node_type(node_id) - - def get_nodes(self): - return self.storage.get_nodes() - - def get_node_meta_data(self, node_id): - d = self.storage.get_node_configuration(node_id) - - d.addCallback(self._make_meta_data) - return d - - def _make_meta_data(self, meta_data): - options = [] - for key, value in meta_data.iteritems(): - if self.options.has_key(key): - option = {"var": key} - option.update(self.options[key]) - option["value"] = value - options.append(option) - - return options - -class PublishService(service.Service): - - implements(IPublishService) - - def publish(self, node_id, items, requestor): - d1 = self.parent.storage.get_node_configuration(node_id) - d2 = self.parent.storage.get_affiliation(node_id, requestor) - d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) - d.addErrback(lambda x: x.value[0]) - d.addCallback(self._do_publish, node_id, items, requestor) - return d - - def _do_publish(self, result, node_id, items, requestor): - configuration = result[0][1] - persist_items = configuration["pubsub#persist_items"] - deliver_payloads = configuration["pubsub#deliver_payloads"] - affiliation = result[1][1] - - if affiliation not in ['owner', 'publisher']: - raise NotAuthorized - - if items and not persist_items and not deliver_payloads: - raise NoPayloadAllowed - elif not items and (persist_items or deliver_payloads): - raise PayloadExpected - - if persist_items or deliver_payloads: - for item in items: - if not item.getAttribute("id"): - item["id"] = sha.new(str(time.time()) + - requestor.full()).hexdigest() - - if persist_items: - d = self.parent.storage.store_items(node_id, items, - requestor) - else: - d = defer.succeed(None) - - d.addCallback(self._do_notify, node_id, items, deliver_payloads) - - def _do_notify(self, result, node_id, items, deliver_payloads): - if items and not deliver_payloads: - for item in items: - item.children = [] - - self.parent.dispatch({ 'items': items, 'node_id': node_id }, - '//event/pubsub/notify') - -class NotificationService(service.Service): - - implements(INotificationService) - - def get_notification_list(self, node_id, items): - d = self.parent.storage.get_subscribers(node_id) - d.addCallback(self._magic_filter, node_id, items) - return d - - def _magic_filter(self, subscribers, node_id, items): - list = {} - for subscriber in subscribers: - list[subscriber] = items - return list - - def register_notifier(self, observerfn, *args, **kwargs): - self.parent.addObserver('//event/pubsub/notify', observerfn, - *args, **kwargs) - -class SubscriptionService(service.Service): - - implements(ISubscriptionService) - - def subscribe(self, node_id, subscriber, requestor): - if subscriber.userhostJID() != requestor: - raise NotAuthorized - - d1 = self.parent.storage.get_node_configuration(node_id) - d2 = self.parent.storage.get_affiliation(node_id, subscriber) - d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) - d.addErrback(lambda x: x.value[0]) - d.addCallback(self._do_subscribe, node_id, subscriber) - return d - - def _do_subscribe(self, result, node_id, subscriber): - configuration = result[0][1] - affiliation = result[1][1] - - if affiliation == 'outcast': - raise NotAuthorized - - d = self.parent.storage.add_subscription(node_id, subscriber, - 'subscribed') - d.addCallback(self._return_subscription, affiliation) - return d - - def _return_subscription(self, result, affiliation): - result['affiliation'] = affiliation - return result - - def unsubscribe(self, node_id, subscriber, requestor): - if subscriber.userhostJID() != requestor: - raise NotAuthorized - - d = self.parent.storage.get_node_configuration(node_id) - d.addCallback(self._do_unsubscribe, node_id, subscriber) - return d - - def _do_unsubscribe(self, result, node_id, subscriber): - return self.parent.storage.remove_subscription(node_id, - subscriber) - -class NodeCreationService(service.Service): - - implements(INodeCreationService) - - def supports_instant_nodes(self): - return True - - def create_node(self, node_id, requestor): - if not node_id: - node_id = 'generic/%s' % sha.new(str(time.time()) + - requestor.full()).hexdigest() - - d = self.parent.storage.create_node(node_id, requestor) - d.addCallback(lambda _: node_id) - return d - - def get_node_configuration(self, node_id): - if node_id: - d = self.parent.storage.get_node_configuration(node_id) - else: - # XXX: this is disabled in pubsub.py - d = defer.succeed(self.parent.default_config) - - d.addCallback(self._make_config) - return d - - def _make_config(self, config): - options = [] - for key, value in self.parent.options.iteritems(): - option = {"var": key} - option.update(value) - if config.has_key(key): - option["value"] = config[key] - options.append(option) - - return options - - def set_node_configuration(self, node_id, options, requestor): - for key in options.iterkeys(): - if not self.parent.options.has_key(key): - raise InvalidConfigurationOption - - d = self.parent.storage.get_affiliation(node_id, requestor) - d.addCallback(self._do_set_node_configuration, node_id, options) - return d - - def _do_set_node_configuration(self, affiliation, node_id, options): - if affiliation != 'owner': - raise NotAuthorized - - return self.parent.storage.set_node_configuration(node_id, options) - -class AffiliationsService(service.Service): - - implements(IAffiliationsService) - - def get_affiliations(self, entity): - d1 = self.parent.storage.get_affiliations(entity) - d2 = self.parent.storage.get_subscriptions(entity) - d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) - d.addErrback(lambda x: x.value[0]) - d.addCallback(self._affiliations_result, entity) - return d - - def _affiliations_result(self, result, entity): - affiliations = result[0][1] - subscriptions = result[1][1] - - new_affiliations = {} - - for node, affiliation in affiliations: - new_affiliations[(node, entity.full())] = {'node': node, - 'jid': entity, - 'affiliation': affiliation, - 'subscription': None - } - - for node, subscriber, subscription in subscriptions: - key = node, subscriber.full() - if new_affiliations.has_key(key): - new_affiliations[key]['subscription'] = subscription - else: - new_affiliations[key] = {'node': node, - 'jid': subscriber, - 'affiliation': None, - 'subscription': subscription} - - return new_affiliations.values() - -class ItemRetrievalService(service.Service): - - implements(IItemRetrievalService) - - def get_items(self, node_id, requestor, max_items=None, item_ids=[]): - d = self.parent.storage.is_subscribed(node_id, requestor) - d.addCallback(self._do_get_items, node_id, max_items, item_ids) - return d - - def _do_get_items(self, result, node_id, max_items, item_ids): - if not result: - raise NotAuthorized - - if item_ids: - return self.parent.storage.get_items_by_ids(node_id, item_ids) - else: - return self.parent.storage.get_items(node_id, max_items) - -class RetractionService(service.Service): - - implements(IRetractionService) - - def retract_item(self, node_id, item_ids, requestor): - d1 = self.parent.storage.get_node_configuration(node_id) - d2 = self.parent.storage.get_affiliation(node_id, requestor) - d = defer.DeferredList([d1, d2], fireOnOneErrback=1) - d.addErrback(lambda x: x.value[0]) - d.addCallback(self._do_retract, node_id, item_ids) - return d - - def _do_retract(self, result, node_id, item_ids): - configuration = result[0][1] - persist_items = configuration["persist_items"] - affiliation = result[1][1] - - if affiliation not in ['owner', 'publisher']: - raise NotAuthorized - - if not persist_items: - raise NodeNotPersistent - - d = self.parent.storage.remove_items(node_id, item_ids) - d.addCallback(self._do_notify_retraction, node_id) - return d - - def _do_notify_retraction(self, result, node_id): - self.parent.dispatch({ 'item_ids': result, 'node_id': node_id }, - '//event/pubsub/retract') - - def purge_node(self, node_id, requestor): - d1 = self.parent.storage.get_node_configuration(node_id) - d2 = self.parent.storage.get_affiliation(node_id, requestor) - d = defer.DeferredList([d1, d2], fireOnOneErrback=1) - d.addErrback(lambda x: x.value[0]) - d.addCallback(self._do_purge, node_id) - return d - - def _do_purge(self, result, node_id): - configuration = result[0][1] - persist_items = configuration["persist_items"] - affiliation = result[1][1] - - if affiliation != 'owner': - raise NotAuthorized - - if not persist_items: - raise NodeNotPersistent - - d = self.parent.storage.purge_node(node_id) - d.addCallback(self._do_notify_purge, node_id) - return d - - def _do_notify_purge(self, result, node_id): - self.parent.dispatch(node_id, '//event/pubsub/purge') - -class NodeDeletionService(service.Service): - - implements(INodeDeletionService) - - def __init__(self): - self._callback_list = [] - - def register_pre_delete(self, pre_delete_fn): - self._callback_list.append(pre_delete_fn) - - def get_subscribers(self, node_id): - return self.parent.storage.get_subscribers(node_id) - - def delete_node(self, node_id, requestor): - d1 = self.parent.storage.get_node_configuration(node_id) - d2 = self.parent.storage.get_affiliation(node_id, requestor) - d = defer.DeferredList([d1, d2], fireOnOneErrback=1) - d.addErrback(lambda x: x.value[0]) - d.addCallback(self._do_pre_delete, node_id) - return d - - def _do_pre_delete(self, result, node_id): - configuration = result[0][1] - persist_items = configuration["persist_items"] - affiliation = result[1][1] - - if affiliation != 'owner': - raise NotAuthorized - - d = defer.DeferredList([cb(node_id) for cb in self._callback_list], - consumeErrors=1) - d.addCallback(self._do_delete, node_id) - - def _do_delete(self, result, node_id): - dl = [] - for succeeded, r in result: - if succeeded and r: - dl.extend(r) - - d = self.parent.storage.delete_node(node_id) - d.addCallback(self._do_notify_delete, dl) - - return d - - def _do_notify_delete(self, result, dl): - for d in dl: - d.callback(None)