Mercurial > libervia-pubsub
changeset 167:ef22e4150caa
Move protocol implementations (pubsub, disco, forms) to and depend on wokkel.
Author: ralphm
Fixes: #4
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Wed, 03 Oct 2007 12:41:43 +0000 |
parents | 5abb017bd687 |
children | e2c2c2baf483 |
files | INSTALL LICENSE README idavoll/__init__.py idavoll/backend.py idavoll/data_form.py idavoll/disco.py idavoll/error.py idavoll/generic_backend.py idavoll/idavoll.py idavoll/iidavoll.py idavoll/memory_storage.py idavoll/pgsql_storage.py idavoll/pubsub.py idavoll/storage.py idavoll/tap.py idavoll/test/__init__.py idavoll/test/test_backend.py idavoll/test/test_storage.py |
diffstat | 19 files changed, 1069 insertions(+), 1851 deletions(-) [+] |
line wrap: on
line diff
--- a/INSTALL Thu Jan 18 14:08:32 2007 +0000 +++ b/INSTALL Wed Oct 03 12:41:43 2007 +0000 @@ -5,6 +5,7 @@ - Twisted Words >= 0.4.0 - uuid.py (http://ofxsuite.berlios.de/uuid.py) - A jabber server that supports the component protocol (JEP-0114) +- Wokkel (http://wokkel.ik.nu/) For the PostgreSQL backend, the following is also required:
--- a/LICENSE Thu Jan 18 14:08:32 2007 +0000 +++ b/LICENSE Wed Oct 03 12:41:43 2007 +0000 @@ -1,4 +1,4 @@ -Copyright (c) 2003-2006 Ralph Meijer +Copyright (c) 2003-2007 Ralph Meijer Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the
--- a/README Thu Jan 18 14:08:32 2007 +0000 +++ b/README Wed Oct 03 12:41:43 2007 +0000 @@ -14,7 +14,7 @@ There are two different backends: one using PostgreSQL for storage, and one just keeping everything in memory. -In Idavoll the mimimal requirements of JEP-0060 version 1.8 are implemented +In Idavoll the mimimal requirements of JEP-0060 version 1.9 are implemented plus most optional features, as returned by Service Discovery. Installing @@ -26,7 +26,7 @@ Copyright ========= -All of the code in this distribution is Copyright (c) 2003-2006 Ralph Meijer. +All of the code in this distribution is Copyright (c) 2003-2007 Ralph Meijer. Idavoll is made available under the MIT license. The included LICENSE file describes this in detail.
--- a/idavoll/__init__.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/__init__.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,3 +1,6 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. +""" +Idavoll, a generic XMPP publish-subscribe service. +"""
--- a/idavoll/backend.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/backend.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,214 +1,524 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# -*- test-case-name: idavoll.test.test_backend -*- +# +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. -from zope.interface import Interface -import storage +import uuid + +from zope.interface import implements -class Error(Exception): - msg = '' +from twisted.application import service +from twisted.python import components +from twisted.internet import defer +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.xish import utility - def __str__(self): - return self.msg - -class Forbidden(Error): - pass +from wokkel.iwokkel import IDisco, IPubSubService +from wokkel.pubsub import PubSubService, PubSubError -class ItemForbidden(Error): - pass +from idavoll import error, iidavoll +from idavoll.iidavoll import IBackendService -class ItemRequired(Error): - pass - -class NoInstantNodes(Error): - pass +def _get_affiliation(node, entity): + d = node.get_affiliation(entity) + d.addCallback(lambda affiliation: (node, affiliation)) + return d -class NotSubscribed(Error): - pass +class BackendService(service.Service, utility.EventDispatcher): -class InvalidConfigurationOption(Error): - msg = 'Invalid configuration option' - -class InvalidConfigurationValue(Error): - msg = 'Bad configuration value' + implements(iidavoll.IBackendService) -class NodeNotPersistent(Error): - pass - -class NoRootNode(Error): - pass + options = {"pubsub#persist_items": + {"type": "boolean", + "label": "Persist items to storage"}, + "pubsub#deliver_payloads": + {"type": "boolean", + "label": "Deliver payloads with event notifications"}, + } -class IBackendService(Interface): - """ Interface to a backend service of a pubsub service. """ + default_config = {"pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + } - def __init__(storage): - """ - @param storage: L{storage} object. - """ + def __init__(self, storage): + utility.EventDispatcher.__init__(self) + self.storage = storage + self._callback_list = [] def supports_publisher_affiliation(self): - """ Reports if the backend supports the publisher affiliation. - - @rtype: C{bool} - """ + return True def supports_outcast_affiliation(self): - """ Reports if the backend supports the publisher affiliation. - - @rtype: C{bool} - """ + return True def supports_persistent_items(self): - """ Reports if the backend supports persistent items. - - @rtype: C{bool} - """ + return True - def get_node_type(node_id): - """ Return type of a node. - - @return: a deferred that returns either 'leaf' or 'collection' - """ + def get_node_type(self, node_id): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_type()) + return d def get_nodes(self): - """ Returns list of all nodes. + return self.storage.get_node_ids() + + def get_node_meta_data(self, node_id): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_meta_data()) + d.addCallback(self._make_meta_data) + return d - @return: a deferred that returns a C{list} of node ids. - """ + def _make_meta_data(self, meta_data): + options = [] + for key, value in meta_data.iteritems(): + if self.options.has_key(key): + option = {"var": key} + option.update(self.options[key]) + option["value"] = value + options.append(option) + + return options + + def _check_auth(self, node, requestor): + def check(affiliation, node): + if affiliation not in ['owner', 'publisher']: + raise error.Forbidden() + return node - def get_node_meta_data(node_id): - """ Return meta data for a node. + d = node.get_affiliation(requestor) + d.addCallback(check, node) + return d + + def publish(self, node_id, items, requestor): + d = self.storage.get_node(node_id) + d.addCallback(self._check_auth, requestor) + d.addCallback(self._do_publish, items, requestor) + return d + + def _do_publish(self, node, items, requestor): + configuration = node.get_configuration() + persist_items = configuration["pubsub#persist_items"] + deliver_payloads = configuration["pubsub#deliver_payloads"] - @return: a deferred that returns a C{list} of C{dict}s with the - metadata. - """ + if items and not persist_items and not deliver_payloads: + raise error.ItemForbidden() + elif not items and (persist_items or deliver_payloads): + raise error.ItemRequired() + + if persist_items or deliver_payloads: + for item in items: + if not item.getAttribute("id"): + item["id"] = uuid.generate() + + if persist_items: + d = node.store_items(items, requestor) + else: + d = defer.succeed(None) -class INodeCreationService(Interface): - """ A service for creating nodes """ + d.addCallback(self._do_notify, node.id, items, deliver_payloads) + return d + + def _do_notify(self, result, node_id, items, deliver_payloads): + if items and not deliver_payloads: + for item in items: + item.children = [] + + self.dispatch({'items': items, 'node_id': node_id}, + '//event/pubsub/notify') + + def get_notification_list(self, node_id, items): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_subscribers()) + d.addCallback(self._magic_filter, node_id, items) + return d + + def _magic_filter(self, subscribers, node_id, items): + list = [] + for subscriber in subscribers: + list.append((subscriber, items)) + return list + + def register_notifier(self, observerfn, *args, **kwargs): + self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) - def create_node(node_id, requestor): - """ Create a node. - - @return: a deferred that fires when the node has been created. - """ + def subscribe(self, node_id, subscriber, requestor): + subscriber_entity = subscriber.userhostJID() + if subscriber_entity != requestor: + return defer.fail(error.Forbidden()) + + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, subscriber_entity) + d.addCallback(self._do_subscribe, subscriber) + return d + + def _do_subscribe(self, result, subscriber): + node, affiliation = result -class INodeDeletionService(Interface): - """ A service for deleting nodes. """ + if affiliation == 'outcast': + raise error.Forbidden() + + d = node.add_subscription(subscriber, 'subscribed') + d.addCallback(lambda _: 'subscribed') + d.addErrback(self._get_subscription, node, subscriber) + d.addCallback(self._return_subscription, node.id) + return d + + def _get_subscription(self, failure, node, subscriber): + failure.trap(error.SubscriptionExists) + return node.get_subscription(subscriber) + + def _return_subscription(self, result, node_id): + return node_id, result - def register_pre_delete(pre_delete_fn): - """ Register a callback that is called just before a node deletion. - - The function C{pre_deleted_fn} is added to a list of functions - to be called just before deletion of a node. The callback - C{pre_delete_fn} is called with the C{node_id} that is about to be - deleted and should return a deferred that returns a list of deferreds - that are to be fired after deletion. The backend collects the lists - from all these callbacks before actually deleting the node in question. - After deletion all collected deferreds are fired to do post-processing. + def unsubscribe(self, node_id, subscriber, requestor): + if subscriber.userhostJID() != requestor: + return defer.fail(error.Forbidden()) + + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.remove_subscription(subscriber)) + return d + + def get_subscriptions(self, entity): + return self.storage.get_subscriptions(entity) + + def supports_instant_nodes(self): + return True + + def create_node(self, node_id, requestor): + if not node_id: + node_id = 'generic/%s' % uuid.generate() + d = self.storage.create_node(node_id, requestor) + d.addCallback(lambda _: node_id) + return d - The idea is that you want to be able to collect data from the - node before deleting it, for example to get a list of subscribers - that have to be notified after the node has been deleted. To do this, - C{pre_delete_fn} fetches the subscriber list and passes this - list to a callback attached to a deferred that it sets up. This - deferred is returned in the list of deferreds. - """ + def get_default_configuration(self): + d = defer.succeed(self.default_config) + d.addCallback(self._make_config) + return d + + def get_node_configuration(self, node_id): + if not node_id: + raise error.NoRootNode() + + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_configuration()) + + d.addCallback(self._make_config) + return d + + def _make_config(self, config): + options = [] + for key, value in self.options.iteritems(): + option = {"var": key} + option.update(value) + if config.has_key(key): + option["value"] = config[key] + options.append(option) + + return options + + def set_node_configuration(self, node_id, options, requestor): + if not node_id: + raise error.NoRootNode() - def get_subscribers(node_id): - """ Get node subscriber list. - - @return: a deferred that fires with the list of subscribers. - """ + for key, value in options.iteritems(): + if not self.options.has_key(key): + raise error.InvalidConfigurationOption() + if self.options[key]["type"] == 'boolean': + try: + options[key] = bool(int(value)) + except ValueError: + raise error.InvalidConfigurationValue() + + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_set_node_configuration, options) + return d - def delete_node(node_id, requestor): - """ Delete a node. - - @return: a deferred that fires when the node has been deleted. - """ + def _do_set_node_configuration(self, result, options): + node, affiliation = result + + if affiliation != 'owner': + raise error.Forbidden() + + return node.set_configuration(options) + + def get_affiliations(self, entity): + return self.storage.get_affiliations(entity) + + def get_items(self, node_id, requestor, max_items=None, item_ids=[]): + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_get_items, max_items, item_ids) + return d -class IPublishService(Interface): - """ A service for publishing items to a node. """ + def _do_get_items(self, result, max_items, item_ids): + node, affiliation = result + + if affiliation == 'outcast': + raise error.Forbidden() + + if item_ids: + return node.get_items_by_id(item_ids) + else: + return node.get_items(max_items) + + def retract_item(self, node_id, item_ids, requestor): + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_retract, item_ids) + return d - def publish(node_id, items, requestor): - """ Publish items to a pubsub node. - - @return: a deferred that fires when the items have been published. - """ -class INotificationService(Interface): - """ A service for notification of published items. """ + def _do_retract(self, result, item_ids): + node, affiliation = result + persist_items = node.get_configuration()["pubsub#persist_items"] + + if affiliation not in ['owner', 'publisher']: + raise error.Forbidden() - def register_notifier(observerfn, *args, **kwargs): - """ Register callback which is called for notification. """ + if not persist_items: + raise error.NodeNotPersistent() + + d = node.remove_items(item_ids) + d.addCallback(self._do_notify_retraction, node.id) + return d + + def _do_notify_retraction(self, item_ids, node_id): + self.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, + '//event/pubsub/retract') - def get_notification_list(node_id, items): - pass + def purge_node(self, node_id, requestor): + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_purge) + return d + + def _do_purge(self, result): + node, affiliation = result + persist_items = node.get_configuration()["pubsub#persist_items"] -class ISubscriptionService(Interface): - """ A service for managing subscriptions. """ + if affiliation != 'owner': + raise error.Forbidden() + + if not persist_items: + raise error.NodeNotPersistent() + + d = node.purge() + d.addCallback(self._do_notify_purge, node.id) + return d + + def _do_notify_purge(self, result, node_id): + self.dispatch(node_id, '//event/pubsub/purge') - def subscribe(node_id, subscriber, requestor): - """ Request the subscription of an entity to a pubsub node. + def register_pre_delete(self, pre_delete_fn): + self._callback_list.append(pre_delete_fn) + + def get_subscribers(self, node_id): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_subscribers()) + return d - Depending on the node's configuration and possible business rules, the - C{subscriber} is added to the list of subscriptions of the node with id - C{node_id}. The C{subscriber} might be different from the C{requestor}, - and if the C{requestor} is not allowed to subscribe this entity an - exception should be raised. + def delete_node(self, node_id, requestor): + d = self.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_pre_delete) + return d + + def _do_pre_delete(self, result): + node, affiliation = result + + if affiliation != 'owner': + raise error.Forbidden() + + d = defer.DeferredList([cb(node.id) for cb in self._callback_list], + consumeErrors=1) + d.addCallback(self._do_delete, node.id) - @return: a deferred that returns the subscription state - """ + def _do_delete(self, result, node_id): + dl = [] + for succeeded, r in result: + if succeeded and r: + dl.extend(r) + + d = self.storage.delete_node(node_id) + d.addCallback(self._do_notify_delete, dl) + + return d - def unsubscribe(node_id, subscriber, requestor): - """ Cancel the subscription of an entity to a pubsub node. + def _do_notify_delete(self, result, dl): + for d in dl: + d.callback(None) + + +class PubSubServiceFromBackend(PubSubService): + """ + Adapts a backend to an xmpp publish-subscribe service. + """ + + implements(IDisco) - The subscription of C{subscriber} is removed from the list of - subscriptions of the node with id C{node_id}. If the C{requestor} - is not allowed to unsubscribe C{subscriber}, an an exception should - be raised. + _errorMap = { + error.NodeNotFound: ('item-not-found', None, None), + error.NodeExists: ('conflict', None, None), + error.SubscriptionNotFound: ('not-authorized', + 'not-subscribed', + None), + error.Forbidden: ('forbidden', None, None), + error.ItemForbidden: ('bad-request', 'item-forbidden', None), + error.ItemRequired: ('bad-request', 'item-required', None), + error.NoInstantNodes: ('not-acceptable', + 'unsupported', + 'instant-nodes'), + error.NotSubscribed: ('not-authorized', 'not-subscribed', None), + error.InvalidConfigurationOption: ('not-acceptable', None, None), + error.InvalidConfigurationValue: ('not-acceptable', None, None), + error.NodeNotPersistent: ('feature-not-implemented', + 'unsupported', + 'persistent-node'), + error.NoRootNode: ('bad-request', None, None), + } - @return: a deferred that fires when unsubscription is complete. - """ + def __init__(self, backend): + PubSubService.__init__(self) - def get_subscriptions(entity): - """ Report the list of current subscriptions with this pubsub service. + self.backend = backend + self.hideNodes = False - Report the list of the current subscriptions with all nodes within this - pubsub service, for the C{entity}. + self.pubSubFeatures = self._getPubSubFeatures() + + self.backend.register_notifier(self._notify) - @return: a deferred that returns the list of all current subscriptions - as tuples C{(node_id, subscriber, subscription)}. - """ + def _getPubSubFeatures(self): + features = [ + "config-node", + "create-nodes", + "delete-any", + "delete-nodes", + "item-ids", + "meta-data", + "publish", + "purge-nodes", + "retract-items", + "retrieve-affiliations", + "retrieve-default", + "retrieve-items", + "retrieve-subscriptions", + "subscribe", + ] -class IAffiliationsService(Interface): - """ A service for retrieving the affiliations with this pubsub service. """ + if self.backend.supports_instant_nodes(): + features.append("instant-nodes") + + if self.backend.supports_outcast_affiliation(): + features.append("outcast-affiliation") + + if self.backend.supports_persistent_items(): + features.append("persistent-items") + + if self.backend.supports_publisher_affiliation(): + features.append("publisher-affiliation") + + return features - def get_affiliations(entity): - """ Report the list of current affiliations with this pubsub service. + def _notify(self, data): + items = data['items'] + nodeIdentifier = data['node_id'] + d = self.backend.get_notification_list(nodeIdentifier, items) + d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, + nodeIdentifier, + notifications)) + + def _mapErrors(self, failure): + e = failure.trap(*self._errorMap.keys()) + + condition, pubsubCondition, feature = self._errorMap[e] + msg = failure.value.msg - Report the list of the current affiliations with all nodes within this - pubsub service, for the C{entity}. + if pubsubCondition: + exc = PubSubError(condition, pubsubCondition, feature, msg) + else: + exc = StanzaError(condition, text=msg) + + raise exc - @return: a deferred that returns the list of all current affiliations - as tuples C{(node_id, affiliation)}. - """ + def getNodeInfo(self, requestor, service, nodeIdentifier): + info = {} + + def saveType(result): + info['type'] = result + return nodeIdentifier + + def saveMetaData(result): + info['meta-data'] = result + return info -class IRetractionService(Interface): - """ A service for retracting published items """ + d = defer.succeed(nodeIdentifier) + d.addCallback(self.backend.get_node_type) + d.addCallback(saveType) + d.addCallback(self.backend.get_node_meta_data) + d.addCallback(saveMetaData) + d.errback(self._mapErrors) + return d + + def getNodes(self, requestor, service): + d = self.backend.get_nodes() + return d.addErrback(self._mapErrors) + + def publish(self, requestor, service, nodeIdentifier, items): + d = self.backend.publish(nodeIdentifier, items, requestor) + return d.addErrback(self._mapErrors) - def retract_item(node_id, item_id, requestor): - """ Removes item in node from persistent storage """ + def subscribe(self, requestor, service, nodeIdentifier, subscriber): + d = self.backend.subscribe(nodeIdentifier, subscriber, requestor) + return d.addErrback(self._mapErrors) + + def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): + d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor) + return d.addErrback(self._mapErrors) - def purge_node(node_id, requestor): - """ Removes all items in node from persistent storage """ + def subscriptions(self, requestor, service): + d = self.backend.get_subscriptions(requestor) + return d.addErrback(self._mapErrors) + + def affiliations(self, requestor, service): + d = self.backend.get_affiliations(requestor) + return d.addErrback(self._mapErrors) -class IItemRetrievalService(Interface): - """ A service for retrieving previously published items. """ + def create(self, requestor, service, nodeIdentifier): + d = self.backend.create_node(nodeIdentifier, requestor) + return d.addErrback(self._mapErrors) + + def getDefaultConfiguration(self, requestor, service): + d = self.backend.get_default_configuration() + return d.addErrback(self._mapErrors) - def get_items(node_id, requestor, max_items=None, item_ids=[]): - """ Retrieve items from persistent storage + def getConfiguration(self, requestor, service, nodeIdentifier): + d = self.backend.get_node_configuration(nodeIdentifier) + return d.addErrback(self._mapErrors) + + def setConfiguration(self, requestor, service, nodeIdentifier, options): + d = self.backend.set_node_configuration(nodeIdentifier, options, + requestor) + return d.addErrback(self._mapErrors) - If C{max_items} is given, return the C{max_items} last published - items, else if C{item_ids} is not empty, return the items requested. - If neither is given, return all items. + def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): + d = self.backend.get_items(nodeIdentifier, requestor, maxItems, + itemIdentifiers) + return d.addErrback(self._mapErrors) + + def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): + d = self.backend.retract_item(nodeIdentifier, itemIdentifiers, + requestor) + return d.addErrback(self._mapErrors) - @return: a deferred that returns the requested items - """ + def purge(self, requestor, service, nodeIdentifier): + d = self.backend.purge_node(nodeIdentifier, requestor) + return d.addErrback(self._mapErrors) + + def delete(self, requestor, service, nodeIdentifier): + d = self.backend.delete_node(nodeIdentifier, requestor) + return d.addErrback(self._mapErrors) + +components.registerAdapter(PubSubServiceFromBackend, + IBackendService, + IPubSubService)
--- a/idavoll/data_form.py Thu Jan 18 14:08:32 2007 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,55 +0,0 @@ -# Copyright (c) 2003-2006 Ralph Meijer -# See LICENSE for details. - -from twisted.words.xish import domish - -NS_X_DATA = 'jabber:x:data' - -class Field(domish.Element): - def __init__(self, type='text-single', var=None, label=None, - value=None, values=[], options={}): - domish.Element.__init__(self, (NS_X_DATA, 'field')) - self['type'] = type - if var is not None: - self['var'] = var - if label is not None: - self['label'] = label - if value is not None: - self.set_value(value) - else: - self.set_values(values) - if type in ['list-single', 'list-multi']: - for value, label in options.iteritems(): - self.addChild(Option(value, label)) - - def set_value(self, value): - if self['type'] == 'boolean': - value = str(int(bool(value))) - else: - value = str(value) - - value_element = self.value or self.addElement('value') - value_element.children = [] - value_element.addContent(value) - - def set_values(self, values): - for value in values: - value = str(value) - self.addElement('value', content=value) - -class Option(domish.Element): - def __init__(self, value, label=None): - domish.Element.__init__(self, (NS_X_DATA, 'option')) - if label is not None: - self['label'] = label - self.addElement('value', content=value) - -class Form(domish.Element): - def __init__(self, type, form_type): - domish.Element.__init__(self, (NS_X_DATA, 'x'), - attribs={'type': type}) - self.add_field(type='hidden', var='FORM_TYPE', values=[form_type]) - - def add_field(self, type='text-single', var=None, label=None, - value=None, values=[], options={}): - self.addChild(Field(type, var, label, value, values, options))
--- a/idavoll/disco.py Thu Jan 18 14:08:32 2007 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,31 +0,0 @@ -# Copyright (c) 2003-2006 Ralph Meijer -# See LICENSE for details. - -from twisted.words.xish import domish - -NS = 'http://jabber.org/protocol/disco' -NS_INFO = NS + '#info' -NS_ITEMS = NS + '#items' - -class Feature(domish.Element): - def __init__(self, feature): - domish.Element.__init__(self, (NS_INFO, 'feature'), - attribs={'var': feature}) -class Identity(domish.Element): - def __init__(self, category, type, name = None): - domish.Element.__init__(self, (NS_INFO, 'identity'), - attribs={'category': category, - 'type': type}) - if name: - self['name'] = name - -class Item(domish.Element): - def __init__(self, jid, node = None, name = None): - domish.Element.__init__(self, (NS_ITEMS, 'item'), - attribs={'jid': jid}) - if node: - self['node'] = node - - if name: - self['name'] = name -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/idavoll/error.py Wed Oct 03 12:41:43 2007 +0000 @@ -0,0 +1,47 @@ +# Copyright (c) 2003-2007 Ralph Meijer +# See LICENSE for details. + +class Error(Exception): + msg = '' + + def __str__(self): + return self.msg + +class NodeNotFound(Error): + pass + +class NodeExists(Error): + pass + +class SubscriptionNotFound(Error): + pass + +class SubscriptionExists(Error): + pass + +class Forbidden(Error): + pass + +class ItemForbidden(Error): + pass + +class ItemRequired(Error): + pass + +class NoInstantNodes(Error): + pass + +class NotSubscribed(Error): + pass + +class InvalidConfigurationOption(Error): + msg = 'Invalid configuration option' + +class InvalidConfigurationValue(Error): + msg = 'Bad configuration value' + +class NodeNotPersistent(Error): + pass + +class NoRootNode(Error): + pass
--- a/idavoll/generic_backend.py Thu Jan 18 14:08:32 2007 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,378 +0,0 @@ -# Copyright (c) 2003-2006 Ralph Meijer -# See LICENSE for details. - -import sha -import time -import uuid -from twisted.words.protocols.jabber import jid -from twisted.words.xish import utility -from twisted.application import service -from twisted.internet import defer -from zope.interface import implements -import backend, storage - -def _get_affiliation(node, entity): - d = node.get_affiliation(entity) - d.addCallback(lambda affiliation: (node, affiliation)) - return d - -class BackendService(service.MultiService, utility.EventDispatcher): - - implements(backend.IBackendService) - - options = {"pubsub#persist_items": - {"type": "boolean", - "label": "Persist items to storage"}, - "pubsub#deliver_payloads": - {"type": "boolean", - "label": "Deliver payloads with event notifications"}, - } - - default_config = {"pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - } - - def __init__(self, storage): - service.MultiService.__init__(self) - utility.EventDispatcher.__init__(self) - self.storage = storage - - def supports_publisher_affiliation(self): - return True - - def supports_outcast_affiliation(self): - return True - - def supports_persistent_items(self): - return True - - def get_node_type(self, node_id): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_type()) - return d - - def get_nodes(self): - return self.storage.get_node_ids() - - def get_node_meta_data(self, node_id): - d = self.storage.get_node(node_id) - d.addCallback(lambda node: node.get_meta_data()) - d.addCallback(self._make_meta_data) - return d - - def _make_meta_data(self, meta_data): - options = [] - for key, value in meta_data.iteritems(): - if self.options.has_key(key): - option = {"var": key} - option.update(self.options[key]) - option["value"] = value - options.append(option) - - return options - -class PublishService(service.Service): - - implements(backend.IPublishService) - - def _check_auth(self, node, requestor): - def check(affiliation, node): - if affiliation not in ['owner', 'publisher']: - raise backend.Forbidden - return node - - d = node.get_affiliation(requestor) - d.addCallback(check, node) - return d - - def publish(self, node_id, items, requestor): - d = self.parent.storage.get_node(node_id) - d.addCallback(self._check_auth, requestor) - d.addCallback(self._do_publish, items, requestor) - return d - - def _do_publish(self, node, items, requestor): - configuration = node.get_configuration() - persist_items = configuration["pubsub#persist_items"] - deliver_payloads = configuration["pubsub#deliver_payloads"] - - if items and not persist_items and not deliver_payloads: - raise backend.ItemForbidden - elif not items and (persist_items or deliver_payloads): - raise backend.ItemRequired - - if persist_items or deliver_payloads: - for item in items: - if not item.getAttribute("id"): - item["id"] = uuid.generate() - - if persist_items: - d = node.store_items(items, requestor) - else: - d = defer.succeed(None) - - d.addCallback(self._do_notify, node.id, items, deliver_payloads) - return d - - def _do_notify(self, result, node_id, items, deliver_payloads): - if items and not deliver_payloads: - for item in items: - item.children = [] - - self.parent.dispatch({ 'items': items, 'node_id': node_id }, - '//event/pubsub/notify') - -class NotificationService(service.Service): - - implements(backend.INotificationService) - - def get_notification_list(self, node_id, items): - d = self.parent.storage.get_node(node_id) - d.addCallback(lambda node: node.get_subscribers()) - d.addCallback(self._magic_filter, node_id, items) - return d - - def _magic_filter(self, subscribers, node_id, items): - list = [] - for subscriber in subscribers: - list.append((subscriber, items)) - return list - - def register_notifier(self, observerfn, *args, **kwargs): - self.parent.addObserver('//event/pubsub/notify', observerfn, - *args, **kwargs) - -class SubscriptionService(service.Service): - - implements(backend.ISubscriptionService) - - def subscribe(self, node_id, subscriber, requestor): - subscriber_entity = subscriber.userhostJID() - if subscriber_entity != requestor: - return defer.fail(backend.Forbidden()) - - d = self.parent.storage.get_node(node_id) - d.addCallback(_get_affiliation, subscriber_entity) - d.addCallback(self._do_subscribe, subscriber) - return d - - def _do_subscribe(self, result, subscriber): - node, affiliation = result - - if affiliation == 'outcast': - raise backend.Forbidden - - d = node.add_subscription(subscriber, 'subscribed') - d.addCallback(lambda _: 'subscribed') - d.addErrback(self._get_subscription, node, subscriber) - d.addCallback(self._return_subscription, node.id) - return d - - def _get_subscription(self, failure, node, subscriber): - failure.trap(storage.SubscriptionExists) - return node.get_subscription(subscriber) - - def _return_subscription(self, result, node_id): - return node_id, result - - def unsubscribe(self, node_id, subscriber, requestor): - if subscriber.userhostJID() != requestor: - return defer.fail(backend.Forbidden()) - - d = self.parent.storage.get_node(node_id) - d.addCallback(lambda node: node.remove_subscription(subscriber)) - return d - - def get_subscriptions(self, entity): - return self.parent.storage.get_subscriptions(entity) - -class NodeCreationService(service.Service): - - implements(backend.INodeCreationService) - - def supports_instant_nodes(self): - return True - - def create_node(self, node_id, requestor): - if not node_id: - node_id = 'generic/%s' % uuid.generate() - d = self.parent.storage.create_node(node_id, requestor) - d.addCallback(lambda _: node_id) - return d - - def get_default_configuration(self): - d = defer.succeed(self.parent.default_config) - d.addCallback(self._make_config) - return d - - def get_node_configuration(self, node_id): - if not node_id: - raise backend.NoRootNode - - d = self.parent.storage.get_node(node_id) - d.addCallback(lambda node: node.get_configuration()) - - d.addCallback(self._make_config) - return d - - def _make_config(self, config): - options = [] - for key, value in self.parent.options.iteritems(): - option = {"var": key} - option.update(value) - if config.has_key(key): - option["value"] = config[key] - options.append(option) - - return options - - def set_node_configuration(self, node_id, options, requestor): - if not node_id: - raise backend.NoRootNode - - for key, value in options.iteritems(): - if not self.parent.options.has_key(key): - raise backend.InvalidConfigurationOption - if self.parent.options[key]["type"] == 'boolean': - try: - options[key] = bool(int(value)) - except ValueError: - raise backend.InvalidConfigurationValue - - d = self.parent.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_set_node_configuration, options) - return d - - def _do_set_node_configuration(self, result, options): - node, affiliation = result - - if affiliation != 'owner': - raise backend.Forbidden - - return node.set_configuration(options) - -class AffiliationsService(service.Service): - - implements(backend.IAffiliationsService) - - def get_affiliations(self, entity): - return self.parent.storage.get_affiliations(entity) - -class ItemRetrievalService(service.Service): - - implements(backend.IItemRetrievalService) - - def get_items(self, node_id, requestor, max_items=None, item_ids=[]): - d = self.parent.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_get_items, max_items, item_ids) - return d - - def _do_get_items(self, result, max_items, item_ids): - node, affiliation = result - - if affiliation == 'outcast': - raise backend.Forbidden - - if item_ids: - return node.get_items_by_id(item_ids) - else: - return node.get_items(max_items) - -class RetractionService(service.Service): - - implements(backend.IRetractionService) - - def retract_item(self, node_id, item_ids, requestor): - d = self.parent.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_retract, item_ids) - return d - - def _do_retract(self, result, item_ids): - node, affiliation = result - persist_items = node.get_configuration()["pubsub#persist_items"] - - if affiliation not in ['owner', 'publisher']: - raise backend.Forbidden - - if not persist_items: - raise backend.NodeNotPersistent - - d = node.remove_items(item_ids) - d.addCallback(self._do_notify_retraction, node.id) - return d - - def _do_notify_retraction(self, item_ids, node_id): - self.parent.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, - '//event/pubsub/retract') - - def purge_node(self, node_id, requestor): - d = self.parent.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_purge) - return d - - def _do_purge(self, result): - node, affiliation = result - persist_items = node.get_configuration()["pubsub#persist_items"] - - if affiliation != 'owner': - raise backend.Forbidden - - if not persist_items: - raise backend.NodeNotPersistent - - d = node.purge() - d.addCallback(self._do_notify_purge, node.id) - return d - - def _do_notify_purge(self, result, node_id): - self.parent.dispatch(node_id, '//event/pubsub/purge') - -class NodeDeletionService(service.Service): - - implements(backend.INodeDeletionService) - - def __init__(self): - self._callback_list = [] - - def register_pre_delete(self, pre_delete_fn): - self._callback_list.append(pre_delete_fn) - - def get_subscribers(self, node_id): - d = self.parent.storage.get_node(node_id) - d.addCallback(lambda node: node.get_subscribers()) - return d - - def delete_node(self, node_id, requestor): - d = self.parent.storage.get_node(node_id) - d.addCallback(_get_affiliation, requestor) - d.addCallback(self._do_pre_delete) - return d - - def _do_pre_delete(self, result): - node, affiliation = result - - if affiliation != 'owner': - raise backend.Forbidden - - d = defer.DeferredList([cb(node.id) for cb in self._callback_list], - consumeErrors=1) - d.addCallback(self._do_delete, node.id) - - def _do_delete(self, result, node_id): - dl = [] - for succeeded, r in result: - if succeeded and r: - dl.extend(r) - - d = self.parent.storage.delete_node(node_id) - d.addCallback(self._do_notify_delete, dl) - - return d - - def _do_notify_delete(self, result, dl): - for d in dl: - d.callback(None)
--- a/idavoll/idavoll.py Thu Jan 18 14:08:32 2007 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,202 +0,0 @@ -# Copyright (c) 2003-2006 Ralph Meijer -# See LICENSE for details. - -from twisted.words.protocols.jabber import component, error -from twisted.application import service -from twisted.internet import defer - -import disco -import pubsub - -try: - from twisted.words.protocols.jabber.ijabber import IService -except ImportError: - from twisted.words.protocols.jabber.component import IService - -__version__ = '0.6.0' - -NS_VERSION = 'jabber:iq:version' - -IQ_GET = '/iq[@type="get"]' -IQ_SET = '/iq[@type="set"]' -VERSION = IQ_GET + '/query[@xmlns="' + NS_VERSION + '"]' -DISCO_INFO = IQ_GET + '/query[@xmlns="' + disco.NS_INFO + '"]' -DISCO_ITEMS = IQ_GET + '/query[@xmlns="' + disco.NS_ITEMS + '"]' - -class IdavollService(component.Service): - - def componentConnected(self, xmlstream): - self.xmlstream = xmlstream - xmlstream.addObserver(VERSION, self.onVersion, 1) - xmlstream.addObserver(DISCO_INFO, self.onDiscoInfo, 1) - xmlstream.addObserver(DISCO_ITEMS, self.onDiscoItems, 1) - xmlstream.addObserver(IQ_GET, self.iqFallback, -1) - xmlstream.addObserver(IQ_SET, self.iqFallback, -1) - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Feature(disco.NS_ITEMS)) - info.append(disco.Feature(NS_VERSION)) - - return defer.succeed(info) - - def onVersion(self, iq): - iq.swapAttributeValues("to", "from") - iq["type"] = "result" - name = iq.addElement("name", None, 'Idavoll') - version = iq.addElement("version", None, __version__) - self.send(iq) - iq.handled = True - - def onDiscoInfo(self, iq): - dl = [] - node = iq.query.getAttribute("node") - - for c in self.parent: - if IService.providedBy(c): - if hasattr(c, "get_disco_info"): - dl.append(c.get_disco_info(node)) - d = defer.DeferredList(dl, fireOnOneErrback=1, consumeErrors=1) - d.addCallback(self._disco_info_results, iq, node) - d.addErrback(self._error, iq) - d.addCallback(self.send) - - iq.handled = True - - def _disco_info_results(self, results, iq, node): - info = [] - for i in results: - info.extend(i[1]) - - if node and not info: - return error.StanzaError('item-not-found').toResponse(iq) - else: - iq.swapAttributeValues("to", "from") - iq["type"] = "result" - for item in info: - #domish.Element.addChild should probably do this for all - # subclasses of Element - item.parent = iq.query - - iq.query.addChild(item) - - return iq - - def _error(self, result, iq): - print "Got error on index %d:" % result.value[1] - result.value[0].printBriefTraceback() - return error.StanzaError('internal-server-error').toResponse(iq) - - def onDiscoItems(self, iq): - dl = [] - node = iq.query.getAttribute("node") - - for c in self.parent: - if IService.providedBy(c): - if hasattr(c, "get_disco_items"): - dl.append(c.get_disco_items(node)) - d = defer.DeferredList(dl, fireOnOneErrback=1, consumeErrors=1) - d.addCallback(self._disco_items_result, iq, node) - d.addErrback(self._error, iq) - d.addCallback(self.send) - - iq.handled = True - - def _disco_items_result(self, results, iq, node): - items = [] - - for i in results: - items.extend(i[1]) - - iq.swapAttributeValues("to", "from") - iq["type"] = "result" - iq.query.children = items - - return iq - - def iqFallback(self, iq): - if iq.handled == True: - return - - self.send(error.StanzaError('service-unavailable').toResponse(iq)) - -class LogService(component.Service): - - def transportConnected(self, xmlstream): - xmlstream.rawDataInFn = self.rawDataIn - xmlstream.rawDataOutFn = self.rawDataOut - - def rawDataIn(self, buf): - print "RECV: %s" % unicode(buf, 'utf-8').encode('ascii', 'replace') - - def rawDataOut(self, buf): - print "SEND: %s" % unicode(buf, 'utf-8').encode('ascii', 'replace') - -def makeService(config): - serviceCollection = service.MultiService() - - # set up Jabber Component - sm = component.buildServiceManager(config["jid"], config["secret"], - ("tcp:%s:%s" % (config["rhost"], config["rport"]))) - - if config["verbose"]: - LogService().setServiceParent(sm) - - if config['backend'] == 'pgsql': - import pgsql_storage - st = pgsql_storage.Storage(user=config['dbuser'], - database=config['dbname'], - password=config['dbpass']) - elif config['backend'] == 'memory': - import memory_storage - st = memory_storage.Storage() - - import generic_backend as b - bs = b.BackendService(st) - - c = IService(bs) - c.setServiceParent(sm) - c.hide_nodes = config["hide-nodes"] - - bsc = b.PublishService() - bsc.setServiceParent(bs) - IService(bsc).setServiceParent(sm) - - bsc = b.NotificationService() - bsc.setServiceParent(bs) - IService(bsc).setServiceParent(sm) - - bsc = b.SubscriptionService() - bsc.setServiceParent(bs) - IService(bsc).setServiceParent(sm) - - bsc = b.NodeCreationService() - bsc.setServiceParent(bs) - IService(bsc).setServiceParent(sm) - - bsc = b.AffiliationsService() - bsc.setServiceParent(bs) - IService(bsc).setServiceParent(sm) - - bsc = b.ItemRetrievalService() - bsc.setServiceParent(bs) - IService(bsc).setServiceParent(sm) - - bsc = b.RetractionService() - bsc.setServiceParent(bs) - IService(bsc).setServiceParent(sm) - - bsc = b.NodeDeletionService() - bsc.setServiceParent(bs) - IService(bsc).setServiceParent(sm) - - s = IdavollService() - s.setServiceParent(sm) - - sm.setServiceParent(serviceCollection) - - # other stuff - - return sm
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/idavoll/iidavoll.py Wed Oct 03 12:41:43 2007 +0000 @@ -0,0 +1,417 @@ +# Copyright (c) 2003-2007 Ralph Meijer +# See LICENSE for details. + +""" +Interfaces for idavoll. +""" + +from zope.interface import Interface + +class IBackendService(Interface): + """ Interface to a backend service of a pubsub service. """ + + def __init__(storage): + """ + @param storage: L{storage} object. + """ + + def supports_publisher_affiliation(): + """ Reports if the backend supports the publisher affiliation. + + @rtype: C{bool} + """ + + def supports_outcast_affiliation(): + """ Reports if the backend supports the publisher affiliation. + + @rtype: C{bool} + """ + + def supports_persistent_items(): + """ Reports if the backend supports persistent items. + + @rtype: C{bool} + """ + + def get_node_type(node_id): + """ Return type of a node. + + @return: a deferred that returns either 'leaf' or 'collection' + """ + + def get_nodes(): + """ Returns list of all nodes. + + @return: a deferred that returns a C{list} of node ids. + """ + + def get_node_meta_data(node_id): + """ Return meta data for a node. + + @return: a deferred that returns a C{list} of C{dict}s with the + metadata. + """ + + def create_node(node_id, requestor): + """ Create a node. + + @return: a deferred that fires when the node has been created. + """ + + def register_pre_delete(pre_delete_fn): + """ Register a callback that is called just before a node deletion. + + The function C{pre_deleted_fn} is added to a list of functions + to be called just before deletion of a node. The callback + C{pre_delete_fn} is called with the C{node_id} that is about to be + deleted and should return a deferred that returns a list of deferreds + that are to be fired after deletion. The backend collects the lists + from all these callbacks before actually deleting the node in question. + After deletion all collected deferreds are fired to do post-processing. + + The idea is that you want to be able to collect data from the + node before deleting it, for example to get a list of subscribers + that have to be notified after the node has been deleted. To do this, + C{pre_delete_fn} fetches the subscriber list and passes this + list to a callback attached to a deferred that it sets up. This + deferred is returned in the list of deferreds. + """ + + def delete_node(node_id, requestor): + """ Delete a node. + + @return: a deferred that fires when the node has been deleted. + """ + + def purge_node(node_id, requestor): + """ Removes all items in node from persistent storage """ + + def subscribe(node_id, subscriber, requestor): + """ Request the subscription of an entity to a pubsub node. + + Depending on the node's configuration and possible business rules, the + C{subscriber} is added to the list of subscriptions of the node with id + C{node_id}. The C{subscriber} might be different from the C{requestor}, + and if the C{requestor} is not allowed to subscribe this entity an + exception should be raised. + + @return: a deferred that returns the subscription state + """ + + def unsubscribe(node_id, subscriber, requestor): + """ Cancel the subscription of an entity to a pubsub node. + + The subscription of C{subscriber} is removed from the list of + subscriptions of the node with id C{node_id}. If the C{requestor} + is not allowed to unsubscribe C{subscriber}, an an exception should + be raised. + + @return: a deferred that fires when unsubscription is complete. + """ + + def get_subscribers(node_id): + """ Get node subscriber list. + + @return: a deferred that fires with the list of subscribers. + """ + + def get_subscriptions(entity): + """ Report the list of current subscriptions with this pubsub service. + + Report the list of the current subscriptions with all nodes within this + pubsub service, for the C{entity}. + + @return: a deferred that returns the list of all current subscriptions + as tuples C{(node_id, subscriber, subscription)}. + """ + + def get_affiliations(entity): + """ Report the list of current affiliations with this pubsub service. + + Report the list of the current affiliations with all nodes within this + pubsub service, for the C{entity}. + + @return: a deferred that returns the list of all current affiliations + as tuples C{(node_id, affiliation)}. + """ + + def publish(node_id, items, requestor): + """ Publish items to a pubsub node. + + @return: a deferred that fires when the items have been published. + """ + + def register_notifier(observerfn, *args, **kwargs): + """ Register callback which is called for notification. """ + + def get_notification_list(node_id, items): + """ + Get list of entities to notify. + """ + + def get_items(node_id, requestor, max_items=None, item_ids=[]): + """ Retrieve items from persistent storage + + If C{max_items} is given, return the C{max_items} last published + items, else if C{item_ids} is not empty, return the items requested. + If neither is given, return all items. + + @return: a deferred that returns the requested items + """ + + def retract_item(node_id, item_id, requestor): + """ Removes item in node from persistent storage """ + + +class IStorage(Interface): + """ + Storage interface. + """ + + def get_node(node_id): + """ + Get Node. + + @param node_id: NodeID of the desired node. + @type node_id: L{str} + @return: deferred that returns a L{Node} object. + """ + + def get_node_ids(): + """ + Return all NodeIDs. + + @return: deferred that returns a list of NodeIDs (L{str}). + """ + + def create_node(node_id, owner, config = None, type='leaf'): + """ + Create new node. + + The implementation should make sure, the passed owner JID is stripped + of the resource (e.g. using C{owner.userhostJID()}). + + @param node_id: NodeID of the new node. + @type node_id: L{str} + @param owner: JID of the new nodes's owner. + @type owner: L{jid.JID} + @param config: Configuration + @param type: Node type. Can be either C{'leaf'} or C{'collection'}. + @return: deferred that fires on creation. + """ + + def delete_node(node_id): + """ + Delete a node. + + @param node_id: NodeID of the new node. + @type node_id: L{str} + @return: deferred that fires on deletion. + """ + + def get_affiliations(entity): + """ + Get all affiliations for entity. + + The implementation should make sure, the passed owner JID is stripped + of the resource (e.g. using C{owner.userhostJID()}). + + @param entity: JID of the entity. + @type entity: L{jid.JID} + @return: deferred that returns a L{list} of tuples of the form + C{(node_id, affiliation)}, where C{node_id} is of the type + L{str} and C{affiliation} is one of C{'owner'}, C{'publisher'} + and C{'outcast'}. + """ + + def get_subscriptions(entity): + """ + Get all subscriptions for an entity. + + The implementation should make sure, the passed owner JID is stripped + of the resource (e.g. using C{owner.userhostJID()}). + + @param entity: JID of the entity. + @type entity: L{jid.JID} + @return: deferred that returns a L{list} of tuples of the form + C{(node_id, subscriber, state)}, where C{node_id} is of the + type L{str}, C{subscriber} of the type {jid.JID}, and + C{state} is C{'subscribed'} or C{'pending'}. + """ + + +class INode(Interface): + """ + Interface to the class of objects that represent nodes. + """ + + def get_type(): + """ + Get node's type. + + @return: C{'leaf'} or C{'collection'}. + """ + + def get_configuration(): + """ + Get node's configuration. + + The configuration must at least have two options: + C{pubsub#persist_items}, and C{pubsub#deliver_payloads}. + + @return: L{dict} of configuration options. + """ + + def get_meta_data(): + """ + Get node's meta data. + + The meta data must be a superset of the configuration options, and + also at least should have a C{pubsub#node_type} entry. + + @return: L{dict} of meta data. + """ + + def set_configuration(options): + """ + Set node's configuration. + + The elements of {options} will set the new values for those + configuration items. This means that only changing items have to + be given. + + @param options: a dictionary of configuration options. + @returns: a deferred that fires upon success. + """ + + def get_affiliation(entity): + """ + Get affiliation of entity with this node. + + @param entity: JID of entity. + @type entity: L{jid.JID} + @return: deferred that returns C{'owner'}, C{'publisher'}, C{'outcast'} + or C{None}. + """ + + def get_subscription(subscriber): + """ + Get subscription to this node of subscriber. + + @param subscriber: JID of the new subscriptions' entity. + @type subscriber: L{jid.JID} + @return: deferred that returns the subscription state (C{'subscribed'}, + C{'pending'} or C{None}). + """ + + def add_subscription(subscriber, state): + """ + Add new subscription to this node with given state. + + @param subscriber: JID of the new subscriptions' entity. + @type subscriber: L{jid.JID} + @param state: C{'subscribed'} or C{'pending'} + @type state: L{str} + @return: deferred that fires on subscription. + """ + + def remove_subscription(subscriber): + """ + Remove subscription to this node. + + @param subscriber: JID of the subscriptions' entity. + @type subscriber: L{jid.JID} + @return: deferred that fires on removal. + """ + + def get_subscribers(): + """ + Get list of subscribers to this node. + + Retrieves the list of entities that have a subscription to this + node. That is, having the state C{'subscribed'}. + + @return: a deferred that returns a L{list} of L{jid.JID}s. + """ + + def is_subscribed(entity): + """ + Returns whether entity has any subscription to this node. + + Only returns C{True} when the subscription state (if present) is + C{'subscribed'} for any subscription that matches the bare JID. + + @param subscriber: bare JID of the subscriptions' entity. + @type subscriber: L{jid.JID} + @return: deferred that returns a L{bool}. + """ + + def get_affiliations(): + """ + Get affiliations of entities with this node. + + @return: deferred that returns a L{list} of tuples (jid, affiliation), + where jid is a L(jid.JID) and affiliation is one of C{'owner'}, + C{'publisher'}, C{'outcast'}. + """ + +class ILeafNode(Interface): + """ + Interface to the class of objects that represent leaf nodes. + """ + + def store_items(items, publisher): + """ + Store items in persistent storage for later retrieval. + + @param items: The list of items to be stored. Each item is the + L{domish} representation of the XML fragment as defined + for C{<item/>} in the + C{http://jabber.org/protocol/pubsub} namespace. + @type items: L{list} of {domish.Element} + @param publisher: JID of the publishing entity. + @type publisher: L{jid.JID} + @return: deferred that fires upon success. + """ + + def remove_items(item_ids): + """ + Remove items by id. + + @param item_ids: L{list} of item ids. + @return: deferred that fires with a L{list} of ids of the items that + were deleted + """ + + def get_items(max_items=None): + """ + Get items. + + If C{max_items} is not given, all items in the node are returned, + just like C{get_items_by_id}. Otherwise, C{max_items} limits + the returned items to a maximum of that number of most recently + published items. + + @param max_items: if given, a natural number (>0) that limits the + returned number of items. + @return: deferred that fires with a L{list} of found items. + """ + + def get_items_by_id(item_ids): + """ + Get items by item id. + + Each item in the returned list is a unicode string that + represent the XML of the item as it was published, including the + item wrapper with item id. + + @param item_ids: L{list} of item ids. + @return: deferred that fires with a L{list} of found items. + """ + + def purge(): + """ + Purge node of all items in persistent storage. + + @return: deferred that fires when the node has been purged. + """
--- a/idavoll/memory_storage.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/memory_storage.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,18 +1,19 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. import copy from zope.interface import implements from twisted.internet import defer from twisted.words.protocols.jabber import jid -import storage + +from idavoll import error, iidavoll default_config = {"pubsub#persist_items": True, "pubsub#deliver_payloads": True} class Storage: - implements(storage.IStorage) + implements(iidavoll.IStorage) def __init__(self): self._nodes = {} @@ -21,7 +22,7 @@ try: node = self._nodes[node_id] except KeyError: - return defer.fail(storage.NodeNotFound()) + return defer.fail(error.NodeNotFound()) return defer.succeed(node) @@ -30,7 +31,7 @@ def create_node(self, node_id, owner, config = None, type='leaf'): if node_id in self._nodes: - return defer.fail(storage.NodeExists()) + return defer.fail(error.NodeExists()) if not config: config = copy.copy(default_config) @@ -47,7 +48,7 @@ try: del self._nodes[node_id] except KeyError: - return defer.fail(storage.NodeNotFound()) + return defer.fail(error.NodeNotFound()) return defer.succeed(None) @@ -70,7 +71,7 @@ class Node: - implements(storage.INode) + implements(iidavoll.INode) def __init__(self, node_id, owner, config): self.id = node_id @@ -110,8 +111,8 @@ def add_subscription(self, subscriber, state): if self._subscriptions.get(subscriber.full()): - return defer.fail(storage.SubscriptionExists()) - + return defer.fail(error.SubscriptionExists()) + subscription = Subscription(state) self._subscriptions[subscriber.full()] = subscription return defer.succeed(None) @@ -120,7 +121,7 @@ try: del self._subscriptions[subscriber.full()] except KeyError: - return defer.fail(storage.SubscriptionNotFound()) + return defer.fail(error.SubscriptionNotFound()) return defer.succeed(None) @@ -208,7 +209,7 @@ class LeafNode(Node, LeafNodeMixin): - implements(storage.ILeafNode) + implements(iidavoll.ILeafNode) def __init__(self, node_id, owner, config): Node.__init__(self, node_id, owner, config) @@ -216,7 +217,5 @@ class Subscription: - implements(storage.ISubscription) - def __init__(self, state): self.state = state
--- a/idavoll/pgsql_storage.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/pgsql_storage.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,16 +1,16 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. import copy -import storage from twisted.enterprise import adbapi -from twisted.internet import defer from twisted.words.protocols.jabber import jid from zope.interface import implements +from idavoll import error, iidavoll + class Storage: - implements(storage.IStorage) + implements(iidavoll.IStorage) def __init__(self, user, database, password = None): self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', @@ -32,7 +32,7 @@ (configuration["pubsub#persist_items"], configuration["pubsub#deliver_payloads"]) = cursor.fetchone() except TypeError: - raise storage.NodeNotFound + raise error.NodeNotFound() else: node = LeafNode(node_id, configuration) node._dbpool = self._dbpool @@ -53,8 +53,8 @@ cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", (node_id)) except cursor._pool.dbapi.OperationalError: - raise storage.NodeExists - + raise error.NodeExists() + cursor.execute("""SELECT 1 from entities where jid=%s""", (owner)) @@ -78,7 +78,7 @@ (node_id,)) if cursor.rowcount != 1: - raise storage.NodeNotFound + raise error.NodeNotFound() def get_affiliations(self, entity): d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities @@ -110,7 +110,7 @@ class Node: - implements(storage.INode) + implements(iidavoll.INode) def __init__(self, node_id, config): self.id = node_id @@ -120,7 +120,7 @@ cursor.execute("""SELECT id FROM nodes WHERE node=%s""", (self.id)) if not cursor.fetchone(): - raise backend.NodeNotFound + raise error.NodeNotFound() def get_type(self): return self.type @@ -222,7 +222,7 @@ self.id, userhost)) except cursor._pool.dbapi.OperationalError: - raise storage.SubscriptionExists + raise error.SubscriptionExists() def remove_subscription(self, subscriber): return self._dbpool.runInteraction(self._remove_subscription, @@ -242,7 +242,7 @@ userhost, resource)) if cursor.rowcount != 1: - raise storage.SubscriptionNotFound + raise error.SubscriptionNotFound() return None @@ -398,4 +398,4 @@ class LeafNode(Node, LeafNodeMixin): - implements(storage.ILeafNode) + implements(iidavoll.ILeafNode)
--- a/idavoll/pubsub.py Thu Jan 18 14:08:32 2007 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,694 +0,0 @@ -# Copyright (c) 2003-2006 Ralph Meijer -# See LICENSE for details. - -from zope.interface import implements - -from twisted.words.protocols.jabber import component, jid, error -from twisted.words.xish import domish -from twisted.python import components -from twisted.internet import defer - -import backend -import storage -import disco -import data_form - -try: - from twisted.words.protocols.jabber.ijabber import IService -except ImportError: - from twisted.words.protocols.jabber.component import IService - -if issubclass(domish.SerializedXML, str): - # Work around bug # in twisted Xish - class SerializedXML(unicode): - """ Marker class for pre-serialized XML in the DOM. """ - - domish.SerializedXML = SerializedXML - -NS_COMPONENT = 'jabber:component:accept' -NS_PUBSUB = 'http://jabber.org/protocol/pubsub' -NS_PUBSUB_EVENT = NS_PUBSUB + '#event' -NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors' -NS_PUBSUB_OWNER = NS_PUBSUB + "#owner" - -IQ_GET = '/iq[@type="get"]' -IQ_SET = '/iq[@type="set"]' -PUBSUB_ELEMENT = '/pubsub[@xmlns="' + NS_PUBSUB + '"]' -PUBSUB_OWNER_ELEMENT = '/pubsub[@xmlns="' + NS_PUBSUB_OWNER + '"]' -PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT -PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT -PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT -PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT -PUBSUB_CREATE = PUBSUB_SET + '/create' -PUBSUB_PUBLISH = PUBSUB_SET + '/publish' -PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' -PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' -PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' -PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' -PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' -PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' -PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' -PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' -PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' -PUBSUB_ITEMS = PUBSUB_GET + '/items' -PUBSUB_RETRACT = PUBSUB_SET + '/retract' -PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' -PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' - -class BadRequest(error.StanzaError): - def __init__(self): - error.StanzaError.__init__(self, 'bad-request') - -class PubSubError(error.StanzaError): - def __init__(self, condition, pubsubCondition, feature=None, text=None): - appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition)) - if feature: - appCondition['feature'] = feature - error.StanzaError.__init__(self, condition, - text=text, - appCondition=appCondition) - -class OptionsUnavailable(PubSubError): - def __init__(self): - PubSubError.__init__(self, 'feature-not-implemented', - 'unsupported', - 'subscription-options-unavailable') - -error_map = { - storage.NodeNotFound: ('item-not-found', None, None), - storage.NodeExists: ('conflict', None, None), - storage.SubscriptionNotFound: ('not-authorized', 'not-subscribed', None), - backend.Forbidden: ('forbidden', None, None), - backend.ItemForbidden: ('bad-request', 'item-forbidden', None), - backend.ItemRequired: ('bad-request', 'item-required', None), - backend.NoInstantNodes: ('not-acceptable', 'unsupported', 'instant-nodes'), - backend.NotSubscribed: ('not-authorized', 'not-subscribed', None), - backend.InvalidConfigurationOption: ('not-acceptable', None, None), - backend.InvalidConfigurationValue: ('not-acceptable', None, None), - backend.NodeNotPersistent: ('feature-not-implemented', 'unsupported', - 'persistent-node'), - backend.NoRootNode: ('bad-request', None, None), -} - -class Service(component.Service): - - implements(IService) - - def __init__(self, backend): - self.backend = backend - - def error(self, failure, iq): - try: - e = failure.trap(error.StanzaError, *error_map.keys()) - except: - failure.printBriefTraceback() - return error.StanzaError('internal-server-error').toResponse(iq) - else: - if e == error.StanzaError: - exc = failure.value - else: - condition, pubsubCondition, feature = error_map[e] - msg = failure.value.msg - - if pubsubCondition: - exc = PubSubError(condition, pubsubCondition, feature, msg) - else: - exc = error.StanzaError(condition, text=msg) - - return exc.toResponse(iq) - - def success(self, result, iq): - iq.swapAttributeValues("to", "from") - iq["type"] = 'result' - iq.children = [] - if result: - for child in result: - iq.addChild(child) - - return iq - - def handler_wrapper(self, handler, iq): - try: - d = handler(iq) - except: - d = defer.fail() - - d.addCallback(self.success, iq) - d.addErrback(self.error, iq) - d.addCallback(self.send) - iq.handled = True - -class ComponentServiceFromService(Service): - - def __init__(self, backend): - Service.__init__(self, backend) - self.hide_nodes = False - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Identity('pubsub', 'generic', - 'Generic Pubsub Service')) - - info.append(disco.Feature(NS_PUBSUB + "#meta-data")) - - if self.backend.supports_outcast_affiliation(): - info.append(disco.Feature(NS_PUBSUB + "#outcast-affiliation")) - - if self.backend.supports_persistent_items(): - info.append(disco.Feature(NS_PUBSUB + "#persistent-items")) - - if self.backend.supports_publisher_affiliation(): - info.append(disco.Feature(NS_PUBSUB + "#publisher-affiliation")) - - return defer.succeed(info) - else: - def trap_not_found(result): - result.trap(storage.NodeNotFound) - return [] - - d = self.backend.get_node_type(node) - d.addCallback(self._add_identity, [], node) - d.addErrback(trap_not_found) - return d - - def _add_identity(self, node_type, result_list, node): - result_list.append(disco.Identity('pubsub', node_type)) - d = self.backend.get_node_meta_data(node) - d.addCallback(self._add_meta_data, node_type, result_list) - return d - - def _add_meta_data(self, meta_data, node_type, result_list): - form = data_form.Form(type="result", - form_type=NS_PUBSUB + "#meta-data") - - for meta_datum in meta_data: - try: - del meta_datum['options'] - except KeyError: - pass - - form.add_field(**meta_datum) - - form.add_field("text-single", - "pubsub#node_type", - "The type of node (collection or leaf)", - node_type) - result_list.append(form) - return result_list - - def get_disco_items(self, node): - if node or self.hide_nodes: - return defer.succeed([]) - - d = self.backend.get_nodes() - d.addCallback(lambda nodes: [disco.Item(self.parent.jabberId, node) - for node in nodes]) - return d - -components.registerAdapter(ComponentServiceFromService, - backend.IBackendService, - IService) - -class ComponentServiceFromNotificationService(Service): - - def componentConnected(self, xmlstream): - self.backend.register_notifier(self.notify) - - def notify(self, object): - node_id = object["node_id"] - items = object["items"] - d = self.backend.get_notification_list(node_id, items) - d.addCallback(self._notify, node_id) - - def _notify(self, list, node_id): - for recipient, items in list: - self._notify_recipient(recipient, node_id, items) - - def _notify_recipient(self, recipient, node_id, itemlist): - message = domish.Element((NS_COMPONENT, "message")) - message["from"] = self.parent.jabberId - message["to"] = recipient.full() - event = message.addElement((NS_PUBSUB_EVENT, "event")) - items = event.addElement("items") - items["node"] = node_id - items.children.extend(itemlist) - self.send(message) - -components.registerAdapter(ComponentServiceFromNotificationService, - backend.INotificationService, - IService) - -class ComponentServiceFromPublishService(Service): - - def componentConnected(self, xmlstream): - xmlstream.addObserver(PUBSUB_PUBLISH, self.onPublish) - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Feature(NS_PUBSUB + "#item-ids")) - - return defer.succeed(info) - - def onPublish(self, iq): - self.handler_wrapper(self._onPublish, iq) - - def _onPublish(self, iq): - try: - node = iq.pubsub.publish["node"] - except KeyError: - raise BadRequest - - items = [] - for child in iq.pubsub.publish.children: - if child.__class__ == domish.Element and child.name == 'item': - items.append(child) - - return self.backend.publish(node, items, - jid.internJID(iq["from"]).userhostJID()) - -components.registerAdapter(ComponentServiceFromPublishService, - backend.IPublishService, - IService) - -class ComponentServiceFromSubscriptionService(Service): - - def componentConnected(self, xmlstream): - xmlstream.addObserver(PUBSUB_SUBSCRIBE, self.onSubscribe) - xmlstream.addObserver(PUBSUB_UNSUBSCRIBE, self.onUnsubscribe) - xmlstream.addObserver(PUBSUB_OPTIONS_GET, self.onOptionsGet) - xmlstream.addObserver(PUBSUB_OPTIONS_SET, self.onOptionsSet) - xmlstream.addObserver(PUBSUB_SUBSCRIPTIONS, self.onSubscriptions) - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Feature(NS_PUBSUB + '#subscribe')) - info.append(disco.Feature(NS_PUBSUB + '#retrieve-subscriptions')) - - return defer.succeed(info) - - def onSubscribe(self, iq): - self.handler_wrapper(self._onSubscribe, iq) - - def _onSubscribe(self, iq): - try: - node_id = iq.pubsub.subscribe["node"] - subscriber = jid.internJID(iq.pubsub.subscribe["jid"]) - except KeyError: - raise BadRequest - - requestor = jid.internJID(iq["from"]).userhostJID() - d = self.backend.subscribe(node_id, subscriber, requestor) - d.addCallback(self.return_subscription, subscriber) - return d - - def return_subscription(self, result, subscriber): - node, state = result - - reply = domish.Element((NS_PUBSUB, "pubsub")) - subscription = reply.addElement("subscription") - subscription["node"] = node - subscription["jid"] = subscriber.full() - subscription["subscription"] = state - return [reply] - - def onUnsubscribe(self, iq): - self.handler_wrapper(self._onUnsubscribe, iq) - - def _onUnsubscribe(self, iq): - try: - node_id = iq.pubsub.unsubscribe["node"] - subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"]) - except KeyError: - raise BadRequest - - requestor = jid.internJID(iq["from"]).userhostJID() - return self.backend.unsubscribe(node_id, subscriber, requestor) - - def onOptionsGet(self, iq): - self.handler_wrapper(self._onOptionsGet, iq) - - def _onOptionsGet(self, iq): - raise OptionsUnavailable - - def onOptionsSet(self, iq): - self.handler_wrapper(self._onOptionsSet, iq) - - def _onOptionsSet(self, iq): - raise OptionsUnavailable - - def onSubscriptions(self, iq): - self.handler_wrapper(self._onSubscriptions, iq) - - def _onSubscriptions(self, iq): - entity = jid.internJID(iq["from"]).userhostJID() - d = self.backend.get_subscriptions(entity) - d.addCallback(self._return_subscriptions_response, iq) - return d - - def _return_subscriptions_response(self, result, iq): - reply = domish.Element((NS_PUBSUB, 'pubsub')) - subscriptions = reply.addElement('subscriptions') - for node, subscriber, state in result: - item = subscriptions.addElement('subscription') - item['node'] = node - item['jid'] = subscriber.full() - item['subscription'] = state - return [reply] - -components.registerAdapter(ComponentServiceFromSubscriptionService, - backend.ISubscriptionService, - IService) - -class ComponentServiceFromNodeCreationService(Service): - - def componentConnected(self, xmlstream): - xmlstream.addObserver(PUBSUB_CREATE, self.onCreate) - xmlstream.addObserver(PUBSUB_DEFAULT, self.onDefault) - xmlstream.addObserver(PUBSUB_CONFIGURE_GET, self.onConfigureGet) - xmlstream.addObserver(PUBSUB_CONFIGURE_SET, self.onConfigureSet) - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Feature(NS_PUBSUB + "#create-nodes")) - info.append(disco.Feature(NS_PUBSUB + "#config-node")) - info.append(disco.Feature(NS_PUBSUB + "#retrieve-default")) - - if self.backend.supports_instant_nodes(): - info.append(disco.Feature(NS_PUBSUB + "#instant-nodes")) - - return defer.succeed(info) - - def onCreate(self, iq): - print "onCreate" - self.handler_wrapper(self._onCreate, iq) - - def _onCreate(self, iq): - node = iq.pubsub.create.getAttribute("node") - - owner = jid.internJID(iq["from"]).userhostJID() - - d = self.backend.create_node(node, owner) - d.addCallback(self._return_create_response, iq) - return d - - def _return_create_response(self, result, iq): - node_id = iq.pubsub.create.getAttribute("node") - if not node_id or node_id != result: - reply = domish.Element((NS_PUBSUB, 'pubsub')) - entity = reply.addElement('create') - entity['node'] = result - return [reply] - - def onDefault(self, iq): - self.handler_wrapper(self._onDefault, iq) - - def _onDefault(self, iq): - d = self.backend.get_default_configuration() - d.addCallback(self._return_default_response) - return d - - def _return_default_response(self, options): - reply = domish.Element((NS_PUBSUB_OWNER, "pubsub")) - default = reply.addElement("default") - default.addChild(self._form_from_configuration(options)) - - return [reply] - - def onConfigureGet(self, iq): - self.handler_wrapper(self._onConfigureGet, iq) - - def _onConfigureGet(self, iq): - node_id = iq.pubsub.configure.getAttribute("node") - - d = self.backend.get_node_configuration(node_id) - d.addCallback(self._return_configuration_response, node_id) - return d - - def _return_configuration_response(self, options, node_id): - reply = domish.Element((NS_PUBSUB_OWNER, "pubsub")) - configure = reply.addElement("configure") - if node_id: - configure["node"] = node_id - configure.addChild(self._form_from_configuration(options)) - - return [reply] - - def _form_from_configuration(self, options): - form = data_form.Form(type="form", - form_type=NS_PUBSUB + "#node_config") - - for option in options: - form.add_field(**option) - - return form - - def onConfigureSet(self, iq): - print "onConfigureSet" - self.handler_wrapper(self._onConfigureSet, iq) - - def _onConfigureSet(self, iq): - node_id = iq.pubsub.configure["node"] - requestor = jid.internJID(iq["from"]).userhostJID() - - for element in iq.pubsub.configure.elements(): - if element.name != 'x' or element.uri != data_form.NS_X_DATA: - continue - - type = element.getAttribute("type") - if type == "cancel": - return None - elif type != "submit": - continue - - options = self._get_form_options(element) - - if options["FORM_TYPE"] == NS_PUBSUB + "#node_config": - del options["FORM_TYPE"] - return self.backend.set_node_configuration(node_id, - options, - requestor) - - raise BadRequest - - def _get_form_options(self, form): - options = {} - - for element in form.elements(): - if element.name == 'field' and element.uri == data_form.NS_X_DATA: - try: - options[element["var"]] = str(element.value) - except (KeyError, AttributeError): - raise BadRequest - - return options - -components.registerAdapter(ComponentServiceFromNodeCreationService, - backend.INodeCreationService, - IService) - -class ComponentServiceFromAffiliationsService(Service): - - def componentConnected(self, xmlstream): - xmlstream.addObserver(PUBSUB_AFFILIATIONS, self.onAffiliations) - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Feature(NS_PUBSUB + "#retrieve-affiliations")) - - return defer.succeed(info) - - def onAffiliations(self, iq): - self.handler_wrapper(self._onAffiliations, iq) - - def _onAffiliations(self, iq): - entity = jid.internJID(iq["from"]).userhostJID() - d = self.backend.get_affiliations(entity) - d.addCallback(self._return_affiliations_response, iq) - return d - - def _return_affiliations_response(self, result, iq): - reply = domish.Element((NS_PUBSUB, 'pubsub')) - affiliations = reply.addElement('affiliations') - for node, affiliation in result: - item = affiliations.addElement('affiliation') - item['node'] = node - item['affiliation'] = affiliation - return [reply] - -components.registerAdapter(ComponentServiceFromAffiliationsService, - backend.IAffiliationsService, - IService) - -class ComponentServiceFromItemRetrievalService(Service): - - def componentConnected(self, xmlstream): - xmlstream.addObserver(PUBSUB_ITEMS, self.onItems) - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Feature(NS_PUBSUB + "#retrieve-items")) - - return defer.succeed(info) - - def onItems(self, iq): - self.handler_wrapper(self._onItems, iq) - - def _onItems(self, iq): - try: - node_id = iq.pubsub.items["node"] - except KeyError: - raise BadRequest - - max_items = iq.pubsub.items.getAttribute('max_items') - - if max_items: - try: - max_items = int(max_items) - except ValueError: - raise BadRequest - - item_ids = [] - for child in iq.pubsub.items.elements(): - if child.name == 'item' and child.uri == NS_PUBSUB: - try: - item_ids.append(child["id"]) - except KeyError: - raise BadRequest - - d = self.backend.get_items(node_id, - jid.internJID(iq["from"]), - max_items, - item_ids) - d.addCallback(self._return_items_response, node_id) - return d - - def _return_items_response(self, result, node_id): - reply = domish.Element((NS_PUBSUB, 'pubsub')) - items = reply.addElement('items') - items["node"] = node_id - for r in result: - items.addRawXml(r) - - return [reply] - -components.registerAdapter(ComponentServiceFromItemRetrievalService, - backend.IItemRetrievalService, - IService) - -class ComponentServiceFromRetractionService(Service): - - def componentConnected(self, xmlstream): - xmlstream.addObserver(PUBSUB_RETRACT, self.onRetract) - xmlstream.addObserver(PUBSUB_PURGE, self.onPurge) - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Feature(NS_PUBSUB + "#delete-any")) - info.append(disco.Feature(NS_PUBSUB + "#retract-items")) - info.append(disco.Feature(NS_PUBSUB + "#purge-nodes")) - - return defer.succeed(info) - - def onRetract(self, iq): - self.handler_wrapper(self._onRetract, iq) - - def _onRetract(self, iq): - try: - node = iq.pubsub.retract["node"] - except KeyError: - raise BadRequest - - item_ids = [] - for child in iq.pubsub.retract.children: - if child.__class__ == domish.Element and child.name == 'item': - try: - item_ids.append(child["id"]) - except KeyError: - raise BadRequest - - requestor = jid.internJID(iq["from"]).userhostJID() - return self.backend.retract_item(node, item_ids, requestor) - - def onPurge(self, iq): - self.handler_wrapper(self._onPurge, iq) - - def _onPurge(self, iq): - try: - node = iq.pubsub.purge["node"] - except KeyError: - raise BadRequest - - return self.backend.purge_node(node, - jid.internJID(iq["from"]).userhostJID()) - -components.registerAdapter(ComponentServiceFromRetractionService, - backend.IRetractionService, - IService) - -class ComponentServiceFromNodeDeletionService(Service): - - def __init__(self, backend): - Service.__init__(self, backend) - self.subscribers = [] - - def componentConnected(self, xmlstream): - self.backend.register_pre_delete(self._pre_delete) - xmlstream.addObserver(PUBSUB_DELETE, self.onDelete) - - def get_disco_info(self, node): - info = [] - - if not node: - info.append(disco.Feature(NS_PUBSUB + "#delete-nodes")) - - return defer.succeed(info) - - def _pre_delete(self, node_id): - d = self.backend.get_subscribers(node_id) - d.addCallback(self._return_deferreds, node_id) - return d - - def _return_deferreds(self, subscribers, node_id): - d = defer.Deferred() - d.addCallback(self._notify, subscribers, node_id) - return [d] - - def _notify(self, result, subscribers, node_id): - message = domish.Element((NS_COMPONENT, "message")) - message["from"] = self.parent.jabberId - event = message.addElement((NS_PUBSUB_EVENT, "event")) - event.addElement("delete")["node"] = node_id - - for subscriber in subscribers: - message["to"] = subscriber - self.send(message) - - def onDelete(self, iq): - self.handler_wrapper(self._onDelete, iq) - - def _onDelete(self, iq): - try: - node = iq.pubsub.delete["node"] - except KeyError: - raise BadRequest - - return self.backend.delete_node(node, - jid.internJID(iq["from"]).userhostJID()) - -components.registerAdapter(ComponentServiceFromNodeDeletionService, - backend.INodeDeletionService, - IService)
--- a/idavoll/storage.py Thu Jan 18 14:08:32 2007 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,250 +0,0 @@ -# Copyright (c) 2003-2006 Ralph Meijer -# See LICENSE for details. - -from zope.interface import Interface -from twisted.words.protocols.jabber import jid -from twisted.words.xish import domish - -class Error(Exception): - msg = None - -class NodeNotFound(Error): - pass - -class NodeExists(Error): - pass - -class SubscriptionNotFound(Error): - pass - -class SubscriptionExists(Error): - pass - -class IStorage(Interface): - """ Storage interface """ - - def get_node(self, node_id): - """ Get Node. - - @param node_id: NodeID of the desired node. - @type node_id: L{str} - @return: deferred that returns a L{Node} object. - """ - - def get_node_ids(self): - """ Return all NodeIDs. - - @return: deferred that returns a list of NodeIDs (L{str}). - """ - - def create_node(self, node_id, owner, config = None, type='leaf'): - """ Create new node. - - The implementation should make sure, the passed owner JID is stripped - of the resource (e.g. using C{owner.userhostJID()}). - - @param node_id: NodeID of the new node. - @type node_id: L{str} - @param owner: JID of the new nodes's owner. - @type owner: L{jid.JID} - @param config: Configuration - @param type: Node type. Can be either C{'leaf'} or C{'collection'}. - @return: deferred that fires on creation. - """ - - def delete_node(self, node_id): - """ Delete a node. - - @param node_id: NodeID of the new node. - @type node_id: L{str} - @return: deferred that fires on deletion. - """ - - def get_affiliations(self, entity): - """ Get all affiliations for entity. - - The implementation should make sure, the passed owner JID is stripped - of the resource (e.g. using C{owner.userhostJID()}). - - @param entity: JID of the entity. - @type entity: L{jid.JID} - @return: deferred that returns a L{list} of tuples of the form - C{(node_id, affiliation)}, where C{node_id} is of the type - L{str} and C{affiliation} is one of C{'owner'}, C{'publisher'} - and C{'outcast'}. - """ - - def get_subscriptions(self, entity): - """ Get all subscriptions for an entity. - - The implementation should make sure, the passed owner JID is stripped - of the resource (e.g. using C{owner.userhostJID()}). - - @param entity: JID of the entity. - @type entity: L{jid.JID} - @return: deferred that returns a L{list} of tuples of the form - C{(node_id, subscriber, state)}, where C{node_id} is of the - type L{str}, C{subscriber} of the type {jid.JID}, and - C{state} is C{'subscribed'} or C{'pending'}. - """ - - -class INode(Interface): - """ Interface to the class of objects that represent nodes. """ - - def get_type(self): - """ Get node's type. - - @return: C{'leaf'} or C{'collection'}. - """ - - def get_configuration(self): - """ Get node's configuration. - - The configuration must at least have two options: - C{pubsub#persist_items}, and C{pubsub#deliver_payloads}. - - @return: L{dict} of configuration options. - """ - - def get_meta_data(self): - """ Get node's meta data. - - The meta data must be a superset of the configuration options, and - also at least should have a C{pubsub#node_type} entry. - - @return: L{dict} of meta data. - """ - - def set_configuration(self, options): - """ Set node's configuration. - - The elements of {options} will set the new values for those - configuration items. This means that only changing items have to - be given. - - @param options: a dictionary of configuration options. - @returns: a deferred that fires upon success. - """ - - def get_affiliation(self, entity): - """ Get affiliation of entity with this node. - - @param entity: JID of entity. - @type entity: L{jid.JID} - @return: deferred that returns C{'owner'}, C{'publisher'}, C{'outcast'} - or C{None}. - """ - - def get_subscription(self, subscriber): - """ Get subscription to this node of subscriber. - - @param subscriber: JID of the new subscriptions' entity. - @type subscriber: L{jid.JID} - @return: deferred that returns the subscription state (C{'subscribed'}, - C{'pending'} or C{None}). - """ - - def add_subscription(self, subscriber, state): - """ Add new subscription to this node with given state. - - @param subscriber: JID of the new subscriptions' entity. - @type subscriber: L{jid.JID} - @param state: C{'subscribed'} or C{'pending'} - @type state: L{str} - @return: deferred that fires on subscription. - """ - - def remove_subscription(self, subscriber): - """ Remove subscription to this node. - - @param subscriber: JID of the subscriptions' entity. - @type subscriber: L{jid.JID} - @return: deferred that fires on removal. - """ - - def get_subscribers(self): - """ Get list of subscribers to this node. - - Retrieves the list of entities that have a subscription to this - node. That is, having the state C{'subscribed'}. - - @return: a deferred that returns a L{list} of L{jid.JID}s. - """ - - def is_subscribed(self, entity): - """ Returns whether entity has any subscription to this node. - - Only returns C{True} when the subscription state (if present) is - C{'subscribed'} for any subscription that matches the bare JID. - - @param subscriber: bare JID of the subscriptions' entity. - @type subscriber: L{jid.JID} - @return: deferred that returns a L{bool}. - """ - - def get_affiliations(self): - """ Get affiliations of entities with this node. - - @return: deferred that returns a L{list} of tuples (jid, affiliation), - where jid is a L(jid.JID) and affiliation is one of C{'owner'}, - C{'publisher'}, C{'outcast'}. - """ - -class ILeafNode(Interface): - """ Interface to the class of objects that represent leaf nodes. """ - - def store_items(self, items, publisher): - """ Store items in persistent storage for later retrieval. - - @param items: The list of items to be stored. Each item is the - L{domish} representation of the XML fragment as defined - for C{<item/>} in the - C{http://jabber.org/protocol/pubsub} namespace. - @type items: L{list} of {domish.Element} - @param publisher: JID of the publishing entity. - @type publisher: L{jid.JID} - @return: deferred that fires upon success. - """ - - def remove_items(self, item_ids): - """ Remove items by id - - @param item_ids: L{list} of item ids. - @return: deferred that fires with a L{list} of ids of the items that - were deleted - """ - - def get_items(self, max_items=None): - """ Get items. - - If C{max_items} is not given, all items in the node are returned, - just like C{get_items_by_id}. Otherwise, C{max_items} limits - the returned items to a maximum of that number of most recently - published items. - - @param max_items: if given, a natural number (>0) that limits the - returned number of items. - @return: deferred that fires with a L{list} of found items. - """ - - def get_items_by_id(self, item_ids): - """ Get items by item id. - - Each item in the returned list is a unicode string that - represent the XML of the item as it was published, including the - item wrapper with item id. - - @param item_ids: L{list} of item ids. - @return: deferred that fires with a L{list} of found items. - """ - - def purge(self): - """ Purge node of all items in persistent storage. - - @return: deferred that fires when the node has been purged. - """ - - -class ISubscription(Interface): - """ """
--- a/idavoll/tap.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/tap.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,10 +1,18 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. -from twisted.application import internet, service -from twisted.internet import interfaces +from twisted.application import service from twisted.python import usage -import idavoll +from twisted.words.protocols.jabber.jid import JID + +from wokkel.component import Component +from wokkel.disco import DiscoHandler +from wokkel.generic import FallbackHandler, VersionHandler +from wokkel.iwokkel import IPubSubService + +from idavoll.backend import BackendService + +__version__ = '0.6.0' class Options(usage.Options): optParameters = [ @@ -22,10 +30,42 @@ ('verbose', 'v', 'Show traffic'), ('hide-nodes', None, 'Hide all nodes for disco') ] - + def postOptions(self): if self['backend'] not in ['pgsql', 'memory']: raise usage.UsageError, "Unknown backend!" def makeService(config): - return idavoll.makeService(config) + s = service.MultiService() + + cs = Component(config["rhost"], int(config["rport"]), + config["jid"], config["secret"]) + cs.setServiceParent(s) + + cs.factory.maxDelay = 900 + + if config["verbose"]: + cs.logTraffic = True + + FallbackHandler().setHanderParent(cs) + VersionHandler('Idavoll', __version__).setHanderParent(cs) + DiscoHandler().setHanderParent(cs) + + if config['backend'] == 'pgsql': + from idavoll.pgsql_storage import Storage + st = Storage(user=config['dbuser'], + database=config['dbname'], + password=config['dbpass']) + elif config['backend'] == 'memory': + from idavoll.memory_storage import Storage + st = Storage() + + bs = BackendService(st) + bs.setServiceParent(s) + + ps = IPubSubService(bs) + ps.setHanderParent(cs) + ps.hideNodes = config["hide-nodes"] + ps.serviceJID = JID(config["jid"]) + + return s
--- a/idavoll/test/__init__.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/test/__init__.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,3 +1,6 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. +""" +Tests for L{idavoll}. +"""
--- a/idavoll/test/test_backend.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/test/test_backend.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,12 +1,16 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. +""" +Tests for L{idavoll.backend}. +""" + from twisted.trial import unittest from zope.interface import implements from twisted.internet import defer from twisted.words.protocols.jabber import jid -from idavoll import backend, storage +from idavoll import backend, error, iidavoll OWNER = jid.JID('owner@example.com') @@ -18,25 +22,26 @@ class testStorage: - implements(storage.IStorage) + implements(iidavoll.IStorage) def get_node(self, node_id): return defer.succeed(testNode()) def delete_node(self, node_id): if node_id in ['to-be-deleted']: - self.backend.delete_called = True + self.delete_called = True return defer.succeed(None) else: - return defer.fail(storage.NodeNotFound()) + return defer.fail(error.NodeNotFound()) -class NodeDeletionServiceTests: - pre_delete_called = False - delete_called = False +class BackendTest(unittest.TestCase): + def setUp(self): + self.storage = testStorage() + self.backend = backend.BackendService(self.storage) + self.storage.backend = self.backend - def setUpClass(self): - self.storage = testStorage() - self.storage.backend = self + self.pre_delete_called = False + self.delete_called = False def testDeleteNode(self): def pre_delete(node_id): @@ -44,19 +49,10 @@ return defer.succeed(None) def cb(result): - self.assert_(self.pre_delete_called) - self.assert_(self.delete_called) + self.assertTrue(self.pre_delete_called) + self.assertTrue(self.storage.delete_called) self.backend.register_pre_delete(pre_delete) d = self.backend.delete_node('to-be-deleted', OWNER) d.addCallback(cb) - -class GenericNodeDeletionServiceTestCase(unittest.TestCase, - NodeDeletionServiceTests): - - def setUpClass(self): - NodeDeletionServiceTests.setUpClass(self) - from idavoll.generic_backend import BackendService, NodeDeletionService - bs = BackendService(self.storage) - self.backend = NodeDeletionService() - self.backend.setServiceParent(bs) + return d
--- a/idavoll/test/test_storage.py Thu Jan 18 14:08:32 2007 +0000 +++ b/idavoll/test/test_storage.py Wed Oct 03 12:41:43 2007 +0000 @@ -1,12 +1,18 @@ -# Copyright (c) 2003-2006 Ralph Meijer +# Copyright (c) 2003-2007 Ralph Meijer # See LICENSE for details. +""" +Tests for L{idavoll.memory_storage} and L{idavoll.pgsql_storage}. +""" + from twisted.trial import unittest from twisted.words.protocols.jabber import jid from twisted.internet import defer from twisted.words.xish import domish -from idavoll import storage, pubsub +from wokkel import pubsub + +from idavoll import error OWNER = jid.JID('owner@example.com') SUBSCRIBER = jid.JID('subscriber@example.com/Home') @@ -38,7 +44,7 @@ def _assignTestNode(self, node): self.node = node - def setUpClass(self): + def setUp(self): d = self.s.get_node('pre-existing') d.addCallback(self._assignTestNode) return d @@ -48,7 +54,7 @@ def testGetNonExistingNode(self): d = self.s.get_node('non-existing') - self.assertFailure(d, storage.NodeNotFound) + self.assertFailure(d, error.NodeNotFound) return d def testGetNodeIDs(self): @@ -60,7 +66,7 @@ def testCreateExistingNode(self): d = self.s.create_node('pre-existing', OWNER) - self.assertFailure(d, storage.NodeExists) + self.assertFailure(d, error.NodeExists) return d def testCreateNode(self): @@ -74,13 +80,13 @@ def testDeleteNonExistingNode(self): d = self.s.delete_node('non-existing') - self.assertFailure(d, storage.NodeNotFound) + self.assertFailure(d, error.NodeNotFound) return d def testDeleteNode(self): def cb(void): d = self.s.get_node('to-be-deleted') - self.assertFailure(d, storage.NodeNotFound) + self.assertFailure(d, error.NodeNotFound) return d d = self.s.delete_node('to-be-deleted') @@ -98,7 +104,7 @@ def testGetSubscriptions(self): def cb(subscriptions): self.assertIn(('pre-existing', SUBSCRIBER, 'subscribed'), subscriptions) - + d = self.s.get_subscriptions(SUBSCRIBER) d.addCallback(cb) return d @@ -124,7 +130,7 @@ def check_object_config(node): config = node.get_configuration() self.assertEqual(config['pubsub#persist_items'], False) - + def get_node(void): return self.s.get_node('to-be-reconfigured') @@ -177,9 +183,9 @@ def testAddExistingSubscription(self): d = self.node.add_subscription(SUBSCRIBER, 'pending') - self.assertFailure(d, storage.SubscriptionExists) + self.assertFailure(d, error.SubscriptionExists) return d - + def testGetSubscription(self): def cb(subscriptions): self.assertEquals(subscriptions[0][1], 'subscribed') @@ -197,9 +203,9 @@ def testRemoveNonExistingSubscription(self): d = self.node.remove_subscription(OWNER) - self.assertFailure(d, storage.SubscriptionNotFound) + self.assertFailure(d, error.SubscriptionNotFound) return d - + def testGetSubscribers(self): def cb(subscribers): self.assertIn(SUBSCRIBER, subscribers) @@ -309,7 +315,7 @@ def cb2(node): return node.get_items() - + def cb3(result): self.assertEqual([], result) @@ -326,7 +332,7 @@ def cb2(affiliations): affiliations = dict(((a[0].full(), a[1]) for a in affiliations)) self.assertEquals(affiliations[OWNER.full()], 'owner') - + d = self.s.get_node('pre-existing') d.addCallback(cb1) d.addCallback(cb2) @@ -334,7 +340,7 @@ class MemoryStorageStorageTestCase(unittest.TestCase, StorageTests): - def setUpClass(self): + def setUp(self): from idavoll.memory_storage import Storage, LeafNode, Subscription, \ default_config self.s = Storage() @@ -363,18 +369,18 @@ self.s._nodes['pre-existing']._items['current'] = item self.s._nodes['pre-existing']._itemlist.append(item) - return StorageTests.setUpClass(self) + return StorageTests.setUp(self) class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests): - def _callSuperSetUpClass(self, void): - return StorageTests.setUpClass(self) + def _callSuperSetUp(self, void): + return StorageTests.setUp(self) - def setUpClass(self): + def setUp(self): from idavoll.pgsql_storage import Storage self.s = Storage('ralphm', 'pubsub_test') self.s._dbpool.start() d = self.s._dbpool.runInteraction(self.init) - d.addCallback(self._callSuperSetUpClass) + d.addCallback(self._callSuperSetUp) return d def tearDownClass(self): @@ -444,7 +450,7 @@ WHERE node='pre-existing'""", (PUBLISHER.userhost(), ITEM.toXml())) - + def cleandb(self, cursor): cursor.execute("""DELETE FROM nodes WHERE node in ('non-existing', 'pre-existing', 'to-be-deleted', @@ -460,3 +466,9 @@ SUBSCRIBER_PENDING.userhost()) cursor.execute("""DELETE FROM entities WHERE jid=%s""", PUBLISHER.userhost()) + +try: + import pyPgSQL + pyPgSQL +except ImportError: + PgsqlStorageStorageTestCase.skip = "pyPgSQL not available"