Mercurial > libervia-backend
diff tests/unit/test_plugin_xep_0033.py @ 4307:6a0155f410bd
test (unit): add test for plugin XEP-0033:
those replace the legacy XEP-0033 test from libervia/backend/test/test_plugin_xep_0033.py.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/unit/test_plugin_xep_0033.py Thu Sep 26 16:12:01 2024 +0200 @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import cast +from unittest.mock import AsyncMock, MagicMock, patch + +from pytest_twisted import ensureDeferred as ed +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish + +from libervia.backend.models.core import MessageData +from libervia.backend.plugins.plugin_xep_0033 import ( + AddressType, + AddressesData, + NS_ADDRESS, + XEP_0033, +) + + +class TestXEP0033: + def test_address_type_from_element(self): + """ + AddressType instance is correctly created from an XML <address> element. + """ + address_elt = domish.Element((NS_ADDRESS, "address")) + address_elt["jid"] = "test@example.com" + address_elt["desc"] = "Test Description" + address_elt["delivered"] = "true" + address = AddressType.from_element(address_elt) + assert address.jid == jid.JID("test@example.com") + assert address.desc == "Test Description" + assert address.delivered is True + + def test_address_type_to_element(self): + """ + XML <address> element is correctly built from an AddressType instance. + """ + address = AddressType( + jid=jid.JID("test@example.com"), + desc="Test Description", + delivered=True, + ) + address_elt = address.to_element() + assert address_elt.uri == NS_ADDRESS + assert address_elt.name == "address" + assert address_elt["jid"] == "test@example.com" + assert address_elt["desc"] == "Test Description" + assert address_elt["delivered"] == "true" + + def test_addresses_data_from_element(self): + """ + AddressesData instance is correctly created from an XML <addresses> element. + """ + addresses_elt = domish.Element((NS_ADDRESS, "addresses")) + address_elt1 = addresses_elt.addElement("address") + address_elt1["type"] = "to" + address_elt1["jid"] = "test1@example.com" + address_elt2 = addresses_elt.addElement("address") + address_elt2["type"] = "cc" + address_elt2["jid"] = "test2@example.com" + address_elt3 = addresses_elt.addElement("address") + address_elt3["type"] = "bcc" + address_elt3["jid"] = "test3@example.com" + address_elt4 = addresses_elt.addElement("address") + address_elt4["type"] = "noreply" + addresses = AddressesData.from_element(addresses_elt) + assert addresses.to is not None and len(addresses.to) == 1 + assert addresses.to[0].jid == jid.JID("test1@example.com") + assert addresses.cc is not None and len(addresses.cc) == 1 + assert addresses.cc[0].jid == jid.JID("test2@example.com") + assert addresses.bcc is not None and len(addresses.bcc) == 1 + assert addresses.bcc[0].jid == jid.JID("test3@example.com") + assert addresses.noreply + + def test_addresses_data_to_element(self): + """ + XML <addresses> element is correctly built from an AddressesData instance. + """ + addresses = AddressesData( + to=[AddressType(jid=jid.JID("test1@example.com"))], + cc=[AddressType(jid=jid.JID("test2@example.com"))], + bcc=[AddressType(jid=jid.JID("test3@example.com"))], + noreply=True, + ) + addresses_elt = addresses.to_element() + assert addresses_elt.uri == NS_ADDRESS + assert addresses_elt.name == "addresses" + assert len(addresses_elt.children) == 4 + for elt in addresses_elt.children: + assert elt.uri == NS_ADDRESS + assert elt.name == "address" + assert addresses_elt.children[0]["type"] == "to" + assert addresses_elt.children[0]["jid"] == "test1@example.com" + assert addresses_elt.children[1]["type"] == "cc" + assert addresses_elt.children[1]["jid"] == "test2@example.com" + assert addresses_elt.children[2]["type"] == "bcc" + assert addresses_elt.children[2]["jid"] == "test3@example.com" + assert addresses_elt.children[3]["type"] == "noreply" + + @ed + async def test_handle_addresses(self): + """ + Server JID is used, <addresses> element is added and messages are delivered. + """ + xep_0033 = XEP_0033(MagicMock()) + client = MagicMock() + client.server_jid = jid.JID("server.example.com") + client.profile = "test_profile" + + mess_data = MessageData( + { + "to": jid.JID("recipient@example.com"), + "extra": { + "addresses": { + "to": [{"jid": "to@example.com"}], + "cc": [{"jid": "cc@example.com"}], + } + }, + "xml": domish.Element(("jabber:client", "message")), + } + ) + + xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True) + xep_0033.deliver_messages = AsyncMock() + + with patch.object(xep_0033, "_stop_if_all_delivered", AsyncMock()): + result = await xep_0033._handle_addresses(client, mess_data) + cast(AsyncMock, xep_0033._stop_if_all_delivered).assert_called_once() + + assert result["to"] == client.server_jid + assert "addresses" in result["xml"].children[0].name + xep_0033.deliver_messages.assert_called_once() + + @ed + async def test_deliver_messages(self): + """Delivery is done for all recipients.""" + xep_0033 = XEP_0033(MagicMock()) + client = MagicMock() + client.server_jid = jid.JID("server.example.com") + client.a_send = AsyncMock() + + mess_data = MessageData( + { + "xml": domish.Element(("jabber:client", "message")), + } + ) + to_example_addr = AddressType(jid=jid.JID("to@example.com")) + cc_example_addr = AddressType(jid=jid.JID("cc@example.com")) + bcc_example_addr = AddressType(jid=jid.JID("bcc@other.com")) + + addr_data = AddressesData( + to=[to_example_addr], + cc=[cc_example_addr], + bcc=[bcc_example_addr], + ) + + domains = { + "example.com": [ + to_example_addr, + cc_example_addr, + ], + "other.com": [bcc_example_addr], + } + + xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True) + + await xep_0033.deliver_messages(client, mess_data, addr_data, domains) + + # Check that messages were sent to the multicast domain and individual recipients + assert client.a_send.call_count == 2 + calls = client.a_send.call_args_list + # First call is to the multicast service. + assert calls[0][0][0]["to"] == "example.com" + # Second call is the individual BCC. + assert calls[1][0][0]["to"] == "bcc@other.com" + + # Everything must have been delivered. + assert all(address.delivered for address in addr_data.addresses) + + # And BCC must have been removed. + assert addr_data.bcc is None + + @ed + async def test_deliver_messages_multicast_only(self): + """Delivery is done only to multicast services.""" + xep_0033 = XEP_0033(MagicMock()) + client = MagicMock() + client.server_jid = jid.JID("server.example.com") + client.a_send = AsyncMock() + + mess_data = MessageData( + { + "xml": domish.Element(("jabber:client", "message")), + } + ) + to_example_addr = AddressType(jid=jid.JID("to@example.com")) + cc_example_addr = AddressType(jid=jid.JID("cc@example.com")) + bcc_example_addr = AddressType(jid=jid.JID("bcc@other.com")) + + addr_data = AddressesData( + to=[to_example_addr], + cc=[cc_example_addr], + bcc=[bcc_example_addr], + ) + + domains = { + "example.com": [ + to_example_addr, + cc_example_addr, + ], + "other.com": [bcc_example_addr], + } + + xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True) + + await xep_0033.deliver_messages( + client, mess_data, addr_data, domains, multicast_only=True + ) + + # Check that only the multicast message was sent + assert client.a_send.call_count == 1 + assert client.a_send.call_args[0][0]["to"] == "example.com" + + # Check that only addresses from the multicast domain are marked as delivered + assert addr_data.to and addr_data.to[0].delivered is True + assert addr_data.cc and addr_data.cc[0].delivered is True + assert addr_data.bcc and addr_data.bcc[0].delivered is None