# view sat/plugins/plugin_xep_0420.py @ 3979:996e0f84935e
#
# component AP gateway: log at debug level instead of warning when no client is set
# author Goffi <goffi@goffi.org>
# date Tue, 15 Nov 2022 18:02:16 +0100
# parents cecf45416403
# children
# line wrap: on
# line source

#!/usr/bin/env python3

# Libervia plugin for Stanza Content Encryption
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
from datetime import datetime
import enum
import secrets
import string
from typing import Dict, NamedTuple, Optional, Set, Tuple, cast
from typing_extensions import Final

from lxml import etree
from sat.core import exceptions

from sat.core.constants import Const as C
from sat.core.i18n import D_
from sat.core.log import Logger, getLogger
from sat.core.sat_main import SAT
from sat.tools.xml_tools import ElementParser
from sat.plugins.plugin_xep_0033 import NS_ADDRESS
from sat.plugins.plugin_xep_0082 import XEP_0082
from sat.plugins.plugin_xep_0334 import NS_HINTS
from sat.plugins.plugin_xep_0359 import NS_SID
from sat.plugins.plugin_xep_0380 import NS_EME
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish


# Names exported by a star-import of this module, i.e. its public API: the plugin
# metadata, the SCE namespace, the plugin class, its exceptions and the types used to
# describe SCE profiles and affixes.
__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "NS_SCE",
    "XEP_0420",
    "ProfileRequirementsNotMet",
    "AffixVerificationFailed",
    "SCECustomAffix",
    "SCEAffixPolicy",
    "SCEProfile",
    "SCEAffixValues"
]


# Module-level logger. ``getLogger`` is not typed (hence the ``type: ignore``), so the
# result is cast to ``Logger`` for the type checker.
log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]


# Plugin metadata keyed by the ``PI_*`` constants of ``C``. ``C.PI_MAIN`` names the plugin
# class defined in this module.
# NOTE(review): presumably consumed by the plugin loader, which is not visible from this
# file - confirm there how dependencies and recommendations are treated.
PLUGIN_INFO = {
    C.PI_NAME: "SCE",
    C.PI_IMPORT_NAME: "XEP-0420",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0420" ],
    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ],
    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ],
    C.PI_MAIN: "XEP_0420",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"),
}


# XML namespace of the SCE envelope, its content element and its affix elements.
NS_SCE: Final = "urn:xmpp:sce:1"


class ProfileRequirementsNotMet(Exception):
    """
    Raised by :meth:`XEP_0420.unpack_stanza` when the envelope lacks at least one affix
    that the SCE profile marks as :attr:`SCEAffixPolicy.REQUIRED`.
    """


class AffixVerificationFailed(Exception):
    """
    Raised by :meth:`XEP_0420.unpack_stanza` when an affix that is present in the envelope
    fails verification, e.g. a malformed time affix or a to/from affix whose bare JID
    doesn't match the corresponding attribute of the stanza. Custom affixes raise it from
    :meth:`SCECustomAffix.verify`.
    """


class SCECustomAffix(ABC):
    """
    Interface for custom affixes of SCE profiles.

    Instances are used as dictionary keys (see :attr:`SCEProfile.custom_policies` and
    :attr:`SCEAffixValues.custom`) and thus must be hashable.
    """

    @property
    @abstractmethod
    def element_name(self) -> str:
        """
        @return: The name of the affix's XML element. The element is looked up in the
            envelope under the SCE namespace using this name.
        """

    @property
    @abstractmethod
    def element_schema(self) -> str:
        """
        @return: The XML schema definition of the affix element's XML structure, i.e. the
            ``<xs:element/>`` schema element. This element will be referenced using
            ``<xs:element ref="{element_name}"/>``. The definition is inserted verbatim
            into the envelope schema.
        """

    @abstractmethod
    def create(self, stanza: domish.Element) -> domish.Element:
        """
        Build the affix element for a stanza that is being packed.

        @param stanza: The stanza element which has been processed by
            :meth:`XEP_0420.pack_stanza`, i.e. all encryptable children have been removed
            and only the root ``<message/>`` or ``<iq/>`` and unencryptable children
            remain. Do not modify.
        @return: An affix element to include in the envelope. The element must have the
            name :attr:`element_name` and must validate using :attr:`element_schema`.
        @raise ValueError: if the affix couldn't be built due to missing information on
            the stanza.
        """

    @abstractmethod
    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
        """
        Verify an affix element found in an envelope that is being unpacked.

        @param stanza: The stanza element as received, before
            :meth:`XEP_0420.unpack_stanza` moves the decrypted children back into it, i.e.
            only the root ``<message/>`` or ``<iq/>`` and the children that were
            transferred in plaintext are present. Do not modify.
        @param element: The affix element to verify.
        @raise AffixVerificationFailed: on verification failure.
        """


@enum.unique
class SCEAffixPolicy(enum.Enum):
    """
    Policy for the presence of an affix in an SCE envelope.

    Members are deliberately left without type annotations: the type of a member is the
    enum itself, not ``str``, and type checkers reject annotated enum members.
    """

    # The affix is added when packing and must be present when unpacking.
    REQUIRED = "REQUIRED"
    # The affix is added when packing if possible, its absence is tolerated when
    # unpacking.
    OPTIONAL = "OPTIONAL"
    # The affix is not added when packing and not demanded when unpacking.
    NOT_NEEDED = "NOT_NEEDED"


class SCEProfile(NamedTuple):
    # pylint: disable=invalid-name
    """
    An SCE profile, i.e. the definition of which affixes are required, optional or not
    needed at all by an SCE-enabled encryption protocol. Passed to both
    :meth:`XEP_0420.pack_stanza` and :meth:`XEP_0420.unpack_stanza`.
    """

    # Policy for the random padding affix ``<rpad/>``.
    rpad_policy: SCEAffixPolicy
    # Policy for the timestamp affix ``<time/>``.
    time_policy: SCEAffixPolicy
    # Policy for the recipient affix ``<to/>``.
    to_policy: SCEAffixPolicy
    # Policy for the sender affix ``<from/>``.
    from_policy: SCEAffixPolicy
    # Policies for protocol-specific affixes, keyed by the affix implementation.
    custom_policies: Dict[SCECustomAffix, SCEAffixPolicy]


class SCEAffixValues(NamedTuple):
    # pylint: disable=invalid-name
    """
    Structure returned by :meth:`XEP_0420.unpack_stanza` with the parsed/processed values
    of all affixes included in the envelope. For custom affixes, the whole affix element
    is returned. Built-in affixes that are absent from the envelope are ``None``, absent
    custom affixes have no entry in :attr:`custom`.
    """

    # Text content of the ``<rpad/>`` affix.
    rpad: Optional[str]
    # Parsed ``stamp`` attribute of the ``<time/>`` affix.
    timestamp: Optional[datetime]
    # JID given by the ``jid`` attribute of the ``<to/>`` affix.
    recipient: Optional[jid.JID]
    # JID given by the ``jid`` attribute of the ``<from/>`` affix.
    sender: Optional[jid.JID]
    # Verified custom affix elements, keyed by the affix implementation.
    custom: Dict[SCECustomAffix, domish.Element]


# XML schema template used to validate decrypted envelopes. It is a ``str.format``
# template with two placeholders:
# - ``custom_affix_references``: ``<xs:element ref="..."/>`` entries added to the list of
#   children allowed in the envelope
# - ``custom_affix_definitions``: the ``<xs:element/>`` definitions those entries refer to
# Only the content element is mandatory; every affix is optional on schema level.
# Note that the attributes of the time, to and from affixes are not marked as required
# by this schema.
ENVELOPE_SCHEMA = """<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:sce:1"
    xmlns="urn:xmpp:sce:1">

    <xs:element name="envelope">
        <xs:complexType>
            <xs:all>
                <xs:element ref="content"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="time" minOccurs="0"/>
                <xs:element ref="to" minOccurs="0"/>
                <xs:element ref="from" minOccurs="0"/>
                {custom_affix_references}
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="content">
        <xs:complexType>
            <xs:sequence>
                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>

    <xs:element name="rpad" type="xs:string"/>

    <xs:element name="time">
        <xs:complexType>
            <xs:attribute name="stamp" type="xs:dateTime"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="to">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="from">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    {custom_affix_definitions}
</xs:schema>
"""


class XEP_0420:  # pylint: disable=invalid-name
    """
    Implementation of XEP-0420: Stanza Content Encryption under namespace
    ``urn:xmpp:sce:1``.

    This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas
    actively, but offers API for other plugins to use: :meth:`pack_stanza` builds the
    plaintext envelope for an encryption scheme to encrypt, :meth:`unpack_stanza`
    validates a decrypted envelope and restores the stanza from it.
    """

    # Set of namespaces whose elements are never allowed to be transferred in an encrypted
    # envelope.
    MUST_BE_PLAINTEXT_NAMESPACES: Set[str] = {
        NS_HINTS,
        NS_SID,  # TODO: Not sure whether this ban applies to both stanza-id and origin-id
        NS_ADDRESS,
        # Not part of the specification (yet), but just doesn't make sense in an encrypted
        # envelope:
        NS_EME
    }

    # Set of (namespace, element name) tuples that define elements which are never allowed
    # to be transferred in an encrypted envelope. If all elements under a certain
    # namespace are forbidden, the namespace can be added to
    # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead.
    # Note: only full namespaces are forbidden by the spec for now, the following is for
    # potential future use.
    MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set()

    def __init__(self, sat: SAT) -> None:
        """
        @param sat: The SAT instance. Not used, the plugin doesn't keep any state.
        """

    @staticmethod
    def _must_be_plaintext(element: domish.Element) -> bool:
        """
        @param element: A direct child of a stanza or of an envelope's content element.
        @return: Whether the element is banned from encrypted envelopes, either through
            its namespace or through its (namespace, name) pair.
        """

        return (
            element.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
            or (element.uri, element.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
        )

    @staticmethod
    def _add_jid_affix(
        envelope: domish.Element,
        stanza: domish.Element,
        attribute: str,
        policy: SCEAffixPolicy
    ) -> None:
        """Add the <to/> or <from/> affix to the envelope as requested by the policy.

        @param envelope: The envelope to add the affix to.
        @param stanza: The stanza to read the JID from.
        @param attribute: Either "to" or "from"; names both the stanza attribute to read
            and the affix element to create.
        @param policy: The policy of the profile for this affix.
        @raise ValueError: if the affix is required but the stanza doesn't have the
            attribute set. An optional affix is silently skipped in that case.
        """

        if policy is SCEAffixPolicy.NOT_NEEDED:
            return

        address = stanza.getAttribute(attribute, None)
        if address is not None:
            # Only the bare JID is included in the affix
            affix_element = envelope.addElement((NS_SCE, attribute))
            affix_element["jid"] = jid.JID(address).userhost()
        elif policy is SCEAffixPolicy.REQUIRED:
            raise ValueError(
                f"<{attribute}/> affix requested, but stanza doesn't have the"
                f" '{attribute}' attribute set."
            )

    @staticmethod
    def _build_envelope_parser(custom_affixes: Set[SCECustomAffix]) -> etree.XMLParser:
        """
        @param custom_affixes: The custom affixes to allow in the envelope.
        @return: A validating parser for :data:`ENVELOPE_SCHEMA`, extended by the
            references to and definitions of the custom affixes.
        """

        schema_source = ENVELOPE_SCHEMA.format(
            custom_affix_references="".join(
                f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>'
                for custom_affix
                in custom_affixes
            ),
            custom_affix_definitions="".join(
                custom_affix.element_schema
                for custom_affix
                in custom_affixes
            )
        )

        return etree.XMLParser(
            schema=etree.XMLSchema(etree.XML(schema_source.encode("utf-8")))
        )

    @staticmethod
    def _verify_jid_affix(
        stanza: domish.Element,
        affix_element: Optional[domish.Element],
        attribute: str,
        role: str
    ) -> Optional[jid.JID]:
        """Verify the <to/> or <from/> affix against the stanza.

        The affix is verified by comparing the respective attribute of the stanza with the
        JID referenced by the affix. Note that only bare JIDs are compared as per the
        specification.

        @param stanza: The stanza to compare with.
        @param affix_element: The affix element, or ``None`` if the envelope doesn't
            include the affix.
        @param attribute: Either "to" or "from", the name of the stanza attribute to
            compare with.
        @param role: "recipient" or "sender", only used in error messages.
        @return: The JID referenced by the affix, or ``None`` if the affix is absent.
        @raise AffixVerificationFailed: if the affix lacks a usable JID, if the stanza
            lacks the attribute to compare with, or if the bare JIDs differ.
        """

        if affix_element is None:
            return None

        # The schema doesn't force the jid attribute to be present or non-empty, and its
        # value is attacker-controlled, thus treat anything unparseable as a verification
        # failure instead of leaking KeyError/RuntimeError/InvalidFormat to the caller.
        affix_address = affix_element.getAttribute("jid", None)
        if not affix_address:
            raise AffixVerificationFailed(f"Malformed {attribute} affix.")

        try:
            target = jid.JID(affix_address)
        except jid.InvalidFormat as e:
            raise AffixVerificationFailed(f"Malformed {attribute} affix.") from e

        actual = stanza.getAttribute(attribute, None)
        if actual is None:
            raise AffixVerificationFailed(
                f"'{attribute.capitalize()}' affix is included in the envelope, but the"
                f" stanza is lacking a '{attribute}' attribute to compare the value to."
            )

        actual_bare_jid = jid.JID(actual).userhost()
        target_bare_jid = target.userhost()

        if actual_bare_jid != target_bare_jid:
            raise AffixVerificationFailed(
                f"Mismatch between actual and target {role} bare JIDs:"
                f" {actual_bare_jid} vs {target_bare_jid}."
            )

        return target

    @staticmethod
    def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes:
        """Pack a stanza according to Stanza Content Encryption.

        Removes all elements from the stanza except for a few exceptions that explicitly
        need to be transferred in plaintext, e.g. because they contain hints/instructions
        for the server on how to process the stanza. Together with the affix elements as
        requested by the profile, the removed elements are added to an envelope XML
        structure that builds the plaintext to be encrypted by the SCE-enabled encryption
        scheme. Optional affixes are added to the structure whenever they can be built;
        an optional <to/> or <from/> affix is skipped if the stanza lacks the respective
        attribute.

        Once built, the envelope structure is serialized to a byte string and returned for
        the encryption scheme to encrypt and add to the stanza.

        @param profile: The SCE profile, i.e. the definition of affixes to include in the
            envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @return: The serialized envelope structure that builds the plaintext for the
            encryption scheme to process.
        @raise ValueError: if the <to/> or <from/> affixes are required but the stanza
            doesn't have the "to"/"from" attribute set to extract the value from. Can also
            be raised by custom affixes.

        @warning: It is up to the calling code to add a <store/> message processing hint
            if applicable.
        """

        # Prepare the envelope and content elements
        envelope = domish.Element((NS_SCE, "envelope"))
        content = envelope.addElement((NS_SCE, "content"))

        # Note the serialized byte size of the content element before adding any children
        empty_content_byte_size = len(content.toXml().encode("utf-8"))

        # Move elements that are not explicitly forbidden from being encrypted from the
        # stanza to the content element. Iterate over a copy, since the stanza's children
        # are modified by the loop.
        for child in list(stanza.elements()):
            if XEP_0420._must_be_plaintext(child):
                continue

            # Remove the child from the stanza
            stanza.children.remove(child)

            # A namespace of ``None`` can be used on domish elements to inherit the
            # namespace from the parent. When moving elements from the stanza root to
            # the content element, however, we don't want elements to inherit the
            # namespace of the content element. Thus, check for elements with ``None``
            # for their namespace and set the namespace to jabber:client, which is the
            # namespace of the parent element.
            if child.uri is None:
                child.uri = C.NS_CLIENT
                child.defaultUri = C.NS_CLIENT

            # Add the child with corrected namespaces to the content element
            content.addChild(child)

        # Add the affixes requested by the profile
        if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
            # The specification defines the rpad affix to contain "[...] a randomly
            # generated sequence of random length between 0 and 200 characters." This
            # implementation differs a bit from the specification in that a minimum size
            # other than 0 is chosen depending on the serialized size of the content
            # element. This is to prevent the scenario where the encrypted content is
            # short and the rpad is also randomly chosen to be short, which could allow
            # guessing the content of a short message. To do so, the rpad length is first
            # chosen to pad the content to at least 53 bytes, then afterwards another 0 to
            # 200 bytes are added. Note that single-byte characters are used by this
            # implementation, thus the number of characters equals the number of bytes
            # (before any escaping applied when the envelope is serialized).
            content_byte_size = len(content.toXml().encode("utf-8"))
            content_byte_size_diff = content_byte_size - empty_content_byte_size
            rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201)

            # Build the alphabet once instead of once per generated character
            rpad_alphabet = string.digits + string.ascii_letters + string.punctuation
            rpad_content = "".join(
                secrets.choice(rpad_alphabet)
                for __
                in range(rpad_length)
            )
            envelope.addElement((NS_SCE, "rpad"), content=rpad_content)

        if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED:
            time_element = envelope.addElement((NS_SCE, "time"))
            time_element["stamp"] = XEP_0082.format_datetime()

        XEP_0420._add_jid_affix(envelope, stanza, "to", profile.to_policy)
        XEP_0420._add_jid_affix(envelope, stanza, "from", profile.from_policy)

        for affix, policy in profile.custom_policies.items():
            if policy is not SCEAffixPolicy.NOT_NEEDED:
                envelope.addChild(affix.create(stanza))

        return envelope.toXml().encode("utf-8")

    @staticmethod
    def unpack_stanza(
        profile: SCEProfile,
        stanza: domish.Element,
        envelope_serialized: bytes
    ) -> SCEAffixValues:
        """Unpack a stanza packed according to Stanza Content Encryption.

        Parses the serialized envelope as XML, verifies included affixes and makes sure
        the requirements of the profile are met, and restores the stanza by moving
        decrypted elements from the envelope back to the stanza top level.

        @param profile: The SCE profile, i.e. the definition of affixes that have to/may
            be included in the envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @param envelope_serialized: The serialized envelope, i.e. the plaintext produced
            by the decryption scheme utilizing SCE.
        @return: The parsed and processed values of all affixes that were present on the
            envelope, notably including the timestamp.
        @raise exceptions.ParsingError: if the serialized envelope is not valid utf-8 or
            the envelope element is malformed.
        @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
            are missing from the envelope.
        @raise AffixVerificationFailed: if an affix included in the envelope fails to
            validate. It doesn't matter whether the affix is required by the profile or
            not, all affixes included in the envelope are validated and cause this
            exception to be raised on failure.

        @warning: It is up to the calling code to verify the timestamp, if returned, since
            the requirements on the timestamp may vary between SCE-enabled protocols.
        """

        try:
            envelope_serialized_string = envelope_serialized.decode("utf-8")
        except UnicodeError as e:
            raise exceptions.ParsingError(
                "Serialized envelope can't be parsed as utf-8."
            ) from e

        custom_affixes = set(profile.custom_policies)

        # Make sure the envelope adheres to the schema
        parser = XEP_0420._build_envelope_parser(custom_affixes)

        try:
            etree.fromstring(envelope_serialized_string, parser)
        except (etree.XMLSyntaxError, ValueError) as e:
            # Next to syntax/validation errors, lxml rejects unicode strings that carry
            # an XML encoding declaration with a plain ValueError.
            raise exceptions.ParsingError(
                "Serialized envelope doesn't pass schema validation."
            ) from e

        # Prepare the envelope and content elements
        envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
        content = next(envelope.elements(NS_SCE, "content"))

        def find_affix(name: str) -> Optional[domish.Element]:
            """
            @param name: The element name of the affix.
            @return: The first element with this name under the SCE namespace found on
                the envelope, or ``None`` if there is no such element.
            """

            return cast(
                Optional[domish.Element],
                next(envelope.elements(NS_SCE, name), None)
            )

        # Verify the affixes
        rpad_element = find_affix("rpad")
        time_element = find_affix("time")
        to_element = find_affix("to")
        from_element = find_affix("from")

        # The rpad doesn't need verification.
        rpad_value = None if rpad_element is None else str(rpad_element)

        # The time affix isn't verified other than that the timestamp is parseable. The
        # schema doesn't force the stamp attribute to be present, thus a missing
        # attribute is handled like an unparseable one.
        timestamp_value: Optional[datetime] = None
        if time_element is not None:
            try:
                timestamp_value = XEP_0082.parse_datetime(time_element["stamp"])
            except (KeyError, ValueError) as e:
                raise AffixVerificationFailed("Malformed time affix.") from e

        # The to and from affixes are verified against the respective stanza attributes.
        recipient_value = XEP_0420._verify_jid_affix(
            stanza, to_element, "to", "recipient"
        )
        sender_value = XEP_0420._verify_jid_affix(
            stanza, from_element, "from", "sender"
        )

        # Find and verify custom affixes
        custom_values: Dict[SCECustomAffix, domish.Element] = {}
        for affix in custom_affixes:
            element = find_affix(affix.element_name)
            if element is not None:
                affix.verify(stanza, element)
                custom_values[affix] = element

        # Check whether all affixes required by the profile are present
        rpad_missing = \
            profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None
        time_missing = \
            profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None
        to_missing = \
            profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None
        from_missing = \
            profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None
        custom_missing = any(
            affix not in custom_values
            for affix, policy
            in profile.custom_policies.items()
            if policy is SCEAffixPolicy.REQUIRED
        )

        if rpad_missing or time_missing or to_missing or from_missing or custom_missing:
            def describe(element_present: bool) -> str:
                """
                @param element_present: Whether the affix was found on the envelope.
                @return: The label used for the affix in the error message.
                """

                return "present" if element_present else "missing"

            # Report the actual presence of each affix, independent of whether the
            # profile requires it; the profile itself is part of the message.
            affix_presence = [
                f"rpad={describe(rpad_element is not None)}",
                f"time={describe(time_element is not None)}",
                f"to={describe(to_element is not None)}",
                f"from={describe(from_element is not None)}"
            ]
            affix_presence.extend(
                f"[custom]{custom_affix.element_name}"
                f"={describe(custom_affix in custom_values)}"
                for custom_affix
                in custom_affixes
            )

            raise ProfileRequirementsNotMet(
                f"SCE envelope is missing affixes required by the profile {profile}."
                f" Affix presence: {', '.join(affix_presence)}"
            )

        # Move elements that are not explicitly forbidden from being encrypted from the
        # content element to the stanza. Iterate over a copy, since the children of the
        # content element are modified by the loop.
        for child in list(content.elements()):
            if XEP_0420._must_be_plaintext(child):
                # Such elements are not restored to the stanza, only reported.
                log.warning(
                    f"An element that MUST be transferred in plaintext was found in an"
                    f" SCE envelope: {child.toXml()}"
                )
            else:
                # Remove the child from the content element
                content.children.remove(child)

                # Add the child to the stanza
                stanza.addChild(child)

        return SCEAffixValues(
            rpad_value,
            timestamp_value,
            recipient_value,
            sender_value,
            custom_values
        )