Mercurial > libervia-backend
view tests/unit/test_plugin_xep_0420.py @ 3934:e345d93fb6e5
plugin OXPS: OpenPGP for XMPP Pubsub implementation:
OpenPGP for XMPP Pubsub (https://xmpp.org/extensions/inbox/pubsub-encryption.html,
currently a protoXEP) is implemented and activated when `encrypted` is set to `True` in
pubsub's `extra` data.
On item retrieval, the decryption is transparent if the key is known, except if the
`decrypt` key in `extra` is set to `False` (notably useful when one wants to check that
data is well encrypted).
Methods and corresponding bridge methods have been implemented to manage shared secrets
(to share, revoke or rotate the secrets).
plugin XEP-0060's `XEP-0060_publish` trigger point has been moved before the actual publish so
item can be modified (here e2ee) by the triggers. A new `XEP-0060_items` trigger point has
also been added.
`encrypted` flag can be used with plugin XEP-0277's microblog data
rel 380
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 15 Oct 2022 20:36:53 +0200 |
parents | cecf45416403 |
children | 4b842c1fb686 |
line wrap: on
line source
#!/usr/bin/env python3

# Tests for Libervia's Stanza Content Encryption plugin
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from datetime import datetime, timezone
from typing import Callable, cast

import pytest

from sat.core import exceptions
from sat.plugins.plugin_xep_0334 import NS_HINTS
from sat.plugins.plugin_xep_0420 import (
    NS_SCE,
    XEP_0420,
    AffixVerificationFailed,
    ProfileRequirementsNotMet,
    SCEAffixPolicy,
    SCECustomAffix,
    SCEProfile
)
from sat.tools.xml_tools import ElementParser
from twisted.words.xish import domish


__all__ = [  # pylint: disable=unused-variable
    "test_unpack_matches_original",
    "test_affixes_included",
    "test_all_affixes_verified",
    "test_incomplete_affixes",
    "test_rpad_affix",
    "test_time_affix",
    "test_to_affix",
    "test_from_affix",
    "test_custom_affixes",
    "test_namespace_conversion",
    "test_non_encryptable_elements",
    "test_schema_validation"
]


# ElementParser instances are called with an XML string; the cast only gives the type
# checker a precise signature for that call.
string_to_domish = cast(Callable[[str], domish.Element], ElementParser())


class CustomAffixImpl(SCECustomAffix):
    """
    A simple custom affix implementation for testing purposes. Verifies the full JIDs of
    both sender and recipient.

    @warning: This is just an example, an affix element like this might not make sense due
        to potentially allowed modifications of recipient/sender full JIDs (I don't know
        enough about XMPP routing to know whether full JIDs are always left untouched by
        the server).
    """

    @property
    def element_name(self) -> str:
        """
        @return: the name of the affix element, ``full-jids``.
        """

        return "full-jids"

    @property
    def element_schema(self) -> str:
        """
        @return: the XML schema snippet describing the ``full-jids`` element with its
            ``recipient`` and ``sender`` string attributes.
        """

        return """<xs:element name="full-jids">
            <xs:complexType>
                <xs:attribute name="recipient" type="xs:string"/>
                <xs:attribute name="sender" type="xs:string"/>
            </xs:complexType>
        </xs:element>"""

    def create(self, stanza: domish.Element) -> domish.Element:
        """
        Build the ``full-jids`` affix element from the addressing of a stanza.

        @param stanza: the stanza whose ``to`` and ``from`` attributes are copied.
        @return: a ``full-jids`` element in the SCE namespace carrying the stanza's
            ``to`` as ``recipient`` and its ``from`` as ``sender``.
        @raise ValueError: if the stanza lacks the ``to`` or the ``from`` attribute.
        """

        recipient = stanza.getAttribute("to", None)
        sender = stanza.getAttribute("from", None)

        if recipient is None or sender is None:
            raise ValueError(
                "Stanza doesn't have ``to`` and ``from`` attributes required by the"
                " full-jids custom affix."
            )

        element = domish.Element((NS_SCE, "full-jids"))
        element["recipient"] = recipient
        element["sender"] = sender
        return element

    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
        """
        Compare the JIDs stored in the affix element with those of the stanza.

        @param stanza: the stanza providing the actual ``to`` and ``from`` values.
        @param element: the ``full-jids`` affix element providing the expected values.
        @raise AffixVerificationFailed: if the recipient or the sender differs.
        """

        recipient_target = element["recipient"]
        recipient_actual = stanza.getAttribute("to")

        sender_target = element["sender"]
        sender_actual = stanza.getAttribute("from")

        if recipient_actual != recipient_target or sender_actual != sender_target:
            raise AffixVerificationFailed(
                f"Full JIDs differ. Recipient: actual={recipient_actual} vs."
                f" target={recipient_target}; Sender: actual={sender_actual} vs."
                f" target={sender_target}"
            )


def test_unpack_matches_original() -> None:
    """
    Pack a stanza, unpack it again and check that it serializes like the original.
    """

    # Judging by the assertions of the affix tests in this module, the positional
    # policies of SCEProfile are, in order: rpad, time, to (recipient), from (sender).
    profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.OPTIONAL,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.OPTIONAL,
        custom_policies={ CustomAffixImpl(): SCEAffixPolicy.NOT_NEEDED }
    )

    stanza_string = (
        '<message from="foo@example.com" to="bar@example.com"><body>Test with both a body'
        ' and some other custom element.</body><custom xmlns="urn:xmpp:example:0"'
        ' test="matches-original">some more content</custom></message>'
    )

    stanza = string_to_domish(stanza_string)

    envelope_serialized = XEP_0420.pack_stanza(profile, stanza)

    # The stanza should not have child elements any more
    assert len(list(stanza.elements())) == 0

    XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)

    # domish.Element doesn't override __eq__, thus we compare the .toXml() strings here in
    # the hope that serialization for an example as small as this is unique enough to be
    # compared that way.
    assert stanza.toXml() == string_to_domish(stanza_string).toXml()


def test_affixes_included() -> None:
    """
    Check that REQUIRED and OPTIONAL affixes are returned after a pack/unpack round trip
    while the NOT_NEEDED one is not.
    """

    custom_affix = CustomAffixImpl()

    profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.OPTIONAL,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.OPTIONAL,
        custom_policies={ custom_affix: SCEAffixPolicy.OPTIONAL }
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            Make sure that both the REQUIRED and the OPTIONAL affixes are included.
        </body>
    </message>""")

    affix_values = XEP_0420.unpack_stanza(
        profile,
        stanza,
        XEP_0420.pack_stanza(profile, stanza)
    )

    assert affix_values.rpad is not None
    assert affix_values.timestamp is not None
    assert affix_values.recipient is None
    assert affix_values.sender is not None
    assert custom_affix in affix_values.custom


def test_all_affixes_verified() -> None:
    """
    Check that affixes present in the envelope are loaded and verified on unpacking even
    if the unpacking profile marks them as NOT_NEEDED.
    """

    packing_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        custom_policies={}
    )

    unpacking_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            When unpacking, all affixes are loaded, even those marked as NOT_NEEDED.
        </body>
    </message>""")

    envelope_serialized = XEP_0420.pack_stanza(packing_profile, stanza)

    affix_values = XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized)

    assert affix_values.rpad is not None
    assert affix_values.timestamp is not None
    assert affix_values.recipient is not None
    assert affix_values.sender is not None

    # When unpacking, all affixes are verified, even if they are NOT_NEEDED by the profile
    stanza = string_to_domish(
        """<message from="fooo@example.com" to="baz@example.com"></message>"""
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized)


def test_incomplete_affixes() -> None:
    """
    Check that unpacking fails when an affix REQUIRED by the unpacking profile, built-in
    or custom, is missing from the envelope.
    """

    packing_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    unpacking_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        custom_policies={}
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that all affixes REQUIRED by the profile are present.</body>
    </message>""")

    with pytest.raises(ProfileRequirementsNotMet):
        XEP_0420.unpack_stanza(
            unpacking_profile,
            stanza,
            XEP_0420.pack_stanza(packing_profile, stanza)
        )

    # Do the same but with a custom affix missing
    custom_affix = CustomAffixImpl()

    packing_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={ custom_affix: SCEAffixPolicy.NOT_NEEDED }
    )

    unpacking_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={ custom_affix: SCEAffixPolicy.REQUIRED }
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            Check that all affixes REQUIRED by the profile are present, including custom
            affixes.
        </body>
    </message>""")

    with pytest.raises(ProfileRequirementsNotMet):
        XEP_0420.unpack_stanza(
            unpacking_profile,
            stanza,
            XEP_0420.pack_stanza(packing_profile, stanza)
        )


def test_rpad_affix() -> None:
    """
    Check, over repeated round trips, that the rpad affix exists and is long enough to
    pad a tiny body to at least 53 characters.
    """

    profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    # Repeated to exercise many packings of the same small stanza.
    for _ in range(100):
        stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
            <body>OK</body>
        </message>""")

        affix_values = XEP_0420.unpack_stanza(
            profile,
            stanza,
            XEP_0420.pack_stanza(profile, stanza)
        )

        # Test that the rpad exists and that the content elements are always padded to at
        # least 53 characters
        assert affix_values.rpad is not None
        assert len(affix_values.rpad) >= 53 - len("<body xmlns='jabber:client'>OK</body>")


def test_time_affix() -> None:
    """
    Check that the ``time`` affix of a hand-written envelope is parsed into a timezone
    aware datetime.
    """

    profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    stanza = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com"></message>"""
    )

    envelope_serialized = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                The time affix is only parsed and not otherwise verified. Not much to test
                here.
            </body>
        </content>
        <time stamp="1969-07-21T02:56:15Z"/>
    </envelope>""".encode("utf-8")

    affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
    assert affix_values.timestamp == datetime(1969, 7, 21, 2, 56, 15, tzinfo=timezone.utc)


def test_to_affix() -> None:
    """
    Check creation and verification of the ``to`` affix: only the bare JID has to match,
    and packing a stanza without a ``to`` attribute fails.
    """

    profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that the ``to`` affix is correctly added.</body>
    </message>""")

    envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
    affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
    assert affix_values.recipient is not None
    assert affix_values.recipient.userhost() == "bar@example.com"

    # Check that a mismatch in recipient bare JID causes an exception to be raised
    stanza = string_to_domish(
        """<message from="foo@example.com" to="baz@example.com"></message>"""
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)

    # Check that only the bare JID matters
    stanza = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com/device"></message>"""
    )

    affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
    assert affix_values.recipient is not None
    assert affix_values.recipient.userhost() == "bar@example.com"

    stanza = string_to_domish("""<message from="foo@example.com">
        <body>
            Check that a missing "to" attribute on the stanza fails stanza packing.
        </body>
    </message>""")

    with pytest.raises(ValueError):
        XEP_0420.pack_stanza(profile, stanza)


def test_from_affix() -> None:
    """
    Check creation and verification of the ``from`` affix: only the bare JID has to
    match, and packing a stanza without a ``from`` attribute fails.
    """

    profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.REQUIRED,
        custom_policies={}
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that the ``from`` affix is correctly added.</body>
    </message>""")

    envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
    affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
    assert affix_values.sender is not None
    assert affix_values.sender.userhost() == "foo@example.com"

    # Check that a mismatch in sender bare JID causes an exception to be raised
    stanza = string_to_domish(
        """<message from="fooo@example.com" to="bar@example.com"></message>"""
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)

    # Check that only the bare JID matters
    stanza = string_to_domish(
        """<message from="foo@example.com/device" to="bar@example.com"></message>"""
    )

    affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)
    assert affix_values.sender is not None
    assert affix_values.sender.userhost() == "foo@example.com"

    stanza = string_to_domish("""<message to="bar@example.com">
        <body>
            Check that a missing "from" attribute on the stanza fails stanza packing.
        </body>
    </message>""")

    with pytest.raises(ValueError):
        XEP_0420.pack_stanza(profile, stanza)


def test_custom_affixes() -> None:
    """
    Check that an unexpected custom affix makes unpacking fail, and that an expected one
    is returned with the full JIDs of the stanza.
    """

    custom_affix = CustomAffixImpl()

    packing_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={ custom_affix: SCEAffixPolicy.REQUIRED }
    )

    unpacking_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            If a custom affix is included in the envelope, but not excpected by the
            recipient, the schema validation should fail.
        </body>
    </message>""")

    with pytest.raises(exceptions.ParsingError):
        XEP_0420.unpack_stanza(
            unpacking_profile,
            stanza,
            XEP_0420.pack_stanza(packing_profile, stanza)
        )

    # From here on, the same profile (which knows the custom affix) is used for both
    # packing and unpacking.
    profile = packing_profile

    stanza = string_to_domish("""<message from="foo@example.com/device0"
        to="bar@example.com/Libervia.123">
        <body>The affix element should be returned as part of the affix values.</body>
    </message>""")

    affix_values = XEP_0420.unpack_stanza(
        profile,
        stanza,
        XEP_0420.pack_stanza(profile, stanza)
    )

    assert custom_affix in affix_values.custom
    assert affix_values.custom[custom_affix].getAttribute("recipient") == \
        "bar@example.com/Libervia.123"
    assert affix_values.custom[custom_affix].getAttribute("sender") == \
        "foo@example.com/device0"


def test_namespace_conversion() -> None:
    """
    Check that a body element without namespace ends up in ``jabber:client``, both in
    the packed envelope and in the stanza after unpacking.
    """

    profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    # Built by hand rather than parsed, so that the namespace is really ``None``.
    stanza = domish.Element((None, "message"))
    stanza["from"] = "foo@example.com"
    stanza["to"] = "bar@example.com"
    stanza.addElement(
        "body",
        content=(
            "This body element has namespace ``None``, which has to be replaced with"
            " jabber:client."
        )
    )

    envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
    envelope = string_to_domish(envelope_serialized.decode("utf-8"))
    content = next(envelope.elements(NS_SCE, "content"))

    # The body should have been assigned ``jabber:client`` as its namespace
    assert next(content.elements("jabber:client", "body"), None) is not None

    XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)

    # The body should still have ``jabber:client`` after unpacking
    assert next(stanza.elements("jabber:client", "body"), None) is not None


def test_non_encryptable_elements() -> None:
    """
    Check that a store hint stays on the stanza when packing and is not moved from the
    envelope content to the stanza when unpacking.
    """

    profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>This stanza includes a store hint which must not be encrypted.</body>
        <store xmlns="urn:xmpp:hints"/>
    </message>""")

    envelope_serialized = XEP_0420.pack_stanza(profile, stanza)
    envelope = string_to_domish(envelope_serialized.decode("utf-8"))
    content = next(envelope.elements(NS_SCE, "content"))

    # The store hint must not have been moved to the content element
    assert next(stanza.elements(NS_HINTS, "store"), None) is not None
    assert next(content.elements(NS_HINTS, "store"), None) is None

    stanza = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com"></message>"""
    )

    envelope_serialized = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                The store hint must not be moved to the stanza.
            </body>
            <store xmlns="urn:xmpp:hints"/>
        </content>
    </envelope>""".encode("utf-8")

    XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)

    assert next(stanza.elements(NS_HINTS, "store"), None) is None


def test_schema_validation() -> None:
    """
    Check that an envelope containing an unknown affix element is rejected with a
    parsing error.
    """

    profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    stanza = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com"></message>"""
    )

    envelope_serialized = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                An unknwon affix should cause a schema validation error.
            </body>
            <store xmlns="urn:xmpp:hints"/>
        </content>
        <unknown-affix unknown-attr="unknown"/>
    </envelope>""".encode("utf-8")

    with pytest.raises(exceptions.ParsingError):
        XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)