# HG changeset patch # User Goffi # Date 1727531823 -7200 # Node ID 8ee369e6eb9969daa3827795623f91aa2ced0972 # Parent 6a70fcd93a7a9fff879f6922ecbf4a10ffa60623 tests (unit): add XEP-0131 tests: rel 451 diff -r 6a70fcd93a7a -r 8ee369e6eb99 tests/unit/test_plugin_xep_0131.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/unit/test_plugin_xep_0131.py Sat Sep 28 15:57:03 2024 +0200 @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from unittest.mock import MagicMock + +from pytest_twisted import ensureDeferred as ed +from twisted.internet import defer +from twisted.words.xish import domish + +from libervia.backend.models.core import MessageData +from libervia.backend.plugins.plugin_xep_0131 import ( + HeadersData, + NS_SHIM, + XEP_0131, + Urgency, + Priority, +) + + +class TestXEP0131: + + def test_headers_data_to_element(self): + """HeadersData instance is correctly converted to an XML element""" + headers = HeadersData( + keywords="test,keyword", + urgency=Urgency.high, + priority=Priority.urgent, + custom_header="custom_value", # type: ignore + ) + headers_elt = headers.to_element() + + assert headers_elt.uri == NS_SHIM + assert headers_elt.name == "headers" + header_elts = list(headers_elt.elements(NS_SHIM, "header")) + assert len(header_elts) == 4 + + header_names = set() + for header_elt in header_elts: + header_names.add(header_elt["name"]) + if header_elt["name"] == "keywords": + assert str(header_elt) == "test,keyword" + elif header_elt["name"] == "urgency": + assert str(header_elt) == "high" + elif header_elt["name"] == "priority": + assert str(header_elt) == "urgent" + elif header_elt["name"] == "custom_header": + assert str(header_elt) == "custom_value" + + assert header_names == {"keywords", "urgency", "priority", "custom_header"} + + def test_headers_data_from_element(self): + """HeadersData instance is correctly created from an XML element""" + headers_elt = domish.Element((NS_SHIM, "headers")) + keywords_elt = headers_elt.addElement("header") + keywords_elt["name"] = "keywords" + keywords_elt.addContent("test,keyword") + urgency_elt = headers_elt.addElement("header") + urgency_elt["name"] = "urgency" + urgency_elt.addContent("high") + custom_elt = headers_elt.addElement("header") + custom_elt["name"] = "custom_header" + custom_elt.addContent("custom_value") + + headers = HeadersData.from_element(headers_elt) + + assert headers.keywords == "test,keyword" + assert headers.urgency == Urgency.high + assert headers.priority is None + assert headers.custom_header == "custom_value" # type: ignore + + def test_quote_value(self): + """Values are correctly quoted when necessary""" + xep_0131 = XEP_0131(MagicMock()) + + assert xep_0131.quote_value("simple") == "simple" + assert xep_0131.quote_value("with space") == '"with space"' + assert xep_0131.quote_value('with "quotes"') == '"with \\"quotes\\""' + assert xep_0131.quote_value("with,comma") == '"with,comma"' + + def test_unquote_values(self): + """Raw header values are correctly unquoted""" + xep_0131 = XEP_0131(MagicMock()) + + assert xep_0131.unquote_values("simple,another") == ["simple", "another"] + assert xep_0131.unquote_values('"quoted value",simple') == [ + "quoted value", + "simple", + ] + assert xep_0131.unquote_values('"with,comma",simple') == ["with,comma", "simple"] + assert xep_0131.unquote_values('"with \\"quotes\\"",simple') == [ + 'with "quotes"', + "simple", + ] + + @ed + async def test_send_message_trigger(self): + """Headers are correctly added to the message when sending""" + xep_0131 = XEP_0131(MagicMock()) + client = MagicMock() + + mess_data = MessageData( + { + "extra": { + "keywords": ["test", "keyword"], + "headers": {"urgency": "high", "custom_header": "custom_value"}, + }, + "xml": domish.Element(("jabber:client", "message")), + } + ) + + pre_xml_treatments = MagicMock() + post_xml_treatments = defer.Deferred() + + result = xep_0131.send_message_trigger( + client, mess_data, pre_xml_treatments, post_xml_treatments + ) + assert result is True + + post_xml_treatments.callback(mess_data) + await post_xml_treatments + + # Check that headers were added to the XML + headers_elt = next(mess_data["xml"].elements(NS_SHIM, "headers")) + + header_names = set() + for header_elt in headers_elt.elements(NS_SHIM, "header"): + header_names.add(header_elt["name"]) + if header_elt["name"] == "keywords": + assert str(header_elt) == "test,keyword" + elif header_elt["name"] == "urgency": + assert str(header_elt) == "high" + elif header_elt["name"] == "custom_header": + assert str(header_elt) == "custom_value" + + assert header_names == {"keywords", "urgency", "custom_header"} + + @ed + async def test_message_received_trigger(self): + """Headers are correctly parsed from received messages""" + xep_0131 = XEP_0131(MagicMock()) + client = MagicMock() + + message_elt = domish.Element(("jabber:client", "message")) + headers_elt = message_elt.addElement((NS_SHIM, "headers")) + keywords_elt = headers_elt.addElement("header") + keywords_elt["name"] = "keywords" + keywords_elt.addContent('test,"keyword with space"') + urgency_elt = headers_elt.addElement("header") + urgency_elt["name"] = "urgency" + urgency_elt.addContent("high") + + post_treat = defer.Deferred() + + result = xep_0131.message_received_trigger(client, message_elt, post_treat) + + assert result is True + + mess_data = MessageData({"extra": {}}) + post_treat.callback(mess_data) + await post_treat + + assert mess_data["extra"]["keywords"] == ["test", "keyword with space"] + headers = mess_data["extra"]["headers"] + assert "keywords" not in headers + assert headers["urgency"] == "high"