changeset 4307:6a0155f410bd

test (unit): add test for plugin XEP-0033: those replace the legacy XEP-0033 test from libervia/backend/test/test_plugin_xep_0033.py. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents 94e0968987cd
children 472a938a46e3
files libervia/backend/test/test_plugin_xep_0033.py tests/unit/test_plugin_xep_0033.py
diffstat 2 files changed, 242 insertions(+), 213 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/test/test_plugin_xep_0033.py	Thu Sep 26 16:12:01 2024 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,213 +0,0 @@
-#!/usr/bin/env python3
-
-
-# SAT: a jabber client
-# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
-# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU Affero General Public License for more details.
-
-# You should have received a copy of the GNU Affero General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-""" Plugin extended addressing stanzas """
-
-from .constants import Const
-from libervia.backend.test import helpers
-from libervia.backend.plugins import plugin_xep_0033 as plugin
-from libervia.backend.core.exceptions import CancelError
-from twisted.internet import defer
-from wokkel.generic import parseXml
-from twisted.words.protocols.jabber.jid import JID
-
-PROFILE_INDEX = 0
-PROFILE = Const.PROFILE[PROFILE_INDEX]
-JID_STR_FROM = Const.JID_STR[1]
-JID_STR_TO = Const.PROFILE_DICT[PROFILE].host
-JID_STR_X_TO = Const.JID_STR[0]
-JID_STR_X_CC = Const.JID_STR[1]
-JID_STR_X_BCC = Const.JID_STR[2]
-
-ADDRS = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC)
-
-
-class XEP_0033Test(helpers.SatTestCase):
-    def setUp(self):
-        self.host = helpers.FakeSAT()
-        self.plugin = plugin.XEP_0033(self.host)
-
-    def test_message_received(self):
-        self.host.memory.reinit()
-        xml = """
-        <message type="chat" from="%s" to="%s" id="test_1">
-            <body>test</body>
-            <addresses xmlns='http://jabber.org/protocol/address'>
-                <address type='to' jid='%s'/>
-                <address type='cc' jid='%s'/>
-                <address type='bcc' jid='%s'/>
-            </addresses>
-        </message>
-        """ % (
-            JID_STR_FROM,
-            JID_STR_TO,
-            JID_STR_X_TO,
-            JID_STR_X_CC,
-            JID_STR_X_BCC,
-        )
-        stanza = parseXml(xml.encode("utf-8"))
-        treatments = defer.Deferred()
-        self.plugin.message_received_trigger(
-            self.host.get_client(PROFILE), stanza, treatments
-        )
-        data = {"extra": {}}
-
-        def cb(data):
-            expected = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC)
-            msg = "Expected: %s\nGot:      %s" % (expected, data["extra"]["addresses"])
-            self.assertEqual(
-                data["extra"]["addresses"], "%s:%s\n%s:%s\n%s:%s\n" % expected, msg
-            )
-
-        treatments.addCallback(cb)
-        return treatments.callback(data)
-
-    def _get_mess_data(self):
-        mess_data = {
-            "to": JID(JID_STR_TO),
-            "type": "chat",
-            "message": "content",
-            "extra": {},
-        }
-        mess_data["extra"]["address"] = "%s:%s\n%s:%s\n%s:%s\n" % ADDRS
-        original_stanza = """
-        <message type="chat" from="%s" to="%s" id="test_1">
-            <body>content</body>
-        </message>
-        """ % (
-            JID_STR_FROM,
-            JID_STR_TO,
-        )
-        mess_data["xml"] = parseXml(original_stanza.encode("utf-8"))
-        return mess_data
-
-    def _assert_addresses(self, mess_data):
-        """The mess_data that we got here has been modified by self.plugin.messageSendTrigger,
-        check that the addresses element has been added to the stanza."""
-        expected = self._get_mess_data()["xml"]
-        addresses_extra = (
-            """
-        <addresses xmlns='http://jabber.org/protocol/address'>
-            <address type='%s' jid='%s'/>
-            <address type='%s' jid='%s'/>
-            <address type='%s' jid='%s'/>
-        </addresses>"""
-            % ADDRS
-        )
-        addresses_element = parseXml(addresses_extra.encode("utf-8"))
-        expected.addChild(addresses_element)
-        self.assert_equal_xml(
-            mess_data["xml"].toXml().encode("utf-8"), expected.toXml().encode("utf-8")
-        )
-
-    def _check_sent_and_stored(self):
-        """Check that all the recipients got their messages and that the history has been filled.
-        /!\ see the comments in XEP_0033.send_and_store_message"""
-        sent = []
-        stored = []
-        d_list = []
-
-        def cb(entities, to_jid):
-            if host in entities:
-                if (
-                    host not in sent
-                ):  # send the message to the entity offering the feature
-                    sent.append(host)
-                    stored.append(host)
-                stored.append(to_jid)  # store in history for each recipient
-            else:  # feature not supported, use normal behavior
-                sent.append(to_jid)
-                stored.append(to_jid)
-            helpers.unmute_logging()
-
-        for to_s in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
-            to_jid = JID(to_s)
-            host = JID(to_jid.host)
-            helpers.mute_logging()
-            d = self.host.find_features_set(
-                [plugin.NS_ADDRESS], jid_=host, profile=PROFILE
-            )
-            d.addCallback(cb, to_jid)
-            d_list.append(d)
-
-        def cb_list(__):
-            msg = "/!\ see the comments in XEP_0033.send_and_store_message"
-            sent_recipients = [
-                JID(elt["to"]) for elt in self.host.get_sent_messages(PROFILE_INDEX)
-            ]
-            self.assert_equal_unsorted_list(sent_recipients, sent, msg)
-            self.assert_equal_unsorted_list(self.host.stored_messages, stored, msg)
-
-        return defer.DeferredList(d_list).addCallback(cb_list)
-
-    def _trigger(self, data):
-        """Execute self.plugin.messageSendTrigger with a different logging
-        level to not pollute the output, then check that the plugin did its
-        job. It should abort sending the message or add the extended
-        addressing information to the stanza.
-        @param data: the data to be processed by self.plugin.messageSendTrigger
-        """
-        pre_treatments = defer.Deferred()
-        post_treatments = defer.Deferred()
-        helpers.mute_logging()
-        self.plugin.messageSendTrigger(
-            self.host.get_client[PROFILE], data, pre_treatments, post_treatments
-        )
-        post_treatments.callback(data)
-        helpers.unmute_logging()
-        post_treatments.addCallbacks(
-            self._assert_addresses, lambda failure: failure.trap(CancelError)
-        )
-        return post_treatments
-
-    def test_message_send_trigger_feature_not_supported(self):
-        # feature is not supported, abort the message
-        self.host.memory.reinit()
-        data = self._get_mess_data()
-        return self._trigger(data)
-
-    def test_message_send_trigger_feature_supported(self):
-        # feature is supported by the main target server
-        self.host.reinit()
-        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
-        data = self._get_mess_data()
-        d = self._trigger(data)
-        return d.addCallback(lambda __: self._check_sent_and_stored())
-
-    def test_message_send_trigger_feature_fully_supported(self):
-        # feature is supported by all target servers
-        self.host.reinit()
-        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
-        for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
-            self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE)
-        data = self._get_mess_data()
-        d = self._trigger(data)
-        return d.addCallback(lambda __: self._check_sent_and_stored())
-
-    def test_message_send_trigger_fix_wrong_entity(self):
-        # check that a wrong recipient entity is fixed by the backend
-        self.host.reinit()
-        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
-        for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
-            self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE)
-        data = self._get_mess_data()
-        data["to"] = JID(JID_STR_X_TO)
-        d = self._trigger(data)
-        return d.addCallback(lambda __: self._check_sent_and_stored())
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/unit/test_plugin_xep_0033.py	Thu Sep 26 16:12:01 2024 +0200
@@ -0,0 +1,242 @@
+#!/usr/bin/env python3
+
+# Libervia: an XMPP client
+# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import cast
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from pytest_twisted import ensureDeferred as ed
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+from libervia.backend.models.core import MessageData
+from libervia.backend.plugins.plugin_xep_0033 import (
+    AddressType,
+    AddressesData,
+    NS_ADDRESS,
+    XEP_0033,
+)
+
+
+class TestXEP0033:
+    def test_address_type_from_element(self):
+        """
+        AddressType instance is correctly created from an XML <address> element.
+        """
+        address_elt = domish.Element((NS_ADDRESS, "address"))
+        address_elt["jid"] = "test@example.com"
+        address_elt["desc"] = "Test Description"
+        address_elt["delivered"] = "true"
+        address = AddressType.from_element(address_elt)
+        assert address.jid == jid.JID("test@example.com")
+        assert address.desc == "Test Description"
+        assert address.delivered is True
+
+    def test_address_type_to_element(self):
+        """
+        XML <address> element is correctly built from an AddressType instance.
+        """
+        address = AddressType(
+            jid=jid.JID("test@example.com"),
+            desc="Test Description",
+            delivered=True,
+        )
+        address_elt = address.to_element()
+        assert address_elt.uri == NS_ADDRESS
+        assert address_elt.name == "address"
+        assert address_elt["jid"] == "test@example.com"
+        assert address_elt["desc"] == "Test Description"
+        assert address_elt["delivered"] == "true"
+
+    def test_addresses_data_from_element(self):
+        """
+        AddressesData instance is correctly created from an XML <addresses> element.
+        """
+        addresses_elt = domish.Element((NS_ADDRESS, "addresses"))
+        address_elt1 = addresses_elt.addElement("address")
+        address_elt1["type"] = "to"
+        address_elt1["jid"] = "test1@example.com"
+        address_elt2 = addresses_elt.addElement("address")
+        address_elt2["type"] = "cc"
+        address_elt2["jid"] = "test2@example.com"
+        address_elt3 = addresses_elt.addElement("address")
+        address_elt3["type"] = "bcc"
+        address_elt3["jid"] = "test3@example.com"
+        address_elt4 = addresses_elt.addElement("address")
+        address_elt4["type"] = "noreply"
+        addresses = AddressesData.from_element(addresses_elt)
+        assert addresses.to is not None and len(addresses.to) == 1
+        assert addresses.to[0].jid == jid.JID("test1@example.com")
+        assert addresses.cc is not None and len(addresses.cc) == 1
+        assert addresses.cc[0].jid == jid.JID("test2@example.com")
+        assert addresses.bcc is not None and len(addresses.bcc) == 1
+        assert addresses.bcc[0].jid == jid.JID("test3@example.com")
+        assert addresses.noreply
+
+    def test_addresses_data_to_element(self):
+        """
+        XML <addresses> element is correctly built from an AddressesData instance.
+        """
+        addresses = AddressesData(
+            to=[AddressType(jid=jid.JID("test1@example.com"))],
+            cc=[AddressType(jid=jid.JID("test2@example.com"))],
+            bcc=[AddressType(jid=jid.JID("test3@example.com"))],
+            noreply=True,
+        )
+        addresses_elt = addresses.to_element()
+        assert addresses_elt.uri == NS_ADDRESS
+        assert addresses_elt.name == "addresses"
+        assert len(addresses_elt.children) == 4
+        for elt in addresses_elt.children:
+            assert elt.uri == NS_ADDRESS
+            assert elt.name == "address"
+        assert addresses_elt.children[0]["type"] == "to"
+        assert addresses_elt.children[0]["jid"] == "test1@example.com"
+        assert addresses_elt.children[1]["type"] == "cc"
+        assert addresses_elt.children[1]["jid"] == "test2@example.com"
+        assert addresses_elt.children[2]["type"] == "bcc"
+        assert addresses_elt.children[2]["jid"] == "test3@example.com"
+        assert addresses_elt.children[3]["type"] == "noreply"
+
+    @ed
+    async def test_handle_addresses(self):
+        """
+        Server JID is used, <addresses> element is added and messages are delivered.
+        """
+        xep_0033 = XEP_0033(MagicMock())
+        client = MagicMock()
+        client.server_jid = jid.JID("server.example.com")
+        client.profile = "test_profile"
+
+        mess_data = MessageData(
+            {
+                "to": jid.JID("recipient@example.com"),
+                "extra": {
+                    "addresses": {
+                        "to": [{"jid": "to@example.com"}],
+                        "cc": [{"jid": "cc@example.com"}],
+                    }
+                },
+                "xml": domish.Element(("jabber:client", "message")),
+            }
+        )
+
+        xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True)
+        xep_0033.deliver_messages = AsyncMock()
+
+        with patch.object(xep_0033, "_stop_if_all_delivered", AsyncMock()):
+            result = await xep_0033._handle_addresses(client, mess_data)
+            cast(AsyncMock, xep_0033._stop_if_all_delivered).assert_called_once()
+
+        assert result["to"] == client.server_jid
+        assert "addresses" in result["xml"].children[0].name
+        xep_0033.deliver_messages.assert_called_once()
+
+    @ed
+    async def test_deliver_messages(self):
+        """Delivery is done for all recipients."""
+        xep_0033 = XEP_0033(MagicMock())
+        client = MagicMock()
+        client.server_jid = jid.JID("server.example.com")
+        client.a_send = AsyncMock()
+
+        mess_data = MessageData(
+            {
+                "xml": domish.Element(("jabber:client", "message")),
+            }
+        )
+        to_example_addr = AddressType(jid=jid.JID("to@example.com"))
+        cc_example_addr = AddressType(jid=jid.JID("cc@example.com"))
+        bcc_example_addr = AddressType(jid=jid.JID("bcc@other.com"))
+
+        addr_data = AddressesData(
+            to=[to_example_addr],
+            cc=[cc_example_addr],
+            bcc=[bcc_example_addr],
+        )
+
+        domains = {
+            "example.com": [
+                to_example_addr,
+                cc_example_addr,
+            ],
+            "other.com": [bcc_example_addr],
+        }
+
+        xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True)
+
+        await xep_0033.deliver_messages(client, mess_data, addr_data, domains)
+
+        # Check that messages were sent to the multicast domain and individual recipients
+        assert client.a_send.call_count == 2
+        calls = client.a_send.call_args_list
+        # First call is to the multicast service.
+        assert calls[0][0][0]["to"] == "example.com"
+        # Second call is the individual BCC.
+        assert calls[1][0][0]["to"] == "bcc@other.com"
+
+        # Everything must have been delivered.
+        assert all(address.delivered for address in addr_data.addresses)
+
+        # And BCC must have been removed.
+        assert addr_data.bcc is None
+
+    @ed
+    async def test_deliver_messages_multicast_only(self):
+        """Delivery is done only to multicast services."""
+        xep_0033 = XEP_0033(MagicMock())
+        client = MagicMock()
+        client.server_jid = jid.JID("server.example.com")
+        client.a_send = AsyncMock()
+
+        mess_data = MessageData(
+            {
+                "xml": domish.Element(("jabber:client", "message")),
+            }
+        )
+        to_example_addr = AddressType(jid=jid.JID("to@example.com"))
+        cc_example_addr = AddressType(jid=jid.JID("cc@example.com"))
+        bcc_example_addr = AddressType(jid=jid.JID("bcc@other.com"))
+
+        addr_data = AddressesData(
+            to=[to_example_addr],
+            cc=[cc_example_addr],
+            bcc=[bcc_example_addr],
+        )
+
+        domains = {
+            "example.com": [
+                to_example_addr,
+                cc_example_addr,
+            ],
+            "other.com": [bcc_example_addr],
+        }
+
+        xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True)
+
+        await xep_0033.deliver_messages(
+            client, mess_data, addr_data, domains, multicast_only=True
+        )
+
+        # Check that only the multicast message was sent
+        assert client.a_send.call_count == 1
+        assert client.a_send.call_args[0][0]["to"] == "example.com"
+
+        # Check that only addresses from the multicast domain are marked as delivered
+        assert addr_data.to and addr_data.to[0].delivered is True
+        assert addr_data.cc and addr_data.cc[0].delivered is True
+        assert addr_data.bcc and addr_data.bcc[0].delivered is None