diff idavoll/backend.py @ 167:ef22e4150caa

Move protocol implementations (pubsub, disco, forms) to and depend on wokkel. Author: ralphm Fixes: #4
author Ralph Meijer <ralphm@ik.nu>
date Wed, 03 Oct 2007 12:41:43 +0000
parents 6fe78048baf9
children e2c2c2baf483
line wrap: on
line diff
--- a/idavoll/backend.py	Thu Jan 18 14:08:32 2007 +0000
+++ b/idavoll/backend.py	Wed Oct 03 12:41:43 2007 +0000
@@ -1,214 +1,524 @@
-# Copyright (c) 2003-2006 Ralph Meijer
+# -*- test-case-name: idavoll.test.test_backend -*-
+#
+# Copyright (c) 2003-2007 Ralph Meijer
 # See LICENSE for details.
 
-from zope.interface import Interface
-import storage
+import uuid
+
+from zope.interface import implements
 
-class Error(Exception):
-    msg = ''
+from twisted.application import service
+from twisted.python import components
+from twisted.internet import defer
+from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.xish import utility
 
-    def __str__(self):
-        return self.msg
-    
-class Forbidden(Error):
-    pass
+from wokkel.iwokkel import IDisco, IPubSubService
+from wokkel.pubsub import PubSubService, PubSubError
 
-class ItemForbidden(Error):
-    pass
+from idavoll import error, iidavoll
+from idavoll.iidavoll import IBackendService
 
-class ItemRequired(Error):
-    pass
-
-class NoInstantNodes(Error):
-    pass
+def _get_affiliation(node, entity):
+    d = node.get_affiliation(entity)
+    d.addCallback(lambda affiliation: (node, affiliation))
+    return d
 
-class NotSubscribed(Error):
-    pass
+class BackendService(service.Service, utility.EventDispatcher):
 
-class InvalidConfigurationOption(Error):
-    msg = 'Invalid configuration option'
-
-class InvalidConfigurationValue(Error):
-    msg = 'Bad configuration value'
+    implements(iidavoll.IBackendService)
 
-class NodeNotPersistent(Error):
-    pass
-
-class NoRootNode(Error):
-    pass
+    options = {"pubsub#persist_items":
+                  {"type": "boolean",
+                   "label": "Persist items to storage"},
+               "pubsub#deliver_payloads":
+                  {"type": "boolean",
+                   "label": "Deliver payloads with event notifications"},
+              }
 
-class IBackendService(Interface):
-    """ Interface to a backend service of a pubsub service. """
+    default_config = {"pubsub#persist_items": True,
+                      "pubsub#deliver_payloads": True,
+                     }
 
-    def __init__(storage):
-        """
-        @param storage: L{storage} object.
-        """
+    def __init__(self, storage):
+        utility.EventDispatcher.__init__(self)
+        self.storage = storage
+        self._callback_list = []
 
     def supports_publisher_affiliation(self):
-        """ Reports if the backend supports the publisher affiliation.
-    
-        @rtype: C{bool}
-        """
+        return True
 
     def supports_outcast_affiliation(self):
-        """ Reports if the backend supports the publisher affiliation.
-    
-        @rtype: C{bool}
-        """
+        return True
 
     def supports_persistent_items(self):
-        """ Reports if the backend supports persistent items.
-    
-        @rtype: C{bool}
-        """
+        return True
 
-    def get_node_type(node_id):
-        """ Return type of a node.
-
-        @return: a deferred that returns either 'leaf' or 'collection'
-        """
+    def get_node_type(self, node_id):
+        d = self.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_type())
+        return d
 
     def get_nodes(self):
-        """ Returns list of all nodes.
+        return self.storage.get_node_ids()
+
+    def get_node_meta_data(self, node_id):
+        d = self.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_meta_data())
+        d.addCallback(self._make_meta_data)
+        return d
 
-        @return: a deferred that returns a C{list} of node ids.
-        """
+    def _make_meta_data(self, meta_data):
+        options = []
+        for key, value in meta_data.iteritems():
+            if self.options.has_key(key):
+                option = {"var": key}
+                option.update(self.options[key])
+                option["value"] = value
+                options.append(option)
+
+        return options
+
+    def _check_auth(self, node, requestor):
+        def check(affiliation, node):
+            if affiliation not in ['owner', 'publisher']:
+                raise error.Forbidden()
+            return node
 
-    def get_node_meta_data(node_id):
-        """ Return meta data for a node.
+        d = node.get_affiliation(requestor)
+        d.addCallback(check, node)
+        return d
+
+    def publish(self, node_id, items, requestor):
+        d = self.storage.get_node(node_id)
+        d.addCallback(self._check_auth, requestor)
+        d.addCallback(self._do_publish, items, requestor)
+        return d
+
+    def _do_publish(self, node, items, requestor):
+        configuration = node.get_configuration()
+        persist_items = configuration["pubsub#persist_items"]
+        deliver_payloads = configuration["pubsub#deliver_payloads"]
 
-        @return: a deferred that returns a C{list} of C{dict}s with the
-                 metadata.
-        """
+        if items and not persist_items and not deliver_payloads:
+            raise error.ItemForbidden()
+        elif not items and (persist_items or deliver_payloads):
+            raise error.ItemRequired()
+
+        if persist_items or deliver_payloads:
+            for item in items:
+                if not item.getAttribute("id"):
+                    item["id"] = uuid.generate()
+
+        if persist_items:
+            d = node.store_items(items, requestor)
+        else:
+            d = defer.succeed(None)
 
-class INodeCreationService(Interface):
-    """ A service for creating nodes """
+        d.addCallback(self._do_notify, node.id, items, deliver_payloads)
+        return d
+
+    def _do_notify(self, result, node_id, items, deliver_payloads):
+        if items and not deliver_payloads:
+            for item in items:
+                item.children = []
+
+        self.dispatch({'items': items, 'node_id': node_id},
+                      '//event/pubsub/notify')
+
+    def get_notification_list(self, node_id, items):
+        d = self.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_subscribers())
+        d.addCallback(self._magic_filter, node_id, items)
+        return d
+
+    def _magic_filter(self, subscribers, node_id, items):
+        list = []
+        for subscriber in subscribers:
+            list.append((subscriber, items))
+        return list
+
+    def register_notifier(self, observerfn, *args, **kwargs):
+        self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
 
-    def create_node(node_id, requestor):
-        """ Create a node.
-        
-        @return: a deferred that fires when the node has been created.
-        """
+    def subscribe(self, node_id, subscriber, requestor):
+        subscriber_entity = subscriber.userhostJID()
+        if subscriber_entity != requestor:
+            return defer.fail(error.Forbidden())
+
+        d = self.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, subscriber_entity)
+        d.addCallback(self._do_subscribe, subscriber)
+        return d
+
+    def _do_subscribe(self, result, subscriber):
+        node, affiliation = result
 
-class INodeDeletionService(Interface):
-    """ A service for deleting nodes. """
+        if affiliation == 'outcast':
+            raise error.Forbidden()
+
+        d = node.add_subscription(subscriber, 'subscribed')
+        d.addCallback(lambda _: 'subscribed')
+        d.addErrback(self._get_subscription, node, subscriber)
+        d.addCallback(self._return_subscription, node.id)
+        return d
+
+    def _get_subscription(self, failure, node, subscriber):
+        failure.trap(error.SubscriptionExists)
+        return node.get_subscription(subscriber)
+
+    def _return_subscription(self, result, node_id):
+        return node_id, result
 
-    def register_pre_delete(pre_delete_fn):
-        """ Register a callback that is called just before a node deletion.
-        
-        The function C{pre_deleted_fn} is added to a list of functions
-        to be called just before deletion of a node. The callback
-        C{pre_delete_fn} is called with the C{node_id} that is about to be
-        deleted and should return a deferred that returns a list of deferreds
-        that are to be fired after deletion. The backend collects the lists
-        from all these callbacks before actually deleting the node in question.
-        After deletion all collected deferreds are fired to do post-processing.
+    def unsubscribe(self, node_id, subscriber, requestor):
+        if subscriber.userhostJID() != requestor:
+            return defer.fail(error.Forbidden())
+
+        d = self.storage.get_node(node_id)
+        d.addCallback(lambda node: node.remove_subscription(subscriber))
+        return d
+
+    def get_subscriptions(self, entity):
+        return self.storage.get_subscriptions(entity)
+
+    def supports_instant_nodes(self):
+        return True
+
+    def create_node(self, node_id, requestor):
+        if not node_id:
+            node_id = 'generic/%s' % uuid.generate()
+        d = self.storage.create_node(node_id, requestor)
+        d.addCallback(lambda _: node_id)
+        return d
 
-        The idea is that you want to be able to collect data from the
-        node before deleting it, for example to get a list of subscribers
-        that have to be notified after the node has been deleted. To do this,
-        C{pre_delete_fn} fetches the subscriber list and passes this
-        list to a callback attached to a deferred that it sets up. This
-        deferred is returned in the list of deferreds.
-        """
+    def get_default_configuration(self):
+        d = defer.succeed(self.default_config)
+        d.addCallback(self._make_config)
+        return d
+
+    def get_node_configuration(self, node_id):
+        if not node_id:
+            raise error.NoRootNode()
+
+        d = self.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_configuration())
+
+        d.addCallback(self._make_config)
+        return d
+
+    def _make_config(self, config):
+        options = []
+        for key, value in self.options.iteritems():
+            option = {"var": key}
+            option.update(value)
+            if config.has_key(key):
+                option["value"] = config[key]
+            options.append(option)
+
+        return options
+
+    def set_node_configuration(self, node_id, options, requestor):
+        if not node_id:
+            raise error.NoRootNode()
 
-    def get_subscribers(node_id):
-        """ Get node subscriber list.
-        
-        @return: a deferred that fires with the list of subscribers.
-        """
+        for key, value in options.iteritems():
+            if not self.options.has_key(key):
+                raise error.InvalidConfigurationOption()
+            if self.options[key]["type"] == 'boolean':
+                try:
+                    options[key] = bool(int(value))
+                except ValueError:
+                    raise error.InvalidConfigurationValue()
+
+        d = self.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_set_node_configuration, options)
+        return d
 
-    def delete_node(node_id, requestor):
-        """ Delete a node.
-        
-        @return: a deferred that fires when the node has been deleted.
-        """
+    def _do_set_node_configuration(self, result, options):
+        node, affiliation = result
+
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        return node.set_configuration(options)
+
+    def get_affiliations(self, entity):
+        return self.storage.get_affiliations(entity)
+
+    def get_items(self, node_id, requestor, max_items=None, item_ids=[]):
+        d = self.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_get_items, max_items, item_ids)
+        return d
 
-class IPublishService(Interface):
-    """ A service for publishing items to a node. """
+    def _do_get_items(self, result, max_items, item_ids):
+        node, affiliation = result
+
+        if affiliation == 'outcast':
+            raise error.Forbidden()
+
+        if item_ids:
+            return node.get_items_by_id(item_ids)
+        else:
+            return node.get_items(max_items)
+
+    def retract_item(self, node_id, item_ids, requestor):
+        d = self.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_retract, item_ids)
+        return d
 
-    def publish(node_id, items, requestor):
-        """ Publish items to a pubsub node.
-        
-        @return: a deferred that fires when the items have been published.
-        """
-class INotificationService(Interface):
-    """ A service for notification of published items. """
+    def _do_retract(self, result, item_ids):
+        node, affiliation = result
+        persist_items = node.get_configuration()["pubsub#persist_items"]
+
+        if affiliation not in ['owner', 'publisher']:
+            raise error.Forbidden()
 
-    def register_notifier(observerfn, *args, **kwargs):
-        """ Register callback which is called for notification. """
+        if not persist_items:
+            raise error.NodeNotPersistent()
+
+        d = node.remove_items(item_ids)
+        d.addCallback(self._do_notify_retraction, node.id)
+        return d
+
+    def _do_notify_retraction(self, item_ids, node_id):
+        self.dispatch({ 'item_ids': item_ids, 'node_id': node_id },
+                             '//event/pubsub/retract')
 
-    def get_notification_list(node_id, items):
-        pass
+    def purge_node(self, node_id, requestor):
+        d = self.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_purge)
+        return d
+
+    def _do_purge(self, result):
+        node, affiliation = result
+        persist_items = node.get_configuration()["pubsub#persist_items"]
 
-class ISubscriptionService(Interface):
-    """ A service for managing subscriptions. """
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        if not persist_items:
+            raise error.NodeNotPersistent()
+
+        d = node.purge()
+        d.addCallback(self._do_notify_purge, node.id)
+        return d
+
+    def _do_notify_purge(self, result, node_id):
+        self.dispatch(node_id, '//event/pubsub/purge')
 
-    def subscribe(node_id, subscriber, requestor):
-        """ Request the subscription of an entity to a pubsub node.
+    def register_pre_delete(self, pre_delete_fn):
+        self._callback_list.append(pre_delete_fn)
+
+    def get_subscribers(self, node_id):
+        d = self.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_subscribers())
+        return d
 
-        Depending on the node's configuration and possible business rules, the
-        C{subscriber} is added to the list of subscriptions of the node with id
-        C{node_id}. The C{subscriber} might be different from the C{requestor},
-        and if the C{requestor} is not allowed to subscribe this entity an
-        exception should be raised.
+    def delete_node(self, node_id, requestor):
+        d = self.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_pre_delete)
+        return d
+
+    def _do_pre_delete(self, result):
+        node, affiliation = result
+
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        d = defer.DeferredList([cb(node.id) for cb in self._callback_list],
+                               consumeErrors=1)
+        d.addCallback(self._do_delete, node.id)
 
-        @return: a deferred that returns the subscription state
-        """
+    def _do_delete(self, result, node_id):
+        dl = []
+        for succeeded, r in result:
+            if succeeded and r:
+                dl.extend(r)
+
+        d = self.storage.delete_node(node_id)
+        d.addCallback(self._do_notify_delete, dl)
+
+        return d
 
-    def unsubscribe(node_id, subscriber, requestor):
-        """ Cancel the subscription of an entity to a pubsub node.
+    def _do_notify_delete(self, result, dl):
+        for d in dl:
+            d.callback(None)
+
+
+class PubSubServiceFromBackend(PubSubService):
+    """
+    Adapts a backend to an xmpp publish-subscribe service.
+    """
+
+    implements(IDisco)
 
-        The subscription of C{subscriber} is removed from the list of
-        subscriptions of the node with id C{node_id}. If the C{requestor}
-        is not allowed to unsubscribe C{subscriber}, an an exception should
-        be raised.
+    _errorMap = {
+        error.NodeNotFound: ('item-not-found', None, None),
+        error.NodeExists: ('conflict', None, None),
+        error.SubscriptionNotFound: ('not-authorized',
+                                     'not-subscribed',
+                                     None),
+        error.Forbidden: ('forbidden', None, None),
+        error.ItemForbidden: ('bad-request', 'item-forbidden', None),
+        error.ItemRequired: ('bad-request', 'item-required', None),
+        error.NoInstantNodes: ('not-acceptable',
+                               'unsupported',
+                               'instant-nodes'),
+        error.NotSubscribed: ('not-authorized', 'not-subscribed', None),
+        error.InvalidConfigurationOption: ('not-acceptable', None, None),
+        error.InvalidConfigurationValue: ('not-acceptable', None, None),
+        error.NodeNotPersistent: ('feature-not-implemented',
+                                  'unsupported',
+                                  'persistent-node'),
+        error.NoRootNode: ('bad-request', None, None),
+    }
 
-        @return: a deferred that fires when unsubscription is complete.
-        """
+    def __init__(self, backend):
+        PubSubService.__init__(self)
 
-    def get_subscriptions(entity):
-        """ Report the list of current subscriptions with this pubsub service.
+        self.backend = backend
+        self.hideNodes = False
 
-        Report the list of the current subscriptions with all nodes within this
-        pubsub service, for the C{entity}.
+        self.pubSubFeatures = self._getPubSubFeatures()
+
+        self.backend.register_notifier(self._notify)
 
-        @return: a deferred that returns the list of all current subscriptions
-                 as tuples C{(node_id, subscriber, subscription)}.
-        """
+    def _getPubSubFeatures(self):
+        features = [
+            "config-node",
+            "create-nodes",
+            "delete-any",
+            "delete-nodes",
+            "item-ids",
+            "meta-data",
+            "publish",
+            "purge-nodes",
+            "retract-items",
+            "retrieve-affiliations",
+            "retrieve-default",
+            "retrieve-items",
+            "retrieve-subscriptions",
+            "subscribe",
+        ]
 
-class IAffiliationsService(Interface):
-    """ A service for retrieving the affiliations with this pubsub service. """
+        if self.backend.supports_instant_nodes():
+            features.append("instant-nodes")
+
+        if self.backend.supports_outcast_affiliation():
+            features.append("outcast-affiliation")
+
+        if self.backend.supports_persistent_items():
+            features.append("persistent-items")
+
+        if self.backend.supports_publisher_affiliation():
+            features.append("publisher-affiliation")
+
+        return features
 
-    def get_affiliations(entity):
-        """ Report the list of current affiliations with this pubsub service.
+    def _notify(self, data):
+        items = data['items']
+        nodeIdentifier = data['node_id']
+        d = self.backend.get_notification_list(nodeIdentifier, items)
+        d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID,
+                                                               nodeIdentifier,
+                                                               notifications))
+
+    def _mapErrors(self, failure):
+        e = failure.trap(*self._errorMap.keys())
+
+        condition, pubsubCondition, feature = self._errorMap[e]
+        msg = failure.value.msg
 
-        Report the list of the current affiliations with all nodes within this
-        pubsub service, for the C{entity}.
+        if pubsubCondition:
+            exc = PubSubError(condition, pubsubCondition, feature, msg)
+        else:
+            exc = StanzaError(condition, text=msg)
+
+        raise exc
 
-        @return: a deferred that returns the list of all current affiliations
-                 as tuples C{(node_id, affiliation)}.
-        """
+    def getNodeInfo(self, requestor, service, nodeIdentifier):
+        info = {}
+
+        def saveType(result):
+            info['type'] = result
+            return nodeIdentifier
+
+        def saveMetaData(result):
+            info['meta-data'] = result
+            return info
 
-class IRetractionService(Interface):
-    """ A service for retracting published items """
+        d = defer.succeed(nodeIdentifier)
+        d.addCallback(self.backend.get_node_type)
+        d.addCallback(saveType)
+        d.addCallback(self.backend.get_node_meta_data)
+        d.addCallback(saveMetaData)
+        d.errback(self._mapErrors)
+        return d
+
+    def getNodes(self, requestor, service):
+        d = self.backend.get_nodes()
+        return d.addErrback(self._mapErrors)
+
+    def publish(self, requestor, service, nodeIdentifier, items):
+        d = self.backend.publish(nodeIdentifier, items, requestor)
+        return d.addErrback(self._mapErrors)
 
-    def retract_item(node_id, item_id, requestor):
-        """ Removes item in node from persistent storage """
+    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
+        d = self.backend.subscribe(nodeIdentifier, subscriber, requestor)
+        return d.addErrback(self._mapErrors)
+
+    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
+        d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor)
+        return d.addErrback(self._mapErrors)
 
-    def purge_node(node_id, requestor):
-        """ Removes all items in node from persistent storage """
+    def subscriptions(self, requestor, service):
+        d = self.backend.get_subscriptions(requestor)
+        return d.addErrback(self._mapErrors)
+
+    def affiliations(self, requestor, service):
+        d = self.backend.get_affiliations(requestor)
+        return d.addErrback(self._mapErrors)
 
-class IItemRetrievalService(Interface):
-    """ A service for retrieving previously published items. """
+    def create(self, requestor, service, nodeIdentifier):
+        d = self.backend.create_node(nodeIdentifier, requestor)
+        return d.addErrback(self._mapErrors)
+
+    def getDefaultConfiguration(self, requestor, service):
+        d = self.backend.get_default_configuration()
+        return d.addErrback(self._mapErrors)
 
-    def get_items(node_id, requestor, max_items=None, item_ids=[]):
-        """ Retrieve items from persistent storage
+    def getConfiguration(self, requestor, service, nodeIdentifier):
+        d = self.backend.get_node_configuration(nodeIdentifier)
+        return d.addErrback(self._mapErrors)
+
+    def setConfiguration(self, requestor, service, nodeIdentifier, options):
+        d = self.backend.set_node_configuration(nodeIdentifier, options,
+                                                requestor)
+        return d.addErrback(self._mapErrors)
 
-        If C{max_items} is given, return the C{max_items} last published
-        items, else if C{item_ids} is not empty, return the items requested.
-        If neither is given, return all items.
+    def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers):
+        d = self.backend.get_items(nodeIdentifier, requestor, maxItems,
+                                   itemIdentifiers)
+        return d.addErrback(self._mapErrors)
+
+    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
+        d = self.backend.retract_item(nodeIdentifier, itemIdentifiers,
+                                      requestor)
+        return d.addErrback(self._mapErrors)
 
-        @return: a deferred that returns the requested items
-        """
+    def purge(self, requestor, service, nodeIdentifier):
+        d = self.backend.purge_node(nodeIdentifier, requestor)
+        return d.addErrback(self._mapErrors)
+
+    def delete(self, requestor, service, nodeIdentifier):
+        d = self.backend.delete_node(nodeIdentifier, requestor)
+        return d.addErrback(self._mapErrors)
+
+components.registerAdapter(PubSubServiceFromBackend,
+                           IBackendService,
+                           IPubSubService)