# HG changeset patch # User Goffi # Date 1337251694 -7200 # Node ID 923281d4c5bc01301d39385a03afa4cab0305bdd # Parent d99047cd90f921bcf32a51724de97f1071833c05 renamed idavoll directory to sat_pubsub diff -r d99047cd90f9 -r 923281d4c5bc idavoll/__init__.py --- a/idavoll/__init__.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,8 +0,0 @@ -# Copyright (c) 2003-2007 Ralph Meijer -# See LICENSE for details. - -""" -Idavoll, a generic XMPP publish-subscribe service. -""" - -__version__ = '0.9.1' diff -r d99047cd90f9 -r 923281d4c5bc idavoll/backend.py --- a/idavoll/backend.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,730 +0,0 @@ -# -*- test-case-name: idavoll.test.test_backend -*- -# -# Copyright (c) 2003-2010 Ralph Meijer -# See LICENSE for details. - -""" -Generic publish-subscribe backend. - -This module implements a generic publish-subscribe backend service with -business logic as per -U{XEP-0060} that interacts with -a given storage facility. It also provides an adapter from the XMPP -publish-subscribe protocol. -""" - -import uuid - -from zope.interface import implements - -from twisted.application import service -from twisted.python import components, log -from twisted.internet import defer, reactor -from twisted.words.protocols.jabber.error import StanzaError -from twisted.words.xish import utility - -from wokkel import disco -from wokkel.iwokkel import IPubSubResource -from wokkel.pubsub import PubSubResource, PubSubError - -from idavoll import error, iidavoll -from idavoll.iidavoll import IBackendService, ILeafNode - -def _getAffiliation(node, entity): - d = node.getAffiliation(entity) - d.addCallback(lambda affiliation: (node, affiliation)) - return d - - - -class BackendService(service.Service, utility.EventDispatcher): - """ - Generic publish-subscribe backend service. - - @cvar nodeOptions: Node configuration form as a mapping from the field - name to a dictionary that holds the field's type, label - and possible options to choose from. - @type nodeOptions: C{dict}. - @cvar defaultConfig: The default node configuration. - """ - - implements(iidavoll.IBackendService) - - nodeOptions = { - "pubsub#persist_items": - {"type": "boolean", - "label": "Persist items to storage"}, - "pubsub#deliver_payloads": - {"type": "boolean", - "label": "Deliver payloads with event notifications"}, - "pubsub#send_last_published_item": - {"type": "list-single", - "label": "When to send the last published item", - "options": { - "never": "Never", - "on_sub": "When a new subscription is processed"} - }, - } - - subscriptionOptions = { - "pubsub#subscription_type": - {"type": "list-single", - "options": { - "items": "Receive notification of new items only", - "nodes": "Receive notification of new nodes only"} - }, - "pubsub#subscription_depth": - {"type": "list-single", - "options": { - "1": "Receive notification from direct child nodes only", - "all": "Receive notification from all descendent nodes"} - }, - } - - def __init__(self, storage): - utility.EventDispatcher.__init__(self) - self.storage = storage - self._callbackList = [] - - - def supportsPublisherAffiliation(self): - return True - - - def supportsOutcastAffiliation(self): - return True - - - def supportsPersistentItems(self): - return True - - - def getNodeType(self, nodeIdentifier): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.getType()) - return d - - - def getNodes(self): - return self.storage.getNodeIds() - - - def getNodeMetaData(self, nodeIdentifier): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.getMetaData()) - d.addCallback(self._makeMetaData) - return d - - - def _makeMetaData(self, metaData): - options = [] - for key, value in metaData.iteritems(): - if key in self.nodeOptions: - option = {"var": key} - option.update(self.nodeOptions[key]) - option["value"] = value - options.append(option) - - return options - - - def _checkAuth(self, node, requestor): - def check(affiliation, node): - if affiliation not in ['owner', 'publisher']: - raise error.Forbidden() - return node - - d = node.getAffiliation(requestor) - d.addCallback(check, node) - return d - - - def publish(self, nodeIdentifier, items, requestor): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(self._checkAuth, requestor) - d.addCallback(self._doPublish, items, requestor) - return d - - - def _doPublish(self, node, items, requestor): - if node.nodeType == 'collection': - raise error.NoPublishing() - - configuration = node.getConfiguration() - persistItems = configuration["pubsub#persist_items"] - deliverPayloads = configuration["pubsub#deliver_payloads"] - - if items and not persistItems and not deliverPayloads: - raise error.ItemForbidden() - elif not items and (persistItems or deliverPayloads): - raise error.ItemRequired() - - if persistItems or deliverPayloads: - for item in items: - item.uri = None - item.defaultUri = None - if not item.getAttribute("id"): - item["id"] = str(uuid.uuid4()) - - if persistItems: - d = node.storeItems(items, requestor) - else: - d = defer.succeed(None) - - d.addCallback(self._doNotify, node.nodeIdentifier, items, - deliverPayloads) - return d - - - def _doNotify(self, result, nodeIdentifier, items, deliverPayloads): - if items and not deliverPayloads: - for item in items: - item.children = [] - - self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier}, - '//event/pubsub/notify') - - - def getNotifications(self, nodeIdentifier, items): - - def toNotifications(subscriptions, nodeIdentifier, items): - subsBySubscriber = {} - for subscription in subscriptions: - if subscription.options.get('pubsub#subscription_type', - 'items') == 'items': - subs = subsBySubscriber.setdefault(subscription.subscriber, - set()) - subs.add(subscription) - - notifications = [(subscriber, subscriptions, items) - for subscriber, subscriptions - in subsBySubscriber.iteritems()] - - return notifications - - def rootNotFound(failure): - failure.trap(error.NodeNotFound) - return [] - - d1 = self.storage.getNode(nodeIdentifier) - d1.addCallback(lambda node: node.getSubscriptions('subscribed')) - d2 = self.storage.getNode('') - d2.addCallback(lambda node: node.getSubscriptions('subscribed')) - d2.addErrback(rootNotFound) - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda result: result[0] + result[1]) - d.addCallback(toNotifications, nodeIdentifier, items) - return d - - - def registerNotifier(self, observerfn, *args, **kwargs): - self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) - - - def subscribe(self, nodeIdentifier, subscriber, requestor): - subscriberEntity = subscriber.userhostJID() - if subscriberEntity != requestor.userhostJID(): - return defer.fail(error.Forbidden()) - - d = self.storage.getNode(nodeIdentifier) - d.addCallback(_getAffiliation, subscriberEntity) - d.addCallback(self._doSubscribe, subscriber) - return d - - - def _doSubscribe(self, result, subscriber): - node, affiliation = result - - if affiliation == 'outcast': - raise error.Forbidden() - - def trapExists(failure): - failure.trap(error.SubscriptionExists) - return False - - def cb(sendLast): - d = node.getSubscription(subscriber) - if sendLast: - d.addCallback(self._sendLastPublished, node) - return d - - d = node.addSubscription(subscriber, 'subscribed', {}) - d.addCallbacks(lambda _: True, trapExists) - d.addCallback(cb) - return d - - - def _sendLastPublished(self, subscription, node): - - def notifyItem(items): - if items: - reactor.callLater(0, self.dispatch, - {'items': items, - 'nodeIdentifier': node.nodeIdentifier, - 'subscription': subscription}, - '//event/pubsub/notify') - - config = node.getConfiguration() - sendLastPublished = config.get('pubsub#send_last_published_item', - 'never') - if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': - entity = subscription.subscriber.userhostJID() - d = self.getItems(node.nodeIdentifier, entity, 1) - d.addCallback(notifyItem) - d.addErrback(log.err) - - return subscription - - - def unsubscribe(self, nodeIdentifier, subscriber, requestor): - if subscriber.userhostJID() != requestor.userhostJID(): - return defer.fail(error.Forbidden()) - - d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.removeSubscription(subscriber)) - return d - - - def getSubscriptions(self, entity): - return self.storage.getSubscriptions(entity) - - def supportsAutoCreate(self): - return True - - def supportsInstantNodes(self): - return True - - - def createNode(self, nodeIdentifier, requestor): - if not nodeIdentifier: - nodeIdentifier = 'generic/%s' % uuid.uuid4() - - nodeType = 'leaf' - config = self.storage.getDefaultConfiguration(nodeType) - config['pubsub#node_type'] = nodeType - - d = self.storage.createNode(nodeIdentifier, requestor, config) - d.addCallback(lambda _: nodeIdentifier) - return d - - - def getDefaultConfiguration(self, nodeType): - d = defer.succeed(self.storage.getDefaultConfiguration(nodeType)) - return d - - - def getNodeConfiguration(self, nodeIdentifier): - if not nodeIdentifier: - return defer.fail(error.NoRootNode()) - - d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.getConfiguration()) - - return d - - - def setNodeConfiguration(self, nodeIdentifier, options, requestor): - if not nodeIdentifier: - return defer.fail(error.NoRootNode()) - - d = self.storage.getNode(nodeIdentifier) - d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doSetNodeConfiguration, options) - return d - - - def _doSetNodeConfiguration(self, result, options): - node, affiliation = result - - if affiliation != 'owner': - raise error.Forbidden() - - return node.setConfiguration(options) - - - def getAffiliations(self, entity): - return self.storage.getAffiliations(entity) - - - def getItems(self, nodeIdentifier, requestor, maxItems=None, - itemIdentifiers=None): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doGetItems, maxItems, itemIdentifiers) - return d - - - def _doGetItems(self, result, maxItems, itemIdentifiers): - node, affiliation = result - - if not ILeafNode.providedBy(node): - return [] - - if affiliation == 'outcast': - raise error.Forbidden() - - if itemIdentifiers: - return node.getItemsById(itemIdentifiers) - else: - return node.getItems(maxItems) - - - def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doRetract, itemIdentifiers) - return d - - - def _doRetract(self, result, itemIdentifiers): - node, affiliation = result - persistItems = node.getConfiguration()["pubsub#persist_items"] - - if affiliation not in ['owner', 'publisher']: - raise error.Forbidden() - - if not persistItems: - raise error.NodeNotPersistent() - - d = node.removeItems(itemIdentifiers) - d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) - return d - - - def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): - self.dispatch({'itemIdentifiers': itemIdentifiers, - 'nodeIdentifier': nodeIdentifier }, - '//event/pubsub/retract') - - - def purgeNode(self, nodeIdentifier, requestor): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doPurge) - return d - - - def _doPurge(self, result): - node, affiliation = result - persistItems = node.getConfiguration()["pubsub#persist_items"] - - if affiliation != 'owner': - raise error.Forbidden() - - if not persistItems: - raise error.NodeNotPersistent() - - d = node.purge() - d.addCallback(self._doNotifyPurge, node.nodeIdentifier) - return d - - - def _doNotifyPurge(self, result, nodeIdentifier): - self.dispatch(nodeIdentifier, '//event/pubsub/purge') - - - def registerPreDelete(self, preDeleteFn): - self._callbackList.append(preDeleteFn) - - - def getSubscribers(self, nodeIdentifier): - def cb(subscriptions): - return [subscription.subscriber for subscription in subscriptions] - - d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.getSubscriptions('subscribed')) - d.addCallback(cb) - return d - - - def deleteNode(self, nodeIdentifier, requestor, redirectURI=None): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doPreDelete, redirectURI) - return d - - - def _doPreDelete(self, result, redirectURI): - node, affiliation = result - - if affiliation != 'owner': - raise error.Forbidden() - - data = {'nodeIdentifier': node.nodeIdentifier, - 'redirectURI': redirectURI} - - d = defer.DeferredList([cb(data) - for cb in self._callbackList], - consumeErrors=1) - d.addCallback(self._doDelete, node.nodeIdentifier) - - - def _doDelete(self, result, nodeIdentifier): - dl = [] - for succeeded, r in result: - if succeeded and r: - dl.extend(r) - - d = self.storage.deleteNode(nodeIdentifier) - d.addCallback(self._doNotifyDelete, dl) - - return d - - - def _doNotifyDelete(self, result, dl): - for d in dl: - d.callback(None) - - - -class PubSubResourceFromBackend(PubSubResource): - """ - Adapts a backend to an xmpp publish-subscribe service. - """ - - features = [ - "config-node", - "create-nodes", - "delete-any", - "delete-nodes", - "item-ids", - "meta-data", - "publish", - "purge-nodes", - "retract-items", - "retrieve-affiliations", - "retrieve-default", - "retrieve-items", - "retrieve-subscriptions", - "subscribe", - ] - - discoIdentity = disco.DiscoIdentity('pubsub', - 'service', - 'Idavoll publish-subscribe service') - - pubsubService = None - - _errorMap = { - error.NodeNotFound: ('item-not-found', None, None), - error.NodeExists: ('conflict', None, None), - error.Forbidden: ('forbidden', None, None), - error.ItemForbidden: ('bad-request', 'item-forbidden', None), - error.ItemRequired: ('bad-request', 'item-required', None), - error.NoInstantNodes: ('not-acceptable', - 'unsupported', - 'instant-nodes'), - error.NotSubscribed: ('unexpected-request', 'not-subscribed', None), - error.InvalidConfigurationOption: ('not-acceptable', None, None), - error.InvalidConfigurationValue: ('not-acceptable', None, None), - error.NodeNotPersistent: ('feature-not-implemented', - 'unsupported', - 'persistent-node'), - error.NoRootNode: ('bad-request', None, None), - error.NoCollections: ('feature-not-implemented', - 'unsupported', - 'collections'), - error.NoPublishing: ('feature-not-implemented', - 'unsupported', - 'publish'), - } - - def __init__(self, backend): - PubSubResource.__init__(self) - - self.backend = backend - self.hideNodes = False - - self.backend.registerNotifier(self._notify) - self.backend.registerPreDelete(self._preDelete) - - if self.backend.supportsAutoCreate(): - self.features.append("auto-create") - - if self.backend.supportsInstantNodes(): - self.features.append("instant-nodes") - - if self.backend.supportsOutcastAffiliation(): - self.features.append("outcast-affiliation") - - if self.backend.supportsPersistentItems(): - self.features.append("persistent-items") - - if self.backend.supportsPublisherAffiliation(): - self.features.append("publisher-affiliation") - - - def _notify(self, data): - items = data['items'] - nodeIdentifier = data['nodeIdentifier'] - if 'subscription' not in data: - d = self.backend.getNotifications(nodeIdentifier, items) - else: - subscription = data['subscription'] - d = defer.succeed([(subscription.subscriber, [subscription], - items)]) - d.addCallback(lambda notifications: self.pubsubService.notifyPublish( - self.serviceJID, - nodeIdentifier, - notifications)) - - - def _preDelete(self, data): - nodeIdentifier = data['nodeIdentifier'] - redirectURI = data.get('redirectURI', None) - d = self.backend.getSubscribers(nodeIdentifier) - d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( - self.serviceJID, - nodeIdentifier, - subscribers, - redirectURI)) - return d - - - def _mapErrors(self, failure): - e = failure.trap(*self._errorMap.keys()) - - condition, pubsubCondition, feature = self._errorMap[e] - msg = failure.value.msg - - if pubsubCondition: - exc = PubSubError(condition, pubsubCondition, feature, msg) - else: - exc = StanzaError(condition, text=msg) - - raise exc - - - def getInfo(self, requestor, service, nodeIdentifier): - info = {} - - def saveType(result): - info['type'] = result - return nodeIdentifier - - def saveMetaData(result): - info['meta-data'] = result - return info - - def trapNotFound(failure): - failure.trap(error.NodeNotFound) - return info - - d = defer.succeed(nodeIdentifier) - d.addCallback(self.backend.getNodeType) - d.addCallback(saveType) - d.addCallback(self.backend.getNodeMetaData) - d.addCallback(saveMetaData) - d.addErrback(trapNotFound) - d.addErrback(self._mapErrors) - return d - - - def getNodes(self, requestor, service, nodeIdentifier): - if service.resource: - return defer.succeed([]) - d = self.backend.getNodes() - return d.addErrback(self._mapErrors) - - - def getConfigurationOptions(self): - return self.backend.nodeOptions - - def _publish_errb(self, failure, request): - if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): - d = self.backend.createNode(request.nodeIdentifier, - request.sender) - d.addCallback(lambda ignore, - request: self.backend.publish(request.nodeIdentifier, - request.items, - request.sender), - request) - return d - - raise failure - - - def publish(self, request): - d = self.backend.publish(request.nodeIdentifier, - request.items, - request.sender) - d.addErrback(self._publish_errb, request) - return d.addErrback(self._mapErrors) - - - def subscribe(self, request): - d = self.backend.subscribe(request.nodeIdentifier, - request.subscriber, - request.sender) - return d.addErrback(self._mapErrors) - - - def unsubscribe(self, request): - d = self.backend.unsubscribe(request.nodeIdentifier, - request.subscriber, - request.sender) - return d.addErrback(self._mapErrors) - - - def subscriptions(self, request): - d = self.backend.getSubscriptions(request.sender) - return d.addErrback(self._mapErrors) - - - def affiliations(self, request): - d = self.backend.getAffiliations(request.sender) - return d.addErrback(self._mapErrors) - - - def create(self, request): - d = self.backend.createNode(request.nodeIdentifier, - request.sender) - return d.addErrback(self._mapErrors) - - - def default(self, request): - d = self.backend.getDefaultConfiguration(request.nodeType) - return d.addErrback(self._mapErrors) - - - def configureGet(self, request): - d = self.backend.getNodeConfiguration(request.nodeIdentifier) - return d.addErrback(self._mapErrors) - - - def configureSet(self, request): - d = self.backend.setNodeConfiguration(request.nodeIdentifier, - request.options, - request.sender) - return d.addErrback(self._mapErrors) - - - def items(self, request): - d = self.backend.getItems(request.nodeIdentifier, - request.sender, - request.maxItems, - request.itemIdentifiers) - return d.addErrback(self._mapErrors) - - - def retract(self, request): - d = self.backend.retractItem(request.nodeIdentifier, - request.itemIdentifiers, - request.sender) - return d.addErrback(self._mapErrors) - - - def purge(self, request): - d = self.backend.purgeNode(request.nodeIdentifier, - request.sender) - return d.addErrback(self._mapErrors) - - - def delete(self, request): - d = self.backend.deleteNode(request.nodeIdentifier, - request.sender) - return d.addErrback(self._mapErrors) - -components.registerAdapter(PubSubResourceFromBackend, - IBackendService, - IPubSubResource) diff -r d99047cd90f9 -r 923281d4c5bc idavoll/error.py --- a/idavoll/error.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,95 +0,0 @@ -# Copyright (c) 2003-2008 Ralph Meijer -# See LICENSE for details. - -class Error(Exception): - msg = '' - - def __init__(self, msg=None): - self.msg = msg or self.msg - - - def __str__(self): - return self.msg - - - -class NodeNotFound(Error): - pass - - - -class NodeExists(Error): - pass - - - -class NotSubscribed(Error): - """ - Entity is not subscribed to this node. - """ - - - -class SubscriptionExists(Error): - """ - There already exists a subscription to this node. - """ - - - -class Forbidden(Error): - pass - - - -class ItemForbidden(Error): - pass - - - -class ItemRequired(Error): - pass - - - -class NoInstantNodes(Error): - pass - - - -class InvalidConfigurationOption(Error): - msg = 'Invalid configuration option' - - - -class InvalidConfigurationValue(Error): - msg = 'Bad configuration value' - - - -class NodeNotPersistent(Error): - pass - - - -class NoRootNode(Error): - pass - - - -class NoCallbacks(Error): - """ - There are no callbacks for this node. - """ - - - -class NoCollections(Error): - pass - - - -class NoPublishing(Error): - """ - This node does not support publishing. - """ diff -r d99047cd90f9 -r 923281d4c5bc idavoll/gateway.py --- a/idavoll/gateway.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,868 +0,0 @@ -# -*- test-case-name: idavoll.test.test_gateway -*- -# -# Copyright (c) 2003-2009 Ralph Meijer -# See LICENSE for details. - -""" -Web resources and client for interacting with pubsub services. -""" - -import cgi -from time import gmtime, strftime -import urllib -import urlparse - -import simplejson - -from twisted.application import service -from twisted.internet import defer, reactor -from twisted.python import log -from twisted.web import client -from twisted.web2 import http, http_headers, resource, responsecode -from twisted.web2 import channel, server -from twisted.web2.stream import readStream -from twisted.words.protocols.jabber.jid import JID -from twisted.words.protocols.jabber.error import StanzaError -from twisted.words.xish import domish - -from wokkel.pubsub import Item -from wokkel.pubsub import PubSubClient - -from idavoll import error - -NS_ATOM = 'http://www.w3.org/2005/Atom' -MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' -MIME_JSON = 'application/json' - -class XMPPURIParseError(ValueError): - """ - Raised when a given XMPP URI couldn't be properly parsed. - """ - - - -def getServiceAndNode(uri): - """ - Given an XMPP URI, extract the publish subscribe service JID and node ID. - """ - - try: - scheme, rest = uri.split(':', 1) - except ValueError: - raise XMPPURIParseError("No URI scheme component") - - if scheme != 'xmpp': - raise XMPPURIParseError("Unknown URI scheme") - - if rest.startswith("//"): - raise XMPPURIParseError("Unexpected URI authority component") - - try: - entity, query = rest.split('?', 1) - except ValueError: - raise XMPPURIParseError("No URI query component") - - if not entity: - raise XMPPURIParseError("Empty URI path component") - - try: - service = JID(entity) - except Exception, e: - raise XMPPURIParseError("Invalid JID: %s" % e) - - params = cgi.parse_qs(query) - - try: - nodeIdentifier = params['node'][0] - except (KeyError, ValueError): - nodeIdentifier = '' - - return service, nodeIdentifier - - - -def getXMPPURI(service, nodeIdentifier): - """ - Construct an XMPP URI from a service JID and node identifier. - """ - return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '') - - - -class WebStreamParser(object): - def __init__(self): - self.elementStream = domish.elementStream() - self.elementStream.DocumentStartEvent = self.docStart - self.elementStream.ElementEvent = self.elem - self.elementStream.DocumentEndEvent = self.docEnd - self.done = False - - - def docStart(self, elem): - self.document = elem - - - def elem(self, elem): - self.document.addChild(elem) - - - def docEnd(self): - self.done = True - - - def parse(self, stream): - def endOfStream(result): - if not self.done: - raise Exception("No more stuff?") - else: - return self.document - - d = readStream(stream, self.elementStream.parse) - d.addCallback(endOfStream) - return d - - - -class CreateResource(resource.Resource): - """ - A resource to create a publish-subscribe node. - """ - def __init__(self, backend, serviceJID, owner): - self.backend = backend - self.serviceJID = serviceJID - self.owner = owner - - - http_GET = None - - - def http_POST(self, request): - """ - Respond to a POST request to create a new node. - """ - - def toResponse(nodeIdentifier): - uri = getXMPPURI(self.serviceJID, nodeIdentifier) - stream = simplejson.dumps({'uri': uri}) - contentType = http_headers.MimeType.fromString(MIME_JSON) - return http.Response(responsecode.OK, stream=stream, - headers={'Content-Type': contentType}) - d = self.backend.createNode(None, self.owner) - d.addCallback(toResponse) - return d - - - -class DeleteResource(resource.Resource): - """ - A resource to create a publish-subscribe node. - """ - def __init__(self, backend, serviceJID, owner): - self.backend = backend - self.serviceJID = serviceJID - self.owner = owner - - - http_GET = None - - - def http_POST(self, request): - """ - Respond to a POST request to create a new node. - """ - - def gotStream(_): - if request.args.get('uri'): - jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) - return defer.succeed(nodeIdentifier) - else: - raise http.HTTPError(http.Response(responsecode.BAD_REQUEST, - "No URI given")) - - def doDelete(nodeIdentifier, data): - if data: - params = simplejson.loads(''.join(data)) - redirectURI = params.get('redirect_uri') - else: - redirectURI = None - - return self.backend.deleteNode(nodeIdentifier, self.owner, - redirectURI) - - def respond(result): - return http.Response(responsecode.NO_CONTENT) - - - def trapNotFound(failure): - failure.trap(error.NodeNotFound) - return http.StatusResponse(responsecode.NOT_FOUND, - "Node not found") - - def trapXMPPURIParseError(failure): - failure.trap(XMPPURIParseError) - return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value) - - data = [] - d = readStream(request.stream, data.append) - d.addCallback(gotStream) - d.addCallback(doDelete, data) - d.addCallback(respond) - d.addErrback(trapNotFound) - d.addErrback(trapXMPPURIParseError) - return d - - - -class PublishResource(resource.Resource): - """ - A resource to publish to a publish-subscribe node. - """ - - def __init__(self, backend, serviceJID, owner): - self.backend = backend - self.serviceJID = serviceJID - self.owner = owner - - - http_GET = None - - - def checkMediaType(self, request): - ctype = request.headers.getHeader('content-type') - - if not ctype: - raise http.HTTPError( - http.StatusResponse( - responsecode.BAD_REQUEST, - "No specified Media Type")) - - if (ctype.mediaType != 'application' or - ctype.mediaSubtype != 'atom+xml' or - ctype.params.get('type') != 'entry' or - ctype.params.get('charset', 'utf-8') != 'utf-8'): - raise http.HTTPError( - http.StatusResponse( - responsecode.UNSUPPORTED_MEDIA_TYPE, - "Unsupported Media Type: %s" % - http_headers.generateContentType(ctype))) - - - def parseXMLPayload(self, stream): - p = WebStreamParser() - return p.parse(stream) - - - def http_POST(self, request): - """ - Respond to a POST request to create a new item. - """ - - def toResponse(nodeIdentifier): - uri = getXMPPURI(self.serviceJID, nodeIdentifier) - stream = simplejson.dumps({'uri': uri}) - contentType = http_headers.MimeType.fromString(MIME_JSON) - return http.Response(responsecode.OK, stream=stream, - headers={'Content-Type': contentType}) - - def gotNode(nodeIdentifier, payload): - item = Item(id='current', payload=payload) - d = self.backend.publish(nodeIdentifier, [item], self.owner) - d.addCallback(lambda _: nodeIdentifier) - return d - - def getNode(): - if request.args.get('uri'): - jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) - return defer.succeed(nodeIdentifier) - else: - return self.backend.createNode(None, self.owner) - - def doPublish(payload): - d = getNode() - d.addCallback(gotNode, payload) - return d - - def trapNotFound(failure): - failure.trap(error.NodeNotFound) - return http.StatusResponse(responsecode.NOT_FOUND, - "Node not found") - - def trapXMPPURIParseError(failure): - failure.trap(XMPPURIParseError) - return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value) - - self.checkMediaType(request) - d = self.parseXMLPayload(request.stream) - d.addCallback(doPublish) - d.addCallback(toResponse) - d.addErrback(trapNotFound) - d.addErrback(trapXMPPURIParseError) - return d - - - -class ListResource(resource.Resource): - def __init__(self, service): - self.service = service - - - def render(self, request): - def responseFromNodes(nodeIdentifiers): - stream = simplejson.dumps(nodeIdentifiers) - contentType = http_headers.MimeType.fromString(MIME_JSON) - return http.Response(responsecode.OK, stream=stream, - headers={'Content-Type': contentType}) - - d = self.service.getNodes() - d.addCallback(responseFromNodes) - return d - - - -# Service for subscribing to remote XMPP Pubsub nodes and web resources - -def extractAtomEntries(items): - """ - Extract atom entries from a list of publish-subscribe items. - - @param items: List of L{domish.Element}s that represent publish-subscribe - items. - @type items: C{list} - """ - - atomEntries = [] - - for item in items: - # ignore non-items (i.e. retractions) - if item.name != 'item': - continue - - atomEntry = None - for element in item.elements(): - # extract the first element that is an atom entry - if element.uri == NS_ATOM and element.name == 'entry': - atomEntry = element - break - - if atomEntry: - atomEntries.append(atomEntry) - - return atomEntries - - - -def constructFeed(service, nodeIdentifier, entries, title): - nodeURI = getXMPPURI(service, nodeIdentifier) - now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) - - # Collect the received entries in a feed - feed = domish.Element((NS_ATOM, 'feed')) - feed.addElement('title', content=title) - feed.addElement('id', content=nodeURI) - feed.addElement('updated', content=now) - - for entry in entries: - feed.addChild(entry) - - return feed - - - -class RemoteSubscriptionService(service.Service, PubSubClient): - """ - Service for subscribing to remote XMPP Publish-Subscribe nodes. - - Subscriptions are created with a callback HTTP URI that is POSTed - to with the received items in notifications. - """ - - def __init__(self, jid, storage): - self.jid = jid - self.storage = storage - - - def trapNotFound(self, failure): - failure.trap(StanzaError) - - if failure.value.condition == 'item-not-found': - raise error.NodeNotFound() - else: - return failure - - - def subscribeCallback(self, jid, nodeIdentifier, callback): - """ - Subscribe a callback URI. - - This registers a callback URI to be called when a notification is - received for the given node. - - If this is the first callback registered for this node, the gateway - will subscribe to the node. Otherwise, the most recently published item - for this node is retrieved and, if present, the newly registered - callback will be called with that item. - """ - - def callbackForLastItem(items): - atomEntries = extractAtomEntries(items) - - if not atomEntries: - return - - self._postTo([callback], jid, nodeIdentifier, atomEntries[0], - 'application/atom+xml;type=entry') - - def subscribeOrItems(hasCallbacks): - if hasCallbacks: - if not nodeIdentifier: - return None - d = self.items(jid, nodeIdentifier, 1) - d.addCallback(callbackForLastItem) - else: - d = self.subscribe(jid, nodeIdentifier, self.jid) - - d.addErrback(self.trapNotFound) - return d - - d = self.storage.hasCallbacks(jid, nodeIdentifier) - d.addCallback(subscribeOrItems) - d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier, - callback)) - return d - - - def unsubscribeCallback(self, jid, nodeIdentifier, callback): - """ - Unsubscribe a callback. - - If this was the last registered callback for this node, the - gateway will unsubscribe from node. - """ - - def cb(last): - if last: - return self.unsubscribe(jid, nodeIdentifier, self.jid) - - d = self.storage.removeCallback(jid, nodeIdentifier, callback) - d.addCallback(cb) - return d - - - def itemsReceived(self, event): - """ - Fire up HTTP client to do callback - """ - - atomEntries = extractAtomEntries(event.items) - service = event.sender - nodeIdentifier = event.nodeIdentifier - headers = event.headers - - # Don't notify if there are no atom entries - if not atomEntries: - return - - if len(atomEntries) == 1: - contentType = 'application/atom+xml;type=entry' - payload = atomEntries[0] - else: - contentType = 'application/atom+xml;type=feed' - payload = constructFeed(service, nodeIdentifier, atomEntries, - title='Received item collection') - - self.callCallbacks(service, nodeIdentifier, payload, contentType) - - if 'Collection' in headers: - for collection in headers['Collection']: - nodeIdentifier = collection or '' - self.callCallbacks(service, nodeIdentifier, payload, - contentType) - - - def deleteReceived(self, event): - """ - Fire up HTTP client to do callback - """ - - service = event.sender - nodeIdentifier = event.nodeIdentifier - redirectURI = event.redirectURI - self.callCallbacks(service, nodeIdentifier, eventType='DELETED', - redirectURI=redirectURI) - - - def _postTo(self, callbacks, service, nodeIdentifier, - payload=None, contentType=None, eventType=None, - redirectURI=None): - - if not callbacks: - return - - postdata = None - nodeURI = getXMPPURI(service, nodeIdentifier) - headers = {'Referer': nodeURI.encode('utf-8'), - 'PubSub-Service': service.full().encode('utf-8')} - - if payload: - postdata = payload.toXml().encode('utf-8') - if contentType: - headers['Content-Type'] = "%s;charset=utf-8" % contentType - - if eventType: - headers['Event'] = eventType - - if redirectURI: - headers['Link'] = '<%s>; rel=alternate' % ( - redirectURI.encode('utf-8'), - ) - - def postNotification(callbackURI): - f = getPageWithFactory(str(callbackURI), - method='POST', - postdata=postdata, - headers=headers) - d = f.deferred - d.addErrback(log.err) - - for callbackURI in callbacks: - reactor.callLater(0, postNotification, callbackURI) - - - def callCallbacks(self, service, nodeIdentifier, - payload=None, contentType=None, eventType=None, - redirectURI=None): - - def eb(failure): - failure.trap(error.NoCallbacks) - - # No callbacks were registered for this node. Unsubscribe? - - d = self.storage.getCallbacks(service, nodeIdentifier) - d.addCallback(self._postTo, service, nodeIdentifier, payload, - contentType, eventType, redirectURI) - d.addErrback(eb) - d.addErrback(log.err) - - - -class RemoteSubscribeBaseResource(resource.Resource): - """ - Base resource for remote pubsub node subscription and unsubscription. - - This resource accepts POST request with a JSON document that holds - a dictionary with the keys C{uri} and C{callback} that respectively map - to the XMPP URI of the publish-subscribe node and the callback URI. - - This class should be inherited with L{serviceMethod} overridden. - - @cvar serviceMethod: The name of the method to be called with - the JID of the pubsub service, the node identifier - and the callback URI as received in the HTTP POST - request to this resource. - """ - serviceMethod = None - errorMap = { - error.NodeNotFound: - (responsecode.FORBIDDEN, "Node not found"), - error.NotSubscribed: - (responsecode.FORBIDDEN, "No such subscription found"), - error.SubscriptionExists: - (responsecode.FORBIDDEN, "Subscription already exists"), - } - - def __init__(self, service): - self.service = service - self.params = None - - - http_GET = None - - - def http_POST(self, request): - def trapNotFound(failure): - err = failure.trap(*self.errorMap.keys()) - code, msg = self.errorMap[err] - return http.StatusResponse(code, msg) - - def respond(result): - return http.Response(responsecode.NO_CONTENT) - - def gotRequest(result): - uri = self.params['uri'] - callback = self.params['callback'] - - jid, nodeIdentifier = getServiceAndNode(uri) - method = getattr(self.service, self.serviceMethod) - d = method(jid, nodeIdentifier, callback) - return d - - def storeParams(data): - self.params = simplejson.loads(data) - - def trapXMPPURIParseError(failure): - failure.trap(XMPPURIParseError) - return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value) - - d = readStream(request.stream, storeParams) - d.addCallback(gotRequest) - d.addCallback(respond) - d.addErrback(trapNotFound) - d.addErrback(trapXMPPURIParseError) - return d - - - -class RemoteSubscribeResource(RemoteSubscribeBaseResource): - """ - Resource to subscribe to a remote publish-subscribe node. - - The passed C{uri} is the XMPP URI of the node to subscribe to and the - C{callback} is the callback URI. Upon receiving notifications from the - node, a POST request will be perfomed on the callback URI. - """ - serviceMethod = 'subscribeCallback' - - - -class RemoteUnsubscribeResource(RemoteSubscribeBaseResource): - """ - Resource to unsubscribe from a remote publish-subscribe node. - - The passed C{uri} is the XMPP URI of the node to unsubscribe from and the - C{callback} is the callback URI that was registered for it. - """ - serviceMethod = 'unsubscribeCallback' - - - -class RemoteItemsResource(resource.Resource): - """ - Resource for retrieving items from a remote pubsub node. - """ - - def __init__(self, service): - self.service = service - - - def render(self, request): - try: - maxItems = int(request.args.get('max_items', [0])[0]) or None - except ValueError: - return http.StatusResponse(responsecode.BAD_REQUEST, - "The argument max_items has an invalid value.") - - try: - uri = request.args['uri'][0] - except KeyError: - return http.StatusResponse(responsecode.BAD_REQUEST, - "No URI for the remote node provided.") - - try: - jid, nodeIdentifier = getServiceAndNode(uri) - except XMPPURIParseError: - return http.StatusResponse(responsecode.BAD_REQUEST, - "Malformed XMPP URI: %s" % uri) - - def respond(items): - """Create a feed out the retrieved items.""" - contentType = http_headers.MimeType('application', - 'atom+xml', - {'type': 'feed'}) - atomEntries = extractAtomEntries(items) - feed = constructFeed(jid, nodeIdentifier, atomEntries, - "Retrieved item collection") - payload = feed.toXml().encode('utf-8') - return http.Response(responsecode.OK, stream=payload, - headers={'Content-Type': contentType}) - - def trapNotFound(failure): - failure.trap(StanzaError) - if not failure.value.condition == 'item-not-found': - raise failure - return http.StatusResponse(responsecode.NOT_FOUND, - "Node not found") - - d = self.service.items(jid, nodeIdentifier, maxItems) - d.addCallback(respond) - d.addErrback(trapNotFound) - return d - - - -# Client side code to interact with a service as provided above - -def getPageWithFactory(url, contextFactory=None, *args, **kwargs): - """Download a web page. - - Download a page. Return the factory that holds a deferred, which will - callback with a page (as a string) or errback with a description of the - error. - - See HTTPClientFactory to see what extra args can be passed. - """ - - scheme, host, port, path = client._parse(url) - factory = client.HTTPClientFactory(url, *args, **kwargs) - factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200() - - if scheme == 'https': - from twisted.internet import ssl - if contextFactory is None: - contextFactory = ssl.ClientContextFactory() - reactor.connectSSL(host, port, factory, contextFactory) - else: - reactor.connectTCP(host, port, factory) - return factory - - - -class CallbackResource(resource.Resource): - """ - Web resource for retrieving gateway notifications. - """ - - def __init__(self, callback): - self.callback = callback - - - http_GET = None - - - def http_POST(self, request): - p = WebStreamParser() - if not request.headers.hasHeader('Event'): - d = p.parse(request.stream) - else: - d = defer.succeed(None) - d.addCallback(self.callback, request.headers) - d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) - return d - - - -class GatewayClient(service.Service): - """ - Service that provides client access to the HTTP Gateway into Idavoll. - """ - - agent = "Idavoll HTTP Gateway Client" - - def __init__(self, baseURI, callbackHost=None, callbackPort=None): - self.baseURI = baseURI - self.callbackHost = callbackHost or 'localhost' - self.callbackPort = callbackPort or 8087 - root = resource.Resource() - root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) - self.site = server.Site(root) - - - def startService(self): - self.port = reactor.listenTCP(self.callbackPort, - channel.HTTPFactory(self.site)) - - - def stopService(self): - return self.port.stopListening() - - - def _makeURI(self, verb, query=None): - uriComponents = urlparse.urlparse(self.baseURI) - uri = urlparse.urlunparse((uriComponents[0], - uriComponents[1], - uriComponents[2] + verb, - '', - query and urllib.urlencode(query) or '', - '')) - return uri - - - def callback(self, data, headers): - pass - - - def ping(self): - f = getPageWithFactory(self._makeURI(''), - method='HEAD', - agent=self.agent) - return f.deferred - - - def create(self): - f = getPageWithFactory(self._makeURI('create'), - method='POST', - agent=self.agent) - return f.deferred.addCallback(simplejson.loads) - - - def delete(self, xmppURI, redirectURI=None): - query = {'uri': xmppURI} - - if redirectURI: - params = {'redirect_uri': redirectURI} - postdata = simplejson.dumps(params) - headers = {'Content-Type': MIME_JSON} - else: - postdata = None - headers = None - - f = getPageWithFactory(self._makeURI('delete', query), - method='POST', - postdata=postdata, - headers=headers, - agent=self.agent) - return f.deferred - - - def publish(self, entry, xmppURI=None): - query = xmppURI and {'uri': xmppURI} - - f = getPageWithFactory(self._makeURI('publish', query), - method='POST', - postdata=entry.toXml().encode('utf-8'), - headers={'Content-Type': MIME_ATOM_ENTRY}, - agent=self.agent) - return f.deferred.addCallback(simplejson.loads) - - - def listNodes(self): - f = getPageWithFactory(self._makeURI('list'), - method='GET', - agent=self.agent) - return f.deferred.addCallback(simplejson.loads) - - - def subscribe(self, xmppURI): - params = {'uri': xmppURI, - 'callback': 'http://%s:%s/callback' % (self.callbackHost, - self.callbackPort)} - f = getPageWithFactory(self._makeURI('subscribe'), - method='POST', - postdata=simplejson.dumps(params), - headers={'Content-Type': MIME_JSON}, - agent=self.agent) - return f.deferred - - - def unsubscribe(self, xmppURI): - params = {'uri': xmppURI, - 'callback': 'http://%s:%s/callback' % (self.callbackHost, - self.callbackPort)} - f = getPageWithFactory(self._makeURI('unsubscribe'), - method='POST', - postdata=simplejson.dumps(params), - headers={'Content-Type': MIME_JSON}, - agent=self.agent) - return f.deferred - - - def items(self, xmppURI, maxItems=None): - query = {'uri': xmppURI} - if maxItems: - query['max_items'] = int(maxItems) - f = getPageWithFactory(self._makeURI('items', query), - method='GET', - agent=self.agent) - return f.deferred diff -r d99047cd90f9 -r 923281d4c5bc idavoll/iidavoll.py --- a/idavoll/iidavoll.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,585 +0,0 @@ -# Copyright (c) 2003-2008 Ralph Meijer -# See LICENSE for details. - -""" -Interfaces for idavoll. -""" - -from zope.interface import Attribute, Interface - -class IBackendService(Interface): - """ Interface to a backend service of a pubsub service. """ - - - def __init__(storage): - """ - @param storage: Object providing L{IStorage}. - """ - - - def supportsPublisherAffiliation(): - """ Reports if the backend supports the publisher affiliation. - - @rtype: C{bool} - """ - - - def supportsOutcastAffiliation(): - """ Reports if the backend supports the publisher affiliation. - - @rtype: C{bool} - """ - - - def supportsPersistentItems(): - """ Reports if the backend supports persistent items. - - @rtype: C{bool} - """ - - - def getNodeType(nodeIdentifier): - """ Return type of a node. - - @return: a deferred that returns either 'leaf' or 'collection' - """ - - - def getNodes(): - """ Returns list of all nodes. - - @return: a deferred that returns a C{list} of node ids. - """ - - - def getNodeMetaData(nodeIdentifier): - """ Return meta data for a node. - - @return: a deferred that returns a C{list} of C{dict}s with the - metadata. - """ - - - def createNode(nodeIdentifier, requestor): - """ Create a node. - - @return: a deferred that fires when the node has been created. - """ - - - def registerPreDelete(preDeleteFn): - """ Register a callback that is called just before a node deletion. - - The function C{preDeletedFn} is added to a list of functions to be - called just before deletion of a node. The callback C{preDeleteFn} is - called with the C{nodeIdentifier} that is about to be deleted and - should return a deferred that returns a list of deferreds that are to - be fired after deletion. The backend collects the lists from all these - callbacks before actually deleting the node in question. After - deletion all collected deferreds are fired to do post-processing. - - The idea is that you want to be able to collect data from the node - before deleting it, for example to get a list of subscribers that have - to be notified after the node has been deleted. To do this, - C{preDeleteFn} fetches the subscriber list and passes this list to a - callback attached to a deferred that it sets up. This deferred is - returned in the list of deferreds. - """ - - - def deleteNode(nodeIdentifier, requestor): - """ Delete a node. - - @return: a deferred that fires when the node has been deleted. - """ - - - def purgeNode(nodeIdentifier, requestor): - """ Removes all items in node from persistent storage """ - - - def subscribe(nodeIdentifier, subscriber, requestor): - """ Request the subscription of an entity to a pubsub node. - - Depending on the node's configuration and possible business rules, the - C{subscriber} is added to the list of subscriptions of the node with id - C{nodeIdentifier}. The C{subscriber} might be different from the - C{requestor}, and if the C{requestor} is not allowed to subscribe this - entity an exception should be raised. - - @return: a deferred that returns the subscription state - """ - - - def unsubscribe(nodeIdentifier, subscriber, requestor): - """ Cancel the subscription of an entity to a pubsub node. - - The subscription of C{subscriber} is removed from the list of - subscriptions of the node with id C{nodeIdentifier}. If the - C{requestor} is not allowed to unsubscribe C{subscriber}, an an - exception should be raised. - - @return: a deferred that fires when unsubscription is complete. - """ - - - def getSubscribers(nodeIdentifier): - """ Get node subscriber list. - - @return: a deferred that fires with the list of subscribers. - """ - - - def getSubscriptions(entity): - """ Report the list of current subscriptions with this pubsub service. - - Report the list of the current subscriptions with all nodes within this - pubsub service, for the C{entity}. - - @return: a deferred that returns the list of all current subscriptions - as tuples C{(nodeIdentifier, subscriber, subscription)}. - """ - - - def getAffiliations(entity): - """ Report the list of current affiliations with this pubsub service. - - Report the list of the current affiliations with all nodes within this - pubsub service, for the C{entity}. - - @return: a deferred that returns the list of all current affiliations - as tuples C{(nodeIdentifier, affiliation)}. - """ - - - def publish(nodeIdentifier, items, requestor): - """ Publish items to a pubsub node. - - @return: a deferred that fires when the items have been published. - @rtype: L{Deferred} - """ - - - def registerNotifier(observerfn, *args, **kwargs): - """ Register callback which is called for notification. """ - - - def getNotifications(nodeIdentifier, items): - """ - Get notification list. - - This method is called to discover which entities should receive - notifications for the given items that have just been published to the - given node. - - The notification list contains tuples (subscriber, subscriptions, - items) to result in one notification per tuple: the given subscriptions - yielded the given items to be notified to this subscriber. This - structure is needed allow for letting the subscriber know which - subscriptions yielded which notifications, while catering for - collection nodes and content-based subscriptions. - - To minimize the amount of notifications per entity, implementers - should take care that if all items in C{items} were yielded - by the same set of subscriptions, exactly one tuple is for this - subscriber is returned, so that the subscriber would get exactly one - notification. Alternatively, one tuple per subscription combination. - - @param nodeIdentifier: The identifier of the node the items were - published to. - @type nodeIdentifier: C{unicode}. - @param items: The list of published items as - L{Element}s. - @type items: C{list} - @return: The notification list as tuples of - (L{JID}, - C{list} of L{Subscription}, - C{list} of L{Element}. - @rtype: C{list} - """ - - - def getItems(nodeIdentifier, requestor, maxItems=None, itemIdentifiers=[]): - """ Retrieve items from persistent storage - - If C{maxItems} is given, return the C{maxItems} last published - items, else if C{itemIdentifiers} is not empty, return the items - requested. If neither is given, return all items. - - @return: a deferred that returns the requested items - """ - - - def retractItem(nodeIdentifier, itemIdentifier, requestor): - """ Removes item in node from persistent storage """ - - - -class IStorage(Interface): - """ - Storage interface. - """ - - - def getNode(nodeIdentifier): - """ - Get Node. - - @param nodeIdentifier: NodeID of the desired node. - @type nodeIdentifier: C{str} - @return: deferred that returns a L{INode} providing object. - """ - - - def getNodeIds(): - """ - Return all NodeIDs. - - @return: deferred that returns a list of NodeIDs (C{unicode}). - """ - - - def createNode(nodeIdentifier, owner, config): - """ - Create new node. - - The implementation should make sure, the passed owner JID is stripped - of the resource (e.g. using C{owner.userhostJID()}). The passed config - is expected to have values for the fields returned by - L{getDefaultConfiguration}, as well as a value for - C{'pubsub#node_type'}. - - @param nodeIdentifier: NodeID of the new node. - @type nodeIdentifier: C{unicode} - @param owner: JID of the new nodes's owner. - @type owner: L{JID} - @param config: Node configuration. - @type config: C{dict} - @return: deferred that fires on creation. - """ - - - def deleteNode(nodeIdentifier): - """ - Delete a node. - - @param nodeIdentifier: NodeID of the new node. - @type nodeIdentifier: C{unicode} - @return: deferred that fires on deletion. - """ - - - def getAffiliations(entity): - """ - Get all affiliations for entity. - - The implementation should make sure, the passed owner JID is stripped - of the resource (e.g. using C{owner.userhostJID()}). - - @param entity: JID of the entity. - @type entity: L{JID} - @return: deferred that returns a C{list} of tuples of the form - C{(nodeIdentifier, affiliation)}, where C{nodeIdentifier} is - of the type L{unicode} and C{affiliation} is one of - C{'owner'}, C{'publisher'} and C{'outcast'}. - """ - - - def getSubscriptions(entity): - """ - Get all subscriptions for an entity. - - The implementation should make sure, the passed owner JID is stripped - of the resource (e.g. using C{owner.userhostJID()}). - - @param entity: JID of the entity. - @type entity: L{JID} - @return: deferred that returns a C{list} of tuples of the form - C{(nodeIdentifier, subscriber, state)}, where - C{nodeIdentifier} is of the type C{unicode}, C{subscriber} of - the type J{JID}, and - C{state} is C{'subscribed'}, C{'pending'} or - C{'unconfigured'}. - """ - - - def getDefaultConfiguration(nodeType): - """ - Get the default configuration for the given node type. - - @param nodeType: Either C{'leaf'} or C{'collection'}. - @type nodeType: C{str} - @return: The default configuration. - @rtype: C{dict}. - @raises: L{idavoll.error.NoCollections} if collections are not - supported. - """ - - - -class INode(Interface): - """ - Interface to the class of objects that represent nodes. - """ - - nodeType = Attribute("""The type of this node. One of {'leaf'}, - {'collection'}.""") - nodeIdentifier = Attribute("""The node identifer of this node""") - - - def getType(): - """ - Get node's type. - - @return: C{'leaf'} or C{'collection'}. - """ - - - def getConfiguration(): - """ - Get node's configuration. - - The configuration must at least have two options: - C{pubsub#persist_items}, and C{pubsub#deliver_payloads}. - - @return: C{dict} of configuration options. - """ - - - def getMetaData(): - """ - Get node's meta data. - - The meta data must be a superset of the configuration options, and - also at least should have a C{pubsub#node_type} entry. - - @return: C{dict} of meta data. - """ - - - def setConfiguration(options): - """ - Set node's configuration. - - The elements of {options} will set the new values for those - configuration items. This means that only changing items have to - be given. - - @param options: a dictionary of configuration options. - @returns: a deferred that fires upon success. - """ - - - def getAffiliation(entity): - """ - Get affiliation of entity with this node. - - @param entity: JID of entity. - @type entity: L{JID} - @return: deferred that returns C{'owner'}, C{'publisher'}, C{'outcast'} - or C{None}. - """ - - - def getSubscription(subscriber): - """ - Get subscription to this node of subscriber. - - @param subscriber: JID of the new subscriptions' entity. - @type subscriber: L{JID} - @return: deferred that returns the subscription state (C{'subscribed'}, - C{'pending'} or C{None}). - """ - - - def getSubscriptions(state=None): - """ - Get list of subscriptions to this node. - - The optional C{state} argument filters the subscriptions to their - state. - - @param state: Subscription state filter. One of C{'subscribed'}, - C{'pending'}, C{'unconfigured'}. - @type state: C{str} - @return: a deferred that returns a C{list} of - L{wokkel.pubsub.Subscription}s. - """ - - - def addSubscription(subscriber, state, config): - """ - Add new subscription to this node with given state. - - @param subscriber: JID of the new subscriptions' entity. - @type subscriber: L{JID} - @param state: C{'subscribed'} or C{'pending'} - @type state: C{str} - @param config: Subscription configuration. - @param config: C{dict} - @return: deferred that fires on subscription. - """ - - - def removeSubscription(subscriber): - """ - Remove subscription to this node. - - @param subscriber: JID of the subscriptions' entity. - @type subscriber: L{JID} - @return: deferred that fires on removal. - """ - - - def isSubscribed(entity): - """ - Returns whether entity has any subscription to this node. - - Only returns C{True} when the subscription state (if present) is - C{'subscribed'} for any subscription that matches the bare JID. - - @param subscriber: bare JID of the subscriptions' entity. - @type subscriber: L{JID} - @return: deferred that returns a C{bool}. - """ - - - def getAffiliations(): - """ - Get affiliations of entities with this node. - - @return: deferred that returns a C{list} of tuples (jid, affiliation), - where jid is a L(JID) - and affiliation is one of C{'owner'}, - C{'publisher'}, C{'outcast'}. - """ - - - -class ILeafNode(Interface): - """ - Interface to the class of objects that represent leaf nodes. - """ - - def storeItems(items, publisher): - """ - Store items in persistent storage for later retrieval. - - @param items: The list of items to be stored. Each item is the - L{domish} representation of the XML fragment as defined - for C{} in the - C{http://jabber.org/protocol/pubsub} namespace. - @type items: C{list} of {domish.Element} - @param publisher: JID of the publishing entity. - @type publisher: L{JID} - @return: deferred that fires upon success. - """ - - - def removeItems(itemIdentifiers): - """ - Remove items by id. - - @param itemIdentifiers: C{list} of item ids. - @return: deferred that fires with a C{list} of ids of the items that - were deleted - """ - - - def getItems(maxItems=None): - """ - Get items. - - If C{maxItems} is not given, all items in the node are returned, - just like C{getItemsById}. Otherwise, C{maxItems} limits - the returned items to a maximum of that number of most recently - published items. - - @param maxItems: if given, a natural number (>0) that limits the - returned number of items. - @return: deferred that fires with a C{list} of found items. - """ - - - def getItemsById(itemIdentifiers): - """ - Get items by item id. - - Each item in the returned list is a unicode string that - represent the XML of the item as it was published, including the - item wrapper with item id. - - @param itemIdentifiers: C{list} of item ids. - @return: deferred that fires with a C{list} of found items. - """ - - - def purge(): - """ - Purge node of all items in persistent storage. - - @return: deferred that fires when the node has been purged. - """ - - - -class IGatewayStorage(Interface): - - def addCallback(service, nodeIdentifier, callback): - """ - Register a callback URI. - - The registered HTTP callback URI will have an Atom Entry documented - POSTed to it upon receiving a notification for the given pubsub node. - - @param service: The XMPP entity that holds the node. - @type service: L{JID} - @param nodeIdentifier: The identifier of the publish-subscribe node. - @type nodeIdentifier: C{unicode}. - @param callback: The callback URI to be registered. - @type callback: C{str}. - @rtype: L{Deferred} - """ - - def removeCallback(service, nodeIdentifier, callback): - """ - Remove a registered callback URI. - - The returned deferred will fire with a boolean that signals wether or - not this was the last callback unregistered for this node. - - @param service: The XMPP entity that holds the node. - @type service: L{JID} - @param nodeIdentifier: The identifier of the publish-subscribe node. - @type nodeIdentifier: C{unicode}. - @param callback: The callback URI to be unregistered. - @type callback: C{str}. - @rtype: L{Deferred} - """ - - def getCallbacks(service, nodeIdentifier): - """ - Get the callbacks registered for this node. - - Returns a deferred that fires with the set of HTTP callback URIs - registered for this node. - - @param service: The XMPP entity that holds the node. - @type service: L{JID} - @param nodeIdentifier: The identifier of the publish-subscribe node. - @type nodeIdentifier: C{unicode}. - @rtype: L{Deferred} - """ - - - def hasCallbacks(service, nodeIdentifier): - """ - Return wether there are callbacks registered for a node. - - @param service: The XMPP entity that holds the node. - @type service: L{JID} - @param nodeIdentifier: The identifier of the publish-subscribe node. - @type nodeIdentifier: C{unicode}. - @returns: Deferred that fires with a boolean. - @rtype: L{Deferred} - """ diff -r d99047cd90f9 -r 923281d4c5bc idavoll/memory_storage.py --- a/idavoll/memory_storage.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,320 +0,0 @@ -# Copyright (c) 2003-2010 Ralph Meijer -# See LICENSE for details. - -import copy -from zope.interface import implements -from twisted.internet import defer -from twisted.words.protocols.jabber import jid - -from wokkel.pubsub import Subscription - -from idavoll import error, iidavoll - -class Storage: - - implements(iidavoll.IStorage) - - defaultConfig = { - 'leaf': { - "pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - }, - 'collection': { - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - } - } - - def __init__(self): - rootNode = CollectionNode('', jid.JID('localhost'), - copy.copy(self.defaultConfig['collection'])) - self._nodes = {'': rootNode} - - - def getNode(self, nodeIdentifier): - try: - node = self._nodes[nodeIdentifier] - except KeyError: - return defer.fail(error.NodeNotFound()) - - return defer.succeed(node) - - - def getNodeIds(self): - return defer.succeed(self._nodes.keys()) - - - def createNode(self, nodeIdentifier, owner, config): - if nodeIdentifier in self._nodes: - return defer.fail(error.NodeExists()) - - if config['pubsub#node_type'] != 'leaf': - raise error.NoCollections() - - node = LeafNode(nodeIdentifier, owner, config) - self._nodes[nodeIdentifier] = node - - return defer.succeed(None) - - - def deleteNode(self, nodeIdentifier): - try: - del self._nodes[nodeIdentifier] - except KeyError: - return defer.fail(error.NodeNotFound()) - - return defer.succeed(None) - - - def getAffiliations(self, entity): - entity = entity.userhost() - return defer.succeed([(node.nodeIdentifier, node._affiliations[entity]) - for name, node in self._nodes.iteritems() - if entity in node._affiliations]) - - - def getSubscriptions(self, entity): - subscriptions = [] - for node in self._nodes.itervalues(): - for subscriber, subscription in node._subscriptions.iteritems(): - subscriber = jid.internJID(subscriber) - if subscriber.userhostJID() == entity.userhostJID(): - subscriptions.append(subscription) - - return defer.succeed(subscriptions) - - - def getDefaultConfiguration(self, nodeType): - if nodeType == 'collection': - raise error.NoCollections() - - return self.defaultConfig[nodeType] - - -class Node: - - implements(iidavoll.INode) - - def __init__(self, nodeIdentifier, owner, config): - self.nodeIdentifier = nodeIdentifier - self._affiliations = {owner.userhost(): 'owner'} - self._subscriptions = {} - self._config = copy.copy(config) - - - def getType(self): - return self.nodeType - - - def getConfiguration(self): - return self._config - - - def getMetaData(self): - config = copy.copy(self._config) - config["pubsub#node_type"] = self.nodeType - return config - - - def setConfiguration(self, options): - for option in options: - if option in self._config: - self._config[option] = options[option] - - return defer.succeed(None) - - - def getAffiliation(self, entity): - return defer.succeed(self._affiliations.get(entity.userhost())) - - - def getSubscription(self, subscriber): - try: - subscription = self._subscriptions[subscriber.full()] - except KeyError: - return defer.succeed(None) - else: - return defer.succeed(subscription) - - - def getSubscriptions(self, state=None): - return defer.succeed( - [subscription - for subscription in self._subscriptions.itervalues() - if state is None or subscription.state == state]) - - - - def addSubscription(self, subscriber, state, options): - if self._subscriptions.get(subscriber.full()): - return defer.fail(error.SubscriptionExists()) - - subscription = Subscription(self.nodeIdentifier, subscriber, state, - options) - self._subscriptions[subscriber.full()] = subscription - return defer.succeed(None) - - - def removeSubscription(self, subscriber): - try: - del self._subscriptions[subscriber.full()] - except KeyError: - return defer.fail(error.NotSubscribed()) - - return defer.succeed(None) - - - def isSubscribed(self, entity): - for subscriber, subscription in self._subscriptions.iteritems(): - if jid.internJID(subscriber).userhost() == entity.userhost() and \ - subscription.state == 'subscribed': - return defer.succeed(True) - - return defer.succeed(False) - - - def getAffiliations(self): - affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation - in self._affiliations.iteritems()] - - return defer.succeed(affiliations) - - - -class PublishedItem(object): - """ - A published item. - - This represent an item as it was published by an entity. - - @ivar element: The DOM representation of the item that was published. - @type element: L{Element} - @ivar publisher: The entity that published the item. - @type publisher: L{JID} - """ - - def __init__(self, element, publisher): - self.element = element - self.publisher = publisher - - - -class LeafNode(Node): - - implements(iidavoll.ILeafNode) - - nodeType = 'leaf' - - def __init__(self, nodeIdentifier, owner, config): - Node.__init__(self, nodeIdentifier, owner, config) - self._items = {} - self._itemlist = [] - - - def storeItems(self, items, publisher): - for element in items: - item = PublishedItem(element, publisher) - itemIdentifier = element["id"] - if itemIdentifier in self._items: - self._itemlist.remove(self._items[itemIdentifier]) - self._items[itemIdentifier] = item - self._itemlist.append(item) - - return defer.succeed(None) - - - def removeItems(self, itemIdentifiers): - deleted = [] - - for itemIdentifier in itemIdentifiers: - try: - item = self._items[itemIdentifier] - except KeyError: - pass - else: - self._itemlist.remove(item) - del self._items[itemIdentifier] - deleted.append(itemIdentifier) - - return defer.succeed(deleted) - - - def getItems(self, maxItems=None): - if maxItems: - itemList = self._itemlist[-maxItems:] - else: - itemList = self._itemlist - return defer.succeed([item.element for item in itemList]) - - - def getItemsById(self, itemIdentifiers): - items = [] - for itemIdentifier in itemIdentifiers: - try: - item = self._items[itemIdentifier] - except KeyError: - pass - else: - items.append(item.element) - return defer.succeed(items) - - - def purge(self): - self._items = {} - self._itemlist = [] - - return defer.succeed(None) - - -class CollectionNode(Node): - nodeType = 'collection' - - - -class GatewayStorage(object): - """ - Memory based storage facility for the XMPP-HTTP gateway. - """ - - def __init__(self): - self.callbacks = {} - - - def addCallback(self, service, nodeIdentifier, callback): - try: - callbacks = self.callbacks[service, nodeIdentifier] - except KeyError: - callbacks = set([callback]) - self.callbacks[service, nodeIdentifier] = callbacks - else: - callbacks.add(callback) - pass - - return defer.succeed(None) - - - def removeCallback(self, service, nodeIdentifier, callback): - try: - callbacks = self.callbacks[service, nodeIdentifier] - callbacks.remove(callback) - except KeyError: - return defer.fail(error.NotSubscribed()) - else: - if not callbacks: - del self.callbacks[service, nodeIdentifier] - - return defer.succeed(not callbacks) - - - def getCallbacks(self, service, nodeIdentifier): - try: - callbacks = self.callbacks[service, nodeIdentifier] - except KeyError: - return defer.fail(error.NoCallbacks()) - else: - return defer.succeed(callbacks) - - - def hasCallbacks(self, service, nodeIdentifier): - return defer.succeed((service, nodeIdentifier) in self.callbacks) diff -r d99047cd90f9 -r 923281d4c5bc idavoll/pgsql_storage.py --- a/idavoll/pgsql_storage.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,605 +0,0 @@ -# Copyright (c) 2003-2008 Ralph Meijer -# See LICENSE for details. - -import copy - -from zope.interface import implements - -from twisted.enterprise import adbapi -from twisted.words.protocols.jabber import jid - -from wokkel.generic import parseXml, stripNamespace -from wokkel.pubsub import Subscription - -from idavoll import error, iidavoll - -class Storage: - - implements(iidavoll.IStorage) - - defaultConfig = { - 'leaf': { - "pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - }, - 'collection': { - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - } - } - - def __init__(self, dbpool): - self.dbpool = dbpool - - - def getNode(self, nodeIdentifier): - return self.dbpool.runInteraction(self._getNode, nodeIdentifier) - - - def _getNode(self, cursor, nodeIdentifier): - configuration = {} - cursor.execute("""SELECT node_type, - persist_items, - deliver_payloads, - send_last_published_item - FROM nodes - WHERE node=%s""", - (nodeIdentifier,)) - row = cursor.fetchone() - - if not row: - raise error.NodeNotFound() - - if row[0] == 'leaf': - configuration = { - 'pubsub#persist_items': row[1], - 'pubsub#deliver_payloads': row[2], - 'pubsub#send_last_published_item': - row[3]} - node = LeafNode(nodeIdentifier, configuration) - node.dbpool = self.dbpool - return node - elif row[0] == 'collection': - configuration = { - 'pubsub#deliver_payloads': row[2], - 'pubsub#send_last_published_item': - row[3]} - node = CollectionNode(nodeIdentifier, configuration) - node.dbpool = self.dbpool - return node - - - - def getNodeIds(self): - d = self.dbpool.runQuery("""SELECT node from nodes""") - d.addCallback(lambda results: [r[0] for r in results]) - return d - - - def createNode(self, nodeIdentifier, owner, config): - return self.dbpool.runInteraction(self._createNode, nodeIdentifier, - owner, config) - - - def _createNode(self, cursor, nodeIdentifier, owner, config): - if config['pubsub#node_type'] != 'leaf': - raise error.NoCollections() - - owner = owner.userhost() - try: - cursor.execute("""INSERT INTO nodes - (node, node_type, persist_items, - deliver_payloads, send_last_published_item) - VALUES - (%s, 'leaf', %s, %s, %s)""", - (nodeIdentifier, - config['pubsub#persist_items'], - config['pubsub#deliver_payloads'], - config['pubsub#send_last_published_item']) - ) - except cursor._pool.dbapi.IntegrityError: - raise error.NodeExists() - - cursor.execute("""SELECT 1 from entities where jid=%s""", - (owner,)) - - if not cursor.fetchone(): - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (owner,)) - - cursor.execute("""INSERT INTO affiliations - (node_id, entity_id, affiliation) - SELECT node_id, entity_id, 'owner' FROM - (SELECT node_id FROM nodes WHERE node=%s) as n - CROSS JOIN - (SELECT entity_id FROM entities - WHERE jid=%s) as e""", - (nodeIdentifier, owner)) - - - def deleteNode(self, nodeIdentifier): - return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) - - - def _deleteNode(self, cursor, nodeIdentifier): - cursor.execute("""DELETE FROM nodes WHERE node=%s""", - (nodeIdentifier,)) - - if cursor.rowcount != 1: - raise error.NodeNotFound() - - - def getAffiliations(self, entity): - d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities - NATURAL JOIN affiliations - NATURAL JOIN nodes - WHERE jid=%s""", - (entity.userhost(),)) - d.addCallback(lambda results: [tuple(r) for r in results]) - return d - - - def getSubscriptions(self, entity): - def toSubscriptions(rows): - subscriptions = [] - for row in rows: - subscriber = jid.internJID('%s/%s' % (row[1], - row[2])) - subscription = Subscription(row[0], subscriber, row[3]) - subscriptions.append(subscription) - return subscriptions - - d = self.dbpool.runQuery("""SELECT node, jid, resource, state - FROM entities - NATURAL JOIN subscriptions - NATURAL JOIN nodes - WHERE jid=%s""", - (entity.userhost(),)) - d.addCallback(toSubscriptions) - return d - - - def getDefaultConfiguration(self, nodeType): - return self.defaultConfig[nodeType] - - - -class Node: - - implements(iidavoll.INode) - - def __init__(self, nodeIdentifier, config): - self.nodeIdentifier = nodeIdentifier - self._config = config - - - def _checkNodeExists(self, cursor): - cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", - (self.nodeIdentifier,)) - if not cursor.fetchone(): - raise error.NodeNotFound() - - - def getType(self): - return self.nodeType - - - def getConfiguration(self): - return self._config - - - def setConfiguration(self, options): - config = copy.copy(self._config) - - for option in options: - if option in config: - config[option] = options[option] - - d = self.dbpool.runInteraction(self._setConfiguration, config) - d.addCallback(self._setCachedConfiguration, config) - return d - - - def _setConfiguration(self, cursor, config): - self._checkNodeExists(cursor) - cursor.execute("""UPDATE nodes SET persist_items=%s, - deliver_payloads=%s, - send_last_published_item=%s - WHERE node=%s""", - (config["pubsub#persist_items"], - config["pubsub#deliver_payloads"], - config["pubsub#send_last_published_item"], - self.nodeIdentifier)) - - - def _setCachedConfiguration(self, void, config): - self._config = config - - - def getMetaData(self): - config = copy.copy(self._config) - config["pubsub#node_type"] = self.nodeType - return config - - - def getAffiliation(self, entity): - return self.dbpool.runInteraction(self._getAffiliation, entity) - - - def _getAffiliation(self, cursor, entity): - self._checkNodeExists(cursor) - cursor.execute("""SELECT affiliation FROM affiliations - NATURAL JOIN nodes - NATURAL JOIN entities - WHERE node=%s AND jid=%s""", - (self.nodeIdentifier, - entity.userhost())) - - try: - return cursor.fetchone()[0] - except TypeError: - return None - - - def getSubscription(self, subscriber): - return self.dbpool.runInteraction(self._getSubscription, subscriber) - - - def _getSubscription(self, cursor, subscriber): - self._checkNodeExists(cursor) - - userhost = subscriber.userhost() - resource = subscriber.resource or '' - - cursor.execute("""SELECT state FROM subscriptions - NATURAL JOIN nodes - NATURAL JOIN entities - WHERE node=%s AND jid=%s AND resource=%s""", - (self.nodeIdentifier, - userhost, - resource)) - row = cursor.fetchone() - if not row: - return None - else: - return Subscription(self.nodeIdentifier, subscriber, row[0]) - - - def getSubscriptions(self, state=None): - return self.dbpool.runInteraction(self._getSubscriptions, state) - - - def _getSubscriptions(self, cursor, state): - self._checkNodeExists(cursor) - - query = """SELECT jid, resource, state, - subscription_type, subscription_depth - FROM subscriptions - NATURAL JOIN nodes - NATURAL JOIN entities - WHERE node=%s"""; - values = [self.nodeIdentifier] - - if state: - query += " AND state=%s" - values.append(state) - - cursor.execute(query, values); - rows = cursor.fetchall() - - subscriptions = [] - for row in rows: - subscriber = jid.JID('%s/%s' % (row[0], row[1])) - - options = {} - if row[3]: - options['pubsub#subscription_type'] = row[3]; - if row[4]: - options['pubsub#subscription_depth'] = row[4]; - - subscriptions.append(Subscription(self.nodeIdentifier, subscriber, - row[2], options)) - - return subscriptions - - - def addSubscription(self, subscriber, state, config): - return self.dbpool.runInteraction(self._addSubscription, subscriber, - state, config) - - - def _addSubscription(self, cursor, subscriber, state, config): - self._checkNodeExists(cursor) - - userhost = subscriber.userhost() - resource = subscriber.resource or '' - - subscription_type = config.get('pubsub#subscription_type') - subscription_depth = config.get('pubsub#subscription_depth') - - try: - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (userhost,)) - except cursor._pool.dbapi.IntegrityError: - cursor._connection.rollback() - - try: - cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, state, - subscription_type, subscription_depth) - SELECT node_id, entity_id, %s, %s, %s, %s FROM - (SELECT node_id FROM nodes - WHERE node=%s) as n - CROSS JOIN - (SELECT entity_id FROM entities - WHERE jid=%s) as e""", - (resource, - state, - subscription_type, - subscription_depth, - self.nodeIdentifier, - userhost)) - except cursor._pool.dbapi.IntegrityError: - raise error.SubscriptionExists() - - - def removeSubscription(self, subscriber): - return self.dbpool.runInteraction(self._removeSubscription, - subscriber) - - - def _removeSubscription(self, cursor, subscriber): - self._checkNodeExists(cursor) - - userhost = subscriber.userhost() - resource = subscriber.resource or '' - - cursor.execute("""DELETE FROM subscriptions WHERE - node_id=(SELECT node_id FROM nodes - WHERE node=%s) AND - entity_id=(SELECT entity_id FROM entities - WHERE jid=%s) AND - resource=%s""", - (self.nodeIdentifier, - userhost, - resource)) - if cursor.rowcount != 1: - raise error.NotSubscribed() - - return None - - - def isSubscribed(self, entity): - return self.dbpool.runInteraction(self._isSubscribed, entity) - - - def _isSubscribed(self, cursor, entity): - self._checkNodeExists(cursor) - - cursor.execute("""SELECT 1 FROM entities - NATURAL JOIN subscriptions - NATURAL JOIN nodes - WHERE entities.jid=%s - AND node=%s AND state='subscribed'""", - (entity.userhost(), - self.nodeIdentifier)) - - return cursor.fetchone() is not None - - - def getAffiliations(self): - return self.dbpool.runInteraction(self._getAffiliations) - - - def _getAffiliations(self, cursor): - self._checkNodeExists(cursor) - - cursor.execute("""SELECT jid, affiliation FROM nodes - NATURAL JOIN affiliations - NATURAL JOIN entities - WHERE node=%s""", - (self.nodeIdentifier,)) - result = cursor.fetchall() - - return [(jid.internJID(r[0]), r[1]) for r in result] - - - -class LeafNode(Node): - - implements(iidavoll.ILeafNode) - - nodeType = 'leaf' - - def storeItems(self, items, publisher): - return self.dbpool.runInteraction(self._storeItems, items, publisher) - - - def _storeItems(self, cursor, items, publisher): - self._checkNodeExists(cursor) - for item in items: - self._storeItem(cursor, item, publisher) - - - def _storeItem(self, cursor, item, publisher): - data = item.toXml() - cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s - FROM nodes - WHERE nodes.node_id = items.node_id AND - nodes.node = %s and items.item=%s""", - (publisher.full(), - data, - self.nodeIdentifier, - item["id"])) - if cursor.rowcount == 1: - return - - cursor.execute("""INSERT INTO items (node_id, item, publisher, data) - SELECT node_id, %s, %s, %s FROM nodes - WHERE node=%s""", - (item["id"], - publisher.full(), - data, - self.nodeIdentifier)) - - - def removeItems(self, itemIdentifiers): - return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) - - - def _removeItems(self, cursor, itemIdentifiers): - self._checkNodeExists(cursor) - - deleted = [] - - for itemIdentifier in itemIdentifiers: - cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT node_id FROM nodes - WHERE node=%s) AND - item=%s""", - (self.nodeIdentifier, - itemIdentifier)) - - if cursor.rowcount: - deleted.append(itemIdentifier) - - return deleted - - - def getItems(self, maxItems=None): - return self.dbpool.runInteraction(self._getItems, maxItems) - - - def _getItems(self, cursor, maxItems): - self._checkNodeExists(cursor) - query = """SELECT data FROM nodes - NATURAL JOIN items - WHERE node=%s ORDER BY date DESC""" - if maxItems: - cursor.execute(query + " LIMIT %s", - (self.nodeIdentifier, - maxItems)) - else: - cursor.execute(query, (self.nodeIdentifier,)) - - result = cursor.fetchall() - items = [stripNamespace(parseXml(r[0])) for r in result] - return items - - - def getItemsById(self, itemIdentifiers): - return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers) - - - def _getItemsById(self, cursor, itemIdentifiers): - self._checkNodeExists(cursor) - items = [] - for itemIdentifier in itemIdentifiers: - cursor.execute("""SELECT data FROM nodes - NATURAL JOIN items - WHERE node=%s AND item=%s""", - (self.nodeIdentifier, - itemIdentifier)) - result = cursor.fetchone() - if result: - items.append(parseXml(result[0])) - return items - - - def purge(self): - return self.dbpool.runInteraction(self._purge) - - - def _purge(self, cursor): - self._checkNodeExists(cursor) - - cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT node_id FROM nodes WHERE node=%s)""", - (self.nodeIdentifier,)) - - -class CollectionNode(Node): - - nodeType = 'collection' - - - -class GatewayStorage(object): - """ - Memory based storage facility for the XMPP-HTTP gateway. - """ - - def __init__(self, dbpool): - self.dbpool = dbpool - - - def _countCallbacks(self, cursor, service, nodeIdentifier): - """ - Count number of callbacks registered for a node. - """ - cursor.execute("""SELECT count(*) FROM callbacks - WHERE service=%s and node=%s""", - service.full(), - nodeIdentifier) - results = cursor.fetchall() - return results[0][0] - - - def addCallback(self, service, nodeIdentifier, callback): - def interaction(cursor): - cursor.execute("""SELECT 1 FROM callbacks - WHERE service=%s and node=%s and uri=%s""", - service.full(), - nodeIdentifier, - callback) - if cursor.fetchall(): - return - - cursor.execute("""INSERT INTO callbacks - (service, node, uri) VALUES - (%s, %s, %s)""", - service.full(), - nodeIdentifier, - callback) - - return self.dbpool.runInteraction(interaction) - - - def removeCallback(self, service, nodeIdentifier, callback): - def interaction(cursor): - cursor.execute("""DELETE FROM callbacks - WHERE service=%s and node=%s and uri=%s""", - service.full(), - nodeIdentifier, - callback) - - if cursor.rowcount != 1: - raise error.NotSubscribed() - - last = not self._countCallbacks(cursor, service, nodeIdentifier) - return last - - return self.dbpool.runInteraction(interaction) - - def getCallbacks(self, service, nodeIdentifier): - def interaction(cursor): - cursor.execute("""SELECT uri FROM callbacks - WHERE service=%s and node=%s""", - service.full(), - nodeIdentifier) - results = cursor.fetchall() - - if not results: - raise error.NoCallbacks() - - return [result[0] for result in results] - - return self.dbpool.runInteraction(interaction) - - - def hasCallbacks(self, service, nodeIdentifier): - def interaction(cursor): - return bool(self._countCallbacks(cursor, service, nodeIdentifier)) - - return self.dbpool.runInteraction(interaction) diff -r d99047cd90f9 -r 923281d4c5bc idavoll/tap.py --- a/idavoll/tap.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,94 +0,0 @@ -# Copyright (c) 2003-2010 Ralph Meijer -# See LICENSE for details. - -from twisted.application import service -from twisted.python import usage -from twisted.words.protocols.jabber.jid import JID - -from wokkel.component import Component -from wokkel.disco import DiscoHandler -from wokkel.generic import FallbackHandler, VersionHandler -from wokkel.iwokkel import IPubSubResource -from wokkel.pubsub import PubSubService - -from idavoll import __version__ -from idavoll.backend import BackendService - -class Options(usage.Options): - optParameters = [ - ('jid', None, 'pubsub', 'JID this component will be available at'), - ('secret', None, 'secret', 'Jabber server component secret'), - ('rhost', None, '127.0.0.1', 'Jabber server host'), - ('rport', None, '5347', 'Jabber server port'), - ('backend', None, 'memory', 'Choice of storage backend'), - ('dbuser', None, None, 'Database user (pgsql backend)'), - ('dbname', None, 'pubsub', 'Database name (pgsql backend)'), - ('dbpass', None, None, 'Database password (pgsql backend)'), - ('dbhost', None, None, 'Database host (pgsql backend)'), - ('dbport', None, None, 'Database port (pgsql backend)'), - ] - - optFlags = [ - ('verbose', 'v', 'Show traffic'), - ('hide-nodes', None, 'Hide all nodes for disco') - ] - - def postOptions(self): - if self['backend'] not in ['pgsql', 'memory']: - raise usage.UsageError, "Unknown backend!" - - self['jid'] = JID(self['jid']) - - - -def makeService(config): - s = service.MultiService() - - # Create backend service with storage - - if config['backend'] == 'pgsql': - from twisted.enterprise import adbapi - from idavoll.pgsql_storage import Storage - dbpool = adbapi.ConnectionPool('psycopg2', - user=config['dbuser'], - password=config['dbpass'], - database=config['dbname'], - host=config['dbhost'], - port=config['dbport'], - cp_reconnect=True, - client_encoding='utf-8', - ) - st = Storage(dbpool) - elif config['backend'] == 'memory': - from idavoll.memory_storage import Storage - st = Storage() - - bs = BackendService(st) - bs.setName('backend') - bs.setServiceParent(s) - - # Set up XMPP server-side component with publish-subscribe capabilities - - cs = Component(config["rhost"], int(config["rport"]), - config["jid"].full(), config["secret"]) - cs.setName('component') - cs.setServiceParent(s) - - cs.factory.maxDelay = 900 - - if config["verbose"]: - cs.logTraffic = True - - FallbackHandler().setHandlerParent(cs) - VersionHandler('Idavoll', __version__).setHandlerParent(cs) - DiscoHandler().setHandlerParent(cs) - - resource = IPubSubResource(bs) - resource.hideNodes = config["hide-nodes"] - resource.serviceJID = config["jid"] - - ps = PubSubService(resource) - ps.setHandlerParent(cs) - resource.pubsubService = ps - - return s diff -r d99047cd90f9 -r 923281d4c5bc idavoll/tap_http.py --- a/idavoll/tap_http.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,97 +0,0 @@ -# Copyright (c) 2003-2008 Ralph Meijer -# See LICENSE for details. - -from twisted.application import internet, service, strports -from twisted.conch import manhole, manhole_ssh -from twisted.cred import portal, checkers -from twisted.web2 import channel, log, resource, server -from twisted.web2.tap import Web2Service - -from idavoll import gateway, tap -from idavoll.gateway import RemoteSubscriptionService - -class Options(tap.Options): - optParameters = [ - ('webport', None, '8086', 'Web port'), - ] - - - -def getManholeFactory(namespace, **passwords): - def getManHole(_): - return manhole.Manhole(namespace) - - realm = manhole_ssh.TerminalRealm() - realm.chainedProtocolFactory.protocolFactory = getManHole - p = portal.Portal(realm) - p.registerChecker( - checkers.InMemoryUsernamePasswordDatabaseDontUse(**passwords)) - f = manhole_ssh.ConchFactory(p) - return f - - - -def makeService(config): - s = tap.makeService(config) - - bs = s.getServiceNamed('backend') - cs = s.getServiceNamed('component') - - # Set up XMPP service for subscribing to remote nodes - - if config['backend'] == 'pgsql': - from idavoll.pgsql_storage import GatewayStorage - gst = GatewayStorage(bs.storage.dbpool) - elif config['backend'] == 'memory': - from idavoll.memory_storage import GatewayStorage - gst = GatewayStorage() - - ss = RemoteSubscriptionService(config['jid'], gst) - ss.setHandlerParent(cs) - ss.startService() - - # Set up web service - - root = resource.Resource() - - # Set up resources that exposes the backend - root.child_create = gateway.CreateResource(bs, config['jid'], - config['jid']) - root.child_delete = gateway.DeleteResource(bs, config['jid'], - config['jid']) - root.child_publish = gateway.PublishResource(bs, config['jid'], - config['jid']) - root.child_list = gateway.ListResource(bs) - - # Set up resources for accessing remote pubsub nodes. - root.child_subscribe = gateway.RemoteSubscribeResource(ss) - root.child_unsubscribe = gateway.RemoteUnsubscribeResource(ss) - root.child_items = gateway.RemoteItemsResource(ss) - - if config["verbose"]: - root = log.LogWrapperResource(root) - - site = server.Site(root) - w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site)) - - if config["verbose"]: - logObserver = log.DefaultCommonAccessLoggingObserver() - w2s = Web2Service(logObserver) - w.setServiceParent(w2s) - w = w2s - - w.setServiceParent(s) - - # Set up a manhole - - namespace = {'service': s, - 'component': cs, - 'backend': bs, - 'root': root} - - f = getManholeFactory(namespace, admin='admin') - manholeService = strports.service('2222', f) - manholeService.setServiceParent(s) - - return s - diff -r d99047cd90f9 -r 923281d4c5bc idavoll/test/__init__.py --- a/idavoll/test/__init__.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,6 +0,0 @@ -# Copyright (c) 2003-2007 Ralph Meijer -# See LICENSE for details. - -""" -Tests for L{idavoll}. -""" diff -r d99047cd90f9 -r 923281d4c5bc idavoll/test/test_backend.py --- a/idavoll/test/test_backend.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,611 +0,0 @@ -# Copyright (c) 2003-2010 Ralph Meijer -# See LICENSE for details. - -""" -Tests for L{idavoll.backend}. -""" - -from zope.interface import implements -from zope.interface.verify import verifyObject - -from twisted.internet import defer -from twisted.trial import unittest -from twisted.words.protocols.jabber import jid -from twisted.words.protocols.jabber.error import StanzaError - -from wokkel import iwokkel, pubsub - -from idavoll import backend, error, iidavoll - -OWNER = jid.JID('owner@example.com') -OWNER_FULL = jid.JID('owner@example.com/home') -SERVICE = jid.JID('test.example.org') -NS_PUBSUB = 'http://jabber.org/protocol/pubsub' - -class BackendTest(unittest.TestCase): - - def test_interfaceIBackend(self): - self.assertTrue(verifyObject(iidavoll.IBackendService, - backend.BackendService(None))) - - - def test_deleteNode(self): - class TestNode: - nodeIdentifier = 'to-be-deleted' - def getAffiliation(self, entity): - if entity.userhostJID() == OWNER: - return defer.succeed('owner') - - class TestStorage: - def __init__(self): - self.deleteCalled = [] - - def getNode(self, nodeIdentifier): - return defer.succeed(TestNode()) - - def deleteNode(self, nodeIdentifier): - if nodeIdentifier in ['to-be-deleted']: - self.deleteCalled.append(nodeIdentifier) - return defer.succeed(None) - else: - return defer.fail(error.NodeNotFound()) - - def preDelete(data): - self.assertFalse(self.storage.deleteCalled) - preDeleteCalled.append(data) - return defer.succeed(None) - - def cb(result): - self.assertEquals(1, len(preDeleteCalled)) - data = preDeleteCalled[-1] - self.assertEquals('to-be-deleted', data['nodeIdentifier']) - self.assertTrue(self.storage.deleteCalled) - - self.storage = TestStorage() - self.backend = backend.BackendService(self.storage) - - preDeleteCalled = [] - - self.backend.registerPreDelete(preDelete) - d = self.backend.deleteNode('to-be-deleted', OWNER_FULL) - d.addCallback(cb) - return d - - - def test_deleteNodeRedirect(self): - uri = 'xmpp:%s?;node=test2' % (SERVICE.full(),) - - class TestNode: - nodeIdentifier = 'to-be-deleted' - def getAffiliation(self, entity): - if entity.userhostJID() == OWNER: - return defer.succeed('owner') - - class TestStorage: - def __init__(self): - self.deleteCalled = [] - - def getNode(self, nodeIdentifier): - return defer.succeed(TestNode()) - - def deleteNode(self, nodeIdentifier): - if nodeIdentifier in ['to-be-deleted']: - self.deleteCalled.append(nodeIdentifier) - return defer.succeed(None) - else: - return defer.fail(error.NodeNotFound()) - - def preDelete(data): - self.assertFalse(self.storage.deleteCalled) - preDeleteCalled.append(data) - return defer.succeed(None) - - def cb(result): - self.assertEquals(1, len(preDeleteCalled)) - data = preDeleteCalled[-1] - self.assertEquals('to-be-deleted', data['nodeIdentifier']) - self.assertEquals(uri, data['redirectURI']) - self.assertTrue(self.storage.deleteCalled) - - self.storage = TestStorage() - self.backend = backend.BackendService(self.storage) - - preDeleteCalled = [] - - self.backend.registerPreDelete(preDelete) - d = self.backend.deleteNode('to-be-deleted', OWNER, redirectURI=uri) - d.addCallback(cb) - return d - - - def test_createNodeNoID(self): - """ - Test creation of a node without a given node identifier. - """ - class TestStorage: - def getDefaultConfiguration(self, nodeType): - return {} - - def createNode(self, nodeIdentifier, requestor, config): - self.nodeIdentifier = nodeIdentifier - return defer.succeed(None) - - self.storage = TestStorage() - self.backend = backend.BackendService(self.storage) - self.storage.backend = self.backend - - def checkID(nodeIdentifier): - self.assertNotIdentical(None, nodeIdentifier) - self.assertIdentical(self.storage.nodeIdentifier, nodeIdentifier) - - d = self.backend.createNode(None, OWNER_FULL) - d.addCallback(checkID) - return d - - class NodeStore: - """ - I just store nodes to pose as an L{IStorage} implementation. - """ - def __init__(self, nodes): - self.nodes = nodes - - def getNode(self, nodeIdentifier): - try: - return defer.succeed(self.nodes[nodeIdentifier]) - except KeyError: - return defer.fail(error.NodeNotFound()) - - - def test_getNotifications(self): - """ - Ensure subscribers show up in the notification list. - """ - item = pubsub.Item() - sub = pubsub.Subscription('test', OWNER, 'subscribed') - - class TestNode: - def getSubscriptions(self, state=None): - return [sub] - - def cb(result): - self.assertEquals(1, len(result)) - subscriber, subscriptions, items = result[-1] - - self.assertEquals(OWNER, subscriber) - self.assertEquals(set([sub]), subscriptions) - self.assertEquals([item], items) - - self.storage = self.NodeStore({'test': TestNode()}) - self.backend = backend.BackendService(self.storage) - d = self.backend.getNotifications('test', [item]) - d.addCallback(cb) - return d - - def test_getNotificationsRoot(self): - """ - Ensure subscribers to the root node show up in the notification list - for leaf nodes. - - This assumes a flat node relationship model with exactly one collection - node: the root node. Each leaf node is automatically a child node - of the root node. - """ - item = pubsub.Item() - subRoot = pubsub.Subscription('', OWNER, 'subscribed') - - class TestNode: - def getSubscriptions(self, state=None): - return [] - - class TestRootNode: - def getSubscriptions(self, state=None): - return [subRoot] - - def cb(result): - self.assertEquals(1, len(result)) - subscriber, subscriptions, items = result[-1] - self.assertEquals(OWNER, subscriber) - self.assertEquals(set([subRoot]), subscriptions) - self.assertEquals([item], items) - - self.storage = self.NodeStore({'test': TestNode(), - '': TestRootNode()}) - self.backend = backend.BackendService(self.storage) - d = self.backend.getNotifications('test', [item]) - d.addCallback(cb) - return d - - - def test_getNotificationsMultipleNodes(self): - """ - Ensure that entities that subscribe to a leaf node as well as the - root node get exactly one notification. - """ - item = pubsub.Item() - sub = pubsub.Subscription('test', OWNER, 'subscribed') - subRoot = pubsub.Subscription('', OWNER, 'subscribed') - - class TestNode: - def getSubscriptions(self, state=None): - return [sub] - - class TestRootNode: - def getSubscriptions(self, state=None): - return [subRoot] - - def cb(result): - self.assertEquals(1, len(result)) - subscriber, subscriptions, items = result[-1] - - self.assertEquals(OWNER, subscriber) - self.assertEquals(set([sub, subRoot]), subscriptions) - self.assertEquals([item], items) - - self.storage = self.NodeStore({'test': TestNode(), - '': TestRootNode()}) - self.backend = backend.BackendService(self.storage) - d = self.backend.getNotifications('test', [item]) - d.addCallback(cb) - return d - - - def test_getDefaultConfiguration(self): - """ - L{backend.BackendService.getDefaultConfiguration} should return - a deferred that fires a dictionary with configuration values. - """ - - class TestStorage: - def getDefaultConfiguration(self, nodeType): - return { - "pubsub#persist_items": True, - "pubsub#deliver_payloads": True} - - def cb(options): - self.assertIn("pubsub#persist_items", options) - self.assertEqual(True, options["pubsub#persist_items"]) - - self.backend = backend.BackendService(TestStorage()) - d = self.backend.getDefaultConfiguration('leaf') - d.addCallback(cb) - return d - - - def test_getNodeConfiguration(self): - class testNode: - nodeIdentifier = 'node' - def getConfiguration(self): - return {'pubsub#deliver_payloads': True, - 'pubsub#persist_items': False} - - class testStorage: - def getNode(self, nodeIdentifier): - return defer.succeed(testNode()) - - def cb(options): - self.assertIn("pubsub#deliver_payloads", options) - self.assertEqual(True, options["pubsub#deliver_payloads"]) - self.assertIn("pubsub#persist_items", options) - self.assertEqual(False, options["pubsub#persist_items"]) - - self.storage = testStorage() - self.backend = backend.BackendService(self.storage) - self.storage.backend = self.backend - - d = self.backend.getNodeConfiguration('node') - d.addCallback(cb) - return d - - - def test_setNodeConfiguration(self): - class testNode: - nodeIdentifier = 'node' - def getAffiliation(self, entity): - if entity.userhostJID() == OWNER: - return defer.succeed('owner') - def setConfiguration(self, options): - self.options = options - - class testStorage: - def __init__(self): - self.nodes = {'node': testNode()} - def getNode(self, nodeIdentifier): - return defer.succeed(self.nodes[nodeIdentifier]) - - def checkOptions(node): - options = node.options - self.assertIn("pubsub#deliver_payloads", options) - self.assertEqual(True, options["pubsub#deliver_payloads"]) - self.assertIn("pubsub#persist_items", options) - self.assertEqual(False, options["pubsub#persist_items"]) - - def cb(result): - d = self.storage.getNode('node') - d.addCallback(checkOptions) - return d - - self.storage = testStorage() - self.backend = backend.BackendService(self.storage) - self.storage.backend = self.backend - - options = {'pubsub#deliver_payloads': True, - 'pubsub#persist_items': False} - - d = self.backend.setNodeConfiguration('node', options, OWNER_FULL) - d.addCallback(cb) - return d - - - def test_publishNoID(self): - """ - Test publish request with an item without a node identifier. - """ - class TestNode: - nodeType = 'leaf' - nodeIdentifier = 'node' - def getAffiliation(self, entity): - if entity.userhostJID() == OWNER: - return defer.succeed('owner') - def getConfiguration(self): - return {'pubsub#deliver_payloads': True, - 'pubsub#persist_items': False} - - class TestStorage: - def getNode(self, nodeIdentifier): - return defer.succeed(TestNode()) - - def checkID(notification): - self.assertNotIdentical(None, notification['items'][0]['id']) - - self.storage = TestStorage() - self.backend = backend.BackendService(self.storage) - self.storage.backend = self.backend - - self.backend.registerNotifier(checkID) - - items = [pubsub.Item()] - d = self.backend.publish('node', items, OWNER_FULL) - return d - - - def test_notifyOnSubscription(self): - """ - Test notification of last published item on subscription. - """ - ITEM = "" % NS_PUBSUB - - class TestNode: - implements(iidavoll.ILeafNode) - nodeIdentifier = 'node' - nodeType = 'leaf' - def getAffiliation(self, entity): - if entity is OWNER: - return defer.succeed('owner') - def getConfiguration(self): - return {'pubsub#deliver_payloads': True, - 'pubsub#persist_items': False, - 'pubsub#send_last_published_item': 'on_sub'} - def getItems(self, maxItems): - return [ITEM] - def addSubscription(self, subscriber, state, options): - self.subscription = pubsub.Subscription('node', subscriber, - state, options) - return defer.succeed(None) - def getSubscription(self, subscriber): - return defer.succeed(self.subscription) - - class TestStorage: - def getNode(self, nodeIdentifier): - return defer.succeed(TestNode()) - - def cb(data): - self.assertEquals('node', data['nodeIdentifier']) - self.assertEquals([ITEM], data['items']) - self.assertEquals(OWNER, data['subscription'].subscriber) - - self.storage = TestStorage() - self.backend = backend.BackendService(self.storage) - self.storage.backend = self.backend - - d1 = defer.Deferred() - d1.addCallback(cb) - self.backend.registerNotifier(d1.callback) - d2 = self.backend.subscribe('node', OWNER, OWNER_FULL) - return defer.gatherResults([d1, d2]) - - test_notifyOnSubscription.timeout = 2 - - - -class BaseTestBackend(object): - """ - Base class for backend stubs. - """ - - def supportsPublisherAffiliation(self): - return True - - - def supportsOutcastAffiliation(self): - return True - - - def supportsPersistentItems(self): - return True - - - def supportsInstantNodes(self): - return True - - - def registerNotifier(self, observerfn, *args, **kwargs): - return - - - def registerPreDelete(self, preDeleteFn): - return - - - -class PubSubResourceFromBackendTest(unittest.TestCase): - - def test_interface(self): - resource = backend.PubSubResourceFromBackend(BaseTestBackend()) - self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource)) - - - def test_preDelete(self): - """ - Test pre-delete sending out notifications to subscribers. - """ - - class TestBackend(BaseTestBackend): - preDeleteFn = None - - def registerPreDelete(self, preDeleteFn): - self.preDeleteFn = preDeleteFn - - def getSubscribers(self, nodeIdentifier): - return defer.succeed([OWNER]) - - def notifyDelete(service, nodeIdentifier, subscribers, - redirectURI=None): - self.assertEqual(SERVICE, service) - self.assertEqual('test', nodeIdentifier) - self.assertEqual([OWNER], subscribers) - self.assertIdentical(None, redirectURI) - d1.callback(None) - - d1 = defer.Deferred() - resource = backend.PubSubResourceFromBackend(TestBackend()) - resource.serviceJID = SERVICE - resource.pubsubService = pubsub.PubSubService() - resource.pubsubService.notifyDelete = notifyDelete - self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource)) - self.assertNotIdentical(None, resource.backend.preDeleteFn) - data = {'nodeIdentifier': 'test'} - d2 = resource.backend.preDeleteFn(data) - return defer.DeferredList([d1, d2], fireOnOneErrback=1) - - - def test_preDeleteRedirect(self): - """ - Test pre-delete sending out notifications to subscribers. - """ - - uri = 'xmpp:%s?;node=test2' % (SERVICE.full(),) - - class TestBackend(BaseTestBackend): - preDeleteFn = None - - def registerPreDelete(self, preDeleteFn): - self.preDeleteFn = preDeleteFn - - def getSubscribers(self, nodeIdentifier): - return defer.succeed([OWNER]) - - def notifyDelete(service, nodeIdentifier, subscribers, - redirectURI=None): - self.assertEqual(SERVICE, service) - self.assertEqual('test', nodeIdentifier) - self.assertEqual([OWNER], subscribers) - self.assertEqual(uri, redirectURI) - d1.callback(None) - - d1 = defer.Deferred() - resource = backend.PubSubResourceFromBackend(TestBackend()) - resource.serviceJID = SERVICE - resource.pubsubService = pubsub.PubSubService() - resource.pubsubService.notifyDelete = notifyDelete - self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource)) - self.assertNotIdentical(None, resource.backend.preDeleteFn) - data = {'nodeIdentifier': 'test', - 'redirectURI': uri} - d2 = resource.backend.preDeleteFn(data) - return defer.DeferredList([d1, d2], fireOnOneErrback=1) - - - def test_unsubscribeNotSubscribed(self): - """ - Test unsubscription request when not subscribed. - """ - - class TestBackend(BaseTestBackend): - def unsubscribe(self, nodeIdentifier, subscriber, requestor): - return defer.fail(error.NotSubscribed()) - - def cb(e): - self.assertEquals('unexpected-request', e.condition) - - resource = backend.PubSubResourceFromBackend(TestBackend()) - request = pubsub.PubSubRequest() - request.sender = OWNER - request.recipient = SERVICE - request.nodeIdentifier = 'test' - request.subscriber = OWNER - d = resource.unsubscribe(request) - self.assertFailure(d, StanzaError) - d.addCallback(cb) - return d - - - def test_getInfo(self): - """ - Test retrieving node information. - """ - - class TestBackend(BaseTestBackend): - def getNodeType(self, nodeIdentifier): - return defer.succeed('leaf') - - def getNodeMetaData(self, nodeIdentifier): - return defer.succeed({'pubsub#persist_items': True}) - - def cb(info): - self.assertIn('type', info) - self.assertEquals('leaf', info['type']) - self.assertIn('meta-data', info) - self.assertEquals({'pubsub#persist_items': True}, info['meta-data']) - - resource = backend.PubSubResourceFromBackend(TestBackend()) - d = resource.getInfo(OWNER, SERVICE, 'test') - d.addCallback(cb) - return d - - - def test_getConfigurationOptions(self): - class TestBackend(BaseTestBackend): - nodeOptions = { - "pubsub#persist_items": - {"type": "boolean", - "label": "Persist items to storage"}, - "pubsub#deliver_payloads": - {"type": "boolean", - "label": "Deliver payloads with event notifications"} - } - - resource = backend.PubSubResourceFromBackend(TestBackend()) - options = resource.getConfigurationOptions() - self.assertIn("pubsub#persist_items", options) - - - def test_default(self): - class TestBackend(BaseTestBackend): - def getDefaultConfiguration(self, nodeType): - options = {"pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - } - return defer.succeed(options) - - def cb(options): - self.assertEquals(True, options["pubsub#persist_items"]) - - resource = backend.PubSubResourceFromBackend(TestBackend()) - request = pubsub.PubSubRequest() - request.sender = OWNER - request.recipient = SERVICE - request.nodeType = 'leaf' - d = resource.default(request) - d.addCallback(cb) - return d diff -r d99047cd90f9 -r 923281d4c5bc idavoll/test/test_gateway.py --- a/idavoll/test/test_gateway.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,398 +0,0 @@ -# Copyright (c) 2003-2009 Ralph Meijer -# See LICENSE for details. - -""" -Tests for L{idavoll.gateway}. - -Note that some tests are functional tests that require a running idavoll -service. -""" - -from twisted.internet import defer -from twisted.trial import unittest -from twisted.web import error -from twisted.words.xish import domish - -from idavoll import gateway - -AGENT = "Idavoll Test Script" -NS_ATOM = "http://www.w3.org/2005/Atom" - -TEST_ENTRY = domish.Element((NS_ATOM, 'entry')) -TEST_ENTRY.addElement("id", - content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a") -TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok") -TEST_ENTRY.addElement("author").addElement("name", content="John Doe") -TEST_ENTRY.addElement("content", content="Some text.") - -baseURI = "http://localhost:8086/" -componentJID = "pubsub" - -class GatewayTest(unittest.TestCase): - timeout = 2 - - def setUp(self): - self.client = gateway.GatewayClient(baseURI) - self.client.startService() - self.addCleanup(self.client.stopService) - - def trapConnectionRefused(failure): - from twisted.internet.error import ConnectionRefusedError - failure.trap(ConnectionRefusedError) - raise unittest.SkipTest("Gateway to test against is not available") - - def trapNotFound(failure): - from twisted.web.error import Error - failure.trap(Error) - - d = self.client.ping() - d.addErrback(trapConnectionRefused) - d.addErrback(trapNotFound) - return d - - - def tearDown(self): - return self.client.stopService() - - - def test_create(self): - - def cb(response): - self.assertIn('uri', response) - - d = self.client.create() - d.addCallback(cb) - return d - - def test_publish(self): - - def cb(response): - self.assertIn('uri', response) - - d = self.client.publish(TEST_ENTRY) - d.addCallback(cb) - return d - - def test_publishExistingNode(self): - - def cb2(response, xmppURI): - self.assertEquals(xmppURI, response['uri']) - - def cb1(response): - xmppURI = response['uri'] - d = self.client.publish(TEST_ENTRY, xmppURI) - d.addCallback(cb2, xmppURI) - return d - - d = self.client.create() - d.addCallback(cb1) - return d - - def test_publishNonExisting(self): - def cb(err): - self.assertEqual('404', err.status) - - d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % componentJID) - self.assertFailure(d, error.Error) - d.addCallback(cb) - return d - - def test_delete(self): - def cb(response): - xmppURI = response['uri'] - d = self.client.delete(xmppURI) - return d - - d = self.client.create() - d.addCallback(cb) - return d - - def test_deleteWithRedirect(self): - def cb(response): - xmppURI = response['uri'] - redirectURI = 'xmpp:%s?node=test' % componentJID - d = self.client.delete(xmppURI, redirectURI) - return d - - d = self.client.create() - d.addCallback(cb) - return d - - def test_deleteNotification(self): - def onNotification(data, headers): - try: - self.assertTrue(headers.hasHeader('Event')) - self.assertEquals(['DELETED'], headers.getRawHeaders('Event')) - self.assertFalse(headers.hasHeader('Link')) - except: - self.client.deferred.errback() - else: - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - d = self.client.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = self.client.delete(xmppURI) - return d - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - def test_deleteNotificationWithRedirect(self): - redirectURI = 'xmpp:%s?node=test' % componentJID - - def onNotification(data, headers): - try: - self.assertTrue(headers.hasHeader('Event')) - self.assertEquals(['DELETED'], headers.getRawHeaders('Event')) - self.assertEquals(['<%s>; rel=alternate' % redirectURI], - headers.getRawHeaders('Link')) - except: - self.client.deferred.errback() - else: - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - d = self.client.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = self.client.delete(xmppURI, redirectURI) - return d - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - def test_list(self): - d = self.client.listNodes() - return d - - def test_subscribe(self): - def cb(response): - xmppURI = response['uri'] - d = self.client.subscribe(xmppURI) - return d - - d = self.client.create() - d.addCallback(cb) - return d - - def test_subscribeGetNotification(self): - - def onNotification(data, headers): - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - d = self.client.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = self.client.publish(TEST_ENTRY, xmppURI) - return d - - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - - def test_subscribeTwiceGetNotification(self): - - def onNotification1(data, headers): - d = client1.stopService() - d.chainDeferred(client1.deferred) - - def onNotification2(data, headers): - d = client2.stopService() - d.chainDeferred(client2.deferred) - - def cb(response): - xmppURI = response['uri'] - d = client1.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = client2.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb3(xmppURI): - d = self.client.publish(TEST_ENTRY, xmppURI) - return d - - - client1 = gateway.GatewayClient(baseURI, callbackPort=8088) - client1.startService() - client1.callback = onNotification1 - client1.deferred = defer.Deferred() - client2 = gateway.GatewayClient(baseURI, callbackPort=8089) - client2.startService() - client2.callback = onNotification2 - client2.deferred = defer.Deferred() - - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - d.addCallback(cb3) - dl = defer.gatherResults([d, client1.deferred, client2.deferred]) - return dl - - - def test_subscribeGetDelayedNotification(self): - - def onNotification(data, headers): - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - self.assertNot(self.client.deferred.called) - d = self.client.publish(TEST_ENTRY, xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = self.client.subscribe(xmppURI) - return d - - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - def test_subscribeGetDelayedNotification2(self): - """ - Test that subscribing as second results in a notification being sent. - """ - - def onNotification1(data, headers): - client1.deferred.callback(None) - client1.stopService() - - def onNotification2(data, headers): - client2.deferred.callback(None) - client2.stopService() - - def cb(response): - xmppURI = response['uri'] - self.assertNot(client1.deferred.called) - self.assertNot(client2.deferred.called) - d = self.client.publish(TEST_ENTRY, xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = client1.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb3(xmppURI): - d = client2.subscribe(xmppURI) - return d - - client1 = gateway.GatewayClient(baseURI, callbackPort=8088) - client1.startService() - client1.callback = onNotification1 - client1.deferred = defer.Deferred() - client2 = gateway.GatewayClient(baseURI, callbackPort=8089) - client2.startService() - client2.callback = onNotification2 - client2.deferred = defer.Deferred() - - - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - d.addCallback(cb3) - dl = defer.gatherResults([d, client1.deferred, client2.deferred]) - return dl - - - def test_subscribeNonExisting(self): - def cb(err): - self.assertEqual('403', err.status) - - d = self.client.subscribe('xmpp:%s?node=test' % componentJID) - self.assertFailure(d, error.Error) - d.addCallback(cb) - return d - - - def test_subscribeRootGetNotification(self): - - def onNotification(data, headers): - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI) - rootNode = gateway.getXMPPURI(jid, '') - - d = self.client.subscribe(rootNode) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - return self.client.publish(TEST_ENTRY, xmppURI) - - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - - def test_unsubscribeNonExisting(self): - def cb(err): - self.assertEqual('403', err.status) - - d = self.client.unsubscribe('xmpp:%s?node=test' % componentJID) - self.assertFailure(d, error.Error) - d.addCallback(cb) - return d - - - def test_items(self): - def cb(response): - xmppURI = response['uri'] - d = self.client.items(xmppURI) - return d - - d = self.client.publish(TEST_ENTRY) - d.addCallback(cb) - return d - - - def test_itemsMaxItems(self): - def cb(response): - xmppURI = response['uri'] - d = self.client.items(xmppURI, 2) - return d - - d = self.client.publish(TEST_ENTRY) - d.addCallback(cb) - return d diff -r d99047cd90f9 -r 923281d4c5bc idavoll/test/test_storage.py --- a/idavoll/test/test_storage.py Thu May 10 23:08:08 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,585 +0,0 @@ -# Copyright (c) 2003-2010 Ralph Meijer -# See LICENSE for details. - -""" -Tests for L{idavoll.memory_storage} and L{idavoll.pgsql_storage}. -""" - -from zope.interface.verify import verifyObject -from twisted.trial import unittest -from twisted.words.protocols.jabber import jid -from twisted.internet import defer -from twisted.words.xish import domish - -from idavoll import error, iidavoll - -OWNER = jid.JID('owner@example.com/Work') -SUBSCRIBER = jid.JID('subscriber@example.com/Home') -SUBSCRIBER_NEW = jid.JID('new@example.com/Home') -SUBSCRIBER_TO_BE_DELETED = jid.JID('to_be_deleted@example.com/Home') -SUBSCRIBER_PENDING = jid.JID('pending@example.com/Home') -PUBLISHER = jid.JID('publisher@example.com') -ITEM = domish.Element((None, 'item')) -ITEM['id'] = 'current' -ITEM.addElement(('testns', 'test'), content=u'Test \u2083 item') -ITEM_NEW = domish.Element((None, 'item')) -ITEM_NEW['id'] = 'new' -ITEM_NEW.addElement(('testns', 'test'), content=u'Test \u2083 item') -ITEM_UPDATED = domish.Element((None, 'item')) -ITEM_UPDATED['id'] = 'current' -ITEM_UPDATED.addElement(('testns', 'test'), content=u'Test \u2084 item') -ITEM_TO_BE_DELETED = domish.Element((None, 'item')) -ITEM_TO_BE_DELETED['id'] = 'to-be-deleted' -ITEM_TO_BE_DELETED.addElement(('testns', 'test'), content=u'Test \u2083 item') - -def decode(object): - if isinstance(object, str): - object = object.decode('utf-8') - return object - - - -class StorageTests: - - def _assignTestNode(self, node): - self.node = node - - - def setUp(self): - d = self.s.getNode('pre-existing') - d.addCallback(self._assignTestNode) - return d - - - def test_interfaceIStorage(self): - self.assertTrue(verifyObject(iidavoll.IStorage, self.s)) - - - def test_interfaceINode(self): - self.assertTrue(verifyObject(iidavoll.INode, self.node)) - - - def test_interfaceILeafNode(self): - self.assertTrue(verifyObject(iidavoll.ILeafNode, self.node)) - - - def test_getNode(self): - return self.s.getNode('pre-existing') - - - def test_getNonExistingNode(self): - d = self.s.getNode('non-existing') - self.assertFailure(d, error.NodeNotFound) - return d - - - def test_getNodeIDs(self): - def cb(nodeIdentifiers): - self.assertIn('pre-existing', nodeIdentifiers) - self.assertNotIn('non-existing', nodeIdentifiers) - - return self.s.getNodeIds().addCallback(cb) - - - def test_createExistingNode(self): - config = self.s.getDefaultConfiguration('leaf') - config['pubsub#node_type'] = 'leaf' - d = self.s.createNode('pre-existing', OWNER, config) - self.assertFailure(d, error.NodeExists) - return d - - - def test_createNode(self): - def cb(void): - d = self.s.getNode('new 1') - return d - - config = self.s.getDefaultConfiguration('leaf') - config['pubsub#node_type'] = 'leaf' - d = self.s.createNode('new 1', OWNER, config) - d.addCallback(cb) - return d - - - def test_createNodeChangingConfig(self): - """ - The configuration passed to createNode must be free to be changed. - """ - def cb(result): - node1, node2 = result - self.assertTrue(node1.getConfiguration()['pubsub#persist_items']) - - config = { - "pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - "pubsub#node_type": 'leaf', - } - - def unsetPersistItems(_): - config["pubsub#persist_items"] = False - - d = defer.succeed(None) - d.addCallback(lambda _: self.s.createNode('new 1', OWNER, config)) - d.addCallback(unsetPersistItems) - d.addCallback(lambda _: self.s.createNode('new 2', OWNER, config)) - d.addCallback(lambda _: defer.gatherResults([ - self.s.getNode('new 1'), - self.s.getNode('new 2')])) - d.addCallback(cb) - return d - - - def test_deleteNonExistingNode(self): - d = self.s.deleteNode('non-existing') - self.assertFailure(d, error.NodeNotFound) - return d - - - def test_deleteNode(self): - def cb(void): - d = self.s.getNode('to-be-deleted') - self.assertFailure(d, error.NodeNotFound) - return d - - d = self.s.deleteNode('to-be-deleted') - d.addCallback(cb) - return d - - - def test_getAffiliations(self): - def cb(affiliations): - self.assertIn(('pre-existing', 'owner'), affiliations) - - d = self.s.getAffiliations(OWNER) - d.addCallback(cb) - return d - - - def test_getSubscriptions(self): - def cb(subscriptions): - found = False - for subscription in subscriptions: - if (subscription.nodeIdentifier == 'pre-existing' and - subscription.subscriber == SUBSCRIBER and - subscription.state == 'subscribed'): - found = True - self.assertTrue(found) - - d = self.s.getSubscriptions(SUBSCRIBER) - d.addCallback(cb) - return d - - - # Node tests - - def test_getType(self): - self.assertEqual(self.node.getType(), 'leaf') - - - def test_getConfiguration(self): - config = self.node.getConfiguration() - self.assertIn('pubsub#persist_items', config.iterkeys()) - self.assertIn('pubsub#deliver_payloads', config.iterkeys()) - self.assertEqual(config['pubsub#persist_items'], True) - self.assertEqual(config['pubsub#deliver_payloads'], True) - - - def test_setConfiguration(self): - def getConfig(node): - d = node.setConfiguration({'pubsub#persist_items': False}) - d.addCallback(lambda _: node) - return d - - def checkObjectConfig(node): - config = node.getConfiguration() - self.assertEqual(config['pubsub#persist_items'], False) - - def getNode(void): - return self.s.getNode('to-be-reconfigured') - - def checkStorageConfig(node): - config = node.getConfiguration() - self.assertEqual(config['pubsub#persist_items'], False) - - d = self.s.getNode('to-be-reconfigured') - d.addCallback(getConfig) - d.addCallback(checkObjectConfig) - d.addCallback(getNode) - d.addCallback(checkStorageConfig) - return d - - - def test_getMetaData(self): - metaData = self.node.getMetaData() - for key, value in self.node.getConfiguration().iteritems(): - self.assertIn(key, metaData.iterkeys()) - self.assertEqual(value, metaData[key]) - self.assertIn('pubsub#node_type', metaData.iterkeys()) - self.assertEqual(metaData['pubsub#node_type'], 'leaf') - - - def test_getAffiliation(self): - def cb(affiliation): - self.assertEqual(affiliation, 'owner') - - d = self.node.getAffiliation(OWNER) - d.addCallback(cb) - return d - - - def test_getNonExistingAffiliation(self): - def cb(affiliation): - self.assertEqual(affiliation, None) - - d = self.node.getAffiliation(SUBSCRIBER) - d.addCallback(cb) - return d - - - def test_addSubscription(self): - def cb1(void): - return self.node.getSubscription(SUBSCRIBER_NEW) - - def cb2(subscription): - self.assertEqual(subscription.state, 'pending') - - d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending', {}) - d.addCallback(cb1) - d.addCallback(cb2) - return d - - - def test_addExistingSubscription(self): - d = self.node.addSubscription(SUBSCRIBER, 'pending', {}) - self.assertFailure(d, error.SubscriptionExists) - return d - - - def test_getSubscription(self): - def cb(subscriptions): - self.assertEquals(subscriptions[0].state, 'subscribed') - self.assertEquals(subscriptions[1].state, 'pending') - self.assertEquals(subscriptions[2], None) - - d = defer.gatherResults([self.node.getSubscription(SUBSCRIBER), - self.node.getSubscription(SUBSCRIBER_PENDING), - self.node.getSubscription(OWNER)]) - d.addCallback(cb) - return d - - - def test_removeSubscription(self): - return self.node.removeSubscription(SUBSCRIBER_TO_BE_DELETED) - - - def test_removeNonExistingSubscription(self): - d = self.node.removeSubscription(OWNER) - self.assertFailure(d, error.NotSubscribed) - return d - - - def test_getNodeSubscriptions(self): - def extractSubscribers(subscriptions): - return [subscription.subscriber for subscription in subscriptions] - - def cb(subscribers): - self.assertIn(SUBSCRIBER, subscribers) - self.assertNotIn(SUBSCRIBER_PENDING, subscribers) - self.assertNotIn(OWNER, subscribers) - - d = self.node.getSubscriptions('subscribed') - d.addCallback(extractSubscribers) - d.addCallback(cb) - return d - - - def test_isSubscriber(self): - def cb(subscribed): - self.assertEquals(subscribed[0][1], True) - self.assertEquals(subscribed[1][1], True) - self.assertEquals(subscribed[2][1], False) - self.assertEquals(subscribed[3][1], False) - - d = defer.DeferredList([self.node.isSubscribed(SUBSCRIBER), - self.node.isSubscribed(SUBSCRIBER.userhostJID()), - self.node.isSubscribed(SUBSCRIBER_PENDING), - self.node.isSubscribed(OWNER)]) - d.addCallback(cb) - return d - - - def test_storeItems(self): - def cb1(void): - return self.node.getItemsById(['new']) - - def cb2(result): - self.assertEqual(ITEM_NEW.toXml(), result[0].toXml()) - - d = self.node.storeItems([ITEM_NEW], PUBLISHER) - d.addCallback(cb1) - d.addCallback(cb2) - return d - - - def test_storeUpdatedItems(self): - def cb1(void): - return self.node.getItemsById(['current']) - - def cb2(result): - self.assertEqual(ITEM_UPDATED.toXml(), result[0].toXml()) - - d = self.node.storeItems([ITEM_UPDATED], PUBLISHER) - d.addCallback(cb1) - d.addCallback(cb2) - return d - - - def test_removeItems(self): - def cb1(result): - self.assertEqual(['to-be-deleted'], result) - return self.node.getItemsById(['to-be-deleted']) - - def cb2(result): - self.assertEqual(0, len(result)) - - d = self.node.removeItems(['to-be-deleted']) - d.addCallback(cb1) - d.addCallback(cb2) - return d - - - def test_removeNonExistingItems(self): - def cb(result): - self.assertEqual([], result) - - d = self.node.removeItems(['non-existing']) - d.addCallback(cb) - return d - - - def test_getItems(self): - def cb(result): - items = [item.toXml() for item in result] - self.assertIn(ITEM.toXml(), items) - - d = self.node.getItems() - d.addCallback(cb) - return d - - - def test_lastItem(self): - def cb(result): - self.assertEqual(1, len(result)) - self.assertEqual(ITEM.toXml(), result[0].toXml()) - - d = self.node.getItems(1) - d.addCallback(cb) - return d - - - def test_getItemsById(self): - def cb(result): - self.assertEqual(1, len(result)) - - d = self.node.getItemsById(['current']) - d.addCallback(cb) - return d - - - def test_getNonExistingItemsById(self): - def cb(result): - self.assertEqual(0, len(result)) - - d = self.node.getItemsById(['non-existing']) - d.addCallback(cb) - return d - - - def test_purge(self): - def cb1(node): - d = node.purge() - d.addCallback(lambda _: node) - return d - - def cb2(node): - return node.getItems() - - def cb3(result): - self.assertEqual([], result) - - d = self.s.getNode('to-be-purged') - d.addCallback(cb1) - d.addCallback(cb2) - d.addCallback(cb3) - return d - - - def test_getNodeAffilatiations(self): - def cb1(node): - return node.getAffiliations() - - def cb2(affiliations): - affiliations = dict(((a[0].full(), a[1]) for a in affiliations)) - self.assertEquals(affiliations[OWNER.userhost()], 'owner') - - d = self.s.getNode('pre-existing') - d.addCallback(cb1) - d.addCallback(cb2) - return d - - - -class MemoryStorageStorageTestCase(unittest.TestCase, StorageTests): - - def setUp(self): - from idavoll.memory_storage import Storage, PublishedItem, LeafNode - from idavoll.memory_storage import Subscription - - defaultConfig = Storage.defaultConfig['leaf'] - - self.s = Storage() - self.s._nodes['pre-existing'] = \ - LeafNode('pre-existing', OWNER, defaultConfig) - self.s._nodes['to-be-deleted'] = \ - LeafNode('to-be-deleted', OWNER, None) - self.s._nodes['to-be-reconfigured'] = \ - LeafNode('to-be-reconfigured', OWNER, defaultConfig) - self.s._nodes['to-be-purged'] = \ - LeafNode('to-be-purged', OWNER, None) - - subscriptions = self.s._nodes['pre-existing']._subscriptions - subscriptions[SUBSCRIBER.full()] = Subscription('pre-existing', - SUBSCRIBER, - 'subscribed') - subscriptions[SUBSCRIBER_TO_BE_DELETED.full()] = \ - Subscription('pre-existing', SUBSCRIBER_TO_BE_DELETED, - 'subscribed') - subscriptions[SUBSCRIBER_PENDING.full()] = \ - Subscription('pre-existing', SUBSCRIBER_PENDING, - 'pending') - - item = PublishedItem(ITEM_TO_BE_DELETED, PUBLISHER) - self.s._nodes['pre-existing']._items['to-be-deleted'] = item - self.s._nodes['pre-existing']._itemlist.append(item) - self.s._nodes['to-be-purged']._items['to-be-deleted'] = item - self.s._nodes['to-be-purged']._itemlist.append(item) - item = PublishedItem(ITEM, PUBLISHER) - self.s._nodes['pre-existing']._items['current'] = item - self.s._nodes['pre-existing']._itemlist.append(item) - - return StorageTests.setUp(self) - - - -class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests): - - dbpool = None - - def setUp(self): - from idavoll.pgsql_storage import Storage - from twisted.enterprise import adbapi - if self.dbpool is None: - self.__class__.dbpool = adbapi.ConnectionPool('psycopg2', - database='pubsub_test', - cp_reconnect=True, - client_encoding='utf-8', - ) - self.s = Storage(self.dbpool) - self.dbpool.start() - d = self.dbpool.runInteraction(self.init) - d.addCallback(lambda _: StorageTests.setUp(self)) - return d - - - def tearDown(self): - return self.dbpool.runInteraction(self.cleandb) - - - def init(self, cursor): - self.cleandb(cursor) - cursor.execute("""INSERT INTO nodes - (node, node_type, persist_items) - VALUES ('pre-existing', 'leaf', TRUE)""") - cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-deleted')""") - cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-reconfigured')""") - cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-purged')""") - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (OWNER.userhost(),)) - cursor.execute("""INSERT INTO affiliations - (node_id, entity_id, affiliation) - SELECT node_id, entity_id, 'owner' - FROM nodes, entities - WHERE node='pre-existing' AND jid=%s""", - (OWNER.userhost(),)) - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (SUBSCRIBER.userhost(),)) - cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, state) - SELECT node_id, entity_id, %s, 'subscribed' - FROM nodes, entities - WHERE node='pre-existing' AND jid=%s""", - (SUBSCRIBER.resource, - SUBSCRIBER.userhost())) - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (SUBSCRIBER_TO_BE_DELETED.userhost(),)) - cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, state) - SELECT node_id, entity_id, %s, 'subscribed' - FROM nodes, entities - WHERE node='pre-existing' AND jid=%s""", - (SUBSCRIBER_TO_BE_DELETED.resource, - SUBSCRIBER_TO_BE_DELETED.userhost())) - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (SUBSCRIBER_PENDING.userhost(),)) - cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, state) - SELECT node_id, entity_id, %s, 'pending' - FROM nodes, entities - WHERE node='pre-existing' AND jid=%s""", - (SUBSCRIBER_PENDING.resource, - SUBSCRIBER_PENDING.userhost())) - cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", - (PUBLISHER.userhost(),)) - cursor.execute("""INSERT INTO items - (node_id, publisher, item, data, date) - SELECT node_id, %s, 'to-be-deleted', %s, - now() - interval '1 day' - FROM nodes - WHERE node='pre-existing'""", - (PUBLISHER.userhost(), - ITEM_TO_BE_DELETED.toXml())) - cursor.execute("""INSERT INTO items (node_id, publisher, item, data) - SELECT node_id, %s, 'to-be-deleted', %s - FROM nodes - WHERE node='to-be-purged'""", - (PUBLISHER.userhost(), - ITEM_TO_BE_DELETED.toXml())) - cursor.execute("""INSERT INTO items (node_id, publisher, item, data) - SELECT node_id, %s, 'current', %s - FROM nodes - WHERE node='pre-existing'""", - (PUBLISHER.userhost(), - ITEM.toXml())) - - - def cleandb(self, cursor): - cursor.execute("""DELETE FROM nodes WHERE node in - ('non-existing', 'pre-existing', 'to-be-deleted', - 'new 1', 'new 2', 'new 3', 'to-be-reconfigured', - 'to-be-purged')""") - cursor.execute("""DELETE FROM entities WHERE jid=%s""", - (OWNER.userhost(),)) - cursor.execute("""DELETE FROM entities WHERE jid=%s""", - (SUBSCRIBER.userhost(),)) - cursor.execute("""DELETE FROM entities WHERE jid=%s""", - (SUBSCRIBER_TO_BE_DELETED.userhost(),)) - cursor.execute("""DELETE FROM entities WHERE jid=%s""", - (SUBSCRIBER_PENDING.userhost(),)) - cursor.execute("""DELETE FROM entities WHERE jid=%s""", - (PUBLISHER.userhost(),)) - -try: - import psycopg2 -except ImportError: - PgsqlStorageStorageTestCase.skip = "Psycopg2 not available" diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/__init__.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,8 @@ +# Copyright (c) 2003-2007 Ralph Meijer +# See LICENSE for details. + +""" +Idavoll, a generic XMPP publish-subscribe service. +""" + +__version__ = '0.9.1' diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/backend.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/backend.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,730 @@ +# -*- test-case-name: idavoll.test.test_backend -*- +# +# Copyright (c) 2003-2010 Ralph Meijer +# See LICENSE for details. + +""" +Generic publish-subscribe backend. + +This module implements a generic publish-subscribe backend service with +business logic as per +U{XEP-0060} that interacts with +a given storage facility. It also provides an adapter from the XMPP +publish-subscribe protocol. +""" + +import uuid + +from zope.interface import implements + +from twisted.application import service +from twisted.python import components, log +from twisted.internet import defer, reactor +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.xish import utility + +from wokkel import disco +from wokkel.iwokkel import IPubSubResource +from wokkel.pubsub import PubSubResource, PubSubError + +from idavoll import error, iidavoll +from idavoll.iidavoll import IBackendService, ILeafNode + +def _getAffiliation(node, entity): + d = node.getAffiliation(entity) + d.addCallback(lambda affiliation: (node, affiliation)) + return d + + + +class BackendService(service.Service, utility.EventDispatcher): + """ + Generic publish-subscribe backend service. + + @cvar nodeOptions: Node configuration form as a mapping from the field + name to a dictionary that holds the field's type, label + and possible options to choose from. + @type nodeOptions: C{dict}. + @cvar defaultConfig: The default node configuration. + """ + + implements(iidavoll.IBackendService) + + nodeOptions = { + "pubsub#persist_items": + {"type": "boolean", + "label": "Persist items to storage"}, + "pubsub#deliver_payloads": + {"type": "boolean", + "label": "Deliver payloads with event notifications"}, + "pubsub#send_last_published_item": + {"type": "list-single", + "label": "When to send the last published item", + "options": { + "never": "Never", + "on_sub": "When a new subscription is processed"} + }, + } + + subscriptionOptions = { + "pubsub#subscription_type": + {"type": "list-single", + "options": { + "items": "Receive notification of new items only", + "nodes": "Receive notification of new nodes only"} + }, + "pubsub#subscription_depth": + {"type": "list-single", + "options": { + "1": "Receive notification from direct child nodes only", + "all": "Receive notification from all descendent nodes"} + }, + } + + def __init__(self, storage): + utility.EventDispatcher.__init__(self) + self.storage = storage + self._callbackList = [] + + + def supportsPublisherAffiliation(self): + return True + + + def supportsOutcastAffiliation(self): + return True + + + def supportsPersistentItems(self): + return True + + + def getNodeType(self, nodeIdentifier): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getType()) + return d + + + def getNodes(self): + return self.storage.getNodeIds() + + + def getNodeMetaData(self, nodeIdentifier): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getMetaData()) + d.addCallback(self._makeMetaData) + return d + + + def _makeMetaData(self, metaData): + options = [] + for key, value in metaData.iteritems(): + if key in self.nodeOptions: + option = {"var": key} + option.update(self.nodeOptions[key]) + option["value"] = value + options.append(option) + + return options + + + def _checkAuth(self, node, requestor): + def check(affiliation, node): + if affiliation not in ['owner', 'publisher']: + raise error.Forbidden() + return node + + d = node.getAffiliation(requestor) + d.addCallback(check, node) + return d + + + def publish(self, nodeIdentifier, items, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(self._checkAuth, requestor) + d.addCallback(self._doPublish, items, requestor) + return d + + + def _doPublish(self, node, items, requestor): + if node.nodeType == 'collection': + raise error.NoPublishing() + + configuration = node.getConfiguration() + persistItems = configuration["pubsub#persist_items"] + deliverPayloads = configuration["pubsub#deliver_payloads"] + + if items and not persistItems and not deliverPayloads: + raise error.ItemForbidden() + elif not items and (persistItems or deliverPayloads): + raise error.ItemRequired() + + if persistItems or deliverPayloads: + for item in items: + item.uri = None + item.defaultUri = None + if not item.getAttribute("id"): + item["id"] = str(uuid.uuid4()) + + if persistItems: + d = node.storeItems(items, requestor) + else: + d = defer.succeed(None) + + d.addCallback(self._doNotify, node.nodeIdentifier, items, + deliverPayloads) + return d + + + def _doNotify(self, result, nodeIdentifier, items, deliverPayloads): + if items and not deliverPayloads: + for item in items: + item.children = [] + + self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier}, + '//event/pubsub/notify') + + + def getNotifications(self, nodeIdentifier, items): + + def toNotifications(subscriptions, nodeIdentifier, items): + subsBySubscriber = {} + for subscription in subscriptions: + if subscription.options.get('pubsub#subscription_type', + 'items') == 'items': + subs = subsBySubscriber.setdefault(subscription.subscriber, + set()) + subs.add(subscription) + + notifications = [(subscriber, subscriptions, items) + for subscriber, subscriptions + in subsBySubscriber.iteritems()] + + return notifications + + def rootNotFound(failure): + failure.trap(error.NodeNotFound) + return [] + + d1 = self.storage.getNode(nodeIdentifier) + d1.addCallback(lambda node: node.getSubscriptions('subscribed')) + d2 = self.storage.getNode('') + d2.addCallback(lambda node: node.getSubscriptions('subscribed')) + d2.addErrback(rootNotFound) + d = defer.gatherResults([d1, d2]) + d.addCallback(lambda result: result[0] + result[1]) + d.addCallback(toNotifications, nodeIdentifier, items) + return d + + + def registerNotifier(self, observerfn, *args, **kwargs): + self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) + + + def subscribe(self, nodeIdentifier, subscriber, requestor): + subscriberEntity = subscriber.userhostJID() + if subscriberEntity != requestor.userhostJID(): + return defer.fail(error.Forbidden()) + + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, subscriberEntity) + d.addCallback(self._doSubscribe, subscriber) + return d + + + def _doSubscribe(self, result, subscriber): + node, affiliation = result + + if affiliation == 'outcast': + raise error.Forbidden() + + def trapExists(failure): + failure.trap(error.SubscriptionExists) + return False + + def cb(sendLast): + d = node.getSubscription(subscriber) + if sendLast: + d.addCallback(self._sendLastPublished, node) + return d + + d = node.addSubscription(subscriber, 'subscribed', {}) + d.addCallbacks(lambda _: True, trapExists) + d.addCallback(cb) + return d + + + def _sendLastPublished(self, subscription, node): + + def notifyItem(items): + if items: + reactor.callLater(0, self.dispatch, + {'items': items, + 'nodeIdentifier': node.nodeIdentifier, + 'subscription': subscription}, + '//event/pubsub/notify') + + config = node.getConfiguration() + sendLastPublished = config.get('pubsub#send_last_published_item', + 'never') + if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': + entity = subscription.subscriber.userhostJID() + d = self.getItems(node.nodeIdentifier, entity, 1) + d.addCallback(notifyItem) + d.addErrback(log.err) + + return subscription + + + def unsubscribe(self, nodeIdentifier, subscriber, requestor): + if subscriber.userhostJID() != requestor.userhostJID(): + return defer.fail(error.Forbidden()) + + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.removeSubscription(subscriber)) + return d + + + def getSubscriptions(self, entity): + return self.storage.getSubscriptions(entity) + + def supportsAutoCreate(self): + return True + + def supportsInstantNodes(self): + return True + + + def createNode(self, nodeIdentifier, requestor): + if not nodeIdentifier: + nodeIdentifier = 'generic/%s' % uuid.uuid4() + + nodeType = 'leaf' + config = self.storage.getDefaultConfiguration(nodeType) + config['pubsub#node_type'] = nodeType + + d = self.storage.createNode(nodeIdentifier, requestor, config) + d.addCallback(lambda _: nodeIdentifier) + return d + + + def getDefaultConfiguration(self, nodeType): + d = defer.succeed(self.storage.getDefaultConfiguration(nodeType)) + return d + + + def getNodeConfiguration(self, nodeIdentifier): + if not nodeIdentifier: + return defer.fail(error.NoRootNode()) + + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getConfiguration()) + + return d + + + def setNodeConfiguration(self, nodeIdentifier, options, requestor): + if not nodeIdentifier: + return defer.fail(error.NoRootNode()) + + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doSetNodeConfiguration, options) + return d + + + def _doSetNodeConfiguration(self, result, options): + node, affiliation = result + + if affiliation != 'owner': + raise error.Forbidden() + + return node.setConfiguration(options) + + + def getAffiliations(self, entity): + return self.storage.getAffiliations(entity) + + + def getItems(self, nodeIdentifier, requestor, maxItems=None, + itemIdentifiers=None): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doGetItems, maxItems, itemIdentifiers) + return d + + + def _doGetItems(self, result, maxItems, itemIdentifiers): + node, affiliation = result + + if not ILeafNode.providedBy(node): + return [] + + if affiliation == 'outcast': + raise error.Forbidden() + + if itemIdentifiers: + return node.getItemsById(itemIdentifiers) + else: + return node.getItems(maxItems) + + + def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doRetract, itemIdentifiers) + return d + + + def _doRetract(self, result, itemIdentifiers): + node, affiliation = result + persistItems = node.getConfiguration()["pubsub#persist_items"] + + if affiliation not in ['owner', 'publisher']: + raise error.Forbidden() + + if not persistItems: + raise error.NodeNotPersistent() + + d = node.removeItems(itemIdentifiers) + d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) + return d + + + def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): + self.dispatch({'itemIdentifiers': itemIdentifiers, + 'nodeIdentifier': nodeIdentifier }, + '//event/pubsub/retract') + + + def purgeNode(self, nodeIdentifier, requestor): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doPurge) + return d + + + def _doPurge(self, result): + node, affiliation = result + persistItems = node.getConfiguration()["pubsub#persist_items"] + + if affiliation != 'owner': + raise error.Forbidden() + + if not persistItems: + raise error.NodeNotPersistent() + + d = node.purge() + d.addCallback(self._doNotifyPurge, node.nodeIdentifier) + return d + + + def _doNotifyPurge(self, result, nodeIdentifier): + self.dispatch(nodeIdentifier, '//event/pubsub/purge') + + + def registerPreDelete(self, preDeleteFn): + self._callbackList.append(preDeleteFn) + + + def getSubscribers(self, nodeIdentifier): + def cb(subscriptions): + return [subscription.subscriber for subscription in subscriptions] + + d = self.storage.getNode(nodeIdentifier) + d.addCallback(lambda node: node.getSubscriptions('subscribed')) + d.addCallback(cb) + return d + + + def deleteNode(self, nodeIdentifier, requestor, redirectURI=None): + d = self.storage.getNode(nodeIdentifier) + d.addCallback(_getAffiliation, requestor) + d.addCallback(self._doPreDelete, redirectURI) + return d + + + def _doPreDelete(self, result, redirectURI): + node, affiliation = result + + if affiliation != 'owner': + raise error.Forbidden() + + data = {'nodeIdentifier': node.nodeIdentifier, + 'redirectURI': redirectURI} + + d = defer.DeferredList([cb(data) + for cb in self._callbackList], + consumeErrors=1) + d.addCallback(self._doDelete, node.nodeIdentifier) + + + def _doDelete(self, result, nodeIdentifier): + dl = [] + for succeeded, r in result: + if succeeded and r: + dl.extend(r) + + d = self.storage.deleteNode(nodeIdentifier) + d.addCallback(self._doNotifyDelete, dl) + + return d + + + def _doNotifyDelete(self, result, dl): + for d in dl: + d.callback(None) + + + +class PubSubResourceFromBackend(PubSubResource): + """ + Adapts a backend to an xmpp publish-subscribe service. + """ + + features = [ + "config-node", + "create-nodes", + "delete-any", + "delete-nodes", + "item-ids", + "meta-data", + "publish", + "purge-nodes", + "retract-items", + "retrieve-affiliations", + "retrieve-default", + "retrieve-items", + "retrieve-subscriptions", + "subscribe", + ] + + discoIdentity = disco.DiscoIdentity('pubsub', + 'service', + 'Idavoll publish-subscribe service') + + pubsubService = None + + _errorMap = { + error.NodeNotFound: ('item-not-found', None, None), + error.NodeExists: ('conflict', None, None), + error.Forbidden: ('forbidden', None, None), + error.ItemForbidden: ('bad-request', 'item-forbidden', None), + error.ItemRequired: ('bad-request', 'item-required', None), + error.NoInstantNodes: ('not-acceptable', + 'unsupported', + 'instant-nodes'), + error.NotSubscribed: ('unexpected-request', 'not-subscribed', None), + error.InvalidConfigurationOption: ('not-acceptable', None, None), + error.InvalidConfigurationValue: ('not-acceptable', None, None), + error.NodeNotPersistent: ('feature-not-implemented', + 'unsupported', + 'persistent-node'), + error.NoRootNode: ('bad-request', None, None), + error.NoCollections: ('feature-not-implemented', + 'unsupported', + 'collections'), + error.NoPublishing: ('feature-not-implemented', + 'unsupported', + 'publish'), + } + + def __init__(self, backend): + PubSubResource.__init__(self) + + self.backend = backend + self.hideNodes = False + + self.backend.registerNotifier(self._notify) + self.backend.registerPreDelete(self._preDelete) + + if self.backend.supportsAutoCreate(): + self.features.append("auto-create") + + if self.backend.supportsInstantNodes(): + self.features.append("instant-nodes") + + if self.backend.supportsOutcastAffiliation(): + self.features.append("outcast-affiliation") + + if self.backend.supportsPersistentItems(): + self.features.append("persistent-items") + + if self.backend.supportsPublisherAffiliation(): + self.features.append("publisher-affiliation") + + + def _notify(self, data): + items = data['items'] + nodeIdentifier = data['nodeIdentifier'] + if 'subscription' not in data: + d = self.backend.getNotifications(nodeIdentifier, items) + else: + subscription = data['subscription'] + d = defer.succeed([(subscription.subscriber, [subscription], + items)]) + d.addCallback(lambda notifications: self.pubsubService.notifyPublish( + self.serviceJID, + nodeIdentifier, + notifications)) + + + def _preDelete(self, data): + nodeIdentifier = data['nodeIdentifier'] + redirectURI = data.get('redirectURI', None) + d = self.backend.getSubscribers(nodeIdentifier) + d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( + self.serviceJID, + nodeIdentifier, + subscribers, + redirectURI)) + return d + + + def _mapErrors(self, failure): + e = failure.trap(*self._errorMap.keys()) + + condition, pubsubCondition, feature = self._errorMap[e] + msg = failure.value.msg + + if pubsubCondition: + exc = PubSubError(condition, pubsubCondition, feature, msg) + else: + exc = StanzaError(condition, text=msg) + + raise exc + + + def getInfo(self, requestor, service, nodeIdentifier): + info = {} + + def saveType(result): + info['type'] = result + return nodeIdentifier + + def saveMetaData(result): + info['meta-data'] = result + return info + + def trapNotFound(failure): + failure.trap(error.NodeNotFound) + return info + + d = defer.succeed(nodeIdentifier) + d.addCallback(self.backend.getNodeType) + d.addCallback(saveType) + d.addCallback(self.backend.getNodeMetaData) + d.addCallback(saveMetaData) + d.addErrback(trapNotFound) + d.addErrback(self._mapErrors) + return d + + + def getNodes(self, requestor, service, nodeIdentifier): + if service.resource: + return defer.succeed([]) + d = self.backend.getNodes() + return d.addErrback(self._mapErrors) + + + def getConfigurationOptions(self): + return self.backend.nodeOptions + + def _publish_errb(self, failure, request): + if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): + d = self.backend.createNode(request.nodeIdentifier, + request.sender) + d.addCallback(lambda ignore, + request: self.backend.publish(request.nodeIdentifier, + request.items, + request.sender), + request) + return d + + raise failure + + + def publish(self, request): + d = self.backend.publish(request.nodeIdentifier, + request.items, + request.sender) + d.addErrback(self._publish_errb, request) + return d.addErrback(self._mapErrors) + + + def subscribe(self, request): + d = self.backend.subscribe(request.nodeIdentifier, + request.subscriber, + request.sender) + return d.addErrback(self._mapErrors) + + + def unsubscribe(self, request): + d = self.backend.unsubscribe(request.nodeIdentifier, + request.subscriber, + request.sender) + return d.addErrback(self._mapErrors) + + + def subscriptions(self, request): + d = self.backend.getSubscriptions(request.sender) + return d.addErrback(self._mapErrors) + + + def affiliations(self, request): + d = self.backend.getAffiliations(request.sender) + return d.addErrback(self._mapErrors) + + + def create(self, request): + d = self.backend.createNode(request.nodeIdentifier, + request.sender) + return d.addErrback(self._mapErrors) + + + def default(self, request): + d = self.backend.getDefaultConfiguration(request.nodeType) + return d.addErrback(self._mapErrors) + + + def configureGet(self, request): + d = self.backend.getNodeConfiguration(request.nodeIdentifier) + return d.addErrback(self._mapErrors) + + + def configureSet(self, request): + d = self.backend.setNodeConfiguration(request.nodeIdentifier, + request.options, + request.sender) + return d.addErrback(self._mapErrors) + + + def items(self, request): + d = self.backend.getItems(request.nodeIdentifier, + request.sender, + request.maxItems, + request.itemIdentifiers) + return d.addErrback(self._mapErrors) + + + def retract(self, request): + d = self.backend.retractItem(request.nodeIdentifier, + request.itemIdentifiers, + request.sender) + return d.addErrback(self._mapErrors) + + + def purge(self, request): + d = self.backend.purgeNode(request.nodeIdentifier, + request.sender) + return d.addErrback(self._mapErrors) + + + def delete(self, request): + d = self.backend.deleteNode(request.nodeIdentifier, + request.sender) + return d.addErrback(self._mapErrors) + +components.registerAdapter(PubSubResourceFromBackend, + IBackendService, + IPubSubResource) diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/error.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/error.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,95 @@ +# Copyright (c) 2003-2008 Ralph Meijer +# See LICENSE for details. + +class Error(Exception): + msg = '' + + def __init__(self, msg=None): + self.msg = msg or self.msg + + + def __str__(self): + return self.msg + + + +class NodeNotFound(Error): + pass + + + +class NodeExists(Error): + pass + + + +class NotSubscribed(Error): + """ + Entity is not subscribed to this node. + """ + + + +class SubscriptionExists(Error): + """ + There already exists a subscription to this node. + """ + + + +class Forbidden(Error): + pass + + + +class ItemForbidden(Error): + pass + + + +class ItemRequired(Error): + pass + + + +class NoInstantNodes(Error): + pass + + + +class InvalidConfigurationOption(Error): + msg = 'Invalid configuration option' + + + +class InvalidConfigurationValue(Error): + msg = 'Bad configuration value' + + + +class NodeNotPersistent(Error): + pass + + + +class NoRootNode(Error): + pass + + + +class NoCallbacks(Error): + """ + There are no callbacks for this node. + """ + + + +class NoCollections(Error): + pass + + + +class NoPublishing(Error): + """ + This node does not support publishing. + """ diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/gateway.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/gateway.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,868 @@ +# -*- test-case-name: idavoll.test.test_gateway -*- +# +# Copyright (c) 2003-2009 Ralph Meijer +# See LICENSE for details. + +""" +Web resources and client for interacting with pubsub services. +""" + +import cgi +from time import gmtime, strftime +import urllib +import urlparse + +import simplejson + +from twisted.application import service +from twisted.internet import defer, reactor +from twisted.python import log +from twisted.web import client +from twisted.web2 import http, http_headers, resource, responsecode +from twisted.web2 import channel, server +from twisted.web2.stream import readStream +from twisted.words.protocols.jabber.jid import JID +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.xish import domish + +from wokkel.pubsub import Item +from wokkel.pubsub import PubSubClient + +from idavoll import error + +NS_ATOM = 'http://www.w3.org/2005/Atom' +MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' +MIME_JSON = 'application/json' + +class XMPPURIParseError(ValueError): + """ + Raised when a given XMPP URI couldn't be properly parsed. + """ + + + +def getServiceAndNode(uri): + """ + Given an XMPP URI, extract the publish subscribe service JID and node ID. + """ + + try: + scheme, rest = uri.split(':', 1) + except ValueError: + raise XMPPURIParseError("No URI scheme component") + + if scheme != 'xmpp': + raise XMPPURIParseError("Unknown URI scheme") + + if rest.startswith("//"): + raise XMPPURIParseError("Unexpected URI authority component") + + try: + entity, query = rest.split('?', 1) + except ValueError: + raise XMPPURIParseError("No URI query component") + + if not entity: + raise XMPPURIParseError("Empty URI path component") + + try: + service = JID(entity) + except Exception, e: + raise XMPPURIParseError("Invalid JID: %s" % e) + + params = cgi.parse_qs(query) + + try: + nodeIdentifier = params['node'][0] + except (KeyError, ValueError): + nodeIdentifier = '' + + return service, nodeIdentifier + + + +def getXMPPURI(service, nodeIdentifier): + """ + Construct an XMPP URI from a service JID and node identifier. + """ + return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '') + + + +class WebStreamParser(object): + def __init__(self): + self.elementStream = domish.elementStream() + self.elementStream.DocumentStartEvent = self.docStart + self.elementStream.ElementEvent = self.elem + self.elementStream.DocumentEndEvent = self.docEnd + self.done = False + + + def docStart(self, elem): + self.document = elem + + + def elem(self, elem): + self.document.addChild(elem) + + + def docEnd(self): + self.done = True + + + def parse(self, stream): + def endOfStream(result): + if not self.done: + raise Exception("No more stuff?") + else: + return self.document + + d = readStream(stream, self.elementStream.parse) + d.addCallback(endOfStream) + return d + + + +class CreateResource(resource.Resource): + """ + A resource to create a publish-subscribe node. + """ + def __init__(self, backend, serviceJID, owner): + self.backend = backend + self.serviceJID = serviceJID + self.owner = owner + + + http_GET = None + + + def http_POST(self, request): + """ + Respond to a POST request to create a new node. + """ + + def toResponse(nodeIdentifier): + uri = getXMPPURI(self.serviceJID, nodeIdentifier) + stream = simplejson.dumps({'uri': uri}) + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) + d = self.backend.createNode(None, self.owner) + d.addCallback(toResponse) + return d + + + +class DeleteResource(resource.Resource): + """ + A resource to create a publish-subscribe node. + """ + def __init__(self, backend, serviceJID, owner): + self.backend = backend + self.serviceJID = serviceJID + self.owner = owner + + + http_GET = None + + + def http_POST(self, request): + """ + Respond to a POST request to create a new node. + """ + + def gotStream(_): + if request.args.get('uri'): + jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) + return defer.succeed(nodeIdentifier) + else: + raise http.HTTPError(http.Response(responsecode.BAD_REQUEST, + "No URI given")) + + def doDelete(nodeIdentifier, data): + if data: + params = simplejson.loads(''.join(data)) + redirectURI = params.get('redirect_uri') + else: + redirectURI = None + + return self.backend.deleteNode(nodeIdentifier, self.owner, + redirectURI) + + def respond(result): + return http.Response(responsecode.NO_CONTENT) + + + def trapNotFound(failure): + failure.trap(error.NodeNotFound) + return http.StatusResponse(responsecode.NOT_FOUND, + "Node not found") + + def trapXMPPURIParseError(failure): + failure.trap(XMPPURIParseError) + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) + + data = [] + d = readStream(request.stream, data.append) + d.addCallback(gotStream) + d.addCallback(doDelete, data) + d.addCallback(respond) + d.addErrback(trapNotFound) + d.addErrback(trapXMPPURIParseError) + return d + + + +class PublishResource(resource.Resource): + """ + A resource to publish to a publish-subscribe node. + """ + + def __init__(self, backend, serviceJID, owner): + self.backend = backend + self.serviceJID = serviceJID + self.owner = owner + + + http_GET = None + + + def checkMediaType(self, request): + ctype = request.headers.getHeader('content-type') + + if not ctype: + raise http.HTTPError( + http.StatusResponse( + responsecode.BAD_REQUEST, + "No specified Media Type")) + + if (ctype.mediaType != 'application' or + ctype.mediaSubtype != 'atom+xml' or + ctype.params.get('type') != 'entry' or + ctype.params.get('charset', 'utf-8') != 'utf-8'): + raise http.HTTPError( + http.StatusResponse( + responsecode.UNSUPPORTED_MEDIA_TYPE, + "Unsupported Media Type: %s" % + http_headers.generateContentType(ctype))) + + + def parseXMLPayload(self, stream): + p = WebStreamParser() + return p.parse(stream) + + + def http_POST(self, request): + """ + Respond to a POST request to create a new item. + """ + + def toResponse(nodeIdentifier): + uri = getXMPPURI(self.serviceJID, nodeIdentifier) + stream = simplejson.dumps({'uri': uri}) + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) + + def gotNode(nodeIdentifier, payload): + item = Item(id='current', payload=payload) + d = self.backend.publish(nodeIdentifier, [item], self.owner) + d.addCallback(lambda _: nodeIdentifier) + return d + + def getNode(): + if request.args.get('uri'): + jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) + return defer.succeed(nodeIdentifier) + else: + return self.backend.createNode(None, self.owner) + + def doPublish(payload): + d = getNode() + d.addCallback(gotNode, payload) + return d + + def trapNotFound(failure): + failure.trap(error.NodeNotFound) + return http.StatusResponse(responsecode.NOT_FOUND, + "Node not found") + + def trapXMPPURIParseError(failure): + failure.trap(XMPPURIParseError) + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) + + self.checkMediaType(request) + d = self.parseXMLPayload(request.stream) + d.addCallback(doPublish) + d.addCallback(toResponse) + d.addErrback(trapNotFound) + d.addErrback(trapXMPPURIParseError) + return d + + + +class ListResource(resource.Resource): + def __init__(self, service): + self.service = service + + + def render(self, request): + def responseFromNodes(nodeIdentifiers): + stream = simplejson.dumps(nodeIdentifiers) + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) + + d = self.service.getNodes() + d.addCallback(responseFromNodes) + return d + + + +# Service for subscribing to remote XMPP Pubsub nodes and web resources + +def extractAtomEntries(items): + """ + Extract atom entries from a list of publish-subscribe items. + + @param items: List of L{domish.Element}s that represent publish-subscribe + items. + @type items: C{list} + """ + + atomEntries = [] + + for item in items: + # ignore non-items (i.e. retractions) + if item.name != 'item': + continue + + atomEntry = None + for element in item.elements(): + # extract the first element that is an atom entry + if element.uri == NS_ATOM and element.name == 'entry': + atomEntry = element + break + + if atomEntry: + atomEntries.append(atomEntry) + + return atomEntries + + + +def constructFeed(service, nodeIdentifier, entries, title): + nodeURI = getXMPPURI(service, nodeIdentifier) + now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) + + # Collect the received entries in a feed + feed = domish.Element((NS_ATOM, 'feed')) + feed.addElement('title', content=title) + feed.addElement('id', content=nodeURI) + feed.addElement('updated', content=now) + + for entry in entries: + feed.addChild(entry) + + return feed + + + +class RemoteSubscriptionService(service.Service, PubSubClient): + """ + Service for subscribing to remote XMPP Publish-Subscribe nodes. + + Subscriptions are created with a callback HTTP URI that is POSTed + to with the received items in notifications. + """ + + def __init__(self, jid, storage): + self.jid = jid + self.storage = storage + + + def trapNotFound(self, failure): + failure.trap(StanzaError) + + if failure.value.condition == 'item-not-found': + raise error.NodeNotFound() + else: + return failure + + + def subscribeCallback(self, jid, nodeIdentifier, callback): + """ + Subscribe a callback URI. + + This registers a callback URI to be called when a notification is + received for the given node. + + If this is the first callback registered for this node, the gateway + will subscribe to the node. Otherwise, the most recently published item + for this node is retrieved and, if present, the newly registered + callback will be called with that item. + """ + + def callbackForLastItem(items): + atomEntries = extractAtomEntries(items) + + if not atomEntries: + return + + self._postTo([callback], jid, nodeIdentifier, atomEntries[0], + 'application/atom+xml;type=entry') + + def subscribeOrItems(hasCallbacks): + if hasCallbacks: + if not nodeIdentifier: + return None + d = self.items(jid, nodeIdentifier, 1) + d.addCallback(callbackForLastItem) + else: + d = self.subscribe(jid, nodeIdentifier, self.jid) + + d.addErrback(self.trapNotFound) + return d + + d = self.storage.hasCallbacks(jid, nodeIdentifier) + d.addCallback(subscribeOrItems) + d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier, + callback)) + return d + + + def unsubscribeCallback(self, jid, nodeIdentifier, callback): + """ + Unsubscribe a callback. + + If this was the last registered callback for this node, the + gateway will unsubscribe from node. + """ + + def cb(last): + if last: + return self.unsubscribe(jid, nodeIdentifier, self.jid) + + d = self.storage.removeCallback(jid, nodeIdentifier, callback) + d.addCallback(cb) + return d + + + def itemsReceived(self, event): + """ + Fire up HTTP client to do callback + """ + + atomEntries = extractAtomEntries(event.items) + service = event.sender + nodeIdentifier = event.nodeIdentifier + headers = event.headers + + # Don't notify if there are no atom entries + if not atomEntries: + return + + if len(atomEntries) == 1: + contentType = 'application/atom+xml;type=entry' + payload = atomEntries[0] + else: + contentType = 'application/atom+xml;type=feed' + payload = constructFeed(service, nodeIdentifier, atomEntries, + title='Received item collection') + + self.callCallbacks(service, nodeIdentifier, payload, contentType) + + if 'Collection' in headers: + for collection in headers['Collection']: + nodeIdentifier = collection or '' + self.callCallbacks(service, nodeIdentifier, payload, + contentType) + + + def deleteReceived(self, event): + """ + Fire up HTTP client to do callback + """ + + service = event.sender + nodeIdentifier = event.nodeIdentifier + redirectURI = event.redirectURI + self.callCallbacks(service, nodeIdentifier, eventType='DELETED', + redirectURI=redirectURI) + + + def _postTo(self, callbacks, service, nodeIdentifier, + payload=None, contentType=None, eventType=None, + redirectURI=None): + + if not callbacks: + return + + postdata = None + nodeURI = getXMPPURI(service, nodeIdentifier) + headers = {'Referer': nodeURI.encode('utf-8'), + 'PubSub-Service': service.full().encode('utf-8')} + + if payload: + postdata = payload.toXml().encode('utf-8') + if contentType: + headers['Content-Type'] = "%s;charset=utf-8" % contentType + + if eventType: + headers['Event'] = eventType + + if redirectURI: + headers['Link'] = '<%s>; rel=alternate' % ( + redirectURI.encode('utf-8'), + ) + + def postNotification(callbackURI): + f = getPageWithFactory(str(callbackURI), + method='POST', + postdata=postdata, + headers=headers) + d = f.deferred + d.addErrback(log.err) + + for callbackURI in callbacks: + reactor.callLater(0, postNotification, callbackURI) + + + def callCallbacks(self, service, nodeIdentifier, + payload=None, contentType=None, eventType=None, + redirectURI=None): + + def eb(failure): + failure.trap(error.NoCallbacks) + + # No callbacks were registered for this node. Unsubscribe? + + d = self.storage.getCallbacks(service, nodeIdentifier) + d.addCallback(self._postTo, service, nodeIdentifier, payload, + contentType, eventType, redirectURI) + d.addErrback(eb) + d.addErrback(log.err) + + + +class RemoteSubscribeBaseResource(resource.Resource): + """ + Base resource for remote pubsub node subscription and unsubscription. + + This resource accepts POST request with a JSON document that holds + a dictionary with the keys C{uri} and C{callback} that respectively map + to the XMPP URI of the publish-subscribe node and the callback URI. + + This class should be inherited with L{serviceMethod} overridden. + + @cvar serviceMethod: The name of the method to be called with + the JID of the pubsub service, the node identifier + and the callback URI as received in the HTTP POST + request to this resource. + """ + serviceMethod = None + errorMap = { + error.NodeNotFound: + (responsecode.FORBIDDEN, "Node not found"), + error.NotSubscribed: + (responsecode.FORBIDDEN, "No such subscription found"), + error.SubscriptionExists: + (responsecode.FORBIDDEN, "Subscription already exists"), + } + + def __init__(self, service): + self.service = service + self.params = None + + + http_GET = None + + + def http_POST(self, request): + def trapNotFound(failure): + err = failure.trap(*self.errorMap.keys()) + code, msg = self.errorMap[err] + return http.StatusResponse(code, msg) + + def respond(result): + return http.Response(responsecode.NO_CONTENT) + + def gotRequest(result): + uri = self.params['uri'] + callback = self.params['callback'] + + jid, nodeIdentifier = getServiceAndNode(uri) + method = getattr(self.service, self.serviceMethod) + d = method(jid, nodeIdentifier, callback) + return d + + def storeParams(data): + self.params = simplejson.loads(data) + + def trapXMPPURIParseError(failure): + failure.trap(XMPPURIParseError) + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value) + + d = readStream(request.stream, storeParams) + d.addCallback(gotRequest) + d.addCallback(respond) + d.addErrback(trapNotFound) + d.addErrback(trapXMPPURIParseError) + return d + + + +class RemoteSubscribeResource(RemoteSubscribeBaseResource): + """ + Resource to subscribe to a remote publish-subscribe node. + + The passed C{uri} is the XMPP URI of the node to subscribe to and the + C{callback} is the callback URI. Upon receiving notifications from the + node, a POST request will be perfomed on the callback URI. + """ + serviceMethod = 'subscribeCallback' + + + +class RemoteUnsubscribeResource(RemoteSubscribeBaseResource): + """ + Resource to unsubscribe from a remote publish-subscribe node. + + The passed C{uri} is the XMPP URI of the node to unsubscribe from and the + C{callback} is the callback URI that was registered for it. + """ + serviceMethod = 'unsubscribeCallback' + + + +class RemoteItemsResource(resource.Resource): + """ + Resource for retrieving items from a remote pubsub node. + """ + + def __init__(self, service): + self.service = service + + + def render(self, request): + try: + maxItems = int(request.args.get('max_items', [0])[0]) or None + except ValueError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "The argument max_items has an invalid value.") + + try: + uri = request.args['uri'][0] + except KeyError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "No URI for the remote node provided.") + + try: + jid, nodeIdentifier = getServiceAndNode(uri) + except XMPPURIParseError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % uri) + + def respond(items): + """Create a feed out the retrieved items.""" + contentType = http_headers.MimeType('application', + 'atom+xml', + {'type': 'feed'}) + atomEntries = extractAtomEntries(items) + feed = constructFeed(jid, nodeIdentifier, atomEntries, + "Retrieved item collection") + payload = feed.toXml().encode('utf-8') + return http.Response(responsecode.OK, stream=payload, + headers={'Content-Type': contentType}) + + def trapNotFound(failure): + failure.trap(StanzaError) + if not failure.value.condition == 'item-not-found': + raise failure + return http.StatusResponse(responsecode.NOT_FOUND, + "Node not found") + + d = self.service.items(jid, nodeIdentifier, maxItems) + d.addCallback(respond) + d.addErrback(trapNotFound) + return d + + + +# Client side code to interact with a service as provided above + +def getPageWithFactory(url, contextFactory=None, *args, **kwargs): + """Download a web page. + + Download a page. Return the factory that holds a deferred, which will + callback with a page (as a string) or errback with a description of the + error. + + See HTTPClientFactory to see what extra args can be passed. + """ + + scheme, host, port, path = client._parse(url) + factory = client.HTTPClientFactory(url, *args, **kwargs) + factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200() + + if scheme == 'https': + from twisted.internet import ssl + if contextFactory is None: + contextFactory = ssl.ClientContextFactory() + reactor.connectSSL(host, port, factory, contextFactory) + else: + reactor.connectTCP(host, port, factory) + return factory + + + +class CallbackResource(resource.Resource): + """ + Web resource for retrieving gateway notifications. + """ + + def __init__(self, callback): + self.callback = callback + + + http_GET = None + + + def http_POST(self, request): + p = WebStreamParser() + if not request.headers.hasHeader('Event'): + d = p.parse(request.stream) + else: + d = defer.succeed(None) + d.addCallback(self.callback, request.headers) + d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) + return d + + + +class GatewayClient(service.Service): + """ + Service that provides client access to the HTTP Gateway into Idavoll. + """ + + agent = "Idavoll HTTP Gateway Client" + + def __init__(self, baseURI, callbackHost=None, callbackPort=None): + self.baseURI = baseURI + self.callbackHost = callbackHost or 'localhost' + self.callbackPort = callbackPort or 8087 + root = resource.Resource() + root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) + self.site = server.Site(root) + + + def startService(self): + self.port = reactor.listenTCP(self.callbackPort, + channel.HTTPFactory(self.site)) + + + def stopService(self): + return self.port.stopListening() + + + def _makeURI(self, verb, query=None): + uriComponents = urlparse.urlparse(self.baseURI) + uri = urlparse.urlunparse((uriComponents[0], + uriComponents[1], + uriComponents[2] + verb, + '', + query and urllib.urlencode(query) or '', + '')) + return uri + + + def callback(self, data, headers): + pass + + + def ping(self): + f = getPageWithFactory(self._makeURI(''), + method='HEAD', + agent=self.agent) + return f.deferred + + + def create(self): + f = getPageWithFactory(self._makeURI('create'), + method='POST', + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + + def delete(self, xmppURI, redirectURI=None): + query = {'uri': xmppURI} + + if redirectURI: + params = {'redirect_uri': redirectURI} + postdata = simplejson.dumps(params) + headers = {'Content-Type': MIME_JSON} + else: + postdata = None + headers = None + + f = getPageWithFactory(self._makeURI('delete', query), + method='POST', + postdata=postdata, + headers=headers, + agent=self.agent) + return f.deferred + + + def publish(self, entry, xmppURI=None): + query = xmppURI and {'uri': xmppURI} + + f = getPageWithFactory(self._makeURI('publish', query), + method='POST', + postdata=entry.toXml().encode('utf-8'), + headers={'Content-Type': MIME_ATOM_ENTRY}, + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + + def listNodes(self): + f = getPageWithFactory(self._makeURI('list'), + method='GET', + agent=self.agent) + return f.deferred.addCallback(simplejson.loads) + + + def subscribe(self, xmppURI): + params = {'uri': xmppURI, + 'callback': 'http://%s:%s/callback' % (self.callbackHost, + self.callbackPort)} + f = getPageWithFactory(self._makeURI('subscribe'), + method='POST', + postdata=simplejson.dumps(params), + headers={'Content-Type': MIME_JSON}, + agent=self.agent) + return f.deferred + + + def unsubscribe(self, xmppURI): + params = {'uri': xmppURI, + 'callback': 'http://%s:%s/callback' % (self.callbackHost, + self.callbackPort)} + f = getPageWithFactory(self._makeURI('unsubscribe'), + method='POST', + postdata=simplejson.dumps(params), + headers={'Content-Type': MIME_JSON}, + agent=self.agent) + return f.deferred + + + def items(self, xmppURI, maxItems=None): + query = {'uri': xmppURI} + if maxItems: + query['max_items'] = int(maxItems) + f = getPageWithFactory(self._makeURI('items', query), + method='GET', + agent=self.agent) + return f.deferred diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/iidavoll.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/iidavoll.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,585 @@ +# Copyright (c) 2003-2008 Ralph Meijer +# See LICENSE for details. + +""" +Interfaces for idavoll. +""" + +from zope.interface import Attribute, Interface + +class IBackendService(Interface): + """ Interface to a backend service of a pubsub service. """ + + + def __init__(storage): + """ + @param storage: Object providing L{IStorage}. + """ + + + def supportsPublisherAffiliation(): + """ Reports if the backend supports the publisher affiliation. + + @rtype: C{bool} + """ + + + def supportsOutcastAffiliation(): + """ Reports if the backend supports the publisher affiliation. + + @rtype: C{bool} + """ + + + def supportsPersistentItems(): + """ Reports if the backend supports persistent items. + + @rtype: C{bool} + """ + + + def getNodeType(nodeIdentifier): + """ Return type of a node. + + @return: a deferred that returns either 'leaf' or 'collection' + """ + + + def getNodes(): + """ Returns list of all nodes. + + @return: a deferred that returns a C{list} of node ids. + """ + + + def getNodeMetaData(nodeIdentifier): + """ Return meta data for a node. + + @return: a deferred that returns a C{list} of C{dict}s with the + metadata. + """ + + + def createNode(nodeIdentifier, requestor): + """ Create a node. + + @return: a deferred that fires when the node has been created. + """ + + + def registerPreDelete(preDeleteFn): + """ Register a callback that is called just before a node deletion. + + The function C{preDeletedFn} is added to a list of functions to be + called just before deletion of a node. The callback C{preDeleteFn} is + called with the C{nodeIdentifier} that is about to be deleted and + should return a deferred that returns a list of deferreds that are to + be fired after deletion. The backend collects the lists from all these + callbacks before actually deleting the node in question. After + deletion all collected deferreds are fired to do post-processing. + + The idea is that you want to be able to collect data from the node + before deleting it, for example to get a list of subscribers that have + to be notified after the node has been deleted. To do this, + C{preDeleteFn} fetches the subscriber list and passes this list to a + callback attached to a deferred that it sets up. This deferred is + returned in the list of deferreds. + """ + + + def deleteNode(nodeIdentifier, requestor): + """ Delete a node. + + @return: a deferred that fires when the node has been deleted. + """ + + + def purgeNode(nodeIdentifier, requestor): + """ Removes all items in node from persistent storage """ + + + def subscribe(nodeIdentifier, subscriber, requestor): + """ Request the subscription of an entity to a pubsub node. + + Depending on the node's configuration and possible business rules, the + C{subscriber} is added to the list of subscriptions of the node with id + C{nodeIdentifier}. The C{subscriber} might be different from the + C{requestor}, and if the C{requestor} is not allowed to subscribe this + entity an exception should be raised. + + @return: a deferred that returns the subscription state + """ + + + def unsubscribe(nodeIdentifier, subscriber, requestor): + """ Cancel the subscription of an entity to a pubsub node. + + The subscription of C{subscriber} is removed from the list of + subscriptions of the node with id C{nodeIdentifier}. If the + C{requestor} is not allowed to unsubscribe C{subscriber}, an an + exception should be raised. + + @return: a deferred that fires when unsubscription is complete. + """ + + + def getSubscribers(nodeIdentifier): + """ Get node subscriber list. + + @return: a deferred that fires with the list of subscribers. + """ + + + def getSubscriptions(entity): + """ Report the list of current subscriptions with this pubsub service. + + Report the list of the current subscriptions with all nodes within this + pubsub service, for the C{entity}. + + @return: a deferred that returns the list of all current subscriptions + as tuples C{(nodeIdentifier, subscriber, subscription)}. + """ + + + def getAffiliations(entity): + """ Report the list of current affiliations with this pubsub service. + + Report the list of the current affiliations with all nodes within this + pubsub service, for the C{entity}. + + @return: a deferred that returns the list of all current affiliations + as tuples C{(nodeIdentifier, affiliation)}. + """ + + + def publish(nodeIdentifier, items, requestor): + """ Publish items to a pubsub node. + + @return: a deferred that fires when the items have been published. + @rtype: L{Deferred} + """ + + + def registerNotifier(observerfn, *args, **kwargs): + """ Register callback which is called for notification. """ + + + def getNotifications(nodeIdentifier, items): + """ + Get notification list. + + This method is called to discover which entities should receive + notifications for the given items that have just been published to the + given node. + + The notification list contains tuples (subscriber, subscriptions, + items) to result in one notification per tuple: the given subscriptions + yielded the given items to be notified to this subscriber. This + structure is needed allow for letting the subscriber know which + subscriptions yielded which notifications, while catering for + collection nodes and content-based subscriptions. + + To minimize the amount of notifications per entity, implementers + should take care that if all items in C{items} were yielded + by the same set of subscriptions, exactly one tuple is for this + subscriber is returned, so that the subscriber would get exactly one + notification. Alternatively, one tuple per subscription combination. + + @param nodeIdentifier: The identifier of the node the items were + published to. + @type nodeIdentifier: C{unicode}. + @param items: The list of published items as + L{Element}s. + @type items: C{list} + @return: The notification list as tuples of + (L{JID}, + C{list} of L{Subscription}, + C{list} of L{Element}. + @rtype: C{list} + """ + + + def getItems(nodeIdentifier, requestor, maxItems=None, itemIdentifiers=[]): + """ Retrieve items from persistent storage + + If C{maxItems} is given, return the C{maxItems} last published + items, else if C{itemIdentifiers} is not empty, return the items + requested. If neither is given, return all items. + + @return: a deferred that returns the requested items + """ + + + def retractItem(nodeIdentifier, itemIdentifier, requestor): + """ Removes item in node from persistent storage """ + + + +class IStorage(Interface): + """ + Storage interface. + """ + + + def getNode(nodeIdentifier): + """ + Get Node. + + @param nodeIdentifier: NodeID of the desired node. + @type nodeIdentifier: C{str} + @return: deferred that returns a L{INode} providing object. + """ + + + def getNodeIds(): + """ + Return all NodeIDs. + + @return: deferred that returns a list of NodeIDs (C{unicode}). + """ + + + def createNode(nodeIdentifier, owner, config): + """ + Create new node. + + The implementation should make sure, the passed owner JID is stripped + of the resource (e.g. using C{owner.userhostJID()}). The passed config + is expected to have values for the fields returned by + L{getDefaultConfiguration}, as well as a value for + C{'pubsub#node_type'}. + + @param nodeIdentifier: NodeID of the new node. + @type nodeIdentifier: C{unicode} + @param owner: JID of the new nodes's owner. + @type owner: L{JID} + @param config: Node configuration. + @type config: C{dict} + @return: deferred that fires on creation. + """ + + + def deleteNode(nodeIdentifier): + """ + Delete a node. + + @param nodeIdentifier: NodeID of the new node. + @type nodeIdentifier: C{unicode} + @return: deferred that fires on deletion. + """ + + + def getAffiliations(entity): + """ + Get all affiliations for entity. + + The implementation should make sure, the passed owner JID is stripped + of the resource (e.g. using C{owner.userhostJID()}). + + @param entity: JID of the entity. + @type entity: L{JID} + @return: deferred that returns a C{list} of tuples of the form + C{(nodeIdentifier, affiliation)}, where C{nodeIdentifier} is + of the type L{unicode} and C{affiliation} is one of + C{'owner'}, C{'publisher'} and C{'outcast'}. + """ + + + def getSubscriptions(entity): + """ + Get all subscriptions for an entity. + + The implementation should make sure, the passed owner JID is stripped + of the resource (e.g. using C{owner.userhostJID()}). + + @param entity: JID of the entity. + @type entity: L{JID} + @return: deferred that returns a C{list} of tuples of the form + C{(nodeIdentifier, subscriber, state)}, where + C{nodeIdentifier} is of the type C{unicode}, C{subscriber} of + the type J{JID}, and + C{state} is C{'subscribed'}, C{'pending'} or + C{'unconfigured'}. + """ + + + def getDefaultConfiguration(nodeType): + """ + Get the default configuration for the given node type. + + @param nodeType: Either C{'leaf'} or C{'collection'}. + @type nodeType: C{str} + @return: The default configuration. + @rtype: C{dict}. + @raises: L{idavoll.error.NoCollections} if collections are not + supported. + """ + + + +class INode(Interface): + """ + Interface to the class of objects that represent nodes. + """ + + nodeType = Attribute("""The type of this node. One of {'leaf'}, + {'collection'}.""") + nodeIdentifier = Attribute("""The node identifer of this node""") + + + def getType(): + """ + Get node's type. + + @return: C{'leaf'} or C{'collection'}. + """ + + + def getConfiguration(): + """ + Get node's configuration. + + The configuration must at least have two options: + C{pubsub#persist_items}, and C{pubsub#deliver_payloads}. + + @return: C{dict} of configuration options. + """ + + + def getMetaData(): + """ + Get node's meta data. + + The meta data must be a superset of the configuration options, and + also at least should have a C{pubsub#node_type} entry. + + @return: C{dict} of meta data. + """ + + + def setConfiguration(options): + """ + Set node's configuration. + + The elements of {options} will set the new values for those + configuration items. This means that only changing items have to + be given. + + @param options: a dictionary of configuration options. + @returns: a deferred that fires upon success. + """ + + + def getAffiliation(entity): + """ + Get affiliation of entity with this node. + + @param entity: JID of entity. + @type entity: L{JID} + @return: deferred that returns C{'owner'}, C{'publisher'}, C{'outcast'} + or C{None}. + """ + + + def getSubscription(subscriber): + """ + Get subscription to this node of subscriber. + + @param subscriber: JID of the new subscriptions' entity. + @type subscriber: L{JID} + @return: deferred that returns the subscription state (C{'subscribed'}, + C{'pending'} or C{None}). + """ + + + def getSubscriptions(state=None): + """ + Get list of subscriptions to this node. + + The optional C{state} argument filters the subscriptions to their + state. + + @param state: Subscription state filter. One of C{'subscribed'}, + C{'pending'}, C{'unconfigured'}. + @type state: C{str} + @return: a deferred that returns a C{list} of + L{wokkel.pubsub.Subscription}s. + """ + + + def addSubscription(subscriber, state, config): + """ + Add new subscription to this node with given state. + + @param subscriber: JID of the new subscriptions' entity. + @type subscriber: L{JID} + @param state: C{'subscribed'} or C{'pending'} + @type state: C{str} + @param config: Subscription configuration. + @param config: C{dict} + @return: deferred that fires on subscription. + """ + + + def removeSubscription(subscriber): + """ + Remove subscription to this node. + + @param subscriber: JID of the subscriptions' entity. + @type subscriber: L{JID} + @return: deferred that fires on removal. + """ + + + def isSubscribed(entity): + """ + Returns whether entity has any subscription to this node. + + Only returns C{True} when the subscription state (if present) is + C{'subscribed'} for any subscription that matches the bare JID. + + @param subscriber: bare JID of the subscriptions' entity. + @type subscriber: L{JID} + @return: deferred that returns a C{bool}. + """ + + + def getAffiliations(): + """ + Get affiliations of entities with this node. + + @return: deferred that returns a C{list} of tuples (jid, affiliation), + where jid is a L(JID) + and affiliation is one of C{'owner'}, + C{'publisher'}, C{'outcast'}. + """ + + + +class ILeafNode(Interface): + """ + Interface to the class of objects that represent leaf nodes. + """ + + def storeItems(items, publisher): + """ + Store items in persistent storage for later retrieval. + + @param items: The list of items to be stored. Each item is the + L{domish} representation of the XML fragment as defined + for C{} in the + C{http://jabber.org/protocol/pubsub} namespace. + @type items: C{list} of {domish.Element} + @param publisher: JID of the publishing entity. + @type publisher: L{JID} + @return: deferred that fires upon success. + """ + + + def removeItems(itemIdentifiers): + """ + Remove items by id. + + @param itemIdentifiers: C{list} of item ids. + @return: deferred that fires with a C{list} of ids of the items that + were deleted + """ + + + def getItems(maxItems=None): + """ + Get items. + + If C{maxItems} is not given, all items in the node are returned, + just like C{getItemsById}. Otherwise, C{maxItems} limits + the returned items to a maximum of that number of most recently + published items. + + @param maxItems: if given, a natural number (>0) that limits the + returned number of items. + @return: deferred that fires with a C{list} of found items. + """ + + + def getItemsById(itemIdentifiers): + """ + Get items by item id. + + Each item in the returned list is a unicode string that + represent the XML of the item as it was published, including the + item wrapper with item id. + + @param itemIdentifiers: C{list} of item ids. + @return: deferred that fires with a C{list} of found items. + """ + + + def purge(): + """ + Purge node of all items in persistent storage. + + @return: deferred that fires when the node has been purged. + """ + + + +class IGatewayStorage(Interface): + + def addCallback(service, nodeIdentifier, callback): + """ + Register a callback URI. + + The registered HTTP callback URI will have an Atom Entry documented + POSTed to it upon receiving a notification for the given pubsub node. + + @param service: The XMPP entity that holds the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the publish-subscribe node. + @type nodeIdentifier: C{unicode}. + @param callback: The callback URI to be registered. + @type callback: C{str}. + @rtype: L{Deferred} + """ + + def removeCallback(service, nodeIdentifier, callback): + """ + Remove a registered callback URI. + + The returned deferred will fire with a boolean that signals wether or + not this was the last callback unregistered for this node. + + @param service: The XMPP entity that holds the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the publish-subscribe node. + @type nodeIdentifier: C{unicode}. + @param callback: The callback URI to be unregistered. + @type callback: C{str}. + @rtype: L{Deferred} + """ + + def getCallbacks(service, nodeIdentifier): + """ + Get the callbacks registered for this node. + + Returns a deferred that fires with the set of HTTP callback URIs + registered for this node. + + @param service: The XMPP entity that holds the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the publish-subscribe node. + @type nodeIdentifier: C{unicode}. + @rtype: L{Deferred} + """ + + + def hasCallbacks(service, nodeIdentifier): + """ + Return wether there are callbacks registered for a node. + + @param service: The XMPP entity that holds the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the publish-subscribe node. + @type nodeIdentifier: C{unicode}. + @returns: Deferred that fires with a boolean. + @rtype: L{Deferred} + """ diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/memory_storage.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/memory_storage.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,320 @@ +# Copyright (c) 2003-2010 Ralph Meijer +# See LICENSE for details. + +import copy +from zope.interface import implements +from twisted.internet import defer +from twisted.words.protocols.jabber import jid + +from wokkel.pubsub import Subscription + +from idavoll import error, iidavoll + +class Storage: + + implements(iidavoll.IStorage) + + defaultConfig = { + 'leaf': { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + }, + 'collection': { + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } + } + + def __init__(self): + rootNode = CollectionNode('', jid.JID('localhost'), + copy.copy(self.defaultConfig['collection'])) + self._nodes = {'': rootNode} + + + def getNode(self, nodeIdentifier): + try: + node = self._nodes[nodeIdentifier] + except KeyError: + return defer.fail(error.NodeNotFound()) + + return defer.succeed(node) + + + def getNodeIds(self): + return defer.succeed(self._nodes.keys()) + + + def createNode(self, nodeIdentifier, owner, config): + if nodeIdentifier in self._nodes: + return defer.fail(error.NodeExists()) + + if config['pubsub#node_type'] != 'leaf': + raise error.NoCollections() + + node = LeafNode(nodeIdentifier, owner, config) + self._nodes[nodeIdentifier] = node + + return defer.succeed(None) + + + def deleteNode(self, nodeIdentifier): + try: + del self._nodes[nodeIdentifier] + except KeyError: + return defer.fail(error.NodeNotFound()) + + return defer.succeed(None) + + + def getAffiliations(self, entity): + entity = entity.userhost() + return defer.succeed([(node.nodeIdentifier, node._affiliations[entity]) + for name, node in self._nodes.iteritems() + if entity in node._affiliations]) + + + def getSubscriptions(self, entity): + subscriptions = [] + for node in self._nodes.itervalues(): + for subscriber, subscription in node._subscriptions.iteritems(): + subscriber = jid.internJID(subscriber) + if subscriber.userhostJID() == entity.userhostJID(): + subscriptions.append(subscription) + + return defer.succeed(subscriptions) + + + def getDefaultConfiguration(self, nodeType): + if nodeType == 'collection': + raise error.NoCollections() + + return self.defaultConfig[nodeType] + + +class Node: + + implements(iidavoll.INode) + + def __init__(self, nodeIdentifier, owner, config): + self.nodeIdentifier = nodeIdentifier + self._affiliations = {owner.userhost(): 'owner'} + self._subscriptions = {} + self._config = copy.copy(config) + + + def getType(self): + return self.nodeType + + + def getConfiguration(self): + return self._config + + + def getMetaData(self): + config = copy.copy(self._config) + config["pubsub#node_type"] = self.nodeType + return config + + + def setConfiguration(self, options): + for option in options: + if option in self._config: + self._config[option] = options[option] + + return defer.succeed(None) + + + def getAffiliation(self, entity): + return defer.succeed(self._affiliations.get(entity.userhost())) + + + def getSubscription(self, subscriber): + try: + subscription = self._subscriptions[subscriber.full()] + except KeyError: + return defer.succeed(None) + else: + return defer.succeed(subscription) + + + def getSubscriptions(self, state=None): + return defer.succeed( + [subscription + for subscription in self._subscriptions.itervalues() + if state is None or subscription.state == state]) + + + + def addSubscription(self, subscriber, state, options): + if self._subscriptions.get(subscriber.full()): + return defer.fail(error.SubscriptionExists()) + + subscription = Subscription(self.nodeIdentifier, subscriber, state, + options) + self._subscriptions[subscriber.full()] = subscription + return defer.succeed(None) + + + def removeSubscription(self, subscriber): + try: + del self._subscriptions[subscriber.full()] + except KeyError: + return defer.fail(error.NotSubscribed()) + + return defer.succeed(None) + + + def isSubscribed(self, entity): + for subscriber, subscription in self._subscriptions.iteritems(): + if jid.internJID(subscriber).userhost() == entity.userhost() and \ + subscription.state == 'subscribed': + return defer.succeed(True) + + return defer.succeed(False) + + + def getAffiliations(self): + affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation + in self._affiliations.iteritems()] + + return defer.succeed(affiliations) + + + +class PublishedItem(object): + """ + A published item. + + This represent an item as it was published by an entity. + + @ivar element: The DOM representation of the item that was published. + @type element: L{Element} + @ivar publisher: The entity that published the item. + @type publisher: L{JID} + """ + + def __init__(self, element, publisher): + self.element = element + self.publisher = publisher + + + +class LeafNode(Node): + + implements(iidavoll.ILeafNode) + + nodeType = 'leaf' + + def __init__(self, nodeIdentifier, owner, config): + Node.__init__(self, nodeIdentifier, owner, config) + self._items = {} + self._itemlist = [] + + + def storeItems(self, items, publisher): + for element in items: + item = PublishedItem(element, publisher) + itemIdentifier = element["id"] + if itemIdentifier in self._items: + self._itemlist.remove(self._items[itemIdentifier]) + self._items[itemIdentifier] = item + self._itemlist.append(item) + + return defer.succeed(None) + + + def removeItems(self, itemIdentifiers): + deleted = [] + + for itemIdentifier in itemIdentifiers: + try: + item = self._items[itemIdentifier] + except KeyError: + pass + else: + self._itemlist.remove(item) + del self._items[itemIdentifier] + deleted.append(itemIdentifier) + + return defer.succeed(deleted) + + + def getItems(self, maxItems=None): + if maxItems: + itemList = self._itemlist[-maxItems:] + else: + itemList = self._itemlist + return defer.succeed([item.element for item in itemList]) + + + def getItemsById(self, itemIdentifiers): + items = [] + for itemIdentifier in itemIdentifiers: + try: + item = self._items[itemIdentifier] + except KeyError: + pass + else: + items.append(item.element) + return defer.succeed(items) + + + def purge(self): + self._items = {} + self._itemlist = [] + + return defer.succeed(None) + + +class CollectionNode(Node): + nodeType = 'collection' + + + +class GatewayStorage(object): + """ + Memory based storage facility for the XMPP-HTTP gateway. + """ + + def __init__(self): + self.callbacks = {} + + + def addCallback(self, service, nodeIdentifier, callback): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + callbacks = set([callback]) + self.callbacks[service, nodeIdentifier] = callbacks + else: + callbacks.add(callback) + pass + + return defer.succeed(None) + + + def removeCallback(self, service, nodeIdentifier, callback): + try: + callbacks = self.callbacks[service, nodeIdentifier] + callbacks.remove(callback) + except KeyError: + return defer.fail(error.NotSubscribed()) + else: + if not callbacks: + del self.callbacks[service, nodeIdentifier] + + return defer.succeed(not callbacks) + + + def getCallbacks(self, service, nodeIdentifier): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + return defer.fail(error.NoCallbacks()) + else: + return defer.succeed(callbacks) + + + def hasCallbacks(self, service, nodeIdentifier): + return defer.succeed((service, nodeIdentifier) in self.callbacks) diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/pgsql_storage.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/pgsql_storage.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,605 @@ +# Copyright (c) 2003-2008 Ralph Meijer +# See LICENSE for details. + +import copy + +from zope.interface import implements + +from twisted.enterprise import adbapi +from twisted.words.protocols.jabber import jid + +from wokkel.generic import parseXml, stripNamespace +from wokkel.pubsub import Subscription + +from idavoll import error, iidavoll + +class Storage: + + implements(iidavoll.IStorage) + + defaultConfig = { + 'leaf': { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + }, + 'collection': { + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } + } + + def __init__(self, dbpool): + self.dbpool = dbpool + + + def getNode(self, nodeIdentifier): + return self.dbpool.runInteraction(self._getNode, nodeIdentifier) + + + def _getNode(self, cursor, nodeIdentifier): + configuration = {} + cursor.execute("""SELECT node_type, + persist_items, + deliver_payloads, + send_last_published_item + FROM nodes + WHERE node=%s""", + (nodeIdentifier,)) + row = cursor.fetchone() + + if not row: + raise error.NodeNotFound() + + if row[0] == 'leaf': + configuration = { + 'pubsub#persist_items': row[1], + 'pubsub#deliver_payloads': row[2], + 'pubsub#send_last_published_item': + row[3]} + node = LeafNode(nodeIdentifier, configuration) + node.dbpool = self.dbpool + return node + elif row[0] == 'collection': + configuration = { + 'pubsub#deliver_payloads': row[2], + 'pubsub#send_last_published_item': + row[3]} + node = CollectionNode(nodeIdentifier, configuration) + node.dbpool = self.dbpool + return node + + + + def getNodeIds(self): + d = self.dbpool.runQuery("""SELECT node from nodes""") + d.addCallback(lambda results: [r[0] for r in results]) + return d + + + def createNode(self, nodeIdentifier, owner, config): + return self.dbpool.runInteraction(self._createNode, nodeIdentifier, + owner, config) + + + def _createNode(self, cursor, nodeIdentifier, owner, config): + if config['pubsub#node_type'] != 'leaf': + raise error.NoCollections() + + owner = owner.userhost() + try: + cursor.execute("""INSERT INTO nodes + (node, node_type, persist_items, + deliver_payloads, send_last_published_item) + VALUES + (%s, 'leaf', %s, %s, %s)""", + (nodeIdentifier, + config['pubsub#persist_items'], + config['pubsub#deliver_payloads'], + config['pubsub#send_last_published_item']) + ) + except cursor._pool.dbapi.IntegrityError: + raise error.NodeExists() + + cursor.execute("""SELECT 1 from entities where jid=%s""", + (owner,)) + + if not cursor.fetchone(): + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (owner,)) + + cursor.execute("""INSERT INTO affiliations + (node_id, entity_id, affiliation) + SELECT node_id, entity_id, 'owner' FROM + (SELECT node_id FROM nodes WHERE node=%s) as n + CROSS JOIN + (SELECT entity_id FROM entities + WHERE jid=%s) as e""", + (nodeIdentifier, owner)) + + + def deleteNode(self, nodeIdentifier): + return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) + + + def _deleteNode(self, cursor, nodeIdentifier): + cursor.execute("""DELETE FROM nodes WHERE node=%s""", + (nodeIdentifier,)) + + if cursor.rowcount != 1: + raise error.NodeNotFound() + + + def getAffiliations(self, entity): + d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities + NATURAL JOIN affiliations + NATURAL JOIN nodes + WHERE jid=%s""", + (entity.userhost(),)) + d.addCallback(lambda results: [tuple(r) for r in results]) + return d + + + def getSubscriptions(self, entity): + def toSubscriptions(rows): + subscriptions = [] + for row in rows: + subscriber = jid.internJID('%s/%s' % (row[1], + row[2])) + subscription = Subscription(row[0], subscriber, row[3]) + subscriptions.append(subscription) + return subscriptions + + d = self.dbpool.runQuery("""SELECT node, jid, resource, state + FROM entities + NATURAL JOIN subscriptions + NATURAL JOIN nodes + WHERE jid=%s""", + (entity.userhost(),)) + d.addCallback(toSubscriptions) + return d + + + def getDefaultConfiguration(self, nodeType): + return self.defaultConfig[nodeType] + + + +class Node: + + implements(iidavoll.INode) + + def __init__(self, nodeIdentifier, config): + self.nodeIdentifier = nodeIdentifier + self._config = config + + + def _checkNodeExists(self, cursor): + cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", + (self.nodeIdentifier,)) + if not cursor.fetchone(): + raise error.NodeNotFound() + + + def getType(self): + return self.nodeType + + + def getConfiguration(self): + return self._config + + + def setConfiguration(self, options): + config = copy.copy(self._config) + + for option in options: + if option in config: + config[option] = options[option] + + d = self.dbpool.runInteraction(self._setConfiguration, config) + d.addCallback(self._setCachedConfiguration, config) + return d + + + def _setConfiguration(self, cursor, config): + self._checkNodeExists(cursor) + cursor.execute("""UPDATE nodes SET persist_items=%s, + deliver_payloads=%s, + send_last_published_item=%s + WHERE node=%s""", + (config["pubsub#persist_items"], + config["pubsub#deliver_payloads"], + config["pubsub#send_last_published_item"], + self.nodeIdentifier)) + + + def _setCachedConfiguration(self, void, config): + self._config = config + + + def getMetaData(self): + config = copy.copy(self._config) + config["pubsub#node_type"] = self.nodeType + return config + + + def getAffiliation(self, entity): + return self.dbpool.runInteraction(self._getAffiliation, entity) + + + def _getAffiliation(self, cursor, entity): + self._checkNodeExists(cursor) + cursor.execute("""SELECT affiliation FROM affiliations + NATURAL JOIN nodes + NATURAL JOIN entities + WHERE node=%s AND jid=%s""", + (self.nodeIdentifier, + entity.userhost())) + + try: + return cursor.fetchone()[0] + except TypeError: + return None + + + def getSubscription(self, subscriber): + return self.dbpool.runInteraction(self._getSubscription, subscriber) + + + def _getSubscription(self, cursor, subscriber): + self._checkNodeExists(cursor) + + userhost = subscriber.userhost() + resource = subscriber.resource or '' + + cursor.execute("""SELECT state FROM subscriptions + NATURAL JOIN nodes + NATURAL JOIN entities + WHERE node=%s AND jid=%s AND resource=%s""", + (self.nodeIdentifier, + userhost, + resource)) + row = cursor.fetchone() + if not row: + return None + else: + return Subscription(self.nodeIdentifier, subscriber, row[0]) + + + def getSubscriptions(self, state=None): + return self.dbpool.runInteraction(self._getSubscriptions, state) + + + def _getSubscriptions(self, cursor, state): + self._checkNodeExists(cursor) + + query = """SELECT jid, resource, state, + subscription_type, subscription_depth + FROM subscriptions + NATURAL JOIN nodes + NATURAL JOIN entities + WHERE node=%s"""; + values = [self.nodeIdentifier] + + if state: + query += " AND state=%s" + values.append(state) + + cursor.execute(query, values); + rows = cursor.fetchall() + + subscriptions = [] + for row in rows: + subscriber = jid.JID('%s/%s' % (row[0], row[1])) + + options = {} + if row[3]: + options['pubsub#subscription_type'] = row[3]; + if row[4]: + options['pubsub#subscription_depth'] = row[4]; + + subscriptions.append(Subscription(self.nodeIdentifier, subscriber, + row[2], options)) + + return subscriptions + + + def addSubscription(self, subscriber, state, config): + return self.dbpool.runInteraction(self._addSubscription, subscriber, + state, config) + + + def _addSubscription(self, cursor, subscriber, state, config): + self._checkNodeExists(cursor) + + userhost = subscriber.userhost() + resource = subscriber.resource or '' + + subscription_type = config.get('pubsub#subscription_type') + subscription_depth = config.get('pubsub#subscription_depth') + + try: + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (userhost,)) + except cursor._pool.dbapi.IntegrityError: + cursor._connection.rollback() + + try: + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, state, + subscription_type, subscription_depth) + SELECT node_id, entity_id, %s, %s, %s, %s FROM + (SELECT node_id FROM nodes + WHERE node=%s) as n + CROSS JOIN + (SELECT entity_id FROM entities + WHERE jid=%s) as e""", + (resource, + state, + subscription_type, + subscription_depth, + self.nodeIdentifier, + userhost)) + except cursor._pool.dbapi.IntegrityError: + raise error.SubscriptionExists() + + + def removeSubscription(self, subscriber): + return self.dbpool.runInteraction(self._removeSubscription, + subscriber) + + + def _removeSubscription(self, cursor, subscriber): + self._checkNodeExists(cursor) + + userhost = subscriber.userhost() + resource = subscriber.resource or '' + + cursor.execute("""DELETE FROM subscriptions WHERE + node_id=(SELECT node_id FROM nodes + WHERE node=%s) AND + entity_id=(SELECT entity_id FROM entities + WHERE jid=%s) AND + resource=%s""", + (self.nodeIdentifier, + userhost, + resource)) + if cursor.rowcount != 1: + raise error.NotSubscribed() + + return None + + + def isSubscribed(self, entity): + return self.dbpool.runInteraction(self._isSubscribed, entity) + + + def _isSubscribed(self, cursor, entity): + self._checkNodeExists(cursor) + + cursor.execute("""SELECT 1 FROM entities + NATURAL JOIN subscriptions + NATURAL JOIN nodes + WHERE entities.jid=%s + AND node=%s AND state='subscribed'""", + (entity.userhost(), + self.nodeIdentifier)) + + return cursor.fetchone() is not None + + + def getAffiliations(self): + return self.dbpool.runInteraction(self._getAffiliations) + + + def _getAffiliations(self, cursor): + self._checkNodeExists(cursor) + + cursor.execute("""SELECT jid, affiliation FROM nodes + NATURAL JOIN affiliations + NATURAL JOIN entities + WHERE node=%s""", + (self.nodeIdentifier,)) + result = cursor.fetchall() + + return [(jid.internJID(r[0]), r[1]) for r in result] + + + +class LeafNode(Node): + + implements(iidavoll.ILeafNode) + + nodeType = 'leaf' + + def storeItems(self, items, publisher): + return self.dbpool.runInteraction(self._storeItems, items, publisher) + + + def _storeItems(self, cursor, items, publisher): + self._checkNodeExists(cursor) + for item in items: + self._storeItem(cursor, item, publisher) + + + def _storeItem(self, cursor, item, publisher): + data = item.toXml() + cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s + FROM nodes + WHERE nodes.node_id = items.node_id AND + nodes.node = %s and items.item=%s""", + (publisher.full(), + data, + self.nodeIdentifier, + item["id"])) + if cursor.rowcount == 1: + return + + cursor.execute("""INSERT INTO items (node_id, item, publisher, data) + SELECT node_id, %s, %s, %s FROM nodes + WHERE node=%s""", + (item["id"], + publisher.full(), + data, + self.nodeIdentifier)) + + + def removeItems(self, itemIdentifiers): + return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) + + + def _removeItems(self, cursor, itemIdentifiers): + self._checkNodeExists(cursor) + + deleted = [] + + for itemIdentifier in itemIdentifiers: + cursor.execute("""DELETE FROM items WHERE + node_id=(SELECT node_id FROM nodes + WHERE node=%s) AND + item=%s""", + (self.nodeIdentifier, + itemIdentifier)) + + if cursor.rowcount: + deleted.append(itemIdentifier) + + return deleted + + + def getItems(self, maxItems=None): + return self.dbpool.runInteraction(self._getItems, maxItems) + + + def _getItems(self, cursor, maxItems): + self._checkNodeExists(cursor) + query = """SELECT data FROM nodes + NATURAL JOIN items + WHERE node=%s ORDER BY date DESC""" + if maxItems: + cursor.execute(query + " LIMIT %s", + (self.nodeIdentifier, + maxItems)) + else: + cursor.execute(query, (self.nodeIdentifier,)) + + result = cursor.fetchall() + items = [stripNamespace(parseXml(r[0])) for r in result] + return items + + + def getItemsById(self, itemIdentifiers): + return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers) + + + def _getItemsById(self, cursor, itemIdentifiers): + self._checkNodeExists(cursor) + items = [] + for itemIdentifier in itemIdentifiers: + cursor.execute("""SELECT data FROM nodes + NATURAL JOIN items + WHERE node=%s AND item=%s""", + (self.nodeIdentifier, + itemIdentifier)) + result = cursor.fetchone() + if result: + items.append(parseXml(result[0])) + return items + + + def purge(self): + return self.dbpool.runInteraction(self._purge) + + + def _purge(self, cursor): + self._checkNodeExists(cursor) + + cursor.execute("""DELETE FROM items WHERE + node_id=(SELECT node_id FROM nodes WHERE node=%s)""", + (self.nodeIdentifier,)) + + +class CollectionNode(Node): + + nodeType = 'collection' + + + +class GatewayStorage(object): + """ + Memory based storage facility for the XMPP-HTTP gateway. + """ + + def __init__(self, dbpool): + self.dbpool = dbpool + + + def _countCallbacks(self, cursor, service, nodeIdentifier): + """ + Count number of callbacks registered for a node. + """ + cursor.execute("""SELECT count(*) FROM callbacks + WHERE service=%s and node=%s""", + service.full(), + nodeIdentifier) + results = cursor.fetchall() + return results[0][0] + + + def addCallback(self, service, nodeIdentifier, callback): + def interaction(cursor): + cursor.execute("""SELECT 1 FROM callbacks + WHERE service=%s and node=%s and uri=%s""", + service.full(), + nodeIdentifier, + callback) + if cursor.fetchall(): + return + + cursor.execute("""INSERT INTO callbacks + (service, node, uri) VALUES + (%s, %s, %s)""", + service.full(), + nodeIdentifier, + callback) + + return self.dbpool.runInteraction(interaction) + + + def removeCallback(self, service, nodeIdentifier, callback): + def interaction(cursor): + cursor.execute("""DELETE FROM callbacks + WHERE service=%s and node=%s and uri=%s""", + service.full(), + nodeIdentifier, + callback) + + if cursor.rowcount != 1: + raise error.NotSubscribed() + + last = not self._countCallbacks(cursor, service, nodeIdentifier) + return last + + return self.dbpool.runInteraction(interaction) + + def getCallbacks(self, service, nodeIdentifier): + def interaction(cursor): + cursor.execute("""SELECT uri FROM callbacks + WHERE service=%s and node=%s""", + service.full(), + nodeIdentifier) + results = cursor.fetchall() + + if not results: + raise error.NoCallbacks() + + return [result[0] for result in results] + + return self.dbpool.runInteraction(interaction) + + + def hasCallbacks(self, service, nodeIdentifier): + def interaction(cursor): + return bool(self._countCallbacks(cursor, service, nodeIdentifier)) + + return self.dbpool.runInteraction(interaction) diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/tap.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/tap.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,94 @@ +# Copyright (c) 2003-2010 Ralph Meijer +# See LICENSE for details. + +from twisted.application import service +from twisted.python import usage +from twisted.words.protocols.jabber.jid import JID + +from wokkel.component import Component +from wokkel.disco import DiscoHandler +from wokkel.generic import FallbackHandler, VersionHandler +from wokkel.iwokkel import IPubSubResource +from wokkel.pubsub import PubSubService + +from idavoll import __version__ +from idavoll.backend import BackendService + +class Options(usage.Options): + optParameters = [ + ('jid', None, 'pubsub', 'JID this component will be available at'), + ('secret', None, 'secret', 'Jabber server component secret'), + ('rhost', None, '127.0.0.1', 'Jabber server host'), + ('rport', None, '5347', 'Jabber server port'), + ('backend', None, 'memory', 'Choice of storage backend'), + ('dbuser', None, None, 'Database user (pgsql backend)'), + ('dbname', None, 'pubsub', 'Database name (pgsql backend)'), + ('dbpass', None, None, 'Database password (pgsql backend)'), + ('dbhost', None, None, 'Database host (pgsql backend)'), + ('dbport', None, None, 'Database port (pgsql backend)'), + ] + + optFlags = [ + ('verbose', 'v', 'Show traffic'), + ('hide-nodes', None, 'Hide all nodes for disco') + ] + + def postOptions(self): + if self['backend'] not in ['pgsql', 'memory']: + raise usage.UsageError, "Unknown backend!" + + self['jid'] = JID(self['jid']) + + + +def makeService(config): + s = service.MultiService() + + # Create backend service with storage + + if config['backend'] == 'pgsql': + from twisted.enterprise import adbapi + from idavoll.pgsql_storage import Storage + dbpool = adbapi.ConnectionPool('psycopg2', + user=config['dbuser'], + password=config['dbpass'], + database=config['dbname'], + host=config['dbhost'], + port=config['dbport'], + cp_reconnect=True, + client_encoding='utf-8', + ) + st = Storage(dbpool) + elif config['backend'] == 'memory': + from idavoll.memory_storage import Storage + st = Storage() + + bs = BackendService(st) + bs.setName('backend') + bs.setServiceParent(s) + + # Set up XMPP server-side component with publish-subscribe capabilities + + cs = Component(config["rhost"], int(config["rport"]), + config["jid"].full(), config["secret"]) + cs.setName('component') + cs.setServiceParent(s) + + cs.factory.maxDelay = 900 + + if config["verbose"]: + cs.logTraffic = True + + FallbackHandler().setHandlerParent(cs) + VersionHandler('Idavoll', __version__).setHandlerParent(cs) + DiscoHandler().setHandlerParent(cs) + + resource = IPubSubResource(bs) + resource.hideNodes = config["hide-nodes"] + resource.serviceJID = config["jid"] + + ps = PubSubService(resource) + ps.setHandlerParent(cs) + resource.pubsubService = ps + + return s diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/tap_http.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/tap_http.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,97 @@ +# Copyright (c) 2003-2008 Ralph Meijer +# See LICENSE for details. + +from twisted.application import internet, service, strports +from twisted.conch import manhole, manhole_ssh +from twisted.cred import portal, checkers +from twisted.web2 import channel, log, resource, server +from twisted.web2.tap import Web2Service + +from idavoll import gateway, tap +from idavoll.gateway import RemoteSubscriptionService + +class Options(tap.Options): + optParameters = [ + ('webport', None, '8086', 'Web port'), + ] + + + +def getManholeFactory(namespace, **passwords): + def getManHole(_): + return manhole.Manhole(namespace) + + realm = manhole_ssh.TerminalRealm() + realm.chainedProtocolFactory.protocolFactory = getManHole + p = portal.Portal(realm) + p.registerChecker( + checkers.InMemoryUsernamePasswordDatabaseDontUse(**passwords)) + f = manhole_ssh.ConchFactory(p) + return f + + + +def makeService(config): + s = tap.makeService(config) + + bs = s.getServiceNamed('backend') + cs = s.getServiceNamed('component') + + # Set up XMPP service for subscribing to remote nodes + + if config['backend'] == 'pgsql': + from idavoll.pgsql_storage import GatewayStorage + gst = GatewayStorage(bs.storage.dbpool) + elif config['backend'] == 'memory': + from idavoll.memory_storage import GatewayStorage + gst = GatewayStorage() + + ss = RemoteSubscriptionService(config['jid'], gst) + ss.setHandlerParent(cs) + ss.startService() + + # Set up web service + + root = resource.Resource() + + # Set up resources that exposes the backend + root.child_create = gateway.CreateResource(bs, config['jid'], + config['jid']) + root.child_delete = gateway.DeleteResource(bs, config['jid'], + config['jid']) + root.child_publish = gateway.PublishResource(bs, config['jid'], + config['jid']) + root.child_list = gateway.ListResource(bs) + + # Set up resources for accessing remote pubsub nodes. + root.child_subscribe = gateway.RemoteSubscribeResource(ss) + root.child_unsubscribe = gateway.RemoteUnsubscribeResource(ss) + root.child_items = gateway.RemoteItemsResource(ss) + + if config["verbose"]: + root = log.LogWrapperResource(root) + + site = server.Site(root) + w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site)) + + if config["verbose"]: + logObserver = log.DefaultCommonAccessLoggingObserver() + w2s = Web2Service(logObserver) + w.setServiceParent(w2s) + w = w2s + + w.setServiceParent(s) + + # Set up a manhole + + namespace = {'service': s, + 'component': cs, + 'backend': bs, + 'root': root} + + f = getManholeFactory(namespace, admin='admin') + manholeService = strports.service('2222', f) + manholeService.setServiceParent(s) + + return s + diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/test/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/test/__init__.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,6 @@ +# Copyright (c) 2003-2007 Ralph Meijer +# See LICENSE for details. + +""" +Tests for L{idavoll}. +""" diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/test/test_backend.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/test/test_backend.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,611 @@ +# Copyright (c) 2003-2010 Ralph Meijer +# See LICENSE for details. + +""" +Tests for L{idavoll.backend}. +""" + +from zope.interface import implements +from zope.interface.verify import verifyObject + +from twisted.internet import defer +from twisted.trial import unittest +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.error import StanzaError + +from wokkel import iwokkel, pubsub + +from idavoll import backend, error, iidavoll + +OWNER = jid.JID('owner@example.com') +OWNER_FULL = jid.JID('owner@example.com/home') +SERVICE = jid.JID('test.example.org') +NS_PUBSUB = 'http://jabber.org/protocol/pubsub' + +class BackendTest(unittest.TestCase): + + def test_interfaceIBackend(self): + self.assertTrue(verifyObject(iidavoll.IBackendService, + backend.BackendService(None))) + + + def test_deleteNode(self): + class TestNode: + nodeIdentifier = 'to-be-deleted' + def getAffiliation(self, entity): + if entity.userhostJID() == OWNER: + return defer.succeed('owner') + + class TestStorage: + def __init__(self): + self.deleteCalled = [] + + def getNode(self, nodeIdentifier): + return defer.succeed(TestNode()) + + def deleteNode(self, nodeIdentifier): + if nodeIdentifier in ['to-be-deleted']: + self.deleteCalled.append(nodeIdentifier) + return defer.succeed(None) + else: + return defer.fail(error.NodeNotFound()) + + def preDelete(data): + self.assertFalse(self.storage.deleteCalled) + preDeleteCalled.append(data) + return defer.succeed(None) + + def cb(result): + self.assertEquals(1, len(preDeleteCalled)) + data = preDeleteCalled[-1] + self.assertEquals('to-be-deleted', data['nodeIdentifier']) + self.assertTrue(self.storage.deleteCalled) + + self.storage = TestStorage() + self.backend = backend.BackendService(self.storage) + + preDeleteCalled = [] + + self.backend.registerPreDelete(preDelete) + d = self.backend.deleteNode('to-be-deleted', OWNER_FULL) + d.addCallback(cb) + return d + + + def test_deleteNodeRedirect(self): + uri = 'xmpp:%s?;node=test2' % (SERVICE.full(),) + + class TestNode: + nodeIdentifier = 'to-be-deleted' + def getAffiliation(self, entity): + if entity.userhostJID() == OWNER: + return defer.succeed('owner') + + class TestStorage: + def __init__(self): + self.deleteCalled = [] + + def getNode(self, nodeIdentifier): + return defer.succeed(TestNode()) + + def deleteNode(self, nodeIdentifier): + if nodeIdentifier in ['to-be-deleted']: + self.deleteCalled.append(nodeIdentifier) + return defer.succeed(None) + else: + return defer.fail(error.NodeNotFound()) + + def preDelete(data): + self.assertFalse(self.storage.deleteCalled) + preDeleteCalled.append(data) + return defer.succeed(None) + + def cb(result): + self.assertEquals(1, len(preDeleteCalled)) + data = preDeleteCalled[-1] + self.assertEquals('to-be-deleted', data['nodeIdentifier']) + self.assertEquals(uri, data['redirectURI']) + self.assertTrue(self.storage.deleteCalled) + + self.storage = TestStorage() + self.backend = backend.BackendService(self.storage) + + preDeleteCalled = [] + + self.backend.registerPreDelete(preDelete) + d = self.backend.deleteNode('to-be-deleted', OWNER, redirectURI=uri) + d.addCallback(cb) + return d + + + def test_createNodeNoID(self): + """ + Test creation of a node without a given node identifier. + """ + class TestStorage: + def getDefaultConfiguration(self, nodeType): + return {} + + def createNode(self, nodeIdentifier, requestor, config): + self.nodeIdentifier = nodeIdentifier + return defer.succeed(None) + + self.storage = TestStorage() + self.backend = backend.BackendService(self.storage) + self.storage.backend = self.backend + + def checkID(nodeIdentifier): + self.assertNotIdentical(None, nodeIdentifier) + self.assertIdentical(self.storage.nodeIdentifier, nodeIdentifier) + + d = self.backend.createNode(None, OWNER_FULL) + d.addCallback(checkID) + return d + + class NodeStore: + """ + I just store nodes to pose as an L{IStorage} implementation. + """ + def __init__(self, nodes): + self.nodes = nodes + + def getNode(self, nodeIdentifier): + try: + return defer.succeed(self.nodes[nodeIdentifier]) + except KeyError: + return defer.fail(error.NodeNotFound()) + + + def test_getNotifications(self): + """ + Ensure subscribers show up in the notification list. + """ + item = pubsub.Item() + sub = pubsub.Subscription('test', OWNER, 'subscribed') + + class TestNode: + def getSubscriptions(self, state=None): + return [sub] + + def cb(result): + self.assertEquals(1, len(result)) + subscriber, subscriptions, items = result[-1] + + self.assertEquals(OWNER, subscriber) + self.assertEquals(set([sub]), subscriptions) + self.assertEquals([item], items) + + self.storage = self.NodeStore({'test': TestNode()}) + self.backend = backend.BackendService(self.storage) + d = self.backend.getNotifications('test', [item]) + d.addCallback(cb) + return d + + def test_getNotificationsRoot(self): + """ + Ensure subscribers to the root node show up in the notification list + for leaf nodes. + + This assumes a flat node relationship model with exactly one collection + node: the root node. Each leaf node is automatically a child node + of the root node. + """ + item = pubsub.Item() + subRoot = pubsub.Subscription('', OWNER, 'subscribed') + + class TestNode: + def getSubscriptions(self, state=None): + return [] + + class TestRootNode: + def getSubscriptions(self, state=None): + return [subRoot] + + def cb(result): + self.assertEquals(1, len(result)) + subscriber, subscriptions, items = result[-1] + self.assertEquals(OWNER, subscriber) + self.assertEquals(set([subRoot]), subscriptions) + self.assertEquals([item], items) + + self.storage = self.NodeStore({'test': TestNode(), + '': TestRootNode()}) + self.backend = backend.BackendService(self.storage) + d = self.backend.getNotifications('test', [item]) + d.addCallback(cb) + return d + + + def test_getNotificationsMultipleNodes(self): + """ + Ensure that entities that subscribe to a leaf node as well as the + root node get exactly one notification. + """ + item = pubsub.Item() + sub = pubsub.Subscription('test', OWNER, 'subscribed') + subRoot = pubsub.Subscription('', OWNER, 'subscribed') + + class TestNode: + def getSubscriptions(self, state=None): + return [sub] + + class TestRootNode: + def getSubscriptions(self, state=None): + return [subRoot] + + def cb(result): + self.assertEquals(1, len(result)) + subscriber, subscriptions, items = result[-1] + + self.assertEquals(OWNER, subscriber) + self.assertEquals(set([sub, subRoot]), subscriptions) + self.assertEquals([item], items) + + self.storage = self.NodeStore({'test': TestNode(), + '': TestRootNode()}) + self.backend = backend.BackendService(self.storage) + d = self.backend.getNotifications('test', [item]) + d.addCallback(cb) + return d + + + def test_getDefaultConfiguration(self): + """ + L{backend.BackendService.getDefaultConfiguration} should return + a deferred that fires a dictionary with configuration values. + """ + + class TestStorage: + def getDefaultConfiguration(self, nodeType): + return { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True} + + def cb(options): + self.assertIn("pubsub#persist_items", options) + self.assertEqual(True, options["pubsub#persist_items"]) + + self.backend = backend.BackendService(TestStorage()) + d = self.backend.getDefaultConfiguration('leaf') + d.addCallback(cb) + return d + + + def test_getNodeConfiguration(self): + class testNode: + nodeIdentifier = 'node' + def getConfiguration(self): + return {'pubsub#deliver_payloads': True, + 'pubsub#persist_items': False} + + class testStorage: + def getNode(self, nodeIdentifier): + return defer.succeed(testNode()) + + def cb(options): + self.assertIn("pubsub#deliver_payloads", options) + self.assertEqual(True, options["pubsub#deliver_payloads"]) + self.assertIn("pubsub#persist_items", options) + self.assertEqual(False, options["pubsub#persist_items"]) + + self.storage = testStorage() + self.backend = backend.BackendService(self.storage) + self.storage.backend = self.backend + + d = self.backend.getNodeConfiguration('node') + d.addCallback(cb) + return d + + + def test_setNodeConfiguration(self): + class testNode: + nodeIdentifier = 'node' + def getAffiliation(self, entity): + if entity.userhostJID() == OWNER: + return defer.succeed('owner') + def setConfiguration(self, options): + self.options = options + + class testStorage: + def __init__(self): + self.nodes = {'node': testNode()} + def getNode(self, nodeIdentifier): + return defer.succeed(self.nodes[nodeIdentifier]) + + def checkOptions(node): + options = node.options + self.assertIn("pubsub#deliver_payloads", options) + self.assertEqual(True, options["pubsub#deliver_payloads"]) + self.assertIn("pubsub#persist_items", options) + self.assertEqual(False, options["pubsub#persist_items"]) + + def cb(result): + d = self.storage.getNode('node') + d.addCallback(checkOptions) + return d + + self.storage = testStorage() + self.backend = backend.BackendService(self.storage) + self.storage.backend = self.backend + + options = {'pubsub#deliver_payloads': True, + 'pubsub#persist_items': False} + + d = self.backend.setNodeConfiguration('node', options, OWNER_FULL) + d.addCallback(cb) + return d + + + def test_publishNoID(self): + """ + Test publish request with an item without a node identifier. + """ + class TestNode: + nodeType = 'leaf' + nodeIdentifier = 'node' + def getAffiliation(self, entity): + if entity.userhostJID() == OWNER: + return defer.succeed('owner') + def getConfiguration(self): + return {'pubsub#deliver_payloads': True, + 'pubsub#persist_items': False} + + class TestStorage: + def getNode(self, nodeIdentifier): + return defer.succeed(TestNode()) + + def checkID(notification): + self.assertNotIdentical(None, notification['items'][0]['id']) + + self.storage = TestStorage() + self.backend = backend.BackendService(self.storage) + self.storage.backend = self.backend + + self.backend.registerNotifier(checkID) + + items = [pubsub.Item()] + d = self.backend.publish('node', items, OWNER_FULL) + return d + + + def test_notifyOnSubscription(self): + """ + Test notification of last published item on subscription. + """ + ITEM = "" % NS_PUBSUB + + class TestNode: + implements(iidavoll.ILeafNode) + nodeIdentifier = 'node' + nodeType = 'leaf' + def getAffiliation(self, entity): + if entity is OWNER: + return defer.succeed('owner') + def getConfiguration(self): + return {'pubsub#deliver_payloads': True, + 'pubsub#persist_items': False, + 'pubsub#send_last_published_item': 'on_sub'} + def getItems(self, maxItems): + return [ITEM] + def addSubscription(self, subscriber, state, options): + self.subscription = pubsub.Subscription('node', subscriber, + state, options) + return defer.succeed(None) + def getSubscription(self, subscriber): + return defer.succeed(self.subscription) + + class TestStorage: + def getNode(self, nodeIdentifier): + return defer.succeed(TestNode()) + + def cb(data): + self.assertEquals('node', data['nodeIdentifier']) + self.assertEquals([ITEM], data['items']) + self.assertEquals(OWNER, data['subscription'].subscriber) + + self.storage = TestStorage() + self.backend = backend.BackendService(self.storage) + self.storage.backend = self.backend + + d1 = defer.Deferred() + d1.addCallback(cb) + self.backend.registerNotifier(d1.callback) + d2 = self.backend.subscribe('node', OWNER, OWNER_FULL) + return defer.gatherResults([d1, d2]) + + test_notifyOnSubscription.timeout = 2 + + + +class BaseTestBackend(object): + """ + Base class for backend stubs. + """ + + def supportsPublisherAffiliation(self): + return True + + + def supportsOutcastAffiliation(self): + return True + + + def supportsPersistentItems(self): + return True + + + def supportsInstantNodes(self): + return True + + + def registerNotifier(self, observerfn, *args, **kwargs): + return + + + def registerPreDelete(self, preDeleteFn): + return + + + +class PubSubResourceFromBackendTest(unittest.TestCase): + + def test_interface(self): + resource = backend.PubSubResourceFromBackend(BaseTestBackend()) + self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource)) + + + def test_preDelete(self): + """ + Test pre-delete sending out notifications to subscribers. + """ + + class TestBackend(BaseTestBackend): + preDeleteFn = None + + def registerPreDelete(self, preDeleteFn): + self.preDeleteFn = preDeleteFn + + def getSubscribers(self, nodeIdentifier): + return defer.succeed([OWNER]) + + def notifyDelete(service, nodeIdentifier, subscribers, + redirectURI=None): + self.assertEqual(SERVICE, service) + self.assertEqual('test', nodeIdentifier) + self.assertEqual([OWNER], subscribers) + self.assertIdentical(None, redirectURI) + d1.callback(None) + + d1 = defer.Deferred() + resource = backend.PubSubResourceFromBackend(TestBackend()) + resource.serviceJID = SERVICE + resource.pubsubService = pubsub.PubSubService() + resource.pubsubService.notifyDelete = notifyDelete + self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource)) + self.assertNotIdentical(None, resource.backend.preDeleteFn) + data = {'nodeIdentifier': 'test'} + d2 = resource.backend.preDeleteFn(data) + return defer.DeferredList([d1, d2], fireOnOneErrback=1) + + + def test_preDeleteRedirect(self): + """ + Test pre-delete sending out notifications to subscribers. + """ + + uri = 'xmpp:%s?;node=test2' % (SERVICE.full(),) + + class TestBackend(BaseTestBackend): + preDeleteFn = None + + def registerPreDelete(self, preDeleteFn): + self.preDeleteFn = preDeleteFn + + def getSubscribers(self, nodeIdentifier): + return defer.succeed([OWNER]) + + def notifyDelete(service, nodeIdentifier, subscribers, + redirectURI=None): + self.assertEqual(SERVICE, service) + self.assertEqual('test', nodeIdentifier) + self.assertEqual([OWNER], subscribers) + self.assertEqual(uri, redirectURI) + d1.callback(None) + + d1 = defer.Deferred() + resource = backend.PubSubResourceFromBackend(TestBackend()) + resource.serviceJID = SERVICE + resource.pubsubService = pubsub.PubSubService() + resource.pubsubService.notifyDelete = notifyDelete + self.assertTrue(verifyObject(iwokkel.IPubSubResource, resource)) + self.assertNotIdentical(None, resource.backend.preDeleteFn) + data = {'nodeIdentifier': 'test', + 'redirectURI': uri} + d2 = resource.backend.preDeleteFn(data) + return defer.DeferredList([d1, d2], fireOnOneErrback=1) + + + def test_unsubscribeNotSubscribed(self): + """ + Test unsubscription request when not subscribed. + """ + + class TestBackend(BaseTestBackend): + def unsubscribe(self, nodeIdentifier, subscriber, requestor): + return defer.fail(error.NotSubscribed()) + + def cb(e): + self.assertEquals('unexpected-request', e.condition) + + resource = backend.PubSubResourceFromBackend(TestBackend()) + request = pubsub.PubSubRequest() + request.sender = OWNER + request.recipient = SERVICE + request.nodeIdentifier = 'test' + request.subscriber = OWNER + d = resource.unsubscribe(request) + self.assertFailure(d, StanzaError) + d.addCallback(cb) + return d + + + def test_getInfo(self): + """ + Test retrieving node information. + """ + + class TestBackend(BaseTestBackend): + def getNodeType(self, nodeIdentifier): + return defer.succeed('leaf') + + def getNodeMetaData(self, nodeIdentifier): + return defer.succeed({'pubsub#persist_items': True}) + + def cb(info): + self.assertIn('type', info) + self.assertEquals('leaf', info['type']) + self.assertIn('meta-data', info) + self.assertEquals({'pubsub#persist_items': True}, info['meta-data']) + + resource = backend.PubSubResourceFromBackend(TestBackend()) + d = resource.getInfo(OWNER, SERVICE, 'test') + d.addCallback(cb) + return d + + + def test_getConfigurationOptions(self): + class TestBackend(BaseTestBackend): + nodeOptions = { + "pubsub#persist_items": + {"type": "boolean", + "label": "Persist items to storage"}, + "pubsub#deliver_payloads": + {"type": "boolean", + "label": "Deliver payloads with event notifications"} + } + + resource = backend.PubSubResourceFromBackend(TestBackend()) + options = resource.getConfigurationOptions() + self.assertIn("pubsub#persist_items", options) + + + def test_default(self): + class TestBackend(BaseTestBackend): + def getDefaultConfiguration(self, nodeType): + options = {"pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } + return defer.succeed(options) + + def cb(options): + self.assertEquals(True, options["pubsub#persist_items"]) + + resource = backend.PubSubResourceFromBackend(TestBackend()) + request = pubsub.PubSubRequest() + request.sender = OWNER + request.recipient = SERVICE + request.nodeType = 'leaf' + d = resource.default(request) + d.addCallback(cb) + return d diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/test/test_gateway.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/test/test_gateway.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,398 @@ +# Copyright (c) 2003-2009 Ralph Meijer +# See LICENSE for details. + +""" +Tests for L{idavoll.gateway}. + +Note that some tests are functional tests that require a running idavoll +service. +""" + +from twisted.internet import defer +from twisted.trial import unittest +from twisted.web import error +from twisted.words.xish import domish + +from idavoll import gateway + +AGENT = "Idavoll Test Script" +NS_ATOM = "http://www.w3.org/2005/Atom" + +TEST_ENTRY = domish.Element((NS_ATOM, 'entry')) +TEST_ENTRY.addElement("id", + content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a") +TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok") +TEST_ENTRY.addElement("author").addElement("name", content="John Doe") +TEST_ENTRY.addElement("content", content="Some text.") + +baseURI = "http://localhost:8086/" +componentJID = "pubsub" + +class GatewayTest(unittest.TestCase): + timeout = 2 + + def setUp(self): + self.client = gateway.GatewayClient(baseURI) + self.client.startService() + self.addCleanup(self.client.stopService) + + def trapConnectionRefused(failure): + from twisted.internet.error import ConnectionRefusedError + failure.trap(ConnectionRefusedError) + raise unittest.SkipTest("Gateway to test against is not available") + + def trapNotFound(failure): + from twisted.web.error import Error + failure.trap(Error) + + d = self.client.ping() + d.addErrback(trapConnectionRefused) + d.addErrback(trapNotFound) + return d + + + def tearDown(self): + return self.client.stopService() + + + def test_create(self): + + def cb(response): + self.assertIn('uri', response) + + d = self.client.create() + d.addCallback(cb) + return d + + def test_publish(self): + + def cb(response): + self.assertIn('uri', response) + + d = self.client.publish(TEST_ENTRY) + d.addCallback(cb) + return d + + def test_publishExistingNode(self): + + def cb2(response, xmppURI): + self.assertEquals(xmppURI, response['uri']) + + def cb1(response): + xmppURI = response['uri'] + d = self.client.publish(TEST_ENTRY, xmppURI) + d.addCallback(cb2, xmppURI) + return d + + d = self.client.create() + d.addCallback(cb1) + return d + + def test_publishNonExisting(self): + def cb(err): + self.assertEqual('404', err.status) + + d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % componentJID) + self.assertFailure(d, error.Error) + d.addCallback(cb) + return d + + def test_delete(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.delete(xmppURI) + return d + + d = self.client.create() + d.addCallback(cb) + return d + + def test_deleteWithRedirect(self): + def cb(response): + xmppURI = response['uri'] + redirectURI = 'xmpp:%s?node=test' % componentJID + d = self.client.delete(xmppURI, redirectURI) + return d + + d = self.client.create() + d.addCallback(cb) + return d + + def test_deleteNotification(self): + def onNotification(data, headers): + try: + self.assertTrue(headers.hasHeader('Event')) + self.assertEquals(['DELETED'], headers.getRawHeaders('Event')) + self.assertFalse(headers.hasHeader('Link')) + except: + self.client.deferred.errback() + else: + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.delete(xmppURI) + return d + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_deleteNotificationWithRedirect(self): + redirectURI = 'xmpp:%s?node=test' % componentJID + + def onNotification(data, headers): + try: + self.assertTrue(headers.hasHeader('Event')) + self.assertEquals(['DELETED'], headers.getRawHeaders('Event')) + self.assertEquals(['<%s>; rel=alternate' % redirectURI], + headers.getRawHeaders('Link')) + except: + self.client.deferred.errback() + else: + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.delete(xmppURI, redirectURI) + return d + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_list(self): + d = self.client.listNodes() + return d + + def test_subscribe(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + return d + + d = self.client.create() + d.addCallback(cb) + return d + + def test_subscribeGetNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + d = self.client.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.publish(TEST_ENTRY, xmppURI) + return d + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + + def test_subscribeTwiceGetNotification(self): + + def onNotification1(data, headers): + d = client1.stopService() + d.chainDeferred(client1.deferred) + + def onNotification2(data, headers): + d = client2.stopService() + d.chainDeferred(client2.deferred) + + def cb(response): + xmppURI = response['uri'] + d = client1.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = client2.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb3(xmppURI): + d = self.client.publish(TEST_ENTRY, xmppURI) + return d + + + client1 = gateway.GatewayClient(baseURI, callbackPort=8088) + client1.startService() + client1.callback = onNotification1 + client1.deferred = defer.Deferred() + client2 = gateway.GatewayClient(baseURI, callbackPort=8089) + client2.startService() + client2.callback = onNotification2 + client2.deferred = defer.Deferred() + + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + d.addCallback(cb3) + dl = defer.gatherResults([d, client1.deferred, client2.deferred]) + return dl + + + def test_subscribeGetDelayedNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + self.assertNot(self.client.deferred.called) + d = self.client.publish(TEST_ENTRY, xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = self.client.subscribe(xmppURI) + return d + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_subscribeGetDelayedNotification2(self): + """ + Test that subscribing as second results in a notification being sent. + """ + + def onNotification1(data, headers): + client1.deferred.callback(None) + client1.stopService() + + def onNotification2(data, headers): + client2.deferred.callback(None) + client2.stopService() + + def cb(response): + xmppURI = response['uri'] + self.assertNot(client1.deferred.called) + self.assertNot(client2.deferred.called) + d = self.client.publish(TEST_ENTRY, xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + d = client1.subscribe(xmppURI) + d.addCallback(lambda _: xmppURI) + return d + + def cb3(xmppURI): + d = client2.subscribe(xmppURI) + return d + + client1 = gateway.GatewayClient(baseURI, callbackPort=8088) + client1.startService() + client1.callback = onNotification1 + client1.deferred = defer.Deferred() + client2 = gateway.GatewayClient(baseURI, callbackPort=8089) + client2.startService() + client2.callback = onNotification2 + client2.deferred = defer.Deferred() + + + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + d.addCallback(cb3) + dl = defer.gatherResults([d, client1.deferred, client2.deferred]) + return dl + + + def test_subscribeNonExisting(self): + def cb(err): + self.assertEqual('403', err.status) + + d = self.client.subscribe('xmpp:%s?node=test' % componentJID) + self.assertFailure(d, error.Error) + d.addCallback(cb) + return d + + + def test_subscribeRootGetNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI) + rootNode = gateway.getXMPPURI(jid, '') + + d = self.client.subscribe(rootNode) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + return self.client.publish(TEST_ENTRY, xmppURI) + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + + def test_unsubscribeNonExisting(self): + def cb(err): + self.assertEqual('403', err.status) + + d = self.client.unsubscribe('xmpp:%s?node=test' % componentJID) + self.assertFailure(d, error.Error) + d.addCallback(cb) + return d + + + def test_items(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.items(xmppURI) + return d + + d = self.client.publish(TEST_ENTRY) + d.addCallback(cb) + return d + + + def test_itemsMaxItems(self): + def cb(response): + xmppURI = response['uri'] + d = self.client.items(xmppURI, 2) + return d + + d = self.client.publish(TEST_ENTRY) + d.addCallback(cb) + return d diff -r d99047cd90f9 -r 923281d4c5bc sat_pubsub/test/test_storage.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/test/test_storage.py Thu May 17 12:48:14 2012 +0200 @@ -0,0 +1,585 @@ +# Copyright (c) 2003-2010 Ralph Meijer +# See LICENSE for details. + +""" +Tests for L{idavoll.memory_storage} and L{idavoll.pgsql_storage}. +""" + +from zope.interface.verify import verifyObject +from twisted.trial import unittest +from twisted.words.protocols.jabber import jid +from twisted.internet import defer +from twisted.words.xish import domish + +from idavoll import error, iidavoll + +OWNER = jid.JID('owner@example.com/Work') +SUBSCRIBER = jid.JID('subscriber@example.com/Home') +SUBSCRIBER_NEW = jid.JID('new@example.com/Home') +SUBSCRIBER_TO_BE_DELETED = jid.JID('to_be_deleted@example.com/Home') +SUBSCRIBER_PENDING = jid.JID('pending@example.com/Home') +PUBLISHER = jid.JID('publisher@example.com') +ITEM = domish.Element((None, 'item')) +ITEM['id'] = 'current' +ITEM.addElement(('testns', 'test'), content=u'Test \u2083 item') +ITEM_NEW = domish.Element((None, 'item')) +ITEM_NEW['id'] = 'new' +ITEM_NEW.addElement(('testns', 'test'), content=u'Test \u2083 item') +ITEM_UPDATED = domish.Element((None, 'item')) +ITEM_UPDATED['id'] = 'current' +ITEM_UPDATED.addElement(('testns', 'test'), content=u'Test \u2084 item') +ITEM_TO_BE_DELETED = domish.Element((None, 'item')) +ITEM_TO_BE_DELETED['id'] = 'to-be-deleted' +ITEM_TO_BE_DELETED.addElement(('testns', 'test'), content=u'Test \u2083 item') + +def decode(object): + if isinstance(object, str): + object = object.decode('utf-8') + return object + + + +class StorageTests: + + def _assignTestNode(self, node): + self.node = node + + + def setUp(self): + d = self.s.getNode('pre-existing') + d.addCallback(self._assignTestNode) + return d + + + def test_interfaceIStorage(self): + self.assertTrue(verifyObject(iidavoll.IStorage, self.s)) + + + def test_interfaceINode(self): + self.assertTrue(verifyObject(iidavoll.INode, self.node)) + + + def test_interfaceILeafNode(self): + self.assertTrue(verifyObject(iidavoll.ILeafNode, self.node)) + + + def test_getNode(self): + return self.s.getNode('pre-existing') + + + def test_getNonExistingNode(self): + d = self.s.getNode('non-existing') + self.assertFailure(d, error.NodeNotFound) + return d + + + def test_getNodeIDs(self): + def cb(nodeIdentifiers): + self.assertIn('pre-existing', nodeIdentifiers) + self.assertNotIn('non-existing', nodeIdentifiers) + + return self.s.getNodeIds().addCallback(cb) + + + def test_createExistingNode(self): + config = self.s.getDefaultConfiguration('leaf') + config['pubsub#node_type'] = 'leaf' + d = self.s.createNode('pre-existing', OWNER, config) + self.assertFailure(d, error.NodeExists) + return d + + + def test_createNode(self): + def cb(void): + d = self.s.getNode('new 1') + return d + + config = self.s.getDefaultConfiguration('leaf') + config['pubsub#node_type'] = 'leaf' + d = self.s.createNode('new 1', OWNER, config) + d.addCallback(cb) + return d + + + def test_createNodeChangingConfig(self): + """ + The configuration passed to createNode must be free to be changed. + """ + def cb(result): + node1, node2 = result + self.assertTrue(node1.getConfiguration()['pubsub#persist_items']) + + config = { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + "pubsub#node_type": 'leaf', + } + + def unsetPersistItems(_): + config["pubsub#persist_items"] = False + + d = defer.succeed(None) + d.addCallback(lambda _: self.s.createNode('new 1', OWNER, config)) + d.addCallback(unsetPersistItems) + d.addCallback(lambda _: self.s.createNode('new 2', OWNER, config)) + d.addCallback(lambda _: defer.gatherResults([ + self.s.getNode('new 1'), + self.s.getNode('new 2')])) + d.addCallback(cb) + return d + + + def test_deleteNonExistingNode(self): + d = self.s.deleteNode('non-existing') + self.assertFailure(d, error.NodeNotFound) + return d + + + def test_deleteNode(self): + def cb(void): + d = self.s.getNode('to-be-deleted') + self.assertFailure(d, error.NodeNotFound) + return d + + d = self.s.deleteNode('to-be-deleted') + d.addCallback(cb) + return d + + + def test_getAffiliations(self): + def cb(affiliations): + self.assertIn(('pre-existing', 'owner'), affiliations) + + d = self.s.getAffiliations(OWNER) + d.addCallback(cb) + return d + + + def test_getSubscriptions(self): + def cb(subscriptions): + found = False + for subscription in subscriptions: + if (subscription.nodeIdentifier == 'pre-existing' and + subscription.subscriber == SUBSCRIBER and + subscription.state == 'subscribed'): + found = True + self.assertTrue(found) + + d = self.s.getSubscriptions(SUBSCRIBER) + d.addCallback(cb) + return d + + + # Node tests + + def test_getType(self): + self.assertEqual(self.node.getType(), 'leaf') + + + def test_getConfiguration(self): + config = self.node.getConfiguration() + self.assertIn('pubsub#persist_items', config.iterkeys()) + self.assertIn('pubsub#deliver_payloads', config.iterkeys()) + self.assertEqual(config['pubsub#persist_items'], True) + self.assertEqual(config['pubsub#deliver_payloads'], True) + + + def test_setConfiguration(self): + def getConfig(node): + d = node.setConfiguration({'pubsub#persist_items': False}) + d.addCallback(lambda _: node) + return d + + def checkObjectConfig(node): + config = node.getConfiguration() + self.assertEqual(config['pubsub#persist_items'], False) + + def getNode(void): + return self.s.getNode('to-be-reconfigured') + + def checkStorageConfig(node): + config = node.getConfiguration() + self.assertEqual(config['pubsub#persist_items'], False) + + d = self.s.getNode('to-be-reconfigured') + d.addCallback(getConfig) + d.addCallback(checkObjectConfig) + d.addCallback(getNode) + d.addCallback(checkStorageConfig) + return d + + + def test_getMetaData(self): + metaData = self.node.getMetaData() + for key, value in self.node.getConfiguration().iteritems(): + self.assertIn(key, metaData.iterkeys()) + self.assertEqual(value, metaData[key]) + self.assertIn('pubsub#node_type', metaData.iterkeys()) + self.assertEqual(metaData['pubsub#node_type'], 'leaf') + + + def test_getAffiliation(self): + def cb(affiliation): + self.assertEqual(affiliation, 'owner') + + d = self.node.getAffiliation(OWNER) + d.addCallback(cb) + return d + + + def test_getNonExistingAffiliation(self): + def cb(affiliation): + self.assertEqual(affiliation, None) + + d = self.node.getAffiliation(SUBSCRIBER) + d.addCallback(cb) + return d + + + def test_addSubscription(self): + def cb1(void): + return self.node.getSubscription(SUBSCRIBER_NEW) + + def cb2(subscription): + self.assertEqual(subscription.state, 'pending') + + d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending', {}) + d.addCallback(cb1) + d.addCallback(cb2) + return d + + + def test_addExistingSubscription(self): + d = self.node.addSubscription(SUBSCRIBER, 'pending', {}) + self.assertFailure(d, error.SubscriptionExists) + return d + + + def test_getSubscription(self): + def cb(subscriptions): + self.assertEquals(subscriptions[0].state, 'subscribed') + self.assertEquals(subscriptions[1].state, 'pending') + self.assertEquals(subscriptions[2], None) + + d = defer.gatherResults([self.node.getSubscription(SUBSCRIBER), + self.node.getSubscription(SUBSCRIBER_PENDING), + self.node.getSubscription(OWNER)]) + d.addCallback(cb) + return d + + + def test_removeSubscription(self): + return self.node.removeSubscription(SUBSCRIBER_TO_BE_DELETED) + + + def test_removeNonExistingSubscription(self): + d = self.node.removeSubscription(OWNER) + self.assertFailure(d, error.NotSubscribed) + return d + + + def test_getNodeSubscriptions(self): + def extractSubscribers(subscriptions): + return [subscription.subscriber for subscription in subscriptions] + + def cb(subscribers): + self.assertIn(SUBSCRIBER, subscribers) + self.assertNotIn(SUBSCRIBER_PENDING, subscribers) + self.assertNotIn(OWNER, subscribers) + + d = self.node.getSubscriptions('subscribed') + d.addCallback(extractSubscribers) + d.addCallback(cb) + return d + + + def test_isSubscriber(self): + def cb(subscribed): + self.assertEquals(subscribed[0][1], True) + self.assertEquals(subscribed[1][1], True) + self.assertEquals(subscribed[2][1], False) + self.assertEquals(subscribed[3][1], False) + + d = defer.DeferredList([self.node.isSubscribed(SUBSCRIBER), + self.node.isSubscribed(SUBSCRIBER.userhostJID()), + self.node.isSubscribed(SUBSCRIBER_PENDING), + self.node.isSubscribed(OWNER)]) + d.addCallback(cb) + return d + + + def test_storeItems(self): + def cb1(void): + return self.node.getItemsById(['new']) + + def cb2(result): + self.assertEqual(ITEM_NEW.toXml(), result[0].toXml()) + + d = self.node.storeItems([ITEM_NEW], PUBLISHER) + d.addCallback(cb1) + d.addCallback(cb2) + return d + + + def test_storeUpdatedItems(self): + def cb1(void): + return self.node.getItemsById(['current']) + + def cb2(result): + self.assertEqual(ITEM_UPDATED.toXml(), result[0].toXml()) + + d = self.node.storeItems([ITEM_UPDATED], PUBLISHER) + d.addCallback(cb1) + d.addCallback(cb2) + return d + + + def test_removeItems(self): + def cb1(result): + self.assertEqual(['to-be-deleted'], result) + return self.node.getItemsById(['to-be-deleted']) + + def cb2(result): + self.assertEqual(0, len(result)) + + d = self.node.removeItems(['to-be-deleted']) + d.addCallback(cb1) + d.addCallback(cb2) + return d + + + def test_removeNonExistingItems(self): + def cb(result): + self.assertEqual([], result) + + d = self.node.removeItems(['non-existing']) + d.addCallback(cb) + return d + + + def test_getItems(self): + def cb(result): + items = [item.toXml() for item in result] + self.assertIn(ITEM.toXml(), items) + + d = self.node.getItems() + d.addCallback(cb) + return d + + + def test_lastItem(self): + def cb(result): + self.assertEqual(1, len(result)) + self.assertEqual(ITEM.toXml(), result[0].toXml()) + + d = self.node.getItems(1) + d.addCallback(cb) + return d + + + def test_getItemsById(self): + def cb(result): + self.assertEqual(1, len(result)) + + d = self.node.getItemsById(['current']) + d.addCallback(cb) + return d + + + def test_getNonExistingItemsById(self): + def cb(result): + self.assertEqual(0, len(result)) + + d = self.node.getItemsById(['non-existing']) + d.addCallback(cb) + return d + + + def test_purge(self): + def cb1(node): + d = node.purge() + d.addCallback(lambda _: node) + return d + + def cb2(node): + return node.getItems() + + def cb3(result): + self.assertEqual([], result) + + d = self.s.getNode('to-be-purged') + d.addCallback(cb1) + d.addCallback(cb2) + d.addCallback(cb3) + return d + + + def test_getNodeAffilatiations(self): + def cb1(node): + return node.getAffiliations() + + def cb2(affiliations): + affiliations = dict(((a[0].full(), a[1]) for a in affiliations)) + self.assertEquals(affiliations[OWNER.userhost()], 'owner') + + d = self.s.getNode('pre-existing') + d.addCallback(cb1) + d.addCallback(cb2) + return d + + + +class MemoryStorageStorageTestCase(unittest.TestCase, StorageTests): + + def setUp(self): + from idavoll.memory_storage import Storage, PublishedItem, LeafNode + from idavoll.memory_storage import Subscription + + defaultConfig = Storage.defaultConfig['leaf'] + + self.s = Storage() + self.s._nodes['pre-existing'] = \ + LeafNode('pre-existing', OWNER, defaultConfig) + self.s._nodes['to-be-deleted'] = \ + LeafNode('to-be-deleted', OWNER, None) + self.s._nodes['to-be-reconfigured'] = \ + LeafNode('to-be-reconfigured', OWNER, defaultConfig) + self.s._nodes['to-be-purged'] = \ + LeafNode('to-be-purged', OWNER, None) + + subscriptions = self.s._nodes['pre-existing']._subscriptions + subscriptions[SUBSCRIBER.full()] = Subscription('pre-existing', + SUBSCRIBER, + 'subscribed') + subscriptions[SUBSCRIBER_TO_BE_DELETED.full()] = \ + Subscription('pre-existing', SUBSCRIBER_TO_BE_DELETED, + 'subscribed') + subscriptions[SUBSCRIBER_PENDING.full()] = \ + Subscription('pre-existing', SUBSCRIBER_PENDING, + 'pending') + + item = PublishedItem(ITEM_TO_BE_DELETED, PUBLISHER) + self.s._nodes['pre-existing']._items['to-be-deleted'] = item + self.s._nodes['pre-existing']._itemlist.append(item) + self.s._nodes['to-be-purged']._items['to-be-deleted'] = item + self.s._nodes['to-be-purged']._itemlist.append(item) + item = PublishedItem(ITEM, PUBLISHER) + self.s._nodes['pre-existing']._items['current'] = item + self.s._nodes['pre-existing']._itemlist.append(item) + + return StorageTests.setUp(self) + + + +class PgsqlStorageStorageTestCase(unittest.TestCase, StorageTests): + + dbpool = None + + def setUp(self): + from idavoll.pgsql_storage import Storage + from twisted.enterprise import adbapi + if self.dbpool is None: + self.__class__.dbpool = adbapi.ConnectionPool('psycopg2', + database='pubsub_test', + cp_reconnect=True, + client_encoding='utf-8', + ) + self.s = Storage(self.dbpool) + self.dbpool.start() + d = self.dbpool.runInteraction(self.init) + d.addCallback(lambda _: StorageTests.setUp(self)) + return d + + + def tearDown(self): + return self.dbpool.runInteraction(self.cleandb) + + + def init(self, cursor): + self.cleandb(cursor) + cursor.execute("""INSERT INTO nodes + (node, node_type, persist_items) + VALUES ('pre-existing', 'leaf', TRUE)""") + cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-deleted')""") + cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-reconfigured')""") + cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-purged')""") + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (OWNER.userhost(),)) + cursor.execute("""INSERT INTO affiliations + (node_id, entity_id, affiliation) + SELECT node_id, entity_id, 'owner' + FROM nodes, entities + WHERE node='pre-existing' AND jid=%s""", + (OWNER.userhost(),)) + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (SUBSCRIBER.userhost(),)) + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, state) + SELECT node_id, entity_id, %s, 'subscribed' + FROM nodes, entities + WHERE node='pre-existing' AND jid=%s""", + (SUBSCRIBER.resource, + SUBSCRIBER.userhost())) + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (SUBSCRIBER_TO_BE_DELETED.userhost(),)) + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, state) + SELECT node_id, entity_id, %s, 'subscribed' + FROM nodes, entities + WHERE node='pre-existing' AND jid=%s""", + (SUBSCRIBER_TO_BE_DELETED.resource, + SUBSCRIBER_TO_BE_DELETED.userhost())) + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (SUBSCRIBER_PENDING.userhost(),)) + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, state) + SELECT node_id, entity_id, %s, 'pending' + FROM nodes, entities + WHERE node='pre-existing' AND jid=%s""", + (SUBSCRIBER_PENDING.resource, + SUBSCRIBER_PENDING.userhost())) + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (PUBLISHER.userhost(),)) + cursor.execute("""INSERT INTO items + (node_id, publisher, item, data, date) + SELECT node_id, %s, 'to-be-deleted', %s, + now() - interval '1 day' + FROM nodes + WHERE node='pre-existing'""", + (PUBLISHER.userhost(), + ITEM_TO_BE_DELETED.toXml())) + cursor.execute("""INSERT INTO items (node_id, publisher, item, data) + SELECT node_id, %s, 'to-be-deleted', %s + FROM nodes + WHERE node='to-be-purged'""", + (PUBLISHER.userhost(), + ITEM_TO_BE_DELETED.toXml())) + cursor.execute("""INSERT INTO items (node_id, publisher, item, data) + SELECT node_id, %s, 'current', %s + FROM nodes + WHERE node='pre-existing'""", + (PUBLISHER.userhost(), + ITEM.toXml())) + + + def cleandb(self, cursor): + cursor.execute("""DELETE FROM nodes WHERE node in + ('non-existing', 'pre-existing', 'to-be-deleted', + 'new 1', 'new 2', 'new 3', 'to-be-reconfigured', + 'to-be-purged')""") + cursor.execute("""DELETE FROM entities WHERE jid=%s""", + (OWNER.userhost(),)) + cursor.execute("""DELETE FROM entities WHERE jid=%s""", + (SUBSCRIBER.userhost(),)) + cursor.execute("""DELETE FROM entities WHERE jid=%s""", + (SUBSCRIBER_TO_BE_DELETED.userhost(),)) + cursor.execute("""DELETE FROM entities WHERE jid=%s""", + (SUBSCRIBER_PENDING.userhost(),)) + cursor.execute("""DELETE FROM entities WHERE jid=%s""", + (PUBLISHER.userhost(),)) + +try: + import psycopg2 +except ImportError: + PgsqlStorageStorageTestCase.skip = "Psycopg2 not available"