view src/test/test_plugin_xep_0033.py @ 789:0cb423500fbb

test: use the SatTestCase methods instead of builtin "assert" in tests for memory, plugin xep-0033
author souliane <souliane@mailoo.org>
date Tue, 07 Jan 2014 09:27:53 +0100
parents dd656d745d6a
children 2136be5a44a8
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009, 2010, 2011, 2012, 2013  Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013  Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

""" Plugin extended addressing stanzas """

from constants import Const
from sat.test import helpers
from sat.plugins import plugin_xep_0033 as plugin
from sat.memory.memory import NO_SECURITY_LIMIT
from sat.core.sat_main import AbortSendMessage, MessageSentAndStored
from copy import deepcopy
from twisted.internet import defer
from wokkel.generic import parseXml
from twisted.words.protocols.jabber.jid import JID


class XEP_0033Test(helpers.SatTestCase):
    """Tests for the receive and send triggers of the XEP-0033 plugin."""

    def setUp(self):
        """Create a fresh fake host and a plugin instance bound to it."""
        self.host = helpers.FakeSAT()
        self.plugin = plugin.XEP_0033(self.host)

    def test_messageReceived(self):
        """Check that a received <addresses/> element ends up in data['extra']['addresses'].

        The expected value is one "type:jid" line per <address/> child, in
        the order the children appear in the stanza.
        """
        self.host.memory.init()
        xml = u"""
        <message type="chat" from="%s" to="%s" id="test_1">
            <body>test</body>
            <addresses xmlns='http://jabber.org/protocol/address'>
                <address type='to' jid='%s'/>
                <address type='cc' jid='%s'/>
                <address type='bcc' jid='%s'/>
            </addresses>
        </message>
        """ % (Const.TEST_JID_2_STR, self.host.getClientHostJid(Const.TEST_PROFILE),
               Const.TEST_JID_STR, Const.TEST_JID_2_STR, Const.TEST_JID_3_STR)
        stanza = parseXml(xml.encode("utf-8"))
        treatments = defer.Deferred()
        self.plugin.messageReceivedTrigger(stanza, treatments, Const.TEST_PROFILE)
        data = {'extra': {}}

        def cb(data):
            """Compare the addresses stored by the plugin with the expected serialisation."""
            expected = ('to', Const.TEST_JID_STR, 'cc', Const.TEST_JID_2_STR, 'bcc', Const.TEST_JID_3_STR)
            msg = 'Expected: %s\nGot:      %s' % (expected, data['extra']['addresses'])
            self.assertEqual(data['extra']['addresses'], '%s:%s\n%s:%s\n%s:%s\n' % expected, msg)

        # The check is registered after the trigger so that it runs after
        # whatever callbacks the plugin added to the deferred.
        treatments.addCallback(cb)
        treatments.callback(data)

    def test_sendMessageTrigger(self):
        """Check the send trigger in three scenarios.

        1. the feature is not registered for the server: AbortSendMessage is expected;
        2. the feature is registered: MessageSentAndStored is expected and the
           stanza must carry the <addresses/> element;
        3. same as 2. but with another "to" value in the message data.
        """
        mess_data = {"to": self.host.getClientHostJid(Const.TEST_PROFILE),
                     "type": "chat",
                     "message": "content",
                     "extra": {}
                     }
        # Flat tuple of (type, jid) pairs; the jids sit at the odd indexes.
        addresses = ('to', Const.TEST_JID_STR, 'cc', Const.TEST_JID_2_STR, 'bcc', Const.TEST_JID_3_STR)
        mess_data["extra"]["address"] = '%s:%s\n%s:%s\n%s:%s\n' % addresses
        original_stanza = u"""
        <message type="chat" from="%s" to="%s" id="test_1">
            <body>content</body>
        </message>
        """ % (Const.TEST_JID_2_STR, self.host.getClientHostJid(Const.TEST_PROFILE))
        mess_data['xml'] = parseXml(original_stanza.encode("utf-8"))

        # Expected result: the original stanza plus the <addresses/> child.
        expected = deepcopy(mess_data['xml'])
        addresses_extra = """
        <addresses xmlns='http://jabber.org/protocol/address'>
            <address type='%s' jid='%s'/>
            <address type='%s' jid='%s'/>
            <address type='%s' jid='%s'/>
        </addresses>""" % addresses
        addresses_element = parseXml(addresses_extra.encode('utf-8'))
        expected.addChild(addresses_element)

        def assertAddresses(mess_data):
            """The mess_data that we got here has been modified by self.plugin.sendMessageTrigger,
            check that the addresses element has been added to the stanza."""
            self.assertEqualXML(mess_data['xml'].toXml().encode("utf-8"), expected.toXml().encode("utf-8"))

        def sendMessageErrback(failure, exception_class):
            """If the failure does encapsulate the expected exception, it will be silently
            trapped, otherwise it will be re-raised and will make the test fail"""
            # Trap first: an unexpected exception may not carry a mess_data
            # attribute, and reading it would raise an AttributeError that
            # hides the real failure.
            failure.trap(exception_class)
            if exception_class is MessageSentAndStored:
                assertAddresses(failure.value.mess_data)

        def checkSentAndStored():
            """Check that all the recipients got their messages and that the history has been filled.
            Warning: see the comments in XEP_0033.sendAndStoreMessage"""
            sent = []
            stored = []
            cache = set()
            for to_s in addresses[1::2]:
                to_jid = JID(to_s)
                host = JID(to_jid.host)
                if self.host.memory.hasServerFeature(plugin.NS_ADDRESS, host, Const.TEST_PROFILE):
                    # One message per server is expected, however many of
                    # its users are addressed.
                    if host not in cache:
                        sent.append(host)
                        stored.append(host)
                        cache.add(host)
                    stored.append(to_jid)
                else:
                    sent.append(to_jid)
                    stored.append(to_jid)
            msg = "/!\\ see the comments in XEP_0033.sendAndStoreMessage"
            self.assertEqualUnsortedList(self.host.sent_messages, sent, msg)
            self.assertEqualUnsortedList(self.host.stored_messages, stored, msg)

        def enableAddressing():
            """Reset the fake host and register the addressing feature for the client's server."""
            self.host.init()
            self.host.memory.init()
            self.host.memory.addServerFeature(plugin.NS_ADDRESS, self.host.getClientHostJid(Const.TEST_PROFILE), Const.TEST_PROFILE)

        def trigger(data, exception_class):
            """Run the send trigger on data and fire the deferred with that same data.

            Using a single payload for both steps guarantees that the
            scenario really exercises the data it has prepared.
            """
            treatments = defer.Deferred()
            self.plugin.sendMessageTrigger(data, treatments, Const.TEST_PROFILE)
            treatments.addCallbacks(assertAddresses, lambda failure: sendMessageErrback(failure, exception_class))
            treatments.callback(data)

        # feature is not supported, abort the message
        self.host.memory.init()
        trigger(deepcopy(mess_data), AbortSendMessage)

        # feature is supported
        enableAddressing()
        trigger(deepcopy(mess_data), MessageSentAndStored)
        checkSentAndStored()

        # check that a wrong recipient entity is fixed by the backend
        enableAddressing()
        data = deepcopy(mess_data)
        data["to"] = Const.TEST_JID
        trigger(data, MessageSentAndStored)
        checkSentAndStored()