Mercurial > libervia-pubsub
view idavoll/pgsql_backend.py @ 43:9685b7e291ef
Moved common stuff out of pgsql_backend.py to backend.py.
Implemented Storage class for memory backend.
Implemented item storage for pgsql Storage.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 01 Nov 2004 12:37:40 +0000 |
parents | ea3d3544a52e |
children | 4447b3c5b857 |
line wrap: on
line source
from twisted.application import service from twisted.internet import defer from twisted.protocols.jabber import jid from twisted.enterprise import adbapi import backend class Storage: def __init__(self, user, database): self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, database=database) def _get_node_configuration(self, cursor, node_id): configuration = {} cursor.execute("""SELECT persistent, deliver_payload FROM nodes WHERE node=%s""", (node_id,)) try: (configuration["persist_items"], configuration["deliver_payloads"]) = cursor.fetchone() return configuration except TypeError: raise backend.NodeNotFound def get_node_configuration(self, node_id): return self.dbpool.runInteraction(self._get_node_configuration, node_id) def _get_affiliation(self, cursor, node_id, entity): cursor.execute("""SELECT affiliation FROM affiliations JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND jid=%s""", (node_id.encode('utf8'), entity.encode('utf8'))) try: return cursor.fetchone()[0] except TypeError: return None def get_affiliation(self, node_id, entity): return self.dbpool.runInteraction(self._get_affiliation, node_id, entity) def get_subscribers(self, node_id): d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions JOIN nodes ON (node_id=nodes.id) JOIN entities ON (entity_id=entities.id) WHERE node=%s AND subscription='subscribed'""", (node_id.encode('utf8'),)) d.addCallback(self._convert_to_jids) return d def _convert_to_jids(self, list): return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] def store_items(self, node_id, items, publisher): return self.dbpool.runInteraction(self._store_items, node_id, items, publisher) def _store_items(self, cursor, node_id, items, publisher): for item in items: self._store_item(cursor, node_id, item, publisher) def _store_item(self, cursor, node_id, item, publisher): data = item.toXml() cursor.execute("""UPDATE items SET publisher=%s, data=%s FROM nodes WHERE nodes.id = items.node_id AND nodes.node = %s and items.item=%s""", (publisher.encode('utf8'), data.encode('utf8'), node_id.encode('utf8'), item["id"].encode('utf8'))) if cursor.rowcount == 1: return cursor.execute("""INSERT INTO items (node_id, item, publisher, data) SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", (item["id"].encode('utf8'), publisher.encode('utf8'), data.encode('utf8'), node_id.encode('utf8'))) class BackendService(backend.BackendService): """ PostgreSQL backend Service for a JEP-0060 pubsub service """ class PublishService(service.Service): __implements__ = backend.IPublishService, def publish(self, node_id, items, requestor): return self.parent.publish(node_id, items, requestor) class NotificationService(backend.NotificationService): __implements__ = backend.INotificationService, def get_notification_list(self, node_id, items): return self.parent.get_notification_list(node_id, items) class PersistenceService(service.Service): __implements__ = backend.IPersistenceService, def store_items(self, node_id, items, publisher): return self.parent.store_items(node_id, items, publisher)