Mercurial > libervia-backend
view tests/unit/test_plugin_xep_0373.py @ 4230:314d3c02bb67
core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:21:04 +0200 |
parents | 4b842c1fb686 |
children | f1d0cde61af7 |
line wrap: on
line source
from datetime import datetime, timedelta, timezone import pytest from libervia.backend.core import exceptions try: from libervia.backend.plugins.plugin_xep_0373 import XEP_0373, NS_OX except exceptions.MissingModule as e: pytest.skip(f"Can't test XEP-0373: {e}", allow_module_level=True) from libervia.backend.tools.xmpp_datetime import parse_datetime from twisted.words.protocols.jabber import jid a = jid.JID("foo@example.com") b = jid.JID("bar@example.com") def test_signcrypt_element_args() -> None: with pytest.raises(ValueError): XEP_0373.build_signcrypt_element([]) def test_signcrypt_element() -> None: signcrypt_elt, payload_elt = XEP_0373.build_signcrypt_element([ a, b ]) payload_elt.addElement("signcrypt-test-content", content="signcrypt test content") rpad_elt = next(signcrypt_elt.elements(NS_OX, "rpad")) time_elt = next(signcrypt_elt.elements(NS_OX, "time")) rpad = str(rpad_elt) timestamp = parse_datetime(time_elt["stamp"]) signcrypt_elt.children.remove(rpad_elt) signcrypt_elt.children.remove(time_elt) assert rpad assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) assert signcrypt_elt.toXml() == ( "<signcrypt xmlns='urn:xmpp:openpgp:0'>" "<to jid='foo@example.com'/>" "<to jid='bar@example.com'/>" "<payload>" "<signcrypt-test-content>signcrypt test content</signcrypt-test-content>" "</payload>" "</signcrypt>" ) def test_sign_element_args() -> None: with pytest.raises(ValueError): XEP_0373.build_sign_element([], True) def test_sign_element_with_rpad() -> None: sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], True) payload_elt.addElement("sign-test-content", content="sign test content") rpad_elt = next(sign_elt.elements(NS_OX, "rpad")) time_elt = next(sign_elt.elements(NS_OX, "time")) rpad = str(rpad_elt) timestamp = parse_datetime(time_elt["stamp"]) sign_elt.children.remove(rpad_elt) sign_elt.children.remove(time_elt) assert rpad assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) assert sign_elt.toXml() == ( "<sign xmlns='urn:xmpp:openpgp:0'>" "<to jid='foo@example.com'/>" "<to jid='bar@example.com'/>" "<payload>" "<sign-test-content>sign test content</sign-test-content>" "</payload>" "</sign>" ) def test_sign_element_without_rpad() -> None: sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], False) payload_elt.addElement("sign-test-content", content="sign test content") rpad_elt = next(sign_elt.elements(NS_OX, "rpad"), None) time_elt = next(sign_elt.elements(NS_OX, "time")) timestamp = parse_datetime(time_elt["stamp"]) sign_elt.children.remove(time_elt) assert rpad_elt is None assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) assert sign_elt.toXml() == ( "<sign xmlns='urn:xmpp:openpgp:0'>" "<to jid='foo@example.com'/>" "<to jid='bar@example.com'/>" "<payload>" "<sign-test-content>sign test content</sign-test-content>" "</payload>" "</sign>" ) def test_crypt_element_with_recipients() -> None: crypt_elt, payload_elt = XEP_0373.build_crypt_element([ a, b ]) payload_elt.addElement("crypt-test-content", content="crypt test content") rpad_elt = next(crypt_elt.elements(NS_OX, "rpad")) time_elt = next(crypt_elt.elements(NS_OX, "time")) rpad = str(rpad_elt) timestamp = parse_datetime(time_elt["stamp"]) crypt_elt.children.remove(rpad_elt) crypt_elt.children.remove(time_elt) assert rpad assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) assert crypt_elt.toXml() == ( "<crypt xmlns='urn:xmpp:openpgp:0'>" "<to jid='foo@example.com'/>" "<to jid='bar@example.com'/>" "<payload>" "<crypt-test-content>crypt test content</crypt-test-content>" "</payload>" "</crypt>" ) def test_crypt_element_without_recipients() -> None: crypt_elt, payload_elt = XEP_0373.build_crypt_element([]) payload_elt.addElement("crypt-test-content", content="crypt test content") rpad_elt = next(crypt_elt.elements(NS_OX, "rpad")) time_elt = next(crypt_elt.elements(NS_OX, "time")) rpad = str(rpad_elt) timestamp = parse_datetime(time_elt["stamp"]) crypt_elt.children.remove(rpad_elt) crypt_elt.children.remove(time_elt) assert rpad assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) assert crypt_elt.toXml() == ( "<crypt xmlns='urn:xmpp:openpgp:0'>" "<payload>" "<crypt-test-content>crypt test content</crypt-test-content>" "</payload>" "</crypt>" )