# HG changeset patch # User Goffi # Date 1727359921 -7200 # Node ID 6a0155f410bd421539449dc2b92396a9f7a48291 # Parent 94e0968987cde2f1b3350e9cc62a545a9ba7c144 test (unit): add test for plugin XEP-0033: those replace the legacy XEP-0033 test from libervia/backend/test/test_plugin_xep_0033.py. rel 450 diff -r 94e0968987cd -r 6a0155f410bd libervia/backend/test/test_plugin_xep_0033.py --- a/libervia/backend/test/test_plugin_xep_0033.py Thu Sep 26 16:12:01 2024 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,213 +0,0 @@ -#!/usr/bin/env python3 - - -# SAT: a jabber client -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) -# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -""" Plugin extended addressing stanzas """ - -from .constants import Const -from libervia.backend.test import helpers -from libervia.backend.plugins import plugin_xep_0033 as plugin -from libervia.backend.core.exceptions import CancelError -from twisted.internet import defer -from wokkel.generic import parseXml -from twisted.words.protocols.jabber.jid import JID - -PROFILE_INDEX = 0 -PROFILE = Const.PROFILE[PROFILE_INDEX] -JID_STR_FROM = Const.JID_STR[1] -JID_STR_TO = Const.PROFILE_DICT[PROFILE].host -JID_STR_X_TO = Const.JID_STR[0] -JID_STR_X_CC = Const.JID_STR[1] -JID_STR_X_BCC = Const.JID_STR[2] - -ADDRS = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC) - - -class XEP_0033Test(helpers.SatTestCase): - def setUp(self): - self.host = helpers.FakeSAT() - self.plugin = plugin.XEP_0033(self.host) - - def test_message_received(self): - self.host.memory.reinit() - xml = """ - - test - -
-
-
- - - """ % ( - JID_STR_FROM, - JID_STR_TO, - JID_STR_X_TO, - JID_STR_X_CC, - JID_STR_X_BCC, - ) - stanza = parseXml(xml.encode("utf-8")) - treatments = defer.Deferred() - self.plugin.message_received_trigger( - self.host.get_client(PROFILE), stanza, treatments - ) - data = {"extra": {}} - - def cb(data): - expected = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC) - msg = "Expected: %s\nGot: %s" % (expected, data["extra"]["addresses"]) - self.assertEqual( - data["extra"]["addresses"], "%s:%s\n%s:%s\n%s:%s\n" % expected, msg - ) - - treatments.addCallback(cb) - return treatments.callback(data) - - def _get_mess_data(self): - mess_data = { - "to": JID(JID_STR_TO), - "type": "chat", - "message": "content", - "extra": {}, - } - mess_data["extra"]["address"] = "%s:%s\n%s:%s\n%s:%s\n" % ADDRS - original_stanza = """ - - content - - """ % ( - JID_STR_FROM, - JID_STR_TO, - ) - mess_data["xml"] = parseXml(original_stanza.encode("utf-8")) - return mess_data - - def _assert_addresses(self, mess_data): - """The mess_data that we got here has been modified by self.plugin.messageSendTrigger, - check that the addresses element has been added to the stanza.""" - expected = self._get_mess_data()["xml"] - addresses_extra = ( - """ - -
-
-
- """ - % ADDRS - ) - addresses_element = parseXml(addresses_extra.encode("utf-8")) - expected.addChild(addresses_element) - self.assert_equal_xml( - mess_data["xml"].toXml().encode("utf-8"), expected.toXml().encode("utf-8") - ) - - def _check_sent_and_stored(self): - """Check that all the recipients got their messages and that the history has been filled. - /!\ see the comments in XEP_0033.send_and_store_message""" - sent = [] - stored = [] - d_list = [] - - def cb(entities, to_jid): - if host in entities: - if ( - host not in sent - ): # send the message to the entity offering the feature - sent.append(host) - stored.append(host) - stored.append(to_jid) # store in history for each recipient - else: # feature not supported, use normal behavior - sent.append(to_jid) - stored.append(to_jid) - helpers.unmute_logging() - - for to_s in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): - to_jid = JID(to_s) - host = JID(to_jid.host) - helpers.mute_logging() - d = self.host.find_features_set( - [plugin.NS_ADDRESS], jid_=host, profile=PROFILE - ) - d.addCallback(cb, to_jid) - d_list.append(d) - - def cb_list(__): - msg = "/!\ see the comments in XEP_0033.send_and_store_message" - sent_recipients = [ - JID(elt["to"]) for elt in self.host.get_sent_messages(PROFILE_INDEX) - ] - self.assert_equal_unsorted_list(sent_recipients, sent, msg) - self.assert_equal_unsorted_list(self.host.stored_messages, stored, msg) - - return defer.DeferredList(d_list).addCallback(cb_list) - - def _trigger(self, data): - """Execute self.plugin.messageSendTrigger with a different logging - level to not pollute the output, then check that the plugin did its - job. It should abort sending the message or add the extended - addressing information to the stanza. - @param data: the data to be processed by self.plugin.messageSendTrigger - """ - pre_treatments = defer.Deferred() - post_treatments = defer.Deferred() - helpers.mute_logging() - self.plugin.messageSendTrigger( - self.host.get_client[PROFILE], data, pre_treatments, post_treatments - ) - post_treatments.callback(data) - helpers.unmute_logging() - post_treatments.addCallbacks( - self._assert_addresses, lambda failure: failure.trap(CancelError) - ) - return post_treatments - - def test_message_send_trigger_feature_not_supported(self): - # feature is not supported, abort the message - self.host.memory.reinit() - data = self._get_mess_data() - return self._trigger(data) - - def test_message_send_trigger_feature_supported(self): - # feature is supported by the main target server - self.host.reinit() - self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) - data = self._get_mess_data() - d = self._trigger(data) - return d.addCallback(lambda __: self._check_sent_and_stored()) - - def test_message_send_trigger_feature_fully_supported(self): - # feature is supported by all target servers - self.host.reinit() - self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) - for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): - self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE) - data = self._get_mess_data() - d = self._trigger(data) - return d.addCallback(lambda __: self._check_sent_and_stored()) - - def test_message_send_trigger_fix_wrong_entity(self): - # check that a wrong recipient entity is fixed by the backend - self.host.reinit() - self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) - for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): - self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE) - data = self._get_mess_data() - data["to"] = JID(JID_STR_X_TO) - d = self._trigger(data) - return d.addCallback(lambda __: self._check_sent_and_stored()) diff -r 94e0968987cd -r 6a0155f410bd tests/unit/test_plugin_xep_0033.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/unit/test_plugin_xep_0033.py Thu Sep 26 16:12:01 2024 +0200 @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import cast +from unittest.mock import AsyncMock, MagicMock, patch + +from pytest_twisted import ensureDeferred as ed +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish + +from libervia.backend.models.core import MessageData +from libervia.backend.plugins.plugin_xep_0033 import ( + AddressType, + AddressesData, + NS_ADDRESS, + XEP_0033, +) + + +class TestXEP0033: + def test_address_type_from_element(self): + """ + AddressType instance is correctly created from an XML
element. + """ + address_elt = domish.Element((NS_ADDRESS, "address")) + address_elt["jid"] = "test@example.com" + address_elt["desc"] = "Test Description" + address_elt["delivered"] = "true" + address = AddressType.from_element(address_elt) + assert address.jid == jid.JID("test@example.com") + assert address.desc == "Test Description" + assert address.delivered is True + + def test_address_type_to_element(self): + """ + XML
element is correctly built from an AddressType instance. + """ + address = AddressType( + jid=jid.JID("test@example.com"), + desc="Test Description", + delivered=True, + ) + address_elt = address.to_element() + assert address_elt.uri == NS_ADDRESS + assert address_elt.name == "address" + assert address_elt["jid"] == "test@example.com" + assert address_elt["desc"] == "Test Description" + assert address_elt["delivered"] == "true" + + def test_addresses_data_from_element(self): + """ + AddressesData instance is correctly created from an XML element. + """ + addresses_elt = domish.Element((NS_ADDRESS, "addresses")) + address_elt1 = addresses_elt.addElement("address") + address_elt1["type"] = "to" + address_elt1["jid"] = "test1@example.com" + address_elt2 = addresses_elt.addElement("address") + address_elt2["type"] = "cc" + address_elt2["jid"] = "test2@example.com" + address_elt3 = addresses_elt.addElement("address") + address_elt3["type"] = "bcc" + address_elt3["jid"] = "test3@example.com" + address_elt4 = addresses_elt.addElement("address") + address_elt4["type"] = "noreply" + addresses = AddressesData.from_element(addresses_elt) + assert addresses.to is not None and len(addresses.to) == 1 + assert addresses.to[0].jid == jid.JID("test1@example.com") + assert addresses.cc is not None and len(addresses.cc) == 1 + assert addresses.cc[0].jid == jid.JID("test2@example.com") + assert addresses.bcc is not None and len(addresses.bcc) == 1 + assert addresses.bcc[0].jid == jid.JID("test3@example.com") + assert addresses.noreply + + def test_addresses_data_to_element(self): + """ + XML element is correctly built from an AddressesData instance. + """ + addresses = AddressesData( + to=[AddressType(jid=jid.JID("test1@example.com"))], + cc=[AddressType(jid=jid.JID("test2@example.com"))], + bcc=[AddressType(jid=jid.JID("test3@example.com"))], + noreply=True, + ) + addresses_elt = addresses.to_element() + assert addresses_elt.uri == NS_ADDRESS + assert addresses_elt.name == "addresses" + assert len(addresses_elt.children) == 4 + for elt in addresses_elt.children: + assert elt.uri == NS_ADDRESS + assert elt.name == "address" + assert addresses_elt.children[0]["type"] == "to" + assert addresses_elt.children[0]["jid"] == "test1@example.com" + assert addresses_elt.children[1]["type"] == "cc" + assert addresses_elt.children[1]["jid"] == "test2@example.com" + assert addresses_elt.children[2]["type"] == "bcc" + assert addresses_elt.children[2]["jid"] == "test3@example.com" + assert addresses_elt.children[3]["type"] == "noreply" + + @ed + async def test_handle_addresses(self): + """ + Server JID is used, element is added and messages are delivered. + """ + xep_0033 = XEP_0033(MagicMock()) + client = MagicMock() + client.server_jid = jid.JID("server.example.com") + client.profile = "test_profile" + + mess_data = MessageData( + { + "to": jid.JID("recipient@example.com"), + "extra": { + "addresses": { + "to": [{"jid": "to@example.com"}], + "cc": [{"jid": "cc@example.com"}], + } + }, + "xml": domish.Element(("jabber:client", "message")), + } + ) + + xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True) + xep_0033.deliver_messages = AsyncMock() + + with patch.object(xep_0033, "_stop_if_all_delivered", AsyncMock()): + result = await xep_0033._handle_addresses(client, mess_data) + cast(AsyncMock, xep_0033._stop_if_all_delivered).assert_called_once() + + assert result["to"] == client.server_jid + assert "addresses" in result["xml"].children[0].name + xep_0033.deliver_messages.assert_called_once() + + @ed + async def test_deliver_messages(self): + """Delivery is done for all recipients.""" + xep_0033 = XEP_0033(MagicMock()) + client = MagicMock() + client.server_jid = jid.JID("server.example.com") + client.a_send = AsyncMock() + + mess_data = MessageData( + { + "xml": domish.Element(("jabber:client", "message")), + } + ) + to_example_addr = AddressType(jid=jid.JID("to@example.com")) + cc_example_addr = AddressType(jid=jid.JID("cc@example.com")) + bcc_example_addr = AddressType(jid=jid.JID("bcc@other.com")) + + addr_data = AddressesData( + to=[to_example_addr], + cc=[cc_example_addr], + bcc=[bcc_example_addr], + ) + + domains = { + "example.com": [ + to_example_addr, + cc_example_addr, + ], + "other.com": [bcc_example_addr], + } + + xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True) + + await xep_0033.deliver_messages(client, mess_data, addr_data, domains) + + # Check that messages were sent to the multicast domain and individual recipients + assert client.a_send.call_count == 2 + calls = client.a_send.call_args_list + # First call is to the multicast service. + assert calls[0][0][0]["to"] == "example.com" + # Second call is the individual BCC. + assert calls[1][0][0]["to"] == "bcc@other.com" + + # Everything must have been delivered. + assert all(address.delivered for address in addr_data.addresses) + + # And BCC must have been removed. + assert addr_data.bcc is None + + @ed + async def test_deliver_messages_multicast_only(self): + """Delivery is done only to multicast services.""" + xep_0033 = XEP_0033(MagicMock()) + client = MagicMock() + client.server_jid = jid.JID("server.example.com") + client.a_send = AsyncMock() + + mess_data = MessageData( + { + "xml": domish.Element(("jabber:client", "message")), + } + ) + to_example_addr = AddressType(jid=jid.JID("to@example.com")) + cc_example_addr = AddressType(jid=jid.JID("cc@example.com")) + bcc_example_addr = AddressType(jid=jid.JID("bcc@other.com")) + + addr_data = AddressesData( + to=[to_example_addr], + cc=[cc_example_addr], + bcc=[bcc_example_addr], + ) + + domains = { + "example.com": [ + to_example_addr, + cc_example_addr, + ], + "other.com": [bcc_example_addr], + } + + xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True) + + await xep_0033.deliver_messages( + client, mess_data, addr_data, domains, multicast_only=True + ) + + # Check that only the multicast message was sent + assert client.a_send.call_count == 1 + assert client.a_send.call_args[0][0]["to"] == "example.com" + + # Check that only addresses from the multicast domain are marked as delivered + assert addr_data.to and addr_data.to[0].delivered is True + assert addr_data.cc and addr_data.cc[0].delivered is True + assert addr_data.bcc and addr_data.bcc[0].delivered is None