Mercurial > libervia-pubsub
diff idavoll/pgsql_backend.py @ 43:9685b7e291ef
Moved common stuff out of pgsql_backend.py to backend.py.
Implemented Storage class for memory backend.
Implemented item storage for pgsql Storage.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 01 Nov 2004 12:37:40 +0000 |
parents | ea3d3544a52e |
children | 4447b3c5b857 |
line wrap: on
line diff
--- a/idavoll/pgsql_backend.py Sun Oct 31 21:12:55 2004 +0000 +++ b/idavoll/pgsql_backend.py Mon Nov 01 12:37:40 2004 +0000 @@ -41,7 +41,7 @@ entity) def get_subscribers(self, node_id): - d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions + d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND @@ -53,73 +53,37 @@ def _convert_to_jids(self, list): return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] + def store_items(self, node_id, items, publisher): + return self.dbpool.runInteraction(self._store_items, node_id, items, + publisher) + + def _store_items(self, cursor, node_id, items, publisher): + for item in items: + self._store_item(cursor, node_id, item, publisher) + + def _store_item(self, cursor, node_id, item, publisher): + data = item.toXml() + cursor.execute("""UPDATE items SET publisher=%s, data=%s + FROM nodes + WHERE nodes.id = items.node_id AND + nodes.node = %s and items.item=%s""", + (publisher.encode('utf8'), + data.encode('utf8'), + node_id.encode('utf8'), + item["id"].encode('utf8'))) + if cursor.rowcount == 1: + return + + cursor.execute("""INSERT INTO items (node_id, item, publisher, data) + SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", + (item["id"].encode('utf8'), + publisher.encode('utf8'), + data.encode('utf8'), + node_id.encode('utf8'))) + class BackendService(backend.BackendService): """ PostgreSQL backend Service for a JEP-0060 pubsub service """ - def __init__(self, storage): - backend.BackendService.__init__(self) - self.storage = storage - - def do_publish(self, result, node_id, items, requestor): - print result - configuration = result[0][1] - persist_items = configuration["persist_items"] - deliver_payloads = configuration["deliver_payloads"] - affiliation = result[1][1] - - if affiliation not in ['owner', 'publisher']: - raise backend.NotAuthorized - - if items and not persist_items and not deliver_payloads: - raise backend.NoPayloadAllowed - elif not items and (persist_items or deliver_payloads): - raise backend.PayloadExpected - - print "publish by %s to %s" % (requestor.full(), node_id) - - if persist_items or deliver_payloads: - for item in items: - if item["id"] is None: - item["id"] = 'random' # FIXME - - if persist_items: - d = self.store_items(node_id, items, requestor.full()) - else: - d = defer.succeed(None) - - d.addCallback(self.do_notify, node_id, items, deliver_payloads) - - def do_notify(self, result, node_id, items, deliver_payloads): - if items and not deliver_payloads: - for item in items: - item.children = [] - - self.dispatch({ 'items': items, 'node_id': node_id }, - '//event/pubsub/notify') - - def publish(self, node_id, items, requestor): - d1 = self.storage.get_node_configuration(node_id) - d2 = self.storage.get_affiliation(node_id, requestor.full()) - d = defer.DeferredList([d1, d2], fireOnOneErrback=1) - d.addErrback(lambda x: x.value[0]) - d.addCallback(self.do_publish, node_id, items, requestor) - return d - - def get_notification_list(self, node_id, items): - d = self.storage.get_subscribers(node_id) - d.addCallback(self._magic_filter, node_id, items) - return d - - def _magic_filter(self, subscribers, node_id, items): - list = {} - for subscriber in subscribers: - list[subscriber] = items - - return list - - def store_items(self, node_id, items, publisher): - return defer.succeed(None) - class PublishService(service.Service): __implements__ = backend.IPublishService,