view tests/unit/test_plugin_xep_0033.py @ 4351:6a0a081485b8

plugin autocrypt: Autocrypt protocol implementation: Implementation of autocrypt: `autocrypt` header is checked, and if present and no public key is known for the peer, the key is imported. `autocrypt` header is also added to outgoing message (only if an email gateway is detected). For the moment, the JID is used as identifier, but the real email used by gateway should be used in the future. rel 456
author Goffi <goffi@goffi.org>
date Fri, 28 Feb 2025 09:23:35 +0100
parents 6a0155f410bd
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import cast
from unittest.mock import AsyncMock, MagicMock, patch

from pytest_twisted import ensureDeferred as ed
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish

from libervia.backend.models.core import MessageData
from libervia.backend.plugins.plugin_xep_0033 import (
    AddressType,
    AddressesData,
    NS_ADDRESS,
    XEP_0033,
)


class TestXEP0033:
    """Tests for the ``XEP_0033`` plugin.

    Covers conversion of ``AddressType`` / ``AddressesData`` to and from XML elements,
    the ``_handle_addresses`` method, and ``deliver_messages``.
    """

    @staticmethod
    def _build_delivery_case():
        """Build the fixture shared by the ``deliver_messages`` tests.

        The plugin is instantiated with a mocked host whose ``has_feature`` disco
        method always resolves to ``True``, and the client has a mocked ``a_send``.
        Addresses are: one "to" and one "cc" at ``example.com``, and one "bcc" at
        ``other.com``.

        Returns a ``(xep_0033, client, mess_data, addr_data, domains)`` tuple, where
        ``domains`` maps each domain name to the list of its ``AddressType`` instances
        (the very same objects as those stored in ``addr_data``).
        """
        xep_0033 = XEP_0033(MagicMock())
        client = MagicMock()
        client.server_jid = jid.JID("server.example.com")
        client.a_send = AsyncMock()

        mess_data = MessageData(
            {
                "xml": domish.Element(("jabber:client", "message")),
            }
        )
        to_example_addr = AddressType(jid=jid.JID("to@example.com"))
        cc_example_addr = AddressType(jid=jid.JID("cc@example.com"))
        bcc_example_addr = AddressType(jid=jid.JID("bcc@other.com"))

        addr_data = AddressesData(
            to=[to_example_addr],
            cc=[cc_example_addr],
            bcc=[bcc_example_addr],
        )

        # The same AddressType objects are shared between ``addr_data`` and
        # ``domains`` so that the tests can observe the ``delivered`` flag through
        # ``addr_data``.
        domains = {
            "example.com": [
                to_example_addr,
                cc_example_addr,
            ],
            "other.com": [bcc_example_addr],
        }

        xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True)

        return xep_0033, client, mess_data, addr_data, domains

    def test_address_type_from_element(self):
        """
        AddressType instance is correctly created from an XML <address> element.
        """
        address_elt = domish.Element((NS_ADDRESS, "address"))
        address_elt["jid"] = "test@example.com"
        address_elt["desc"] = "Test Description"
        address_elt["delivered"] = "true"
        address = AddressType.from_element(address_elt)
        assert address.jid == jid.JID("test@example.com")
        assert address.desc == "Test Description"
        assert address.delivered is True

    def test_address_type_to_element(self):
        """
        XML <address> element is correctly built from an AddressType instance.
        """
        address = AddressType(
            jid=jid.JID("test@example.com"),
            desc="Test Description",
            delivered=True,
        )
        address_elt = address.to_element()
        assert address_elt.uri == NS_ADDRESS
        assert address_elt.name == "address"
        assert address_elt["jid"] == "test@example.com"
        assert address_elt["desc"] == "Test Description"
        assert address_elt["delivered"] == "true"

    def test_addresses_data_from_element(self):
        """
        AddressesData instance is correctly created from an XML <addresses> element.
        """
        addresses_elt = domish.Element((NS_ADDRESS, "addresses"))
        address_elt1 = addresses_elt.addElement("address")
        address_elt1["type"] = "to"
        address_elt1["jid"] = "test1@example.com"
        address_elt2 = addresses_elt.addElement("address")
        address_elt2["type"] = "cc"
        address_elt2["jid"] = "test2@example.com"
        address_elt3 = addresses_elt.addElement("address")
        address_elt3["type"] = "bcc"
        address_elt3["jid"] = "test3@example.com"
        address_elt4 = addresses_elt.addElement("address")
        address_elt4["type"] = "noreply"
        addresses = AddressesData.from_element(addresses_elt)
        assert addresses.to is not None and len(addresses.to) == 1
        assert addresses.to[0].jid == jid.JID("test1@example.com")
        assert addresses.cc is not None and len(addresses.cc) == 1
        assert addresses.cc[0].jid == jid.JID("test2@example.com")
        assert addresses.bcc is not None and len(addresses.bcc) == 1
        assert addresses.bcc[0].jid == jid.JID("test3@example.com")
        assert addresses.noreply

    def test_addresses_data_to_element(self):
        """
        XML <addresses> element is correctly built from an AddressesData instance.
        """
        addresses = AddressesData(
            to=[AddressType(jid=jid.JID("test1@example.com"))],
            cc=[AddressType(jid=jid.JID("test2@example.com"))],
            bcc=[AddressType(jid=jid.JID("test3@example.com"))],
            noreply=True,
        )
        addresses_elt = addresses.to_element()
        assert addresses_elt.uri == NS_ADDRESS
        assert addresses_elt.name == "addresses"
        assert len(addresses_elt.children) == 4
        for elt in addresses_elt.children:
            assert elt.uri == NS_ADDRESS
            assert elt.name == "address"
        assert addresses_elt.children[0]["type"] == "to"
        assert addresses_elt.children[0]["jid"] == "test1@example.com"
        assert addresses_elt.children[1]["type"] == "cc"
        assert addresses_elt.children[1]["jid"] == "test2@example.com"
        assert addresses_elt.children[2]["type"] == "bcc"
        assert addresses_elt.children[2]["jid"] == "test3@example.com"
        assert addresses_elt.children[3]["type"] == "noreply"

    @ed
    async def test_handle_addresses(self):
        """
        Server JID is used, <addresses> element is added and messages are delivered.
        """
        xep_0033 = XEP_0033(MagicMock())
        client = MagicMock()
        client.server_jid = jid.JID("server.example.com")
        client.profile = "test_profile"

        mess_data = MessageData(
            {
                "to": jid.JID("recipient@example.com"),
                "extra": {
                    "addresses": {
                        "to": [{"jid": "to@example.com"}],
                        "cc": [{"jid": "cc@example.com"}],
                    }
                },
                "xml": domish.Element(("jabber:client", "message")),
            }
        )

        xep_0033.host.memory.disco.has_feature = AsyncMock(return_value=True)
        xep_0033.deliver_messages = AsyncMock()

        with patch.object(xep_0033, "_stop_if_all_delivered", AsyncMock()):
            result = await xep_0033._handle_addresses(client, mess_data)
            # The assertion must happen while the patch is still active, as the
            # attribute is restored when leaving the ``with`` block.
            cast(AsyncMock, xep_0033._stop_if_all_delivered).assert_called_once()

        assert result["to"] == client.server_jid
        # Strict equality: a substring check would also accept any element whose
        # name merely contains "addresses".
        assert result["xml"].children[0].name == "addresses"
        xep_0033.deliver_messages.assert_called_once()

    @ed
    async def test_deliver_messages(self):
        """Delivery is done for all recipients."""
        xep_0033, client, mess_data, addr_data, domains = self._build_delivery_case()

        await xep_0033.deliver_messages(client, mess_data, addr_data, domains)

        # Check that messages were sent to the multicast domain and individual recipients
        assert client.a_send.call_count == 2
        calls = client.a_send.call_args_list
        # First call is to the multicast service.
        assert calls[0][0][0]["to"] == "example.com"
        # Second call is the individual BCC.
        assert calls[1][0][0]["to"] == "bcc@other.com"

        # Everything must have been delivered.
        assert all(address.delivered for address in addr_data.addresses)

        # And BCC must have been removed.
        assert addr_data.bcc is None

    @ed
    async def test_deliver_messages_multicast_only(self):
        """Delivery is done only to multicast services."""
        xep_0033, client, mess_data, addr_data, domains = self._build_delivery_case()

        await xep_0033.deliver_messages(
            client, mess_data, addr_data, domains, multicast_only=True
        )

        # Check that only the multicast message was sent
        assert client.a_send.call_count == 1
        assert client.a_send.call_args[0][0]["to"] == "example.com"

        # Check that only addresses from the multicast domain are marked as delivered
        assert addr_data.to and addr_data.to[0].delivered is True
        assert addr_data.cc and addr_data.cc[0].delivered is True
        assert addr_data.bcc and addr_data.bcc[0].delivered is None