Mercurial > libervia-pubsub
diff idavoll/backend.py @ 198:e404775b12df
Change naming and spacing conventions to match Twisted's.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 10 Jun 2008 11:31:49 +0000 |
parents | 00a6dbfbee42 |
children | 2189c663ba44 |
line wrap: on
line diff
--- a/idavoll/backend.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/backend.py Tue Jun 10 11:31:49 2008 +0000 @@ -3,6 +3,16 @@ # Copyright (c) 2003-2008 Ralph Meijer # See LICENSE for details. +""" +Generic publish-subscribe backend. + +This module implements a generic publish-subscribe backend service with +business logic as per +L{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>)} that interacts with +a given storage facility. It also provides an adapter from the XMPP +publish-subscribe protocol. +""" + import uuid from zope.interface import implements @@ -19,13 +29,23 @@ from idavoll import error, iidavoll from idavoll.iidavoll import IBackendService -def _get_affiliation(node, entity): - d = node.get_affiliation(entity) +def _getAffiliation(node, entity): + d = node.getAffiliation(entity) d.addCallback(lambda affiliation: (node, affiliation)) return d + class BackendService(service.Service, utility.EventDispatcher): + """ + Generic publish-subscribe backend service. + + @cvar options: Node configuration form as a mapping from the field + name to a dictionary that holds the field's type, + label and possible options to choose from. + @type options: C{dict}. + @cvar defaultConfig: The default node configuration. + """ implements(iidavoll.IBackendService) @@ -45,42 +65,49 @@ }, } - default_config = {"pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - } + defaultConfig = {"pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } def __init__(self, storage): utility.EventDispatcher.__init__(self) self.storage = storage - self._callback_list = [] + self._callbackList = [] - def supports_publisher_affiliation(self): + + def supportsPublisherAffiliation(self): return True - def supports_outcast_affiliation(self): + + def supportsOutcastAffiliation(self): return True - def supports_persistent_items(self): + + def supportsPersistentItems(self): return True - def get_node_type(self, node_id): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_type()) + + def getNodeType(self, nodeIdentifier): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getType()) return d - def get_nodes(self): - return self.storage.get_node_ids() + + def getNodes(self): + return self.storage.getNodeIds() + - def get_node_meta_data(self, node_id): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_meta_data()) - d.addCallback(self._make_meta_data) + def getNodeMetaData(self, nodeIdentifier): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getMetaData()) + d.addCallback(self._makeMetaData) return d - def _make_meta_data(self, meta_data): + + def _makeMetaData(self, metaData): options = [] - for key, value in meta_data.iteritems(): + for key, value in metaData.iteritems(): if self.options.has_key(key): option = {"var": key} option.update(self.options[key]) @@ -89,99 +116,112 @@ return options - def _check_auth(self, node, requestor): + + def _checkAuth(self, node, requestor): def check(affiliation, node): if affiliation not in ['owner', 'publisher']: raise error.Forbidden() return node - d = node.get_affiliation(requestor) + d = node.getAffiliation(requestor) d.addCallback(check, node) return d - def publish(self, node_id, items, requestor): - d = self.storage.get_node(node_id) - d.addCallback(self._check_auth, requestor) - d.addCallback(self._do_publish, items, requestor) + + def publish(self, nodeIdentifier, items, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(self._checkAuth, requestor) + d.addCallback(self._doPublish, items, requestor) return d - def _do_publish(self, node, items, requestor): - configuration = node.get_configuration() - persist_items = configuration["pubsub#persist_items"] - deliver_payloads = configuration["pubsub#deliver_payloads"] - if items and not persist_items and not deliver_payloads: + def _doPublish(self, node, items, requestor): + configuration = node.getConfiguration() + persistItems = configuration["pubsub#persist_items"] + deliverPayloads = configuration["pubsub#deliver_payloads"] + + if items and not persistItems and not deliverPayloads: raise error.ItemForbidden() - elif not items and (persist_items or deliver_payloads): + elif not items and (persistItems or deliverPayloads): raise error.ItemRequired() - if persist_items or deliver_payloads: + if persistItems or deliverPayloads: for item in items: if not item.getAttribute("id"): item["id"] = str(uuid.uuid4()) - if persist_items: - d = node.store_items(items, requestor) + if persistItems: + d = node.storeItems(items, requestor) else: d = defer.succeed(None) - d.addCallback(self._do_notify, node.id, items, deliver_payloads) + d.addCallback(self._doNotify, node.nodeIdentifier, items, + deliverPayloads) return d - def _do_notify(self, result, node_id, items, deliver_payloads): - if items and not deliver_payloads: + + def _doNotify(self, result, nodeIdentifier, items, deliverPayloads): + if items and not deliverPayloads: for item in items: item.children = [] - self.dispatch({'items': items, 'node_id': node_id}, + self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier}, '//event/pubsub/notify') - def get_notification_list(self, node_id, items): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_subscribers()) - d.addCallback(self._magic_filter, node_id, items) + + def getNotificationList(self, nodeIdentifier, items): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getSubscribers()) + d.addCallback(self._magicFilter, nodeIdentifier, items) return d - def _magic_filter(self, subscribers, node_id, items): + + def _magicFilter(self, subscribers, nodeIdentifier, items): list = [] for subscriber in subscribers: list.append((subscriber, items)) return list - def register_notifier(self, observerfn, *args, **kwargs): + + def registerNotifier(self, observerfn, *args, **kwargs): self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) - def subscribe(self, node_id, subscriber, requestor): - subscriber_entity = subscriber.userhostJID() - if subscriber_entity != requestor: + + def subscribe(self, nodeIdentifier, subscriber, requestor): + subscriberEntity = subscriber.userhostJID() + if subscriberEntity != requestor: return defer.fail(error.Forbidden()) - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, subscriber_entity) - d.addCallback(self._do_subscribe, subscriber) + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, subscriberEntity) + d.addCallback(self._doSubscribe, subscriber) return d - def _do_subscribe(self, result, subscriber): + + def _doSubscribe(self, result, subscriber): node, affiliation = result if affiliation == 'outcast': raise error.Forbidden() - d = node.add_subscription(subscriber, 'subscribed') - d.addCallback(lambda _: self._send_last_published(node, subscriber)) + d = node.addSubscription(subscriber, 'subscribed') + d.addCallback(lambda _: self._sendLastPublished(node, subscriber)) d.addCallback(lambda _: 'subscribed') - d.addErrback(self._get_subscription, node, subscriber) - d.addCallback(self._return_subscription, node.id) + d.addErrback(self._getSubscription, node, subscriber) + d.addCallback(self._returnSubscription, node.nodeIdentifier) return d - def _get_subscription(self, failure, node, subscriber): + + def _getSubscription(self, failure, node, subscriber): failure.trap(error.SubscriptionExists) - return node.get_subscription(subscriber) + return node.getSubscription(subscriber) + - def _return_subscription(self, result, node_id): - return node_id, result + def _returnSubscription(self, result, nodeIdentifier): + return nodeIdentifier, result - def _send_last_published(self, node, subscriber): + + def _sendLastPublished(self, node, subscriber): class StringParser(object): def __init__(self): self.elementStream = domish.elementStream() @@ -202,61 +242,69 @@ self.elementStream.parse(string) return self.document - def notify_item(result): + def notifyItem(result): if not result: return items = [domish.SerializedXML(item) for item in result] - reactor.callLater(0, self.dispatch, {'items': items, - 'node_id': node.id, - 'subscriber': subscriber}, - '//event/pubsub/notify') + reactor.callLater(0, self.dispatch, + {'items': items, + 'nodeIdentifier': node.nodeIdentifier, + 'subscriber': subscriber}, + '//event/pubsub/notify') - config = node.get_configuration() + config = node.getConfiguration() if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': return - d = self.get_items(node.id, subscriber.userhostJID(), 1) - d.addCallback(notify_item) + d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1) + d.addCallback(notifyItem) - def unsubscribe(self, node_id, subscriber, requestor): + + def unsubscribe(self, nodeIdentifier, subscriber, requestor): if subscriber.userhostJID() != requestor: return defer.fail(error.Forbidden()) - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.remove_subscription(subscriber)) + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.removeSubscription(subscriber)) return d - def get_subscriptions(self, entity): - return self.storage.get_subscriptions(entity) - def supports_instant_nodes(self): + def getSubscriptions(self, entity): + return self.storage.getSubscriptions(entity) + + + def supportsInstantNodes(self): return True - def create_node(self, node_id, requestor): - if not node_id: - node_id = 'generic/%s' % uuid.uuid4() - d = self.storage.create_node(node_id, requestor) - d.addCallback(lambda _: node_id) + + def createNode(self, nodeIdentifier, requestor): + if not nodeIdentifier: + nodeIdentifier = 'generic/%s' % uuid.uuid4() + d = self.storage.createNode(nodeIdentifier, requestor) + d.addCallback(lambda _: nodeIdentifier) return d - def get_default_configuration(self): - d = defer.succeed(self.default_config) - d.addCallback(self._make_config) + + def getDefaultConfiguration(self): + d = defer.succeed(self.defaultConfig) + d.addCallback(self._makeConfig) return d - def get_node_configuration(self, node_id): - if not node_id: + + def getNodeConfiguration(self, nodeIdentifier): + if not nodeIdentifier: return defer.fail(error.NoRootNode()) - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_configuration()) + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getConfiguration()) - d.addCallback(self._make_config) + d.addCallback(self._makeConfig) return d - def _make_config(self, config): + + def _makeConfig(self, config): options = [] for key, value in self.options.iteritems(): option = {"var": key} @@ -267,8 +315,9 @@ return options - def set_node_configuration(self, node_id, options, requestor): - if not node_id: + + def setNodeConfiguration(self, nodeIdentifier, options, requestor): + if not nodeIdentifier: return defer.fail(error.NoRootNode()) for key, value in options.iteritems(): @@ -280,126 +329,146 @@ except ValueError: return defer.fail(error.InvalidConfigurationValue()) - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_set_node_configuration, options) + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doSetNodeConfiguration, options) return d - def _do_set_node_configuration(self, result, options): + + def _doSetNodeConfiguration(self, result, options): node, affiliation = result if affiliation != 'owner': raise error.Forbidden() - return node.set_configuration(options) + return node.setConfiguration(options) + - def get_affiliations(self, entity): - return self.storage.get_affiliations(entity) + def getAffiliations(self, entity): + return self.storage.getAffiliations(entity) - def get_items(self, node_id, requestor, max_items=None, item_ids=[]): - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_get_items, max_items, item_ids) + + def getItems(self, nodeIdentifier, requestor, maxItems=None, + itemIdentifiers=None): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doGetItems, maxItems, itemIdentifiers) return d - def _do_get_items(self, result, max_items, item_ids): + + def _doGetItems(self, result, maxItems, itemIdentifiers): node, affiliation = result if affiliation == 'outcast': raise error.Forbidden() - if item_ids: - return node.get_items_by_id(item_ids) + if itemIdentifiers: + return node.getItemsById(itemIdentifiers) else: - return node.get_items(max_items) + return node.getItems(maxItems) - def retract_item(self, node_id, item_ids, requestor): - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_retract, item_ids) + + def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doRetract, itemIdentifiers) return d - def _do_retract(self, result, item_ids): + + def _doRetract(self, result, itemIdentifiers): node, affiliation = result - persist_items = node.get_configuration()["pubsub#persist_items"] + persistItems = node.getConfiguration()["pubsub#persist_items"] if affiliation not in ['owner', 'publisher']: raise error.Forbidden() - if not persist_items: + if not persistItems: raise error.NodeNotPersistent() - d = node.remove_items(item_ids) - d.addCallback(self._do_notify_retraction, node.id) + d = node.removeItems(itemIdentifiers) + d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) return d - def _do_notify_retraction(self, item_ids, node_id): - self.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, - '//event/pubsub/retract') + + def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): + self.dispatch({'itemIdentifiers': itemIdentifiers, + 'nodeIdentifier': nodeIdentifier }, + '//event/pubsub/retract') - def purge_node(self, node_id, requestor): - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_purge) + + def purgeNode(self, nodeIdentifier, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doPurge) return d - def _do_purge(self, result): + + def _doPurge(self, result): node, affiliation = result - persist_items = node.get_configuration()["pubsub#persist_items"] + persistItems = node.getConfiguration()["pubsub#persist_items"] if affiliation != 'owner': raise error.Forbidden() - if not persist_items: + if not persistItems: raise error.NodeNotPersistent() d = node.purge() - d.addCallback(self._do_notify_purge, node.id) + d.addCallback(self._doNotifyPurge, node.nodeIdentifier) return d - def _do_notify_purge(self, result, node_id): - self.dispatch(node_id, '//event/pubsub/purge') + + def _doNotifyPurge(self, result, nodeIdentifier): + self.dispatch(nodeIdentifier, '//event/pubsub/purge') + - def register_pre_delete(self, pre_delete_fn): - self._callback_list.append(pre_delete_fn) + def registerPreDelete(self, preDeleteFn): + self._callbackList.append(preDeleteFn) + - def get_subscribers(self, node_id): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_subscribers()) + def getSubscribers(self, nodeIdentifier): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getSubscribers()) return d - def delete_node(self, node_id, requestor): - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_pre_delete) + + def deleteNode(self, nodeIdentifier, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doPreDelete) return d - def _do_pre_delete(self, result): + + def _doPreDelete(self, result): node, affiliation = result if affiliation != 'owner': raise error.Forbidden() - d = defer.DeferredList([cb(node.id) for cb in self._callback_list], + d = defer.DeferredList([cb(node.nodeIdentifier) + for cb in self._callbackList], consumeErrors=1) - d.addCallback(self._do_delete, node.id) + d.addCallback(self._doDelete, node.nodeIdentifier) - def _do_delete(self, result, node_id): + + def _doDelete(self, result, nodeIdentifier): dl = [] for succeeded, r in result: if succeeded and r: dl.extend(r) - d = self.storage.delete_node(node_id) - d.addCallback(self._do_notify_delete, dl) + d = self.storage.deleteNode(nodeIdentifier) + d.addCallback(self._doNotifyDelete, dl) return d - def _do_notify_delete(self, result, dl): + + def _doNotifyDelete(self, result, dl): for d in dl: d.callback(None) + class PubSubServiceFromBackend(PubSubService): """ Adapts a backend to an xmpp publish-subscribe service. @@ -433,8 +502,9 @@ self.pubSubFeatures = self._getPubSubFeatures() - self.backend.register_notifier(self._notify) - self.backend.register_pre_delete(self._pre_delete) + self.backend.registerNotifier(self._notify) + self.backend.registerPreDelete(self._preDelete) + def _getPubSubFeatures(self): features = [ @@ -454,38 +524,41 @@ "subscribe", ] - if self.backend.supports_instant_nodes(): + if self.backend.supportsInstantNodes(): features.append("instant-nodes") - if self.backend.supports_outcast_affiliation(): + if self.backend.supportsOutcastAffiliation(): features.append("outcast-affiliation") - if self.backend.supports_persistent_items(): + if self.backend.supportsPersistentItems(): features.append("persistent-items") - if self.backend.supports_publisher_affiliation(): + if self.backend.supportsPublisherAffiliation(): features.append("publisher-affiliation") return features + def _notify(self, data): items = data['items'] - nodeIdentifier = data['node_id'] + nodeIdentifier = data['nodeIdentifier'] if 'subscriber' not in data: - d = self.backend.get_notification_list(nodeIdentifier, items) + d = self.backend.getNotificationList(nodeIdentifier, items) else: d = defer.succeed([(data['subscriber'], items)]) d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, nodeIdentifier, notifications)) - def _pre_delete(self, nodeIdentifier): - d = self.backend.get_subscribers(nodeIdentifier) + + def _preDelete(self, nodeIdentifier): + d = self.backend.getSubscribers(nodeIdentifier) d.addCallback(lambda subscribers: self.notifyDelete(self.serviceJID, nodeIdentifier, subscribers)) return d + def _mapErrors(self, failure): e = failure.trap(*self._errorMap.keys()) @@ -499,6 +572,7 @@ raise exc + def getNodeInfo(self, requestor, service, nodeIdentifier): info = {} @@ -511,72 +585,87 @@ return info d = defer.succeed(nodeIdentifier) - d.addCallback(self.backend.get_node_type) + d.addCallback(self.backend.getNodeType) d.addCallback(saveType) - d.addCallback(self.backend.get_node_meta_data) + d.addCallback(self.backend.getNodeMetaData) d.addCallback(saveMetaData) d.addErrback(self._mapErrors) return d + def getNodes(self, requestor, service): if service.resource: return defer.succeed([]) - d = self.backend.get_nodes() + d = self.backend.getNodes() return d.addErrback(self._mapErrors) + def publish(self, requestor, service, nodeIdentifier, items): d = self.backend.publish(nodeIdentifier, items, requestor) return d.addErrback(self._mapErrors) + def subscribe(self, requestor, service, nodeIdentifier, subscriber): d = self.backend.subscribe(nodeIdentifier, subscriber, requestor) return d.addErrback(self._mapErrors) + def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor) return d.addErrback(self._mapErrors) + def subscriptions(self, requestor, service): - d = self.backend.get_subscriptions(requestor) + d = self.backend.getSubscriptions(requestor) return d.addErrback(self._mapErrors) + def affiliations(self, requestor, service): - d = self.backend.get_affiliations(requestor) + d = self.backend.getAffiliations(requestor) return d.addErrback(self._mapErrors) + def create(self, requestor, service, nodeIdentifier): - d = self.backend.create_node(nodeIdentifier, requestor) + d = self.backend.createNode(nodeIdentifier, requestor) return d.addErrback(self._mapErrors) + def getDefaultConfiguration(self, requestor, service): - d = self.backend.get_default_configuration() + d = self.backend.getDefaultConfiguration() return d.addErrback(self._mapErrors) + def getConfiguration(self, requestor, service, nodeIdentifier): - d = self.backend.get_node_configuration(nodeIdentifier) + d = self.backend.getNodeConfiguration(nodeIdentifier) return d.addErrback(self._mapErrors) + def setConfiguration(self, requestor, service, nodeIdentifier, options): - d = self.backend.set_node_configuration(nodeIdentifier, options, + d = self.backend.setNodeConfiguration(nodeIdentifier, options, requestor) return d.addErrback(self._mapErrors) - def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): - d = self.backend.get_items(nodeIdentifier, requestor, maxItems, + + def items(self, requestor, service, nodeIdentifier, maxItems, + itemIdentifiers): + d = self.backend.getItems(nodeIdentifier, requestor, maxItems, itemIdentifiers) return d.addErrback(self._mapErrors) + def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): - d = self.backend.retract_item(nodeIdentifier, itemIdentifiers, + d = self.backend.retractItem(nodeIdentifier, itemIdentifiers, requestor) return d.addErrback(self._mapErrors) + def purge(self, requestor, service, nodeIdentifier): - d = self.backend.purge_node(nodeIdentifier, requestor) + d = self.backend.purgeNode(nodeIdentifier, requestor) return d.addErrback(self._mapErrors) + def delete(self, requestor, service, nodeIdentifier): - d = self.backend.delete_node(nodeIdentifier, requestor) + d = self.backend.deleteNode(nodeIdentifier, requestor) return d.addErrback(self._mapErrors) components.registerAdapter(PubSubServiceFromBackend,