Mercurial > libervia-pubsub
changeset 107:d252d793f0ed
Initial revision.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Fri, 08 Apr 2005 10:15:02 +0000 |
parents | dc36882d2620 |
children | 1c18759d2afb |
files | idavoll/generic_backend.py idavoll/memory_storage.py idavoll/pgsql_storage.py idavoll/storage.py |
diffstat | 4 files changed, 898 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/idavoll/generic_backend.py Fri Apr 08 10:15:02 2005 +0000 @@ -0,0 +1,390 @@ +import sha +import time +from twisted.words.protocols.jabber import jid +from twisted.application import service +from twisted.xish import utility +from twisted.internet import defer +from zope.interface import implements +import backend + +def _get_affiliation(node, entity): + d = node.get_affiliation(entity) + d.addCallback(lambda affiliation: (node, affiliation)) + return d + +class BackendService(service.MultiService, utility.EventDispatcher): + + implements(backend.IBackendService) + + options = {"pubsub#persist_items": + {"type": "boolean", + "label": "Persist items to storage"}, + "pubsub#deliver_payloads": + {"type": "boolean", + "label": "Deliver payloads with event notifications"}, + } + + default_config = {"pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + } + + def __init__(self, storage): + service.MultiService.__init__(self) + utility.EventDispatcher.__init__(self) + self.storage = storage + + def supports_publisher_affiliation(self): + return True + + def supports_outcast_affiliation(self): + return True + + def supports_persistent_items(self): + return True + + def get_node_type(self, node_id): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_type()) + return d + + def get_nodes(self): + return self.storage.get_node_ids() + + def get_node_meta_data(self, node_id): + d = self.storage.get_node(node_id) + d.addCallback(lambda node: node.get_meta_data()) + d.addCallback(self._make_meta_data) + return d + + def _make_meta_data(self, meta_data): + options = [] + for key, value in meta_data.iteritems(): + if self.options.has_key(key): + option = {"var": key} + option.update(self.options[key]) + option["value"] = value + options.append(option) + + return options + +class PublishService(service.Service): + + implements(backend.IPublishService) + + def publish(self, node_id, items, requestor): + d = self.parent.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_publish, items, requestor) + return d + + def _do_publish(self, result, items, requestor): + node, affiliation = result + configuration = node.get_configuration() + persist_items = configuration["pubsub#persist_items"] + deliver_payloads = configuration["pubsub#deliver_payloads"] + + if affiliation not in ['owner', 'publisher']: + raise backend.NotAuthorized + + if items and not persist_items and not deliver_payloads: + raise backend.NoPayloadAllowed + elif not items and (persist_items or deliver_payloads): + raise backend.PayloadExpected + + if persist_items or deliver_payloads: + for item in items: + if not item.getAttribute("id"): + item["id"] = sha.new(str(time.time()) + + requestor.full()).hexdigest() + + if persist_items: + d = node.store_items(items, requestor) + else: + d = defer.succeed(None) + + d.addCallback(self._do_notify, node.id, items, deliver_payloads) + + def _do_notify(self, result, node_id, items, deliver_payloads): + if items and not deliver_payloads: + for item in items: + item.children = [] + + self.parent.dispatch({ 'items': items, 'node_id': node_id }, + '//event/pubsub/notify') + +class NotificationService(service.Service): + + implements(backend.INotificationService) + + def get_notification_list(self, node_id, items): + d = self.parent.storage.get_node(node_id) + d.addCallback(lambda node: node.get_subscribers()) + d.addCallback(self._magic_filter, node_id, items) + return d + + def _magic_filter(self, subscribers, node_id, items): + list = [] + for subscriber in subscribers: + list.append((subscriber, items)) + return list + + def register_notifier(self, observerfn, *args, **kwargs): + self.parent.addObserver('//event/pubsub/notify', observerfn, + *args, **kwargs) + +class SubscriptionService(service.Service): + + implements(backend.ISubscriptionService) + + def subscribe(self, node_id, subscriber, requestor): + subscriber_entity = subscriber.userhostJID() + if subscriber_entity != requestor: + return defer.fail(backend.NotAuthorized) + + d = self.parent.storage.get_node(node_id) + d.addCallback(_get_affiliation, subscriber_entity) + d.addCallback(self._do_subscribe, subscriber) + return d + + def _do_subscribe(self, result, subscriber): + node, affiliation = result + + if affiliation == 'outcast': + raise backend.NotAuthorized + + d = node.add_subscription(subscriber, 'subscribed') + d.addCallback(self._return_subscription, affiliation) + return d + + def _return_subscription(self, result, affiliation): + result['affiliation'] = affiliation + return result + + def unsubscribe(self, node_id, subscriber, requestor): + if subscriber.userhostJID() != requestor: + raise backend.NotAuthorized + + d = self.parent.storage.get_node(node_id) + d.addCallback(lambda node: node.remove_subscription(subscriber)) + return d + +class NodeCreationService(service.Service): + + implements(backend.INodeCreationService) + + def supports_instant_nodes(self): + return True + + def create_node(self, node_id, requestor): + if not node_id: + node_id = 'generic/%s' % sha.new(str(time.time()) + + requestor.full()).hexdigest() + + d = self.parent.storage.create_node(node_id, requestor) + d.addCallback(lambda _: node_id) + return d + + def get_node_configuration(self, node_id): + if node_id: + d = self.parent.storage.get_node(node_id) + d.addCallback(lambda node: node.get_configuration()) + else: + # XXX: this is disabled in pubsub.py + d = defer.succeed(self.parent.default_config) + + d.addCallback(self._make_config) + return d + + def _make_config(self, config): + options = [] + for key, value in self.parent.options.iteritems(): + option = {"var": key} + option.update(value) + if config.has_key(key): + option["value"] = config[key] + options.append(option) + + return options + + def set_node_configuration(self, node_id, options, requestor): + for key, value in options.iteritems(): + if not self.parent.options.has_key(key): + raise backend.InvalidConfigurationOption + if self.parent.options[key]["type"] == 'boolean': + try: + options[key] = bool(int(value)) + except ValueError: + raise backend.InvalidConfigurationValue + + d = self.parent.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_set_node_configuration, options) + return d + + def _do_set_node_configuration(self, result, options): + node, affiliation = result + + if affiliation != 'owner': + raise backend.NotAuthorized + + return node.set_configuration(options) + +class AffiliationsService(service.Service): + + implements(backend.IAffiliationsService) + + def get_affiliations(self, entity): + d1 = self.parent.storage.get_affiliations(entity) + d2 = self.parent.storage.get_subscriptions(entity) + d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) + d.addErrback(lambda x: x.value[0]) + d.addCallback(self._affiliations_result, entity) + return d + + def _affiliations_result(self, result, entity): + affiliations = result[0][1] + subscriptions = result[1][1] + + new_affiliations = {} + + for node, affiliation in affiliations: + new_affiliations[(node, entity.full())] = {'node': node, + 'jid': entity, + 'affiliation': affiliation, + 'subscription': None + } + + for node, subscriber, subscription in subscriptions: + key = node, subscriber.full() + if new_affiliations.has_key(key): + new_affiliations[key]['subscription'] = subscription + else: + new_affiliations[key] = {'node': node, + 'jid': subscriber, + 'affiliation': None, + 'subscription': subscription} + + return new_affiliations.values() + +class ItemRetrievalService(service.Service): + + implements(backend.IItemRetrievalService) + + def get_items(self, node_id, requestor, max_items=None, item_ids=[]): + d = self.parent.storage.get_node(node_id) + d.addCallback(self._is_subscribed, requestor) + d.addCallback(self._do_get_items, max_items, item_ids) + return d + + def _is_subscribed(self, node, subscriber): + d = node.is_subscribed(subscriber) + d.addCallback(lambda subscribed: (node, subscribed)) + return d + + def _do_get_items(self, result, max_items, item_ids): + node, subscribed = result + + if not subscribed: + raise backend.NotAuthorized + + if item_ids: + return node.get_items_by_id(item_ids) + else: + return node.get_items(max_items) + +class RetractionService(service.Service): + + implements(backend.IRetractionService) + + def retract_item(self, node_id, item_ids, requestor): + d = self.parent.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_retract, item_ids) + return d + + def _do_retract(self, result, item_ids): + node, affiliation = result + persist_items = node.get_configuration()["pubsub#persist_items"] + + if affiliation not in ['owner', 'publisher']: + raise backend.NotAuthorized + + if not persist_items: + raise backend.NodeNotPersistent + + d = node.remove_items(item_ids) + d.addCallback(self._do_notify_retraction, node.id) + return d + + def _do_notify_retraction(self, result, node_id): + self.parent.dispatch({ 'item_ids': result, 'node_id': node_id }, + '//event/pubsub/retract') + + def purge_node(self, node_id, requestor): + d = self.parent.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_purge) + return d + + def _do_purge(self, result): + node, affiliation = result + persist_items = node.get_configuration()["pubsub#persist_items"] + + if affiliation != 'owner': + raise backend.NotAuthorized + + if not persist_items: + raise backend.NodeNotPersistent + + d = node.purge() + d.addCallback(self._do_notify_purge, node.id) + return d + + def _do_notify_purge(self, result, node_id): + self.parent.dispatch(node_id, '//event/pubsub/purge') + +class NodeDeletionService(service.Service): + + implements(backend.INodeDeletionService) + + def __init__(self): + self._callback_list = [] + + def register_pre_delete(self, pre_delete_fn): + self._callback_list.append(pre_delete_fn) + + def get_subscribers(self, node_id): + d = self.parent.storage.get_node(node_id) + d.addCallback(lambda node: node.get_subscribers()) + return d + + def delete_node(self, node_id, requestor): + d = self.parent.storage.get_node(node_id) + d.addCallback(_get_affiliation, requestor) + d.addCallback(self._do_pre_delete) + return d + + def _do_pre_delete(self, result): + node, affiliation = result + + if affiliation != 'owner': + raise backend.NotAuthorized + + d = defer.DeferredList([cb(node_id) for cb in self._callback_list], + consumeErrors=1) + d.addCallback(self._do_delete, node.id) + + def _do_delete(self, result, node_id): + dl = [] + for succeeded, r in result: + if succeeded and r: + dl.extend(r) + + d = self.parent.storage.delete_node(node_id) + d.addCallback(self._do_notify_delete, dl) + + return d + + def _do_notify_delete(self, result, dl): + for d in dl: + d.callback(None)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/idavoll/memory_storage.py Fri Apr 08 10:15:02 2005 +0000 @@ -0,0 +1,194 @@ +import copy +from zope.interface import implements +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +import storage + +default_config = {"pubsub#persist_items": False, + "pubsub#deliver_payloads": False} + +class Storage: + + implements(storage.IStorage) + + def __init__(self): + self._nodes = {} + + def get_node(self, node_id): + try: + node = self._nodes[node_id] + except KeyError: + return defer.fail(storage.NodeNotFound()) + + return defer.succeed(node) + + def get_node_ids(self): + return defer.succeed(self._nodes.keys()) + + def create_node(self, node_id, owner, config = None, type='leaf'): + if node_id in self._nodes: + return defer.fail(storage.NodeExists()) + + if not config: + config = copy.copy(default_config) + + if type != 'leaf': + raise NotImplementedError + + node = LeafNode(node_id, owner, config) + self._nodes[node_id] = node + + return defer.succeed(None) + + def delete_node(self, node_id): + try: + del self._nodes[node_id] + except KeyError: + return defer.fail(storage.NodeNotFound()) + + return defer.succeed(None) + + def get_affiliations(self, entity): + entity_full = entity.full() + return defer.succeed([(node.id, node._affiliations[entity_full]) + for name, node in self._nodes.iteritems() + if entity_full in node._affiliations]) + + def get_subscriptions(self, entity): + subscriptions = [] + for node in self._nodes.itervalues(): + for subscriber, subscription in node._subscriptions.iteritems(): + subscriber = jid.JID(subscriber) + if subscriber.userhostJID() == entity: + subscriptions.append((node.id, subscriber, + subscription.state)) + + return defer.succeed(subscriptions) + +class Node: + + implements(storage.INode) + + def __init__(self, node_id, owner, config): + self.id = node_id + self._affiliations = {owner.full(): 'owner'} + self._subscriptions = {} + self._config = config + + def get_type(self): + return self.type + + def get_configuration(self): + return self._config + + def get_meta_data(self): + config = copy.copy(self._config) + config["pubsub#node_type"] = self.type + return config + + def set_configuration(self, options): + for option in options: + if option in self._config: + self._config[option] = options[option] + + return defer.succeed(None) + + def get_affiliation(self, entity): + return defer.succeed(self._affiliations.get(entity.full())) + + def add_subscription(self, subscriber, state): + try: + subscription = self._subscriptions[subscriber.full()] + except: + subscription = Subscription(state) + self._subscriptions[subscriber.full()] = subscription + + return defer.succeed({'node': self.id, + 'jid': subscriber, + 'subscription': subscription.state}) + + def remove_subscription(self, subscriber): + del self._subscriptions[subscriber.full()] + + return defer.succeed(None) + + def get_subscribers(self): + subscribers = [jid.JID(subscriber) for subscriber, subscription + in self._subscriptions.iteritems() + if subscription.state == 'subscribed'] + + return defer.succeed(subscribers) + + def is_subscribed(self, subscriber): + try: + subscription = self._subscriptions[subscriber.full()] + except KeyError: + return defer.succeed(False) + + return defer.succeed(subscription.state == 'subscribed') + +class LeafNode(Node): + + implements(storage.ILeafNode) + type = 'leaf' + + def __init__(self, node_id, owner, config): + Node.__init__(self, node_id, owner, config) + self._items = {} + self._itemlist = [] + + def store_items(self, items, publisher): + for data in items: + id = data["id"] + item = (data.toXml(), publisher) + if id in self._items: + self._itemlist.remove(self._items[id]) + self._items[id] = item + self._itemlist.append(item) + + return defer.succeed(None) + + def remove_items(self, item_ids): + deleted = [] + + for item_id in item_ids: + try: + item = self._items[item_id] + self._itemlist.remove(item) + del self._items[item_id] + deleted.append(item_id) + except KeyError: + pass + + return defer.succeed(deleted) + + def get_items(self, max_items=None): + if max_items: + list = self._itemlist[-max_items:] + else: + list = self._itemlist + return defer.succeed([item[0] for item in list]) + + def get_items_by_id(self, item_ids): + items = [] + for item_id in item_ids: + try: + item = self._items[item_id] + except KeyError: + pass + else: + items.append(item[0]) + return defer.succeed(items) + + def purge(self): + self._items = {} + self._itemlist = [] + + return defer.succeed(None) + +class Subscription: + + implements(storage.ISubscription) + + def __init__(self, state): + self.state = state
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/idavoll/pgsql_storage.py Fri Apr 08 10:15:02 2005 +0000 @@ -0,0 +1,229 @@ +import copy +import storage +from twisted.enterprise import adbapi +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from zope.interface import implements + +class Storage: + + implements(storage.IStorage) + + def __init__(self, user, database): + self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, + database=database) + + def get_node(self, node_id): + return self._dbpool.runInteraction(self._get_node, node_id) + + def _get_node(self, cursor, node_id): + configuration = {} + cursor.execute("""SELECT persistent, deliver_payload FROM nodes + WHERE node=%s""", + (node_id,)) + try: + (configuration["pubsub#persist_items"], + configuration["pubsub#deliver_payloads"]) = cursor.fetchone() + except TypeError: + raise storage.NodeNotFound + else: + node = LeafNode(node_id, configuration) + node._dbpool = self._dbpool + return node + + def get_node_ids(self): + d = self._dbpool.runQuery("""SELECT node from nodes""") + d.addCallback(lambda results: [r[0] for r in results]) + return d + + def create_node(self, node_id, owner, type='leaf'): + return self._dbpool.runInteraction(self._create_node, node_id, owner) + + def _create_node(self, cursor, node_id, owner): + try: + cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", + (node_id.encode('utf8'))) + except: + raise storage.NodeExists + + cursor.execute("""SELECT 1 from entities where jid=%s""", + (owner.full().encode('utf8'))) + + if not cursor.fetchone(): + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (owner.full().encode('utf8'))) + + cursor.execute("""INSERT INTO affiliations + (node_id, entity_id, affiliation) + SELECT n.id, e.id, 'owner' FROM + (SELECT id FROM nodes WHERE node=%s) AS n + CROSS JOIN + (SELECT id FROM entities WHERE jid=%s) AS e""", + (node_id.encode('utf8'), + owner.full().encode('utf8'))) + + def delete_node(self, node_id): + return self._dbpool.runInteraction(self._delete_node, node_id) + + def _delete_node(self, cursor, node_id): + cursor.execute("""DELETE FROM nodes WHERE node=%s""", + (node_id.encode('utf-8'),)) + + if cursor.rowcount != 1: + raise storage.NodeNotFound + + def get_affiliations(self, entity): + d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities + JOIN affiliations ON + (affiliations.entity_id=entities.id) + JOIN nodes ON + (nodes.id=affiliations.node_id) + WHERE jid=%s""", + (entity.full().encode('utf8'),)) + d.addCallback(lambda results: [tuple(r) for r in results]) + return d + + def get_subscriptions(self, entity): + d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription + FROM entities JOIN subscriptions ON + (subscriptions.entity_id=entities.id) + JOIN nodes ON + (nodes.id=subscriptions.node_id) + WHERE jid=%s""", + (entity.userhost().encode('utf8'),)) + d.addCallback(self._convert_subscription_jids) + return d + + def _convert_subscription_jids(self, subscriptions): + return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription) + for node, subscriber, resource, subscription in subscriptions] + +class Node: + + implements(storage.INode) + + def __init__(self, node_id, config): + self.id = node_id + self._config = config + + def get_type(self): + return self.type + + def get_configuration(self): + return self._config + + def set_configuration(self, options): + return self._dbpool.runInteraction(self._set_node_configuration, + options) + + def _set_configuration(self, cursor, options): + for option in options: + if option in self._config: + self._config[option] = options[option] + + cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s + WHERE node=%s""", + (self._config["pubsub#persist_items"].encode('utf8'), + self._config["pubsub#deliver_payloads"].encode('utf8'), + self.id.encode('utf-8'))) + + def get_meta_data(self): + config = copy.copy(self._config) + config["pubsub#node_type"] = self.type + return config + + def get_affiliation(self, entity): + return self._dbpool.runInteraction(self._get_affiliation, entity) + + def _get_affiliation(self, cursor, entity): + cursor.execute("""SELECT affiliation FROM affiliations + JOIN nodes ON (node_id=nodes.id) + JOIN entities ON (entity_id=entities.id) + WHERE node=%s AND jid=%s""", + (self.id.encode('utf8'), + entity.full().encode('utf8'))) + + try: + return cursor.fetchone()[0] + except TypeError: + return None + + def add_subscription(self, subscriber, state): + return self._dbpool.runInteraction(self._add_subscription, subscriber, + state) + + def _add_subscription(self, cursor, subscriber, state): + userhost = subscriber.userhost() + resource = subscriber.resource or '' + + try: + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (userhost.encode('utf8'))) + except: + pass + + try: + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, subscription) + SELECT n.id, e.id, %s, %s FROM + (SELECT id FROM nodes WHERE node=%s) AS n + CROSS JOIN + (SELECT id FROM entities WHERE jid=%s) AS e""", + (resource.encode('utf8'), + state.encode('utf8'), + self.id.encode('utf8'), + userhost.encode('utf8'))) + except: + cursor.execute("""SELECT subscription FROM subscriptions + JOIN nodes ON (nodes.id=subscriptions.node_id) + JOIN entities ON + (entities.id=subscriptions.entity_id) + WHERE node=%s AND jid=%s AND resource=%s""", + (self.id.encode('utf8'), + userhost.encode('utf8'), + resource.encode('utf8'))) + state = cursor.fetchone()[0] + + return {'node': self.id, + 'jid': subscriber, + 'subscription': state} + + def remove_subscription(self, subscriber, state): + pass + + def get_subscribers(self): + d = self._dbpool.runQuery("""SELECT jid, resource FROM subscriptions + JOIN nodes ON (node_id=nodes.id) + JOIN entities ON (entity_id=entities.id) + WHERE node=%s AND + subscription='subscribed'""", + (self.id.encode('utf8'),)) + d.addCallback(self._convert_to_jids) + return d + + def _convert_to_jids(self, list): + return [jid.JID("%s/%s" % (l[0], l[1])) for l in list] + + def is_subscribed(self, subscriber): + pass + +class LeafNode(Node): + + implements(storage.ILeafNode) + + type = 'leaf' + + def store_items(self, items, publisher): + return defer.succeed(None) + + def remove_items(self, item_ids): + pass + + def get_items(self, max_items=None): + pass + + def get_items_by_id(self, item_ids): + pass + + def purge(self): + pass
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/idavoll/storage.py Fri Apr 08 10:15:02 2005 +0000 @@ -0,0 +1,85 @@ +from zope.interface import Interface + +class Error(Exception): + msg = None + +class NodeNotFound(Error): + pass + + +class NodeExists(Error): + pass + + +class IStorage(Interface): + """ """ + + def get_node(self, node_id): + """ """ + + def get_node_ids(self): + """ """ + + def create_node(self, node_id, owner, config = None, type='leaf'): + """ """ + + def delete_node(self, node_id): + """ """ + + def get_affiliations(self, entity): + """ """ + + def get_subscriptions(self, entity): + """ """ + + +class INode(Interface): + """ """ + def get_type(self): + """ """ + + def get_configuration(self): + """ """ + + def get_meta_data(self): + """ """ + + def set_configuration(self, options): + """ """ + + def get_affiliation(self, entity): + """ """ + + def add_subscription(self, subscriber, state): + """ """ + + def remove_subscription(self, subscriber): + """ """ + + def get_subscribers(self): + """ """ + + def is_subscribed(self, subscriber): + """ """ + + +class ILeafNode(Interface): + """ """ + def store_items(self, items, publisher): + """ """ + + def remove_items(self, item_ids): + """ """ + + def get_items(self, max_items=None): + """ """ + + def get_items_by_id(self, item_ids): + """ """ + + def purge(self): + """ """ + + +class ISubscription(Interface): + """ """