view tests/unit/test_plugin_xep_0131.py @ 4320:9658c534287e

plugin XEP-0215, XEP-0376: fix bad calls to `hasFeature`: `hasFeature` was called like blocking code, missing the `await`. This has been fixed, and is now using the `memory.disco.has_feature` version.
author Goffi <goffi@goffi.org>
date Mon, 30 Sep 2024 14:14:38 +0200
parents 8ee369e6eb99
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from unittest.mock import MagicMock

from pytest_twisted import ensureDeferred as ed
from twisted.internet import defer
from twisted.words.xish import domish

from libervia.backend.models.core import MessageData
from libervia.backend.plugins.plugin_xep_0131 import (
    HeadersData,
    NS_SHIM,
    XEP_0131,
    Urgency,
    Priority,
)


class TestXEP0131:

    def test_headers_data_to_element(self):
        """HeadersData instance is correctly converted to an XML element"""
        headers = HeadersData(
            keywords="test,keyword",
            urgency=Urgency.high,
            priority=Priority.urgent,
            custom_header="custom_value",  # type: ignore
        )
        headers_elt = headers.to_element()

        assert headers_elt.uri == NS_SHIM
        assert headers_elt.name == "headers"
        header_elts = list(headers_elt.elements(NS_SHIM, "header"))
        assert len(header_elts) == 4

        header_names = set()
        for header_elt in header_elts:
            header_names.add(header_elt["name"])
            if header_elt["name"] == "keywords":
                assert str(header_elt) == "test,keyword"
            elif header_elt["name"] == "urgency":
                assert str(header_elt) == "high"
            elif header_elt["name"] == "priority":
                assert str(header_elt) == "urgent"
            elif header_elt["name"] == "custom_header":
                assert str(header_elt) == "custom_value"

        assert header_names == {"keywords", "urgency", "priority", "custom_header"}

    def test_headers_data_from_element(self):
        """HeadersData instance is correctly created from an XML element"""
        headers_elt = domish.Element((NS_SHIM, "headers"))
        keywords_elt = headers_elt.addElement("header")
        keywords_elt["name"] = "keywords"
        keywords_elt.addContent("test,keyword")
        urgency_elt = headers_elt.addElement("header")
        urgency_elt["name"] = "urgency"
        urgency_elt.addContent("high")
        custom_elt = headers_elt.addElement("header")
        custom_elt["name"] = "custom_header"
        custom_elt.addContent("custom_value")

        headers = HeadersData.from_element(headers_elt)

        assert headers.keywords == "test,keyword"
        assert headers.urgency == Urgency.high
        assert headers.priority is None
        assert headers.custom_header == "custom_value"  # type: ignore

    def test_quote_value(self):
        """Values are correctly quoted when necessary"""
        xep_0131 = XEP_0131(MagicMock())

        assert xep_0131.quote_value("simple") == "simple"
        assert xep_0131.quote_value("with space") == '"with space"'
        assert xep_0131.quote_value('with "quotes"') == '"with \\"quotes\\""'
        assert xep_0131.quote_value("with,comma") == '"with,comma"'

    def test_unquote_values(self):
        """Raw header values are correctly unquoted"""
        xep_0131 = XEP_0131(MagicMock())

        assert xep_0131.unquote_values("simple,another") == ["simple", "another"]
        assert xep_0131.unquote_values('"quoted value",simple') == [
            "quoted value",
            "simple",
        ]
        assert xep_0131.unquote_values('"with,comma",simple') == ["with,comma", "simple"]
        assert xep_0131.unquote_values('"with \\"quotes\\"",simple') == [
            'with "quotes"',
            "simple",
        ]

    @ed
    async def test_send_message_trigger(self):
        """Headers are correctly added to the message when sending"""
        xep_0131 = XEP_0131(MagicMock())
        client = MagicMock()

        mess_data = MessageData(
            {
                "extra": {
                    "keywords": ["test", "keyword"],
                    "headers": {"urgency": "high", "custom_header": "custom_value"},
                },
                "xml": domish.Element(("jabber:client", "message")),
            }
        )

        pre_xml_treatments = MagicMock()
        post_xml_treatments = defer.Deferred()

        result = xep_0131.send_message_trigger(
            client, mess_data, pre_xml_treatments, post_xml_treatments
        )
        assert result is True

        post_xml_treatments.callback(mess_data)
        await post_xml_treatments

        # Check that headers were added to the XML
        headers_elt = next(mess_data["xml"].elements(NS_SHIM, "headers"))

        header_names = set()
        for header_elt in headers_elt.elements(NS_SHIM, "header"):
            header_names.add(header_elt["name"])
            if header_elt["name"] == "keywords":
                assert str(header_elt) == "test,keyword"
            elif header_elt["name"] == "urgency":
                assert str(header_elt) == "high"
            elif header_elt["name"] == "custom_header":
                assert str(header_elt) == "custom_value"

        assert header_names == {"keywords", "urgency", "custom_header"}

    @ed
    async def test_message_received_trigger(self):
        """Headers are correctly parsed from received messages"""
        xep_0131 = XEP_0131(MagicMock())
        client = MagicMock()

        message_elt = domish.Element(("jabber:client", "message"))
        headers_elt = message_elt.addElement((NS_SHIM, "headers"))
        keywords_elt = headers_elt.addElement("header")
        keywords_elt["name"] = "keywords"
        keywords_elt.addContent('test,"keyword with space"')
        urgency_elt = headers_elt.addElement("header")
        urgency_elt["name"] = "urgency"
        urgency_elt.addContent("high")

        post_treat = defer.Deferred()

        result = xep_0131.message_received_trigger(client, message_elt, post_treat)

        assert result is True

        mess_data = MessageData({"extra": {}})
        post_treat.callback(mess_data)
        await post_treat

        assert mess_data["extra"]["keywords"] == ["test", "keyword with space"]
        headers = mess_data["extra"]["headers"]
        assert "keywords" not in headers
        assert headers["urgency"] == "high"