Mercurial > libervia-pubsub
changeset 406:a58610ab2983
removed old code:
- removed gateway which was an HTTP gateway inherited from Idavoll and which is not used
is SàT Pubsub
- removed memory_storage which has not been maintained since Idavoll and which is expected
to be replaced by a sqlite-based backend
- simplejson dependency is not used anymore
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 16 Aug 2019 12:00:03 +0200 |
parents | c56a728412f1 |
children | d58599801c23 |
files | sat_pubsub/gateway.py sat_pubsub/memory_storage.py sat_pubsub/pgsql_storage.py sat_pubsub/test/test_gateway.py setup.py twisted/plugins/pubsub.py |
diffstat | 6 files changed, 3 insertions(+), 2183 deletions(-) [+] |
line wrap: on
line diff
--- a/sat_pubsub/gateway.py Fri Aug 16 12:00:02 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,899 +0,0 @@ -#!/usr/bin/python -#-*- coding: utf-8 -*- - -# Copyright (c) 2003-2011 Ralph Meijer -# Copyright (c) 2012-2019 Jérôme Poisson - - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -- - -# This program is based on Idavoll (http://idavoll.ik.nu/), -# originaly written by Ralph Meijer (http://ralphm.net/blog/) -# It is sublicensed under AGPL v3 (or any later version) as allowed by the original -# license. - -# -- - -# Here is a copy of the original license: - -# Copyright (c) 2003-2011 Ralph Meijer - -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: - -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - - -""" -Web resources and client for interacting with pubsub services. -""" - -import mimetools -from time import gmtime, strftime -from StringIO import StringIO -import urllib -import urlparse - -import simplejson - -from twisted.application import service -from twisted.internet import defer, reactor -from twisted.python import log -from twisted.web import client, http, resource, server -from twisted.web.error import Error -from twisted.words.protocols.jabber.jid import JID -from twisted.words.protocols.jabber.error import StanzaError -from twisted.words.xish import domish - -from wokkel.generic import parseXml -from wokkel.pubsub import Item -from wokkel.pubsub import PubSubClient - -from sat_pubsub import error - -NS_ATOM = 'http://www.w3.org/2005/Atom' -MIME_ATOM_ENTRY = b'application/atom+xml;type=entry' -MIME_ATOM_FEED = b'application/atom+xml;type=feed' -MIME_JSON = b'application/json' - -class XMPPURIParseError(ValueError): - """ - Raised when a given XMPP URI couldn't be properly parsed. - """ - - - -def getServiceAndNode(uri): - """ - Given an XMPP URI, extract the publish subscribe service JID and node ID. - """ - - try: - scheme, rest = uri.split(':', 1) - except ValueError: - raise XMPPURIParseError("No URI scheme component") - - if scheme != 'xmpp': - raise XMPPURIParseError("Unknown URI scheme") - - if rest.startswith("//"): - raise XMPPURIParseError("Unexpected URI authority component") - - try: - entity, query = rest.split('?', 1) - except ValueError: - entity, query = rest, '' - - if not entity: - raise XMPPURIParseError("Empty URI path component") - - try: - service = JID(entity) - except Exception, e: - raise XMPPURIParseError("Invalid JID: %s" % e) - - params = urlparse.parse_qs(query) - - try: - nodeIdentifier = params['node'][0] - except (KeyError, ValueError): - nodeIdentifier = '' - - return service, nodeIdentifier - - - -def getXMPPURI(service, nodeIdentifier): - """ - Construct an XMPP URI from a service JID and node identifier. - """ - return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '') - - - -def _parseContentType(header): - """ - Parse a Content-Type header value to a L{mimetools.Message}. - - L{mimetools.Message} parses a Content-Type header and makes the - components available with its C{getmaintype}, C{getsubtype}, C{gettype}, - C{getplist} and C{getparam} methods. - """ - return mimetools.Message(StringIO(b'Content-Type: ' + header)) - - - -def _asyncResponse(render): - """ - """ - def wrapped(self, request): - def eb(failure): - if failure.check(Error): - err = failure.value - else: - log.err(failure) - err = Error(500) - request.setResponseCode(err.status, err.message) - return err.response - - def finish(result): - if result is server.NOT_DONE_YET: - return - - if result: - request.write(result) - request.finish() - - d = defer.maybeDeferred(render, self, request) - d.addErrback(eb) - d.addCallback(finish) - - return server.NOT_DONE_YET - - return wrapped - - - -class CreateResource(resource.Resource): - """ - A resource to create a publish-subscribe node. - """ - def __init__(self, backend, serviceJID, owner): - self.backend = backend - self.serviceJID = serviceJID - self.owner = owner - - - http_GET = None - - - @_asyncResponse - def render_POST(self, request): - """ - Respond to a POST request to create a new node. - """ - - def toResponse(nodeIdentifier): - uri = getXMPPURI(self.serviceJID, nodeIdentifier) - body = simplejson.dumps({'uri': uri}) - request.setHeader(b'Content-Type', MIME_JSON) - return body - - d = self.backend.createNode(None, self.owner) - d.addCallback(toResponse) - return d - - - -class DeleteResource(resource.Resource): - """ - A resource to create a publish-subscribe node. - """ - def __init__(self, backend, serviceJID, owner): - self.backend = backend - self.serviceJID = serviceJID - self.owner = owner - - - render_GET = None - - - @_asyncResponse - def render_POST(self, request): - """ - Respond to a POST request to create a new node. - """ - def toResponse(result): - request.setResponseCode(http.NO_CONTENT) - - def trapNotFound(failure): - failure.trap(error.NodeNotFound) - raise Error(http.NOT_FOUND, "Node not found") - - if not request.args.get('uri'): - raise Error(http.BAD_REQUEST, "No URI given") - - try: - jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) - except XMPPURIParseError, e: - raise Error(http.BAD_REQUEST, "Malformed XMPP URI: %s" % e) - - - data = request.content.read() - if data: - params = simplejson.loads(data) - redirectURI = params.get('redirect_uri', None) - else: - redirectURI = None - - d = self.backend.deleteNode(nodeIdentifier, self.owner, - redirectURI) - d.addCallback(toResponse) - d.addErrback(trapNotFound) - return d - - - -class PublishResource(resource.Resource): - """ - A resource to publish to a publish-subscribe node. - """ - - def __init__(self, backend, serviceJID, owner): - self.backend = backend - self.serviceJID = serviceJID - self.owner = owner - - - render_GET = None - - - def checkMediaType(self, request): - ctype = request.getHeader(b'content-type') - - if not ctype: - request.setResponseCode(http.BAD_REQUEST) - - raise Error(http.BAD_REQUEST, b"No specified Media Type") - - message = _parseContentType(ctype) - if (message.maintype != b'application' or - message.subtype != b'atom+xml' or - message.getparam(b'type') != b'entry' or - (message.getparam(b'charset') or b'utf-8') != b'utf-8'): - raise Error(http.UNSUPPORTED_MEDIA_TYPE, - b"Unsupported Media Type: %s" % ctype) - - - @_asyncResponse - def render_POST(self, request): - """ - Respond to a POST request to create a new item. - """ - - def toResponse(nodeIdentifier): - uri = getXMPPURI(self.serviceJID, nodeIdentifier) - body = simplejson.dumps({'uri': uri}) - request.setHeader(b'Content-Type', MIME_JSON) - return body - - def gotNode(nodeIdentifier, payload): - item = Item(id='current', payload=payload) - d = self.backend.publish(nodeIdentifier, [item], self.owner) - d.addCallback(lambda _: nodeIdentifier) - return d - - def getNode(): - if request.args.get('uri'): - jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) - return defer.succeed(nodeIdentifier) - else: - return self.backend.createNode(None, self.owner) - - def trapNotFound(failure): - failure.trap(error.NodeNotFound) - raise Error(http.NOT_FOUND, "Node not found") - - def trapXMPPURIParseError(failure): - failure.trap(XMPPURIParseError) - raise Error(http.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value) - - self.checkMediaType(request) - payload = parseXml(request.content.read()) - d = getNode() - d.addCallback(gotNode, payload) - d.addCallback(toResponse) - d.addErrback(trapNotFound) - d.addErrback(trapXMPPURIParseError) - return d - - - -class ListResource(resource.Resource): - def __init__(self, service): - self.service = service - - - @_asyncResponse - def render_GET(self, request): - def responseFromNodes(nodeIdentifiers): - body = simplejson.dumps(nodeIdentifiers) - request.setHeader(b'Content-Type', MIME_JSON) - return body - - d = self.service.getNodes() - d.addCallback(responseFromNodes) - return d - - - -# Service for subscribing to remote XMPP Pubsub nodes and web resources - -def extractAtomEntries(items): - """ - Extract atom entries from a list of publish-subscribe items. - - @param items: List of L{domish.Element}s that represent publish-subscribe - items. - @type items: C{list} - """ - - atomEntries = [] - - for item in items: - # ignore non-items (i.e. retractions) - if item.name != 'item': - continue - - atomEntry = None - for element in item.elements(): - # extract the first element that is an atom entry - if element.uri == NS_ATOM and element.name == 'entry': - atomEntry = element - break - - if atomEntry: - atomEntries.append(atomEntry) - - return atomEntries - - - -def constructFeed(service, nodeIdentifier, entries, title): - nodeURI = getXMPPURI(service, nodeIdentifier) - now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) - - # Collect the received entries in a feed - feed = domish.Element((NS_ATOM, 'feed')) - feed.addElement('title', content=title) - feed.addElement('id', content=nodeURI) - feed.addElement('updated', content=now) - - for entry in entries: - feed.addChild(entry) - - return feed - - - -class RemoteSubscriptionService(service.Service, PubSubClient): - """ - Service for subscribing to remote XMPP Publish-Subscribe nodes. - - Subscriptions are created with a callback HTTP URI that is POSTed - to with the received items in notifications. - """ - - def __init__(self, jid, storage): - self.jid = jid - self.storage = storage - - - def trapNotFound(self, failure): - failure.trap(StanzaError) - - if failure.value.condition == 'item-not-found': - raise error.NodeNotFound() - else: - return failure - - - def subscribeCallback(self, jid, nodeIdentifier, callback): - """ - Subscribe a callback URI. - - This registers a callback URI to be called when a notification is - received for the given node. - - If this is the first callback registered for this node, the gateway - will subscribe to the node. Otherwise, the most recently published item - for this node is retrieved and, if present, the newly registered - callback will be called with that item. - """ - - def callbackForLastItem(items): - atomEntries = extractAtomEntries(items) - - if not atomEntries: - return - - self._postTo([callback], jid, nodeIdentifier, atomEntries[0], - MIME_ATOM_ENTRY) - - def subscribeOrItems(hasCallbacks): - if hasCallbacks: - if not nodeIdentifier: - return None - d = self.items(jid, nodeIdentifier, 1) - d.addCallback(callbackForLastItem) - else: - d = self.subscribe(jid, nodeIdentifier, self.jid) - - d.addErrback(self.trapNotFound) - return d - - d = self.storage.hasCallbacks(jid, nodeIdentifier) - d.addCallback(subscribeOrItems) - d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier, - callback)) - return d - - - def unsubscribeCallback(self, jid, nodeIdentifier, callback): - """ - Unsubscribe a callback. - - If this was the last registered callback for this node, the - gateway will unsubscribe from node. - """ - - def cb(last): - if last: - return self.unsubscribe(jid, nodeIdentifier, self.jid) - - d = self.storage.removeCallback(jid, nodeIdentifier, callback) - d.addCallback(cb) - return d - - - def itemsReceived(self, event): - """ - Fire up HTTP client to do callback - """ - - atomEntries = extractAtomEntries(event.items) - service = event.sender - nodeIdentifier = event.nodeIdentifier - headers = event.headers - - # Don't notify if there are no atom entries - if not atomEntries: - return - - if len(atomEntries) == 1: - contentType = MIME_ATOM_ENTRY - payload = atomEntries[0] - else: - contentType = MIME_ATOM_FEED - payload = constructFeed(service, nodeIdentifier, atomEntries, - title='Received item collection') - - self.callCallbacks(service, nodeIdentifier, payload, contentType) - - if 'Collection' in headers: - for collection in headers['Collection']: - nodeIdentifier = collection or '' - self.callCallbacks(service, nodeIdentifier, payload, - contentType) - - - def deleteReceived(self, event): - """ - Fire up HTTP client to do callback - """ - - service = event.sender - nodeIdentifier = event.nodeIdentifier - redirectURI = event.redirectURI - self.callCallbacks(service, nodeIdentifier, eventType='DELETED', - redirectURI=redirectURI) - - - def _postTo(self, callbacks, service, nodeIdentifier, - payload=None, contentType=None, eventType=None, - redirectURI=None): - - if not callbacks: - return - - postdata = None - nodeURI = getXMPPURI(service, nodeIdentifier) - headers = {'Referer': nodeURI.encode('utf-8'), - 'PubSub-Service': service.full().encode('utf-8')} - - if payload: - postdata = payload.toXml().encode('utf-8') - if contentType: - headers['Content-Type'] = "%s;charset=utf-8" % contentType - - if eventType: - headers['Event'] = eventType - - if redirectURI: - headers['Link'] = '<%s>; rel=alternate' % ( - redirectURI.encode('utf-8'), - ) - - def postNotification(callbackURI): - f = getPageWithFactory(str(callbackURI), - method='POST', - postdata=postdata, - headers=headers) - d = f.deferred - d.addErrback(log.err) - - for callbackURI in callbacks: - reactor.callLater(0, postNotification, callbackURI) - - - def callCallbacks(self, service, nodeIdentifier, - payload=None, contentType=None, eventType=None, - redirectURI=None): - - def eb(failure): - failure.trap(error.NoCallbacks) - - # No callbacks were registered for this node. Unsubscribe? - - d = self.storage.getCallbacks(service, nodeIdentifier) - d.addCallback(self._postTo, service, nodeIdentifier, payload, - contentType, eventType, redirectURI) - d.addErrback(eb) - d.addErrback(log.err) - - - -class RemoteSubscribeBaseResource(resource.Resource): - """ - Base resource for remote pubsub node subscription and unsubscription. - - This resource accepts POST request with a JSON document that holds - a dictionary with the keys C{uri} and C{callback} that respectively map - to the XMPP URI of the publish-subscribe node and the callback URI. - - This class should be inherited with L{serviceMethod} overridden. - - @cvar serviceMethod: The name of the method to be called with - the JID of the pubsub service, the node identifier - and the callback URI as received in the HTTP POST - request to this resource. - """ - serviceMethod = None - errorMap = { - error.NodeNotFound: - (http.FORBIDDEN, "Node not found"), - error.NotSubscribed: - (http.FORBIDDEN, "No such subscription found"), - error.SubscriptionExists: - (http.FORBIDDEN, "Subscription already exists"), - } - - def __init__(self, service): - self.service = service - self.params = None - - - render_GET = None - - - @_asyncResponse - def render_POST(self, request): - def trapNotFound(failure): - err = failure.trap(*self.errorMap.keys()) - status, message = self.errorMap[err] - raise Error(status, message) - - def toResponse(result): - request.setResponseCode(http.NO_CONTENT) - return b'' - - def trapXMPPURIParseError(failure): - failure.trap(XMPPURIParseError) - raise Error(http.BAD_REQUEST, - "Malformed XMPP URI: %s" % failure.value) - - data = request.content.read() - self.params = simplejson.loads(data) - - uri = self.params['uri'] - callback = self.params['callback'] - - jid, nodeIdentifier = getServiceAndNode(uri) - method = getattr(self.service, self.serviceMethod) - d = method(jid, nodeIdentifier, callback) - d.addCallback(toResponse) - d.addErrback(trapNotFound) - d.addErrback(trapXMPPURIParseError) - return d - - - -class RemoteSubscribeResource(RemoteSubscribeBaseResource): - """ - Resource to subscribe to a remote publish-subscribe node. - - The passed C{uri} is the XMPP URI of the node to subscribe to and the - C{callback} is the callback URI. Upon receiving notifications from the - node, a POST request will be perfomed on the callback URI. - """ - serviceMethod = 'subscribeCallback' - - - -class RemoteUnsubscribeResource(RemoteSubscribeBaseResource): - """ - Resource to unsubscribe from a remote publish-subscribe node. - - The passed C{uri} is the XMPP URI of the node to unsubscribe from and the - C{callback} is the callback URI that was registered for it. - """ - serviceMethod = 'unsubscribeCallback' - - - -class RemoteItemsResource(resource.Resource): - """ - Resource for retrieving items from a remote pubsub node. - """ - - def __init__(self, service): - self.service = service - - - @_asyncResponse - def render_GET(self, request): - try: - maxItems = int(request.args.get('max_items', [0])[0]) or None - except ValueError: - raise Error(http.BAD_REQUEST, - "The argument max_items has an invalid value.") - - try: - uri = request.args['uri'][0] - except KeyError: - raise Error(http.BAD_REQUEST, - "No URI for the remote node provided.") - - try: - jid, nodeIdentifier = getServiceAndNode(uri) - except XMPPURIParseError: - raise Error(http.BAD_REQUEST, - "Malformed XMPP URI: %s" % uri) - - def toResponse(items): - """ - Create a feed out the retrieved items. - """ - atomEntries = extractAtomEntries(items) - feed = constructFeed(jid, nodeIdentifier, atomEntries, - "Retrieved item collection") - body = feed.toXml().encode('utf-8') - request.setHeader(b'Content-Type', MIME_ATOM_FEED) - return body - - def trapNotFound(failure): - failure.trap(StanzaError) - if not failure.value.condition == 'item-not-found': - raise failure - raise Error(http.NOT_FOUND, "Node not found") - - d = self.service.items(jid, nodeIdentifier, maxItems) - d.addCallback(toResponse) - d.addErrback(trapNotFound) - return d - - - -# Client side code to interact with a service as provided above - -def getPageWithFactory(url, contextFactory=None, *args, **kwargs): - """Download a web page. - - Download a page. Return the factory that holds a deferred, which will - callback with a page (as a string) or errback with a description of the - error. - - See HTTPClientFactory to see what extra args can be passed. - """ - - factory = client.HTTPClientFactory(url, *args, **kwargs) - factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200() - - if factory.scheme == 'https': - from twisted.internet import ssl - if contextFactory is None: - contextFactory = ssl.ClientContextFactory() - reactor.connectSSL(factory.host, factory.port, factory, contextFactory) - else: - reactor.connectTCP(factory.host, factory.port, factory) - return factory - - - -class CallbackResource(resource.Resource): - """ - Web resource for retrieving gateway notifications. - """ - - def __init__(self, callback): - self.callback = callback - - - http_GET = None - - - def render_POST(self, request): - if request.requestHeaders.hasHeader(b'Event'): - payload = None - else: - payload = parseXml(request.content.read()) - - self.callback(payload, request.requestHeaders) - - request.setResponseCode(http.NO_CONTENT) - return b'' - - - - -class GatewayClient(service.Service): - """ - Service that provides client access to the HTTP Gateway into Idavoll. - """ - - agent = "Idavoll HTTP Gateway Client" - - def __init__(self, baseURI, callbackHost=None, callbackPort=None): - self.baseURI = baseURI - self.callbackHost = callbackHost or 'localhost' - self.callbackPort = callbackPort or 8087 - root = resource.Resource() - root.putChild('callback', CallbackResource( - lambda *args, **kwargs: self.callback(*args, **kwargs))) - self.site = server.Site(root) - - - def startService(self): - self.port = reactor.listenTCP(self.callbackPort, - self.site) - - - def stopService(self): - return self.port.stopListening() - - - def _makeURI(self, verb, query=None): - uriComponents = urlparse.urlparse(self.baseURI) - uri = urlparse.urlunparse((uriComponents[0], - uriComponents[1], - uriComponents[2] + verb, - '', - query and urllib.urlencode(query) or '', - '')) - return uri - - - def callback(self, data, headers): - pass - - - def ping(self): - f = getPageWithFactory(self._makeURI(''), - method='HEAD', - agent=self.agent) - return f.deferred - - - def create(self): - f = getPageWithFactory(self._makeURI('create'), - method='POST', - agent=self.agent) - return f.deferred.addCallback(simplejson.loads) - - - def delete(self, xmppURI, redirectURI=None): - query = {'uri': xmppURI} - - if redirectURI: - params = {'redirect_uri': redirectURI} - postdata = simplejson.dumps(params) - headers = {'Content-Type': MIME_JSON} - else: - postdata = None - headers = None - - f = getPageWithFactory(self._makeURI('delete', query), - method='POST', - postdata=postdata, - headers=headers, - agent=self.agent) - return f.deferred - - - def publish(self, entry, xmppURI=None): - query = xmppURI and {'uri': xmppURI} - - f = getPageWithFactory(self._makeURI('publish', query), - method='POST', - postdata=entry.toXml().encode('utf-8'), - headers={'Content-Type': MIME_ATOM_ENTRY}, - agent=self.agent) - return f.deferred.addCallback(simplejson.loads) - - - def listNodes(self): - f = getPageWithFactory(self._makeURI('list'), - method='GET', - agent=self.agent) - return f.deferred.addCallback(simplejson.loads) - - - def subscribe(self, xmppURI): - params = {'uri': xmppURI, - 'callback': 'http://%s:%s/callback' % (self.callbackHost, - self.callbackPort)} - f = getPageWithFactory(self._makeURI('subscribe'), - method='POST', - postdata=simplejson.dumps(params), - headers={'Content-Type': MIME_JSON}, - agent=self.agent) - return f.deferred - - - def unsubscribe(self, xmppURI): - params = {'uri': xmppURI, - 'callback': 'http://%s:%s/callback' % (self.callbackHost, - self.callbackPort)} - f = getPageWithFactory(self._makeURI('unsubscribe'), - method='POST', - postdata=simplejson.dumps(params), - headers={'Content-Type': MIME_JSON}, - agent=self.agent) - return f.deferred - - - def items(self, xmppURI, maxItems=None): - query = {'uri': xmppURI} - if maxItems is not None: - query['max_items'] = int(maxItems) - f = getPageWithFactory(self._makeURI('items', query), - method='GET', - agent=self.agent) - return f.deferred
--- a/sat_pubsub/memory_storage.py Fri Aug 16 12:00:02 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,380 +0,0 @@ -#!/usr/bin/python -#-*- coding: utf-8 -*- - -# Copyright (c) 2003-2011 Ralph Meijer -# Copyright (c) 2012-2019 Jérôme Poisson - - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -- - -# This program is based on Idavoll (http://idavoll.ik.nu/), -# originaly written by Ralph Meijer (http://ralphm.net/blog/) -# It is sublicensed under AGPL v3 (or any later version) as allowed by the original -# license. - -# -- - -# Here is a copy of the original license: - -# Copyright (c) 2003-2011 Ralph Meijer - -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: - -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - - -import copy -from zope.interface import implements -from twisted.internet import defer -from twisted.words.protocols.jabber import jid - -from wokkel.pubsub import Subscription - -from sat_pubsub import error, iidavoll - -class Storage: - - implements(iidavoll.IStorage) - - defaultConfig = { - 'leaf': { - "pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - }, - 'collection': { - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - } - } - - def __init__(self): - rootNode = CollectionNode('', jid.JID('localhost'), - copy.copy(self.defaultConfig['collection'])) - self._nodes = {'': rootNode} - - - def getNode(self, nodeIdentifier): - try: - node = self._nodes[nodeIdentifier] - except KeyError: - return defer.fail(error.NodeNotFound()) - - return defer.succeed(node) - - - def getNodeIds(self): - return defer.succeed(self._nodes.keys()) - - - def createNode(self, nodeIdentifier, owner, config): - if nodeIdentifier in self._nodes: - return defer.fail(error.NodeExists()) - - if config['pubsub#node_type'] != 'leaf': - raise error.NoCollections() - - node = LeafNode(nodeIdentifier, owner, config) - self._nodes[nodeIdentifier] = node - - return defer.succeed(None) - - - def deleteNode(self, nodeIdentifier): - try: - del self._nodes[nodeIdentifier] - except KeyError: - return defer.fail(error.NodeNotFound()) - - return defer.succeed(None) - - - def getAffiliations(self, entity): - entity = entity.userhost() - return defer.succeed([(node.nodeIdentifier, node._affiliations[entity]) - for name, node in self._nodes.iteritems() - if entity in node._affiliations]) - - - def getSubscriptions(self, entity): - subscriptions = [] - for node in self._nodes.itervalues(): - for subscriber, subscription in node._subscriptions.iteritems(): - subscriber = jid.internJID(subscriber) - if subscriber.userhostJID() == entity.userhostJID(): - subscriptions.append(subscription) - - return defer.succeed(subscriptions) - - - def getDefaultConfiguration(self, nodeType): - if nodeType == 'collection': - raise error.NoCollections() - - return self.defaultConfig[nodeType] - - -class Node: - - implements(iidavoll.INode) - - def __init__(self, nodeIdentifier, owner, config): - self.nodeIdentifier = nodeIdentifier - self._affiliations = {owner.userhost(): 'owner'} - self._subscriptions = {} - self._config = copy.copy(config) - - - def getType(self): - return self.nodeType - - - def getConfiguration(self): - return self._config - - - def getMetaData(self): - config = copy.copy(self._config) - config["pubsub#node_type"] = self.nodeType - return config - - - def setConfiguration(self, options): - for option in options: - if option in self._config: - self._config[option] = options[option] - - return defer.succeed(None) - - - def getAffiliation(self, entity): - return defer.succeed(self._affiliations.get(entity.userhost())) - - - def getSubscription(self, subscriber): - try: - subscription = self._subscriptions[subscriber.full()] - except KeyError: - return defer.succeed(None) - else: - return defer.succeed(subscription) - - - def getSubscriptions(self, state=None): - return defer.succeed( - [subscription - for subscription in self._subscriptions.itervalues() - if state is None or subscription.state == state]) - - - - def addSubscription(self, subscriber, state, options): - if self._subscriptions.get(subscriber.full()): - return defer.fail(error.SubscriptionExists()) - - subscription = Subscription(self.nodeIdentifier, subscriber, state, - options) - self._subscriptions[subscriber.full()] = subscription - return defer.succeed(None) - - - def removeSubscription(self, subscriber): - try: - del self._subscriptions[subscriber.full()] - except KeyError: - return defer.fail(error.NotSubscribed()) - - return defer.succeed(None) - - - def isSubscribed(self, entity): - for subscriber, subscription in self._subscriptions.iteritems(): - if jid.internJID(subscriber).userhost() == entity.userhost() and \ - subscription.state == 'subscribed': - return defer.succeed(True) - - return defer.succeed(False) - - - def getAffiliations(self): - affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation - in self._affiliations.iteritems()] - - return defer.succeed(affiliations) - - - -class PublishedItem(object): - """ - A published item. - - This represent an item as it was published by an entity. - - @ivar element: The DOM representation of the item that was published. - @type element: L{Element<twisted.words.xish.domish.Element>} - @ivar publisher: The entity that published the item. - @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>} - """ - - def __init__(self, element, publisher): - self.element = element - self.publisher = publisher - - - -class LeafNode(Node): - - implements(iidavoll.ILeafNode) - - nodeType = 'leaf' - - def __init__(self, nodeIdentifier, owner, config): - Node.__init__(self, nodeIdentifier, owner, config) - self._items = {} - self._itemlist = [] - - - def storeItems(self, item_data, publisher): - for access_model, item_config, element in item_data: - item = PublishedItem(element, publisher) - itemIdentifier = element["id"] - if itemIdentifier in self._items: - self._itemlist.remove(self._items[itemIdentifier]) - self._items[itemIdentifier] = item - self._itemlist.append(item) - - return defer.succeed(None) - - - def removeItems(self, itemIdentifiers): - deleted = [] - - for itemIdentifier in itemIdentifiers: - try: - item = self._items[itemIdentifier] - except KeyError: - pass - else: - self._itemlist.remove(item) - del self._items[itemIdentifier] - deleted.append(itemIdentifier) - - return defer.succeed(deleted) - - - def getItems(self, authorized_groups, unrestricted, maxItems=None): - if maxItems is not None: - itemList = self._itemlist[-maxItems:] - else: - itemList = self._itemlist - return defer.succeed([item.element for item in itemList]) - - - def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): - items = [] - for itemIdentifier in itemIdentifiers: - try: - item = self._items[itemIdentifier] - except KeyError: - pass - else: - items.append(item.element) - return defer.succeed(items) - - - def purge(self): - self._items = {} - self._itemlist = [] - - return defer.succeed(None) - - - def filterItemsWithPublisher(self, itemIdentifiers, requestor): - filteredItems = [] - for itemIdentifier in itemIdentifiers: - try: - if self._items[itemIdentifier].publisher.userhost() == requestor.userhost(): - filteredItems.append(self.items[itemIdentifier]) - except KeyError, AttributeError: - pass - return defer.succeed(filteredItems) - - -class CollectionNode(Node): - nodeType = 'collection' - - - -class GatewayStorage(object): - """ - Memory based storage facility for the XMPP-HTTP gateway. - """ - - def __init__(self): - self.callbacks = {} - - - def addCallback(self, service, nodeIdentifier, callback): - try: - callbacks = self.callbacks[service, nodeIdentifier] - except KeyError: - callbacks = {callback} - self.callbacks[service, nodeIdentifier] = callbacks - else: - callbacks.add(callback) - pass - - return defer.succeed(None) - - - def removeCallback(self, service, nodeIdentifier, callback): - try: - callbacks = self.callbacks[service, nodeIdentifier] - callbacks.remove(callback) - except KeyError: - return defer.fail(error.NotSubscribed()) - else: - if not callbacks: - del self.callbacks[service, nodeIdentifier] - - return defer.succeed(not callbacks) - - - def getCallbacks(self, service, nodeIdentifier): - try: - callbacks = self.callbacks[service, nodeIdentifier] - except KeyError: - return defer.fail(error.NoCallbacks()) - else: - return defer.succeed(callbacks) - - - def hasCallbacks(self, service, nodeIdentifier): - return defer.succeed((service, nodeIdentifier) in self.callbacks)
--- a/sat_pubsub/pgsql_storage.py Fri Aug 16 12:00:02 2019 +0200 +++ b/sat_pubsub/pgsql_storage.py Fri Aug 16 12:00:03 2019 +0200 @@ -1300,80 +1300,3 @@ class CollectionNode(Node): nodeType = 'collection' - - - -class GatewayStorage(object): - """ - Memory based storage facility for the XMPP-HTTP gateway. - """ - - def __init__(self, dbpool): - self.dbpool = dbpool - - def _countCallbacks(self, cursor, service, nodeIdentifier): - """ - Count number of callbacks registered for a node. - """ - cursor.execute("""SELECT count(*) FROM callbacks - WHERE service=%s and node=%s""", - (service.full(), - nodeIdentifier)) - results = cursor.fetchall() - return results[0][0] - - def addCallback(self, service, nodeIdentifier, callback): - def interaction(cursor): - cursor.execute("""SELECT 1 as bool FROM callbacks - WHERE service=%s and node=%s and uri=%s""", - (service.full(), - nodeIdentifier, - callback)) - if cursor.fetchall(): - return - - cursor.execute("""INSERT INTO callbacks - (service, node, uri) VALUES - (%s, %s, %s)""", - (service.full(), - nodeIdentifier, - callback)) - - return self.dbpool.runInteraction(interaction) - - def removeCallback(self, service, nodeIdentifier, callback): - def interaction(cursor): - cursor.execute("""DELETE FROM callbacks - WHERE service=%s and node=%s and uri=%s""", - (service.full(), - nodeIdentifier, - callback)) - - if cursor.rowcount != 1: - raise error.NotSubscribed() - - last = not self._countCallbacks(cursor, service, nodeIdentifier) - return last - - return self.dbpool.runInteraction(interaction) - - def getCallbacks(self, service, nodeIdentifier): - def interaction(cursor): - cursor.execute("""SELECT uri FROM callbacks - WHERE service=%s and node=%s""", - (service.full(), - nodeIdentifier)) - results = cursor.fetchall() - - if not results: - raise error.NoCallbacks() - - return [result[0] for result in results] - - return self.dbpool.runInteraction(interaction) - - def hasCallbacks(self, service, nodeIdentifier): - def interaction(cursor): - return bool(self._countCallbacks(cursor, service, nodeIdentifier)) - - return self.dbpool.runInteraction(interaction)
--- a/sat_pubsub/test/test_gateway.py Fri Aug 16 12:00:02 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,822 +0,0 @@ -#!/usr/bin/python -#-*- coding: utf-8 -*- - -# Copyright (c) 2003-2011 Ralph Meijer -# Copyright (c) 2012-2019 Jérôme Poisson - - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -- - -# This program is based on Idavoll (http://idavoll.ik.nu/), -# originaly written by Ralph Meijer (http://ralphm.net/blog/) -# It is sublicensed under AGPL v3 (or any later version) as allowed by the original -# license. - -# -- - -# Here is a copy of the original license: - -# Copyright (c) 2003-2011 Ralph Meijer - -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: - -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - - -""" -Tests for L{idavoll.gateway}. - -Note that some tests are functional tests that require a running idavoll -service. -""" - -from StringIO import StringIO - -import simplejson - -from twisted.internet import defer -from twisted.trial import unittest -from twisted.web import error, http, http_headers, server -from twisted.web.test import requesthelper -from twisted.words.xish import domish -from twisted.words.protocols.jabber.jid import JID - -from sat_pubsub import gateway -from sat_pubsub.backend import BackendService -from sat_pubsub.memory_storage import Storage - -AGENT = "Idavoll Test Script" -NS_ATOM = "http://www.w3.org/2005/Atom" - -TEST_ENTRY = domish.Element((NS_ATOM, 'entry')) -TEST_ENTRY.addElement("id", - content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a") -TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok") -TEST_ENTRY.addElement("author").addElement("name", content="John Doe") -TEST_ENTRY.addElement("content", content="Some text.") - -baseURI = "http://localhost:8086/" -component = "pubsub" -componentJID = JID(component) -ownerJID = JID('owner@example.org') - -def _render(resource, request): - result = resource.render(request) - if isinstance(result, str): - request.write(result) - request.finish() - return defer.succeed(None) - elif result is server.NOT_DONE_YET: - if request.finished: - return defer.succeed(None) - else: - return request.notifyFinish() - else: - raise ValueError("Unexpected return value: %r" % (result,)) - - -class DummyRequest(requesthelper.DummyRequest): - - def __init__(self, *args, **kwargs): - requesthelper.DummyRequest.__init__(self, *args, **kwargs) - self.requestHeaders = http_headers.Headers() - - - -class GetServiceAndNodeTest(unittest.TestCase): - """ - Tests for {gateway.getServiceAndNode}. - """ - - def test_basic(self): - """ - getServiceAndNode parses an XMPP URI with node parameter. - """ - uri = b'xmpp:pubsub.example.org?;node=test' - service, nodeIdentifier = gateway.getServiceAndNode(uri) - self.assertEqual(JID(u'pubsub.example.org'), service) - self.assertEqual(u'test', nodeIdentifier) - - - def test_schemeEmpty(self): - """ - If the URI scheme is empty, an exception is raised. - """ - uri = b'pubsub.example.org' - self.assertRaises(gateway.XMPPURIParseError, - gateway.getServiceAndNode, uri) - - - def test_schemeNotXMPP(self): - """ - If the URI scheme is not 'xmpp', an exception is raised. - """ - uri = b'mailto:test@example.org' - self.assertRaises(gateway.XMPPURIParseError, - gateway.getServiceAndNode, uri) - - - def test_authorityPresent(self): - """ - If the URI has an authority component, an exception is raised. - """ - uri = b'xmpp://pubsub.example.org/' - self.assertRaises(gateway.XMPPURIParseError, - gateway.getServiceAndNode, uri) - - - def test_queryEmpty(self): - """ - If there is no query component, the nodeIdentifier is empty. - """ - uri = b'xmpp:pubsub.example.org' - service, nodeIdentifier = gateway.getServiceAndNode(uri) - - self.assertEqual(JID(u'pubsub.example.org'), service) - self.assertEqual(u'', nodeIdentifier) - - - def test_jidInvalid(self): - """ - If the JID from the path component is invalid, an exception is raised. - """ - uri = b'xmpp:@@pubsub.example.org?;node=test' - self.assertRaises(gateway.XMPPURIParseError, - gateway.getServiceAndNode, uri) - - - def test_pathEmpty(self): - """ - If there is no path component, an exception is raised. - """ - uri = b'xmpp:?node=test' - self.assertRaises(gateway.XMPPURIParseError, - gateway.getServiceAndNode, uri) - - - def test_nodeAbsent(self): - """ - If the node parameter is missing, the nodeIdentifier is empty. - """ - uri = b'xmpp:pubsub.example.org?' - service, nodeIdentifier = gateway.getServiceAndNode(uri) - - self.assertEqual(JID(u'pubsub.example.org'), service) - self.assertEqual(u'', nodeIdentifier) - - - -class GetXMPPURITest(unittest.TestCase): - """ - Tests for L{gateway.getXMPPURITest}. - """ - - def test_basic(self): - uri = gateway.getXMPPURI(JID(u'pubsub.example.org'), u'test') - self.assertEqual('xmpp:pubsub.example.org?;node=test', uri) - - -class CreateResourceTest(unittest.TestCase): - """ - Tests for L{gateway.CreateResource}. - """ - - def setUp(self): - self.backend = BackendService(Storage()) - self.resource = gateway.CreateResource(self.backend, componentJID, - ownerJID) - - - def test_get(self): - """ - The method GET is not supported. - """ - request = DummyRequest([b'']) - self.assertRaises(error.UnsupportedMethod, - _render, self.resource, request) - - - def test_post(self): - """ - Upon a POST, a new node is created and the URI returned. - """ - request = DummyRequest([b'']) - request.method = 'POST' - - def gotNodes(nodeIdentifiers, uri): - service, nodeIdentifier = gateway.getServiceAndNode(uri) - self.assertIn(nodeIdentifier, nodeIdentifiers) - - def rendered(result): - self.assertEqual('application/json', - request.outgoingHeaders['content-type']) - payload = simplejson.loads(b''.join(request.written)) - self.assertIn('uri', payload) - d = self.backend.getNodes() - d.addCallback(gotNodes, payload['uri']) - return d - - d = _render(self.resource, request) - d.addCallback(rendered) - return d - - - -class DeleteResourceTest(unittest.TestCase): - """ - Tests for L{gateway.DeleteResource}. - """ - - def setUp(self): - self.backend = BackendService(Storage()) - self.resource = gateway.DeleteResource(self.backend, componentJID, - ownerJID) - - - def test_get(self): - """ - The method GET is not supported. - """ - request = DummyRequest([b'']) - self.assertRaises(error.UnsupportedMethod, - _render, self.resource, request) - - - def test_post(self): - """ - Upon a POST, a new node is created and the URI returned. - """ - request = DummyRequest([b'']) - request.method = b'POST' - - def rendered(result): - self.assertEqual(http.NO_CONTENT, request.responseCode) - - def nodeCreated(nodeIdentifier): - uri = gateway.getXMPPURI(componentJID, nodeIdentifier) - request.args[b'uri'] = [uri] - request.content = StringIO(b'') - - return _render(self.resource, request) - - d = self.backend.createNode(u'test', ownerJID) - d.addCallback(nodeCreated) - d.addCallback(rendered) - return d - - - def test_postWithRedirect(self): - """ - Upon a POST, a new node is created and the URI returned. - """ - request = DummyRequest([b'']) - request.method = b'POST' - otherNodeURI = b'xmpp:pubsub.example.org?node=other' - - def rendered(result): - self.assertEqual(http.NO_CONTENT, request.responseCode) - self.assertEqual(1, len(deletes)) - nodeIdentifier, owner, redirectURI = deletes[-1] - self.assertEqual(otherNodeURI, redirectURI) - - def nodeCreated(nodeIdentifier): - uri = gateway.getXMPPURI(componentJID, nodeIdentifier) - request.args[b'uri'] = [uri] - payload = {b'redirect_uri': otherNodeURI} - body = simplejson.dumps(payload) - request.content = StringIO(body) - return _render(self.resource, request) - - def deleteNode(nodeIdentifier, owner, redirectURI): - deletes.append((nodeIdentifier, owner, redirectURI)) - return defer.succeed(nodeIdentifier) - - deletes = [] - self.patch(self.backend, 'deleteNode', deleteNode) - d = self.backend.createNode(u'test', ownerJID) - d.addCallback(nodeCreated) - d.addCallback(rendered) - return d - - - def test_postUnknownNode(self): - """ - If the node to be deleted is unknown, 404 Not Found is returned. - """ - request = DummyRequest([b'']) - request.method = b'POST' - - def rendered(result): - self.assertEqual(http.NOT_FOUND, request.responseCode) - - uri = gateway.getXMPPURI(componentJID, u'unknown') - request.args[b'uri'] = [uri] - request.content = StringIO(b'') - - d = _render(self.resource, request) - d.addCallback(rendered) - return d - - - def test_postMalformedXMPPURI(self): - """ - If the XMPP URI is malformed, Bad Request is returned. - """ - request = DummyRequest([b'']) - request.method = b'POST' - - def rendered(result): - self.assertEqual(http.BAD_REQUEST, request.responseCode) - - uri = 'xmpp:@@@@' - request.args[b'uri'] = [uri] - request.content = StringIO(b'') - - d = _render(self.resource, request) - d.addCallback(rendered) - return d - - - def test_postURIMissing(self): - """ - If no URI is passed, 400 Bad Request is returned. - """ - request = DummyRequest([b'']) - request.method = b'POST' - - def rendered(result): - self.assertEqual(http.BAD_REQUEST, request.responseCode) - - request.content = StringIO(b'') - - d = _render(self.resource, request) - d.addCallback(rendered) - return d - - - -class CallbackResourceTest(unittest.TestCase): - """ - Tests for L{gateway.CallbackResource}. - """ - - def setUp(self): - self.callbackEvents = [] - self.resource = gateway.CallbackResource(self._callback) - - - def _callback(self, payload, headers): - self.callbackEvents.append((payload, headers)) - - - def test_get(self): - """ - The method GET is not supported. - """ - request = DummyRequest([b'']) - self.assertRaises(error.UnsupportedMethod, - _render, self.resource, request) - - - def test_post(self): - """ - The body posted is passed to the callback. - """ - request = DummyRequest([b'']) - request.method = 'POST' - request.content = StringIO(b'<root><child/></root>') - - def rendered(result): - self.assertEqual(1, len(self.callbackEvents)) - payload, headers = self.callbackEvents[-1] - self.assertEqual('root', payload.name) - - self.assertEqual(http.NO_CONTENT, request.responseCode) - self.assertFalse(b''.join(request.written)) - - d = _render(self.resource, request) - d.addCallback(rendered) - return d - - - def test_postEvent(self): - """ - If the Event header is set, the payload is empty and the header passed. - """ - request = DummyRequest([b'']) - request.method = 'POST' - request.requestHeaders.addRawHeader(b'Event', b'DELETE') - request.content = StringIO(b'') - - def rendered(result): - self.assertEqual(1, len(self.callbackEvents)) - payload, headers = self.callbackEvents[-1] - self.assertIdentical(None, payload) - self.assertEqual(['DELETE'], headers.getRawHeaders(b'Event')) - self.assertFalse(b''.join(request.written)) - - d = _render(self.resource, request) - d.addCallback(rendered) - return d - - - -class GatewayTest(unittest.TestCase): - timeout = 2 - - def setUp(self): - self.client = gateway.GatewayClient(baseURI) - self.client.startService() - self.addCleanup(self.client.stopService) - - def trapConnectionRefused(failure): - from twisted.internet.error import ConnectionRefusedError - failure.trap(ConnectionRefusedError) - raise unittest.SkipTest("Gateway to test against is not available") - - def trapNotFound(failure): - from twisted.web.error import Error - failure.trap(Error) - - d = self.client.ping() - d.addErrback(trapConnectionRefused) - d.addErrback(trapNotFound) - return d - - - def tearDown(self): - return self.client.stopService() - - - def test_create(self): - - def cb(response): - self.assertIn('uri', response) - - d = self.client.create() - d.addCallback(cb) - return d - - def test_publish(self): - - def cb(response): - self.assertIn('uri', response) - - d = self.client.publish(TEST_ENTRY) - d.addCallback(cb) - return d - - def test_publishExistingNode(self): - - def cb2(response, xmppURI): - self.assertEquals(xmppURI, response['uri']) - - def cb1(response): - xmppURI = response['uri'] - d = self.client.publish(TEST_ENTRY, xmppURI) - d.addCallback(cb2, xmppURI) - return d - - d = self.client.create() - d.addCallback(cb1) - return d - - def test_publishNonExisting(self): - def cb(err): - self.assertEqual('404', err.status) - - d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % component) - self.assertFailure(d, error.Error) - d.addCallback(cb) - return d - - def test_delete(self): - def cb(response): - xmppURI = response['uri'] - d = self.client.delete(xmppURI) - return d - - d = self.client.create() - d.addCallback(cb) - return d - - def test_deleteWithRedirect(self): - def cb(response): - xmppURI = response['uri'] - redirectURI = 'xmpp:%s?node=test' % component - d = self.client.delete(xmppURI, redirectURI) - return d - - d = self.client.create() - d.addCallback(cb) - return d - - def test_deleteNotification(self): - def onNotification(data, headers): - try: - self.assertTrue(headers.hasHeader('Event')) - self.assertEquals(['DELETED'], headers.getRawHeaders('Event')) - self.assertFalse(headers.hasHeader('Link')) - except: - self.client.deferred.errback() - else: - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - d = self.client.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = self.client.delete(xmppURI) - return d - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - def test_deleteNotificationWithRedirect(self): - redirectURI = 'xmpp:%s?node=test' % component - - def onNotification(data, headers): - try: - self.assertTrue(headers.hasHeader('Event')) - self.assertEquals(['DELETED'], headers.getRawHeaders('Event')) - self.assertEquals(['<%s>; rel=alternate' % redirectURI], - headers.getRawHeaders('Link')) - except: - self.client.deferred.errback() - else: - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - d = self.client.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = self.client.delete(xmppURI, redirectURI) - return d - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - def test_list(self): - d = self.client.listNodes() - return d - - def test_subscribe(self): - def cb(response): - xmppURI = response['uri'] - d = self.client.subscribe(xmppURI) - return d - - d = self.client.create() - d.addCallback(cb) - return d - - def test_subscribeGetNotification(self): - - def onNotification(data, headers): - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - d = self.client.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = self.client.publish(TEST_ENTRY, xmppURI) - return d - - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - - def test_subscribeTwiceGetNotification(self): - - def onNotification1(data, headers): - d = client1.stopService() - d.chainDeferred(client1.deferred) - - def onNotification2(data, headers): - d = client2.stopService() - d.chainDeferred(client2.deferred) - - def cb(response): - xmppURI = response['uri'] - d = client1.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = client2.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb3(xmppURI): - d = self.client.publish(TEST_ENTRY, xmppURI) - return d - - - client1 = gateway.GatewayClient(baseURI, callbackPort=8088) - client1.startService() - client1.callback = onNotification1 - client1.deferred = defer.Deferred() - client2 = gateway.GatewayClient(baseURI, callbackPort=8089) - client2.startService() - client2.callback = onNotification2 - client2.deferred = defer.Deferred() - - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - d.addCallback(cb3) - dl = defer.gatherResults([d, client1.deferred, client2.deferred]) - return dl - - - def test_subscribeGetDelayedNotification(self): - - def onNotification(data, headers): - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - self.assertNot(self.client.deferred.called) - d = self.client.publish(TEST_ENTRY, xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = self.client.subscribe(xmppURI) - return d - - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - def test_subscribeGetDelayedNotification2(self): - """ - Test that subscribing as second results in a notification being sent. - """ - - def onNotification1(data, headers): - client1.deferred.callback(None) - client1.stopService() - - def onNotification2(data, headers): - client2.deferred.callback(None) - client2.stopService() - - def cb(response): - xmppURI = response['uri'] - self.assertNot(client1.deferred.called) - self.assertNot(client2.deferred.called) - d = self.client.publish(TEST_ENTRY, xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - d = client1.subscribe(xmppURI) - d.addCallback(lambda _: xmppURI) - return d - - def cb3(xmppURI): - d = client2.subscribe(xmppURI) - return d - - client1 = gateway.GatewayClient(baseURI, callbackPort=8088) - client1.startService() - client1.callback = onNotification1 - client1.deferred = defer.Deferred() - client2 = gateway.GatewayClient(baseURI, callbackPort=8089) - client2.startService() - client2.callback = onNotification2 - client2.deferred = defer.Deferred() - - - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - d.addCallback(cb3) - dl = defer.gatherResults([d, client1.deferred, client2.deferred]) - return dl - - - def test_subscribeNonExisting(self): - def cb(err): - self.assertEqual('403', err.status) - - d = self.client.subscribe('xmpp:%s?node=test' % component) - self.assertFailure(d, error.Error) - d.addCallback(cb) - return d - - - def test_subscribeRootGetNotification(self): - - def clean(rootNode): - return self.client.unsubscribe(rootNode) - - def onNotification(data, headers): - self.client.deferred.callback(None) - - def cb(response): - xmppURI = response['uri'] - jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI) - rootNode = gateway.getXMPPURI(jid, '') - - d = self.client.subscribe(rootNode) - d.addCallback(lambda _: self.addCleanup(clean, rootNode)) - d.addCallback(lambda _: xmppURI) - return d - - def cb2(xmppURI): - return self.client.publish(TEST_ENTRY, xmppURI) - - - self.client.callback = onNotification - self.client.deferred = defer.Deferred() - d = self.client.create() - d.addCallback(cb) - d.addCallback(cb2) - return defer.gatherResults([d, self.client.deferred]) - - - def test_unsubscribeNonExisting(self): - def cb(err): - self.assertEqual('403', err.status) - - d = self.client.unsubscribe('xmpp:%s?node=test' % component) - self.assertFailure(d, error.Error) - d.addCallback(cb) - return d - - - def test_items(self): - def cb(response): - xmppURI = response['uri'] - d = self.client.items(xmppURI) - return d - - d = self.client.publish(TEST_ENTRY) - d.addCallback(cb) - return d - - - def test_itemsMaxItems(self): - def cb(response): - xmppURI = response['uri'] - d = self.client.items(xmppURI, 2) - return d - - d = self.client.publish(TEST_ENTRY) - d.addCallback(cb) - return d
--- a/setup.py Fri Aug 16 12:00:02 2019 +0200 +++ b/setup.py Fri Aug 16 12:00:03 2019 +0200 @@ -26,9 +26,8 @@ install_requires = [ 'wokkel >= 0.7.1', 'psycopg2', - 'simplejson', + 'sat_tmp', 'uuid', - 'sat_tmp', ]
--- a/twisted/plugins/pubsub.py Fri Aug 16 12:00:02 2019 +0200 +++ b/twisted/plugins/pubsub.py Fri Aug 16 12:00:03 2019 +0200 @@ -166,7 +166,7 @@ def postOptions(self): if self['backend'] not in ['pgsql', 'memory']: - raise usage.UsageError, "Unknown backend!" + raise usage.UsageError("Unknown backend!") if self['backend'] == 'memory': raise NotImplementedError('memory backend is not available at the moment') @@ -211,8 +211,7 @@ ) st = Storage(dbpool) elif config['backend'] == 'memory': - from sat_pubsub.memory_storage import Storage - st = Storage() + raise NotImplementedError('memory backend is not available at the moment') bs = BackendService(st, config) bs.setName('backend')