Mercurial > libervia-pubsub
diff src/memory_storage.py @ 369:dabee42494ac
config file + cleaning:
- SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir).
Its options must be in the "pubsub" section
- options on command line override config options
- removed tap and http files which are not used anymore
- changed directory structure to put source in src, to be coherent with SàT and Libervia
- changed options name, db* become db_*, secret become xmpp_pwd
- an exception is raised if jid or xmpp_pwd is are not configured
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Mar 2018 12:59:38 +0100 |
parents | sat_pubsub/memory_storage.py@618a92080812 |
children | aa3a464df605 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/memory_storage.py Fri Mar 02 12:59:38 2018 +0100 @@ -0,0 +1,380 @@ +#!/usr/bin/python +#-*- coding: utf-8 -*- + +# Copyright (c) 2003-2011 Ralph Meijer +# Copyright (c) 2012-2018 Jérôme Poisson + + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# -- + +# This program is based on Idavoll (http://idavoll.ik.nu/), +# originaly written by Ralph Meijer (http://ralphm.net/blog/) +# It is sublicensed under AGPL v3 (or any later version) as allowed by the original +# license. + +# -- + +# Here is a copy of the original license: + +# Copyright (c) 2003-2011 Ralph Meijer + +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: + +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +import copy +from zope.interface import implements +from twisted.internet import defer +from twisted.words.protocols.jabber import jid + +from wokkel.pubsub import Subscription + +from sat_pubsub import error, iidavoll + +class Storage: + + implements(iidavoll.IStorage) + + defaultConfig = { + 'leaf': { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + }, + 'collection': { + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } + } + + def __init__(self): + rootNode = CollectionNode('', jid.JID('localhost'), + copy.copy(self.defaultConfig['collection'])) + self._nodes = {'': rootNode} + + + def getNode(self, nodeIdentifier): + try: + node = self._nodes[nodeIdentifier] + except KeyError: + return defer.fail(error.NodeNotFound()) + + return defer.succeed(node) + + + def getNodeIds(self): + return defer.succeed(self._nodes.keys()) + + + def createNode(self, nodeIdentifier, owner, config): + if nodeIdentifier in self._nodes: + return defer.fail(error.NodeExists()) + + if config['pubsub#node_type'] != 'leaf': + raise error.NoCollections() + + node = LeafNode(nodeIdentifier, owner, config) + self._nodes[nodeIdentifier] = node + + return defer.succeed(None) + + + def deleteNode(self, nodeIdentifier): + try: + del self._nodes[nodeIdentifier] + except KeyError: + return defer.fail(error.NodeNotFound()) + + return defer.succeed(None) + + + def getAffiliations(self, entity): + entity = entity.userhost() + return defer.succeed([(node.nodeIdentifier, node._affiliations[entity]) + for name, node in self._nodes.iteritems() + if entity in node._affiliations]) + + + def getSubscriptions(self, entity): + subscriptions = [] + for node in self._nodes.itervalues(): + for subscriber, subscription in node._subscriptions.iteritems(): + subscriber = jid.internJID(subscriber) + if subscriber.userhostJID() == entity.userhostJID(): + subscriptions.append(subscription) + + return defer.succeed(subscriptions) + + + def getDefaultConfiguration(self, nodeType): + if nodeType == 'collection': + raise error.NoCollections() + + return self.defaultConfig[nodeType] + + +class Node: + + implements(iidavoll.INode) + + def __init__(self, nodeIdentifier, owner, config): + self.nodeIdentifier = nodeIdentifier + self._affiliations = {owner.userhost(): 'owner'} + self._subscriptions = {} + self._config = copy.copy(config) + + + def getType(self): + return self.nodeType + + + def getConfiguration(self): + return self._config + + + def getMetaData(self): + config = copy.copy(self._config) + config["pubsub#node_type"] = self.nodeType + return config + + + def setConfiguration(self, options): + for option in options: + if option in self._config: + self._config[option] = options[option] + + return defer.succeed(None) + + + def getAffiliation(self, entity): + return defer.succeed(self._affiliations.get(entity.userhost())) + + + def getSubscription(self, subscriber): + try: + subscription = self._subscriptions[subscriber.full()] + except KeyError: + return defer.succeed(None) + else: + return defer.succeed(subscription) + + + def getSubscriptions(self, state=None): + return defer.succeed( + [subscription + for subscription in self._subscriptions.itervalues() + if state is None or subscription.state == state]) + + + + def addSubscription(self, subscriber, state, options): + if self._subscriptions.get(subscriber.full()): + return defer.fail(error.SubscriptionExists()) + + subscription = Subscription(self.nodeIdentifier, subscriber, state, + options) + self._subscriptions[subscriber.full()] = subscription + return defer.succeed(None) + + + def removeSubscription(self, subscriber): + try: + del self._subscriptions[subscriber.full()] + except KeyError: + return defer.fail(error.NotSubscribed()) + + return defer.succeed(None) + + + def isSubscribed(self, entity): + for subscriber, subscription in self._subscriptions.iteritems(): + if jid.internJID(subscriber).userhost() == entity.userhost() and \ + subscription.state == 'subscribed': + return defer.succeed(True) + + return defer.succeed(False) + + + def getAffiliations(self): + affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation + in self._affiliations.iteritems()] + + return defer.succeed(affiliations) + + + +class PublishedItem(object): + """ + A published item. + + This represent an item as it was published by an entity. + + @ivar element: The DOM representation of the item that was published. + @type element: L{Element<twisted.words.xish.domish.Element>} + @ivar publisher: The entity that published the item. + @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>} + """ + + def __init__(self, element, publisher): + self.element = element + self.publisher = publisher + + + +class LeafNode(Node): + + implements(iidavoll.ILeafNode) + + nodeType = 'leaf' + + def __init__(self, nodeIdentifier, owner, config): + Node.__init__(self, nodeIdentifier, owner, config) + self._items = {} + self._itemlist = [] + + + def storeItems(self, item_data, publisher): + for access_model, item_config, element in item_data: + item = PublishedItem(element, publisher) + itemIdentifier = element["id"] + if itemIdentifier in self._items: + self._itemlist.remove(self._items[itemIdentifier]) + self._items[itemIdentifier] = item + self._itemlist.append(item) + + return defer.succeed(None) + + + def removeItems(self, itemIdentifiers): + deleted = [] + + for itemIdentifier in itemIdentifiers: + try: + item = self._items[itemIdentifier] + except KeyError: + pass + else: + self._itemlist.remove(item) + del self._items[itemIdentifier] + deleted.append(itemIdentifier) + + return defer.succeed(deleted) + + + def getItems(self, authorized_groups, unrestricted, maxItems=None): + if maxItems is not None: + itemList = self._itemlist[-maxItems:] + else: + itemList = self._itemlist + return defer.succeed([item.element for item in itemList]) + + + def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): + items = [] + for itemIdentifier in itemIdentifiers: + try: + item = self._items[itemIdentifier] + except KeyError: + pass + else: + items.append(item.element) + return defer.succeed(items) + + + def purge(self): + self._items = {} + self._itemlist = [] + + return defer.succeed(None) + + + def filterItemsWithPublisher(self, itemIdentifiers, requestor): + filteredItems = [] + for itemIdentifier in itemIdentifiers: + try: + if self._items[itemIdentifier].publisher.userhost() == requestor.userhost(): + filteredItems.append(self.items[itemIdentifier]) + except KeyError, AttributeError: + pass + return defer.succeed(filteredItems) + + +class CollectionNode(Node): + nodeType = 'collection' + + + +class GatewayStorage(object): + """ + Memory based storage facility for the XMPP-HTTP gateway. + """ + + def __init__(self): + self.callbacks = {} + + + def addCallback(self, service, nodeIdentifier, callback): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + callbacks = {callback} + self.callbacks[service, nodeIdentifier] = callbacks + else: + callbacks.add(callback) + pass + + return defer.succeed(None) + + + def removeCallback(self, service, nodeIdentifier, callback): + try: + callbacks = self.callbacks[service, nodeIdentifier] + callbacks.remove(callback) + except KeyError: + return defer.fail(error.NotSubscribed()) + else: + if not callbacks: + del self.callbacks[service, nodeIdentifier] + + return defer.succeed(not callbacks) + + + def getCallbacks(self, service, nodeIdentifier): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + return defer.fail(error.NoCallbacks()) + else: + return defer.succeed(callbacks) + + + def hasCallbacks(self, service, nodeIdentifier): + return defer.succeed((service, nodeIdentifier) in self.callbacks)