changeset 43:9685b7e291ef

Moved common stuff out of pgsql_backend.py to backend.py. Implemented Storage class for memory backend. Implemented item storage for pgsql Storage.
author Ralph Meijer <ralphm@ik.nu>
date Mon, 01 Nov 2004 12:37:40 +0000
parents 7d088c61e131
children bc7438476a67
files idavoll/backend.py idavoll/idavoll.py idavoll/memory_backend.py idavoll/pgsql_backend.py
diffstat 4 files changed, 133 insertions(+), 137 deletions(-) [+]
line wrap: on
line diff
--- a/idavoll/backend.py	Sun Oct 31 21:12:55 2004 +0000
+++ b/idavoll/backend.py	Mon Nov 01 12:37:40 2004 +0000
@@ -1,6 +1,7 @@
 from twisted.python import components
 from twisted.application import service
 from twisted.xish import utility
+from twisted.internet import defer
 
 class Error(Exception):
     msg = ''
@@ -140,13 +141,74 @@
 
     __implements__ = IBackendService,
 
-    def __init__(self):
+    def __init__(self, storage):
         service.MultiService.__init__(self)
         utility.EventDispatcher.__init__(self)
+        self.storage = storage
 
     def get_supported_affiliations(self):
         return ['none', 'owner', 'outcast', 'publisher']
 
+    def publish(self, node_id, items, requestor):
+        d1 = self.storage.get_node_configuration(node_id)
+        d2 = self.storage.get_affiliation(node_id, requestor.full())
+        d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
+        d.addErrback(lambda x: x.value[0])
+        d.addCallback(self._do_publish, node_id, items, requestor)
+        return d
+
+    def _do_publish(self, result, node_id, items, requestor):
+        print result
+        configuration = result[0][1]
+        persist_items = configuration["persist_items"]
+        deliver_payloads = configuration["deliver_payloads"]
+        affiliation = result[1][1]
+
+        if affiliation not in ['owner', 'publisher']:
+            raise NotAuthorized
+
+        if items and not persist_items and not deliver_payloads:
+            raise NoPayloadAllowed
+        elif not items and (persist_items or deliver_payloads):
+            raise PayloadExpected
+
+        print "publish by %s to %s" % (requestor.full(), node_id)
+
+        if persist_items or deliver_payloads:
+            for item in items:
+                if item["id"] is None:
+                    item["id"] = 'random'   # FIXME
+
+        if persist_items:
+            d = self.store_items(node_id, items, requestor.full())
+        else:
+            d = defer.succeed(None)
+
+        d.addCallback(self._do_notify, node_id, items, deliver_payloads)
+
+    def _do_notify(self, result, node_id, items, deliver_payloads):
+        if items and not deliver_payloads:
+            for item in items:
+                item.children = []
+
+        self.dispatch({ 'items': items, 'node_id': node_id },
+                      '//event/pubsub/notify')
+
+    def get_notification_list(self, node_id, items):
+        d = self.storage.get_subscribers(node_id)
+        d.addCallback(self._magic_filter, node_id, items)
+        return d
+
+    def _magic_filter(self, subscribers, node_id, items):
+        list = {}
+        for subscriber in subscribers:
+            list[subscriber] = items
+
+        return list
+
+    def store_items(self, node_id, items, publisher):
+        return self.storage.store_items(node_id, items, publisher)
+
 class NotificationService(service.Service):
 
     def register_notifier(self, observerfn, *args, **kwargs):
--- a/idavoll/idavoll.py	Sun Oct 31 21:12:55 2004 +0000
+++ b/idavoll/idavoll.py	Mon Nov 01 12:37:40 2004 +0000
@@ -90,11 +90,11 @@
     if config['backend'] == 'pgsql':
         import pgsql_backend as b
         st = b.Storage(user=config['dbuser'], database=config['dbname'])
-        bs = b.BackendService(st)
     elif config['backend'] == 'memory':
         import memory_backend as b
-        bs = b.BackendService()
+        st = b.Storage()
 
+    bs = b.BackendService(st)
 
     component.IService(bs).setServiceParent(sm)
 
--- a/idavoll/memory_backend.py	Sun Oct 31 21:12:55 2004 +0000
+++ b/idavoll/memory_backend.py	Mon Nov 01 12:37:40 2004 +0000
@@ -20,11 +20,8 @@
         self.affiliations = {}
         self.items = {}
 
-class BackendService(backend.BackendService):
-
+class Storage:
     def __init__(self):
-        backend.BackendService.__init__(self)
-
         self.nodes = {}
 
         node = Node("ralphm/mood/ralphm@ik.nu")
@@ -35,7 +32,44 @@
         node.configuration.persist_items = True
         node.configuration.deliver_payloads = True
         self.nodes[node.id] = node
-    
+
+    def get_node_configuration(self, node_id):
+        try:
+            node = self.nodes[node_id]
+        except KeyError:
+            raise backend.NodeNotFound
+        else:
+            c = self.nodes[node_id].configuration
+            return defer.succeed({'persist_items': c.persist_items,
+                                  'deliver_payloads': c.deliver_payloads})
+
+    def get_affiliation(self, node_id, entity):
+        try:
+            node = self.nodes[node_id]
+        except KeyError:
+            raise backend.NodeNotFound
+        else:
+            return defer.succeed(node.affiliations.get(entity, None))
+
+    def get_subscribers(self, node_id):
+        try:
+            node = self.nodes[node_id]
+        except KeyError:
+            raise backend.NodeNotFound
+        else:
+            subscriptions = self.nodes[node_id].subscriptions
+            subscribers = [s for s in subscriptions
+                             if subscriptions[s].state == 'subscribed']
+            return defer.succeed(subscribers)
+
+    def store_items(self, node_id, items, publisher):
+        for item in items:
+            self.nodes[node_id].items[item["id"]] = (item, publisher)
+            print self.nodes[node_id].items
+        return defer.succeed(None)
+
+class BackendService(backend.BackendService):
+
     def create_node(self, node_id, requestor):
         if not node_id:
             raise backend.NoInstantNodes
@@ -49,65 +83,6 @@
 
         return defer.succeed({'node_id': node.id})
 
-    def publish(self, node_id, items, requestor):
-        try:
-            node = self.nodes[node_id]
-            persist_items = node.configuration.persist_items
-            deliver_payloads = node.configuration.deliver_payloads
-        except KeyError:
-            raise backend.NodeNotFound
-
-        try:
-            if node.affiliations[requestor.full()] not in \
-               ['owner', 'publisher']:
-                raise backend.NotAuthorized
-        except KeyError:
-            raise backend.NotAuthorized
-
-        if items and not persist_items and not deliver_payloads:
-            raise backend.NoPayloadAllowed
-        elif not items and (persist_items or deliver_payloads):
-            raise backend.PayloadExpected
-
-        print "publish by %s to %s" % (requestor.full(), node_id)
-
-        if persist_items or deliver_payloads:
-            for item in items:
-                if item["id"] is None:
-                    item["id"] = 'random'   # FIXME
-
-        if persist_items:
-            self.store_items(node_id, items, requestor.full())
-
-        if items and not deliver_payloads:
-            for item in items:
-                item.children = []
-
-        self.dispatch({ 'items': items, 'node_id': node_id },
-                      '//event/pubsub/notify')
-        return defer.succeed(None)
-
-    def get_notification_list(self, node_id, items):
-        subscriptions = self.nodes[node_id].subscriptions
-        
-        try:
-            subscribers = [s for s in subscriptions
-                             if subscriptions[s].state == 'subscribed']
-            d = defer.succeed(subscribers)
-        except:
-            d = defer.fail()
-
-        d.addCallback(self._magic_filter, node_id, items)
-
-        return d
-
-    def _magic_filter(self, subscribers, node_id, items):
-        list = {}
-        for subscriber in subscribers:
-            list[subscriber] = items
-
-        return list
-
     def subscribe(self, node_id, subscriber, requestor):
         # expect subscriber and requestor to be a jid.JID 
         try:
@@ -153,11 +128,6 @@
 
         return defer.succeed(None)
 
-    def store_items(self, node_id, items, publisher):
-        for item in items:
-            self.nodes[node_id].items[item["id"]] = item
-            print self.nodes[node_id].items
-
 class NodeCreationService(service.Service):
 
     __implements__ = backend.INodeCreationService,
--- a/idavoll/pgsql_backend.py	Sun Oct 31 21:12:55 2004 +0000
+++ b/idavoll/pgsql_backend.py	Mon Nov 01 12:37:40 2004 +0000
@@ -41,7 +41,7 @@
                                           entity)
 
     def get_subscribers(self, node_id):
-        d =  self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
+        d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
                                        JOIN nodes ON (node_id=nodes.id)
                                        JOIN entities ON (entity_id=entities.id)
                                        WHERE node=%s AND
@@ -53,73 +53,37 @@
     def _convert_to_jids(self, list):
         return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list]
 
+    def store_items(self, node_id, items, publisher):
+        return self.dbpool.runInteraction(self._store_items, node_id, items,
+                                          publisher)
+
+    def _store_items(self, cursor, node_id, items, publisher):
+        for item in items:
+            self._store_item(cursor, node_id, item, publisher)
+
+    def _store_item(self, cursor, node_id, item, publisher):
+        data = item.toXml()
+        cursor.execute("""UPDATE items SET publisher=%s, data=%s
+                          FROM nodes
+                          WHERE nodes.id = items.node_id AND
+                                nodes.node = %s and items.item=%s""",
+                       (publisher.encode('utf8'),
+                        data.encode('utf8'),
+                        node_id.encode('utf8'),
+                        item["id"].encode('utf8')))
+        if cursor.rowcount == 1:
+            return
+
+        cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
+                          SELECT id, %s, %s, %s FROM nodes WHERE node=%s""",
+                       (item["id"].encode('utf8'),
+                        publisher.encode('utf8'),
+                        data.encode('utf8'),
+                        node_id.encode('utf8')))
+
 class BackendService(backend.BackendService):
     """ PostgreSQL backend Service for a JEP-0060 pubsub service """
 
-    def __init__(self, storage):
-        backend.BackendService.__init__(self)
-        self.storage = storage
-
-    def do_publish(self, result, node_id, items, requestor):
-        print result
-        configuration = result[0][1]
-        persist_items = configuration["persist_items"]
-        deliver_payloads = configuration["deliver_payloads"]
-        affiliation = result[1][1]
-
-        if affiliation not in ['owner', 'publisher']:
-            raise backend.NotAuthorized
-
-        if items and not persist_items and not deliver_payloads:
-            raise backend.NoPayloadAllowed
-        elif not items and (persist_items or deliver_payloads):
-            raise backend.PayloadExpected
-
-        print "publish by %s to %s" % (requestor.full(), node_id)
-
-        if persist_items or deliver_payloads:
-            for item in items:
-                if item["id"] is None:
-                    item["id"] = 'random'   # FIXME
-
-        if persist_items:
-            d = self.store_items(node_id, items, requestor.full())
-        else:
-            d = defer.succeed(None)
-
-        d.addCallback(self.do_notify, node_id, items, deliver_payloads)
-
-    def do_notify(self, result, node_id, items, deliver_payloads):
-        if items and not deliver_payloads:
-            for item in items:
-                item.children = []
-
-        self.dispatch({ 'items': items, 'node_id': node_id },
-                      '//event/pubsub/notify')
-
-    def publish(self, node_id, items, requestor):
-        d1 = self.storage.get_node_configuration(node_id)
-        d2 = self.storage.get_affiliation(node_id, requestor.full())
-        d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
-        d.addErrback(lambda x: x.value[0])
-        d.addCallback(self.do_publish, node_id, items, requestor)
-        return d
-
-    def get_notification_list(self, node_id, items):
-        d = self.storage.get_subscribers(node_id)
-        d.addCallback(self._magic_filter, node_id, items)
-        return d
-
-    def _magic_filter(self, subscribers, node_id, items):
-        list = {}
-        for subscriber in subscribers:
-            list[subscriber] = items
-
-        return list
-
-    def store_items(self, node_id, items, publisher):
-        return defer.succeed(None)
-
 class PublishService(service.Service):
 
     __implements__ = backend.IPublishService,