changeset 107:d252d793f0ed

Initial revision.
author Ralph Meijer <ralphm@ik.nu>
date Fri, 08 Apr 2005 10:15:02 +0000
parents dc36882d2620
children 1c18759d2afb
files idavoll/generic_backend.py idavoll/memory_storage.py idavoll/pgsql_storage.py idavoll/storage.py
diffstat 4 files changed, 898 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/idavoll/generic_backend.py	Fri Apr 08 10:15:02 2005 +0000
@@ -0,0 +1,390 @@
+import sha
+import time
+from twisted.words.protocols.jabber import jid
+from twisted.application import service
+from twisted.xish import utility
+from twisted.internet import defer
+from zope.interface import implements
+import backend
+
+def _get_affiliation(node, entity):
+    d = node.get_affiliation(entity)
+    d.addCallback(lambda affiliation: (node, affiliation))
+    return d
+
+class BackendService(service.MultiService, utility.EventDispatcher):
+
+    implements(backend.IBackendService)
+
+    options = {"pubsub#persist_items":
+                  {"type": "boolean",
+                   "label": "Persist items to storage"},
+               "pubsub#deliver_payloads":
+                  {"type": "boolean",
+                   "label": "Deliver payloads with event notifications"},
+              }
+
+    default_config = {"pubsub#persist_items": True,
+                      "pubsub#deliver_payloads": True,
+                     }
+
+    def __init__(self, storage):
+        service.MultiService.__init__(self)
+        utility.EventDispatcher.__init__(self)
+        self.storage = storage
+
+    def supports_publisher_affiliation(self):
+        return True
+
+    def supports_outcast_affiliation(self):
+        return True
+
+    def supports_persistent_items(self):
+        return True
+
+    def get_node_type(self, node_id):
+        d = self.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_type())
+        return d
+
+    def get_nodes(self):
+        return self.storage.get_node_ids()
+
+    def get_node_meta_data(self, node_id):
+        d = self.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_meta_data())
+        d.addCallback(self._make_meta_data)
+        return d
+
+    def _make_meta_data(self, meta_data):
+        options = []
+        for key, value in meta_data.iteritems():
+            if self.options.has_key(key):
+                option = {"var": key}
+                option.update(self.options[key])
+                option["value"] = value
+                options.append(option)
+
+        return options
+
+class PublishService(service.Service):
+    
+    implements(backend.IPublishService)
+
+    def publish(self, node_id, items, requestor):
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_publish, items, requestor)
+        return d
+
+    def _do_publish(self, result, items, requestor):
+        node, affiliation = result
+        configuration = node.get_configuration()
+        persist_items = configuration["pubsub#persist_items"]
+        deliver_payloads = configuration["pubsub#deliver_payloads"]
+        
+        if affiliation not in ['owner', 'publisher']:
+            raise backend.NotAuthorized
+
+        if items and not persist_items and not deliver_payloads:
+            raise backend.NoPayloadAllowed
+        elif not items and (persist_items or deliver_payloads):
+            raise backend.PayloadExpected
+
+        if persist_items or deliver_payloads:
+            for item in items:
+                if not item.getAttribute("id"):
+                    item["id"] = sha.new(str(time.time()) +
+                                         requestor.full()).hexdigest()
+
+        if persist_items:
+            d = node.store_items(items, requestor)
+        else:
+            d = defer.succeed(None)
+
+        d.addCallback(self._do_notify, node.id, items, deliver_payloads)
+
+    def _do_notify(self, result, node_id, items, deliver_payloads):
+        if items and not deliver_payloads:
+            for item in items:
+                item.children = []
+
+        self.parent.dispatch({ 'items': items, 'node_id': node_id },
+                             '//event/pubsub/notify')
+
+class NotificationService(service.Service):
+
+    implements(backend.INotificationService)
+
+    def get_notification_list(self, node_id, items):
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_subscribers())
+        d.addCallback(self._magic_filter, node_id, items)
+        return d
+
+    def _magic_filter(self, subscribers, node_id, items):
+        list = []
+        for subscriber in subscribers:
+            list.append((subscriber, items))
+        return list
+
+    def register_notifier(self, observerfn, *args, **kwargs):
+        self.parent.addObserver('//event/pubsub/notify', observerfn,
+                                *args, **kwargs)
+
+class SubscriptionService(service.Service):
+
+    implements(backend.ISubscriptionService)
+
+    def subscribe(self, node_id, subscriber, requestor):
+        subscriber_entity = subscriber.userhostJID()
+        if subscriber_entity != requestor:
+            return defer.fail(backend.NotAuthorized)
+
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, subscriber_entity)
+        d.addCallback(self._do_subscribe, subscriber)
+        return d
+
+    def _do_subscribe(self, result, subscriber):
+        node, affiliation = result
+
+        if affiliation == 'outcast':
+            raise backend.NotAuthorized
+
+        d = node.add_subscription(subscriber, 'subscribed')
+        d.addCallback(self._return_subscription, affiliation)
+        return d
+
+    def _return_subscription(self, result, affiliation):
+        result['affiliation'] = affiliation
+        return result
+
+    def unsubscribe(self, node_id, subscriber, requestor):
+        if subscriber.userhostJID() != requestor:
+            raise backend.NotAuthorized
+
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(lambda node: node.remove_subscription(subscriber))
+        return d
+
+class NodeCreationService(service.Service):
+
+    implements(backend.INodeCreationService)
+
+    def supports_instant_nodes(self):
+        return True
+
+    def create_node(self, node_id, requestor):
+        if not node_id:
+            node_id = 'generic/%s' % sha.new(str(time.time()) +
+                                             requestor.full()).hexdigest()
+
+        d = self.parent.storage.create_node(node_id, requestor)
+        d.addCallback(lambda _: node_id)
+        return d
+
+    def get_node_configuration(self, node_id):
+        if node_id:
+            d = self.parent.storage.get_node(node_id)
+            d.addCallback(lambda node: node.get_configuration())
+        else:
+            # XXX: this is disabled in pubsub.py
+            d = defer.succeed(self.parent.default_config)
+
+        d.addCallback(self._make_config)
+        return d
+
+    def _make_config(self, config):
+        options = []
+        for key, value in self.parent.options.iteritems():
+            option = {"var": key}
+            option.update(value)
+            if config.has_key(key):
+                option["value"] = config[key]
+            options.append(option)
+
+        return options
+
+    def set_node_configuration(self, node_id, options, requestor):
+        for key, value in options.iteritems():
+            if not self.parent.options.has_key(key):
+                raise backend.InvalidConfigurationOption
+            if self.parent.options[key]["type"] == 'boolean':
+                try:
+                    options[key] = bool(int(value))
+                except ValueError:
+                    raise backend.InvalidConfigurationValue
+
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_set_node_configuration, options)
+        return d
+
+    def _do_set_node_configuration(self, result, options):
+        node, affiliation = result
+
+        if affiliation != 'owner':
+            raise backend.NotAuthorized
+
+        return node.set_configuration(options)
+
+class AffiliationsService(service.Service):
+
+    implements(backend.IAffiliationsService)
+
+    def get_affiliations(self, entity):
+        d1 = self.parent.storage.get_affiliations(entity)
+        d2 = self.parent.storage.get_subscriptions(entity)
+        d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1)
+        d.addErrback(lambda x: x.value[0])
+        d.addCallback(self._affiliations_result, entity)
+        return d
+
+    def _affiliations_result(self, result, entity):
+        affiliations = result[0][1]
+        subscriptions = result[1][1]
+
+        new_affiliations = {}
+
+        for node, affiliation in affiliations:
+            new_affiliations[(node, entity.full())] = {'node': node,
+                                                'jid': entity,
+                                                'affiliation': affiliation,
+                                                'subscription': None
+                                               }
+
+        for node, subscriber, subscription in subscriptions:
+            key = node, subscriber.full()
+            if new_affiliations.has_key(key):
+                new_affiliations[key]['subscription'] = subscription
+            else:
+                new_affiliations[key] = {'node': node,
+                                         'jid': subscriber,
+                                         'affiliation': None,
+                                         'subscription': subscription}
+
+        return new_affiliations.values()
+
+class ItemRetrievalService(service.Service):
+
+    implements(backend.IItemRetrievalService)
+
+    def get_items(self, node_id, requestor, max_items=None, item_ids=[]):
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(self._is_subscribed, requestor)
+        d.addCallback(self._do_get_items, max_items, item_ids)
+        return d
+
+    def _is_subscribed(self, node, subscriber):
+        d = node.is_subscribed(subscriber)
+        d.addCallback(lambda subscribed: (node, subscribed))
+        return d
+
+    def _do_get_items(self, result, max_items, item_ids):
+        node, subscribed = result
+
+        if not subscribed:
+            raise backend.NotAuthorized
+
+        if item_ids:
+            return node.get_items_by_id(item_ids)
+        else:
+            return node.get_items(max_items)
+
+class RetractionService(service.Service):
+
+    implements(backend.IRetractionService)
+                                                                                
+    def retract_item(self, node_id, item_ids, requestor):
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_retract, item_ids)
+        return d
+                                                                                
+    def _do_retract(self, result, item_ids):
+        node, affiliation = result
+        persist_items = node.get_configuration()["pubsub#persist_items"]
+                                                                                
+        if affiliation not in ['owner', 'publisher']:
+            raise backend.NotAuthorized
+                                                                                
+        if not persist_items:
+            raise backend.NodeNotPersistent
+                                                                                
+        d = node.remove_items(item_ids)
+        d.addCallback(self._do_notify_retraction, node.id)
+        return d
+                                                                                
+    def _do_notify_retraction(self, result, node_id):
+        self.parent.dispatch({ 'item_ids': result, 'node_id': node_id },
+                             '//event/pubsub/retract')
+                                                                                
+    def purge_node(self, node_id, requestor):
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_purge)
+        return d
+    
+    def _do_purge(self, result):
+        node, affiliation = result
+        persist_items = node.get_configuration()["pubsub#persist_items"]
+                                                                                
+        if affiliation != 'owner':
+            raise backend.NotAuthorized
+                                                                                
+        if not persist_items:
+            raise backend.NodeNotPersistent
+                                                                                
+        d = node.purge()
+        d.addCallback(self._do_notify_purge, node.id)
+        return d
+    
+    def _do_notify_purge(self, result, node_id):
+        self.parent.dispatch(node_id, '//event/pubsub/purge')
+
+class NodeDeletionService(service.Service):
+
+    implements(backend.INodeDeletionService)
+
+    def __init__(self):
+        self._callback_list = []
+
+    def register_pre_delete(self, pre_delete_fn):
+        self._callback_list.append(pre_delete_fn)
+
+    def get_subscribers(self, node_id):
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(lambda node: node.get_subscribers())
+        return d
+
+    def delete_node(self, node_id, requestor):
+        d = self.parent.storage.get_node(node_id)
+        d.addCallback(_get_affiliation, requestor)
+        d.addCallback(self._do_pre_delete)
+        return d
+    
+    def _do_pre_delete(self, result):
+        node, affiliation = result
+                                                                                
+        if affiliation != 'owner':
+            raise backend.NotAuthorized
+
+        d = defer.DeferredList([cb(node_id) for cb in self._callback_list],
+                               consumeErrors=1)
+        d.addCallback(self._do_delete, node.id)
+
+    def _do_delete(self, result, node_id):
+        dl = []
+        for succeeded, r in result:
+            if succeeded and r:
+                dl.extend(r)
+
+        d = self.parent.storage.delete_node(node_id)
+        d.addCallback(self._do_notify_delete, dl)
+
+        return d
+    
+    def _do_notify_delete(self, result, dl):
+        for d in dl:
+            d.callback(None)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/idavoll/memory_storage.py	Fri Apr 08 10:15:02 2005 +0000
@@ -0,0 +1,194 @@
+import copy
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+import storage
+
+default_config = {"pubsub#persist_items": False,
+                  "pubsub#deliver_payloads": False}
+
+class Storage:
+
+    implements(storage.IStorage)
+
+    def __init__(self):
+        self._nodes = {}
+
+    def get_node(self, node_id):
+        try:
+            node = self._nodes[node_id]
+        except KeyError:
+            return defer.fail(storage.NodeNotFound())
+
+        return defer.succeed(node)
+
+    def get_node_ids(self):
+        return defer.succeed(self._nodes.keys())
+
+    def create_node(self, node_id, owner, config = None, type='leaf'):
+        if node_id in self._nodes:
+            return defer.fail(storage.NodeExists())
+
+        if not config:
+            config = copy.copy(default_config)
+
+        if type != 'leaf':
+            raise NotImplementedError
+
+        node = LeafNode(node_id, owner, config)
+        self._nodes[node_id] = node
+
+        return defer.succeed(None)
+
+    def delete_node(self, node_id):
+        try:
+            del self._nodes[node_id]
+        except KeyError:
+            return defer.fail(storage.NodeNotFound())
+
+        return defer.succeed(None)
+
+    def get_affiliations(self, entity):
+        entity_full = entity.full()
+        return defer.succeed([(node.id, node._affiliations[entity_full])
+                              for name, node in self._nodes.iteritems()
+                              if entity_full in node._affiliations])
+
+    def get_subscriptions(self, entity):
+        subscriptions = []
+        for node in self._nodes.itervalues():
+            for subscriber, subscription in node._subscriptions.iteritems():
+                subscriber = jid.JID(subscriber)
+                if subscriber.userhostJID() == entity:
+                    subscriptions.append((node.id, subscriber,
+                                          subscription.state))
+
+        return defer.succeed(subscriptions)
+        
+class Node:
+
+    implements(storage.INode)
+
+    def __init__(self, node_id, owner, config):
+        self.id = node_id
+        self._affiliations = {owner.full(): 'owner'}
+        self._subscriptions = {}
+        self._config = config
+
+    def get_type(self):
+        return self.type
+
+    def get_configuration(self):
+        return self._config
+    
+    def get_meta_data(self):
+        config = copy.copy(self._config)
+        config["pubsub#node_type"] = self.type
+        return config
+
+    def set_configuration(self, options):
+        for option in options:
+            if option in self._config:
+                self._config[option] = options[option]
+
+        return defer.succeed(None)
+                
+    def get_affiliation(self, entity):
+        return defer.succeed(self._affiliations.get(entity.full()))
+
+    def add_subscription(self, subscriber, state):
+        try:
+            subscription = self._subscriptions[subscriber.full()]
+        except:
+            subscription = Subscription(state)
+            self._subscriptions[subscriber.full()] = subscription
+
+        return defer.succeed({'node': self.id,
+                              'jid': subscriber,
+                              'subscription': subscription.state})
+
+    def remove_subscription(self, subscriber):
+        del self._subscriptions[subscriber.full()]
+
+        return defer.succeed(None)
+
+    def get_subscribers(self):
+        subscribers = [jid.JID(subscriber) for subscriber, subscription
+                       in self._subscriptions.iteritems()
+                       if subscription.state == 'subscribed']
+
+        return defer.succeed(subscribers)
+
+    def is_subscribed(self, subscriber):
+        try:
+            subscription = self._subscriptions[subscriber.full()]
+        except KeyError:
+            return defer.succeed(False)
+
+        return defer.succeed(subscription.state == 'subscribed')
+
+class LeafNode(Node):
+
+    implements(storage.ILeafNode)
+    type = 'leaf'
+
+    def __init__(self, node_id, owner, config):
+        Node.__init__(self, node_id, owner, config)
+        self._items = {}
+        self._itemlist = []
+
+    def store_items(self, items, publisher):
+        for data in items:
+            id = data["id"]
+            item = (data.toXml(), publisher)
+            if id in self._items:
+                self._itemlist.remove(self._items[id])
+            self._items[id] = item
+            self._itemlist.append(item)
+
+        return defer.succeed(None)
+
+    def remove_items(self, item_ids):
+        deleted = []
+
+        for item_id in item_ids:
+            try:
+                item = self._items[item_id]
+                self._itemlist.remove(item)
+                del self._items[item_id]
+                deleted.append(item_id)
+            except KeyError:
+                pass
+        
+        return defer.succeed(deleted)
+
+    def get_items(self, max_items=None):
+        if max_items:
+            list = self._itemlist[-max_items:]
+        else:
+            list = self._itemlist
+        return defer.succeed([item[0] for item in list])
+    
+    def get_items_by_id(self, item_ids):
+        items = []
+        for item_id in item_ids:
+            try:
+                item = self._items[item_id]
+            except KeyError:
+                pass
+            else:
+                items.append(item[0])
+        return defer.succeed(items)
+
+    def purge(self):
+        self._items = {}
+        self._itemlist = []
+
+        return defer.succeed(None)
+
+class Subscription:
+
+    implements(storage.ISubscription)
+
+    def __init__(self, state):
+        self.state = state
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/idavoll/pgsql_storage.py	Fri Apr 08 10:15:02 2005 +0000
@@ -0,0 +1,229 @@
+import copy
+import storage
+from twisted.enterprise import adbapi
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from zope.interface import implements
+
+class Storage:
+
+    implements(storage.IStorage)
+
+    def __init__(self, user, database):
+        self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
+                database=database)
+
+    def get_node(self, node_id):
+        return self._dbpool.runInteraction(self._get_node, node_id)
+
+    def _get_node(self, cursor, node_id):
+        configuration = {}
+        cursor.execute("""SELECT persistent, deliver_payload FROM nodes
+                          WHERE node=%s""",
+                       (node_id,))
+        try:
+            (configuration["pubsub#persist_items"],
+             configuration["pubsub#deliver_payloads"]) = cursor.fetchone()
+        except TypeError:
+            raise storage.NodeNotFound
+        else:
+            node = LeafNode(node_id, configuration)
+            node._dbpool = self._dbpool
+            return node
+
+    def get_node_ids(self):
+        d = self._dbpool.runQuery("""SELECT node from nodes""")
+        d.addCallback(lambda results: [r[0] for r in results])
+        return d
+
+    def create_node(self, node_id, owner, type='leaf'):
+        return self._dbpool.runInteraction(self._create_node, node_id, owner)
+
+    def _create_node(self, cursor, node_id, owner):
+        try:
+            cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""",
+                           (node_id.encode('utf8')))
+        except:
+            raise storage.NodeExists
+       
+        cursor.execute("""SELECT 1 from entities where jid=%s""",
+                       (owner.full().encode('utf8')))
+
+        if not cursor.fetchone():
+            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                           (owner.full().encode('utf8')))
+
+        cursor.execute("""INSERT INTO affiliations
+                          (node_id, entity_id, affiliation)
+                          SELECT n.id, e.id, 'owner' FROM
+                          (SELECT id FROM nodes WHERE node=%s) AS n
+                          CROSS JOIN
+                          (SELECT id FROM entities WHERE jid=%s) AS e""",
+                       (node_id.encode('utf8'),
+                        owner.full().encode('utf8')))
+
+    def delete_node(self, node_id):
+        return self._dbpool.runInteraction(self._delete_node, node_id)
+
+    def _delete_node(self, cursor, node_id):
+        cursor.execute("""DELETE FROM nodes WHERE node=%s""",
+                       (node_id.encode('utf-8'),))
+
+        if cursor.rowcount != 1:
+            raise storage.NodeNotFound
+
+    def get_affiliations(self, entity):
+        d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities
+                                        JOIN affiliations ON
+                                        (affiliations.entity_id=entities.id)
+                                        JOIN nodes ON
+                                        (nodes.id=affiliations.node_id)
+                                        WHERE jid=%s""",
+                                     (entity.full().encode('utf8'),))
+        d.addCallback(lambda results: [tuple(r) for r in results])
+        return d
+
+    def get_subscriptions(self, entity):
+        d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription
+                                     FROM entities JOIN subscriptions ON
+                                     (subscriptions.entity_id=entities.id)
+                                     JOIN nodes ON
+                                     (nodes.id=subscriptions.node_id)
+                                     WHERE jid=%s""",
+                                  (entity.userhost().encode('utf8'),))
+        d.addCallback(self._convert_subscription_jids)
+        return d
+
+    def _convert_subscription_jids(self, subscriptions):
+        return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription)
+                for node, subscriber, resource, subscription in subscriptions]
+
+class Node:
+
+    implements(storage.INode)
+
+    def __init__(self, node_id, config):
+        self.id = node_id
+        self._config = config
+
+    def get_type(self):
+        return self.type
+
+    def get_configuration(self):
+        return self._config
+
+    def set_configuration(self, options):
+        return self._dbpool.runInteraction(self._set_node_configuration,
+                                           options)
+
+    def _set_configuration(self, cursor, options):
+        for option in options:
+            if option in self._config:
+                self._config[option] = options[option]
+        
+        cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s
+                          WHERE node=%s""",
+                       (self._config["pubsub#persist_items"].encode('utf8'),
+                        self._config["pubsub#deliver_payloads"].encode('utf8'),
+                        self.id.encode('utf-8')))
+
+    def get_meta_data(self):
+        config = copy.copy(self._config)
+        config["pubsub#node_type"] = self.type
+        return config
+
+    def get_affiliation(self, entity):
+        return self._dbpool.runInteraction(self._get_affiliation, entity)
+
+    def _get_affiliation(self, cursor, entity):
+        cursor.execute("""SELECT affiliation FROM affiliations
+                          JOIN nodes ON (node_id=nodes.id)
+                          JOIN entities ON (entity_id=entities.id)
+                          WHERE node=%s AND jid=%s""",
+                       (self.id.encode('utf8'),
+                        entity.full().encode('utf8')))
+
+        try:
+            return cursor.fetchone()[0]
+        except TypeError:
+            return None
+
+    def add_subscription(self, subscriber, state):
+        return self._dbpool.runInteraction(self._add_subscription, subscriber,
+                                          state)
+
+    def _add_subscription(self, cursor, subscriber, state):
+        userhost = subscriber.userhost()
+        resource = subscriber.resource or ''
+
+        try:
+            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                           (userhost.encode('utf8')))
+        except:
+            pass
+
+        try:
+            cursor.execute("""INSERT INTO subscriptions
+                              (node_id, entity_id, resource, subscription)
+                              SELECT n.id, e.id, %s, %s FROM
+                              (SELECT id FROM nodes WHERE node=%s) AS n
+                              CROSS JOIN
+                              (SELECT id FROM entities WHERE jid=%s) AS e""",
+                           (resource.encode('utf8'),
+                            state.encode('utf8'),
+                            self.id.encode('utf8'),
+                            userhost.encode('utf8')))
+        except:
+            cursor.execute("""SELECT subscription FROM subscriptions
+                              JOIN nodes ON (nodes.id=subscriptions.node_id)
+                              JOIN entities ON
+                                   (entities.id=subscriptions.entity_id)
+                              WHERE node=%s AND jid=%s AND resource=%s""",
+                           (self.id.encode('utf8'),
+                            userhost.encode('utf8'),
+                            resource.encode('utf8')))
+            state = cursor.fetchone()[0]
+
+        return {'node': self.id,
+                'jid': subscriber,
+                'subscription': state}
+
+    def remove_subscription(self, subscriber, state):
+        pass
+
+    def get_subscribers(self):
+        d = self._dbpool.runQuery("""SELECT jid, resource FROM subscriptions
+                                     JOIN nodes ON (node_id=nodes.id)
+                                     JOIN entities ON (entity_id=entities.id)
+                                     WHERE node=%s AND
+                                     subscription='subscribed'""",
+                                  (self.id.encode('utf8'),))
+        d.addCallback(self._convert_to_jids)
+        return d
+
+    def _convert_to_jids(self, list):
+        return [jid.JID("%s/%s" % (l[0], l[1])) for l in list]
+
+    def is_subscribed(self, subscriber):
+        pass
+
+class LeafNode(Node):
+
+    implements(storage.ILeafNode)
+
+    type = 'leaf'
+
+    def store_items(self, items, publisher):
+        return defer.succeed(None)
+
+    def remove_items(self, item_ids):
+        pass
+
+    def get_items(self, max_items=None):
+        pass
+
+    def get_items_by_id(self, item_ids):
+        pass
+
+    def purge(self):
+        pass
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/idavoll/storage.py	Fri Apr 08 10:15:02 2005 +0000
@@ -0,0 +1,85 @@
+from zope.interface import Interface
+
+class Error(Exception):
+    msg = None
+
+class NodeNotFound(Error):
+    pass
+
+
+class NodeExists(Error):
+    pass
+
+
+class IStorage(Interface):
+    """ """
+
+    def get_node(self, node_id):
+        """ """
+
+    def get_node_ids(self):
+        """ """
+
+    def create_node(self, node_id, owner, config = None, type='leaf'):
+        """ """
+
+    def delete_node(self, node_id):
+        """ """
+
+    def get_affiliations(self, entity):
+        """ """
+
+    def get_subscriptions(self, entity):
+        """ """
+
+
+class INode(Interface):
+    """ """
+    def get_type(self):
+        """ """
+
+    def get_configuration(self):
+        """ """
+
+    def get_meta_data(self):
+        """ """
+
+    def set_configuration(self, options):
+        """ """
+
+    def get_affiliation(self, entity):
+        """ """
+
+    def add_subscription(self, subscriber, state):
+        """ """
+
+    def remove_subscription(self, subscriber):
+        """ """
+
+    def get_subscribers(self):
+        """ """
+
+    def is_subscribed(self, subscriber):
+        """ """
+
+
+class ILeafNode(Interface):
+    """ """
+    def store_items(self, items, publisher):
+        """ """
+
+    def remove_items(self, item_ids):
+        """ """
+
+    def get_items(self, max_items=None):
+        """ """
+
+    def get_items_by_id(self, item_ids):
+        """ """
+
+    def purge(self):
+        """ """
+
+
+class ISubscription(Interface):
+    """ """