view tests/unit/test_plugin_xep_0131.py @ 4351:6a0a081485b8

plugin autocrypt: Autocrypt protocol implementation: Implementation of autocrypt: `autocrypt` header is checked, and if present and no public key is known for the peer, the key is imported. `autocrypt` header is also added to outgoing message (only if an email gateway is detected). For the moment, the JID is use as identifier, but the real email used by gateway should be used in the future. rel 456
author Goffi <goffi@goffi.org>
date Fri, 28 Feb 2025 09:23:35 +0100
parents 8ee369e6eb99
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from unittest.mock import MagicMock

from pytest_twisted import ensureDeferred as ed
from twisted.internet import defer
from twisted.words.xish import domish

from libervia.backend.models.core import MessageData
from libervia.backend.plugins.plugin_xep_0131 import (
    HeadersData,
    NS_SHIM,
    XEP_0131,
    Urgency,
    Priority,
)


class TestXEP0131:

    def test_headers_data_to_element(self):
        """HeadersData instance is correctly converted to an XML element"""
        headers = HeadersData(
            keywords="test,keyword",
            urgency=Urgency.high,
            priority=Priority.urgent,
            custom_header="custom_value",  # type: ignore
        )
        headers_elt = headers.to_element()

        assert headers_elt.uri == NS_SHIM
        assert headers_elt.name == "headers"
        header_elts = list(headers_elt.elements(NS_SHIM, "header"))
        assert len(header_elts) == 4

        header_names = set()
        for header_elt in header_elts:
            header_names.add(header_elt["name"])
            if header_elt["name"] == "keywords":
                assert str(header_elt) == "test,keyword"
            elif header_elt["name"] == "urgency":
                assert str(header_elt) == "high"
            elif header_elt["name"] == "priority":
                assert str(header_elt) == "urgent"
            elif header_elt["name"] == "custom_header":
                assert str(header_elt) == "custom_value"

        assert header_names == {"keywords", "urgency", "priority", "custom_header"}

    def test_headers_data_from_element(self):
        """HeadersData instance is correctly created from an XML element"""
        headers_elt = domish.Element((NS_SHIM, "headers"))
        keywords_elt = headers_elt.addElement("header")
        keywords_elt["name"] = "keywords"
        keywords_elt.addContent("test,keyword")
        urgency_elt = headers_elt.addElement("header")
        urgency_elt["name"] = "urgency"
        urgency_elt.addContent("high")
        custom_elt = headers_elt.addElement("header")
        custom_elt["name"] = "custom_header"
        custom_elt.addContent("custom_value")

        headers = HeadersData.from_element(headers_elt)

        assert headers.keywords == "test,keyword"
        assert headers.urgency == Urgency.high
        assert headers.priority is None
        assert headers.custom_header == "custom_value"  # type: ignore

    def test_quote_value(self):
        """Values are correctly quoted when necessary"""
        xep_0131 = XEP_0131(MagicMock())

        assert xep_0131.quote_value("simple") == "simple"
        assert xep_0131.quote_value("with space") == '"with space"'
        assert xep_0131.quote_value('with "quotes"') == '"with \\"quotes\\""'
        assert xep_0131.quote_value("with,comma") == '"with,comma"'

    def test_unquote_values(self):
        """Raw header values are correctly unquoted"""
        xep_0131 = XEP_0131(MagicMock())

        assert xep_0131.unquote_values("simple,another") == ["simple", "another"]
        assert xep_0131.unquote_values('"quoted value",simple') == [
            "quoted value",
            "simple",
        ]
        assert xep_0131.unquote_values('"with,comma",simple') == ["with,comma", "simple"]
        assert xep_0131.unquote_values('"with \\"quotes\\"",simple') == [
            'with "quotes"',
            "simple",
        ]

    @ed
    async def test_send_message_trigger(self):
        """Headers are correctly added to the message when sending"""
        xep_0131 = XEP_0131(MagicMock())
        client = MagicMock()

        mess_data = MessageData(
            {
                "extra": {
                    "keywords": ["test", "keyword"],
                    "headers": {"urgency": "high", "custom_header": "custom_value"},
                },
                "xml": domish.Element(("jabber:client", "message")),
            }
        )

        pre_xml_treatments = MagicMock()
        post_xml_treatments = defer.Deferred()

        result = xep_0131.send_message_trigger(
            client, mess_data, pre_xml_treatments, post_xml_treatments
        )
        assert result is True

        post_xml_treatments.callback(mess_data)
        await post_xml_treatments

        # Check that headers were added to the XML
        headers_elt = next(mess_data["xml"].elements(NS_SHIM, "headers"))

        header_names = set()
        for header_elt in headers_elt.elements(NS_SHIM, "header"):
            header_names.add(header_elt["name"])
            if header_elt["name"] == "keywords":
                assert str(header_elt) == "test,keyword"
            elif header_elt["name"] == "urgency":
                assert str(header_elt) == "high"
            elif header_elt["name"] == "custom_header":
                assert str(header_elt) == "custom_value"

        assert header_names == {"keywords", "urgency", "custom_header"}

    @ed
    async def test_message_received_trigger(self):
        """Headers are correctly parsed from received messages"""
        xep_0131 = XEP_0131(MagicMock())
        client = MagicMock()

        message_elt = domish.Element(("jabber:client", "message"))
        headers_elt = message_elt.addElement((NS_SHIM, "headers"))
        keywords_elt = headers_elt.addElement("header")
        keywords_elt["name"] = "keywords"
        keywords_elt.addContent('test,"keyword with space"')
        urgency_elt = headers_elt.addElement("header")
        urgency_elt["name"] = "urgency"
        urgency_elt.addContent("high")

        post_treat = defer.Deferred()

        result = xep_0131.message_received_trigger(client, message_elt, post_treat)

        assert result is True

        mess_data = MessageData({"extra": {}})
        post_treat.callback(mess_data)
        await post_treat

        assert mess_data["extra"]["keywords"] == ["test", "keyword with space"]
        headers = mess_data["extra"]["headers"]
        assert "keywords" not in headers
        assert headers["urgency"] == "high"