# HG changeset patch # User Ralph Meijer # Date 1099502843 0 # Node ID 40ac06941edc6aa939db029dca82585e847c93bf # Parent 64f0986d8b35b652a4a142777f13e16a1e54ba52 Added node existance checks. Added Storage support for adding and removing subscriptions. Added SubscriptionService as subclass of backend.SubscriptionService. diff -r 64f0986d8b35 -r 40ac06941edc idavoll/pgsql_backend.py --- a/idavoll/pgsql_backend.py Wed Nov 03 16:19:17 2004 +0000 +++ b/idavoll/pgsql_backend.py Wed Nov 03 17:27:23 2004 +0000 @@ -9,6 +9,14 @@ self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, database=database) + def _check_node_exists(self, cursor, node_id): + cursor.execute("""SELECT id FROM nodes WHERE node=%s""", + (node_id.encode('utf8'))) + if not cursor.fetchone(): + raise backend.NodeNotFound + else: + return + def _get_node_configuration(self, cursor, node_id): configuration = {} cursor.execute("""SELECT persistent, deliver_payload FROM nodes @@ -25,6 +33,7 @@ return self.dbpool.runInteraction(self._get_node_configuration, node_id) def _get_affiliation(self, cursor, node_id, entity): + self._check_node_exists(cursor, node_id) cursor.execute("""SELECT affiliation FROM affiliations JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) @@ -41,6 +50,7 @@ entity) def get_subscribers(self, node_id): + self._check_node_exists(cursor, node_id) d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) @@ -58,6 +68,7 @@ publisher) def _store_items(self, cursor, node_id, items, publisher): + self._check_node_exists(cursor, node_id) for item in items: self._store_item(cursor, node_id, item, publisher) @@ -81,6 +92,70 @@ data.encode('utf8'), node_id.encode('utf8'))) + def add_subscription(self, node_id, subscriber, state): + return self.dbpool.runInteraction(self._add_subscription, node_id, + subscriber, state) + + def _add_subscription(self, cursor, node_id, subscriber, state): + self._check_node_exists(cursor, node_id) + subscriber = jid.JID(subscriber) + userhost = subscriber.userhost() + resource = subscriber.resource or '' + + try: + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (userhost.encode('utf8'))) + except: + pass + + try: + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, subscription) + SELECT n.id, e.id, %s, %s FROM + (SELECT id FROM nodes WHERE node=%s) AS n + CROSS JOIN + (SELECT id FROM entities WHERE jid=%s) AS e""", + (resource.encode('utf8'), + state.encode('utf8'), + node_id.encode('utf8'), + userhost.encode('utf8'))) + except: + cursor.execute("""SELECT subscription FROM subscriptions + JOIN nodes ON (nodes.id=subscriptions.node_id) + JOIN entities ON + (entities.id=subscriptions.entity_id) + WHERE node=%s AND jid=%s AND resource=%s""", + (node_id.encode('utf8'), + userhost.encode('utf8'), + resource.encode('utf8'))) + state = cursor.fetchone()[0] + + return {'node': node_id, + 'jid': subscriber.full(), + 'subscription': state} + + def remove_subscription(self, node_id, subscriber): + return self.dbpool.runInteraction(self._remove_subscription, node_id, + subscriber) + + def _remove_subscription(self, cursor, node_id, subscriber): + self._check_node_exists(cursor, node_id) + subscriber = jid.JID(subscriber) + userhost = subscriber.userhost() + resource = subscriber.resource or '' + + cursor.execute("""DELETE FROM subscriptions WHERE + node_id=(SELECT id FROM nodes WHERE node=%s) AND + entity_id=(SELECT id FROM entities WHERE jid=%s) + AND resource=%s""", + (node_id.encode('utf8'), + userhost.encode('utf8'), + resource.encode('utf8'))) + if cursor.rowcount != 1: + raise backend.NotSubscribed + + return None + class BackendService(backend.BackendService): """ PostgreSQL backend Service for a JEP-0060 pubsub service """ @@ -89,3 +164,6 @@ class NotificationService(backend.NotificationService): pass + +class SubscriptionService(backend.SubscriptionService): + pass