Mercurial > libervia-pubsub
view idavoll/pgsql_storage.py @ 162:84cfe9fe38c5
Comply with the access model 'open'.
Currently, the only implemented access model is 'open', so we should not check
for the subscription of the requestor for item retrieval. We do reject
outcasts.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Wed, 06 Sep 2006 12:57:53 +0000 |
parents | 5191ba7c4df8 |
children | 1701c0e2c707 |
line wrap: on
line source
# Copyright (c) 2003-2006 Ralph Meijer
# See LICENSE for details.

"""PostgreSQL-backed storage for publish-subscribe nodes.

All database access goes through a C{twisted.enterprise.adbapi}
connection pool, so the public methods return the Deferred produced by
C{runQuery} or C{runInteraction}.
"""

import copy

import storage
from twisted.enterprise import adbapi
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from zope.interface import implements


class Storage:
    """Entry point to the node storage: look up, create and delete nodes."""

    implements(storage.IStorage)

    def __init__(self, user, database):
        """Create the connection pool used by this storage and its nodes.

        @param user: database user passed to the connection pool.
        @param database: database name passed to the connection pool.
        """
        self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL',
                                             user=user,
                                             database=database,
                                             client_encoding='utf-8')

    def get_node(self, node_id):
        """Fetch the node with the given identifier.

        @return: Deferred that fires with a L{LeafNode}, or fails with
                 C{storage.NodeNotFound} if there is no such node.
        """
        return self._dbpool.runInteraction(self._get_node, node_id)

    def _get_node(self, cursor, node_id):
        """Interaction body of L{get_node}."""
        configuration = {}
        cursor.execute("SELECT persistent, deliver_payload FROM nodes "
                       "WHERE node=%s",
                       (node_id,))
        try:
            # fetchone() returns None when no row matched; unpacking None
            # raises TypeError, which is how a missing node is detected.
            (configuration["pubsub#persist_items"],
             configuration["pubsub#deliver_payloads"]) = cursor.fetchone()
        except TypeError:
            raise storage.NodeNotFound
        else:
            node = LeafNode(node_id, configuration)
            # Nodes share the pool of the storage that created them.
            node._dbpool = self._dbpool
            return node

    def get_node_ids(self):
        """Return a Deferred firing with the list of all node identifiers."""
        d = self._dbpool.runQuery("SELECT node from nodes")
        d.addCallback(lambda results: [r[0] for r in results])
        return d

    def create_node(self, node_id, owner, type='leaf'):
        """Create a node and make C{owner} its owner.

        @param node_id: identifier of the new node.
        @param owner: JID of the owner; only its bare (userhost) form is
                      stored.
        @param type: accepted for interface compatibility; it is not
                     passed on to the database interaction.
        @return: Deferred that fails with C{storage.NodeExists} if the
                 insert is rejected by the database.
        """
        return self._dbpool.runInteraction(self._create_node, node_id, owner)

    def _create_node(self, cursor, node_id, owner):
        """Interaction body of L{create_node}."""
        owner = owner.userhost()
        try:
            cursor.execute("INSERT INTO nodes (node) VALUES (%s)",
                           (node_id,))
        except cursor._pool.dbapi.OperationalError:
            raise storage.NodeExists

        # Make sure there is an entity row for the owner before linking it.
        cursor.execute("SELECT 1 from entities where jid=%s",
                       (owner,))
        if not cursor.fetchone():
            cursor.execute("INSERT INTO entities (jid) VALUES (%s)",
                           (owner,))

        cursor.execute("INSERT INTO affiliations "
                       "(node_id, entity_id, affiliation) "
                       "SELECT n.id, e.id, 'owner' FROM "
                       "(SELECT id FROM nodes WHERE node=%s) AS n "
                       "CROSS JOIN "
                       "(SELECT id FROM entities WHERE jid=%s) AS e",
                       (node_id, owner))

    def delete_node(self, node_id):
        """Delete a node.

        @return: Deferred that fails with C{storage.NodeNotFound} unless
                 exactly one row was deleted.
        """
        return self._dbpool.runInteraction(self._delete_node, node_id)

    def _delete_node(self, cursor, node_id):
        """Interaction body of L{delete_node}."""
        cursor.execute("DELETE FROM nodes WHERE node=%s",
                       (node_id,))

        if cursor.rowcount != 1:
            raise storage.NodeNotFound

    def get_affiliations(self, entity):
        """Return a Deferred firing with C{(node, affiliation)} tuples.

        Only the bare (userhost) form of C{entity} is used for the lookup.
        """
        d = self._dbpool.runQuery("SELECT node, affiliation FROM entities "
                                  "JOIN affiliations ON "
                                  "(affiliations.entity_id=entities.id) "
                                  "JOIN nodes ON "
                                  "(nodes.id=affiliations.node_id) "
                                  "WHERE jid=%s",
                                  (entity.userhost(),))
        d.addCallback(lambda results: [tuple(r) for r in results])
        return d

    def get_subscriptions(self, entity):
        """Return a Deferred firing with the subscriptions of C{entity}.

        Each result is a C{(node, subscriber JID, subscription)} tuple.
        Only the bare (userhost) form of C{entity} is used for the lookup.
        """
        d = self._dbpool.runQuery("SELECT node, jid, resource, subscription "
                                  "FROM entities JOIN subscriptions ON "
                                  "(subscriptions.entity_id=entities.id) "
                                  "JOIN nodes ON "
                                  "(nodes.id=subscriptions.node_id) "
                                  "WHERE jid=%s",
                                  (entity.userhost(),))
        d.addCallback(self._convert_subscription_jids)
        return d

    def _convert_subscription_jids(self, subscriptions):
        """Combine the jid and resource columns of each row into a JID."""
        return [(node,
                 jid.internJID('%s/%s' % (subscriber, resource)),
                 subscription)
                for node, subscriber, resource, subscription
                in subscriptions]


class Node:
    """Operations common to all node types.

    Instances read C{self._dbpool} and C{self.type}, neither of which is
    set in C{__init__}: L{Storage._get_node} assigns the pool after
    construction, and L{LeafNodeMixin} provides C{type}.
    """

    implements(storage.INode)

    def __init__(self, node_id, config):
        """
        @param node_id: identifier of the node.
        @param config: dictionary of configuration options, cached on the
                       instance.
        """
        self.id = node_id
        self._config = config

    def _check_node_exists(self, cursor):
        """Raise C{storage.NodeNotFound} if the node has no database row."""
        cursor.execute("SELECT id FROM nodes WHERE node=%s",
                       (self.id,))
        if not cursor.fetchone():
            raise storage.NodeNotFound

    def get_type(self):
        """Return the node type."""
        return self.type

    def get_configuration(self):
        """Return the cached configuration dictionary."""
        return self._config

    def set_configuration(self, options):
        """Update the configuration with the known options in C{options}.

        Options that are not already present in the cached configuration
        are ignored. The cache is only replaced after the database update
        has succeeded.

        @return: Deferred that fires when the update is done.
        """
        config = copy.copy(self._config)

        for option in options:
            if option in config:
                config[option] = options[option]

        d = self._dbpool.runInteraction(self._set_configuration, config)
        d.addCallback(self._set_cached_configuration, config)
        return d

    def _set_configuration(self, cursor, config):
        """Interaction body of L{set_configuration}."""
        self._check_node_exists(cursor)
        cursor.execute("UPDATE nodes SET persistent=%s, deliver_payload=%s "
                       "WHERE node=%s",
                       (config["pubsub#persist_items"],
                        config["pubsub#deliver_payloads"],
                        self.id))

    def _set_cached_configuration(self, void, config):
        """Callback: replace the cached configuration with C{config}."""
        self._config = config

    def get_meta_data(self):
        """Return a copy of the configuration with the node type added."""
        config = copy.copy(self._config)
        config["pubsub#node_type"] = self.type
        return config

    def get_affiliation(self, entity):
        """Return a Deferred firing with the affiliation of C{entity}.

        Fires with C{None} if the entity has no affiliation with this node.
        """
        return self._dbpool.runInteraction(self._get_affiliation, entity)

    def _get_affiliation(self, cursor, entity):
        """Interaction body of L{get_affiliation}."""
        self._check_node_exists(cursor)
        cursor.execute("SELECT affiliation FROM affiliations "
                       "JOIN nodes ON (node_id=nodes.id) "
                       "JOIN entities ON (entity_id=entities.id) "
                       "WHERE node=%s AND jid=%s",
                       (self.id, entity.userhost()))

        try:
            # Indexing the None returned for "no row" raises TypeError.
            return cursor.fetchone()[0]
        except TypeError:
            return None

    def get_subscription(self, subscriber):
        """Return a Deferred firing with the subscription state.

        Fires with C{None} if C{subscriber} has no subscription.
        """
        return self._dbpool.runInteraction(self._get_subscription,
                                           subscriber)

    def _get_subscription(self, cursor, subscriber):
        """Interaction body of L{get_subscription}."""
        self._check_node_exists(cursor)

        userhost = subscriber.userhost()
        # A subscriber without a resource is matched with an empty string.
        resource = subscriber.resource or ''

        cursor.execute("SELECT subscription FROM subscriptions "
                       "JOIN nodes ON (nodes.id=subscriptions.node_id) "
                       "JOIN entities ON "
                       "(entities.id=subscriptions.entity_id) "
                       "WHERE node=%s AND jid=%s AND resource=%s",
                       (self.id, userhost, resource))
        try:
            # Indexing the None returned for "no row" raises TypeError.
            return cursor.fetchone()[0]
        except TypeError:
            return None

    def add_subscription(self, subscriber, state):
        """Subscribe C{subscriber} to this node with the given state.

        @return: Deferred that fails with C{storage.SubscriptionExists} if
                 the insert is rejected by the database.
        """
        return self._dbpool.runInteraction(self._add_subscription,
                                           subscriber, state)

    def _add_subscription(self, cursor, subscriber, state):
        """Interaction body of L{add_subscription}."""
        self._check_node_exists(cursor)

        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        try:
            cursor.execute("INSERT INTO entities (jid) VALUES (%s)",
                           (userhost,))
        except cursor._pool.dbapi.OperationalError:
            # Best effort: presumably the entity row already exists.
            pass

        try:
            cursor.execute("INSERT INTO subscriptions "
                           "(node_id, entity_id, resource, subscription) "
                           "SELECT n.id, e.id, %s, %s FROM "
                           "(SELECT id FROM nodes WHERE node=%s) AS n "
                           "CROSS JOIN "
                           "(SELECT id FROM entities WHERE jid=%s) AS e",
                           (resource, state, self.id, userhost))
        except cursor._pool.dbapi.OperationalError:
            raise storage.SubscriptionExists

    def remove_subscription(self, subscriber):
        """Remove the subscription of C{subscriber}.

        @return: Deferred that fails with C{storage.SubscriptionNotFound}
                 unless exactly one row was deleted.
        """
        return self._dbpool.runInteraction(self._remove_subscription,
                                           subscriber)

    def _remove_subscription(self, cursor, subscriber):
        """Interaction body of L{remove_subscription}."""
        self._check_node_exists(cursor)

        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        cursor.execute("DELETE FROM subscriptions WHERE "
                       "node_id=(SELECT id FROM nodes WHERE node=%s) AND "
                       "entity_id=(SELECT id FROM entities WHERE jid=%s) "
                       "AND resource=%s",
                       (self.id, userhost, resource))
        if cursor.rowcount != 1:
            raise storage.SubscriptionNotFound

        return None

    def get_subscribers(self):
        """Return a Deferred firing with the JIDs in state 'subscribed'."""
        d = self._dbpool.runInteraction(self._get_subscribers)
        d.addCallback(self._convert_to_jids)
        return d

    def _get_subscribers(self, cursor):
        """Interaction body of L{get_subscribers}."""
        self._check_node_exists(cursor)
        cursor.execute("SELECT jid, resource FROM subscriptions "
                       "JOIN nodes ON (node_id=nodes.id) "
                       "JOIN entities ON (entity_id=entities.id) "
                       "WHERE node=%s AND subscription='subscribed'",
                       (self.id,))
        return cursor.fetchall()

    def _convert_to_jids(self, list):
        """Turn C{(jid, resource)} rows into JID objects."""
        return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list]

    def is_subscribed(self, entity):
        """Return a Deferred firing with whether C{entity} is subscribed.

        Only the bare (userhost) form of C{entity} is matched, and only
        subscriptions in state 'subscribed' count.
        """
        return self._dbpool.runInteraction(self._is_subscribed, entity)

    def _is_subscribed(self, cursor, entity):
        """Interaction body of L{is_subscribed}."""
        self._check_node_exists(cursor)

        cursor.execute("SELECT 1 FROM entities JOIN subscriptions ON "
                       "(entities.id=subscriptions.entity_id) "
                       "JOIN nodes ON (nodes.id=subscriptions.node_id) "
                       "WHERE entities.jid=%s AND node=%s "
                       "AND subscription='subscribed'",
                       (entity.userhost(), self.id))

        return cursor.fetchone() is not None

    def get_affiliations(self):
        """Return a Deferred firing with C{(JID, affiliation)} tuples."""
        return self._dbpool.runInteraction(self._get_affiliations)

    def _get_affiliations(self, cursor):
        """Interaction body of L{get_affiliations}."""
        self._check_node_exists(cursor)

        cursor.execute("SELECT jid, affiliation FROM nodes "
                       "JOIN affiliations ON "
                       "(nodes.id = affiliations.node_id) "
                       "JOIN entities ON "
                       "(affiliations.entity_id = entities.id) "
                       "WHERE node=%s",
                       (self.id,))
        result = cursor.fetchall()

        return [(jid.internJID(r[0]), r[1]) for r in result]


class LeafNodeMixin:
    """Item storage operations for leaf nodes."""

    type = 'leaf'

    def store_items(self, items, publisher):
        """Store or update C{items} as published by C{publisher}.

        @return: Deferred that fires when all items have been stored.
        """
        return self._dbpool.runInteraction(self._store_items, items,
                                           publisher)

    def _store_items(self, cursor, items, publisher):
        """Interaction body of L{store_items}."""
        self._check_node_exists(cursor)
        for item in items:
            self._store_item(cursor, item, publisher)

    def _store_item(self, cursor, item, publisher):
        """Update the item row if it exists, otherwise insert a new one.

        C{item} must provide C{toXml()} and an C{"id"} key.
        """
        data = item.toXml()
        cursor.execute("UPDATE items SET date=now(), publisher=%s, data=%s "
                       "FROM nodes WHERE nodes.id = items.node_id AND "
                       "nodes.node = %s and items.item=%s",
                       (publisher.full(), data, self.id, item["id"]))
        if cursor.rowcount == 1:
            # An existing item was overwritten; nothing left to insert.
            return

        cursor.execute("INSERT INTO items (node_id, item, publisher, data) "
                       "SELECT id, %s, %s, %s FROM nodes WHERE node=%s",
                       (item["id"], publisher.full(), data, self.id))

    def remove_items(self, item_ids):
        """Delete the items with the given identifiers.

        @return: Deferred firing with the list of identifiers that were
                 actually deleted.
        """
        return self._dbpool.runInteraction(self._remove_items, item_ids)

    def _remove_items(self, cursor, item_ids):
        """Interaction body of L{remove_items}."""
        self._check_node_exists(cursor)

        deleted = []

        for item_id in item_ids:
            cursor.execute("DELETE FROM items WHERE "
                           "node_id=(SELECT id FROM nodes WHERE node=%s) "
                           "AND item=%s",
                           (self.id, item_id))

            if cursor.rowcount:
                deleted.append(item_id)

        return deleted

    def get_items(self, max_items=None):
        """Return a Deferred firing with item payloads, newest first.

        @param max_items: if truthy, the maximum number of items returned.
        """
        return self._dbpool.runInteraction(self._get_items, max_items)

    def _get_items(self, cursor, max_items):
        """Interaction body of L{get_items}."""
        self._check_node_exists(cursor)
        query = ("SELECT data FROM nodes JOIN items ON "
                 "(nodes.id=items.node_id) "
                 "WHERE node=%s ORDER BY date DESC")
        if max_items:
            cursor.execute(query + " LIMIT %s",
                           (self.id, max_items))
        else:
            cursor.execute(query, (self.id,))

        result = cursor.fetchall()
        return [unicode(r[0], 'utf-8') for r in result]

    def get_items_by_id(self, item_ids):
        """Return a Deferred firing with the payloads of the given items.

        Identifiers without a matching item are skipped.
        """
        return self._dbpool.runInteraction(self._get_items_by_id, item_ids)

    def _get_items_by_id(self, cursor, item_ids):
        """Interaction body of L{get_items_by_id}."""
        self._check_node_exists(cursor)
        items = []
        for item_id in item_ids:
            cursor.execute("SELECT data FROM nodes JOIN items ON "
                           "(nodes.id=items.node_id) "
                           "WHERE node=%s AND item=%s",
                           (self.id, item_id))
            result = cursor.fetchone()
            if result:
                items.append(unicode(result[0], 'utf-8'))
        return items

    def purge(self):
        """Delete all items of this node.

        @return: Deferred that fires when the items have been deleted.
        """
        return self._dbpool.runInteraction(self._purge)

    def _purge(self, cursor):
        """Interaction body of L{purge}."""
        self._check_node_exists(cursor)

        cursor.execute("DELETE FROM items WHERE "
                       "node_id=(SELECT id FROM nodes WHERE node=%s)",
                       (self.id,))


class LeafNode(Node, LeafNodeMixin):
    """A leaf node: common node operations plus item storage."""

    implements(storage.ILeafNode)