view tests/unit/test_plugin_xep_0033.py @ 4320:9658c534287e

plugin XEP-0215, XEP-0376: fix bad calls to `hasFeature`: `hasFeature` was called like blocking code, missing the `await`. This has been fixed, and is now using the `memory.disco.has_feature` version.
author Goffi <goffi@goffi.org>
date Mon, 30 Sep 2024 14:14:38 +0200
parents 6a0155f410bd
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import cast
from unittest.mock import AsyncMock, MagicMock, patch

from pytest_twisted import ensureDeferred as ed
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish

from libervia.backend.models.core import MessageData
from libervia.backend.plugins.plugin_xep_0033 import (
    AddressType,
    AddressesData,
    NS_ADDRESS,
    XEP_0033,
)


class TestXEP0033:
    """Unit tests for the XEP-0033 plugin: address models and message delivery."""

    @staticmethod
    def _delivery_fixture() -> tuple:
        """Build the objects shared by the ``deliver_messages`` tests.

        @return: a ``(xep_0033, client, mess_data, addr_data, domains)`` tuple where:
            - ``xep_0033`` is a plugin instance built on a mocked host whose
              ``memory.disco.has_feature`` always resolves to ``True``;
            - ``client`` is a mocked client with an awaitable ``a_send``;
            - ``mess_data`` only holds an empty ``<message>`` element;
            - ``addr_data`` has one "to" and one "cc" address at ``example.com``
              and one "bcc" address at ``other.com``;
            - ``domains`` maps each domain to the very same address instances used
              in ``addr_data``.
        """
        xep_0033 = XEP_0033(MagicMock())
        xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True)

        client = MagicMock()
        client.server_jid = jid.JID("server.example.com")
        client.a_send = AsyncMock()

        mess_data = MessageData(
            {
                "xml": domish.Element(("jabber:client", "message")),
            }
        )

        to_example_addr = AddressType(jid=jid.JID("to@example.com"))
        cc_example_addr = AddressType(jid=jid.JID("cc@example.com"))
        bcc_example_addr = AddressType(jid=jid.JID("bcc@other.com"))

        addr_data = AddressesData(
            to=[to_example_addr],
            cc=[cc_example_addr],
            bcc=[bcc_example_addr],
        )
        # The same instances are shared between ``addr_data`` and ``domains`` so that
        # a ``delivered`` flag set through one is visible through the other.
        domains = {
            "example.com": [
                to_example_addr,
                cc_example_addr,
            ],
            "other.com": [bcc_example_addr],
        }
        return xep_0033, client, mess_data, addr_data, domains

    def test_address_type_from_element(self):
        """
        AddressType instance is correctly created from an XML <address> element.
        """
        address_elt = domish.Element((NS_ADDRESS, "address"))
        address_elt["jid"] = "test@example.com"
        address_elt["desc"] = "Test Description"
        address_elt["delivered"] = "true"

        address = AddressType.from_element(address_elt)

        assert address.jid == jid.JID("test@example.com")
        assert address.desc == "Test Description"
        assert address.delivered is True

    def test_address_type_to_element(self):
        """
        XML <address> element is correctly built from an AddressType instance.
        """
        address = AddressType(
            jid=jid.JID("test@example.com"),
            desc="Test Description",
            delivered=True,
        )

        address_elt = address.to_element()

        assert address_elt.uri == NS_ADDRESS
        assert address_elt.name == "address"
        assert address_elt["jid"] == "test@example.com"
        assert address_elt["desc"] == "Test Description"
        assert address_elt["delivered"] == "true"

    def test_addresses_data_from_element(self):
        """
        AddressesData instance is correctly created from an XML <addresses> element.
        """
        addresses_elt = domish.Element((NS_ADDRESS, "addresses"))
        for address_type, address_jid in (
            ("to", "test1@example.com"),
            ("cc", "test2@example.com"),
            ("bcc", "test3@example.com"),
        ):
            address_elt = addresses_elt.addElement("address")
            address_elt["type"] = address_type
            address_elt["jid"] = address_jid
        # The "noreply" address carries a type only, no JID.
        noreply_elt = addresses_elt.addElement("address")
        noreply_elt["type"] = "noreply"

        addresses = AddressesData.from_element(addresses_elt)

        assert addresses.to is not None and len(addresses.to) == 1
        assert addresses.to[0].jid == jid.JID("test1@example.com")
        assert addresses.cc is not None and len(addresses.cc) == 1
        assert addresses.cc[0].jid == jid.JID("test2@example.com")
        assert addresses.bcc is not None and len(addresses.bcc) == 1
        assert addresses.bcc[0].jid == jid.JID("test3@example.com")
        assert addresses.noreply

    def test_addresses_data_to_element(self):
        """
        XML <addresses> element is correctly built from an AddressesData instance.
        """
        addresses = AddressesData(
            to=[AddressType(jid=jid.JID("test1@example.com"))],
            cc=[AddressType(jid=jid.JID("test2@example.com"))],
            bcc=[AddressType(jid=jid.JID("test3@example.com"))],
            noreply=True,
        )

        addresses_elt = addresses.to_element()

        assert addresses_elt.uri == NS_ADDRESS
        assert addresses_elt.name == "addresses"
        children = addresses_elt.children
        assert len(children) == 4
        for child in children:
            assert child.uri == NS_ADDRESS
            assert child.name == "address"

        # Children are expected in this exact order: to, cc, bcc, then noreply.
        expected = (
            ("to", "test1@example.com"),
            ("cc", "test2@example.com"),
            ("bcc", "test3@example.com"),
        )
        for child, (address_type, address_jid) in zip(children, expected):
            assert child["type"] == address_type
            assert child["jid"] == address_jid
        assert children[3]["type"] == "noreply"

    @ed
    async def test_handle_addresses(self):
        """
        Server JID is used, <addresses> element is added and messages are delivered.
        """
        xep_0033 = XEP_0033(MagicMock())
        client = MagicMock()
        client.server_jid = jid.JID("server.example.com")
        client.profile = "test_profile"

        mess_data = MessageData(
            {
                "to": jid.JID("recipient@example.com"),
                "extra": {
                    "addresses": {
                        "to": [{"jid": "to@example.com"}],
                        "cc": [{"jid": "cc@example.com"}],
                    }
                },
                "xml": domish.Element(("jabber:client", "message")),
            }
        )

        xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True)
        xep_0033.deliver_messages = AsyncMock()

        # Keep a direct reference to the replacement mock so it can be inspected
        # without casting the patched attribute.
        stop_if_all_delivered = AsyncMock()
        with patch.object(
            xep_0033, "_stop_if_all_delivered", stop_if_all_delivered
        ):
            result = await xep_0033._handle_addresses(client, mess_data)
        stop_if_all_delivered.assert_called_once()

        assert result["to"] == client.server_jid
        # Exact match: a substring check would also accept unrelated element names
        # which merely contain "addresses".
        assert result["xml"].children[0].name == "addresses"
        xep_0033.deliver_messages.assert_called_once()

    @ed
    async def test_deliver_messages(self):
        """Delivery is done for all recipients."""
        xep_0033, client, mess_data, addr_data, domains = self._delivery_fixture()

        await xep_0033.deliver_messages(client, mess_data, addr_data, domains)

        # Check that messages were sent to the multicast domain and individual recipients
        assert client.a_send.call_count == 2
        multicast_call, bcc_call = client.a_send.call_args_list
        # First call is to the multicast service.
        assert multicast_call.args[0]["to"] == "example.com"
        # Second call is the individual BCC.
        assert bcc_call.args[0]["to"] == "bcc@other.com"

        # Everything must have been delivered.
        assert all(address.delivered for address in addr_data.addresses)

        # And BCC must have been removed.
        assert addr_data.bcc is None

    @ed
    async def test_deliver_messages_multicast_only(self):
        """Delivery is done only to multicast services."""
        xep_0033, client, mess_data, addr_data, domains = self._delivery_fixture()

        await xep_0033.deliver_messages(
            client, mess_data, addr_data, domains, multicast_only=True
        )

        # Check that only the multicast message was sent
        assert client.a_send.call_count == 1
        assert client.a_send.call_args.args[0]["to"] == "example.com"

        # Check that only addresses from the multicast domain are marked as delivered
        assert addr_data.to and addr_data.to[0].delivered is True
        assert addr_data.cc and addr_data.cc[0].delivered is True
        assert addr_data.bcc and addr_data.bcc[0].delivered is None