Mercurial > libervia-pubsub
view idavoll/pgsql_backend.py @ 41:ea3d3544a52e
Rewrite using separated backend interfaces. The backend also uses a separate
class for the actual storage.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Sun, 31 Oct 2004 21:11:03 +0000 |
parents | 39d0c6fa027f |
children | 9685b7e291ef |
line wrap: on
line source
from twisted.application import service from twisted.internet import defer from twisted.protocols.jabber import jid from twisted.enterprise import adbapi import backend class Storage: def __init__(self, user, database): self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, database=database) def _get_node_configuration(self, cursor, node_id): configuration = {} cursor.execute("""SELECT persistent, deliver_payload FROM nodes WHERE node=%s""", (node_id,)) try: (configuration["persist_items"], configuration["deliver_payloads"]) = cursor.fetchone() return configuration except TypeError: raise backend.NodeNotFound def get_node_configuration(self, node_id): return self.dbpool.runInteraction(self._get_node_configuration, node_id) def _get_affiliation(self, cursor, node_id, entity): cursor.execute("""SELECT affiliation FROM affiliations JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND jid=%s""", (node_id.encode('utf8'), entity.encode('utf8'))) try: return cursor.fetchone()[0] except TypeError: return None def get_affiliation(self, node_id, entity): return self.dbpool.runInteraction(self._get_affiliation, node_id, entity) def get_subscribers(self, node_id): d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND subscription='subscribed'""", (node_id.encode('utf8'),)) d.addCallback(self._convert_to_jids) return d def _convert_to_jids(self, list): return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] class BackendService(backend.BackendService): """ PostgreSQL backend Service for a JEP-0060 pubsub service """ def __init__(self, storage): backend.BackendService.__init__(self) self.storage = storage def do_publish(self, result, node_id, items, requestor): print result configuration = result[0][1] persist_items = configuration["persist_items"] deliver_payloads = configuration["deliver_payloads"] affiliation = result[1][1] if affiliation not in ['owner', 'publisher']: raise backend.NotAuthorized if items and not persist_items and not deliver_payloads: raise backend.NoPayloadAllowed elif not items and (persist_items or deliver_payloads): raise backend.PayloadExpected print "publish by %s to %s" % (requestor.full(), node_id) if persist_items or deliver_payloads: for item in items: if item["id"] is None: item["id"] = 'random' # FIXME if persist_items: d = self.store_items(node_id, items, requestor.full()) else: d = defer.succeed(None) d.addCallback(self.do_notify, node_id, items, deliver_payloads) def do_notify(self, result, node_id, items, deliver_payloads): if items and not deliver_payloads: for item in items: item.children = [] self.dispatch({ 'items': items, 'node_id': node_id }, '//event/pubsub/notify') def publish(self, node_id, items, requestor): d1 = self.storage.get_node_configuration(node_id) d2 = self.storage.get_affiliation(node_id, requestor.full()) d = defer.DeferredList([d1, d2], fireOnOneErrback=1) d.addErrback(lambda x: x.value[0]) d.addCallback(self.do_publish, node_id, items, requestor) return d def get_notification_list(self, node_id, items): d = self.storage.get_subscribers(node_id) d.addCallback(self._magic_filter, node_id, items) return d def _magic_filter(self, subscribers, node_id, items): list = {} for subscriber in subscribers: list[subscriber] = items return list def store_items(self, node_id, items, publisher): return defer.succeed(None) class PublishService(service.Service): __implements__ = backend.IPublishService, def publish(self, node_id, items, requestor): return self.parent.publish(node_id, items, requestor) class NotificationService(backend.NotificationService): __implements__ = backend.INotificationService, def get_notification_list(self, node_id, items): return self.parent.get_notification_list(node_id, items) class PersistenceService(service.Service): __implements__ = backend.IPersistenceService, def store_items(self, node_id, items, publisher): return self.parent.store_items(node_id, items, publisher)