diff sat_pubsub/backend.py @ 232:923281d4c5bc

renamed idavoll directory to sat_pubsub
author Goffi <goffi@goffi.org>
date Thu, 17 May 2012 12:48:14 +0200
parents idavoll/backend.py@77029ecf9817
children 564ae55219e1
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/backend.py	Thu May 17 12:48:14 2012 +0200
@@ -0,0 +1,730 @@
+# -*- test-case-name: idavoll.test.test_backend -*-
+#
+# Copyright (c) 2003-2010 Ralph Meijer
+# See LICENSE for details.
+
+"""
+Generic publish-subscribe backend.
+
+This module implements a generic publish-subscribe backend service with
+business logic as per
+U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>} that interacts with
+a given storage facility. It also provides an adapter from the XMPP
+publish-subscribe protocol.
+"""
+
+import uuid
+
+from zope.interface import implements
+
+from twisted.application import service
+from twisted.python import components, log
+from twisted.internet import defer, reactor
+from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.xish import utility
+
+from wokkel import disco
+from wokkel.iwokkel import IPubSubResource
+from wokkel.pubsub import PubSubResource, PubSubError
+
+from idavoll import error, iidavoll
+from idavoll.iidavoll import IBackendService, ILeafNode
+
+def _getAffiliation(node, entity):
+    d = node.getAffiliation(entity)
+    d.addCallback(lambda affiliation: (node, affiliation))
+    return d
+
+
+
+class BackendService(service.Service, utility.EventDispatcher):
+    """
+    Generic publish-subscribe backend service.
+
+    @cvar nodeOptions: Node configuration form as a mapping from the field
+                       name to a dictionary that holds the field's type, label
+                       and possible options to choose from.
+    @type nodeOptions: C{dict}.
+    @cvar defaultConfig: The default node configuration.
+    """
+
+    implements(iidavoll.IBackendService)
+
+    nodeOptions = {
+            "pubsub#persist_items":
+                {"type": "boolean",
+                 "label": "Persist items to storage"},
+            "pubsub#deliver_payloads":
+                {"type": "boolean",
+                 "label": "Deliver payloads with event notifications"},
+            "pubsub#send_last_published_item":
+                {"type": "list-single",
+                 "label": "When to send the last published item",
+                 "options": {
+                     "never": "Never",
+                     "on_sub": "When a new subscription is processed"}
+                },
+            }
+
+    subscriptionOptions = {
+            "pubsub#subscription_type":
+                {"type": "list-single",
+                 "options": {
+                     "items": "Receive notification of new items only",
+                     "nodes": "Receive notification of new nodes only"}
+                },
+            "pubsub#subscription_depth":
+                {"type": "list-single",
+                 "options": {
+                     "1": "Receive notification from direct child nodes only",
+                     "all": "Receive notification from all descendent nodes"}
+                },
+            }
+
+    def __init__(self, storage):
+        utility.EventDispatcher.__init__(self)
+        self.storage = storage
+        self._callbackList = []
+
+
+    def supportsPublisherAffiliation(self):
+        return True
+
+
+    def supportsOutcastAffiliation(self):
+        return True
+
+
+    def supportsPersistentItems(self):
+        return True
+
+
+    def getNodeType(self, nodeIdentifier):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.getType())
+        return d
+
+
+    def getNodes(self):
+        return self.storage.getNodeIds()
+
+
+    def getNodeMetaData(self, nodeIdentifier):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.getMetaData())
+        d.addCallback(self._makeMetaData)
+        return d
+
+
+    def _makeMetaData(self, metaData):
+        options = []
+        for key, value in metaData.iteritems():
+            if key in self.nodeOptions:
+                option = {"var": key}
+                option.update(self.nodeOptions[key])
+                option["value"] = value
+                options.append(option)
+
+        return options
+
+
+    def _checkAuth(self, node, requestor):
+        def check(affiliation, node):
+            if affiliation not in ['owner', 'publisher']:
+                raise error.Forbidden()
+            return node
+
+        d = node.getAffiliation(requestor)
+        d.addCallback(check, node)
+        return d
+
+
+    def publish(self, nodeIdentifier, items, requestor):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(self._checkAuth, requestor)
+        d.addCallback(self._doPublish, items, requestor)
+        return d
+
+
+    def _doPublish(self, node, items, requestor):
+        if node.nodeType == 'collection':
+            raise error.NoPublishing()
+
+        configuration = node.getConfiguration()
+        persistItems = configuration["pubsub#persist_items"]
+        deliverPayloads = configuration["pubsub#deliver_payloads"]
+
+        if items and not persistItems and not deliverPayloads:
+            raise error.ItemForbidden()
+        elif not items and (persistItems or deliverPayloads):
+            raise error.ItemRequired()
+
+        if persistItems or deliverPayloads:
+            for item in items:
+                item.uri = None
+                item.defaultUri = None
+                if not item.getAttribute("id"):
+                    item["id"] = str(uuid.uuid4())
+
+        if persistItems:
+            d = node.storeItems(items, requestor)
+        else:
+            d = defer.succeed(None)
+
+        d.addCallback(self._doNotify, node.nodeIdentifier, items,
+                      deliverPayloads)
+        return d
+
+
+    def _doNotify(self, result, nodeIdentifier, items, deliverPayloads):
+        if items and not deliverPayloads:
+            for item in items:
+                item.children = []
+
+        self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier},
+                      '//event/pubsub/notify')
+
+
+    def getNotifications(self, nodeIdentifier, items):
+
+        def toNotifications(subscriptions, nodeIdentifier, items):
+            subsBySubscriber = {}
+            for subscription in subscriptions:
+                if subscription.options.get('pubsub#subscription_type',
+                                            'items') == 'items':
+                    subs = subsBySubscriber.setdefault(subscription.subscriber,
+                                                       set())
+                    subs.add(subscription)
+
+            notifications = [(subscriber, subscriptions, items)
+                             for subscriber, subscriptions
+                             in subsBySubscriber.iteritems()]
+
+            return notifications
+
+        def rootNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return []
+
+        d1 = self.storage.getNode(nodeIdentifier)
+        d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d2 = self.storage.getNode('')
+        d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d2.addErrback(rootNotFound)
+        d = defer.gatherResults([d1, d2])
+        d.addCallback(lambda result: result[0] + result[1])
+        d.addCallback(toNotifications, nodeIdentifier, items)
+        return d
+
+
+    def registerNotifier(self, observerfn, *args, **kwargs):
+        self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
+
+
+    def subscribe(self, nodeIdentifier, subscriber, requestor):
+        subscriberEntity = subscriber.userhostJID()
+        if subscriberEntity != requestor.userhostJID():
+            return defer.fail(error.Forbidden())
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, subscriberEntity)
+        d.addCallback(self._doSubscribe, subscriber)
+        return d
+
+
+    def _doSubscribe(self, result, subscriber):
+        node, affiliation = result
+
+        if affiliation == 'outcast':
+            raise error.Forbidden()
+
+        def trapExists(failure):
+            failure.trap(error.SubscriptionExists)
+            return False
+
+        def cb(sendLast):
+            d = node.getSubscription(subscriber)
+            if sendLast:
+                d.addCallback(self._sendLastPublished, node)
+            return d
+
+        d = node.addSubscription(subscriber, 'subscribed', {})
+        d.addCallbacks(lambda _: True, trapExists)
+        d.addCallback(cb)
+        return d
+
+
+    def _sendLastPublished(self, subscription, node):
+
+        def notifyItem(items):
+            if items:
+                reactor.callLater(0, self.dispatch,
+                                     {'items': items,
+                                      'nodeIdentifier': node.nodeIdentifier,
+                                      'subscription': subscription},
+                                     '//event/pubsub/notify')
+
+        config = node.getConfiguration()
+        sendLastPublished = config.get('pubsub#send_last_published_item',
+                                       'never')
+        if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
+            entity = subscription.subscriber.userhostJID()
+            d = self.getItems(node.nodeIdentifier, entity, 1)
+            d.addCallback(notifyItem)
+            d.addErrback(log.err)
+
+        return subscription
+
+
+    def unsubscribe(self, nodeIdentifier, subscriber, requestor):
+        if subscriber.userhostJID() != requestor.userhostJID():
+            return defer.fail(error.Forbidden())
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.removeSubscription(subscriber))
+        return d
+
+
+    def getSubscriptions(self, entity):
+        return self.storage.getSubscriptions(entity)
+
+    def supportsAutoCreate(self):
+        return True
+
+    def supportsInstantNodes(self):
+        return True
+
+
+    def createNode(self, nodeIdentifier, requestor):
+        if not nodeIdentifier:
+            nodeIdentifier = 'generic/%s' % uuid.uuid4()
+
+        nodeType = 'leaf'
+        config = self.storage.getDefaultConfiguration(nodeType)
+        config['pubsub#node_type'] = nodeType
+
+        d = self.storage.createNode(nodeIdentifier, requestor, config)
+        d.addCallback(lambda _: nodeIdentifier)
+        return d
+
+
+    def getDefaultConfiguration(self, nodeType):
+        d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
+        return d
+
+
+    def getNodeConfiguration(self, nodeIdentifier):
+        if not nodeIdentifier:
+            return defer.fail(error.NoRootNode())
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.getConfiguration())
+
+        return d
+
+
+    def setNodeConfiguration(self, nodeIdentifier, options, requestor):
+        if not nodeIdentifier:
+            return defer.fail(error.NoRootNode())
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doSetNodeConfiguration, options)
+        return d
+
+
+    def _doSetNodeConfiguration(self, result, options):
+        node, affiliation = result
+
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        return node.setConfiguration(options)
+
+
+    def getAffiliations(self, entity):
+        return self.storage.getAffiliations(entity)
+
+
+    def getItems(self, nodeIdentifier, requestor, maxItems=None,
+                       itemIdentifiers=None):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doGetItems, maxItems, itemIdentifiers)
+        return d
+
+
+    def _doGetItems(self, result, maxItems, itemIdentifiers):
+        node, affiliation = result
+
+        if not ILeafNode.providedBy(node):
+            return []
+
+        if affiliation == 'outcast':
+            raise error.Forbidden()
+
+        if itemIdentifiers:
+            return node.getItemsById(itemIdentifiers)
+        else:
+            return node.getItems(maxItems)
+
+
+    def retractItem(self, nodeIdentifier, itemIdentifiers, requestor):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doRetract, itemIdentifiers)
+        return d
+
+
+    def _doRetract(self, result, itemIdentifiers):
+        node, affiliation = result
+        persistItems = node.getConfiguration()["pubsub#persist_items"]
+
+        if affiliation not in ['owner', 'publisher']:
+            raise error.Forbidden()
+
+        if not persistItems:
+            raise error.NodeNotPersistent()
+
+        d = node.removeItems(itemIdentifiers)
+        d.addCallback(self._doNotifyRetraction, node.nodeIdentifier)
+        return d
+
+
+    def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier):
+        self.dispatch({'itemIdentifiers': itemIdentifiers,
+                       'nodeIdentifier': nodeIdentifier },
+                      '//event/pubsub/retract')
+
+
+    def purgeNode(self, nodeIdentifier, requestor):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doPurge)
+        return d
+
+
+    def _doPurge(self, result):
+        node, affiliation = result
+        persistItems = node.getConfiguration()["pubsub#persist_items"]
+
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        if not persistItems:
+            raise error.NodeNotPersistent()
+
+        d = node.purge()
+        d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
+        return d
+
+
+    def _doNotifyPurge(self, result, nodeIdentifier):
+        self.dispatch(nodeIdentifier, '//event/pubsub/purge')
+
+
+    def registerPreDelete(self, preDeleteFn):
+        self._callbackList.append(preDeleteFn)
+
+
+    def getSubscribers(self, nodeIdentifier):
+        def cb(subscriptions):
+            return [subscription.subscriber for subscription in subscriptions]
+
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        d.addCallback(cb)
+        return d
+
+
+    def deleteNode(self, nodeIdentifier, requestor, redirectURI=None):
+        d = self.storage.getNode(nodeIdentifier)
+        d.addCallback(_getAffiliation, requestor)
+        d.addCallback(self._doPreDelete, redirectURI)
+        return d
+
+
+    def _doPreDelete(self, result, redirectURI):
+        node, affiliation = result
+
+        if affiliation != 'owner':
+            raise error.Forbidden()
+
+        data = {'nodeIdentifier': node.nodeIdentifier,
+                'redirectURI': redirectURI}
+
+        d = defer.DeferredList([cb(data)
+                                for cb in self._callbackList],
+                               consumeErrors=1)
+        d.addCallback(self._doDelete, node.nodeIdentifier)
+
+
+    def _doDelete(self, result, nodeIdentifier):
+        dl = []
+        for succeeded, r in result:
+            if succeeded and r:
+                dl.extend(r)
+
+        d = self.storage.deleteNode(nodeIdentifier)
+        d.addCallback(self._doNotifyDelete, dl)
+
+        return d
+
+
+    def _doNotifyDelete(self, result, dl):
+        for d in dl:
+            d.callback(None)
+
+
+
+class PubSubResourceFromBackend(PubSubResource):
+    """
+    Adapts a backend to an xmpp publish-subscribe service.
+    """
+
+    features = [
+        "config-node",
+        "create-nodes",
+        "delete-any",
+        "delete-nodes",
+        "item-ids",
+        "meta-data",
+        "publish",
+        "purge-nodes",
+        "retract-items",
+        "retrieve-affiliations",
+        "retrieve-default",
+        "retrieve-items",
+        "retrieve-subscriptions",
+        "subscribe",
+    ]
+
+    discoIdentity = disco.DiscoIdentity('pubsub',
+                                        'service',
+                                        'Idavoll publish-subscribe service')
+
+    pubsubService = None
+
+    _errorMap = {
+        error.NodeNotFound: ('item-not-found', None, None),
+        error.NodeExists: ('conflict', None, None),
+        error.Forbidden: ('forbidden', None, None),
+        error.ItemForbidden: ('bad-request', 'item-forbidden', None),
+        error.ItemRequired: ('bad-request', 'item-required', None),
+        error.NoInstantNodes: ('not-acceptable',
+                               'unsupported',
+                               'instant-nodes'),
+        error.NotSubscribed: ('unexpected-request', 'not-subscribed', None),
+        error.InvalidConfigurationOption: ('not-acceptable', None, None),
+        error.InvalidConfigurationValue: ('not-acceptable', None, None),
+        error.NodeNotPersistent: ('feature-not-implemented',
+                                  'unsupported',
+                                  'persistent-node'),
+        error.NoRootNode: ('bad-request', None, None),
+        error.NoCollections: ('feature-not-implemented',
+                              'unsupported',
+                              'collections'),
+        error.NoPublishing: ('feature-not-implemented',
+                             'unsupported',
+                             'publish'),
+    }
+
+    def __init__(self, backend):
+        PubSubResource.__init__(self)
+
+        self.backend = backend
+        self.hideNodes = False
+
+        self.backend.registerNotifier(self._notify)
+        self.backend.registerPreDelete(self._preDelete)
+
+        if self.backend.supportsAutoCreate():
+            self.features.append("auto-create")
+
+        if self.backend.supportsInstantNodes():
+            self.features.append("instant-nodes")
+
+        if self.backend.supportsOutcastAffiliation():
+            self.features.append("outcast-affiliation")
+
+        if self.backend.supportsPersistentItems():
+            self.features.append("persistent-items")
+
+        if self.backend.supportsPublisherAffiliation():
+            self.features.append("publisher-affiliation")
+
+
+    def _notify(self, data):
+        items = data['items']
+        nodeIdentifier = data['nodeIdentifier']
+        if 'subscription' not in data:
+            d = self.backend.getNotifications(nodeIdentifier, items)
+        else:
+            subscription = data['subscription']
+            d = defer.succeed([(subscription.subscriber, [subscription],
+                                items)])
+        d.addCallback(lambda notifications: self.pubsubService.notifyPublish(
+                                                self.serviceJID,
+                                                nodeIdentifier,
+                                                notifications))
+
+
+    def _preDelete(self, data):
+        nodeIdentifier = data['nodeIdentifier']
+        redirectURI = data.get('redirectURI', None)
+        d = self.backend.getSubscribers(nodeIdentifier)
+        d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
+                                                self.serviceJID,
+                                                nodeIdentifier,
+                                                subscribers,
+                                                redirectURI))
+        return d
+
+
+    def _mapErrors(self, failure):
+        e = failure.trap(*self._errorMap.keys())
+
+        condition, pubsubCondition, feature = self._errorMap[e]
+        msg = failure.value.msg
+
+        if pubsubCondition:
+            exc = PubSubError(condition, pubsubCondition, feature, msg)
+        else:
+            exc = StanzaError(condition, text=msg)
+
+        raise exc
+
+
+    def getInfo(self, requestor, service, nodeIdentifier):
+        info = {}
+
+        def saveType(result):
+            info['type'] = result
+            return nodeIdentifier
+
+        def saveMetaData(result):
+            info['meta-data'] = result
+            return info
+
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            return info
+
+        d = defer.succeed(nodeIdentifier)
+        d.addCallback(self.backend.getNodeType)
+        d.addCallback(saveType)
+        d.addCallback(self.backend.getNodeMetaData)
+        d.addCallback(saveMetaData)
+        d.addErrback(trapNotFound)
+        d.addErrback(self._mapErrors)
+        return d
+
+
+    def getNodes(self, requestor, service, nodeIdentifier):
+        if service.resource:
+            return defer.succeed([])
+        d = self.backend.getNodes()
+        return d.addErrback(self._mapErrors)
+
+
+    def getConfigurationOptions(self):
+        return self.backend.nodeOptions
+
+    def _publish_errb(self, failure, request):
+        if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
+            d = self.backend.createNode(request.nodeIdentifier,
+                                        request.sender)
+            d.addCallback(lambda ignore,
+                                 request: self.backend.publish(request.nodeIdentifier,
+                                                               request.items,
+                                                               request.sender),
+                          request)
+            return d
+
+        raise failure
+
+
+    def publish(self, request):
+        d = self.backend.publish(request.nodeIdentifier,
+                                 request.items,
+                                 request.sender)
+        d.addErrback(self._publish_errb, request)
+        return d.addErrback(self._mapErrors)
+
+
+    def subscribe(self, request):
+        d = self.backend.subscribe(request.nodeIdentifier,
+                                   request.subscriber,
+                                   request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def unsubscribe(self, request):
+        d = self.backend.unsubscribe(request.nodeIdentifier,
+                                     request.subscriber,
+                                     request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def subscriptions(self, request):
+        d = self.backend.getSubscriptions(request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def affiliations(self, request):
+        d = self.backend.getAffiliations(request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def create(self, request):
+        d = self.backend.createNode(request.nodeIdentifier,
+                                    request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def default(self, request):
+        d = self.backend.getDefaultConfiguration(request.nodeType)
+        return d.addErrback(self._mapErrors)
+
+
+    def configureGet(self, request):
+        d = self.backend.getNodeConfiguration(request.nodeIdentifier)
+        return d.addErrback(self._mapErrors)
+
+
+    def configureSet(self, request):
+        d = self.backend.setNodeConfiguration(request.nodeIdentifier,
+                                              request.options,
+                                              request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def items(self, request):
+        d = self.backend.getItems(request.nodeIdentifier,
+                                  request.sender,
+                                  request.maxItems,
+                                  request.itemIdentifiers)
+        return d.addErrback(self._mapErrors)
+
+
+    def retract(self, request):
+        d = self.backend.retractItem(request.nodeIdentifier,
+                                     request.itemIdentifiers,
+                                     request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def purge(self, request):
+        d = self.backend.purgeNode(request.nodeIdentifier,
+                                   request.sender)
+        return d.addErrback(self._mapErrors)
+
+
+    def delete(self, request):
+        d = self.backend.deleteNode(request.nodeIdentifier,
+                                    request.sender)
+        return d.addErrback(self._mapErrors)
+
+components.registerAdapter(PubSubResourceFromBackend,
+                           IBackendService,
+                           IPubSubResource)