diff idavoll/pgsql_backend.py @ 41:ea3d3544a52e

Rewrite using separated backend interfaces. The backend also uses a separate class for the actual storage.
author Ralph Meijer <ralphm@ik.nu>
date Sun, 31 Oct 2004 21:11:03 +0000
parents 39d0c6fa027f
children 9685b7e291ef
line wrap: on
line diff
--- a/idavoll/pgsql_backend.py	Sun Oct 31 21:08:40 2004 +0000
+++ b/idavoll/pgsql_backend.py	Sun Oct 31 21:11:03 2004 +0000
@@ -4,40 +4,78 @@
 from twisted.enterprise import adbapi
 import backend
 
-class Service(service.Service):
-    """ PostgreSQL backend Service for a JEP-0060 pubsub service """
-
-    __implements__ = backend.IService
+class Storage:
+    def __init__(self, user, database):
+        self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
+                database=database)
 
-    def __init__(self):
-        self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user='ralphm',
-                database='pubsub_test')
-
-    def _do_publish(self, cursor, node_id, publisher, items):
+    def _get_node_configuration(self, cursor, node_id):
+        configuration = {}
         cursor.execute("""SELECT persistent, deliver_payload FROM nodes
                           WHERE node=%s""",
                        (node_id,))
         try:
-            persist_items, deliver_payloads = cursor.fetchone()
+            (configuration["persist_items"],
+             configuration["deliver_payloads"]) = cursor.fetchone()
+            return configuration
         except TypeError:
             raise backend.NodeNotFound
 
-        cursor.execute("""SELECT 1 FROM affiliations
+    def get_node_configuration(self, node_id):
+        return self.dbpool.runInteraction(self._get_node_configuration, node_id)
+
+    def _get_affiliation(self, cursor, node_id, entity):
+        cursor.execute("""SELECT affiliation FROM affiliations
                           JOIN nodes ON (node_id=nodes.id)
                           JOIN entities ON (entity_id=entities.id)
-                          WHERE node=%s AND jid=%s AND
-                          affiliation IN ('owner', 'publisher')""",
-                       (node_id.encode('utf8'), publisher.encode('utf8')))
+                          WHERE node=%s AND jid=%s""",
+                       (node_id.encode('utf8'), entity.encode('utf8')))
+
+        try:
+            return cursor.fetchone()[0]
+        except TypeError:
+            return None
+
+    def get_affiliation(self, node_id, entity):
+        return self.dbpool.runInteraction(self._get_affiliation, node_id,
+                                          entity)
 
-        if not cursor.fetchone():
+    def get_subscribers(self, node_id):
+        d =  self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
+                                       JOIN nodes ON (node_id=nodes.id)
+                                       JOIN entities ON (entity_id=entities.id)
+                                       WHERE node=%s AND
+                                             subscription='subscribed'""",
+                                    (node_id.encode('utf8'),))
+        d.addCallback(self._convert_to_jids)
+        return d
+
+    def _convert_to_jids(self, list):
+        return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list]
+
+class BackendService(backend.BackendService):
+    """ PostgreSQL backend Service for a JEP-0060 pubsub service """
+
+    def __init__(self, storage):
+        backend.BackendService.__init__(self)
+        self.storage = storage
+
+    def do_publish(self, result, node_id, items, requestor):
+        print result
+        configuration = result[0][1]
+        persist_items = configuration["persist_items"]
+        deliver_payloads = configuration["deliver_payloads"]
+        affiliation = result[1][1]
+
+        if affiliation not in ['owner', 'publisher']:
             raise backend.NotAuthorized
-        
+
         if items and not persist_items and not deliver_payloads:
             raise backend.NoPayloadAllowed
         elif not items and (persist_items or deliver_payloads):
             raise backend.PayloadExpected
 
-        print "publish by %s to %s" % (publisher, node_id)
+        print "publish by %s to %s" % (requestor.full(), node_id)
 
         if persist_items or deliver_payloads:
             for item in items:
@@ -45,39 +83,60 @@
                     item["id"] = 'random'   # FIXME
 
         if persist_items:
-            self.storeItems(node_id, publisher, items)
+            d = self.store_items(node_id, items, requestor.full())
+        else:
+            d = defer.succeed(None)
 
+        d.addCallback(self.do_notify, node_id, items, deliver_payloads)
+
+    def do_notify(self, result, node_id, items, deliver_payloads):
         if items and not deliver_payloads:
             for item in items:
                 item.children = []
 
-        recipients = self.get_subscribers(node_id)
-        recipients.addCallback(self.magic_filter, node_id, items)
-        recipients.addCallback(self.pubsub_service.do_notification, node_id)
+        self.dispatch({ 'items': items, 'node_id': node_id },
+                      '//event/pubsub/notify')
 
-    def do_publish(self, node_id, publisher, items):
-        d = self.dbpool.runInteraction(self._do_publish, node_id, publisher, items)
+    def publish(self, node_id, items, requestor):
+        d1 = self.storage.get_node_configuration(node_id)
+        d2 = self.storage.get_affiliation(node_id, requestor.full())
+        d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
+        d.addErrback(lambda x: x.value[0])
+        d.addCallback(self.do_publish, node_id, items, requestor)
         return d
 
-    def magic_filter(self, subscribers, node_id, items):
+    def get_notification_list(self, node_id, items):
+        d = self.storage.get_subscribers(node_id)
+        d.addCallback(self._magic_filter, node_id, items)
+        return d
+
+    def _magic_filter(self, subscribers, node_id, items):
         list = {}
         for subscriber in subscribers:
             list[subscriber] = items
 
         return list
 
-    def get_subscribers(self, node_id):
-        d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
-                                    JOIN nodes ON (node_id=nodes.id)
-                                    JOIN entities ON (entity_id=entities.id)
-                                    WHERE node=%s AND
-                                          subscription='subscribed'""",
-                                 (node_id.encode('utf8'),))
-        d.addCallback(self.convert_to_jids)
-        return d
+    def store_items(self, node_id, items, publisher):
+        return defer.succeed(None)
+
+class PublishService(service.Service):
+
+    __implements__ = backend.IPublishService,
+    
+    def publish(self, node_id, items, requestor):
+        return self.parent.publish(node_id, items, requestor)
+
+class NotificationService(backend.NotificationService):
 
-    def convert_to_jids(self, list):
-        return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list]
+    __implements__ = backend.INotificationService,
+
+    def get_notification_list(self, node_id, items):
+        return self.parent.get_notification_list(node_id, items)
 
-    def storeItems(self, node_id, publisher, items):
-        pass
+class PersistenceService(service.Service):
+
+    __implements__ = backend.IPersistenceService,
+
+    def store_items(self, node_id, items, publisher):
+        return self.parent.store_items(node_id, items, publisher)