changeset 51:40ac06941edc

Added node existance checks. Added Storage support for adding and removing subscriptions. Added SubscriptionService as subclass of backend.SubscriptionService.
author Ralph Meijer <ralphm@ik.nu>
date Wed, 03 Nov 2004 17:27:23 +0000
parents 64f0986d8b35
children 0947b46c0968
files idavoll/pgsql_backend.py
diffstat 1 files changed, 78 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/idavoll/pgsql_backend.py	Wed Nov 03 16:19:17 2004 +0000
+++ b/idavoll/pgsql_backend.py	Wed Nov 03 17:27:23 2004 +0000
@@ -9,6 +9,14 @@
         self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
                 database=database)
 
+    def _check_node_exists(self, cursor, node_id):
+        cursor.execute("""SELECT id FROM nodes WHERE node=%s""",
+                       (node_id.encode('utf8')))
+        if not cursor.fetchone():
+            raise backend.NodeNotFound
+        else:
+            return
+
     def _get_node_configuration(self, cursor, node_id):
         configuration = {}
         cursor.execute("""SELECT persistent, deliver_payload FROM nodes
@@ -25,6 +33,7 @@
         return self.dbpool.runInteraction(self._get_node_configuration, node_id)
 
     def _get_affiliation(self, cursor, node_id, entity):
+        self._check_node_exists(cursor, node_id)
         cursor.execute("""SELECT affiliation FROM affiliations
                           JOIN nodes ON (node_id=nodes.id)
                           JOIN entities ON (entity_id=entities.id)
@@ -41,6 +50,7 @@
                                           entity)
 
     def get_subscribers(self, node_id):
+        self._check_node_exists(cursor, node_id)
         d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
                                        JOIN nodes ON (node_id=nodes.id)
                                        JOIN entities ON (entity_id=entities.id)
@@ -58,6 +68,7 @@
                                           publisher)
 
     def _store_items(self, cursor, node_id, items, publisher):
+        self._check_node_exists(cursor, node_id)
         for item in items:
             self._store_item(cursor, node_id, item, publisher)
 
@@ -81,6 +92,70 @@
                         data.encode('utf8'),
                         node_id.encode('utf8')))
 
+    def add_subscription(self, node_id, subscriber, state):
+        return self.dbpool.runInteraction(self._add_subscription, node_id,
+                                          subscriber, state)
+
+    def _add_subscription(self, cursor, node_id, subscriber, state):
+        self._check_node_exists(cursor, node_id)
+        subscriber = jid.JID(subscriber)
+        userhost = subscriber.userhost()
+        resource = subscriber.resource or ''
+
+        try:
+            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                           (userhost.encode('utf8')))
+        except:
+            pass
+
+        try:
+            cursor.execute("""INSERT INTO subscriptions
+                              (node_id, entity_id, resource, subscription)
+                              SELECT n.id, e.id, %s, %s FROM
+                              (SELECT id FROM nodes WHERE node=%s) AS n
+                              CROSS JOIN
+                              (SELECT id FROM entities WHERE jid=%s) AS e""",
+                           (resource.encode('utf8'),
+                            state.encode('utf8'),
+                            node_id.encode('utf8'),
+                            userhost.encode('utf8')))
+        except:
+            cursor.execute("""SELECT subscription FROM subscriptions
+                              JOIN nodes ON (nodes.id=subscriptions.node_id)
+                              JOIN entities ON
+                                   (entities.id=subscriptions.entity_id)
+                              WHERE node=%s AND jid=%s AND resource=%s""",
+                           (node_id.encode('utf8'),
+                            userhost.encode('utf8'),
+                            resource.encode('utf8')))
+            state = cursor.fetchone()[0]
+
+        return {'node': node_id,
+                'jid': subscriber.full(),
+                'subscription': state}
+
+    def remove_subscription(self, node_id, subscriber):
+        return self.dbpool.runInteraction(self._remove_subscription, node_id,
+                                          subscriber)
+
+    def _remove_subscription(self, cursor, node_id, subscriber):
+        self._check_node_exists(cursor, node_id)
+        subscriber = jid.JID(subscriber)
+        userhost = subscriber.userhost()
+        resource = subscriber.resource or ''
+
+        cursor.execute("""DELETE FROM subscriptions WHERE
+                          node_id=(SELECT id FROM nodes WHERE node=%s) AND
+                          entity_id=(SELECT id FROM entities WHERE jid=%s)
+                          AND resource=%s""",
+                       (node_id.encode('utf8'),
+                        userhost.encode('utf8'),
+                        resource.encode('utf8')))
+        if cursor.rowcount != 1:
+            raise backend.NotSubscribed
+
+        return None
+
 class BackendService(backend.BackendService):
     """ PostgreSQL backend Service for a JEP-0060 pubsub service """
 
@@ -89,3 +164,6 @@
 
 class NotificationService(backend.NotificationService):
     pass
+
+class SubscriptionService(backend.SubscriptionService):
+    pass