view tests/unit/test_plugin_xep_0420.py @ 4282:8da377040ba6

doc (encryption): update pubsub encryption specifications.
author Goffi <goffi@goffi.org>
date Sat, 13 Jul 2024 17:45:47 +0200
parents 4b842c1fb686
children f1d0cde61af7
line wrap: on
line source

#!/usr/bin/env python3

# Tests for Libervia's Stanza Content Encryption plugin
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from datetime import datetime, timezone
from typing import Callable, cast

import pytest
from libervia.backend.core import exceptions

from libervia.backend.plugins.plugin_xep_0334 import NS_HINTS
from libervia.backend.plugins.plugin_xep_0420 import (
    NS_SCE, XEP_0420, AffixVerificationFailed, ProfileRequirementsNotMet, SCEAffixPolicy,
    SCECustomAffix, SCEProfile
)
from libervia.backend.tools.xml_tools import ElementParser
from twisted.words.xish import domish


# Names of the test functions defined in this module.
__all__ = [  # pylint: disable=unused-variable
    "test_unpack_matches_original",
    "test_affixes_included",
    "test_all_affixes_verified",
    "test_incomplete_affixes",
    "test_rpad_affix",
    "test_time_affix",
    "test_to_affix",
    "test_from_affix",
    "test_custom_affixes",
    "test_namespace_conversion",
    "test_non_encryptable_elements",
    "test_schema_validation"
]


# Helper used throughout the tests to turn an XML string into a ``domish.Element``. The
# ``ElementParser`` instance is called directly with the string; the cast only exists to
# give type checkers the signature of that call.
string_to_domish = cast(Callable[[str], domish.Element], ElementParser())


class CustomAffixImpl(SCECustomAffix):
    """
    Minimal custom affix used by the tests in this module. It copies the ``to`` and
    ``from`` attributes of the stanza into a ``full-jids`` element and, on verification,
    checks that the stanza still carries exactly those values.

    @warning: This is only an example. An affix like this may not make sense in practice,
        because servers are potentially allowed to modify the full JIDs of sender and
        recipient (the original author was not sure whether full JIDs are always left
        untouched while routing).
    """

    @property
    def element_name(self) -> str:
        """Name of the affix element."""
        return "full-jids"

    @property
    def element_schema(self) -> str:
        """XML schema fragment describing the affix element and its two attributes."""
        return """<xs:element name="full-jids">
            <xs:complexType>
                <xs:attribute name="recipient" type="xs:string"/>
                <xs:attribute name="sender" type="xs:string"/>
            </xs:complexType>
        </xs:element>"""

    def create(self, stanza: domish.Element) -> domish.Element:
        """
        Build the affix element for a stanza.

        @param stanza: the stanza to read the ``to`` and ``from`` attributes from.
        @return: a ``full-jids`` element in the SCE namespace carrying both values.
        @raise ValueError: if the stanza lacks the ``to`` or the ``from`` attribute.
        """
        # Insertion order matters: "recipient" is read and written before "sender".
        jids = {
            "recipient": stanza.getAttribute("to", None),
            "sender": stanza.getAttribute("from", None)
        }

        if any(jid is None for jid in jids.values()):
            raise ValueError(
                "Stanza doesn't have ``to`` and ``from`` attributes required by the"
                " full-jids custom affix."
            )

        affix = domish.Element((NS_SCE, "full-jids"))
        for attribute, jid in jids.items():
            affix[attribute] = jid
        return affix

    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
        """
        Compare the values stored in the affix element with the stanza attributes.

        @param stanza: the stanza whose ``to``/``from`` attributes are checked.
        @param element: the affix element as found in the envelope.
        @raise AffixVerificationFailed: if either value differs from the stanza.
        """
        expected_recipient = element["recipient"]
        found_recipient = stanza.getAttribute("to")

        expected_sender = element["sender"]
        found_sender = stanza.getAttribute("from")

        if found_recipient == expected_recipient and found_sender == expected_sender:
            return

        raise AffixVerificationFailed(
            f"Full JIDs differ. Recipient: actual={found_recipient} vs."
            f" target={expected_recipient}; Sender: actual={found_sender} vs."
            f" target={expected_sender}"
        )


def test_unpack_matches_original() -> None:
    """Packing a stanza and unpacking it again must yield the original stanza."""
    original_xml = (
        '<message from="foo@example.com" to="bar@example.com"><body>Test with both a body'
        ' and some other custom element.</body><custom xmlns="urn:xmpp:example:0"'
        ' test="matches-original">some more content</custom></message>'
    )

    sce_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.OPTIONAL,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.OPTIONAL,
        custom_policies={CustomAffixImpl(): SCEAffixPolicy.NOT_NEEDED}
    )

    message = string_to_domish(original_xml)
    envelope = XEP_0420.pack_stanza(sce_profile, message)

    # After packing, no child elements may be left on the stanza
    assert not list(message.elements())

    XEP_0420.unpack_stanza(sce_profile, message, envelope)

    # domish.Element has no __eq__ override, so a freshly parsed copy of the original is
    # compared via its .toXml() serialization. For an example this small, the
    # serialization is hopefully unique enough for that comparison to be meaningful.
    reference = string_to_domish(original_xml)
    assert message.toXml() == reference.toXml()


def test_affixes_included() -> None:
    """
    Roundtrip a stanza through packing and unpacking and check which affix values are
    reported for a profile mixing REQUIRED, OPTIONAL and NOT_NEEDED policies.
    """
    full_jids_affix = CustomAffixImpl()

    sce_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.OPTIONAL,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.OPTIONAL,
        custom_policies={full_jids_affix: SCEAffixPolicy.OPTIONAL}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            Make sure that both the REQUIRED and the OPTIONAL affixes are included.
        </body>
    </message>""")

    envelope = XEP_0420.pack_stanza(sce_profile, message)
    unpacked = XEP_0420.unpack_stanza(sce_profile, message, envelope)

    # Only the recipient, which is NOT_NEEDED in the profile above, is expected to be
    # absent; everything else must have been included.
    assert unpacked.rpad is not None
    assert unpacked.timestamp is not None
    assert unpacked.recipient is None
    assert unpacked.sender is not None
    assert full_jids_affix in unpacked.custom


def test_all_affixes_verified() -> None:
    """
    Affixes present in an envelope are loaded and verified on unpacking, even if the
    unpacking profile marks all of them as NOT_NEEDED.
    """
    all_required = (SCEAffixPolicy.REQUIRED,) * 4
    none_needed = (SCEAffixPolicy.NOT_NEEDED,) * 4

    sender_profile = SCEProfile(*all_required, custom_policies={})
    receiver_profile = SCEProfile(*none_needed, custom_policies={})

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            When unpacking, all affixes are loaded, even those marked as NOT_NEEDED.
        </body>
    </message>""")

    envelope = XEP_0420.pack_stanza(sender_profile, message)

    unpacked = XEP_0420.unpack_stanza(receiver_profile, message, envelope)

    assert unpacked.rpad is not None
    assert unpacked.timestamp is not None
    assert unpacked.recipient is not None
    assert unpacked.sender is not None

    # Verification also happens for affixes that are NOT_NEEDED by the profile: a stanza
    # with different sender and recipient must be rejected.
    mismatching = string_to_domish(
        '<message from="fooo@example.com" to="baz@example.com"></message>'
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(receiver_profile, mismatching, envelope)


def test_incomplete_affixes() -> None:
    """
    Unpacking must raise ProfileRequirementsNotMet if the envelope lacks an affix that
    the unpacking profile marks as REQUIRED. This is checked once for the four
    positional policies and once for a custom affix.

    The envelope is packed *outside* of the ``pytest.raises`` block in both cases, such
    that only an exception raised by ``unpack_stanza`` can satisfy the expectation. An
    error during packing now fails the test instead of being mistaken for the expected
    one.
    """
    packing_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    unpacking_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.REQUIRED,
        custom_policies={}
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that all affixes REQUIRED by the profile are present.</body>
    </message>""")

    # Packing is expected to succeed; only unpacking may raise
    envelope_serialized = XEP_0420.pack_stanza(packing_profile, stanza)

    with pytest.raises(ProfileRequirementsNotMet):
        XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized)

    # Do the same but with a custom affix missing
    custom_affix = CustomAffixImpl()

    packing_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={ custom_affix: SCEAffixPolicy.NOT_NEEDED }
    )

    unpacking_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={ custom_affix: SCEAffixPolicy.REQUIRED }
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            Check that all affixes REQUIRED by the profile are present, including custom
            affixes.
        </body>
    </message>""")

    # Again, keep packing out of the guarded block
    envelope_serialized = XEP_0420.pack_stanza(packing_profile, stanza)

    with pytest.raises(ProfileRequirementsNotMet):
        XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized)


def test_rpad_affix() -> None:
    """
    The rpad affix must be present when REQUIRED and must pad the content to a minimum
    length. The roundtrip is repeated 100 times.
    """
    sce_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    # The content elements are expected to be padded to at least 53 characters, so the
    # padding has to make up for whatever the serialized body is short of that.
    min_padding = 53 - len("<body xmlns='jabber:client'>OK</body>")

    for _ in range(100):
        message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
            <body>OK</body>
        </message>""")

        envelope = XEP_0420.pack_stanza(sce_profile, message)
        unpacked = XEP_0420.unpack_stanza(sce_profile, message, envelope)

        # The rpad must exist and be long enough
        assert unpacked.rpad is not None
        assert len(unpacked.rpad) >= min_padding


def test_time_affix() -> None:
    """The time affix of a hand-written envelope is parsed into an aware datetime."""
    sce_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    message = string_to_domish(
        '<message from="foo@example.com" to="bar@example.com"></message>'
    )

    envelope_xml = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                The time affix is only parsed and not otherwise verified. Not much to test
                here.
            </body>
        </content>
        <time stamp="1969-07-21T02:56:15Z"/>
    </envelope>"""

    unpacked = XEP_0420.unpack_stanza(sce_profile, message, envelope_xml.encode("utf-8"))

    expected = datetime(1969, 7, 21, 2, 56, 15, tzinfo=timezone.utc)
    assert unpacked.timestamp == expected


def test_to_affix() -> None:
    """
    The ``to`` affix is added on packing and verified against the bare JID of the
    recipient on unpacking. Packing fails if the stanza has no ``to`` attribute.
    """
    sce_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that the ``to`` affix is correctly added.</body>
    </message>""")

    envelope = XEP_0420.pack_stanza(sce_profile, message)
    unpacked = XEP_0420.unpack_stanza(sce_profile, message, envelope)
    assert unpacked.recipient is not None
    assert unpacked.recipient.userhost() == "bar@example.com"

    # A different recipient bare JID on the stanza must make verification fail
    other_recipient = string_to_domish(
        '<message from="foo@example.com" to="baz@example.com"></message>'
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(sce_profile, other_recipient, envelope)

    # A resource on the recipient JID must not matter, only the bare JID is compared
    with_resource = string_to_domish(
        '<message from="foo@example.com" to="bar@example.com/device"></message>'
    )

    unpacked = XEP_0420.unpack_stanza(sce_profile, with_resource, envelope)
    assert unpacked.recipient is not None
    assert unpacked.recipient.userhost() == "bar@example.com"

    without_to = string_to_domish("""<message from="foo@example.com">
        <body>
            Check that a missing "to" attribute on the stanza fails stanza packing.
        </body>
    </message>""")

    with pytest.raises(ValueError):
        XEP_0420.pack_stanza(sce_profile, without_to)


def test_from_affix() -> None:
    """
    The ``from`` affix is added on packing and verified against the bare JID of the
    sender on unpacking. Packing fails if the stanza has no ``from`` attribute.
    """
    sce_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.REQUIRED,
        custom_policies={}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that the ``from`` affix is correctly added.</body>
    </message>""")

    envelope = XEP_0420.pack_stanza(sce_profile, message)
    unpacked = XEP_0420.unpack_stanza(sce_profile, message, envelope)
    assert unpacked.sender is not None
    assert unpacked.sender.userhost() == "foo@example.com"

    # A different sender bare JID on the stanza must make verification fail
    other_sender = string_to_domish(
        '<message from="fooo@example.com" to="bar@example.com"></message>'
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(sce_profile, other_sender, envelope)

    # A resource on the sender JID must not matter, only the bare JID is compared
    with_resource = string_to_domish(
        '<message from="foo@example.com/device" to="bar@example.com"></message>'
    )

    unpacked = XEP_0420.unpack_stanza(sce_profile, with_resource, envelope)
    assert unpacked.sender is not None
    assert unpacked.sender.userhost() == "foo@example.com"

    without_from = string_to_domish("""<message to="bar@example.com">
        <body>
            Check that a missing "from" attribute on the stanza fails stanza packing.
        </body>
    </message>""")

    with pytest.raises(ValueError):
        XEP_0420.pack_stanza(sce_profile, without_from)


def test_custom_affixes() -> None:
    """
    Check the handling of custom affixes in two scenarios:

    1. An envelope containing a custom affix which the unpacking profile doesn't know
       must be rejected with a ParsingError.
    2. If the profile knows the custom affix, the affix element is returned as part of
       the affix values after unpacking.

    In the first scenario, the envelope is packed *outside* of the ``pytest.raises``
    block, such that only an exception raised by ``unpack_stanza`` can satisfy the
    expectation. An error during packing now fails the test instead of being mistaken
    for the expected one.
    """
    custom_affix = CustomAffixImpl()

    packing_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={ custom_affix: SCEAffixPolicy.REQUIRED }
    )

    unpacking_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            If a custom affix is included in the envelope, but not excpected by the
            recipient, the schema validation should fail.
        </body>
    </message>""")

    # Packing is expected to succeed; only unpacking may raise
    envelope_serialized = XEP_0420.pack_stanza(packing_profile, stanza)

    with pytest.raises(exceptions.ParsingError):
        XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized)

    # From here on, the same profile (which knows the custom affix) is used for both
    # packing and unpacking.
    profile = packing_profile

    stanza = string_to_domish("""<message from="foo@example.com/device0"
        to="bar@example.com/Libervia.123">
        <body>The affix element should be returned as part of the affix values.</body>
    </message>""")

    affix_values = XEP_0420.unpack_stanza(
        profile,
        stanza,
        XEP_0420.pack_stanza(profile, stanza)
    )

    # The custom affix element must carry the full JIDs, including the resources
    assert custom_affix in affix_values.custom
    assert affix_values.custom[custom_affix].getAttribute("recipient") == \
        "bar@example.com/Libervia.123"
    assert affix_values.custom[custom_affix].getAttribute("sender") == \
        "foo@example.com/device0"


def test_namespace_conversion() -> None:
    """
    A body element without namespace gets ``jabber:client`` assigned when packed, and
    keeps that namespace when unpacked again.
    """
    sce_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    body_text = (
        "This body element has namespace ``None``, which has to be replaced with"
        " jabber:client."
    )

    # Build the stanza by hand, without any namespace on the message element
    message = domish.Element((None, "message"))
    message["from"] = "foo@example.com"
    message["to"] = "bar@example.com"
    message.addElement("body", content=body_text)

    envelope_bytes = XEP_0420.pack_stanza(sce_profile, message)
    envelope = string_to_domish(envelope_bytes.decode("utf-8"))
    content = next(envelope.elements(NS_SCE, "content"))

    # Inside the envelope, the body must be found under ``jabber:client``
    packed_body = next(content.elements("jabber:client", "body"), None)
    assert packed_body is not None

    XEP_0420.unpack_stanza(sce_profile, message, envelope_bytes)

    # The same must hold for the body restored onto the stanza
    unpacked_body = next(message.elements("jabber:client", "body"), None)
    assert unpacked_body is not None


def test_non_encryptable_elements() -> None:
    """
    A store hint must neither be moved into the envelope content on packing, nor be
    moved from the envelope content onto the stanza on unpacking.
    """
    sce_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    hinted_message = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com">
        <body>This stanza includes a store hint which must not be encrypted.</body>
        <store xmlns="urn:xmpp:hints"/>
    </message>"""
    )

    packed = XEP_0420.pack_stanza(sce_profile, hinted_message)
    envelope = string_to_domish(packed.decode("utf-8"))
    content = next(envelope.elements(NS_SCE, "content"))

    # Packing: the hint stays on the stanza and doesn't show up in the content element
    hint_on_stanza = next(hinted_message.elements(NS_HINTS, "store"), None)
    assert hint_on_stanza is not None
    hint_in_content = next(content.elements(NS_HINTS, "store"), None)
    assert hint_in_content is None

    empty_message = string_to_domish(
        '<message from="foo@example.com" to="bar@example.com"></message>'
    )

    crafted_envelope = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                The store hint must not be moved to the stanza.
            </body>
            <store xmlns="urn:xmpp:hints"/>
        </content>
    </envelope>"""

    XEP_0420.unpack_stanza(sce_profile, empty_message, crafted_envelope.encode("utf-8"))

    # Unpacking: a hint smuggled into the content must not end up on the stanza
    hint_on_stanza = next(empty_message.elements(NS_HINTS, "store"), None)
    assert hint_on_stanza is None


def test_schema_validation() -> None:
    """An envelope containing an unknown affix element is rejected with ParsingError."""
    sce_profile = SCEProfile(
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.NOT_NEEDED,
        custom_policies={}
    )

    message = string_to_domish(
        '<message from="foo@example.com" to="bar@example.com"></message>'
    )

    # Hand-written envelope with an affix element that the profile doesn't know about
    envelope_xml = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                An unknwon affix should cause a schema validation error.
            </body>
            <store xmlns="urn:xmpp:hints"/>
        </content>
        <unknown-affix unknown-attr="unknown"/>
    </envelope>"""
    envelope_bytes = envelope_xml.encode("utf-8")

    with pytest.raises(exceptions.ParsingError):
        XEP_0420.unpack_stanza(sce_profile, message, envelope_bytes)