view src/test/test_gateway.py @ 391:1d2222a91e6b

pubsub_admin: catch errors on publish, and send an iq error
author Goffi <goffi@goffi.org>
date Fri, 15 Feb 2019 18:05:02 +0100
parents aa3a464df605
children
line wrap: on
line source

#!/usr/bin/python
#-*- coding: utf-8 -*-

# Copyright (c) 2003-2011 Ralph Meijer
# Copyright (c) 2012-2019 Jérôme Poisson


# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# --

# This program is based on Idavoll (http://idavoll.ik.nu/),
# originaly written by Ralph Meijer (http://ralphm.net/blog/)
# It is sublicensed under AGPL v3 (or any later version) as allowed by the original
# license.

# --

# Here is a copy of the original license:

# Copyright (c) 2003-2011 Ralph Meijer

# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:

# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


"""
Tests for L{idavoll.gateway}.

Note that some tests are functional tests that require a running idavoll
service.
"""

from StringIO import StringIO

import simplejson

from twisted.internet import defer
from twisted.trial import unittest
from twisted.web import error, http, http_headers, server
from twisted.web.test import requesthelper
from twisted.words.xish import domish
from twisted.words.protocols.jabber.jid import JID

from sat_pubsub import gateway
from sat_pubsub.backend import BackendService
from sat_pubsub.memory_storage import Storage

AGENT = "Idavoll Test Script"
NS_ATOM = "http://www.w3.org/2005/Atom"

TEST_ENTRY = domish.Element((NS_ATOM, 'entry'))
TEST_ENTRY.addElement("id",
                      content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a")
TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok")
TEST_ENTRY.addElement("author").addElement("name", content="John Doe")
TEST_ENTRY.addElement("content", content="Some text.")

baseURI = "http://localhost:8086/"
component = "pubsub"
componentJID = JID(component)
ownerJID = JID('owner@example.org')

def _render(resource, request):
    result = resource.render(request)
    if isinstance(result, str):
        request.write(result)
        request.finish()
        return defer.succeed(None)
    elif result is server.NOT_DONE_YET:
        if request.finished:
            return defer.succeed(None)
        else:
            return request.notifyFinish()
    else:
        raise ValueError("Unexpected return value: %r" % (result,))


class DummyRequest(requesthelper.DummyRequest):

    def __init__(self, *args, **kwargs):
        requesthelper.DummyRequest.__init__(self, *args, **kwargs)
        self.requestHeaders = http_headers.Headers()



class GetServiceAndNodeTest(unittest.TestCase):
    """
    Tests for {gateway.getServiceAndNode}.
    """

    def test_basic(self):
        """
        getServiceAndNode parses an XMPP URI with node parameter.
        """
        uri = b'xmpp:pubsub.example.org?;node=test'
        service, nodeIdentifier = gateway.getServiceAndNode(uri)
        self.assertEqual(JID(u'pubsub.example.org'), service)
        self.assertEqual(u'test', nodeIdentifier)


    def test_schemeEmpty(self):
        """
        If the URI scheme is empty, an exception is raised.
        """
        uri = b'pubsub.example.org'
        self.assertRaises(gateway.XMPPURIParseError,
                          gateway.getServiceAndNode, uri)


    def test_schemeNotXMPP(self):
        """
        If the URI scheme is not 'xmpp', an exception is raised.
        """
        uri = b'mailto:test@example.org'
        self.assertRaises(gateway.XMPPURIParseError,
                          gateway.getServiceAndNode, uri)


    def test_authorityPresent(self):
        """
        If the URI has an authority component, an exception is raised.
        """
        uri = b'xmpp://pubsub.example.org/'
        self.assertRaises(gateway.XMPPURIParseError,
                          gateway.getServiceAndNode, uri)


    def test_queryEmpty(self):
        """
        If there is no query component, the nodeIdentifier is empty.
        """
        uri = b'xmpp:pubsub.example.org'
        service, nodeIdentifier = gateway.getServiceAndNode(uri)

        self.assertEqual(JID(u'pubsub.example.org'), service)
        self.assertEqual(u'', nodeIdentifier)


    def test_jidInvalid(self):
        """
        If the JID from the path component is invalid, an exception is raised.
        """
        uri = b'xmpp:@@pubsub.example.org?;node=test'
        self.assertRaises(gateway.XMPPURIParseError,
                          gateway.getServiceAndNode, uri)


    def test_pathEmpty(self):
        """
        If there is no path component, an exception is raised.
        """
        uri = b'xmpp:?node=test'
        self.assertRaises(gateway.XMPPURIParseError,
                          gateway.getServiceAndNode, uri)


    def test_nodeAbsent(self):
        """
        If the node parameter is missing, the nodeIdentifier is empty.
        """
        uri = b'xmpp:pubsub.example.org?'
        service, nodeIdentifier = gateway.getServiceAndNode(uri)

        self.assertEqual(JID(u'pubsub.example.org'), service)
        self.assertEqual(u'', nodeIdentifier)



class GetXMPPURITest(unittest.TestCase):
    """
    Tests for L{gateway.getXMPPURITest}.
    """

    def test_basic(self):
        uri = gateway.getXMPPURI(JID(u'pubsub.example.org'), u'test')
        self.assertEqual('xmpp:pubsub.example.org?;node=test', uri)


class CreateResourceTest(unittest.TestCase):
    """
    Tests for L{gateway.CreateResource}.
    """

    def setUp(self):
        self.backend = BackendService(Storage())
        self.resource = gateway.CreateResource(self.backend, componentJID,
                                               ownerJID)


    def test_get(self):
        """
        The method GET is not supported.
        """
        request = DummyRequest([b''])
        self.assertRaises(error.UnsupportedMethod,
                          _render, self.resource, request)


    def test_post(self):
        """
        Upon a POST, a new node is created and the URI returned.
        """
        request = DummyRequest([b''])
        request.method = 'POST'

        def gotNodes(nodeIdentifiers, uri):
            service, nodeIdentifier = gateway.getServiceAndNode(uri)
            self.assertIn(nodeIdentifier, nodeIdentifiers)

        def rendered(result):
            self.assertEqual('application/json',
                             request.outgoingHeaders['content-type'])
            payload = simplejson.loads(b''.join(request.written))
            self.assertIn('uri', payload)
            d = self.backend.getNodes()
            d.addCallback(gotNodes, payload['uri'])
            return d

        d = _render(self.resource, request)
        d.addCallback(rendered)
        return d



class DeleteResourceTest(unittest.TestCase):
    """
    Tests for L{gateway.DeleteResource}.
    """

    def setUp(self):
        self.backend = BackendService(Storage())
        self.resource = gateway.DeleteResource(self.backend, componentJID,
                                               ownerJID)


    def test_get(self):
        """
        The method GET is not supported.
        """
        request = DummyRequest([b''])
        self.assertRaises(error.UnsupportedMethod,
                          _render, self.resource, request)


    def test_post(self):
        """
        Upon a POST, a new node is created and the URI returned.
        """
        request = DummyRequest([b''])
        request.method = b'POST'

        def rendered(result):
            self.assertEqual(http.NO_CONTENT, request.responseCode)

        def nodeCreated(nodeIdentifier):
            uri = gateway.getXMPPURI(componentJID, nodeIdentifier)
            request.args[b'uri'] = [uri]
            request.content = StringIO(b'')

            return _render(self.resource, request)

        d = self.backend.createNode(u'test', ownerJID)
        d.addCallback(nodeCreated)
        d.addCallback(rendered)
        return d


    def test_postWithRedirect(self):
        """
        Upon a POST, a new node is created and the URI returned.
        """
        request = DummyRequest([b''])
        request.method = b'POST'
        otherNodeURI = b'xmpp:pubsub.example.org?node=other'

        def rendered(result):
            self.assertEqual(http.NO_CONTENT, request.responseCode)
            self.assertEqual(1, len(deletes))
            nodeIdentifier, owner, redirectURI = deletes[-1]
            self.assertEqual(otherNodeURI, redirectURI)

        def nodeCreated(nodeIdentifier):
            uri = gateway.getXMPPURI(componentJID, nodeIdentifier)
            request.args[b'uri'] = [uri]
            payload = {b'redirect_uri': otherNodeURI}
            body = simplejson.dumps(payload)
            request.content = StringIO(body)
            return _render(self.resource, request)

        def deleteNode(nodeIdentifier, owner, redirectURI):
            deletes.append((nodeIdentifier, owner, redirectURI))
            return defer.succeed(nodeIdentifier)

        deletes = []
        self.patch(self.backend, 'deleteNode', deleteNode)
        d = self.backend.createNode(u'test', ownerJID)
        d.addCallback(nodeCreated)
        d.addCallback(rendered)
        return d


    def test_postUnknownNode(self):
        """
        If the node to be deleted is unknown, 404 Not Found is returned.
        """
        request = DummyRequest([b''])
        request.method = b'POST'

        def rendered(result):
            self.assertEqual(http.NOT_FOUND, request.responseCode)

        uri = gateway.getXMPPURI(componentJID, u'unknown')
        request.args[b'uri'] = [uri]
        request.content = StringIO(b'')

        d = _render(self.resource, request)
        d.addCallback(rendered)
        return d


    def test_postMalformedXMPPURI(self):
        """
        If the XMPP URI is malformed, Bad Request is returned.
        """
        request = DummyRequest([b''])
        request.method = b'POST'

        def rendered(result):
            self.assertEqual(http.BAD_REQUEST, request.responseCode)

        uri = 'xmpp:@@@@'
        request.args[b'uri'] = [uri]
        request.content = StringIO(b'')

        d = _render(self.resource, request)
        d.addCallback(rendered)
        return d


    def test_postURIMissing(self):
        """
        If no URI is passed, 400 Bad Request is returned.
        """
        request = DummyRequest([b''])
        request.method = b'POST'

        def rendered(result):
            self.assertEqual(http.BAD_REQUEST, request.responseCode)

        request.content = StringIO(b'')

        d = _render(self.resource, request)
        d.addCallback(rendered)
        return d



class CallbackResourceTest(unittest.TestCase):
    """
    Tests for L{gateway.CallbackResource}.
    """

    def setUp(self):
        self.callbackEvents = []
        self.resource = gateway.CallbackResource(self._callback)


    def _callback(self, payload, headers):
        self.callbackEvents.append((payload, headers))


    def test_get(self):
        """
        The method GET is not supported.
        """
        request = DummyRequest([b''])
        self.assertRaises(error.UnsupportedMethod,
                          _render, self.resource, request)


    def test_post(self):
        """
        The body posted is passed to the callback.
        """
        request = DummyRequest([b''])
        request.method = 'POST'
        request.content = StringIO(b'<root><child/></root>')

        def rendered(result):
            self.assertEqual(1, len(self.callbackEvents))
            payload, headers = self.callbackEvents[-1]
            self.assertEqual('root', payload.name)

            self.assertEqual(http.NO_CONTENT, request.responseCode)
            self.assertFalse(b''.join(request.written))

        d = _render(self.resource, request)
        d.addCallback(rendered)
        return d


    def test_postEvent(self):
        """
        If the Event header is set, the payload is empty and the header passed.
        """
        request = DummyRequest([b''])
        request.method = 'POST'
        request.requestHeaders.addRawHeader(b'Event', b'DELETE')
        request.content = StringIO(b'')

        def rendered(result):
            self.assertEqual(1, len(self.callbackEvents))
            payload, headers = self.callbackEvents[-1]
            self.assertIdentical(None, payload)
            self.assertEqual(['DELETE'], headers.getRawHeaders(b'Event'))
            self.assertFalse(b''.join(request.written))

        d = _render(self.resource, request)
        d.addCallback(rendered)
        return d



class GatewayTest(unittest.TestCase):
    timeout = 2

    def setUp(self):
        self.client = gateway.GatewayClient(baseURI)
        self.client.startService()
        self.addCleanup(self.client.stopService)

        def trapConnectionRefused(failure):
            from twisted.internet.error import ConnectionRefusedError
            failure.trap(ConnectionRefusedError)
            raise unittest.SkipTest("Gateway to test against is not available")

        def trapNotFound(failure):
            from twisted.web.error import Error
            failure.trap(Error)

        d = self.client.ping()
        d.addErrback(trapConnectionRefused)
        d.addErrback(trapNotFound)
        return d


    def tearDown(self):
        return self.client.stopService()


    def test_create(self):

        def cb(response):
            self.assertIn('uri', response)

        d = self.client.create()
        d.addCallback(cb)
        return d

    def test_publish(self):

        def cb(response):
            self.assertIn('uri', response)

        d = self.client.publish(TEST_ENTRY)
        d.addCallback(cb)
        return d

    def test_publishExistingNode(self):

        def cb2(response, xmppURI):
            self.assertEquals(xmppURI, response['uri'])

        def cb1(response):
            xmppURI = response['uri']
            d = self.client.publish(TEST_ENTRY, xmppURI)
            d.addCallback(cb2, xmppURI)
            return d

        d = self.client.create()
        d.addCallback(cb1)
        return d

    def test_publishNonExisting(self):
        def cb(err):
            self.assertEqual('404', err.status)

        d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % component)
        self.assertFailure(d, error.Error)
        d.addCallback(cb)
        return d

    def test_delete(self):
        def cb(response):
            xmppURI = response['uri']
            d = self.client.delete(xmppURI)
            return d

        d = self.client.create()
        d.addCallback(cb)
        return d

    def test_deleteWithRedirect(self):
        def cb(response):
            xmppURI = response['uri']
            redirectURI = 'xmpp:%s?node=test' % component
            d = self.client.delete(xmppURI, redirectURI)
            return d

        d = self.client.create()
        d.addCallback(cb)
        return d

    def test_deleteNotification(self):
        def onNotification(data, headers):
            try:
                self.assertTrue(headers.hasHeader('Event'))
                self.assertEquals(['DELETED'], headers.getRawHeaders('Event'))
                self.assertFalse(headers.hasHeader('Link'))
            except:
                self.client.deferred.errback()
            else:
                self.client.deferred.callback(None)

        def cb(response):
            xmppURI = response['uri']
            d = self.client.subscribe(xmppURI)
            d.addCallback(lambda _: xmppURI)
            return d

        def cb2(xmppURI):
            d = self.client.delete(xmppURI)
            return d

        self.client.callback = onNotification
        self.client.deferred = defer.Deferred()
        d = self.client.create()
        d.addCallback(cb)
        d.addCallback(cb2)
        return defer.gatherResults([d, self.client.deferred])

    def test_deleteNotificationWithRedirect(self):
        redirectURI = 'xmpp:%s?node=test' % component

        def onNotification(data, headers):
            try:
                self.assertTrue(headers.hasHeader('Event'))
                self.assertEquals(['DELETED'], headers.getRawHeaders('Event'))
                self.assertEquals(['<%s>; rel=alternate' % redirectURI],
                                  headers.getRawHeaders('Link'))
            except:
                self.client.deferred.errback()
            else:
                self.client.deferred.callback(None)

        def cb(response):
            xmppURI = response['uri']
            d = self.client.subscribe(xmppURI)
            d.addCallback(lambda _: xmppURI)
            return d

        def cb2(xmppURI):
            d = self.client.delete(xmppURI, redirectURI)
            return d

        self.client.callback = onNotification
        self.client.deferred = defer.Deferred()
        d = self.client.create()
        d.addCallback(cb)
        d.addCallback(cb2)
        return defer.gatherResults([d, self.client.deferred])

    def test_list(self):
        d = self.client.listNodes()
        return d

    def test_subscribe(self):
        def cb(response):
            xmppURI = response['uri']
            d = self.client.subscribe(xmppURI)
            return d

        d = self.client.create()
        d.addCallback(cb)
        return d

    def test_subscribeGetNotification(self):

        def onNotification(data, headers):
            self.client.deferred.callback(None)

        def cb(response):
            xmppURI = response['uri']
            d = self.client.subscribe(xmppURI)
            d.addCallback(lambda _: xmppURI)
            return d

        def cb2(xmppURI):
            d = self.client.publish(TEST_ENTRY, xmppURI)
            return d


        self.client.callback = onNotification
        self.client.deferred = defer.Deferred()
        d = self.client.create()
        d.addCallback(cb)
        d.addCallback(cb2)
        return defer.gatherResults([d, self.client.deferred])


    def test_subscribeTwiceGetNotification(self):

        def onNotification1(data, headers):
            d = client1.stopService()
            d.chainDeferred(client1.deferred)

        def onNotification2(data, headers):
            d = client2.stopService()
            d.chainDeferred(client2.deferred)

        def cb(response):
            xmppURI = response['uri']
            d = client1.subscribe(xmppURI)
            d.addCallback(lambda _: xmppURI)
            return d

        def cb2(xmppURI):
            d = client2.subscribe(xmppURI)
            d.addCallback(lambda _: xmppURI)
            return d

        def cb3(xmppURI):
            d = self.client.publish(TEST_ENTRY, xmppURI)
            return d


        client1 = gateway.GatewayClient(baseURI, callbackPort=8088)
        client1.startService()
        client1.callback = onNotification1
        client1.deferred = defer.Deferred()
        client2 = gateway.GatewayClient(baseURI, callbackPort=8089)
        client2.startService()
        client2.callback = onNotification2
        client2.deferred = defer.Deferred()

        d = self.client.create()
        d.addCallback(cb)
        d.addCallback(cb2)
        d.addCallback(cb3)
        dl = defer.gatherResults([d, client1.deferred, client2.deferred])
        return dl


    def test_subscribeGetDelayedNotification(self):

        def onNotification(data, headers):
            self.client.deferred.callback(None)

        def cb(response):
            xmppURI = response['uri']
            self.assertNot(self.client.deferred.called)
            d = self.client.publish(TEST_ENTRY, xmppURI)
            d.addCallback(lambda _: xmppURI)
            return d

        def cb2(xmppURI):
            d = self.client.subscribe(xmppURI)
            return d


        self.client.callback = onNotification
        self.client.deferred = defer.Deferred()
        d = self.client.create()
        d.addCallback(cb)
        d.addCallback(cb2)
        return defer.gatherResults([d, self.client.deferred])

    def test_subscribeGetDelayedNotification2(self):
        """
        Test that subscribing as second results in a notification being sent.
        """

        def onNotification1(data, headers):
            client1.deferred.callback(None)
            client1.stopService()

        def onNotification2(data, headers):
            client2.deferred.callback(None)
            client2.stopService()

        def cb(response):
            xmppURI = response['uri']
            self.assertNot(client1.deferred.called)
            self.assertNot(client2.deferred.called)
            d = self.client.publish(TEST_ENTRY, xmppURI)
            d.addCallback(lambda _: xmppURI)
            return d

        def cb2(xmppURI):
            d = client1.subscribe(xmppURI)
            d.addCallback(lambda _: xmppURI)
            return d

        def cb3(xmppURI):
            d = client2.subscribe(xmppURI)
            return d

        client1 = gateway.GatewayClient(baseURI, callbackPort=8088)
        client1.startService()
        client1.callback = onNotification1
        client1.deferred = defer.Deferred()
        client2 = gateway.GatewayClient(baseURI, callbackPort=8089)
        client2.startService()
        client2.callback = onNotification2
        client2.deferred = defer.Deferred()


        d = self.client.create()
        d.addCallback(cb)
        d.addCallback(cb2)
        d.addCallback(cb3)
        dl = defer.gatherResults([d, client1.deferred, client2.deferred])
        return dl


    def test_subscribeNonExisting(self):
        def cb(err):
            self.assertEqual('403', err.status)

        d = self.client.subscribe('xmpp:%s?node=test' % component)
        self.assertFailure(d, error.Error)
        d.addCallback(cb)
        return d


    def test_subscribeRootGetNotification(self):

        def clean(rootNode):
            return self.client.unsubscribe(rootNode)

        def onNotification(data, headers):
            self.client.deferred.callback(None)

        def cb(response):
            xmppURI = response['uri']
            jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI)
            rootNode = gateway.getXMPPURI(jid, '')

            d = self.client.subscribe(rootNode)
            d.addCallback(lambda _: self.addCleanup(clean, rootNode))
            d.addCallback(lambda _: xmppURI)
            return d

        def cb2(xmppURI):
            return self.client.publish(TEST_ENTRY, xmppURI)


        self.client.callback = onNotification
        self.client.deferred = defer.Deferred()
        d = self.client.create()
        d.addCallback(cb)
        d.addCallback(cb2)
        return defer.gatherResults([d, self.client.deferred])


    def test_unsubscribeNonExisting(self):
        def cb(err):
            self.assertEqual('403', err.status)

        d = self.client.unsubscribe('xmpp:%s?node=test' % component)
        self.assertFailure(d, error.Error)
        d.addCallback(cb)
        return d


    def test_items(self):
        def cb(response):
            xmppURI = response['uri']
            d = self.client.items(xmppURI)
            return d

        d = self.client.publish(TEST_ENTRY)
        d.addCallback(cb)
        return d


    def test_itemsMaxItems(self):
        def cb(response):
            xmppURI = response['uri']
            d = self.client.items(xmppURI, 2)
            return d

        d = self.client.publish(TEST_ENTRY)
        d.addCallback(cb)
        return d