# HG changeset patch # User Ralph Meijer # Date 1099257063 0 # Node ID ea3d3544a52ef017e90a83250f1255b22d832d92 # Parent b9e7b3b6c68768ed2e27112f0eb33cbe2e9179f2 Rewrite using separated backend interfaces. The backend also uses a separate class for the actual storage. diff -r b9e7b3b6c687 -r ea3d3544a52e idavoll/pgsql_backend.py --- a/idavoll/pgsql_backend.py Sun Oct 31 21:08:40 2004 +0000 +++ b/idavoll/pgsql_backend.py Sun Oct 31 21:11:03 2004 +0000 @@ -4,40 +4,78 @@ from twisted.enterprise import adbapi import backend -class Service(service.Service): - """ PostgreSQL backend Service for a JEP-0060 pubsub service """ - - __implements__ = backend.IService +class Storage: + def __init__(self, user, database): + self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, + database=database) - def __init__(self): - self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user='ralphm', - database='pubsub_test') - - def _do_publish(self, cursor, node_id, publisher, items): + def _get_node_configuration(self, cursor, node_id): + configuration = {} cursor.execute("""SELECT persistent, deliver_payload FROM nodes WHERE node=%s""", (node_id,)) try: - persist_items, deliver_payloads = cursor.fetchone() + (configuration["persist_items"], + configuration["deliver_payloads"]) = cursor.fetchone() + return configuration except TypeError: raise backend.NodeNotFound - cursor.execute("""SELECT 1 FROM affiliations + def get_node_configuration(self, node_id): + return self.dbpool.runInteraction(self._get_node_configuration, node_id) + + def _get_affiliation(self, cursor, node_id, entity): + cursor.execute("""SELECT affiliation FROM affiliations JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) - WHERE node=%s AND jid=%s AND - affiliation IN ('owner', 'publisher')""", - (node_id.encode('utf8'), publisher.encode('utf8'))) + WHERE node=%s AND jid=%s""", + (node_id.encode('utf8'), entity.encode('utf8'))) + + try: + return cursor.fetchone()[0] + except TypeError: + return None + + def get_affiliation(self, node_id, entity): + return self.dbpool.runInteraction(self._get_affiliation, node_id, + entity) - if not cursor.fetchone(): + def get_subscribers(self, node_id): + d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions + JOIN nodes ON (node_id=nodes.id) + JOIN entities ON (entity_id=entities.id) + WHERE node=%s AND + subscription='subscribed'""", + (node_id.encode('utf8'),)) + d.addCallback(self._convert_to_jids) + return d + + def _convert_to_jids(self, list): + return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] + +class BackendService(backend.BackendService): + """ PostgreSQL backend Service for a JEP-0060 pubsub service """ + + def __init__(self, storage): + backend.BackendService.__init__(self) + self.storage = storage + + def do_publish(self, result, node_id, items, requestor): + print result + configuration = result[0][1] + persist_items = configuration["persist_items"] + deliver_payloads = configuration["deliver_payloads"] + affiliation = result[1][1] + + if affiliation not in ['owner', 'publisher']: raise backend.NotAuthorized - + if items and not persist_items and not deliver_payloads: raise backend.NoPayloadAllowed elif not items and (persist_items or deliver_payloads): raise backend.PayloadExpected - print "publish by %s to %s" % (publisher, node_id) + print "publish by %s to %s" % (requestor.full(), node_id) if persist_items or deliver_payloads: for item in items: @@ -45,39 +83,60 @@ item["id"] = 'random' # FIXME if persist_items: - self.storeItems(node_id, publisher, items) + d = self.store_items(node_id, items, requestor.full()) + else: + d = defer.succeed(None) + d.addCallback(self.do_notify, node_id, items, deliver_payloads) + + def do_notify(self, result, node_id, items, deliver_payloads): if items and not deliver_payloads: for item in items: item.children = [] - recipients = self.get_subscribers(node_id) - recipients.addCallback(self.magic_filter, node_id, items) - recipients.addCallback(self.pubsub_service.do_notification, node_id) + self.dispatch({ 'items': items, 'node_id': node_id }, + '//event/pubsub/notify') - def do_publish(self, node_id, publisher, items): - d = self.dbpool.runInteraction(self._do_publish, node_id, publisher, items) + def publish(self, node_id, items, requestor): + d1 = self.storage.get_node_configuration(node_id) + d2 = self.storage.get_affiliation(node_id, requestor.full()) + d = defer.DeferredList([d1, d2], fireOnOneErrback=1) + d.addErrback(lambda x: x.value[0]) + d.addCallback(self.do_publish, node_id, items, requestor) return d - def magic_filter(self, subscribers, node_id, items): + def get_notification_list(self, node_id, items): + d = self.storage.get_subscribers(node_id) + d.addCallback(self._magic_filter, node_id, items) + return d + + def _magic_filter(self, subscribers, node_id, items): list = {} for subscriber in subscribers: list[subscriber] = items return list - def get_subscribers(self, node_id): - d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions - JOIN nodes ON (node_id=nodes.id) - JOIN entities ON (entity_id=entities.id) - WHERE node=%s AND - subscription='subscribed'""", - (node_id.encode('utf8'),)) - d.addCallback(self.convert_to_jids) - return d + def store_items(self, node_id, items, publisher): + return defer.succeed(None) + +class PublishService(service.Service): + + __implements__ = backend.IPublishService, + + def publish(self, node_id, items, requestor): + return self.parent.publish(node_id, items, requestor) + +class NotificationService(backend.NotificationService): - def convert_to_jids(self, list): - return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] + __implements__ = backend.INotificationService, + + def get_notification_list(self, node_id, items): + return self.parent.get_notification_list(node_id, items) - def storeItems(self, node_id, publisher, items): - pass +class PersistenceService(service.Service): + + __implements__ = backend.IPersistenceService, + + def store_items(self, node_id, items, publisher): + return self.parent.store_items(node_id, items, publisher)