Mercurial > libervia-pubsub
view idavoll/pgsql_storage.py @ 189:c61034369463
Released Idavoll 0.7.0.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 22 Apr 2008 13:15:53 +0000 |
parents | 0d4474051eeb |
children | e404775b12df |
line wrap: on
line source
# Copyright (c) 2003-2008 Ralph Meijer # See LICENSE for details. import copy from twisted.enterprise import adbapi from twisted.words.protocols.jabber import jid from zope.interface import implements from idavoll import error, iidavoll class Storage: implements(iidavoll.IStorage) def __init__(self, user, database, password=None, host=None, port=None): self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, password=password, database=database, host=host, port=port, cp_reconnect=True, client_encoding='utf-8' ) def get_node(self, node_id): return self._dbpool.runInteraction(self._get_node, node_id) def _get_node(self, cursor, node_id): configuration = {} cursor.execute("""SELECT persistent, deliver_payload, send_last_published_item FROM nodes WHERE node=%s""", (node_id,)) try: (configuration["pubsub#persist_items"], configuration["pubsub#deliver_payloads"], configuration["pubsub#send_last_published_item"]) = \ cursor.fetchone() except TypeError: raise error.NodeNotFound() else: node = LeafNode(node_id, configuration) node._dbpool = self._dbpool return node def get_node_ids(self): d = self._dbpool.runQuery("""SELECT node from nodes""") d.addCallback(lambda results: [r[0] for r in results]) return d def create_node(self, node_id, owner, config=None): return self._dbpool.runInteraction(self._create_node, node_id, owner) def _create_node(self, cursor, node_id, owner): node_id = node_id owner = owner.userhost() try: cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", (node_id)) except cursor._pool.dbapi.OperationalError: raise error.NodeExists() cursor.execute("""SELECT 1 from entities where jid=%s""", (owner)) if not cursor.fetchone(): cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", (owner)) cursor.execute("""INSERT INTO affiliations (node_id, entity_id, affiliation) SELECT n.id, e.id, 'owner' FROM (SELECT id FROM nodes WHERE node=%s) AS n CROSS JOIN (SELECT id FROM entities WHERE jid=%s) AS e""", (node_id, owner)) def delete_node(self, node_id): return self._dbpool.runInteraction(self._delete_node, node_id) def _delete_node(self, cursor, node_id): cursor.execute("""DELETE FROM nodes WHERE node=%s""", (node_id,)) if cursor.rowcount != 1: raise error.NodeNotFound() def get_affiliations(self, entity): d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities JOIN affiliations ON (affiliations.entity_id=entities.id) JOIN nodes ON (nodes.id=affiliations.node_id) WHERE jid=%s""", (entity.userhost(),)) d.addCallback(lambda results: [tuple(r) for r in results]) return d def get_subscriptions(self, entity): d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription FROM entities JOIN subscriptions ON (subscriptions.entity_id=entities.id) JOIN nodes ON (nodes.id=subscriptions.node_id) WHERE jid=%s""", (entity.userhost(),)) d.addCallback(self._convert_subscription_jids) return d def _convert_subscription_jids(self, subscriptions): return [(node, jid.internJID('%s/%s' % (subscriber, resource)), subscription) for node, subscriber, resource, subscription in subscriptions] class Node: implements(iidavoll.INode) def __init__(self, node_id, config): self.id = node_id self._config = config def _check_node_exists(self, cursor): cursor.execute("""SELECT id FROM nodes WHERE node=%s""", (self.id)) if not cursor.fetchone(): raise error.NodeNotFound() def get_type(self): return self.type def get_configuration(self): return self._config def set_configuration(self, options): config = copy.copy(self._config) for option in options: if option in config: config[option] = options[option] d = self._dbpool.runInteraction(self._set_configuration, config) d.addCallback(self._set_cached_configuration, config) return d def _set_configuration(self, cursor, config): self._check_node_exists(cursor) cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s, send_last_published_item=%s WHERE node=%s""", (config["pubsub#persist_items"], config["pubsub#deliver_payloads"], config["pubsub#send_last_published_item"], self.id)) def _set_cached_configuration(self, void, config): self._config = config def get_meta_data(self): config = copy.copy(self._config) config["pubsub#node_type"] = self.type return config def get_affiliation(self, entity): return self._dbpool.runInteraction(self._get_affiliation, entity) def _get_affiliation(self, cursor, entity): self._check_node_exists(cursor) cursor.execute("""SELECT affiliation FROM affiliations JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND jid=%s""", (self.id, entity.userhost())) try: return cursor.fetchone()[0] except TypeError: return None def get_subscription(self, subscriber): return self._dbpool.runInteraction(self._get_subscription, subscriber) def _get_subscription(self, cursor, subscriber): self._check_node_exists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' cursor.execute("""SELECT subscription FROM subscriptions JOIN nodes ON (nodes.id=subscriptions.node_id) JOIN entities ON (entities.id=subscriptions.entity_id) WHERE node=%s AND jid=%s AND resource=%s""", (self.id, userhost, resource)) try: return cursor.fetchone()[0] except TypeError: return None def add_subscription(self, subscriber, state): return self._dbpool.runInteraction(self._add_subscription, subscriber, state) def _add_subscription(self, cursor, subscriber, state): self._check_node_exists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' try: cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", (userhost)) except cursor._pool.dbapi.OperationalError: pass try: cursor.execute("""INSERT INTO subscriptions (node_id, entity_id, resource, subscription) SELECT n.id, e.id, %s, %s FROM (SELECT id FROM nodes WHERE node=%s) AS n CROSS JOIN (SELECT id FROM entities WHERE jid=%s) AS e""", (resource, state, self.id, userhost)) except cursor._pool.dbapi.OperationalError: raise error.SubscriptionExists() def remove_subscription(self, subscriber): return self._dbpool.runInteraction(self._remove_subscription, subscriber) def _remove_subscription(self, cursor, subscriber): self._check_node_exists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' cursor.execute("""DELETE FROM subscriptions WHERE node_id=(SELECT id FROM nodes WHERE node=%s) AND entity_id=(SELECT id FROM entities WHERE jid=%s) AND resource=%s""", (self.id, userhost, resource)) if cursor.rowcount != 1: raise error.NotSubscribed() return None def get_subscribers(self): d = self._dbpool.runInteraction(self._get_subscribers) d.addCallback(self._convert_to_jids) return d def _get_subscribers(self, cursor): self._check_node_exists(cursor) cursor.execute("""SELECT jid, resource FROM subscriptions JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND subscription='subscribed'""", (self.id,)) return cursor.fetchall() def _convert_to_jids(self, list): return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list] def is_subscribed(self, entity): return self._dbpool.runInteraction(self._is_subscribed, entity) def _is_subscribed(self, cursor, entity): self._check_node_exists(cursor) cursor.execute("""SELECT 1 FROM entities JOIN subscriptions ON (entities.id=subscriptions.entity_id) JOIN nodes ON (nodes.id=subscriptions.node_id) WHERE entities.jid=%s AND node=%s AND subscription='subscribed'""", (entity.userhost(), self.id)) return cursor.fetchone() is not None def get_affiliations(self): return self._dbpool.runInteraction(self._get_affiliations) def _get_affiliations(self, cursor): self._check_node_exists(cursor) cursor.execute("""SELECT jid, affiliation FROM nodes JOIN affiliations ON (nodes.id = affiliations.node_id) JOIN entities ON (affiliations.entity_id = entities.id) WHERE node=%s""", self.id) result = cursor.fetchall() return [(jid.internJID(r[0]), r[1]) for r in result] class LeafNodeMixin: type = 'leaf' def store_items(self, items, publisher): return self._dbpool.runInteraction(self._store_items, items, publisher) def _store_items(self, cursor, items, publisher): self._check_node_exists(cursor) for item in items: self._store_item(cursor, item, publisher) def _store_item(self, cursor, item, publisher): data = item.toXml() cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s FROM nodes WHERE nodes.id = items.node_id AND nodes.node = %s and items.item=%s""", (publisher.full(), data, self.id, item["id"])) if cursor.rowcount == 1: return cursor.execute("""INSERT INTO items (node_id, item, publisher, data) SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", (item["id"], publisher.full(), data, self.id)) def remove_items(self, item_ids): return self._dbpool.runInteraction(self._remove_items, item_ids) def _remove_items(self, cursor, item_ids): self._check_node_exists(cursor) deleted = [] for item_id in item_ids: cursor.execute("""DELETE FROM items WHERE node_id=(SELECT id FROM nodes WHERE node=%s) AND item=%s""", (self.id, item_id)) if cursor.rowcount: deleted.append(item_id) return deleted def get_items(self, max_items=None): return self._dbpool.runInteraction(self._get_items, max_items) def _get_items(self, cursor, max_items): self._check_node_exists(cursor) query = """SELECT data FROM nodes JOIN items ON (nodes.id=items.node_id) WHERE node=%s ORDER BY date DESC""" if max_items: cursor.execute(query + " LIMIT %s", (self.id, max_items)) else: cursor.execute(query, (self.id)) result = cursor.fetchall() return [unicode(r[0], 'utf-8') for r in result] def get_items_by_id(self, item_ids): return self._dbpool.runInteraction(self._get_items_by_id, item_ids) def _get_items_by_id(self, cursor, item_ids): self._check_node_exists(cursor) items = [] for item_id in item_ids: cursor.execute("""SELECT data FROM nodes JOIN items ON (nodes.id=items.node_id) WHERE node=%s AND item=%s""", (self.id, item_id)) result = cursor.fetchone() if result: items.append(unicode(result[0], 'utf-8')) return items def purge(self): return self._dbpool.runInteraction(self._purge) def _purge(self, cursor): self._check_node_exists(cursor) cursor.execute("""DELETE FROM items WHERE node_id=(SELECT id FROM nodes WHERE node=%s)""", (self.id,)) class LeafNode(Node, LeafNodeMixin): implements(iidavoll.ILeafNode)