Mercurial > libervia-pubsub
diff sat_pubsub/memory_storage.py @ 232:923281d4c5bc
renamed idavoll directory to sat_pubsub
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 17 May 2012 12:48:14 +0200 |
parents | idavoll/memory_storage.py@b7018ec56ee5 |
children | 564ae55219e1 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/memory_storage.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,320 @@ +# Copyright (c) 2003-2010 Ralph Meijer +# See LICENSE for details. + +import copy +from zope.interface import implements +from twisted.internet import defer +from twisted.words.protocols.jabber import jid + +from wokkel.pubsub import Subscription + +from idavoll import error, iidavoll + +class Storage: + + implements(iidavoll.IStorage) + + defaultConfig = { + 'leaf': { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + }, + 'collection': { + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } + } + + def __init__(self): + rootNode = CollectionNode('', jid.JID('localhost'), + copy.copy(self.defaultConfig['collection'])) + self._nodes = {'': rootNode} + + + def getNode(self, nodeIdentifier): + try: + node = self._nodes[nodeIdentifier] + except KeyError: + return defer.fail(error.NodeNotFound()) + + return defer.succeed(node) + + + def getNodeIds(self): + return defer.succeed(self._nodes.keys()) + + + def createNode(self, nodeIdentifier, owner, config): + if nodeIdentifier in self._nodes: + return defer.fail(error.NodeExists()) + + if config['pubsub#node_type'] != 'leaf': + raise error.NoCollections() + + node = LeafNode(nodeIdentifier, owner, config) + self._nodes[nodeIdentifier] = node + + return defer.succeed(None) + + + def deleteNode(self, nodeIdentifier): + try: + del self._nodes[nodeIdentifier] + except KeyError: + return defer.fail(error.NodeNotFound()) + + return defer.succeed(None) + + + def getAffiliations(self, entity): + entity = entity.userhost() + return defer.succeed([(node.nodeIdentifier, node._affiliations[entity]) + for name, node in self._nodes.iteritems() + if entity in node._affiliations]) + + + def getSubscriptions(self, entity): + subscriptions = [] + for node in self._nodes.itervalues(): + for subscriber, subscription in node._subscriptions.iteritems(): + subscriber = jid.internJID(subscriber) + if subscriber.userhostJID() == entity.userhostJID(): + subscriptions.append(subscription) + + return defer.succeed(subscriptions) + + + def getDefaultConfiguration(self, nodeType): + if nodeType == 'collection': + raise error.NoCollections() + + return self.defaultConfig[nodeType] + + +class Node: + + implements(iidavoll.INode) + + def __init__(self, nodeIdentifier, owner, config): + self.nodeIdentifier = nodeIdentifier + self._affiliations = {owner.userhost(): 'owner'} + self._subscriptions = {} + self._config = copy.copy(config) + + + def getType(self): + return self.nodeType + + + def getConfiguration(self): + return self._config + + + def getMetaData(self): + config = copy.copy(self._config) + config["pubsub#node_type"] = self.nodeType + return config + + + def setConfiguration(self, options): + for option in options: + if option in self._config: + self._config[option] = options[option] + + return defer.succeed(None) + + + def getAffiliation(self, entity): + return defer.succeed(self._affiliations.get(entity.userhost())) + + + def getSubscription(self, subscriber): + try: + subscription = self._subscriptions[subscriber.full()] + except KeyError: + return defer.succeed(None) + else: + return defer.succeed(subscription) + + + def getSubscriptions(self, state=None): + return defer.succeed( + [subscription + for subscription in self._subscriptions.itervalues() + if state is None or subscription.state == state]) + + + + def addSubscription(self, subscriber, state, options): + if self._subscriptions.get(subscriber.full()): + return defer.fail(error.SubscriptionExists()) + + subscription = Subscription(self.nodeIdentifier, subscriber, state, + options) + self._subscriptions[subscriber.full()] = subscription + return defer.succeed(None) + + + def removeSubscription(self, subscriber): + try: + del self._subscriptions[subscriber.full()] + except KeyError: + return defer.fail(error.NotSubscribed()) + + return defer.succeed(None) + + + def isSubscribed(self, entity): + for subscriber, subscription in self._subscriptions.iteritems(): + if jid.internJID(subscriber).userhost() == entity.userhost() and \ + subscription.state == 'subscribed': + return defer.succeed(True) + + return defer.succeed(False) + + + def getAffiliations(self): + affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation + in self._affiliations.iteritems()] + + return defer.succeed(affiliations) + + + +class PublishedItem(object): + """ + A published item. + + This represent an item as it was published by an entity. + + @ivar element: The DOM representation of the item that was published. + @type element: L{Element<twisted.words.xish.domish.Element>} + @ivar publisher: The entity that published the item. + @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>} + """ + + def __init__(self, element, publisher): + self.element = element + self.publisher = publisher + + + +class LeafNode(Node): + + implements(iidavoll.ILeafNode) + + nodeType = 'leaf' + + def __init__(self, nodeIdentifier, owner, config): + Node.__init__(self, nodeIdentifier, owner, config) + self._items = {} + self._itemlist = [] + + + def storeItems(self, items, publisher): + for element in items: + item = PublishedItem(element, publisher) + itemIdentifier = element["id"] + if itemIdentifier in self._items: + self._itemlist.remove(self._items[itemIdentifier]) + self._items[itemIdentifier] = item + self._itemlist.append(item) + + return defer.succeed(None) + + + def removeItems(self, itemIdentifiers): + deleted = [] + + for itemIdentifier in itemIdentifiers: + try: + item = self._items[itemIdentifier] + except KeyError: + pass + else: + self._itemlist.remove(item) + del self._items[itemIdentifier] + deleted.append(itemIdentifier) + + return defer.succeed(deleted) + + + def getItems(self, maxItems=None): + if maxItems: + itemList = self._itemlist[-maxItems:] + else: + itemList = self._itemlist + return defer.succeed([item.element for item in itemList]) + + + def getItemsById(self, itemIdentifiers): + items = [] + for itemIdentifier in itemIdentifiers: + try: + item = self._items[itemIdentifier] + except KeyError: + pass + else: + items.append(item.element) + return defer.succeed(items) + + + def purge(self): + self._items = {} + self._itemlist = [] + + return defer.succeed(None) + + +class CollectionNode(Node): + nodeType = 'collection' + + + +class GatewayStorage(object): + """ + Memory based storage facility for the XMPP-HTTP gateway. + """ + + def __init__(self): + self.callbacks = {} + + + def addCallback(self, service, nodeIdentifier, callback): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + callbacks = set([callback]) + self.callbacks[service, nodeIdentifier] = callbacks + else: + callbacks.add(callback) + pass + + return defer.succeed(None) + + + def removeCallback(self, service, nodeIdentifier, callback): + try: + callbacks = self.callbacks[service, nodeIdentifier] + callbacks.remove(callback) + except KeyError: + return defer.fail(error.NotSubscribed()) + else: + if not callbacks: + del self.callbacks[service, nodeIdentifier] + + return defer.succeed(not callbacks) + + + def getCallbacks(self, service, nodeIdentifier): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + return defer.fail(error.NoCallbacks()) + else: + return defer.succeed(callbacks) + + + def hasCallbacks(self, service, nodeIdentifier): + return defer.succeed((service, nodeIdentifier) in self.callbacks)