view tests/unit/test_plugin_xep_0420.py @ 3913:944f51f9c2b4

core (xmpp): make `send` a blocking method, fix `sendMessageData` calls: the original `send` method is blocking and is used as such by Wokkel, so it can't easily be changed to an async method. However, an async method is necessary to have an async trigger at the very end of the send workflow for end-to-end encryption. To work around that, `send` remains a blocking method which calls `a_send`, an async method which actually does the sending. This way legacy code can still call `send`, while `a_send` can be awaited elsewhere. Fix calls to `sendMessageData`: since the method is now an `async` one, `ensureDeferred` had to be used in some calls.
author Goffi <goffi@goffi.org>
date Sat, 24 Sep 2022 16:31:39 +0200
parents 8289ac1b34f4
children cecf45416403
line wrap: on
line source

#!/usr/bin/env python3

# Tests for Libervia's Stanza Content Encryption plugin
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from datetime import datetime, timezone
from typing import Callable, cast

import pytest

from sat.plugins.plugin_xep_0334 import NS_HINTS
from sat.plugins.plugin_xep_0420 import (
    NS_SCE, XEP_0420, AffixVerificationFailed, ProfileRequirementsNotMet, SCEAffixPolicy,
    SCECustomAffix, SCEProfile
)
from sat.tools.xml_tools import ElementParser
from twisted.words.xish import domish


# Explicit list of the test functions defined in this module. The pylint suppression is
# presumably needed because nothing in this file references ``__all__`` itself.
__all__ = [  # pylint: disable=unused-variable
    "test_unpack_matches_original",
    "test_affixes_included",
    "test_all_affixes_verified",
    "test_incomplete_affixes",
    "test_rpad_affix",
    "test_time_affix",
    "test_to_affix",
    "test_from_affix",
    "test_custom_affixes",
    "test_namespace_conversion",
    "test_non_encryptable_elements",
    "test_schema_validation"
]


# Shared parser used by the tests to turn an XML string into a ``domish.Element``. The
# ``ElementParser`` instance is called like a function; ``cast`` has no runtime effect and
# only gives type checkers a precise signature for that call.
string_to_domish = cast(Callable[[str], domish.Element], ElementParser())


class CustomAffixImpl(SCECustomAffix):
    """
    Minimal custom affix used by the tests in this module. It copies the ``to`` and
    ``from`` attributes of a stanza into a ``full-jids`` element and compares them again
    during verification.

    @warning: This is only an example. An affix like this might not make sense in
        practice, because servers may be allowed to modify the full JIDs of recipient
        and sender (the original author was not sure whether full JIDs are always left
        untouched during XMPP routing).
    """

    @property
    def element_name(self) -> str:
        """Name of the affix element."""
        return "full-jids"

    @property
    def element_schema(self) -> str:
        """XML schema snippet declaring the affix element and its two attributes."""
        return """<xs:element name="full-jids">
            <xs:complexType>
                <xs:attribute name="recipient" type="xs:string"/>
                <xs:attribute name="sender" type="xs:string"/>
            </xs:complexType>
        </xs:element>"""

    def create(self, stanza: domish.Element) -> domish.Element:
        """
        Build the affix element from the addressing attributes of a stanza.

        @param stanza: The stanza whose ``to`` and ``from`` attributes are read.
        @return: A ``full-jids`` element in the SCE namespace that carries both values
            in its ``recipient`` and ``sender`` attributes.
        @raise ValueError: if the stanza lacks the ``to`` or the ``from`` attribute.
        """
        # Maps each affix attribute to the stanza value it should carry.
        jids = {
            "recipient": stanza.getAttribute("to", None),
            "sender": stanza.getAttribute("from", None)
        }

        if any(jid is None for jid in jids.values()):
            raise ValueError(
                "Stanza doesn't have ``to`` and ``from`` attributes required by the"
                " full-jids custom affix."
            )

        affix = domish.Element((NS_SCE, "full-jids"))
        for attribute, jid in jids.items():
            affix[attribute] = jid
        return affix

    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
        """
        Compare the JIDs stored in the affix element with those of the stanza.

        @param stanza: The stanza whose ``to`` and ``from`` attributes are checked.
        @param element: The affix element holding the expected ``recipient`` and
            ``sender`` values.
        @raise AffixVerificationFailed: if the recipient or the sender differs.
        """
        recipient_target = element["recipient"]
        sender_target = element["sender"]

        recipient_actual = stanza.getAttribute("to")
        sender_actual = stanza.getAttribute("from")

        # Nothing to report when both addresses match exactly.
        if recipient_actual == recipient_target and sender_actual == sender_target:
            return

        raise AffixVerificationFailed(
            f"Full JIDs differ. Recipient: actual={recipient_actual} vs."
            f" target={recipient_target}; Sender: actual={sender_actual} vs."
            f" target={sender_target}"
        )


def test_unpack_matches_original() -> None:
    """Packing and then unpacking a stanza must restore its original serialization."""
    original_xml = (
        '<message from="foo@example.com" to="bar@example.com"><body>Test with both a body'
        ' and some other custom element.</body><custom xmlns="urn:xmpp:example:0"'
        ' test="matches-original">some more content</custom></message>'
    )

    sce_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        SCEAffixPolicy.OPTIONAL,
        SCEAffixPolicy.NOT_NEEDED,
        SCEAffixPolicy.OPTIONAL,
        custom_policies={CustomAffixImpl(): SCEAffixPolicy.NOT_NEEDED}
    )

    stanza = string_to_domish(original_xml)
    packed = XEP_0420.pack_stanza(sce_profile, stanza)

    # Packing is expected to strip every child element from the stanza.
    assert not list(stanza.elements())

    XEP_0420.unpack_stanza(sce_profile, stanza, packed)

    # domish.Element does not override __eq__, so the serialized forms are compared
    # instead, hoping that serialization of such a small example is deterministic enough
    # for that.
    restored_xml = stanza.toXml()
    assert restored_xml == string_to_domish(original_xml).toXml()


def test_affixes_included() -> None:
    """REQUIRED and OPTIONAL affixes must be included, NOT_NEEDED ones left out."""
    full_jids_affix = CustomAffixImpl()
    optional = SCEAffixPolicy.OPTIONAL

    sce_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED,
        optional,
        SCEAffixPolicy.NOT_NEEDED,
        optional,
        custom_policies={full_jids_affix: optional}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            Make sure that both the REQUIRED and the OPTIONAL affixes are included.
        </body>
    </message>""")

    packed = XEP_0420.pack_stanza(sce_profile, message)
    affixes = XEP_0420.unpack_stanza(sce_profile, message, packed)

    # Only the recipient affix is marked NOT_NEEDED above, so it is the only one that
    # is expected to be absent.
    assert affixes.rpad is not None
    assert affixes.timestamp is not None
    assert affixes.recipient is None
    assert affixes.sender is not None
    assert full_jids_affix in affixes.custom


def test_all_affixes_verified() -> None:
    """Affixes found in an envelope are loaded and verified whatever the profile says."""
    required = SCEAffixPolicy.REQUIRED
    not_needed = SCEAffixPolicy.NOT_NEEDED

    # The packing side includes every affix, the unpacking side asks for none of them.
    sender_profile = SCEProfile(
        required, required, required, required, custom_policies={}
    )
    receiver_profile = SCEProfile(
        not_needed, not_needed, not_needed, not_needed, custom_policies={}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            When unpacking, all affixes are loaded, even those marked as NOT_NEEDED.
        </body>
    </message>""")

    packed = XEP_0420.pack_stanza(sender_profile, message)
    affixes = XEP_0420.unpack_stanza(receiver_profile, message, packed)

    assert affixes.rpad is not None
    assert affixes.timestamp is not None
    assert affixes.recipient is not None
    assert affixes.sender is not None

    # Verification also happens for affixes that are NOT_NEEDED by the profile: a stanza
    # with different sender and recipient must be rejected for the same envelope.
    mismatched = string_to_domish(
        """<message from="fooo@example.com" to="baz@example.com"></message>"""
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(receiver_profile, mismatched, packed)


def test_incomplete_affixes() -> None:
    """Unpacking must fail when an affix REQUIRED by the profile is missing."""
    required = SCEAffixPolicy.REQUIRED
    not_needed = SCEAffixPolicy.NOT_NEEDED

    # First scenario: the packing side adds no affixes at all, while the unpacking side
    # requires all four standard ones.
    sender_profile = SCEProfile(
        not_needed, not_needed, not_needed, not_needed, custom_policies={}
    )
    receiver_profile = SCEProfile(
        required, required, required, required, custom_policies={}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that all affixes REQUIRED by the profile are present.</body>
    </message>""")

    with pytest.raises(ProfileRequirementsNotMet):
        packed = XEP_0420.pack_stanza(sender_profile, message)
        XEP_0420.unpack_stanza(receiver_profile, message, packed)

    # Second scenario: the same check, but this time only a custom affix is missing.
    full_jids_affix = CustomAffixImpl()

    sender_profile = SCEProfile(
        not_needed,
        not_needed,
        not_needed,
        not_needed,
        custom_policies={full_jids_affix: not_needed}
    )
    receiver_profile = SCEProfile(
        not_needed,
        not_needed,
        not_needed,
        not_needed,
        custom_policies={full_jids_affix: required}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            Check that all affixes REQUIRED by the profile are present, including custom
            affixes.
        </body>
    </message>""")

    with pytest.raises(ProfileRequirementsNotMet):
        packed = XEP_0420.pack_stanza(sender_profile, message)
        XEP_0420.unpack_stanza(receiver_profile, message, packed)


def test_rpad_affix() -> None:
    """The rpad affix must exist and pad short content up to a minimum length."""
    not_needed = SCEAffixPolicy.NOT_NEEDED
    sce_profile = SCEProfile(
        SCEAffixPolicy.REQUIRED, not_needed, not_needed, not_needed, custom_policies={}
    )

    # The content elements are expected to be padded to at least 53 characters, so the
    # padding has to make up for whatever the serialized body falls short of that.
    min_rpad_length = 53 - len("<body xmlns='jabber:client'>OK</body>")

    # The round trip is repeated many times, presumably because the padding is random.
    for _ in range(100):
        stanza = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
            <body>OK</body>
        </message>""")

        packed = XEP_0420.pack_stanza(sce_profile, stanza)
        rpad = XEP_0420.unpack_stanza(sce_profile, stanza, packed).rpad

        assert rpad is not None
        assert len(rpad) >= min_rpad_length


def test_time_affix() -> None:
    """The time affix of an envelope must be parsed into a timezone-aware datetime."""
    not_needed = SCEAffixPolicy.NOT_NEEDED
    sce_profile = SCEProfile(
        not_needed, SCEAffixPolicy.REQUIRED, not_needed, not_needed, custom_policies={}
    )

    message = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com"></message>"""
    )

    # Hand-written envelope carrying a fixed timestamp, so the parsed value is known.
    envelope_xml = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                The time affix is only parsed and not otherwise verified. Not much to test
                here.
            </body>
        </content>
        <time stamp="1969-07-21T02:56:15Z"/>
    </envelope>"""

    affixes = XEP_0420.unpack_stanza(sce_profile, message, envelope_xml.encode("utf-8"))

    expected = datetime(1969, 7, 21, 2, 56, 15, tzinfo=timezone.utc)
    assert affixes.timestamp == expected


def test_to_affix() -> None:
    """The ``to`` affix is added on packing and verified against the bare JID."""
    not_needed = SCEAffixPolicy.NOT_NEEDED
    sce_profile = SCEProfile(
        not_needed, not_needed, SCEAffixPolicy.REQUIRED, not_needed, custom_policies={}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that the ``to`` affix is correctly added.</body>
    </message>""")

    packed = XEP_0420.pack_stanza(sce_profile, message)
    recipient = XEP_0420.unpack_stanza(sce_profile, message, packed).recipient
    assert recipient is not None
    assert recipient.userhost() == "bar@example.com"

    # A different recipient bare JID on the stanza must make verification fail.
    other_recipient = string_to_domish(
        """<message from="foo@example.com" to="baz@example.com"></message>"""
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(sce_profile, other_recipient, packed)

    # A resource on the recipient JID must not matter, only the bare JID is compared.
    with_resource = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com/device"></message>"""
    )

    recipient = XEP_0420.unpack_stanza(sce_profile, with_resource, packed).recipient
    assert recipient is not None
    assert recipient.userhost() == "bar@example.com"

    # Without a ``to`` attribute the affix cannot be built, so packing has to fail.
    without_to = string_to_domish("""<message from="foo@example.com">
        <body>
            Check that a missing "to" attribute on the stanza fails stanza packing.
        </body>
    </message>""")

    with pytest.raises(ValueError):
        XEP_0420.pack_stanza(sce_profile, without_to)


def test_from_affix() -> None:
    """The ``from`` affix is added on packing and verified against the bare JID."""
    not_needed = SCEAffixPolicy.NOT_NEEDED
    sce_profile = SCEProfile(
        not_needed, not_needed, not_needed, SCEAffixPolicy.REQUIRED, custom_policies={}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>Check that the ``from`` affix is correctly added.</body>
    </message>""")

    packed = XEP_0420.pack_stanza(sce_profile, message)
    sender = XEP_0420.unpack_stanza(sce_profile, message, packed).sender
    assert sender is not None
    assert sender.userhost() == "foo@example.com"

    # A different sender bare JID on the stanza must make verification fail.
    other_sender = string_to_domish(
        """<message from="fooo@example.com" to="bar@example.com"></message>"""
    )

    with pytest.raises(AffixVerificationFailed):
        XEP_0420.unpack_stanza(sce_profile, other_sender, packed)

    # A resource on the sender JID must not matter, only the bare JID is compared.
    with_resource = string_to_domish(
        """<message from="foo@example.com/device" to="bar@example.com"></message>"""
    )

    sender = XEP_0420.unpack_stanza(sce_profile, with_resource, packed).sender
    assert sender is not None
    assert sender.userhost() == "foo@example.com"

    # Without a ``from`` attribute the affix cannot be built, so packing has to fail.
    without_from = string_to_domish("""<message to="bar@example.com">
        <body>
            Check that a missing "from" attribute on the stanza fails stanza packing.
        </body>
    </message>""")

    with pytest.raises(ValueError):
        XEP_0420.pack_stanza(sce_profile, without_from)


def test_custom_affixes() -> None:
    """Custom affixes are rejected when unexpected and returned when expected."""
    full_jids_affix = CustomAffixImpl()
    not_needed = SCEAffixPolicy.NOT_NEEDED

    # The packing profile requires the custom affix, the unpacking profile does not
    # know about it at all.
    sender_profile = SCEProfile(
        not_needed,
        not_needed,
        not_needed,
        not_needed,
        custom_policies={full_jids_affix: SCEAffixPolicy.REQUIRED}
    )
    receiver_profile = SCEProfile(
        not_needed, not_needed, not_needed, not_needed, custom_policies={}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>
            If a custom affix is included in the envelope, but not excpected by the
            recipient, the schema validation should fail.
        </body>
    </message>""")

    with pytest.raises(ValueError):
        packed = XEP_0420.pack_stanza(sender_profile, message)
        XEP_0420.unpack_stanza(receiver_profile, message, packed)

    # From here on, the profile that knows the custom affix is used on both sides.
    message = string_to_domish("""<message from="foo@example.com/device0"
        to="bar@example.com/Libervia.123">
        <body>The affix element should be returned as part of the affix values.</body>
    </message>""")

    packed = XEP_0420.pack_stanza(sender_profile, message)
    affixes = XEP_0420.unpack_stanza(sender_profile, message, packed)

    assert full_jids_affix in affixes.custom

    # The returned affix element must carry the full JIDs taken from the stanza.
    full_jids = affixes.custom[full_jids_affix]
    assert full_jids.getAttribute("recipient") == "bar@example.com/Libervia.123"
    assert full_jids.getAttribute("sender") == "foo@example.com/device0"


def test_namespace_conversion() -> None:
    """Elements with namespace ``None`` must end up in ``jabber:client``."""
    body_text = (
        "This body element has namespace ``None``, which has to be replaced with"
        " jabber:client."
    )

    # Build the stanza by hand so that neither it nor its body has a namespace.
    message = domish.Element((None, "message"))
    message["from"] = "foo@example.com"
    message["to"] = "bar@example.com"
    message.addElement("body", content=body_text)

    not_needed = SCEAffixPolicy.NOT_NEEDED
    sce_profile = SCEProfile(
        not_needed, not_needed, not_needed, not_needed, custom_policies={}
    )

    packed = XEP_0420.pack_stanza(sce_profile, message)
    envelope = string_to_domish(packed.decode("utf-8"))
    content = next(envelope.elements(NS_SCE, "content"))

    # Inside the envelope, the body must have been given the ``jabber:client`` namespace
    packed_body = next(content.elements("jabber:client", "body"), None)
    assert packed_body is not None

    XEP_0420.unpack_stanza(sce_profile, message, packed)

    # After unpacking, the body must still be in the ``jabber:client`` namespace
    unpacked_body = next(message.elements("jabber:client", "body"), None)
    assert unpacked_body is not None


def test_non_encryptable_elements() -> None:
    """A store hint must neither be packed into nor unpacked from the content."""
    not_needed = SCEAffixPolicy.NOT_NEEDED
    sce_profile = SCEProfile(
        not_needed, not_needed, not_needed, not_needed, custom_policies={}
    )

    message = string_to_domish("""<message from="foo@example.com" to="bar@example.com">
        <body>This stanza includes a store hint which must not be encrypted.</body>
        <store xmlns="urn:xmpp:hints"/>
    </message>""")

    packed = XEP_0420.pack_stanza(sce_profile, message)
    envelope = string_to_domish(packed.decode("utf-8"))
    content = next(envelope.elements(NS_SCE, "content"))

    # Packing must leave the store hint on the stanza instead of moving it into the
    # content element.
    hint_on_stanza = next(message.elements(NS_HINTS, "store"), None)
    assert hint_on_stanza is not None
    hint_in_content = next(content.elements(NS_HINTS, "store"), None)
    assert hint_in_content is None

    # Opposite direction: a hint placed inside a hand-written envelope must not be
    # moved onto the stanza by unpacking.
    message = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com"></message>"""
    )

    envelope_xml = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                The store hint must not be moved to the stanza.
            </body>
            <store xmlns="urn:xmpp:hints"/>
        </content>
    </envelope>"""

    XEP_0420.unpack_stanza(sce_profile, message, envelope_xml.encode("utf-8"))

    assert next(message.elements(NS_HINTS, "store"), None) is None


def test_schema_validation() -> None:
    """An envelope containing an unknown affix element must be rejected."""
    not_needed = SCEAffixPolicy.NOT_NEEDED
    sce_profile = SCEProfile(
        not_needed, not_needed, not_needed, not_needed, custom_policies={}
    )

    message = string_to_domish(
        """<message from="foo@example.com" to="bar@example.com"></message>"""
    )

    # Hand-written envelope with an element that no affix of the profile declares.
    envelope_xml = f"""<envelope xmlns="{NS_SCE}">
        <content>
            <body xmlns="jabber:client">
                An unknwon affix should cause a schema validation error.
            </body>
            <store xmlns="urn:xmpp:hints"/>
        </content>
        <unknown-affix unknown-attr="unknown"/>
    </envelope>"""
    packed = envelope_xml.encode("utf-8")

    with pytest.raises(ValueError):
        XEP_0420.unpack_stanza(sce_profile, message, packed)