Mercurial > libervia-pubsub
diff idavoll/backend.py @ 206:274a45d2a5ab
Implement root collection that includes all leaf nodes.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 04 Aug 2008 13:47:10 +0000 |
parents | e6b710bf2b24 |
children | 7f3ffb7a1a9e |
line wrap: on
line diff
--- a/idavoll/backend.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/backend.py Mon Aug 04 13:47:10 2008 +0000 @@ -18,7 +18,7 @@ from zope.interface import implements from twisted.application import service -from twisted.python import components +from twisted.python import components, log from twisted.internet import defer, reactor from twisted.words.protocols.jabber.error import StanzaError from twisted.words.xish import domish, utility @@ -27,7 +27,7 @@ from wokkel.pubsub import PubSubService, PubSubError from idavoll import error, iidavoll -from idavoll.iidavoll import IBackendService +from idavoll.iidavoll import IBackendService, ILeafNode def _getAffiliation(node, entity): d = node.getAffiliation(entity) @@ -40,35 +40,45 @@ """ Generic publish-subscribe backend service. - @cvar options: Node configuration form as a mapping from the field - name to a dictionary that holds the field's type, - label and possible options to choose from. - @type options: C{dict}. + @cvar nodeOptions: Node configuration form as a mapping from the field + name to a dictionary that holds the field's type, label + and possible options to choose from. + @type nodeOptions: C{dict}. @cvar defaultConfig: The default node configuration. """ implements(iidavoll.IBackendService) - options = {"pubsub#persist_items": - {"type": "boolean", - "label": "Persist items to storage"}, - "pubsub#deliver_payloads": - {"type": "boolean", - "label": "Deliver payloads with event notifications"}, - "pubsub#send_last_published_item": - {"type": "list-single", - "label": "When to send the last published item", - "options": { - "never": "Never", - "on_sub": "When a new subscription is processed", - } - }, - } + nodeOptions = { + "pubsub#persist_items": + {"type": "boolean", + "label": "Persist items to storage"}, + "pubsub#deliver_payloads": + {"type": "boolean", + "label": "Deliver payloads with event notifications"}, + "pubsub#send_last_published_item": + {"type": "list-single", + "label": "When to send the last published item", + "options": { + "never": "Never", + "on_sub": "When a new subscription is processed"} + }, + } - defaultConfig = {"pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - } + subscriptionOptions = { + "pubsub#subscription_type": + {"type": "list-single", + "options": { + "items": "Receive notification of new items only", + "nodes": "Receive notification of new nodes only"} + }, + "pubsub#subscription_depth": + {"type": "list-single", + "options": { + "1": "Receive notification from direct child nodes only", + "all": "Receive notification from all descendent nodes"} + }, + } def __init__(self, storage): utility.EventDispatcher.__init__(self) @@ -108,9 +118,9 @@ def _makeMetaData(self, metaData): options = [] for key, value in metaData.iteritems(): - if self.options.has_key(key): + if key in self.nodeOptions: option = {"var": key} - option.update(self.options[key]) + option.update(self.nodeOptions[key]) option["value"] = value options.append(option) @@ -136,6 +146,9 @@ def _doPublish(self, node, items, requestor): + if node.nodeType == 'collection': + raise error.NoPublishing() + configuration = node.getConfiguration() persistItems = configuration["pubsub#persist_items"] deliverPayloads = configuration["pubsub#deliver_payloads"] @@ -147,6 +160,8 @@ if persistItems or deliverPayloads: for item in items: + item.uri = None + item.defaultUri = None if not item.getAttribute("id"): item["id"] = str(uuid.uuid4()) @@ -169,18 +184,36 @@ '//event/pubsub/notify') - def getNotificationList(self, nodeIdentifier, items): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.getSubscribers()) - d.addCallback(self._magicFilter, nodeIdentifier, items) - return d + def getNotifications(self, nodeIdentifier, items): + + def toNotifications(subscriptions, nodeIdentifier, items): + subsBySubscriber = {} + for subscription in subscriptions: + if subscription.options.get('pubsub#subscription_type', + 'items') == 'items': + subs = subsBySubscriber.setdefault(subscription.subscriber, + set()) + subs.add(subscription) + + notifications = [(subscriber, subscriptions, items) + for subscriber, subscriptions + in subsBySubscriber.iteritems()] + return notifications - def _magicFilter(self, subscribers, nodeIdentifier, items): - list = [] - for subscriber in subscribers: - list.append((subscriber, items)) - return list + def rootNotFound(failure): + failure.trap(error.NodeNotFound) + return [] + + d1 = self.storage.getNode(nodeIdentifier) + d1.addCallback(lambda node: node.getSubscriptions('subscribed')) + d2 = self.storage.getNode('') + d2.addCallback(lambda node: node.getSubscriptions('subscribed')) + d2.addErrback(rootNotFound) + d = defer.gatherResults([d1, d2]) + d.addCallback(lambda result: result[0] + result[1]) + d.addCallback(toNotifications, nodeIdentifier, items) + return d def registerNotifier(self, observerfn, *args, **kwargs): @@ -204,41 +237,42 @@ if affiliation == 'outcast': raise error.Forbidden() - d = node.addSubscription(subscriber, 'subscribed') - d.addCallback(lambda _: self._sendLastPublished(node, subscriber)) - d.addCallback(lambda _: 'subscribed') - d.addErrback(self._getSubscription, node, subscriber) - d.addCallback(self._returnSubscription, node.nodeIdentifier) + def trapExists(failure): + failure.trap(error.SubscriptionExists) + return False + + def cb(sendLast): + d = node.getSubscription(subscriber) + if sendLast: + d.addCallback(self._sendLastPublished, node) + return d + + d = node.addSubscription(subscriber, 'subscribed', {}) + d.addCallbacks(lambda _: True, trapExists) + d.addCallback(cb) return d - def _getSubscription(self, failure, node, subscriber): - failure.trap(error.SubscriptionExists) - return node.getSubscription(subscriber) - - - def _returnSubscription(self, result, nodeIdentifier): - return nodeIdentifier, result - - - def _sendLastPublished(self, node, subscriber): + def _sendLastPublished(self, subscription, node): def notifyItem(items): - if not items: - return - - reactor.callLater(0, self.dispatch, - {'items': items, - 'nodeIdentifier': node.nodeIdentifier, - 'subscriber': subscriber}, - '//event/pubsub/notify') + if items: + reactor.callLater(0, self.dispatch, + {'items': items, + 'nodeIdentifier': node.nodeIdentifier, + 'subscription': subscription}, + '//event/pubsub/notify') config = node.getConfiguration() - if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': - return + sendLastPublished = config.get('pubsub#send_last_published_item', + 'never') + if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': + entity = subscription.subscriber.userhostJID() + d = self.getItems(node.nodeIdentifier, entity, 1) + d.addCallback(notifyItem) + d.addErrback(log.err) - d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1) - d.addCallback(notifyItem) + return subscription def unsubscribe(self, nodeIdentifier, subscriber, requestor): @@ -261,13 +295,18 @@ def createNode(self, nodeIdentifier, requestor): if not nodeIdentifier: nodeIdentifier = 'generic/%s' % uuid.uuid4() - d = self.storage.createNode(nodeIdentifier, requestor) + + nodeType = 'leaf' + config = self.storage.getDefaultConfiguration(nodeType) + config['pubsub#node_type'] = nodeType + + d = self.storage.createNode(nodeIdentifier, requestor, config) d.addCallback(lambda _: nodeIdentifier) return d - def getDefaultConfiguration(self): - d = defer.succeed(self.defaultConfig) + def getDefaultConfiguration(self, nodeType): + d = defer.succeed(self.storage.getDefaultConfiguration(nodeType)) return d @@ -277,6 +316,7 @@ d = self.storage.getNode(nodeIdentifier) d.addCallback(lambda node: node.getConfiguration()) + return d @@ -314,6 +354,9 @@ def _doGetItems(self, result, maxItems, itemIdentifiers): node, affiliation = result + if not ILeafNode.providedBy(node): + return [] + if affiliation == 'outcast': raise error.Forbidden() @@ -382,8 +425,12 @@ def getSubscribers(self, nodeIdentifier): + def cb(subscriptions): + return [subscription.subscriber for subscription in subscriptions] + d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.getSubscribers()) + d.addCallback(lambda node: node.getSubscriptions('subscribed')) + d.addCallback(cb) return d @@ -447,6 +494,12 @@ 'unsupported', 'persistent-node'), error.NoRootNode: ('bad-request', None, None), + error.NoCollections: ('feature-not-implemented', + 'unsupported', + 'collections'), + error.NoPublishing: ('feature-not-implemented', + 'unsupported', + 'publish'), } def __init__(self, backend): @@ -497,10 +550,12 @@ def _notify(self, data): items = data['items'] nodeIdentifier = data['nodeIdentifier'] - if 'subscriber' not in data: - d = self.backend.getNotificationList(nodeIdentifier, items) + if 'subscription' not in data: + d = self.backend.getNotifications(nodeIdentifier, items) else: - d = defer.succeed([(data['subscriber'], items)]) + subscription = data['subscription'] + d = defer.succeed([(subscription.subscriber, [subscription], + items)]) d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, nodeIdentifier, notifications)) @@ -539,11 +594,16 @@ info['meta-data'] = result return info + def trapNotFound(failure): + failure.trap(error.NodeNotFound) + return info + d = defer.succeed(nodeIdentifier) d.addCallback(self.backend.getNodeType) d.addCallback(saveType) d.addCallback(self.backend.getNodeMetaData) d.addCallback(saveMetaData) + d.addErrback(trapNotFound) d.addErrback(self._mapErrors) return d @@ -586,11 +646,11 @@ def getConfigurationOptions(self): - return self.backend.options + return self.backend.nodeOptions - def getDefaultConfiguration(self, requestor, service): - d = self.backend.getDefaultConfiguration() + def getDefaultConfiguration(self, requestor, service, nodeType): + d = self.backend.getDefaultConfiguration(nodeType) return d.addErrback(self._mapErrors)