diff idavoll/pgsql_backend.py @ 43:9685b7e291ef

Moved common stuff out of pgsql_backend.py to backend.py. Implemented Storage class for memory backend. Implemented item storage for pgsql Storage.
author Ralph Meijer <ralphm@ik.nu>
date Mon, 01 Nov 2004 12:37:40 +0000
parents ea3d3544a52e
children 4447b3c5b857
line wrap: on
line diff
--- a/idavoll/pgsql_backend.py	Sun Oct 31 21:12:55 2004 +0000
+++ b/idavoll/pgsql_backend.py	Mon Nov 01 12:37:40 2004 +0000
@@ -41,7 +41,7 @@
                                           entity)
 
     def get_subscribers(self, node_id):
-        d =  self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
+        d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
                                        JOIN nodes ON (node_id=nodes.id)
                                        JOIN entities ON (entity_id=entities.id)
                                        WHERE node=%s AND
@@ -53,73 +53,37 @@
     def _convert_to_jids(self, list):
         return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list]
 
+    def store_items(self, node_id, items, publisher):
+        return self.dbpool.runInteraction(self._store_items, node_id, items,
+                                          publisher)
+
+    def _store_items(self, cursor, node_id, items, publisher):
+        for item in items:
+            self._store_item(cursor, node_id, item, publisher)
+
+    def _store_item(self, cursor, node_id, item, publisher):
+        data = item.toXml()
+        cursor.execute("""UPDATE items SET publisher=%s, data=%s
+                          FROM nodes
+                          WHERE nodes.id = items.node_id AND
+                                nodes.node = %s and items.item=%s""",
+                       (publisher.encode('utf8'),
+                        data.encode('utf8'),
+                        node_id.encode('utf8'),
+                        item["id"].encode('utf8')))
+        if cursor.rowcount == 1:
+            return
+
+        cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
+                          SELECT id, %s, %s, %s FROM nodes WHERE node=%s""",
+                       (item["id"].encode('utf8'),
+                        publisher.encode('utf8'),
+                        data.encode('utf8'),
+                        node_id.encode('utf8')))
+
 class BackendService(backend.BackendService):
     """ PostgreSQL backend Service for a JEP-0060 pubsub service """
 
-    def __init__(self, storage):
-        backend.BackendService.__init__(self)
-        self.storage = storage
-
-    def do_publish(self, result, node_id, items, requestor):
-        print result
-        configuration = result[0][1]
-        persist_items = configuration["persist_items"]
-        deliver_payloads = configuration["deliver_payloads"]
-        affiliation = result[1][1]
-
-        if affiliation not in ['owner', 'publisher']:
-            raise backend.NotAuthorized
-
-        if items and not persist_items and not deliver_payloads:
-            raise backend.NoPayloadAllowed
-        elif not items and (persist_items or deliver_payloads):
-            raise backend.PayloadExpected
-
-        print "publish by %s to %s" % (requestor.full(), node_id)
-
-        if persist_items or deliver_payloads:
-            for item in items:
-                if item["id"] is None:
-                    item["id"] = 'random'   # FIXME
-
-        if persist_items:
-            d = self.store_items(node_id, items, requestor.full())
-        else:
-            d = defer.succeed(None)
-
-        d.addCallback(self.do_notify, node_id, items, deliver_payloads)
-
-    def do_notify(self, result, node_id, items, deliver_payloads):
-        if items and not deliver_payloads:
-            for item in items:
-                item.children = []
-
-        self.dispatch({ 'items': items, 'node_id': node_id },
-                      '//event/pubsub/notify')
-
-    def publish(self, node_id, items, requestor):
-        d1 = self.storage.get_node_configuration(node_id)
-        d2 = self.storage.get_affiliation(node_id, requestor.full())
-        d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
-        d.addErrback(lambda x: x.value[0])
-        d.addCallback(self.do_publish, node_id, items, requestor)
-        return d
-
-    def get_notification_list(self, node_id, items):
-        d = self.storage.get_subscribers(node_id)
-        d.addCallback(self._magic_filter, node_id, items)
-        return d
-
-    def _magic_filter(self, subscribers, node_id, items):
-        list = {}
-        for subscriber in subscribers:
-            list[subscriber] = items
-
-        return list
-
-    def store_items(self, node_id, items, publisher):
-        return defer.succeed(None)
-
 class PublishService(service.Service):
 
     __implements__ = backend.IPublishService,