diff libervia/backend/test/test_plugin_xep_0033.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/test/test_plugin_xep_0033.py@524856bd7b19
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/test/test_plugin_xep_0033.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,211 @@
+#!/usr/bin/env python3
+
+
+# SAT: a jabber client
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+""" Plugin extended addressing stanzas """
+
+from .constants import Const
+from libervia.backend.test import helpers
+from libervia.backend.plugins import plugin_xep_0033 as plugin
+from libervia.backend.core.exceptions import CancelError
+from twisted.internet import defer
+from wokkel.generic import parseXml
+from twisted.words.protocols.jabber.jid import JID
+
+PROFILE_INDEX = 0
+PROFILE = Const.PROFILE[PROFILE_INDEX]
+JID_STR_FROM = Const.JID_STR[1]
+JID_STR_TO = Const.PROFILE_DICT[PROFILE].host
+JID_STR_X_TO = Const.JID_STR[0]
+JID_STR_X_CC = Const.JID_STR[1]
+JID_STR_X_BCC = Const.JID_STR[2]
+
+ADDRS = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC)
+
+
+class XEP_0033Test(helpers.SatTestCase):
+    def setUp(self):
+        self.host = helpers.FakeSAT()
+        self.plugin = plugin.XEP_0033(self.host)
+
+    def test_message_received(self):
+        self.host.memory.reinit()
+        xml = """
+        <message type="chat" from="%s" to="%s" id="test_1">
+            <body>test</body>
+            <addresses xmlns='http://jabber.org/protocol/address'>
+                <address type='to' jid='%s'/>
+                <address type='cc' jid='%s'/>
+                <address type='bcc' jid='%s'/>
+            </addresses>
+        </message>
+        """ % (
+            JID_STR_FROM,
+            JID_STR_TO,
+            JID_STR_X_TO,
+            JID_STR_X_CC,
+            JID_STR_X_BCC,
+        )
+        stanza = parseXml(xml.encode("utf-8"))
+        treatments = defer.Deferred()
+        self.plugin.message_received_trigger(
+            self.host.get_client(PROFILE), stanza, treatments
+        )
+        data = {"extra": {}}
+
+        def cb(data):
+            expected = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC)
+            msg = "Expected: %s\nGot:      %s" % (expected, data["extra"]["addresses"])
+            self.assertEqual(
+                data["extra"]["addresses"], "%s:%s\n%s:%s\n%s:%s\n" % expected, msg
+            )
+
+        treatments.addCallback(cb)
+        return treatments.callback(data)
+
+    def _get_mess_data(self):
+        mess_data = {
+            "to": JID(JID_STR_TO),
+            "type": "chat",
+            "message": "content",
+            "extra": {},
+        }
+        mess_data["extra"]["address"] = "%s:%s\n%s:%s\n%s:%s\n" % ADDRS
+        original_stanza = """
+        <message type="chat" from="%s" to="%s" id="test_1">
+            <body>content</body>
+        </message>
+        """ % (
+            JID_STR_FROM,
+            JID_STR_TO,
+        )
+        mess_data["xml"] = parseXml(original_stanza.encode("utf-8"))
+        return mess_data
+
+    def _assert_addresses(self, mess_data):
+        """The mess_data that we got here has been modified by self.plugin.messageSendTrigger,
+        check that the addresses element has been added to the stanza."""
+        expected = self._get_mess_data()["xml"]
+        addresses_extra = (
+            """
+        <addresses xmlns='http://jabber.org/protocol/address'>
+            <address type='%s' jid='%s'/>
+            <address type='%s' jid='%s'/>
+            <address type='%s' jid='%s'/>
+        </addresses>"""
+            % ADDRS
+        )
+        addresses_element = parseXml(addresses_extra.encode("utf-8"))
+        expected.addChild(addresses_element)
+        self.assert_equal_xml(
+            mess_data["xml"].toXml().encode("utf-8"), expected.toXml().encode("utf-8")
+        )
+
+    def _check_sent_and_stored(self):
+        """Check that all the recipients got their messages and that the history has been filled.
+        /!\ see the comments in XEP_0033.send_and_store_message"""
+        sent = []
+        stored = []
+        d_list = []
+
+        def cb(entities, to_jid):
+            if host in entities:
+                if (
+                    host not in sent
+                ):  # send the message to the entity offering the feature
+                    sent.append(host)
+                    stored.append(host)
+                stored.append(to_jid)  # store in history for each recipient
+            else:  # feature not supported, use normal behavior
+                sent.append(to_jid)
+                stored.append(to_jid)
+            helpers.unmute_logging()
+
+        for to_s in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
+            to_jid = JID(to_s)
+            host = JID(to_jid.host)
+            helpers.mute_logging()
+            d = self.host.find_features_set([plugin.NS_ADDRESS], jid_=host, profile=PROFILE)
+            d.addCallback(cb, to_jid)
+            d_list.append(d)
+
+        def cb_list(__):
+            msg = "/!\ see the comments in XEP_0033.send_and_store_message"
+            sent_recipients = [
+                JID(elt["to"]) for elt in self.host.get_sent_messages(PROFILE_INDEX)
+            ]
+            self.assert_equal_unsorted_list(sent_recipients, sent, msg)
+            self.assert_equal_unsorted_list(self.host.stored_messages, stored, msg)
+
+        return defer.DeferredList(d_list).addCallback(cb_list)
+
+    def _trigger(self, data):
+        """Execute self.plugin.messageSendTrigger with a different logging
+        level to not pollute the output, then check that the plugin did its
+        job. It should abort sending the message or add the extended
+        addressing information to the stanza.
+        @param data: the data to be processed by self.plugin.messageSendTrigger
+        """
+        pre_treatments = defer.Deferred()
+        post_treatments = defer.Deferred()
+        helpers.mute_logging()
+        self.plugin.messageSendTrigger(
+            self.host.get_client[PROFILE], data, pre_treatments, post_treatments
+        )
+        post_treatments.callback(data)
+        helpers.unmute_logging()
+        post_treatments.addCallbacks(
+            self._assert_addresses, lambda failure: failure.trap(CancelError)
+        )
+        return post_treatments
+
+    def test_message_send_trigger_feature_not_supported(self):
+        # feature is not supported, abort the message
+        self.host.memory.reinit()
+        data = self._get_mess_data()
+        return self._trigger(data)
+
+    def test_message_send_trigger_feature_supported(self):
+        # feature is supported by the main target server
+        self.host.reinit()
+        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
+        data = self._get_mess_data()
+        d = self._trigger(data)
+        return d.addCallback(lambda __: self._check_sent_and_stored())
+
+    def test_message_send_trigger_feature_fully_supported(self):
+        # feature is supported by all target servers
+        self.host.reinit()
+        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
+        for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
+            self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE)
+        data = self._get_mess_data()
+        d = self._trigger(data)
+        return d.addCallback(lambda __: self._check_sent_and_stored())
+
+    def test_message_send_trigger_fix_wrong_entity(self):
+        # check that a wrong recipient entity is fixed by the backend
+        self.host.reinit()
+        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
+        for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
+            self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE)
+        data = self._get_mess_data()
+        data["to"] = JID(JID_STR_X_TO)
+        d = self._trigger(data)
+        return d.addCallback(lambda __: self._check_sent_and_stored())