view src/tmp/wokkel/test/test_rsm.py @ 1785:ac73d7456abc

tmp (wokkel.mam): recipient and sender are handled in MAM Request. TODO: MAMRequest should inherit from generic.Stanza, the same way as for PubSubRequest
author Goffi <goffi@goffi.org>
date Fri, 08 Jan 2016 18:54:23 +0100
parents 0fb5785b4c63
children ec764bfb146d
line wrap: on
line source

# Copyright (c) Adrien Cossa.
# See LICENSE for details.

"""
Tests for L{wokkel.rsm}.
"""

from zope.interface import verify

from twisted.trial import unittest
from twisted.words.xish import domish
from twisted.words.protocols.jabber.jid import JID
from twisted.words.protocols.jabber.xmlstream import toResponse
from twisted.internet import defer

from wokkel.generic import parseXml
from wokkel import iwokkel
from wokkel.test.helpers import XmlStreamStub, TestableRequestHandlerMixin

from sat.tmp.wokkel import pubsub
from sat.tmp.wokkel.rsm import NS_RSM, RSMRequest, RSMResponse, PubSubClient, PubSubService

import uuid


class RSMRequestTest(unittest.TestCase):
    """
    Tests for L{rsm.RSMRequest}.
    """

    def test___init__(self):
        """
        Fail to initialize a RSMRequest with wrong attribute values.
        """
        self.assertRaises(AssertionError, RSMRequest, index=371, after=u'test')
        self.assertRaises(AssertionError, RSMRequest, index=371, before=u'test')
        self.assertRaises(AssertionError, RSMRequest, before=117)
        self.assertRaises(AssertionError, RSMRequest, after=312)
        self.assertRaises(AssertionError, RSMRequest, after=u'117', before=u'312')

    def test_parse(self):
        """
        Parse a request element asking for the first page.
        """
        xml = """
        <query xmlns='jabber:iq:search'>
          <nick>Pete</nick>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>1</max>
          </set>
        </query>
        """
        request = RSMRequest.parse(parseXml(xml))
        self.assertEqual(1, request.max)
        self.assertIdentical(None, request.index)
        self.assertIdentical(None, request.after)
        self.assertIdentical(None, request.before)

    def test_parseSecondPage(self):
        """
        Parse a request element asking for a next page.
        """
        xml = """
        <query xmlns='jabber:iq:search'>
          <nick>Pete</nick>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>3</max>
            <after>peterpan@neverland.lit</after>
          </set>
        </query>
        """
        request = RSMRequest.parse(parseXml(xml))
        self.assertEqual(3, request.max)
        self.assertIdentical(None, request.index)
        self.assertEqual(u'peterpan@neverland.lit', request.after)
        self.assertIdentical(None, request.before)

    def test_parsePreviousPage(self):
        """
        Parse a request element asking for a previous page.
        """
        xml = """
        <query xmlns='jabber:iq:search'>
          <nick>Pete</nick>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>5</max>
            <before>peterpan@pixyland.org</before>
          </set>
        </query>
        """
        request = RSMRequest.parse(parseXml(xml))
        self.assertEqual(5, request.max)
        self.assertIdentical(None, request.index)
        self.assertIdentical(None, request.after)
        self.assertEqual(u'peterpan@pixyland.org', request.before)

    def test_parseLastPage(self):
        """
        Parse a request element asking for the last page.
        """
        xml = """
        <query xmlns='jabber:iq:search'>
          <nick>Pete</nick>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>7</max>
            <before/>
          </set>
        </query>
        """
        request = RSMRequest.parse(parseXml(xml))
        self.assertEqual(7, request.max)
        self.assertIdentical(None, request.index)
        self.assertIdentical(None, request.after)
        self.assertEqual('', request.before)

    def test_parseOutOfOrderPage(self):
        """
        Parse a request element asking for a page out of order.
        """
        xml = """
        <query xmlns='jabber:iq:search'>
          <nick>Pete</nick>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>9</max>
            <index>371</index>
          </set>
        </query>
        """
        request = RSMRequest.parse(parseXml(xml))
        self.assertEqual(9, request.max)
        self.assertEqual(371, request.index)
        self.assertIdentical(None, request.after)
        self.assertIdentical(None, request.before)

    def test_parseItemCount(self):
        """
        Parse a request element asking for the items count.
        """
        xml = """
        <query xmlns='jabber:iq:search'>
          <nick>Pete</nick>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>0</max>
          </set>
        </query>
        """
        request = RSMRequest.parse(parseXml(xml))
        self.assertEqual(0, request.max)
        self.assertIdentical(None, request.index)
        self.assertIdentical(None, request.after)
        self.assertIdentical(None, request.before)

    def test_render(self):
        """
        Embed a page request in the element.
        """
        element = domish.Element(('jabber:iq:search', 'query'))
        element.addElement('items')['max_items'] = u'10'
        RSMRequest(1).render(element)

        self.assertEqual(u'10', element.items['max_items'])  # not changed

        self.assertEqual(NS_RSM, element.set.uri)
        self.assertEqual(u'1', ''.join(element.set.max.children))
        self.assertIdentical(None, element.set.after)
        self.assertIdentical(None, element.set.before)
        self.assertIdentical(None, element.set.index)

    def test_renderPubSub(self):
        """
        Embed a page request in the pubsub element.
        """
        element = domish.Element((pubsub.NS_PUBSUB, 'pubsub'))
        element.addElement('items')['max_items'] = u'10'
        RSMRequest(3).render(element)

        self.assertEqual(u'3', element.items['max_items'])  # changed

        self.assertEqual(NS_RSM, element.set.uri)
        self.assertEqual(u'3', ''.join(element.set.max.children))
        self.assertIdentical(None, element.set.after)
        self.assertIdentical(None, element.set.before)
        self.assertIdentical(None, element.set.index)

    def test_renderItems(self):
        """
        Embed a page request in the element, specify items.
        """
        element = domish.Element(('jabber:iq:search', 'query'))
        RSMRequest(5, 127).render(element)
        self.assertEqual(NS_RSM, element.set.uri)
        self.assertEqual(u'5', ''.join(element.set.max.children))
        self.assertIdentical(None, element.set.after)
        self.assertIdentical(None, element.set.before)
        self.assertEqual(u'127', ''.join(element.set.index.children))

    def test_renderAfter(self):
        """
        Embed a page request in the element, specify after.
        """
        element = domish.Element(('jabber:iq:search', 'query'))
        RSMRequest(5, after=u'test').render(element)
        self.assertEqual(NS_RSM, element.set.uri)
        self.assertEqual(u'5', ''.join(element.set.max.children))
        self.assertEqual(u'test', ''.join(element.set.after.children))
        self.assertIdentical(None, element.set.before)
        self.assertIdentical(None, element.set.index)

    def test_renderBefore(self):
        """
        Embed a page request in the element, specify before.
        """
        element = domish.Element(('jabber:iq:search', 'query'))
        RSMRequest(5, before=u'test').render(element)
        self.assertEqual(NS_RSM, element.set.uri)
        self.assertEqual(u'5', ''.join(element.set.max.children))
        self.assertIdentical(None, element.set.after)
        self.assertEqual(u'test', ''.join(element.set.before.children))
        self.assertIdentical(None, element.set.index)


class RSMResponseTest(unittest.TestCase):
    """
    Tests for L{rsm.RSMResponse}.
    """

    def test___init__(self):
        """
        Fail to initialize a RSMResponse with wrong attribute values.
        """
        self.assertRaises(AssertionError, RSMResponse, count=u'1')
        self.assertRaises(AssertionError, RSMResponse, index=127, first=u'127')
        self.assertRaises(AssertionError, RSMResponse, index=127, last=u'351')
        self.assertRaises(AssertionError, RSMResponse, first=u'127', last=u'351')
        self.assertRaises(AssertionError, RSMResponse, index=u'127',
                          first=u'127', last=u'351')
        self.assertRaises(AssertionError, RSMResponse, index=127,
                          first=127, last=u'351')
        self.assertRaises(AssertionError, RSMResponse, index=127,
                          first=u'127', last=351)

    def test_parse(self):
        """
        Parse a response element returning a page.
        """
        xml = """
        <query xmlns='jabber:iq:search'>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <first index='20'>stpeter@jabber.org</first>
            <last>peterpan@neverland.lit</last>
            <count>800</count>
          </set>
        </query>
        """
        response = RSMResponse.parse(parseXml(xml))
        self.assertEqual(800, response.count)
        self.assertEqual(20, response.index)
        self.assertEqual(u'stpeter@jabber.org', response.first)
        self.assertEqual(u'peterpan@neverland.lit', response.last)

    def test_parseEmptySet(self):
        """
        Parse a response element returning an empty set.
        """
        xml = """
        <query xmlns='jabber:iq:search'>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <count>800</count>
          </set>
        </query>
        """
        response = RSMResponse.parse(parseXml(xml))
        self.assertEqual(800, response.count)
        self.assertIdentical(None, response.first)
        self.assertIdentical(None, response.last)
        self.assertIdentical(None, response.index)

    def test_render(self):
        """
        Embed a page response in the element.
        """
        element = domish.Element(('jabber:iq:search', 'query'))
        RSMResponse(800, 20, u'stpeter@jabber.org',
                    u'peterpan@neverland.lit').render(element)

        self.assertEqual(NS_RSM, element.set.uri)
        self.assertEqual(u'800', ''.join(element.set.count.children))
        self.assertEqual(u'stpeter@jabber.org',
                         ''.join(element.set.first.children))
        self.assertEqual(u'peterpan@neverland.lit',
                         ''.join(element.set.last.children))
        self.assertEqual(u'20', element.set.first['index'])

    def test_renderEmptySet(self):
        """
        Embed a page response in the element, for empty set.
        """
        element = domish.Element(('jabber:iq:search', 'query'))
        RSMResponse(800).render(element)

        self.assertEqual(NS_RSM, element.set.uri)
        self.assertEqual(u'800', ''.join(element.set.count.children))
        self.assertIdentical(None, element.set.first)
        self.assertIdentical(None, element.set.last)


class PubSubClientTest(unittest.TestCase):
    """
    Tests for L{rsm.PubSubClient}.
    """
    timeout = 2

    def setUp(self):
        self.stub = XmlStreamStub()
        self.protocol = PubSubClient()
        self.protocol.xmlstream = self.stub.xmlstream
        self.protocol.connectionInitialized()

    def test_items(self):
        """
        Test sending items request to get the first page.
        """
        def cb(items):
            self.assertEquals(2, len(items))
            self.assertEquals([item1, item2], items)
            rsm = self.protocol.getRSMResponse(ext_data['id'])
            self.assertDictEqual(rsm, {'count': '800', 'index': '0',
                                       'first': 'item1', 'last': 'item2'})

        ext_data = {'id': unicode(uuid.uuid4()), 'rsm': RSMRequest(2)}
        d = self.protocol.items(JID('pubsub.example.org'), 'test',
                                ext_data=ext_data)
        d.addCallback(cb)

        iq = self.stub.output[-1]
        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
        self.assertEquals('get', iq.getAttribute('type'))
        self.assertEquals('pubsub', iq.pubsub.name)
        self.assertEquals(pubsub.NS_PUBSUB, iq.pubsub.uri)
        children = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'items', pubsub.NS_PUBSUB))
        self.assertEquals(1, len(children))
        child = children[0]
        self.assertEquals('test', child['node'])

        set_elts = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'set', NS_RSM))
        self.assertEquals(1, len(set_elts))
        set_elt = set_elts[0]
        self.assertEquals(u'2', ''.join(set_elt.max.children))

        response = toResponse(iq, 'result')
        items = response.addElement((pubsub.NS_PUBSUB,
                                     'pubsub')).addElement('items')
        items['node'] = 'test'
        item1 = items.addElement('item')
        item1['id'] = 'item1'
        item2 = items.addElement('item')
        item2['id'] = 'item2'
        RSMResponse(800, 0, u'item1', u'item2').render(response.pubsub)
        self.stub.send(response)

        return d

    def test_itemsAfter(self):
        """
        Test sending items request to get the next page.
        """
        def cb(items):
            self.assertEquals(2, len(items))
            self.assertEquals([item1, item2], items)
            rsm = self.protocol.getRSMResponse(ext_data['id'])
            self.assertDictEqual(rsm, {'count': '800', 'index': '2',
                                       'first': 'item3', 'last': 'item4'})

        ext_data = {'id': unicode(uuid.uuid4()),
                    'rsm': RSMRequest(2, after=u'item2')}
        d = self.protocol.items(JID('pubsub.example.org'), 'test',
                                ext_data=ext_data)
        d.addCallback(cb)

        iq = self.stub.output[-1]
        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
        self.assertEquals('get', iq.getAttribute('type'))
        self.assertEquals('pubsub', iq.pubsub.name)
        self.assertEquals(pubsub.NS_PUBSUB, iq.pubsub.uri)
        children = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'items', pubsub.NS_PUBSUB))
        self.assertEquals(1, len(children))
        child = children[0]
        self.assertEquals('test', child['node'])

        set_elts = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'set', NS_RSM))
        self.assertEquals(1, len(set_elts))
        set_elt = set_elts[0]
        self.assertEquals(u'2', ''.join(set_elt.max.children))
        self.assertEquals(u'item2', ''.join(set_elt.after.children))

        response = toResponse(iq, 'result')
        items = response.addElement((pubsub.NS_PUBSUB,
                                     'pubsub')).addElement('items')
        items['node'] = 'test'
        item1 = items.addElement('item')
        item1['id'] = 'item3'
        item2 = items.addElement('item')
        item2['id'] = 'item4'
        RSMResponse(800, 2, u'item3', u'item4').render(response.pubsub)
        self.stub.send(response)

        return d

    def test_itemsBefore(self):
        """
        Test sending items request to get the previous page.
        """
        def cb(items):
            self.assertEquals(2, len(items))
            self.assertEquals([item1, item2], items)
            rsm = self.protocol.getRSMResponse(ext_data['id'])
            self.assertDictEqual(rsm, {'count': '800', 'index': '0',
                                       'first': 'item1', 'last': 'item2'})

        ext_data = {'id': unicode(uuid.uuid4()),
                    'rsm': RSMRequest(2, before=u'item3')}
        d = self.protocol.items(JID('pubsub.example.org'), 'test',
                                ext_data=ext_data)
        d.addCallback(cb)

        iq = self.stub.output[-1]
        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
        self.assertEquals('get', iq.getAttribute('type'))
        self.assertEquals('pubsub', iq.pubsub.name)
        self.assertEquals(pubsub.NS_PUBSUB, iq.pubsub.uri)
        children = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'items', pubsub.NS_PUBSUB))
        self.assertEquals(1, len(children))
        child = children[0]
        self.assertEquals('test', child['node'])

        set_elts = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'set', NS_RSM))
        self.assertEquals(1, len(set_elts))
        set_elt = set_elts[0]
        self.assertEquals(u'2', ''.join(set_elt.max.children))
        self.assertEquals(u'item3', ''.join(set_elt.before.children))

        response = toResponse(iq, 'result')
        items = response.addElement((pubsub.NS_PUBSUB,
                                     'pubsub')).addElement('items')
        items['node'] = 'test'
        item1 = items.addElement('item')
        item1['id'] = 'item1'
        item2 = items.addElement('item')
        item2['id'] = 'item2'
        RSMResponse(800, 0, u'item1', u'item2').render(response.pubsub)
        self.stub.send(response)

        return d

    def test_itemsIndex(self):
        """
        Test sending items request to get a page out of order.
        """
        def cb(items):
            self.assertEquals(3, len(items))
            self.assertEquals([item1, item2, item3], items)
            rsm = self.protocol.getRSMResponse(ext_data['id'])
            self.assertDictEqual(rsm, {'count': '800', 'index': '3',
                                       'first': 'item4', 'last': 'item6'})

        ext_data = {'id': unicode(uuid.uuid4()), 'rsm': RSMRequest(3, 3)}
        d = self.protocol.items(JID('pubsub.example.org'), 'test',
                                ext_data=ext_data)
        d.addCallback(cb)

        iq = self.stub.output[-1]
        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
        self.assertEquals('get', iq.getAttribute('type'))
        self.assertEquals('pubsub', iq.pubsub.name)
        self.assertEquals(pubsub.NS_PUBSUB, iq.pubsub.uri)
        children = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'items', pubsub.NS_PUBSUB))
        self.assertEquals(1, len(children))
        child = children[0]
        self.assertEquals('test', child['node'])

        set_elts = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'set', NS_RSM))
        self.assertEquals(1, len(set_elts))
        set_elt = set_elts[0]
        self.assertEquals(u'3', ''.join(set_elt.max.children))
        self.assertEquals(u'3', ''.join(set_elt.index.children))

        response = toResponse(iq, 'result')
        items = response.addElement((pubsub.NS_PUBSUB,
                                     'pubsub')).addElement('items')
        items['node'] = 'test'
        item1 = items.addElement('item')
        item1['id'] = 'item4'
        item2 = items.addElement('item')
        item2['id'] = 'item5'
        item3 = items.addElement('item')
        item3['id'] = 'item6'
        RSMResponse(800, 3, u'item4', u'item6').render(response.pubsub)
        self.stub.send(response)

        return d

    def test_itemsCount(self):
        """
        Test sending items request to count them.
        """
        def cb(items):
            self.assertEquals(0, len(items))
            rsm = self.protocol.getRSMResponse(ext_data['id'])
            self.assertDictEqual(rsm, {'count': '800'})

        ext_data = {'id': unicode(uuid.uuid4()), 'rsm': RSMRequest(0)}
        d = self.protocol.items(JID('pubsub.example.org'), 'test',
                                ext_data=ext_data)
        d.addCallback(cb)

        iq = self.stub.output[-1]
        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
        self.assertEquals('get', iq.getAttribute('type'))
        self.assertEquals('pubsub', iq.pubsub.name)
        self.assertEquals(pubsub.NS_PUBSUB, iq.pubsub.uri)
        children = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'items', pubsub.NS_PUBSUB))
        self.assertEquals(1, len(children))
        child = children[0]
        self.assertEquals('test', child['node'])

        set_elts = list(domish.generateElementsQNamed(iq.pubsub.children,
                                                      'set', NS_RSM))
        self.assertEquals(1, len(set_elts))
        set_elt = set_elts[0]
        self.assertEquals(u'0', ''.join(set_elt.max.children))

        response = toResponse(iq, 'result')
        response.addElement((pubsub.NS_PUBSUB, 'pubsub'))
        RSMResponse(800).render(response.pubsub)
        self.stub.send(response)

        return d


class PubSubServiceTest(unittest.TestCase, TestableRequestHandlerMixin):

    def setUp(self):
        self.stub = XmlStreamStub()
        self.resource = pubsub.PubSubResource()
        self.service = PubSubService(self.resource)
        self.service.send = self.stub.xmlstream.send

    def test_on_items(self):
        """
        On a items request, return the first item for the given node.
        """
        xml = """
        <iq type='get' to='pubsub.example.org'
                       from='user@example.org'>
          <pubsub xmlns='http://jabber.org/protocol/pubsub'>
            <items node='test'/>
          </pubsub>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>1</max>
          </set>
        </iq>
        """

        def items(request):
            rsm = RSMResponse(800, 0, u'item', u'item').toElement()
            return defer.succeed([pubsub.Item('current'), rsm])

        def cb(element):
            self.assertEqual(pubsub.NS_PUBSUB, element.uri)
            self.assertEqual(pubsub.NS_PUBSUB, element.items.uri)
            self.assertEqual(1, len(element.items.children))
            item = element.items.children[-1]
            self.assertTrue(domish.IElement.providedBy(item))
            self.assertEqual('item', item.name)
            self.assertEqual(pubsub.NS_PUBSUB, item.uri)
            self.assertEqual('current', item['id'])
            self.assertEqual(NS_RSM, element.set.uri)
            self.assertEqual('800', ''.join(element.set.count.children))
            self.assertEqual('0', element.set.first['index'])
            self.assertEqual('item', ''.join(element.set.first.children))
            self.assertEqual('item', ''.join(element.set.last.children))

        self.resource.items = items
        verify.verifyObject(iwokkel.IPubSubResource, self.resource)
        d = self.handleRequest(xml)
        d.addCallback(cb)
        return d

    def test_on_itemsIndex(self):
        """
        On a items request, return some items out of order for the given node.
        """
        xml = """
        <iq type='get' to='pubsub.example.org'
                       from='user@example.org'>
          <pubsub xmlns='http://jabber.org/protocol/pubsub'>
            <items node='test'/>
          </pubsub>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>2</max>
            <index>3</index>
          </set>
        </iq>
        """

        def items(request):
            rsm = RSMResponse(800, 3, u'i1', u'i2').toElement()
            return defer.succeed([pubsub.Item('i1'), pubsub.Item('i2'), rsm])

        def cb(element):
            self.assertEqual(pubsub.NS_PUBSUB, element.uri)
            self.assertEqual(pubsub.NS_PUBSUB, element.items.uri)
            self.assertEqual(2, len(element.items.children))
            item = element.items.children[0]
            self.assertTrue(domish.IElement.providedBy(item))
            self.assertEqual('item', item.name)
            self.assertEqual(pubsub.NS_PUBSUB, item.uri)
            self.assertEqual('i1', item['id'])
            item = element.items.children[1]
            self.assertTrue(domish.IElement.providedBy(item))
            self.assertEqual('item', item.name)
            self.assertEqual(pubsub.NS_PUBSUB, item.uri)
            self.assertEqual('i2', item['id'])
            self.assertEqual(NS_RSM, element.set.uri)
            self.assertEqual('800', ''.join(element.set.count.children))
            self.assertEqual('3', element.set.first['index'])
            self.assertEqual('i1', ''.join(element.set.first.children))
            self.assertEqual('i2', ''.join(element.set.last.children))

        self.resource.items = items
        verify.verifyObject(iwokkel.IPubSubResource, self.resource)
        d = self.handleRequest(xml)
        d.addCallback(cb)
        return d

    def test_on_itemsCount(self):
        """
        On a items request, return the items count.
        """
        xml = """
        <iq type='get' to='pubsub.example.org'
                       from='user@example.org'>
          <pubsub xmlns='http://jabber.org/protocol/pubsub'>
            <items node='test'/>
          </pubsub>
          <set xmlns='http://jabber.org/protocol/rsm'>
            <max>0</max>
          </set>
        </iq>
        """

        def items(request):
            rsm = RSMResponse(800).toElement()
            return defer.succeed([rsm])

        def cb(element):
            self.assertEqual(pubsub.NS_PUBSUB, element.uri)
            self.assertEqual(pubsub.NS_PUBSUB, element.items.uri)
            self.assertEqual(0, len(element.items.children))
            self.assertEqual(NS_RSM, element.set.uri)
            self.assertEqual('800', ''.join(element.set.count.children))
            self.assertEqual(None, element.set.first)
            self.assertEqual(None, element.set.last)

        self.resource.items = items
        verify.verifyObject(iwokkel.IPubSubResource, self.resource)
        d = self.handleRequest(xml)
        d.addCallback(cb)
        return d