# HG changeset patch # User Ralph Meijer # Date 1114365034 0 # Node ID 49acdc6a2be4f4f9c594aeecea24f9bb3554dda3 # Parent 46453af6b0c3d30ff7d0f80d26116f819678aac7 Replaced by generic_backend.py and *storage.py diff -r 46453af6b0c3 -r 49acdc6a2be4 idavoll/memory_backend.py --- a/idavoll/memory_backend.py Sun Apr 24 17:46:18 2005 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,144 +0,0 @@ -from twisted.application import service -from twisted.internet import defer -from twisted.words.protocols.jabber import jid -import backend - -class Subscription: - def __init__(self, state): - self.state = state - -class NodeConfiguration: - def __init__(self): - self.persist_items = False - self.deliver_payloads = False - -class Node: - def __init__(self, id): - self.id = id - self.configuration = NodeConfiguration() - self.subscriptions = {} - self.affiliations = {} - self.items = {} - -class Storage: - def __init__(self): - self.nodes = {} - - def get_node_configuration(self, node_id): - try: - node = self.nodes[node_id] - except KeyError: - raise backend.NodeNotFound - else: - c = self.nodes[node_id].configuration - return defer.succeed({'pubsub#persist_items': c.persist_items, - 'pubsub#deliver_payloads': c.deliver_payloads}) - - def get_affiliation(self, node_id, entity): - try: - node = self.nodes[node_id] - except KeyError: - raise backend.NodeNotFound - else: - return defer.succeed(node.affiliations.get(entity.full(), None)) - - def get_subscribers(self, node_id): - try: - node = self.nodes[node_id] - except KeyError: - raise backend.NodeNotFound - else: - subscriptions = self.nodes[node_id].subscriptions - subscribers = [jid.JID(s) for s in subscriptions - if subscriptions[s].state == 'subscribed'] - return defer.succeed(subscribers) - - def store_items(self, node_id, items, publisher): - for item in items: - self.nodes[node_id].items[item["id"]] = (item, publisher) - return defer.succeed(None) - - def add_subscription(self, node_id, subscriber, state): - try: - node = self.nodes[node_id] - except KeyError: - raise backend.NodeNotFound - - try: - subscription = node.subscriptions[subscriber.full()] - except: - subscription = Subscription(state) - node.subscriptions[subscriber.full()] = subscription - - return defer.succeed({'node': node_id, - 'jid': subscriber, - 'subscription': subscription.state}) - - def remove_subscription(self, node_id, subscriber): - try: - node = self.nodes[node_id] - except KeyError: - raise backend.NodeNotFound - - try: - del node.subscriptions[subscriber.full()] - except KeyError: - raise backend.NotSubscribed - - return defer.succeed(None) - - def create_node(self, node_id, owner): - if node_id in self.nodes: - raise backend.NodeExists - - node = Node(node_id) - node.affiliations[owner.full()] = 'owner' - self.nodes[node_id] = node - - return defer.succeed(None) - - def get_affiliations(self, entity): - affiliations = [] - for node in self.nodes.itervalues(): - if entity.full() in node.affiliations: - affiliations.append((node.id, - node.affiliations[entity.full()])) - - return defer.succeed(affiliations) - - def get_subscriptions(self, entity): - subscriptions = [] - for node in self.nodes.itervalues(): - for subscriber, subscription in node.subscriptions.iteritems(): - subscriber_jid = jid.JID(subscriber) - if subscriber_jid.userhostJID() == entity: - subscriptions.append((node.id, subscriber_jid, - subscription.state)) - return defer.succeed(subscriptions) - - def get_node_type(self, node_id): - if node_id not in self.nodes: - raise backend.NodeNotFound - - return defer.succeed('leaf') - - def get_nodes(self): - return defer.succeed(self.nodes.keys()) - -class BackendService(backend.BackendService): - pass - -class NodeCreationService(backend.NodeCreationService): - pass - -class PublishService(backend.PublishService): - pass - -class NotificationService(backend.NotificationService): - pass - -class SubscriptionService(backend.SubscriptionService): - pass - -class AffiliationsService(backend.AffiliationsService): - pass diff -r 46453af6b0c3 -r 49acdc6a2be4 idavoll/pgsql_backend.py --- a/idavoll/pgsql_backend.py Sun Apr 24 17:46:18 2005 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,364 +0,0 @@ -from twisted.application import service -from twisted.internet import defer -from twisted.words.protocols.jabber import jid -from twisted.enterprise import adbapi -import backend - -class Storage: - def __init__(self, user, database): - self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, - database=database) - - def _check_node_exists(self, cursor, node_id): - cursor.execute("""SELECT id FROM nodes WHERE node=%s""", - (node_id.encode('utf8'))) - if not cursor.fetchone(): - raise backend.NodeNotFound - else: - return - - def _get_node_configuration(self, cursor, node_id): - configuration = {} - cursor.execute("""SELECT persistent, deliver_payload FROM nodes - WHERE node=%s""", - (node_id,)) - try: - (configuration["pubsub#persist_items"], - configuration["pubsub#deliver_payloads"]) = cursor.fetchone() - return configuration - except TypeError: - raise backend.NodeNotFound - - def get_node_configuration(self, node_id): - return self.dbpool.runInteraction(self._get_node_configuration, node_id) - - def _get_affiliation(self, cursor, node_id, entity): - self._check_node_exists(cursor, node_id) - cursor.execute("""SELECT affiliation FROM affiliations - JOIN nodes ON (node_id=nodes.id) - JOIN entities ON (entity_id=entities.id) - WHERE node=%s AND jid=%s""", - (node_id.encode('utf8'), - entity.full().encode('utf8'))) - - try: - return cursor.fetchone()[0] - except TypeError: - return None - - def get_affiliation(self, node_id, entity): - return self.dbpool.runInteraction(self._get_affiliation, node_id, - entity) - - def get_subscribers(self, node_id): - d = self.dbpool.runInteraction(self._get_subscribers, node_id) - d.addCallback(self._convert_to_jids) - return d - - def _get_subscribers(self, cursor,node_id): - self._check_node_exists(cursor, node_id) - cursor.execute("""SELECT jid, resource FROM subscriptions - JOIN nodes ON (node_id=nodes.id) - JOIN entities ON (entity_id=entities.id) - WHERE node=%s AND - subscription='subscribed'""", - (node_id.encode('utf8'),)) - return cursor.fetchall() - - def _convert_to_jids(self, list): - return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] - - def store_items(self, node_id, items, publisher): - return self.dbpool.runInteraction(self._store_items, node_id, items, - publisher) - - def _store_items(self, cursor, node_id, items, publisher): - self._check_node_exists(cursor, node_id) - for item in items: - self._store_item(cursor, node_id, item, publisher) - - def _store_item(self, cursor, node_id, item, publisher): - data = item.toXml() - cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s - FROM nodes - WHERE nodes.id = items.node_id AND - nodes.node = %s and items.item=%s""", - (publisher.full().encode('utf8'), - data.encode('utf8'), - node_id.encode('utf8'), - item["id"].encode('utf8'))) - if cursor.rowcount == 1: - return - - cursor.execute("""INSERT INTO items (node_id, item, publisher, data) - SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", - (item["id"].encode('utf8'), - publisher.full().encode('utf8'), - data.encode('utf8'), - node_id.encode('utf8'))) - - def add_subscription(self, node_id, subscriber, state): - return self.dbpool.runInteraction(self._add_subscription, node_id, - subscriber, state) - - def _add_subscription(self, cursor, node_id, subscriber, state): - self._check_node_exists(cursor, node_id) - userhost = subscriber.userhost() - resource = subscriber.resource or '' - - try: - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (userhost.encode('utf8'))) - except: - pass - - try: - cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, subscription) - SELECT n.id, e.id, %s, %s FROM - (SELECT id FROM nodes WHERE node=%s) AS n - CROSS JOIN - (SELECT id FROM entities WHERE jid=%s) AS e""", - (resource.encode('utf8'), - state.encode('utf8'), - node_id.encode('utf8'), - userhost.encode('utf8'))) - except: - cursor.execute("""SELECT subscription FROM subscriptions - JOIN nodes ON (nodes.id=subscriptions.node_id) - JOIN entities ON - (entities.id=subscriptions.entity_id) - WHERE node=%s AND jid=%s AND resource=%s""", - (node_id.encode('utf8'), - userhost.encode('utf8'), - resource.encode('utf8'))) - state = cursor.fetchone()[0] - - return {'node': node_id, - 'jid': subscriber, - 'subscription': state} - - def remove_subscription(self, node_id, subscriber): - return self.dbpool.runInteraction(self._remove_subscription, node_id, - subscriber) - - def _remove_subscription(self, cursor, node_id, subscriber): - self._check_node_exists(cursor, node_id) - userhost = subscriber.userhost() - resource = subscriber.resource or '' - - cursor.execute("""DELETE FROM subscriptions WHERE - node_id=(SELECT id FROM nodes WHERE node=%s) AND - entity_id=(SELECT id FROM entities WHERE jid=%s) - AND resource=%s""", - (node_id.encode('utf8'), - userhost.encode('utf8'), - resource.encode('utf8'))) - if cursor.rowcount != 1: - raise backend.NotSubscribed - - return None - - def create_node(self, node_id, owner): - return self.dbpool.runInteraction(self._create_node, node_id, - owner) - - def _create_node(self, cursor, node_id, owner): - try: - cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", - (node_id.encode('utf8'))) - except: - raise backend.NodeExists - - cursor.execute("""SELECT 1 from entities where jid=%s""", - (owner.full().encode('utf8'))) - - if not cursor.fetchone(): - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (owner.full().encode('utf8'))) - - cursor.execute("""INSERT INTO affiliations - (node_id, entity_id, affiliation) - SELECT n.id, e.id, 'owner' FROM - (SELECT id FROM nodes WHERE node=%s) AS n - CROSS JOIN - (SELECT id FROM entities WHERE jid=%s) AS e""", - (node_id.encode('utf8'), - owner.full().encode('utf8'))) - - return None - - def get_affiliations(self, entity): - return self.dbpool.runQuery("""SELECT node, affiliation FROM entities - JOIN affiliations ON - (affiliations.entity_id=entities.id) - JOIN nodes ON - (nodes.id=affiliations.node_id) - WHERE jid=%s""", - (entity.full().encode('utf8'),)) - - def get_subscriptions(self, entity): - d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription - FROM entities JOIN subscriptions ON - (subscriptions.entity_id=entities.id) - JOIN nodes ON - (nodes.id=subscriptions.node_id) - WHERE jid=%s""", - (entity.full().encode('utf8'),)) - d.addCallback(self._convert_subscription_jids) - return d - - def _convert_subscription_jids(self, subscriptions): - return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription) - for node, subscriber, resource, subscription in subscriptions] - - def get_node_type(self, node_id): - return self.dbpool.runInteraction(self._get_node_type, node_id) - - def _get_node_type(self, cursor, node_id): - self._check_node_exists(cursor, node_id) - return 'leaf' - - def get_nodes(self): - d = self.dbpool.runQuery("""SELECT node from nodes""") - d.addCallback(lambda results: [r[0] for r in results]) - return d - - def is_subscribed(self, node_id, subscriber): - return self.dbpool.runInteraction(self._is_subscribed, node_id, - subscriber) - - def _is_subscribed(self, cursor, node_id, subscriber): - self._check_node_exists(cursor, node_id) - - userhost = subscriber.userhost() - resource = subscriber.resource or '' - - cursor.execute("""SELECT 1 FROM entities - JOIN subscriptions ON - (entities.id=subscriptions.entity_id) - JOIN nodes ON - (nodes.id=subscriptions.node_id) - WHERE entities.jid=%s AND resource=%s - AND node=%s""", - (userhost.encode('utf8'), - resource.encode('utf8'), - node_id.encode('utf8'))) - - return cursor.fetchone() is not None - - def get_items_by_ids(self, node_id, item_ids): - return self.dbpool.runInteraction(self._get_items_by_ids, node_id, - item_ids) - - def _get_items_by_ids(self, cursor, node_id, item_ids): - self._check_node_exists(cursor, node_id) - items = [] - for item_id in item_ids: - cursor.execute("""SELECT data FROM nodes JOIN items ON - (nodes.id=items.node_id) - WHERE node=%s AND item=%s""", - (node_id.encode('utf8'), - item_id.encode('utf8'))) - result = cursor.fetchone() - if result: - items.append(result[0]) - return items - - def get_items(self, node_id, max_items=None): - return self.dbpool.runInteraction(self._get_items, node_id, max_items) - - def _get_items(self, cursor, node_id, max_items): - self._check_node_exists(cursor, node_id) - query = """SELECT data FROM nodes JOIN items ON - (nodes.id=items.node_id) - WHERE node=%s ORDER BY date DESC""" - if max_items: - cursor.execute(query + " LIMIT %s", - (node_id.encode('utf8'), - max_items)) - else: - cursor.execute(query, (node_id.encode('utf8'))) - - result = cursor.fetchall() - return [r[0] for r in result] - - def remove_items(self, node_id, item_ids): - return self.dbpool.runInteraction(self._remove_items, node_id, item_ids) - - def _remove_items(self, cursor, node_id, item_ids): - self._check_node_exists(cursor, node_id) - - deleted = [] - - for item_id in item_ids: - cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT id FROM nodes WHERE node=%s) AND - item=%s""", - (node_id.encode('utf-8'), - item_id.encode('utf-8'))) - - if cursor.rowcount: - deleted.append(item_id) - - return deleted - - def purge_node(self, node_id): - return self.dbpool.runInteraction(self._purge_node, node_id) - - def _purge_node(self, cursor, node_id): - self._check_node_exists(cursor, node_id) - - cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT id FROM nodes WHERE node=%s)""", - (node_id.encode('utf-8'),)) - - def delete_node(self, node_id): - return self.dbpool.runInteraction(self._delete_node, node_id) - - def _delete_node(self, cursor, node_id): - self._check_node_exists(cursor, node_id) - - cursor.execute("""DELETE FROM nodes WHERE node=%s""", - (node_id.encode('utf-8'),)) - - def set_node_configuration(self, node_id, options): - return self.dbpool.runInteraction(self._set_node_configuration, - node_id, - options) - - def _set_node_configuration(self, cursor, node_id, options): - cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s - WHERE node=%s""", - (options["pubsub#persist_items"].encode('utf8'), - options["pubsub#deliver_payloads"].encode('utf8'), - node_id.encode('utf-8'))) - if cursor.rowcount != 1: - raise backend.Error - -class BackendService(backend.BackendService): - """ PostgreSQL backend Service for a JEP-0060 pubsub service """ - -class NodeCreationService(backend.NodeCreationService): - pass - -class PublishService(backend.PublishService): - pass - -class NotificationService(backend.NotificationService): - pass - -class SubscriptionService(backend.SubscriptionService): - pass - -class AffiliationsService(backend.AffiliationsService): - pass - -class ItemRetrievalService(backend.ItemRetrievalService): - pass - -class RetractionService(backend.RetractionService): - pass - -class NodeDeletionService(backend.NodeDeletionService): - pass