Mercurial > libervia-pubsub
changeset 198:e404775b12df
Change naming and spacing conventions to match Twisted's.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 10 Jun 2008 11:31:49 +0000 |
parents | 9da5a95d408d |
children | 569e4dac9bc3 |
files | idavoll/backend.py idavoll/gateway.py idavoll/iidavoll.py idavoll/memory_storage.py idavoll/pgsql_storage.py idavoll/test/test_backend.py idavoll/test/test_gateway.py idavoll/test/test_storage.py |
diffstat | 8 files changed, 829 insertions(+), 558 deletions(-) [+] |
line wrap: on
line diff
--- a/idavoll/backend.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/backend.py Tue Jun 10 11:31:49 2008 +0000 @@ -3,6 +3,16 @@ # Copyright (c) 2003-2008 Ralph Meijer # See LICENSE for details. +""" +Generic publish-subscribe backend. + +This module implements a generic publish-subscribe backend service with +business logic as per +L{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>)} that interacts with +a given storage facility. It also provides an adapter from the XMPP +publish-subscribe protocol. +""" + import uuid from zope.interface import implements @@ -19,13 +29,23 @@ from idavoll import error, iidavoll from idavoll.iidavoll import IBackendService -def _get_affiliation(node, entity): - d = node.get_affiliation(entity) +def _getAffiliation(node, entity): + d = node.getAffiliation(entity) d.addCallback(lambda affiliation: (node, affiliation)) return d + class BackendService(service.Service, utility.EventDispatcher): + """ + Generic publish-subscribe backend service. + + @cvar options: Node configuration form as a mapping from the field + name to a dictionary that holds the field's type, + label and possible options to choose from. + @type options: C{dict}. + @cvar defaultConfig: The default node configuration. + """ implements(iidavoll.IBackendService) @@ -45,42 +65,49 @@ }, } - default_config = {"pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - } + defaultConfig = {"pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } def __init__(self, storage): utility.EventDispatcher.__init__(self) self.storage = storage - self._callback_list = [] + self._callbackList = [] - def supports_publisher_affiliation(self): + + def supportsPublisherAffiliation(self): return True - def supports_outcast_affiliation(self): + + def supportsOutcastAffiliation(self): return True - def supports_persistent_items(self): + + def supportsPersistentItems(self): return True - def get_node_type(self, node_id): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_type()) + + def getNodeType(self, nodeIdentifier): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getType()) return d - def get_nodes(self): - return self.storage.get_node_ids() + + def getNodes(self): + return self.storage.getNodeIds() + - def get_node_meta_data(self, node_id): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_meta_data()) - d.addCallback(self._make_meta_data) + def getNodeMetaData(self, nodeIdentifier): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getMetaData()) + d.addCallback(self._makeMetaData) return d - def _make_meta_data(self, meta_data): + + def _makeMetaData(self, metaData): options = [] - for key, value in meta_data.iteritems(): + for key, value in metaData.iteritems(): if self.options.has_key(key): option = {"var": key} option.update(self.options[key]) @@ -89,99 +116,112 @@ return options - def _check_auth(self, node, requestor): + + def _checkAuth(self, node, requestor): def check(affiliation, node): if affiliation not in ['owner', 'publisher']: raise error.Forbidden() return node - d = node.get_affiliation(requestor) + d = node.getAffiliation(requestor) d.addCallback(check, node) return d - def publish(self, node_id, items, requestor): - d = self.storage.get_node(node_id) - d.addCallback(self._check_auth, requestor) - d.addCallback(self._do_publish, items, requestor) + + def publish(self, nodeIdentifier, items, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(self._checkAuth, requestor) + d.addCallback(self._doPublish, items, requestor) return d - def _do_publish(self, node, items, requestor): - configuration = node.get_configuration() - persist_items = configuration["pubsub#persist_items"] - deliver_payloads = configuration["pubsub#deliver_payloads"] - if items and not persist_items and not deliver_payloads: + def _doPublish(self, node, items, requestor): + configuration = node.getConfiguration() + persistItems = configuration["pubsub#persist_items"] + deliverPayloads = configuration["pubsub#deliver_payloads"] + + if items and not persistItems and not deliverPayloads: raise error.ItemForbidden() - elif not items and (persist_items or deliver_payloads): + elif not items and (persistItems or deliverPayloads): raise error.ItemRequired() - if persist_items or deliver_payloads: + if persistItems or deliverPayloads: for item in items: if not item.getAttribute("id"): item["id"] = str(uuid.uuid4()) - if persist_items: - d = node.store_items(items, requestor) + if persistItems: + d = node.storeItems(items, requestor) else: d = defer.succeed(None) - d.addCallback(self._do_notify, node.id, items, deliver_payloads) + d.addCallback(self._doNotify, node.nodeIdentifier, items, + deliverPayloads) return d - def _do_notify(self, result, node_id, items, deliver_payloads): - if items and not deliver_payloads: + + def _doNotify(self, result, nodeIdentifier, items, deliverPayloads): + if items and not deliverPayloads: for item in items: item.children = [] - self.dispatch({'items': items, 'node_id': node_id}, + self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier}, '//event/pubsub/notify') - def get_notification_list(self, node_id, items): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_subscribers()) - d.addCallback(self._magic_filter, node_id, items) + + def getNotificationList(self, nodeIdentifier, items): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getSubscribers()) + d.addCallback(self._magicFilter, nodeIdentifier, items) return d - def _magic_filter(self, subscribers, node_id, items): + + def _magicFilter(self, subscribers, nodeIdentifier, items): list = [] for subscriber in subscribers: list.append((subscriber, items)) return list - def register_notifier(self, observerfn, *args, **kwargs): + + def registerNotifier(self, observerfn, *args, **kwargs): self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) - def subscribe(self, node_id, subscriber, requestor): - subscriber_entity = subscriber.userhostJID() - if subscriber_entity != requestor: + + def subscribe(self, nodeIdentifier, subscriber, requestor): + subscriberEntity = subscriber.userhostJID() + if subscriberEntity != requestor: return defer.fail(error.Forbidden()) - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, subscriber_entity) - d.addCallback(self._do_subscribe, subscriber) + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, subscriberEntity) + d.addCallback(self._doSubscribe, subscriber) return d - def _do_subscribe(self, result, subscriber): + + def _doSubscribe(self, result, subscriber): node, affiliation = result if affiliation == 'outcast': raise error.Forbidden() - d = node.add_subscription(subscriber, 'subscribed') - d.addCallback(lambda _: self._send_last_published(node, subscriber)) + d = node.addSubscription(subscriber, 'subscribed') + d.addCallback(lambda _: self._sendLastPublished(node, subscriber)) d.addCallback(lambda _: 'subscribed') - d.addErrback(self._get_subscription, node, subscriber) - d.addCallback(self._return_subscription, node.id) + d.addErrback(self._getSubscription, node, subscriber) + d.addCallback(self._returnSubscription, node.nodeIdentifier) return d - def _get_subscription(self, failure, node, subscriber): + + def _getSubscription(self, failure, node, subscriber): failure.trap(error.SubscriptionExists) - return node.get_subscription(subscriber) + return node.getSubscription(subscriber) + - def _return_subscription(self, result, node_id): - return node_id, result + def _returnSubscription(self, result, nodeIdentifier): + return nodeIdentifier, result - def _send_last_published(self, node, subscriber): + + def _sendLastPublished(self, node, subscriber): class StringParser(object): def __init__(self): self.elementStream = domish.elementStream() @@ -202,61 +242,69 @@ self.elementStream.parse(string) return self.document - def notify_item(result): + def notifyItem(result): if not result: return items = [domish.SerializedXML(item) for item in result] - reactor.callLater(0, self.dispatch, {'items': items, - 'node_id': node.id, - 'subscriber': subscriber}, - '//event/pubsub/notify') + reactor.callLater(0, self.dispatch, + {'items': items, + 'nodeIdentifier': node.nodeIdentifier, + 'subscriber': subscriber}, + '//event/pubsub/notify') - config = node.get_configuration() + config = node.getConfiguration() if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': return - d = self.get_items(node.id, subscriber.userhostJID(), 1) - d.addCallback(notify_item) + d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1) + d.addCallback(notifyItem) - def unsubscribe(self, node_id, subscriber, requestor): + + def unsubscribe(self, nodeIdentifier, subscriber, requestor): if subscriber.userhostJID() != requestor: return defer.fail(error.Forbidden()) - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.remove_subscription(subscriber)) + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.removeSubscription(subscriber)) return d - def get_subscriptions(self, entity): - return self.storage.get_subscriptions(entity) - def supports_instant_nodes(self): + def getSubscriptions(self, entity): + return self.storage.getSubscriptions(entity) + + + def supportsInstantNodes(self): return True - def create_node(self, node_id, requestor): - if not node_id: - node_id = 'generic/%s' % uuid.uuid4() - d = self.storage.create_node(node_id, requestor) - d.addCallback(lambda _: node_id) + + def createNode(self, nodeIdentifier, requestor): + if not nodeIdentifier: + nodeIdentifier = 'generic/%s' % uuid.uuid4() + d = self.storage.createNode(nodeIdentifier, requestor) + d.addCallback(lambda _: nodeIdentifier) return d - def get_default_configuration(self): - d = defer.succeed(self.default_config) - d.addCallback(self._make_config) + + def getDefaultConfiguration(self): + d = defer.succeed(self.defaultConfig) + d.addCallback(self._makeConfig) return d - def get_node_configuration(self, node_id): - if not node_id: + + def getNodeConfiguration(self, nodeIdentifier): + if not nodeIdentifier: return defer.fail(error.NoRootNode()) - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_configuration()) + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getConfiguration()) - d.addCallback(self._make_config) + d.addCallback(self._makeConfig) return d - def _make_config(self, config): + + def _makeConfig(self, config): options = [] for key, value in self.options.iteritems(): option = {"var": key} @@ -267,8 +315,9 @@ return options - def set_node_configuration(self, node_id, options, requestor): - if not node_id: + + def setNodeConfiguration(self, nodeIdentifier, options, requestor): + if not nodeIdentifier: return defer.fail(error.NoRootNode()) for key, value in options.iteritems(): @@ -280,126 +329,146 @@ except ValueError: return defer.fail(error.InvalidConfigurationValue()) - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_set_node_configuration, options) + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doSetNodeConfiguration, options) return d - def _do_set_node_configuration(self, result, options): + + def _doSetNodeConfiguration(self, result, options): node, affiliation = result if affiliation != 'owner': raise error.Forbidden() - return node.set_configuration(options) + return node.setConfiguration(options) + - def get_affiliations(self, entity): - return self.storage.get_affiliations(entity) + def getAffiliations(self, entity): + return self.storage.getAffiliations(entity) - def get_items(self, node_id, requestor, max_items=None, item_ids=[]): - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_get_items, max_items, item_ids) + + def getItems(self, nodeIdentifier, requestor, maxItems=None, + itemIdentifiers=None): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doGetItems, maxItems, itemIdentifiers) return d - def _do_get_items(self, result, max_items, item_ids): + + def _doGetItems(self, result, maxItems, itemIdentifiers): node, affiliation = result if affiliation == 'outcast': raise error.Forbidden() - if item_ids: - return node.get_items_by_id(item_ids) + if itemIdentifiers: + return node.getItemsById(itemIdentifiers) else: - return node.get_items(max_items) + return node.getItems(maxItems) - def retract_item(self, node_id, item_ids, requestor): - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_retract, item_ids) + + def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doRetract, itemIdentifiers) return d - def _do_retract(self, result, item_ids): + + def _doRetract(self, result, itemIdentifiers): node, affiliation = result - persist_items = node.get_configuration()["pubsub#persist_items"] + persistItems = node.getConfiguration()["pubsub#persist_items"] if affiliation not in ['owner', 'publisher']: raise error.Forbidden() - if not persist_items: + if not persistItems: raise error.NodeNotPersistent() - d = node.remove_items(item_ids) - d.addCallback(self._do_notify_retraction, node.id) + d = node.removeItems(itemIdentifiers) + d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) return d - def _do_notify_retraction(self, item_ids, node_id): - self.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, - '//event/pubsub/retract') + + def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): + self.dispatch({'itemIdentifiers': itemIdentifiers, + 'nodeIdentifier': nodeIdentifier }, + '//event/pubsub/retract') - def purge_node(self, node_id, requestor): - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_purge) + + def purgeNode(self, nodeIdentifier, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doPurge) return d - def _do_purge(self, result): + + def _doPurge(self, result): node, affiliation = result - persist_items = node.get_configuration()["pubsub#persist_items"] + persistItems = node.getConfiguration()["pubsub#persist_items"] if affiliation != 'owner': raise error.Forbidden() - if not persist_items: + if not persistItems: raise error.NodeNotPersistent() d = node.purge() - d.addCallback(self._do_notify_purge, node.id) + d.addCallback(self._doNotifyPurge, node.nodeIdentifier) return d - def _do_notify_purge(self, result, node_id): - self.dispatch(node_id, '//event/pubsub/purge') + + def _doNotifyPurge(self, result, nodeIdentifier): + self.dispatch(nodeIdentifier, '//event/pubsub/purge') + - def register_pre_delete(self, pre_delete_fn): - self._callback_list.append(pre_delete_fn) + def registerPreDelete(self, preDeleteFn): + self._callbackList.append(preDeleteFn) + - def get_subscribers(self, node_id): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_subscribers()) + def getSubscribers(self, nodeIdentifier): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getSubscribers()) return d - def delete_node(self, node_id, requestor): - d = self.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_pre_delete) + + def deleteNode(self, nodeIdentifier, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doPreDelete) return d - def _do_pre_delete(self, result): + + def _doPreDelete(self, result): node, affiliation = result if affiliation != 'owner': raise error.Forbidden() - d = defer.DeferredList([cb(node.id) for cb in self._callback_list], + d = defer.DeferredList([cb(node.nodeIdentifier) + for cb in self._callbackList], consumeErrors=1) - d.addCallback(self._do_delete, node.id) + d.addCallback(self._doDelete, node.nodeIdentifier) - def _do_delete(self, result, node_id): + + def _doDelete(self, result, nodeIdentifier): dl = [] for succeeded, r in result: if succeeded and r: dl.extend(r) - d = self.storage.delete_node(node_id) - d.addCallback(self._do_notify_delete, dl) + d = self.storage.deleteNode(nodeIdentifier) + d.addCallback(self._doNotifyDelete, dl) return d - def _do_notify_delete(self, result, dl): + + def _doNotifyDelete(self, result, dl): for d in dl: d.callback(None) + class PubSubServiceFromBackend(PubSubService): """ Adapts a backend to an xmpp publish-subscribe service. @@ -433,8 +502,9 @@ self.pubSubFeatures = self._getPubSubFeatures() - self.backend.register_notifier(self._notify) - self.backend.register_pre_delete(self._pre_delete) + self.backend.registerNotifier(self._notify) + self.backend.registerPreDelete(self._preDelete) + def _getPubSubFeatures(self): features = [ @@ -454,38 +524,41 @@ "subscribe", ] - if self.backend.supports_instant_nodes(): + if self.backend.supportsInstantNodes(): features.append("instant-nodes") - if self.backend.supports_outcast_affiliation(): + if self.backend.supportsOutcastAffiliation(): features.append("outcast-affiliation") - if self.backend.supports_persistent_items(): + if self.backend.supportsPersistentItems(): features.append("persistent-items") - if self.backend.supports_publisher_affiliation(): + if self.backend.supportsPublisherAffiliation(): features.append("publisher-affiliation") return features + def _notify(self, data): items = data['items'] - nodeIdentifier = data['node_id'] + nodeIdentifier = data['nodeIdentifier'] if 'subscriber' not in data: - d = self.backend.get_notification_list(nodeIdentifier, items) + d = self.backend.getNotificationList(nodeIdentifier, items) else: d = defer.succeed([(data['subscriber'], items)]) d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, nodeIdentifier, notifications)) - def _pre_delete(self, nodeIdentifier): - d = self.backend.get_subscribers(nodeIdentifier) + + def _preDelete(self, nodeIdentifier): + d = self.backend.getSubscribers(nodeIdentifier) d.addCallback(lambda subscribers: self.notifyDelete(self.serviceJID, nodeIdentifier, subscribers)) return d + def _mapErrors(self, failure): e = failure.trap(*self._errorMap.keys()) @@ -499,6 +572,7 @@ raise exc + def getNodeInfo(self, requestor, service, nodeIdentifier): info = {} @@ -511,72 +585,87 @@ return info d = defer.succeed(nodeIdentifier) - d.addCallback(self.backend.get_node_type) + d.addCallback(self.backend.getNodeType) d.addCallback(saveType) - d.addCallback(self.backend.get_node_meta_data) + d.addCallback(self.backend.getNodeMetaData) d.addCallback(saveMetaData) d.addErrback(self._mapErrors) return d + def getNodes(self, requestor, service): if service.resource: return defer.succeed([]) - d = self.backend.get_nodes() + d = self.backend.getNodes() return d.addErrback(self._mapErrors) + def publish(self, requestor, service, nodeIdentifier, items): d = self.backend.publish(nodeIdentifier, items, requestor) return d.addErrback(self._mapErrors) + def subscribe(self, requestor, service, nodeIdentifier, subscriber): d = self.backend.subscribe(nodeIdentifier, subscriber, requestor) return d.addErrback(self._mapErrors) + def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor) return d.addErrback(self._mapErrors) + def subscriptions(self, requestor, service): - d = self.backend.get_subscriptions(requestor) + d = self.backend.getSubscriptions(requestor) return d.addErrback(self._mapErrors) + def affiliations(self, requestor, service): - d = self.backend.get_affiliations(requestor) + d = self.backend.getAffiliations(requestor) return d.addErrback(self._mapErrors) + def create(self, requestor, service, nodeIdentifier): - d = self.backend.create_node(nodeIdentifier, requestor) + d = self.backend.createNode(nodeIdentifier, requestor) return d.addErrback(self._mapErrors) + def getDefaultConfiguration(self, requestor, service): - d = self.backend.get_default_configuration() + d = self.backend.getDefaultConfiguration() return d.addErrback(self._mapErrors) + def getConfiguration(self, requestor, service, nodeIdentifier): - d = self.backend.get_node_configuration(nodeIdentifier) + d = self.backend.getNodeConfiguration(nodeIdentifier) return d.addErrback(self._mapErrors) + def setConfiguration(self, requestor, service, nodeIdentifier, options): - d = self.backend.set_node_configuration(nodeIdentifier, options, + d = self.backend.setNodeConfiguration(nodeIdentifier, options, requestor) return d.addErrback(self._mapErrors) - def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): - d = self.backend.get_items(nodeIdentifier, requestor, maxItems, + + def items(self, requestor, service, nodeIdentifier, maxItems, + itemIdentifiers): + d = self.backend.getItems(nodeIdentifier, requestor, maxItems, itemIdentifiers) return d.addErrback(self._mapErrors) + def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): - d = self.backend.retract_item(nodeIdentifier, itemIdentifiers, + d = self.backend.retractItem(nodeIdentifier, itemIdentifiers, requestor) return d.addErrback(self._mapErrors) + def purge(self, requestor, service, nodeIdentifier): - d = self.backend.purge_node(nodeIdentifier, requestor) + d = self.backend.purgeNode(nodeIdentifier, requestor) return d.addErrback(self._mapErrors) + def delete(self, requestor, service, nodeIdentifier): - d = self.backend.delete_node(nodeIdentifier, requestor) + d = self.backend.deleteNode(nodeIdentifier, requestor) return d.addErrback(self._mapErrors) components.registerAdapter(PubSubServiceFromBackend,
--- a/idavoll/gateway.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/gateway.py Tue Jun 10 11:31:49 2008 +0000 @@ -138,7 +138,7 @@ stream = simplejson.dumps({'uri': uri}) return http.Response(responsecode.OK, stream=stream) - d = self.backend.create_node(None, self.owner) + d = self.backend.createNode(None, self.owner) d.addCallback(toResponse) return d @@ -184,7 +184,7 @@ "Malformed XMPP URI: %s" % failure.value.message) d = getNode() - d.addCallback(self.backend.delete_node, self.owner) + d.addCallback(self.backend.deleteNode, self.owner) d.addCallback(respond) d.addErrback(trapNotFound) d.addErrback(trapXMPPURIParseError) @@ -252,7 +252,7 @@ jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) return defer.succeed(nodeIdentifier) else: - return self.backend.create_node(None, self.owner) + return self.backend.createNode(None, self.owner) def doPublish(payload): d = getNode() @@ -289,7 +289,7 @@ return http.Response(responsecode.OK, stream=simplejson.dumps(nodeIdentifiers)) - d = self.service.get_nodes() + d = self.service.getNodes() d.addCallback(responseFromNodes) return d @@ -325,6 +325,8 @@ return atomEntries + + def constructFeed(service, nodeIdentifier, entries, title): nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) @@ -340,6 +342,8 @@ return feed + + class RemoteSubscriptionService(service.Service, PubSubClient): """ Service for subscribing to remote XMPP Publish-Subscribe nodes. @@ -572,6 +576,7 @@ def __init__(self, service): self.service = service + def render(self, request): try: maxItems = int(request.args.get('max_items', [0])[0]) or None @@ -580,14 +585,16 @@ "The argument max_items has an invalid value.") try: - jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) + uri = request.args['uri'][0] except KeyError: return http.StatusResponse(responsecode.BAD_REQUEST, "No URI for the remote node provided.") + + try: + jid, nodeIdentifier = getServiceAndNode(uri) except XMPPURIParseError: return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value.message) - + "Malformed XMPP URI: %s" % uri) def respond(items): """Create a feed out the retrieved items."""
--- a/idavoll/iidavoll.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/iidavoll.py Tue Jun 10 11:31:49 2008 +0000 @@ -5,211 +5,238 @@ Interfaces for idavoll. """ -from zope.interface import Interface +from zope.interface import Attribute, Interface class IBackendService(Interface): """ Interface to a backend service of a pubsub service. """ + def __init__(storage): """ @param storage: L{storage} object. """ - def supports_publisher_affiliation(): + + def supportsPublisherAffiliation(): """ Reports if the backend supports the publisher affiliation. @rtype: C{bool} """ - def supports_outcast_affiliation(): + + def supportsOutcastAffiliation(): """ Reports if the backend supports the publisher affiliation. @rtype: C{bool} """ - def supports_persistent_items(): + + def supportsPersistentItems(): """ Reports if the backend supports persistent items. @rtype: C{bool} """ - def get_node_type(node_id): + + def getNodeType(nodeIdentifier): """ Return type of a node. @return: a deferred that returns either 'leaf' or 'collection' """ - def get_nodes(): + + def getNodes(): """ Returns list of all nodes. @return: a deferred that returns a C{list} of node ids. """ - def get_node_meta_data(node_id): + + def getNodeMetaData(nodeIdentifier): """ Return meta data for a node. @return: a deferred that returns a C{list} of C{dict}s with the metadata. """ - def create_node(node_id, requestor): + + def createNode(nodeIdentifier, requestor): """ Create a node. @return: a deferred that fires when the node has been created. """ - def register_pre_delete(pre_delete_fn): + + def registerPreDelete(preDeleteFn): """ Register a callback that is called just before a node deletion. - The function C{pre_deleted_fn} is added to a list of functions - to be called just before deletion of a node. The callback - C{pre_delete_fn} is called with the C{node_id} that is about to be - deleted and should return a deferred that returns a list of deferreds - that are to be fired after deletion. The backend collects the lists - from all these callbacks before actually deleting the node in question. - After deletion all collected deferreds are fired to do post-processing. + The function C{preDeletedFn} is added to a list of functions to be + called just before deletion of a node. The callback C{preDeleteFn} is + called with the C{nodeIdentifier} that is about to be deleted and + should return a deferred that returns a list of deferreds that are to + be fired after deletion. The backend collects the lists from all these + callbacks before actually deleting the node in question. After + deletion all collected deferreds are fired to do post-processing. - The idea is that you want to be able to collect data from the - node before deleting it, for example to get a list of subscribers - that have to be notified after the node has been deleted. To do this, - C{pre_delete_fn} fetches the subscriber list and passes this - list to a callback attached to a deferred that it sets up. This - deferred is returned in the list of deferreds. + The idea is that you want to be able to collect data from the node + before deleting it, for example to get a list of subscribers that have + to be notified after the node has been deleted. To do this, + C{preDeleteFn} fetches the subscriber list and passes this list to a + callback attached to a deferred that it sets up. This deferred is + returned in the list of deferreds. """ - def delete_node(node_id, requestor): + + def deleteNode(nodeIdentifier, requestor): """ Delete a node. @return: a deferred that fires when the node has been deleted. """ - def purge_node(node_id, requestor): + + def purgeNode(nodeIdentifier, requestor): """ Removes all items in node from persistent storage """ - def subscribe(node_id, subscriber, requestor): + + def subscribe(nodeIdentifier, subscriber, requestor): """ Request the subscription of an entity to a pubsub node. Depending on the node's configuration and possible business rules, the C{subscriber} is added to the list of subscriptions of the node with id - C{node_id}. The C{subscriber} might be different from the C{requestor}, - and if the C{requestor} is not allowed to subscribe this entity an - exception should be raised. + C{nodeIdentifier}. The C{subscriber} might be different from the + C{requestor}, and if the C{requestor} is not allowed to subscribe this + entity an exception should be raised. @return: a deferred that returns the subscription state """ - def unsubscribe(node_id, subscriber, requestor): + + def unsubscribe(nodeIdentifier, subscriber, requestor): """ Cancel the subscription of an entity to a pubsub node. The subscription of C{subscriber} is removed from the list of - subscriptions of the node with id C{node_id}. If the C{requestor} - is not allowed to unsubscribe C{subscriber}, an an exception should - be raised. + subscriptions of the node with id C{nodeIdentifier}. If the + C{requestor} is not allowed to unsubscribe C{subscriber}, an an + exception should be raised. @return: a deferred that fires when unsubscription is complete. """ - def get_subscribers(node_id): + + def getSubscribers(nodeIdentifier): """ Get node subscriber list. @return: a deferred that fires with the list of subscribers. """ - def get_subscriptions(entity): + + def getSubscriptions(entity): """ Report the list of current subscriptions with this pubsub service. Report the list of the current subscriptions with all nodes within this pubsub service, for the C{entity}. @return: a deferred that returns the list of all current subscriptions - as tuples C{(node_id, subscriber, subscription)}. + as tuples C{(nodeIdentifier, subscriber, subscription)}. """ - def get_affiliations(entity): + + def getAffiliations(entity): """ Report the list of current affiliations with this pubsub service. Report the list of the current affiliations with all nodes within this pubsub service, for the C{entity}. @return: a deferred that returns the list of all current affiliations - as tuples C{(node_id, affiliation)}. + as tuples C{(nodeIdentifier, affiliation)}. """ - def publish(node_id, items, requestor): + + def publish(nodeIdentifier, items, requestor): """ Publish items to a pubsub node. @return: a deferred that fires when the items have been published. @rtype: L{Deferred<twisted.internet.defer.Deferred>} """ - def register_notifier(observerfn, *args, **kwargs): + + def registerNotifier(observerfn, *args, **kwargs): """ Register callback which is called for notification. """ - def get_notification_list(node_id, items): + + def getNotificationList(nodeIdentifier, items): """ Get list of entities to notify. """ - def get_items(node_id, requestor, max_items=None, item_ids=[]): + + def getItems(nodeIdentifier, requestor, maxItems=None, itemIdentifiers=[]): """ Retrieve items from persistent storage - If C{max_items} is given, return the C{max_items} last published - items, else if C{item_ids} is not empty, return the items requested. - If neither is given, return all items. + If C{maxItems} is given, return the C{maxItems} last published + items, else if C{itemIdentifiers} is not empty, return the items + requested. If neither is given, return all items. @return: a deferred that returns the requested items """ - def retract_item(node_id, item_id, requestor): + + def retractItem(nodeIdentifier, itemIdentifier, requestor): """ Removes item in node from persistent storage """ + class IStorage(Interface): """ Storage interface. """ - def get_node(node_id): + + def getNode(nodeIdentifier): """ Get Node. - @param node_id: NodeID of the desired node. - @type node_id: L{str} + @param nodeIdentifier: NodeID of the desired node. + @type nodeIdentifier: L{str} @return: deferred that returns a L{Node} object. """ - def get_node_ids(): + + def getNodeIds(): """ Return all NodeIDs. @return: deferred that returns a list of NodeIDs (L{str}). """ - def create_node(node_id, owner, config=None): + + def createNode(nodeIdentifier, owner, config=None): """ Create new node. The implementation should make sure, the passed owner JID is stripped of the resource (e.g. using C{owner.userhostJID()}). - @param node_id: NodeID of the new node. - @type node_id: L{str} + @param nodeIdentifier: NodeID of the new node. + @type nodeIdentifier: L{str} @param owner: JID of the new nodes's owner. @type owner: L{jid.JID} @param config: Configuration @return: deferred that fires on creation. """ - def delete_node(node_id): + + def deleteNode(nodeIdentifier): """ Delete a node. - @param node_id: NodeID of the new node. - @type node_id: L{str} + @param nodeIdentifier: NodeID of the new node. + @type nodeIdentifier: L{str} @return: deferred that fires on deletion. """ - def get_affiliations(entity): + + def getAffiliations(entity): """ Get all affiliations for entity. @@ -219,12 +246,13 @@ @param entity: JID of the entity. @type entity: L{jid.JID} @return: deferred that returns a L{list} of tuples of the form - C{(node_id, affiliation)}, where C{node_id} is of the type - L{str} and C{affiliation} is one of C{'owner'}, C{'publisher'} - and C{'outcast'}. + C{(nodeIdentifier, affiliation)}, where C{nodeIdentifier} is + of the type L{str} and C{affiliation} is one of C{'owner'}, + C{'publisher'} and C{'outcast'}. """ - def get_subscriptions(entity): + + def getSubscriptions(entity): """ Get all subscriptions for an entity. @@ -234,25 +262,33 @@ @param entity: JID of the entity. @type entity: L{jid.JID} @return: deferred that returns a L{list} of tuples of the form - C{(node_id, subscriber, state)}, where C{node_id} is of the - type L{str}, C{subscriber} of the type {jid.JID}, and - C{state} is C{'subscribed'} or C{'pending'}. + C{(nodeIdentifier, subscriber, state)}, where + C{nodeIdentifier} is of the type L{str}, C{subscriber} of the + type {jid.JID}, and C{state} is C{'subscribed'} or + C{'pending'}. """ + class INode(Interface): """ Interface to the class of objects that represent nodes. """ - def get_type(): + nodeType = Attribute("""The type of this node. One of {'leaf'}, + {'collection'}.""") + nodeIdentifier = Attribute("""The node identifer of this node""") + + + def getType(): """ Get node's type. @return: C{'leaf'} or C{'collection'}. """ - def get_configuration(): + + def getConfiguration(): """ Get node's configuration. @@ -262,7 +298,8 @@ @return: L{dict} of configuration options. """ - def get_meta_data(): + + def getMetaData(): """ Get node's meta data. @@ -272,7 +309,8 @@ @return: L{dict} of meta data. """ - def set_configuration(options): + + def setConfiguration(options): """ Set node's configuration. @@ -284,7 +322,8 @@ @returns: a deferred that fires upon success. """ - def get_affiliation(entity): + + def getAffiliation(entity): """ Get affiliation of entity with this node. @@ -294,7 +333,8 @@ or C{None}. """ - def get_subscription(subscriber): + + def getSubscription(subscriber): """ Get subscription to this node of subscriber. @@ -304,7 +344,8 @@ C{'pending'} or C{None}). """ - def add_subscription(subscriber, state): + + def addSubscription(subscriber, state): """ Add new subscription to this node with given state. @@ -315,7 +356,8 @@ @return: deferred that fires on subscription. """ - def remove_subscription(subscriber): + + def removeSubscription(subscriber): """ Remove subscription to this node. @@ -324,7 +366,8 @@ @return: deferred that fires on removal. """ - def get_subscribers(): + + def getSubscribers(): """ Get list of subscribers to this node. @@ -334,7 +377,8 @@ @return: a deferred that returns a L{list} of L{jid.JID}s. """ - def is_subscribed(entity): + + def isSubscribed(entity): """ Returns whether entity has any subscription to this node. @@ -346,7 +390,8 @@ @return: deferred that returns a L{bool}. """ - def get_affiliations(): + + def getAffiliations(): """ Get affiliations of entities with this node. @@ -355,12 +400,14 @@ C{'publisher'}, C{'outcast'}. """ + + class ILeafNode(Interface): """ Interface to the class of objects that represent leaf nodes. """ - def store_items(items, publisher): + def storeItems(items, publisher): """ Store items in persistent storage for later retrieval. @@ -374,30 +421,33 @@ @return: deferred that fires upon success. """ - def remove_items(item_ids): + + def removeItems(itemIdentifiers): """ Remove items by id. - @param item_ids: L{list} of item ids. + @param itemIdentifiers: L{list} of item ids. @return: deferred that fires with a L{list} of ids of the items that were deleted """ - def get_items(max_items=None): + + def getItems(maxItems=None): """ Get items. - If C{max_items} is not given, all items in the node are returned, - just like C{get_items_by_id}. Otherwise, C{max_items} limits + If C{maxItems} is not given, all items in the node are returned, + just like C{getItemsById}. Otherwise, C{maxItems} limits the returned items to a maximum of that number of most recently published items. - @param max_items: if given, a natural number (>0) that limits the + @param maxItems: if given, a natural number (>0) that limits the returned number of items. @return: deferred that fires with a L{list} of found items. """ - def get_items_by_id(item_ids): + + def getItemsById(itemIdentifiers): """ Get items by item id. @@ -405,10 +455,11 @@ represent the XML of the item as it was published, including the item wrapper with item id. - @param item_ids: L{list} of item ids. + @param itemIdentifiers: L{list} of item ids. @return: deferred that fires with a L{list} of found items. """ + def purge(): """ Purge node of all items in persistent storage.
--- a/idavoll/memory_storage.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/memory_storage.py Tue Jun 10 11:31:49 2008 +0000 @@ -8,7 +8,7 @@ from idavoll import error, iidavoll -default_config = {"pubsub#persist_items": True, +defaultConfig = {"pubsub#persist_items": True, "pubsub#deliver_payloads": True, "pubsub#send_last_published_item": 'on_sub', "pubsub#node_type": "leaf"} @@ -20,90 +20,103 @@ def __init__(self): self._nodes = {} - def get_node(self, node_id): + + def getNode(self, nodeIdentifier): try: - node = self._nodes[node_id] + node = self._nodes[nodeIdentifier] except KeyError: return defer.fail(error.NodeNotFound()) return defer.succeed(node) - def get_node_ids(self): + + def getNodeIds(self): return defer.succeed(self._nodes.keys()) - def create_node(self, node_id, owner, config=None): - if node_id in self._nodes: + + def createNode(self, nodeIdentifier, owner, config=None): + if nodeIdentifier in self._nodes: return defer.fail(error.NodeExists()) if not config: - config = copy.copy(default_config) + config = copy.copy(defaultConfig) if config['pubsub#node_type'] != 'leaf': raise NotImplementedError - node = LeafNode(node_id, owner, config) - self._nodes[node_id] = node + node = LeafNode(nodeIdentifier, owner, config) + self._nodes[nodeIdentifier] = node return defer.succeed(None) - def delete_node(self, node_id): + + def deleteNode(self, nodeIdentifier): try: - del self._nodes[node_id] + del self._nodes[nodeIdentifier] except KeyError: return defer.fail(error.NodeNotFound()) return defer.succeed(None) - def get_affiliations(self, entity): + + def getAffiliations(self, entity): entity = entity.userhost() - return defer.succeed([(node.id, node._affiliations[entity]) + return defer.succeed([(node.nodeIdentifier, node._affiliations[entity]) for name, node in self._nodes.iteritems() if entity in node._affiliations]) - def get_subscriptions(self, entity): + + def getSubscriptions(self, entity): subscriptions = [] for node in self._nodes.itervalues(): for subscriber, subscription in node._subscriptions.iteritems(): subscriber = jid.internJID(subscriber) if subscriber.userhostJID() == entity.userhostJID(): - subscriptions.append((node.id, subscriber, + subscriptions.append((node.nodeIdentifier, subscriber, subscription.state)) return defer.succeed(subscriptions) + class Node: implements(iidavoll.INode) - def __init__(self, node_id, owner, config): - self.id = node_id + def __init__(self, nodeIdentifier, owner, config): + self.nodeIdentifier = nodeIdentifier self._affiliations = {owner.userhost(): 'owner'} self._subscriptions = {} self._config = config - def get_type(self): - return self.type - def get_configuration(self): + def getType(self): + return self.nodeType + + + def getConfiguration(self): return self._config - def get_meta_data(self): + + def getMetaData(self): config = copy.copy(self._config) - config["pubsub#node_type"] = self.type + config["pubsub#node_type"] = self.nodeType return config - def set_configuration(self, options): + + def setConfiguration(self, options): for option in options: if option in self._config: self._config[option] = options[option] return defer.succeed(None) - def get_affiliation(self, entity): + + def getAffiliation(self, entity): return defer.succeed(self._affiliations.get(entity.full())) - def get_subscription(self, subscriber): + + def getSubscription(self, subscriber): try: subscription = self._subscriptions[subscriber.full()] except KeyError: @@ -112,7 +125,8 @@ state = subscription.state return defer.succeed(state) - def add_subscription(self, subscriber, state): + + def addSubscription(self, subscriber, state): if self._subscriptions.get(subscriber.full()): return defer.fail(error.SubscriptionExists()) @@ -120,7 +134,8 @@ self._subscriptions[subscriber.full()] = subscription return defer.succeed(None) - def remove_subscription(self, subscriber): + + def removeSubscription(self, subscriber): try: del self._subscriptions[subscriber.full()] except KeyError: @@ -128,14 +143,16 @@ return defer.succeed(None) - def get_subscribers(self): + + def getSubscribers(self): subscribers = [jid.internJID(subscriber) for subscriber, subscription in self._subscriptions.iteritems() if subscription.state == 'subscribed'] return defer.succeed(subscribers) - def is_subscribed(self, entity): + + def isSubscribed(self, entity): for subscriber, subscription in self._subscriptions.iteritems(): if jid.internJID(subscriber).userhost() == entity.userhost() and \ subscription.state == 'subscribed': @@ -143,68 +160,75 @@ return defer.succeed(False) - def get_affiliations(self): + + def getAffiliations(self): affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation in self._affiliations.iteritems()] return defer.succeed(affiliations) + class LeafNodeMixin: - type = 'leaf' + nodeType = 'leaf' def __init__(self): self._items = {} self._itemlist = [] - def store_items(self, items, publisher): + + def storeItems(self, items, publisher): for data in items: - id = data["id"] + itemIdentifier = data["id"] data = data.toXml() if isinstance(data, str): data = data.decode('utf-8') item = (data, publisher) - if id in self._items: - self._itemlist.remove(self._items[id]) - self._items[id] = item + if itemIdentifier in self._items: + self._itemlist.remove(self._items[itemIdentifier]) + self._items[itemIdentifier] = item self._itemlist.append(item) return defer.succeed(None) - def remove_items(self, item_ids): + + def removeItems(self, itemIdentifiers): deleted = [] - for item_id in item_ids: + for itemIdentifier in itemIdentifiers: try: - item = self._items[item_id] + item = self._items[itemIdentifier] except KeyError: pass else: self._itemlist.remove(item) - del self._items[item_id] - deleted.append(item_id) + del self._items[itemIdentifier] + deleted.append(itemIdentifier) return defer.succeed(deleted) - def get_items(self, max_items=None): - if max_items: - list = self._itemlist[-max_items:] + + def getItems(self, maxItems=None): + if maxItems: + list = self._itemlist[-maxItems:] else: list = self._itemlist return defer.succeed([item[0] for item in list]) - def get_items_by_id(self, item_ids): + + def getItemsById(self, itemIdentifiers): items = [] - for item_id in item_ids: + for itemIdentifier in itemIdentifiers: try: - item = self._items[item_id] + item = self._items[itemIdentifier] except KeyError: pass else: items.append(item[0]) return defer.succeed(items) + def purge(self): self._items = {} self._itemlist = [] @@ -212,15 +236,17 @@ return defer.succeed(None) + class LeafNode(Node, LeafNodeMixin): implements(iidavoll.ILeafNode) - def __init__(self, node_id, owner, config): - Node.__init__(self, node_id, owner, config) + def __init__(self, nodeIdentifier, owner, config): + Node.__init__(self, nodeIdentifier, owner, config) LeafNodeMixin.__init__(self) + class Subscription: def __init__(self, state):
--- a/idavoll/pgsql_storage.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/pgsql_storage.py Tue Jun 10 11:31:49 2008 +0000 @@ -23,16 +23,18 @@ client_encoding='utf-8' ) - def get_node(self, node_id): - return self._dbpool.runInteraction(self._get_node, node_id) - def _get_node(self, cursor, node_id): + def getNode(self, nodeIdentifier): + return self._dbpool.runInteraction(self._getNode, nodeIdentifier) + + + def _getNode(self, cursor, nodeIdentifier): configuration = {} cursor.execute("""SELECT persistent, deliver_payload, send_last_published_item FROM nodes WHERE node=%s""", - (node_id,)) + (nodeIdentifier,)) try: (configuration["pubsub#persist_items"], configuration["pubsub#deliver_payloads"], @@ -41,24 +43,27 @@ except TypeError: raise error.NodeNotFound() else: - node = LeafNode(node_id, configuration) + node = LeafNode(nodeIdentifier, configuration) node._dbpool = self._dbpool return node - def get_node_ids(self): + + def getNodeIds(self): d = self._dbpool.runQuery("""SELECT node from nodes""") d.addCallback(lambda results: [r[0] for r in results]) return d - def create_node(self, node_id, owner, config=None): - return self._dbpool.runInteraction(self._create_node, node_id, owner) - def _create_node(self, cursor, node_id, owner): - node_id = node_id + def createNode(self, nodeIdentifier, owner, config=None): + return self._dbpool.runInteraction(self._createNode, nodeIdentifier, + owner) + + + def _createNode(self, cursor, nodeIdentifier, owner): owner = owner.userhost() try: cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", - (node_id)) + (nodeIdentifier)) except cursor._pool.dbapi.OperationalError: raise error.NodeExists() @@ -75,19 +80,22 @@ (SELECT id FROM nodes WHERE node=%s) AS n CROSS JOIN (SELECT id FROM entities WHERE jid=%s) AS e""", - (node_id, owner)) + (nodeIdentifier, owner)) + - def delete_node(self, node_id): - return self._dbpool.runInteraction(self._delete_node, node_id) + def deleteNode(self, nodeIdentifier): + return self._dbpool.runInteraction(self._deleteNode, nodeIdentifier) + - def _delete_node(self, cursor, node_id): + def _deleteNode(self, cursor, nodeIdentifier): cursor.execute("""DELETE FROM nodes WHERE node=%s""", - (node_id,)) + (nodeIdentifier,)) if cursor.rowcount != 1: raise error.NodeNotFound() - def get_affiliations(self, entity): + + def getAffiliations(self, entity): d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities JOIN affiliations ON (affiliations.entity_id=entities.id) @@ -98,7 +106,8 @@ d.addCallback(lambda results: [tuple(r) for r in results]) return d - def get_subscriptions(self, entity): + + def getSubscriptions(self, entity): d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription FROM entities JOIN subscriptions ON (subscriptions.entity_id=entities.id) @@ -106,75 +115,86 @@ (nodes.id=subscriptions.node_id) WHERE jid=%s""", (entity.userhost(),)) - d.addCallback(self._convert_subscription_jids) + d.addCallback(self._convertSubscriptionJIDs) return d - def _convert_subscription_jids(self, subscriptions): + + def _convertSubscriptionJIDs(self, subscriptions): return [(node, jid.internJID('%s/%s' % (subscriber, resource)), subscription) for node, subscriber, resource, subscription in subscriptions] + class Node: implements(iidavoll.INode) - def __init__(self, node_id, config): - self.id = node_id + def __init__(self, nodeIdentifier, config): + self.nodeIdentifier = nodeIdentifier self._config = config - def _check_node_exists(self, cursor): + + def _checkNodeExists(self, cursor): cursor.execute("""SELECT id FROM nodes WHERE node=%s""", - (self.id)) + (self.nodeIdentifier)) if not cursor.fetchone(): raise error.NodeNotFound() - def get_type(self): - return self.type - def get_configuration(self): + def getType(self): + return self.nodeType + + + def getConfiguration(self): return self._config - def set_configuration(self, options): + + def setConfiguration(self, options): config = copy.copy(self._config) for option in options: if option in config: config[option] = options[option] - d = self._dbpool.runInteraction(self._set_configuration, config) - d.addCallback(self._set_cached_configuration, config) + d = self._dbpool.runInteraction(self._setConfiguration, config) + d.addCallback(self._setCachedConfiguration, config) return d - def _set_configuration(self, cursor, config): - self._check_node_exists(cursor) + + def _setConfiguration(self, cursor, config): + self._checkNodeExists(cursor) cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s, send_last_published_item=%s WHERE node=%s""", (config["pubsub#persist_items"], config["pubsub#deliver_payloads"], config["pubsub#send_last_published_item"], - self.id)) + self.nodeIdentifier)) - def _set_cached_configuration(self, void, config): + + def _setCachedConfiguration(self, void, config): self._config = config - def get_meta_data(self): + + def getMetaData(self): config = copy.copy(self._config) - config["pubsub#node_type"] = self.type + config["pubsub#node_type"] = self.nodeType return config - def get_affiliation(self, entity): - return self._dbpool.runInteraction(self._get_affiliation, entity) + + def getAffiliation(self, entity): + return self._dbpool.runInteraction(self._getAffiliation, entity) - def _get_affiliation(self, cursor, entity): - self._check_node_exists(cursor) + + def _getAffiliation(self, cursor, entity): + self._checkNodeExists(cursor) cursor.execute("""SELECT affiliation FROM affiliations JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND jid=%s""", - (self.id, + (self.nodeIdentifier, entity.userhost())) try: @@ -182,11 +202,13 @@ except TypeError: return None - def get_subscription(self, subscriber): - return self._dbpool.runInteraction(self._get_subscription, subscriber) + + def getSubscription(self, subscriber): + return self._dbpool.runInteraction(self._getSubscription, subscriber) - def _get_subscription(self, cursor, subscriber): - self._check_node_exists(cursor) + + def _getSubscription(self, cursor, subscriber): + self._checkNodeExists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' @@ -196,7 +218,7 @@ JOIN entities ON (entities.id=subscriptions.entity_id) WHERE node=%s AND jid=%s AND resource=%s""", - (self.id, + (self.nodeIdentifier, userhost, resource)) try: @@ -204,12 +226,14 @@ except TypeError: return None - def add_subscription(self, subscriber, state): - return self._dbpool.runInteraction(self._add_subscription, subscriber, + + def addSubscription(self, subscriber, state): + return self._dbpool.runInteraction(self._addSubscription, subscriber, state) - def _add_subscription(self, cursor, subscriber, state): - self._check_node_exists(cursor) + + def _addSubscription(self, cursor, subscriber, state): + self._checkNodeExists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' @@ -229,17 +253,19 @@ (SELECT id FROM entities WHERE jid=%s) AS e""", (resource, state, - self.id, + self.nodeIdentifier, userhost)) except cursor._pool.dbapi.OperationalError: raise error.SubscriptionExists() - def remove_subscription(self, subscriber): - return self._dbpool.runInteraction(self._remove_subscription, + + def removeSubscription(self, subscriber): + return self._dbpool.runInteraction(self._removeSubscription, subscriber) - def _remove_subscription(self, cursor, subscriber): - self._check_node_exists(cursor) + + def _removeSubscription(self, cursor, subscriber): + self._checkNodeExists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' @@ -248,7 +274,7 @@ node_id=(SELECT id FROM nodes WHERE node=%s) AND entity_id=(SELECT id FROM entities WHERE jid=%s) AND resource=%s""", - (self.id, + (self.nodeIdentifier, userhost, resource)) if cursor.rowcount != 1: @@ -256,29 +282,34 @@ return None - def get_subscribers(self): - d = self._dbpool.runInteraction(self._get_subscribers) - d.addCallback(self._convert_to_jids) + + def getSubscribers(self): + d = self._dbpool.runInteraction(self._getSubscribers) + d.addCallback(self._convertToJIDs) return d - def _get_subscribers(self, cursor): - self._check_node_exists(cursor) + + def _getSubscribers(self, cursor): + self._checkNodeExists(cursor) cursor.execute("""SELECT jid, resource FROM subscriptions JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND subscription='subscribed'""", - (self.id,)) + (self.nodeIdentifier,)) return cursor.fetchall() - def _convert_to_jids(self, list): + + def _convertToJIDs(self, list): return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list] - def is_subscribed(self, entity): - return self._dbpool.runInteraction(self._is_subscribed, entity) + + def isSubscribed(self, entity): + return self._dbpool.runInteraction(self._isSubscribed, entity) - def _is_subscribed(self, cursor, entity): - self._check_node_exists(cursor) + + def _isSubscribed(self, cursor, entity): + self._checkNodeExists(cursor) cursor.execute("""SELECT 1 FROM entities JOIN subscriptions ON @@ -288,15 +319,17 @@ WHERE entities.jid=%s AND node=%s AND subscription='subscribed'""", (entity.userhost(), - self.id)) + self.nodeIdentifier)) return cursor.fetchone() is not None - def get_affiliations(self): - return self._dbpool.runInteraction(self._get_affiliations) + + def getAffiliations(self): + return self._dbpool.runInteraction(self._getAffiliations) - def _get_affiliations(self, cursor): - self._check_node_exists(cursor) + + def _getAffiliations(self, cursor): + self._checkNodeExists(cursor) cursor.execute("""SELECT jid, affiliation FROM nodes JOIN affiliations ON @@ -304,24 +337,28 @@ JOIN entities ON (affiliations.entity_id = entities.id) WHERE node=%s""", - self.id) + self.nodeIdentifier) result = cursor.fetchall() return [(jid.internJID(r[0]), r[1]) for r in result] + + class LeafNodeMixin: - type = 'leaf' + nodeType = 'leaf' - def store_items(self, items, publisher): - return self._dbpool.runInteraction(self._store_items, items, publisher) + def storeItems(self, items, publisher): + return self._dbpool.runInteraction(self._storeItems, items, publisher) + - def _store_items(self, cursor, items, publisher): - self._check_node_exists(cursor) + def _storeItems(self, cursor, items, publisher): + self._checkNodeExists(cursor) for item in items: - self._store_item(cursor, item, publisher) + self._storeItem(cursor, item, publisher) - def _store_item(self, cursor, item, publisher): + + def _storeItem(self, cursor, item, publisher): data = item.toXml() cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s FROM nodes @@ -329,7 +366,7 @@ nodes.node = %s and items.item=%s""", (publisher.full(), data, - self.id, + self.nodeIdentifier, item["id"])) if cursor.rowcount == 1: return @@ -339,72 +376,82 @@ (item["id"], publisher.full(), data, - self.id)) + self.nodeIdentifier)) + - def remove_items(self, item_ids): - return self._dbpool.runInteraction(self._remove_items, item_ids) + def removeItems(self, itemIdentifiers): + return self._dbpool.runInteraction(self._removeItems, itemIdentifiers) - def _remove_items(self, cursor, item_ids): - self._check_node_exists(cursor) + + def _removeItems(self, cursor, itemIdentifiers): + self._checkNodeExists(cursor) deleted = [] - for item_id in item_ids: + for itemIdentifier in itemIdentifiers: cursor.execute("""DELETE FROM items WHERE node_id=(SELECT id FROM nodes WHERE node=%s) AND item=%s""", - (self.id, - item_id)) + (self.nodeIdentifier, + itemIdentifier)) if cursor.rowcount: - deleted.append(item_id) + deleted.append(itemIdentifier) return deleted - def get_items(self, max_items=None): - return self._dbpool.runInteraction(self._get_items, max_items) + + def getItems(self, maxItems=None): + return self._dbpool.runInteraction(self._getItems, maxItems) - def _get_items(self, cursor, max_items): - self._check_node_exists(cursor) + + def _getItems(self, cursor, maxItems): + self._checkNodeExists(cursor) query = """SELECT data FROM nodes JOIN items ON (nodes.id=items.node_id) WHERE node=%s ORDER BY date DESC""" - if max_items: + if maxItems: cursor.execute(query + " LIMIT %s", - (self.id, - max_items)) + (self.nodeIdentifier, + maxItems)) else: - cursor.execute(query, (self.id)) + cursor.execute(query, (self.nodeIdentifier)) result = cursor.fetchall() return [unicode(r[0], 'utf-8') for r in result] - def get_items_by_id(self, item_ids): - return self._dbpool.runInteraction(self._get_items_by_id, item_ids) + + def getItemsById(self, itemIdentifiers): + return self._dbpool.runInteraction(self._getItemsById, itemIdentifiers) + - def _get_items_by_id(self, cursor, item_ids): - self._check_node_exists(cursor) + def _getItemsById(self, cursor, itemIdentifiers): + self._checkNodeExists(cursor) items = [] - for item_id in item_ids: + for itemIdentifier in itemIdentifiers: cursor.execute("""SELECT data FROM nodes JOIN items ON (nodes.id=items.node_id) WHERE node=%s AND item=%s""", - (self.id, - item_id)) + (self.nodeIdentifier, + itemIdentifier)) result = cursor.fetchone() if result: items.append(unicode(result[0], 'utf-8')) return items + def purge(self): return self._dbpool.runInteraction(self._purge) + def _purge(self, cursor): - self._check_node_exists(cursor) + self._checkNodeExists(cursor) cursor.execute("""DELETE FROM items WHERE node_id=(SELECT id FROM nodes WHERE node=%s)""", - (self.id,)) + (self.nodeIdentifier,)) + + class LeafNode(Node, LeafNodeMixin):
--- a/idavoll/test/test_backend.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/test/test_backend.py Tue Jun 10 11:31:49 2008 +0000 @@ -18,51 +18,52 @@ NS_PUBSUB = 'http://jabber.org/protocol/pubsub' class BackendTest(unittest.TestCase): - def test_delete_node(self): + def test_deleteNode(self): class testNode: - id = 'to-be-deleted' - def get_affiliation(self, entity): + nodeIdentifier = 'to-be-deleted' + def getAffiliation(self, entity): if entity is OWNER: return defer.succeed('owner') class testStorage: - def get_node(self, node_id): + def getNode(self, nodeIdentifier): return defer.succeed(testNode()) - def delete_node(self, node_id): - if node_id in ['to-be-deleted']: - self.delete_called = True + def deleteNode(self, nodeIdentifier): + if nodeIdentifier in ['to-be-deleted']: + self.deleteCalled = True return defer.succeed(None) else: return defer.fail(error.NodeNotFound()) - def pre_delete(node_id): - self.pre_delete_called = True + def preDelete(nodeIdentifier): + self.preDeleteCalled = True return defer.succeed(None) def cb(result): - self.assertTrue(self.pre_delete_called) - self.assertTrue(self.storage.delete_called) + self.assertTrue(self.preDeleteCalled) + self.assertTrue(self.storage.deleteCalled) self.storage = testStorage() self.backend = backend.BackendService(self.storage) self.storage.backend = self.backend - self.pre_delete_called = False - self.delete_called = False + self.preDeleteCalled = False + self.deleteCalled = False - self.backend.register_pre_delete(pre_delete) - d = self.backend.delete_node('to-be-deleted', OWNER) + self.backend.registerPreDelete(preDelete) + d = self.backend.deleteNode('to-be-deleted', OWNER) d.addCallback(cb) return d - def test_create_nodeNoID(self): + + def test_createNodeNoID(self): """ Test creation of a node without a given node identifier. """ class testStorage: - def create_node(self, node_id, requestor): - self.node_id = node_id + def createNode(self, nodeIdentifier, requestor): + self.nodeIdentifier = nodeIdentifier return defer.succeed(None) self.storage = testStorage() @@ -71,27 +72,28 @@ def checkID(nodeIdentifier): self.assertNotIdentical(None, nodeIdentifier) - self.assertIdentical(self.storage.node_id, nodeIdentifier) + self.assertIdentical(self.storage.nodeIdentifier, nodeIdentifier) - d = self.backend.create_node(None, OWNER) + d = self.backend.createNode(None, OWNER) d.addCallback(checkID) return d + def test_publishNoID(self): """ Test publish request with an item without a node identifier. """ class testNode: - id = 'node' - def get_affiliation(self, entity): + nodeIdentifier = 'node' + def getAffiliation(self, entity): if entity is OWNER: return defer.succeed('owner') - def get_configuration(self): + def getConfiguration(self): return {'pubsub#deliver_payloads': True, 'pubsub#persist_items': False} class testStorage: - def get_node(self, node_id): + def getNode(self, nodeIdentifier): return defer.succeed(testNode()) def checkID(notification): @@ -101,12 +103,13 @@ self.backend = backend.BackendService(self.storage) self.storage.backend = self.backend - self.backend.register_notifier(checkID) + self.backend.registerNotifier(checkID) items = [pubsub.Item()] d = self.backend.publish('node', items, OWNER) return d + def test_notifyOnSubscription(self): """ Test notification of last published item on subscription. @@ -114,25 +117,25 @@ ITEM = "<item xmlns='%s' id='1'/>" % NS_PUBSUB class testNode: - id = 'node' - def get_affiliation(self, entity): + nodeIdentifier = 'node' + def getAffiliation(self, entity): if entity is OWNER: return defer.succeed('owner') - def get_configuration(self): + def getConfiguration(self): return {'pubsub#deliver_payloads': True, 'pubsub#persist_items': False, 'pubsub#send_last_published_item': 'on_sub'} - def get_items(self, max_items): + def getItems(self, maxItems): return [ITEM] - def add_subscription(self, subscriber, state): + def addSubscription(self, subscriber, state): return defer.succeed(None) class testStorage: - def get_node(self, node_id): + def getNode(self, nodeIdentifier): return defer.succeed(testNode()) def cb(data): - self.assertEquals('node', data['node_id']) + self.assertEquals('node', data['nodeIdentifier']) self.assertEquals([ITEM], data['items']) self.assertEquals(OWNER, data['subscriber']) @@ -142,37 +145,44 @@ d1 = defer.Deferred() d1.addCallback(cb) - self.backend.register_notifier(d1.callback) + self.backend.registerNotifier(d1.callback) d2 = self.backend.subscribe('node', OWNER, OWNER) return defer.gatherResults([d1, d2]) test_notifyOnSubscription.timeout = 2 + class BaseTestBackend(object): """ Base class for backend stubs. """ - def supports_publisher_affiliation(self): + def supportsPublisherAffiliation(self): return True - def supports_outcast_affiliation(self): + + def supportsOutcastAffiliation(self): return True - def supports_persistent_items(self): + + def supportsPersistentItems(self): return True - def supports_instant_nodes(self): + + def supportsInstantNodes(self): return True - def register_notifier(self, observerfn, *args, **kwargs): + + def registerNotifier(self, observerfn, *args, **kwargs): return - def register_pre_delete(self, pre_delete_fn): + + def registerPreDelete(self, preDeleteFn): return + class PubSubServiceFromBackendTest(unittest.TestCase): def test_unsubscribeNotSubscribed(self): @@ -193,16 +203,17 @@ d.addCallback(cb) return d + def test_getNodeInfo(self): """ Test retrieving node information. """ class TestBackend(BaseTestBackend): - def get_node_type(self, nodeIdentifier): + def getNodeType(self, nodeIdentifier): return defer.succeed('leaf') - def get_node_meta_data(self, nodeIdentifier): + def getNodeMetaData(self, nodeIdentifier): return defer.succeed({'pubsub#persist_items': True}) def cb(info):
--- a/idavoll/test/test_gateway.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/test/test_gateway.py Tue Jun 10 11:31:49 2008 +0000 @@ -25,7 +25,7 @@ entry.addElement("content", content="Some text.") baseURI = "http://localhost:8086/" -componentJID = "pubsub.localhost" +componentJID = "pubsub" class GatewayTest(unittest.TestCase): timeout = 2
--- a/idavoll/test/test_storage.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/test/test_storage.py Tue Jun 10 11:31:49 2008 +0000 @@ -40,318 +40,353 @@ return object + class StorageTests: def _assignTestNode(self, node): self.node = node + def setUp(self): - d = self.s.get_node('pre-existing') + d = self.s.getNode('pre-existing') d.addCallback(self._assignTestNode) return d - def testGetNode(self): - return self.s.get_node('pre-existing') + + def test_getNode(self): + return self.s.getNode('pre-existing') - def testGetNonExistingNode(self): - d = self.s.get_node('non-existing') + + def test_getNonExistingNode(self): + d = self.s.getNode('non-existing') self.assertFailure(d, error.NodeNotFound) return d - def testGetNodeIDs(self): - def cb(node_ids): - self.assertIn('pre-existing', node_ids) - self.assertNotIn('non-existing', node_ids) + + def test_getNodeIDs(self): + def cb(nodeIdentifiers): + self.assertIn('pre-existing', nodeIdentifiers) + self.assertNotIn('non-existing', nodeIdentifiers) - return self.s.get_node_ids().addCallback(cb) + return self.s.getNodeIds().addCallback(cb) + - def testCreateExistingNode(self): - d = self.s.create_node('pre-existing', OWNER) + def test_createExistingNode(self): + d = self.s.createNode('pre-existing', OWNER) self.assertFailure(d, error.NodeExists) return d - def testCreateNode(self): + + def test_createNode(self): def cb(void): - d = self.s.get_node('new 1') + d = self.s.getNode('new 1') return d - d = self.s.create_node('new 1', OWNER) + d = self.s.createNode('new 1', OWNER) d.addCallback(cb) return d - def testDeleteNonExistingNode(self): - d = self.s.delete_node('non-existing') + + def test_deleteNonExistingNode(self): + d = self.s.deleteNode('non-existing') self.assertFailure(d, error.NodeNotFound) return d - def testDeleteNode(self): + + def test_deleteNode(self): def cb(void): - d = self.s.get_node('to-be-deleted') + d = self.s.getNode('to-be-deleted') self.assertFailure(d, error.NodeNotFound) return d - d = self.s.delete_node('to-be-deleted') + d = self.s.deleteNode('to-be-deleted') d.addCallback(cb) return d - def testGetAffiliations(self): + + def test_getAffiliations(self): def cb(affiliations): self.assertIn(('pre-existing', 'owner'), affiliations) - d = self.s.get_affiliations(OWNER) + d = self.s.getAffiliations(OWNER) d.addCallback(cb) return d - def testGetSubscriptions(self): + + def test_getSubscriptions(self): def cb(subscriptions): self.assertIn(('pre-existing', SUBSCRIBER, 'subscribed'), subscriptions) - d = self.s.get_subscriptions(SUBSCRIBER) + d = self.s.getSubscriptions(SUBSCRIBER) d.addCallback(cb) return d + # Node tests - def testGetType(self): - self.assertEqual(self.node.get_type(), 'leaf') + def test_getType(self): + self.assertEqual(self.node.getType(), 'leaf') - def testGetConfiguration(self): - config = self.node.get_configuration() + + def test_getConfiguration(self): + config = self.node.getConfiguration() self.assertIn('pubsub#persist_items', config.iterkeys()) self.assertIn('pubsub#deliver_payloads', config.iterkeys()) self.assertEqual(config['pubsub#persist_items'], True) self.assertEqual(config['pubsub#deliver_payloads'], True) - def testSetConfiguration(self): - def get_config(node): - d = node.set_configuration({'pubsub#persist_items': False}) + + def test_setConfiguration(self): + def getConfig(node): + d = node.setConfiguration({'pubsub#persist_items': False}) d.addCallback(lambda _: node) return d - def check_object_config(node): - config = node.get_configuration() + def checkObjectConfig(node): + config = node.getConfiguration() self.assertEqual(config['pubsub#persist_items'], False) - def get_node(void): - return self.s.get_node('to-be-reconfigured') + def getNode(void): + return self.s.getNode('to-be-reconfigured') - def check_storage_config(node): - config = node.get_configuration() + def checkStorageConfig(node): + config = node.getConfiguration() self.assertEqual(config['pubsub#persist_items'], False) - d = self.s.get_node('to-be-reconfigured') - d.addCallback(get_config) - d.addCallback(check_object_config) - d.addCallback(get_node) - d.addCallback(check_storage_config) + d = self.s.getNode('to-be-reconfigured') + d.addCallback(getConfig) + d.addCallback(checkObjectConfig) + d.addCallback(getNode) + d.addCallback(checkStorageConfig) return d - def testGetMetaData(self): - meta_data = self.node.get_meta_data() - for key, value in self.node.get_configuration().iteritems(): - self.assertIn(key, meta_data.iterkeys()) - self.assertEqual(value, meta_data[key]) - self.assertIn('pubsub#node_type', meta_data.iterkeys()) - self.assertEqual(meta_data['pubsub#node_type'], 'leaf') - def testGetAffiliation(self): + def test_getMetaData(self): + metaData = self.node.getMetaData() + for key, value in self.node.getConfiguration().iteritems(): + self.assertIn(key, metaData.iterkeys()) + self.assertEqual(value, metaData[key]) + self.assertIn('pubsub#node_type', metaData.iterkeys()) + self.assertEqual(metaData['pubsub#node_type'], 'leaf') + + + def test_getAffiliation(self): def cb(affiliation): self.assertEqual(affiliation, 'owner') - d = self.node.get_affiliation(OWNER) + d = self.node.getAffiliation(OWNER) d.addCallback(cb) return d - def testGetNonExistingAffiliation(self): + + def test_getNonExistingAffiliation(self): def cb(affiliation): self.assertEqual(affiliation, None) - d = self.node.get_affiliation(SUBSCRIBER) + d = self.node.getAffiliation(SUBSCRIBER) d.addCallback(cb) return d - def testAddSubscription(self): + + def test_addSubscription(self): def cb1(void): - return self.node.get_subscription(SUBSCRIBER_NEW) + return self.node.getSubscription(SUBSCRIBER_NEW) def cb2(state): self.assertEqual(state, 'pending') - d = self.node.add_subscription(SUBSCRIBER_NEW, 'pending') + d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending') d.addCallback(cb1) d.addCallback(cb2) return d - def testAddExistingSubscription(self): - d = self.node.add_subscription(SUBSCRIBER, 'pending') + + def test_addExistingSubscription(self): + d = self.node.addSubscription(SUBSCRIBER, 'pending') self.assertFailure(d, error.SubscriptionExists) return d - def testGetSubscription(self): + + def test_getSubscription(self): def cb(subscriptions): self.assertEquals(subscriptions[0][1], 'subscribed') self.assertEquals(subscriptions[1][1], 'pending') self.assertEquals(subscriptions[2][1], None) - d = defer.DeferredList([self.node.get_subscription(SUBSCRIBER), - self.node.get_subscription(SUBSCRIBER_PENDING), - self.node.get_subscription(OWNER)]) + d = defer.DeferredList([self.node.getSubscription(SUBSCRIBER), + self.node.getSubscription(SUBSCRIBER_PENDING), + self.node.getSubscription(OWNER)]) d.addCallback(cb) return d - def testRemoveSubscription(self): - return self.node.remove_subscription(SUBSCRIBER_TO_BE_DELETED) + + def test_removeSubscription(self): + return self.node.removeSubscription(SUBSCRIBER_TO_BE_DELETED) - def testRemoveNonExistingSubscription(self): - d = self.node.remove_subscription(OWNER) + + def test_removeNonExistingSubscription(self): + d = self.node.removeSubscription(OWNER) self.assertFailure(d, error.NotSubscribed) return d - def testGetSubscribers(self): + + def test_getSubscribers(self): def cb(subscribers): self.assertIn(SUBSCRIBER, subscribers) self.assertNotIn(SUBSCRIBER_PENDING, subscribers) self.assertNotIn(OWNER, subscribers) - d = self.node.get_subscribers() + d = self.node.getSubscribers() d.addCallback(cb) return d - def testIsSubscriber(self): + + def test_isSubscriber(self): def cb(subscribed): self.assertEquals(subscribed[0][1], True) self.assertEquals(subscribed[1][1], True) self.assertEquals(subscribed[2][1], False) self.assertEquals(subscribed[3][1], False) - d = defer.DeferredList([self.node.is_subscribed(SUBSCRIBER), - self.node.is_subscribed(SUBSCRIBER.userhostJID()), - self.node.is_subscribed(SUBSCRIBER_PENDING), - self.node.is_subscribed(OWNER)]) + d = defer.DeferredList([self.node.isSubscribed(SUBSCRIBER), + self.node.isSubscribed(SUBSCRIBER.userhostJID()), + self.node.isSubscribed(SUBSCRIBER_PENDING), + self.node.isSubscribed(OWNER)]) d.addCallback(cb) return d - def testStoreItems(self): + + def test_storeItems(self): def cb1(void): - return self.node.get_items_by_id(['new']) + return self.node.getItemsById(['new']) def cb2(result): self.assertEqual(result[0], decode(ITEM_NEW.toXml())) - d = self.node.store_items([ITEM_NEW], PUBLISHER) + d = self.node.storeItems([ITEM_NEW], PUBLISHER) d.addCallback(cb1) d.addCallback(cb2) return d - def testStoreUpdatedItems(self): + + def test_storeUpdatedItems(self): def cb1(void): - return self.node.get_items_by_id(['current']) + return self.node.getItemsById(['current']) def cb2(result): self.assertEqual(result[0], decode(ITEM_UPDATED.toXml())) - d = self.node.store_items([ITEM_UPDATED], PUBLISHER) + d = self.node.storeItems([ITEM_UPDATED], PUBLISHER) d.addCallback(cb1) d.addCallback(cb2) return d - def testRemoveItems(self): + + def test_removeItems(self): def cb1(result): self.assertEqual(result, ['to-be-deleted']) - return self.node.get_items_by_id(['to-be-deleted']) + return self.node.getItemsById(['to-be-deleted']) def cb2(result): self.assertEqual(len(result), 0) - d = self.node.remove_items(['to-be-deleted']) + d = self.node.removeItems(['to-be-deleted']) d.addCallback(cb1) d.addCallback(cb2) return d - def testRemoveNonExistingItems(self): + + def test_removeNonExistingItems(self): def cb(result): self.assertEqual(result, []) - d = self.node.remove_items(['non-existing']) + d = self.node.removeItems(['non-existing']) d.addCallback(cb) return d - def testGetItems(self): + + def test_getItems(self): def cb(result): self.assertIn(decode(ITEM.toXml()), result) - d = self.node.get_items() + d = self.node.getItems() d.addCallback(cb) return d - def testLastItem(self): + + def test_lastItem(self): def cb(result): self.assertEqual([decode(ITEM.toXml())], result) - d = self.node.get_items(1) + d = self.node.getItems(1) d.addCallback(cb) return d - def testGetItemsById(self): + + def test_getItemsById(self): def cb(result): self.assertEqual(len(result), 1) - d = self.node.get_items_by_id(['current']) + d = self.node.getItemsById(['current']) d.addCallback(cb) return d - def testGetNonExistingItemsById(self): + + def test_getNonExistingItemsById(self): def cb(result): self.assertEqual(len(result), 0) - d = self.node.get_items_by_id(['non-existing']) + d = self.node.getItemsById(['non-existing']) d.addCallback(cb) return d - def testPurge(self): + + def test_purge(self): def cb1(node): d = node.purge() d.addCallback(lambda _: node) return d def cb2(node): - return node.get_items() + return node.getItems() def cb3(result): self.assertEqual([], result) - d = self.s.get_node('to-be-purged') + d = self.s.getNode('to-be-purged') d.addCallback(cb1) d.addCallback(cb2) d.addCallback(cb3) return d - def testGetNodeAffilatiations(self): + + def test_getNodeAffilatiations(self): def cb1(node): - return node.get_affiliations() + return node.getAffiliations() def cb2(affiliations): affiliations = dict(((a[0].full(), a[1]) for a in affiliations)) self.assertEquals(affiliations[OWNER.full()], 'owner') - d = self.s.get_node('pre-existing') + d = self.s.getNode('pre-existing') d.addCallback(cb1) d.addCallback(cb2) return d + class MemoryStorageStorageTestCase(unittest.TestCase, StorageTests): def setUp(self): from idavoll.memory_storage import Storage, LeafNode, Subscription, \ - default_config + defaultConfig self.s = Storage() self.s._nodes['pre-existing'] = \ - LeafNode('pre-existing', OWNER, default_config) + LeafNode('pre-existing', OWNER, defaultConfig) self.s._nodes['to-be-deleted'] = \ LeafNode('to-be-deleted', OWNER, None) self.s._nodes['to-be-reconfigured'] = \ - LeafNode('to-be-reconfigured', OWNER, default_config) + LeafNode('to-be-reconfigured', OWNER, defaultConfig) self.s._nodes['to-be-purged'] = \ LeafNode('to-be-purged', OWNER, None) @@ -374,10 +409,12 @@ return StorageTests.setUp(self) + class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests): def _callSuperSetUp(self, void): return StorageTests.setUp(self) + def setUp(self): from idavoll.pgsql_storage import Storage self.s = Storage('ralphm', 'pubsub_test') @@ -386,10 +423,12 @@ d.addCallback(self._callSuperSetUp) return d + def tearDownClass(self): #return self.s._dbpool.runInteraction(self.cleandb) pass + def init(self, cursor): self.cleandb(cursor) cursor.execute("""INSERT INTO nodes (node) VALUES ('pre-existing')""") @@ -454,6 +493,7 @@ (PUBLISHER.userhost(), ITEM.toXml())) + def cleandb(self, cursor): cursor.execute("""DELETE FROM nodes WHERE node in ('non-existing', 'pre-existing', 'to-be-deleted',