Mercurial > libervia-pubsub
diff idavoll/pgsql_storage.py @ 198:e404775b12df
Change naming and spacing conventions to match Twisted's.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 10 Jun 2008 11:31:49 +0000 |
parents | 0d4474051eeb |
children | 77c61e2b8c75 |
line wrap: on
line diff
--- a/idavoll/pgsql_storage.py Thu Jun 18 11:54:56 2009 +0000 +++ b/idavoll/pgsql_storage.py Tue Jun 10 11:31:49 2008 +0000 @@ -23,16 +23,18 @@ client_encoding='utf-8' ) - def get_node(self, node_id): - return self._dbpool.runInteraction(self._get_node, node_id) - def _get_node(self, cursor, node_id): + def getNode(self, nodeIdentifier): + return self._dbpool.runInteraction(self._getNode, nodeIdentifier) + + + def _getNode(self, cursor, nodeIdentifier): configuration = {} cursor.execute("""SELECT persistent, deliver_payload, send_last_published_item FROM nodes WHERE node=%s""", - (node_id,)) + (nodeIdentifier,)) try: (configuration["pubsub#persist_items"], configuration["pubsub#deliver_payloads"], @@ -41,24 +43,27 @@ except TypeError: raise error.NodeNotFound() else: - node = LeafNode(node_id, configuration) + node = LeafNode(nodeIdentifier, configuration) node._dbpool = self._dbpool return node - def get_node_ids(self): + + def getNodeIds(self): d = self._dbpool.runQuery("""SELECT node from nodes""") d.addCallback(lambda results: [r[0] for r in results]) return d - def create_node(self, node_id, owner, config=None): - return self._dbpool.runInteraction(self._create_node, node_id, owner) - def _create_node(self, cursor, node_id, owner): - node_id = node_id + def createNode(self, nodeIdentifier, owner, config=None): + return self._dbpool.runInteraction(self._createNode, nodeIdentifier, + owner) + + + def _createNode(self, cursor, nodeIdentifier, owner): owner = owner.userhost() try: cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", - (node_id)) + (nodeIdentifier)) except cursor._pool.dbapi.OperationalError: raise error.NodeExists() @@ -75,19 +80,22 @@ (SELECT id FROM nodes WHERE node=%s) AS n CROSS JOIN (SELECT id FROM entities WHERE jid=%s) AS e""", - (node_id, owner)) + (nodeIdentifier, owner)) + - def delete_node(self, node_id): - return self._dbpool.runInteraction(self._delete_node, node_id) + def deleteNode(self, nodeIdentifier): + return self._dbpool.runInteraction(self._deleteNode, nodeIdentifier) + - def _delete_node(self, cursor, node_id): + def _deleteNode(self, cursor, nodeIdentifier): cursor.execute("""DELETE FROM nodes WHERE node=%s""", - (node_id,)) + (nodeIdentifier,)) if cursor.rowcount != 1: raise error.NodeNotFound() - def get_affiliations(self, entity): + + def getAffiliations(self, entity): d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities JOIN affiliations ON (affiliations.entity_id=entities.id) @@ -98,7 +106,8 @@ d.addCallback(lambda results: [tuple(r) for r in results]) return d - def get_subscriptions(self, entity): + + def getSubscriptions(self, entity): d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription FROM entities JOIN subscriptions ON (subscriptions.entity_id=entities.id) @@ -106,75 +115,86 @@ (nodes.id=subscriptions.node_id) WHERE jid=%s""", (entity.userhost(),)) - d.addCallback(self._convert_subscription_jids) + d.addCallback(self._convertSubscriptionJIDs) return d - def _convert_subscription_jids(self, subscriptions): + + def _convertSubscriptionJIDs(self, subscriptions): return [(node, jid.internJID('%s/%s' % (subscriber, resource)), subscription) for node, subscriber, resource, subscription in subscriptions] + class Node: implements(iidavoll.INode) - def __init__(self, node_id, config): - self.id = node_id + def __init__(self, nodeIdentifier, config): + self.nodeIdentifier = nodeIdentifier self._config = config - def _check_node_exists(self, cursor): + + def _checkNodeExists(self, cursor): cursor.execute("""SELECT id FROM nodes WHERE node=%s""", - (self.id)) + (self.nodeIdentifier)) if not cursor.fetchone(): raise error.NodeNotFound() - def get_type(self): - return self.type - def get_configuration(self): + def getType(self): + return self.nodeType + + + def getConfiguration(self): return self._config - def set_configuration(self, options): + + def setConfiguration(self, options): config = copy.copy(self._config) for option in options: if option in config: config[option] = options[option] - d = self._dbpool.runInteraction(self._set_configuration, config) - d.addCallback(self._set_cached_configuration, config) + d = self._dbpool.runInteraction(self._setConfiguration, config) + d.addCallback(self._setCachedConfiguration, config) return d - def _set_configuration(self, cursor, config): - self._check_node_exists(cursor) + + def _setConfiguration(self, cursor, config): + self._checkNodeExists(cursor) cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s, send_last_published_item=%s WHERE node=%s""", (config["pubsub#persist_items"], config["pubsub#deliver_payloads"], config["pubsub#send_last_published_item"], - self.id)) + self.nodeIdentifier)) - def _set_cached_configuration(self, void, config): + + def _setCachedConfiguration(self, void, config): self._config = config - def get_meta_data(self): + + def getMetaData(self): config = copy.copy(self._config) - config["pubsub#node_type"] = self.type + config["pubsub#node_type"] = self.nodeType return config - def get_affiliation(self, entity): - return self._dbpool.runInteraction(self._get_affiliation, entity) + + def getAffiliation(self, entity): + return self._dbpool.runInteraction(self._getAffiliation, entity) - def _get_affiliation(self, cursor, entity): - self._check_node_exists(cursor) + + def _getAffiliation(self, cursor, entity): + self._checkNodeExists(cursor) cursor.execute("""SELECT affiliation FROM affiliations JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND jid=%s""", - (self.id, + (self.nodeIdentifier, entity.userhost())) try: @@ -182,11 +202,13 @@ except TypeError: return None - def get_subscription(self, subscriber): - return self._dbpool.runInteraction(self._get_subscription, subscriber) + + def getSubscription(self, subscriber): + return self._dbpool.runInteraction(self._getSubscription, subscriber) - def _get_subscription(self, cursor, subscriber): - self._check_node_exists(cursor) + + def _getSubscription(self, cursor, subscriber): + self._checkNodeExists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' @@ -196,7 +218,7 @@ JOIN entities ON (entities.id=subscriptions.entity_id) WHERE node=%s AND jid=%s AND resource=%s""", - (self.id, + (self.nodeIdentifier, userhost, resource)) try: @@ -204,12 +226,14 @@ except TypeError: return None - def add_subscription(self, subscriber, state): - return self._dbpool.runInteraction(self._add_subscription, subscriber, + + def addSubscription(self, subscriber, state): + return self._dbpool.runInteraction(self._addSubscription, subscriber, state) - def _add_subscription(self, cursor, subscriber, state): - self._check_node_exists(cursor) + + def _addSubscription(self, cursor, subscriber, state): + self._checkNodeExists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' @@ -229,17 +253,19 @@ (SELECT id FROM entities WHERE jid=%s) AS e""", (resource, state, - self.id, + self.nodeIdentifier, userhost)) except cursor._pool.dbapi.OperationalError: raise error.SubscriptionExists() - def remove_subscription(self, subscriber): - return self._dbpool.runInteraction(self._remove_subscription, + + def removeSubscription(self, subscriber): + return self._dbpool.runInteraction(self._removeSubscription, subscriber) - def _remove_subscription(self, cursor, subscriber): - self._check_node_exists(cursor) + + def _removeSubscription(self, cursor, subscriber): + self._checkNodeExists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' @@ -248,7 +274,7 @@ node_id=(SELECT id FROM nodes WHERE node=%s) AND entity_id=(SELECT id FROM entities WHERE jid=%s) AND resource=%s""", - (self.id, + (self.nodeIdentifier, userhost, resource)) if cursor.rowcount != 1: @@ -256,29 +282,34 @@ return None - def get_subscribers(self): - d = self._dbpool.runInteraction(self._get_subscribers) - d.addCallback(self._convert_to_jids) + + def getSubscribers(self): + d = self._dbpool.runInteraction(self._getSubscribers) + d.addCallback(self._convertToJIDs) return d - def _get_subscribers(self, cursor): - self._check_node_exists(cursor) + + def _getSubscribers(self, cursor): + self._checkNodeExists(cursor) cursor.execute("""SELECT jid, resource FROM subscriptions JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND subscription='subscribed'""", - (self.id,)) + (self.nodeIdentifier,)) return cursor.fetchall() - def _convert_to_jids(self, list): + + def _convertToJIDs(self, list): return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list] - def is_subscribed(self, entity): - return self._dbpool.runInteraction(self._is_subscribed, entity) + + def isSubscribed(self, entity): + return self._dbpool.runInteraction(self._isSubscribed, entity) - def _is_subscribed(self, cursor, entity): - self._check_node_exists(cursor) + + def _isSubscribed(self, cursor, entity): + self._checkNodeExists(cursor) cursor.execute("""SELECT 1 FROM entities JOIN subscriptions ON @@ -288,15 +319,17 @@ WHERE entities.jid=%s AND node=%s AND subscription='subscribed'""", (entity.userhost(), - self.id)) + self.nodeIdentifier)) return cursor.fetchone() is not None - def get_affiliations(self): - return self._dbpool.runInteraction(self._get_affiliations) + + def getAffiliations(self): + return self._dbpool.runInteraction(self._getAffiliations) - def _get_affiliations(self, cursor): - self._check_node_exists(cursor) + + def _getAffiliations(self, cursor): + self._checkNodeExists(cursor) cursor.execute("""SELECT jid, affiliation FROM nodes JOIN affiliations ON @@ -304,24 +337,28 @@ JOIN entities ON (affiliations.entity_id = entities.id) WHERE node=%s""", - self.id) + self.nodeIdentifier) result = cursor.fetchall() return [(jid.internJID(r[0]), r[1]) for r in result] + + class LeafNodeMixin: - type = 'leaf' + nodeType = 'leaf' - def store_items(self, items, publisher): - return self._dbpool.runInteraction(self._store_items, items, publisher) + def storeItems(self, items, publisher): + return self._dbpool.runInteraction(self._storeItems, items, publisher) + - def _store_items(self, cursor, items, publisher): - self._check_node_exists(cursor) + def _storeItems(self, cursor, items, publisher): + self._checkNodeExists(cursor) for item in items: - self._store_item(cursor, item, publisher) + self._storeItem(cursor, item, publisher) - def _store_item(self, cursor, item, publisher): + + def _storeItem(self, cursor, item, publisher): data = item.toXml() cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s FROM nodes @@ -329,7 +366,7 @@ nodes.node = %s and items.item=%s""", (publisher.full(), data, - self.id, + self.nodeIdentifier, item["id"])) if cursor.rowcount == 1: return @@ -339,72 +376,82 @@ (item["id"], publisher.full(), data, - self.id)) + self.nodeIdentifier)) + - def remove_items(self, item_ids): - return self._dbpool.runInteraction(self._remove_items, item_ids) + def removeItems(self, itemIdentifiers): + return self._dbpool.runInteraction(self._removeItems, itemIdentifiers) - def _remove_items(self, cursor, item_ids): - self._check_node_exists(cursor) + + def _removeItems(self, cursor, itemIdentifiers): + self._checkNodeExists(cursor) deleted = [] - for item_id in item_ids: + for itemIdentifier in itemIdentifiers: cursor.execute("""DELETE FROM items WHERE node_id=(SELECT id FROM nodes WHERE node=%s) AND item=%s""", - (self.id, - item_id)) + (self.nodeIdentifier, + itemIdentifier)) if cursor.rowcount: - deleted.append(item_id) + deleted.append(itemIdentifier) return deleted - def get_items(self, max_items=None): - return self._dbpool.runInteraction(self._get_items, max_items) + + def getItems(self, maxItems=None): + return self._dbpool.runInteraction(self._getItems, maxItems) - def _get_items(self, cursor, max_items): - self._check_node_exists(cursor) + + def _getItems(self, cursor, maxItems): + self._checkNodeExists(cursor) query = """SELECT data FROM nodes JOIN items ON (nodes.id=items.node_id) WHERE node=%s ORDER BY date DESC""" - if max_items: + if maxItems: cursor.execute(query + " LIMIT %s", - (self.id, - max_items)) + (self.nodeIdentifier, + maxItems)) else: - cursor.execute(query, (self.id)) + cursor.execute(query, (self.nodeIdentifier)) result = cursor.fetchall() return [unicode(r[0], 'utf-8') for r in result] - def get_items_by_id(self, item_ids): - return self._dbpool.runInteraction(self._get_items_by_id, item_ids) + + def getItemsById(self, itemIdentifiers): + return self._dbpool.runInteraction(self._getItemsById, itemIdentifiers) + - def _get_items_by_id(self, cursor, item_ids): - self._check_node_exists(cursor) + def _getItemsById(self, cursor, itemIdentifiers): + self._checkNodeExists(cursor) items = [] - for item_id in item_ids: + for itemIdentifier in itemIdentifiers: cursor.execute("""SELECT data FROM nodes JOIN items ON (nodes.id=items.node_id) WHERE node=%s AND item=%s""", - (self.id, - item_id)) + (self.nodeIdentifier, + itemIdentifier)) result = cursor.fetchone() if result: items.append(unicode(result[0], 'utf-8')) return items + def purge(self): return self._dbpool.runInteraction(self._purge) + def _purge(self, cursor): - self._check_node_exists(cursor) + self._checkNodeExists(cursor) cursor.execute("""DELETE FROM items WHERE node_id=(SELECT id FROM nodes WHERE node=%s)""", - (self.id,)) + (self.nodeIdentifier,)) + + class LeafNode(Node, LeafNodeMixin):