Mercurial > libervia-pubsub
changeset 43:9685b7e291ef
Moved common stuff out of pgsql_backend.py to backend.py.
Implemented Storage class for memory backend.
Implemented item storage for pgsql Storage.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 01 Nov 2004 12:37:40 +0000 |
parents | 7d088c61e131 |
children | bc7438476a67 |
files | idavoll/backend.py idavoll/idavoll.py idavoll/memory_backend.py idavoll/pgsql_backend.py |
diffstat | 4 files changed, 133 insertions(+), 137 deletions(-) [+] |
line wrap: on
line diff
--- a/idavoll/backend.py Sun Oct 31 21:12:55 2004 +0000 +++ b/idavoll/backend.py Mon Nov 01 12:37:40 2004 +0000 @@ -1,6 +1,7 @@ from twisted.python import components from twisted.application import service from twisted.xish import utility +from twisted.internet import defer class Error(Exception): msg = '' @@ -140,13 +141,74 @@ __implements__ = IBackendService, - def __init__(self): + def __init__(self, storage): service.MultiService.__init__(self) utility.EventDispatcher.__init__(self) + self.storage = storage def get_supported_affiliations(self): return ['none', 'owner', 'outcast', 'publisher'] + def publish(self, node_id, items, requestor): + d1 = self.storage.get_node_configuration(node_id) + d2 = self.storage.get_affiliation(node_id, requestor.full()) + d = defer.DeferredList([d1, d2], fireOnOneErrback=1) + d.addErrback(lambda x: x.value[0]) + d.addCallback(self._do_publish, node_id, items, requestor) + return d + + def _do_publish(self, result, node_id, items, requestor): + print result + configuration = result[0][1] + persist_items = configuration["persist_items"] + deliver_payloads = configuration["deliver_payloads"] + affiliation = result[1][1] + + if affiliation not in ['owner', 'publisher']: + raise NotAuthorized + + if items and not persist_items and not deliver_payloads: + raise NoPayloadAllowed + elif not items and (persist_items or deliver_payloads): + raise PayloadExpected + + print "publish by %s to %s" % (requestor.full(), node_id) + + if persist_items or deliver_payloads: + for item in items: + if item["id"] is None: + item["id"] = 'random' # FIXME + + if persist_items: + d = self.store_items(node_id, items, requestor.full()) + else: + d = defer.succeed(None) + + d.addCallback(self._do_notify, node_id, items, deliver_payloads) + + def _do_notify(self, result, node_id, items, deliver_payloads): + if items and not deliver_payloads: + for item in items: + item.children = [] + + self.dispatch({ 'items': items, 'node_id': node_id }, + '//event/pubsub/notify') + + def get_notification_list(self, node_id, items): + d = self.storage.get_subscribers(node_id) + d.addCallback(self._magic_filter, node_id, items) + return d + + def _magic_filter(self, subscribers, node_id, items): + list = {} + for subscriber in subscribers: + list[subscriber] = items + + return list + + def store_items(self, node_id, items, publisher): + return self.storage.store_items(node_id, items, publisher) + class NotificationService(service.Service): def register_notifier(self, observerfn, *args, **kwargs):
--- a/idavoll/idavoll.py Sun Oct 31 21:12:55 2004 +0000 +++ b/idavoll/idavoll.py Mon Nov 01 12:37:40 2004 +0000 @@ -90,11 +90,11 @@ if config['backend'] == 'pgsql': import pgsql_backend as b st = b.Storage(user=config['dbuser'], database=config['dbname']) - bs = b.BackendService(st) elif config['backend'] == 'memory': import memory_backend as b - bs = b.BackendService() + st = b.Storage() + bs = b.BackendService(st) component.IService(bs).setServiceParent(sm)
--- a/idavoll/memory_backend.py Sun Oct 31 21:12:55 2004 +0000 +++ b/idavoll/memory_backend.py Mon Nov 01 12:37:40 2004 +0000 @@ -20,11 +20,8 @@ self.affiliations = {} self.items = {} -class BackendService(backend.BackendService): - +class Storage: def __init__(self): - backend.BackendService.__init__(self) - self.nodes = {} node = Node("ralphm/mood/ralphm@ik.nu") @@ -35,7 +32,44 @@ node.configuration.persist_items = True node.configuration.deliver_payloads = True self.nodes[node.id] = node - + + def get_node_configuration(self, node_id): + try: + node = self.nodes[node_id] + except KeyError: + raise backend.NodeNotFound + else: + c = self.nodes[node_id].configuration + return defer.succeed({'persist_items': c.persist_items, + 'deliver_payloads': c.deliver_payloads}) + + def get_affiliation(self, node_id, entity): + try: + node = self.nodes[node_id] + except KeyError: + raise backend.NodeNotFound + else: + return defer.succeed(node.affiliations.get(entity, None)) + + def get_subscribers(self, node_id): + try: + node = self.nodes[node_id] + except KeyError: + raise backend.NodeNotFound + else: + subscriptions = self.nodes[node_id].subscriptions + subscribers = [s for s in subscriptions + if subscriptions[s].state == 'subscribed'] + return defer.succeed(subscribers) + + def store_items(self, node_id, items, publisher): + for item in items: + self.nodes[node_id].items[item["id"]] = (item, publisher) + print self.nodes[node_id].items + return defer.succeed(None) + +class BackendService(backend.BackendService): + def create_node(self, node_id, requestor): if not node_id: raise backend.NoInstantNodes @@ -49,65 +83,6 @@ return defer.succeed({'node_id': node.id}) - def publish(self, node_id, items, requestor): - try: - node = self.nodes[node_id] - persist_items = node.configuration.persist_items - deliver_payloads = node.configuration.deliver_payloads - except KeyError: - raise backend.NodeNotFound - - try: - if node.affiliations[requestor.full()] not in \ - ['owner', 'publisher']: - raise backend.NotAuthorized - except KeyError: - raise backend.NotAuthorized - - if items and not persist_items and not deliver_payloads: - raise backend.NoPayloadAllowed - elif not items and (persist_items or deliver_payloads): - raise backend.PayloadExpected - - print "publish by %s to %s" % (requestor.full(), node_id) - - if persist_items or deliver_payloads: - for item in items: - if item["id"] is None: - item["id"] = 'random' # FIXME - - if persist_items: - self.store_items(node_id, items, requestor.full()) - - if items and not deliver_payloads: - for item in items: - item.children = [] - - self.dispatch({ 'items': items, 'node_id': node_id }, - '//event/pubsub/notify') - return defer.succeed(None) - - def get_notification_list(self, node_id, items): - subscriptions = self.nodes[node_id].subscriptions - - try: - subscribers = [s for s in subscriptions - if subscriptions[s].state == 'subscribed'] - d = defer.succeed(subscribers) - except: - d = defer.fail() - - d.addCallback(self._magic_filter, node_id, items) - - return d - - def _magic_filter(self, subscribers, node_id, items): - list = {} - for subscriber in subscribers: - list[subscriber] = items - - return list - def subscribe(self, node_id, subscriber, requestor): # expect subscriber and requestor to be a jid.JID try: @@ -153,11 +128,6 @@ return defer.succeed(None) - def store_items(self, node_id, items, publisher): - for item in items: - self.nodes[node_id].items[item["id"]] = item - print self.nodes[node_id].items - class NodeCreationService(service.Service): __implements__ = backend.INodeCreationService,
--- a/idavoll/pgsql_backend.py Sun Oct 31 21:12:55 2004 +0000 +++ b/idavoll/pgsql_backend.py Mon Nov 01 12:37:40 2004 +0000 @@ -41,7 +41,7 @@ entity) def get_subscribers(self, node_id): - d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions + d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND @@ -53,73 +53,37 @@ def _convert_to_jids(self, list): return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] + def store_items(self, node_id, items, publisher): + return self.dbpool.runInteraction(self._store_items, node_id, items, + publisher) + + def _store_items(self, cursor, node_id, items, publisher): + for item in items: + self._store_item(cursor, node_id, item, publisher) + + def _store_item(self, cursor, node_id, item, publisher): + data = item.toXml() + cursor.execute("""UPDATE items SET publisher=%s, data=%s + FROM nodes + WHERE nodes.id = items.node_id AND + nodes.node = %s and items.item=%s""", + (publisher.encode('utf8'), + data.encode('utf8'), + node_id.encode('utf8'), + item["id"].encode('utf8'))) + if cursor.rowcount == 1: + return + + cursor.execute("""INSERT INTO items (node_id, item, publisher, data) + SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", + (item["id"].encode('utf8'), + publisher.encode('utf8'), + data.encode('utf8'), + node_id.encode('utf8'))) + class BackendService(backend.BackendService): """ PostgreSQL backend Service for a JEP-0060 pubsub service """ - def __init__(self, storage): - backend.BackendService.__init__(self) - self.storage = storage - - def do_publish(self, result, node_id, items, requestor): - print result - configuration = result[0][1] - persist_items = configuration["persist_items"] - deliver_payloads = configuration["deliver_payloads"] - affiliation = result[1][1] - - if affiliation not in ['owner', 'publisher']: - raise backend.NotAuthorized - - if items and not persist_items and not deliver_payloads: - raise backend.NoPayloadAllowed - elif not items and (persist_items or deliver_payloads): - raise backend.PayloadExpected - - print "publish by %s to %s" % (requestor.full(), node_id) - - if persist_items or deliver_payloads: - for item in items: - if item["id"] is None: - item["id"] = 'random' # FIXME - - if persist_items: - d = self.store_items(node_id, items, requestor.full()) - else: - d = defer.succeed(None) - - d.addCallback(self.do_notify, node_id, items, deliver_payloads) - - def do_notify(self, result, node_id, items, deliver_payloads): - if items and not deliver_payloads: - for item in items: - item.children = [] - - self.dispatch({ 'items': items, 'node_id': node_id }, - '//event/pubsub/notify') - - def publish(self, node_id, items, requestor): - d1 = self.storage.get_node_configuration(node_id) - d2 = self.storage.get_affiliation(node_id, requestor.full()) - d = defer.DeferredList([d1, d2], fireOnOneErrback=1) - d.addErrback(lambda x: x.value[0]) - d.addCallback(self.do_publish, node_id, items, requestor) - return d - - def get_notification_list(self, node_id, items): - d = self.storage.get_subscribers(node_id) - d.addCallback(self._magic_filter, node_id, items) - return d - - def _magic_filter(self, subscribers, node_id, items): - list = {} - for subscriber in subscribers: - list[subscriber] = items - - return list - - def store_items(self, node_id, items, publisher): - return defer.succeed(None) - class PublishService(service.Service): __implements__ = backend.IPublishService,