diff sat/plugins/plugin_xep_0420.py @ 3877:00212260f659

plugin XEP-0420: Implementation of Stanza Content Encryption: Includes implementation of XEP-0082 (XMPP date and time profiles) and tests for both new plugins. Everything is type checked, linted, format checked and unit tested. Adds new dependency xmlschema. fix 377
author Syndace <me@syndace.dev>
date Tue, 23 Aug 2022 12:04:11 +0200
children 8289ac1b34f4
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0420.py	Tue Aug 23 12:04:11 2022 +0200
@@ -0,0 +1,582 @@
+#!/usr/bin/env python3
+# Libervia plugin for Stanza Content Encryption
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+# Type-check with `mypy --strict --disable-error-code no-untyped-call`
+# Lint with `pylint`
+from abc import ABC, abstractmethod
+from datetime import datetime
+import enum
+import secrets
+import string
+from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast
+from lxml import etree
+from sat.core.constants import Const as C
+from sat.core.i18n import D_
+from sat.core.log import Logger, getLogger
+from sat.core.sat_main import SAT
+from sat.tools.xml_tools import ElementParser
+from sat.plugins.plugin_xep_0033 import NS_ADDRESS
+from sat.plugins.plugin_xep_0082 import XEP_0082
+from sat.plugins.plugin_xep_0334 import NS_HINTS
+from sat.plugins.plugin_xep_0359 import NS_SID
+from sat.plugins.plugin_xep_0380 import NS_EME
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+# Names exported by this module.
+__all__ = [  # pylint: disable=unused-variable
+    "NS_SCE",
+    "XEP_0420",
+    "ProfileRequirementsNotMet",
+    "AffixVerificationFailed",
+    "SCECustomAffix",
+    "SCEAffixPolicy",
+    "SCEProfile",
+    "SCEAffixValues"
+# Module-level logger; the cast only narrows the type for the type checker.
+log = cast(Logger, getLogger(__name__))
+    # Plugin metadata entries keyed by the ``C.PI_*`` constants.
+    C.PI_NAME: "SCE",
+    C.PI_IMPORT_NAME: "XEP-0420",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0420" ],
+    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ],
+    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ],
+    C.PI_MAIN: "XEP_0420",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"),
+# XML namespace of the SCE envelope, its content element and all affix elements.
+NS_SCE = "urn:xmpp:sce:1"
+class ProfileRequirementsNotMet(Exception):
+    """
+    Raised by :meth:`XEP_0420.unpack_stanza` in case the requirements formulated by the
+    profile are not met, i.e. at least one affix with policy ``REQUIRED`` is missing from
+    the envelope.
+    """
+class AffixVerificationFailed(Exception):
+    """
+    Raised by :meth:`XEP_0420.unpack_stanza` in case of affix verification failure, e.g.
+    a malformed time affix or a mismatch between the bare JID referenced by a ``<to/>`` or
+    ``<from/>`` affix and the corresponding attribute of the stanza. Implementations of
+    :meth:`SCECustomAffix.verify` raise it for custom affixes as well.
+    """
+class SCECustomAffix(ABC):
+    """
+    Interface for custom affixes of SCE profiles.
+    Instances are used as dictionary keys by :class:`SCEProfile` and
+    :class:`SCEAffixValues`, thus implementations must be hashable.
+    """
+    @property
+    @abstractmethod
+    def element_name(self) -> str:
+        """
+        @return: The name of the affix's XML element. The element is looked up in the
+            envelope under the ``urn:xmpp:sce:1`` namespace using this name.
+        """
+    @property
+    @abstractmethod
+    def element_schema(self) -> str:
+        """
+        @return: The XML schema definition of the affix element's XML structure, i.e. the
+            ``<xs:element/>`` schema element. This element will be referenced using
+            ``<xs:element ref="{element_name}"/>``. The returned string is inserted
+            verbatim into the envelope schema used for validation.
+        """
+    @abstractmethod
+    def create(self, stanza: domish.Element) -> domish.Element:
+        """
+        Build the affix element for an outgoing stanza. Only called by
+        :meth:`XEP_0420.pack_stanza` if the policy of this affix is not ``NOT_NEEDED``.
+        @param stanza: The stanza element which has been processed by
+            :meth:`XEP_0420.pack_stanza`, i.e. all encryptable children have been removed
+            and only the root ``<message/>`` or ``<iq/>`` and unencryptable children
+            remain. Do not modify.
+        @return: An affix element to include in the envelope. The element must have the
+            name :attr:`element_name` and must validate using :attr:`element_schema`.
+        @raise ValueError: if the affix couldn't be built.
+        """
+    @abstractmethod
+    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
+        """
+        Verify the affix element of an incoming stanza. Only called by
+        :meth:`XEP_0420.unpack_stanza` if the affix is present in the envelope.
+        @param stanza: The stanza element before being processed by
+            :meth:`XEP_0420.unpack_stanza`, i.e. all encryptable children have been
+            removed and only the root ``<message/>`` or ``<iq/>`` and unencryptable
+            children remain. Do not modify.
+        @param element: The affix element to verify.
+        @raise AffixVerificationFailed: on verification failure.
+        """
+class SCEAffixPolicy(enum.Enum):
+    """
+    Policy for the presence of an affix in an SCE envelope.
+    :meth:`XEP_0420.pack_stanza` adds every affix whose policy is not ``NOT_NEEDED``,
+    while :meth:`XEP_0420.unpack_stanza` rejects envelopes that lack an affix whose policy
+    is ``REQUIRED``.
+    """
+class SCEProfile(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    An SCE profile, i.e. the definition which affixes are required, optional or not needed
+    at all by an SCE-enabled encryption protocol.
+    """
+    # Policy for the ``<rpad/>`` (random padding) affix.
+    rpad_policy: SCEAffixPolicy
+    # Policy for the ``<time/>`` (timestamp) affix.
+    time_policy: SCEAffixPolicy
+    # Policy for the ``<to/>`` (recipient bare JID) affix.
+    to_policy: SCEAffixPolicy
+    # Policy for the ``<from/>`` (sender bare JID) affix.
+    from_policy: SCEAffixPolicy
+    # Policies for custom affixes, keyed by the custom affix implementation.
+    custom_policies: Dict[SCECustomAffix, SCEAffixPolicy]
+class SCEAffixValues(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    Structure returned by :meth:`XEP_0420.unpack_stanza` with the parsed/processed values
+    of all affixes included in the envelope. For custom affixes, the whole affix element
+    is returned. Fields of affixes that are not present in the envelope are ``None``.
+    """
+    # Text content of the ``<rpad/>`` affix.
+    rpad: Optional[str]
+    # Timestamp parsed from the ``stamp`` attribute of the ``<time/>`` affix.
+    timestamp: Optional[datetime]
+    # JID parsed from the ``jid`` attribute of the ``<to/>`` affix.
+    recipient: Optional[jid.JID]
+    # JID parsed from the ``jid`` attribute of the ``<from/>`` affix.
+    sender: Optional[jid.JID]
+    # Verified custom affix elements found in the envelope, keyed by their affix
+    # implementation. Custom affixes missing from the envelope have no entry.
+    custom: Dict[SCECustomAffix, domish.Element]
+# Template of the XML schema that :meth:`XEP_0420.unpack_stanza` uses to validate
+# decrypted envelopes. The template is filled using ``str.format``: the placeholder
+# ``{custom_affix_references}`` receives one ``<xs:element ref="..."/>`` per custom affix
+# of the profile and ``{custom_affix_definitions}`` receives the schema definitions of
+# those custom affixes. Since ``str.format`` is used, any literal curly brace added to the
+# template would have to be doubled.
+ENVELOPE_SCHEMA = """<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:sce:1"
+    xmlns="urn:xmpp:sce:1">
+    <xs:element name="envelope">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="content"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="time" minOccurs="0"/>
+                <xs:element ref="to" minOccurs="0"/>
+                <xs:element ref="from" minOccurs="0"/>
+                {custom_affix_references}
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+    <xs:element name="content">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+    <xs:element name="rpad" type="xs:string"/>
+    <xs:element name="time">
+        <xs:complexType>
+            <xs:attribute name="stamp" type="xs:dateTime"/>
+        </xs:complexType>
+    </xs:element>
+    <xs:element name="to">
+        <xs:complexType>
+            <xs:attribute name="jid" type="xs:string"/>
+        </xs:complexType>
+    </xs:element>
+    <xs:element name="from">
+        <xs:complexType>
+            <xs:attribute name="jid" type="xs:string"/>
+        </xs:complexType>
+    </xs:element>
+    {custom_affix_definitions}
+class XEP_0420:  # pylint: disable=invalid-name
+    """
+    Implementation of XEP-0420: Stanza Content Encryption under namespace
+    ``urn:xmpp:sce:1``.
+    This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas
+    actively, but offers API for other plugins to use: :meth:`pack_stanza` builds the
+    plaintext envelope for an outgoing stanza and :meth:`unpack_stanza` restores an
+    incoming stanza from a decrypted envelope. Both are static methods.
+    """
+    # Set of namespaces whose elements are never allowed to be transferred in an encrypted
+    # envelope. Elements under these namespaces are left on the stanza when packing and
+    # are not moved back to the stanza when found in an envelope while unpacking.
+        NS_HINTS,
+        NS_SID,  # TODO: Not sure whether this ban applies to both stanza-id and origin-id
+        NS_ADDRESS,
+        # Not part of the specification (yet), but just doesn't make sense in an encrypted
+        # envelope:
+        NS_EME
+    }
+    # Set of (namespace, element name) tuples that define elements which are never allowed
+    # to be transferred in an encrypted envelope. If all elements under a certain
+    # namespace are forbidden, the namespace can be added to
+    # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead.
+    # Note: only full namespaces are forbidden by the spec for now, the following is for
+    # potential future use.
+    MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set()
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+    @staticmethod
+    def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes:
+        """Pack a stanza according to Stanza Content Encryption.
+        Removes all elements from the stanza except for a few exceptions that explicitly
+        need to be transferred in plaintext, e.g. because they contain hints/instructions
+        for the server on how to process the stanza. Together with the affix elements as
+        requested by the profile, the removed elements are added to an envelope XML
+        structure that builds the plaintext to be encrypted by the SCE-enabled encryption
+        scheme. Optional affixes are always added to the structure, i.e. they are treated
+        by the packing code as if they were required.
+        Once built, the envelope structure is serialized to a byte string and returned for
+        the encryption scheme to encrypt and add to the stanza.
+        @param profile: The SCE profile, i.e. the definition of affixes to include in the
+            envelope.
+        @param stanza: The stanza to process. Will be modified by the call.
+        @return: The serialized envelope structure that builds the plaintext for the
+            encryption scheme to process.
+        @raise ValueError: if the <to/> or <from/> affixes are requested but the stanza
+            doesn't have the "to"/"from" attribute set to extract the value from. Can also
+            be raised by custom affixes.
+        @warning: It is up to the calling code to add a <store/> message processing hint
+            if applicable.
+        """
+        # Prepare the envelope and content elements
+        envelope = domish.Element((NS_SCE, "envelope"))
+        content = envelope.addElement((NS_SCE, "content"))
+        # Note the serialized byte size of the content element before adding any children
+        empty_content_byte_size = len(content.toXml().encode("utf-8"))
+        # Just for type safety
+        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
+        content_children = cast(List[Union[domish.Element, str]], content.children)
+        # Move elements that are not explicitly forbidden from being encrypted from the
+        # stanza to the content element.
+        for child in list(cast(Iterator[domish.Element], stanza.elements())):
+            if (
+                child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
+                and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
+            ):
+                # Remove the child from the stanza
+                stanza_children.remove(child)
+                # A namespace of ``None`` can be used on domish elements to inherit the
+                # namespace from the parent. When moving elements from the stanza root to
+                # the content element, however, we don't want elements to inherit the
+                # namespace of the content element. Thus, check for elements with ``None``
+                # for their namespace and set the namespace to jabber:client, which is the
+                # namespace of the parent element.
+                if child.uri is None:
+                    child.uri = C.NS_CLIENT
+                    child.defaultUri = C.NS_CLIENT
+                # Add the child with corrected namespaces to the content element
+                content_children.append(child)
+        # Add the affixes requested by the profile
+        if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
+            # The specification defines the rpad affix to contain "[...] a randomly
+            # generated sequence of random length between 0 and 200 characters." This
+            # implementation differs a bit from the specification in that a minimum size
+            # other than 0 is chosen depending on the serialized size of the content
+            # element. This is to prevent the scenario where the encrypted content is
+            # short and the rpad is also randomly chosen to be short, which could allow
+            # guessing the content of a short message. To do so, the rpad length is first
+            # chosen to pad the content to at least 53 bytes, then afterwards another 0 to
+            # 200 bytes are added. Note that single-byte characters are used by this
+            # implementation, thus the number of characters equals the number of bytes.
+            content_byte_size = len(content.toXml().encode("utf-8"))
+            content_byte_size_diff = content_byte_size - empty_content_byte_size
+            rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201)
+            rpad_content = "".join(
+                secrets.choice(string.digits + string.ascii_letters + string.punctuation)
+                for __
+                in range(rpad_length)
+            )
+            envelope.addElement((NS_SCE, "rpad"), content=rpad_content)
+        if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED:
+            time_element = envelope.addElement((NS_SCE, "time"))
+            time_element["stamp"] = XEP_0082.format_datetime()
+        if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED:
+            recipient = cast(Optional[str], stanza.getAttribute("to", None))
+            if recipient is None:
+                raise ValueError(
+                    "<to/> affix requested, but stanza doesn't have the 'to' attribute"
+                    " set."
+                )
+            to_element = envelope.addElement((NS_SCE, "to"))
+            to_element["jid"] = jid.JID(recipient).userhost()
+        if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED:
+            sender = cast(Optional[str], stanza.getAttribute("from", None))
+            if sender is None:
+                raise ValueError(
+                    "<from/> affix requested, but stanza doesn't have the 'from'"
+                    " attribute set."
+                )
+            from_element = envelope.addElement((NS_SCE, "from"))
+            from_element["jid"] = jid.JID(sender).userhost()
+        for affix, policy in profile.custom_policies.items():
+            if policy is not SCEAffixPolicy.NOT_NEEDED:
+                envelope.addChild(affix.create(stanza))
+        return cast(str, envelope.toXml()).encode("utf-8")
+    @staticmethod
+    def unpack_stanza(
+        profile: SCEProfile,
+        stanza: domish.Element,
+        envelope_serialized: bytes
+    ) -> SCEAffixValues:
+        """Unpack a stanza packed according to Stanza Content Encryption.
+        Parses the serialized envelope as XML, verifies included affixes and makes sure
+        the requirements of the profile are met, and restores the stanza by moving
+        decrypted elements from the envelope back to the stanza top level.
+        @param profile: The SCE profile, i.e. the definition of affixes that have to/may
+            be included in the envelope.
+        @param stanza: The stanza to process. Will be modified by the call.
+        @param envelope_serialized: The serialized envelope, i.e. the plaintext produced
+            by the decryption scheme utilizing SCE.
+        @return: The parsed and processed values of all affixes that were present on the
+            envelope, notably including the timestamp.
+        @raise ValueError: if the serialized envelope element is malformed.
+        @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
+            are missing from the envelope.
+        @raise AffixVerificationFailed: if an affix included in the envelope fails to
+            validate. It doesn't matter whether the affix is required by the profile or
+            not, all affixes included in the envelope are validated and cause this
+            exception to be raised on failure.
+        @warning: It is up to the calling code to verify the timestamp, if returned, since
+            the requirements on the timestamp may vary between SCE-enabled protocols.
+        """
+        try:
+            envelope_serialized_string = envelope_serialized.decode("utf-8")
+        except UnicodeError as e:
+            raise ValueError("Serialized envelope can't bare parsed as utf-8.") from e
+        custom_affixes = set(profile.custom_policies.keys())
+        # Make sure the envelope adheres to the schema
+        parser = etree.XMLParser(schema=etree.XMLSchema(etree.XML(ENVELOPE_SCHEMA.format(
+            custom_affix_references="".join(
+                f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>'
+                for custom_affix
+                in custom_affixes
+            ),
+            custom_affix_definitions="".join(
+                custom_affix.element_schema
+                for custom_affix
+                in custom_affixes
+            )
+        ).encode("utf-8"))))
+        try:
+            etree.fromstring(envelope_serialized_string, parser)
+        except etree.XMLSyntaxError as e:
+            raise ValueError("Serialized envelope doesn't pass schema validation.") from e
+        # Prepare the envelope and content elements
+        envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
+        content = cast(domish.Element, next(envelope.elements(NS_SCE, "content")))
+        # Verify the affixes
+        rpad_element = cast(
+            Optional[domish.Element],
+            next(envelope.elements(NS_SCE, "rpad"), None)
+        )
+        time_element = cast(
+            Optional[domish.Element],
+            next(envelope.elements(NS_SCE, "time"), None)
+        )
+        to_element = cast(
+            Optional[domish.Element],
+            next(envelope.elements(NS_SCE, "to"), None)
+        )
+        from_element = cast(
+            Optional[domish.Element],
+            next(envelope.elements(NS_SCE, "from"), None)
+        )
+        # The rpad doesn't need verification.
+        rpad_value = None if rpad_element is None else str(rpad_element)
+        # The time affix isn't verified other than that the timestamp is parseable.
+        try:
+            timestamp_value = None if time_element is None else \
+                XEP_0082.parse_datetime(time_element["stamp"])
+        except ValueError as e:
+            raise AffixVerificationFailed("Malformed time affix") from e
+        # The to affix is verified by comparing the to attribute of the stanza with the
+        # JID referenced by the affix. Note that only bare JIDs are compared as per the
+        # specification.
+        recipient_value: Optional[jid.JID] = None
+        if to_element is not None:
+            recipient_value = jid.JID(to_element["jid"])
+            recipient_actual = cast(Optional[str], stanza.getAttribute("to", None))
+            if recipient_actual is None:
+                raise AffixVerificationFailed(
+                    "'To' affix is included in the envelope, but the stanza is lacking a"
+                    " 'to' attribute to compare the value to."
+                )
+            recipient_actual_bare_jid = jid.JID(recipient_actual).userhost()
+            recipient_target_bare_jid = recipient_value.userhost()
+            if recipient_actual_bare_jid != recipient_target_bare_jid:
+                raise AffixVerificationFailed(
+                    f"Mismatch between actual and target recipient bare JIDs:"
+                    f" {recipient_actual_bare_jid} vs {recipient_target_bare_jid}."
+                )
+        # The from affix is verified by comparing the from attribute of the stanza with
+        # the JID referenced by the affix. Note that only bare JIDs are compared as per
+        # the specification.
+        sender_value: Optional[jid.JID] = None
+        if from_element is not None:
+            sender_value = jid.JID(from_element["jid"])
+            sender_actual = cast(Optional[str], stanza.getAttribute("from", None))
+            if sender_actual is None:
+                raise AffixVerificationFailed(
+                    "'From' affix is included in the envelope, but the stanza is lacking"
+                    " a 'from' attribute to compare the value to."
+                )
+            sender_actual_bare_jid = jid.JID(sender_actual).userhost()
+            sender_target_bare_jid = sender_value.userhost()
+            if sender_actual_bare_jid != sender_target_bare_jid:
+                raise AffixVerificationFailed(
+                    f"Mismatch between actual and target sender bare JIDs:"
+                    f" {sender_actual_bare_jid} vs {sender_target_bare_jid}."
+                )
+        # Find and verify custom affixes
+        custom_values: Dict[SCECustomAffix, domish.Element] = {}
+        for affix in custom_affixes:
+            element_name = affix.element_name
+            element = cast(
+                Optional[domish.Element],
+                next(envelope.elements(NS_SCE, element_name), None)
+            )
+            if element is not None:
+                affix.verify(stanza, element)
+                custom_values[affix] = element
+        # Check whether all affixes required by the profile are present
+        rpad_missing = \
+            profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None
+        time_missing = \
+            profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None
+        to_missing = \
+            profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None
+        from_missing = \
+            profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None
+        custom_missing = any(
+            affix not in custom_values
+            for affix, policy
+            in profile.custom_policies.items()
+            if policy is SCEAffixPolicy.REQUIRED
+        )
+        if rpad_missing or time_missing or to_missing or from_missing or custom_missing:
+            custom_missing_string = ""
+            for custom_affix in custom_affixes:
+                value = "present" if custom_affix in custom_values else "missing"
+                custom_missing_string += f", [custom]{custom_affix.element_name}={value}"
+            raise ProfileRequirementsNotMet(
+                f"SCE envelope is missing affixes required by the profile {profile}."
+                f" Affix presence:"
+                f" rpad={'missing' if rpad_missing else 'present'}"
+                f", time={'missing' if time_missing else 'present'}"
+                f", to={'missing' if to_missing else 'present'}"
+                f", from={'missing' if from_missing else 'present'}"
+                + custom_missing_string
+            )
+        # Just for type safety
+        content_children = cast(List[Union[domish.Element, str]], content.children)
+        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
+        # Move elements that are not explicitly forbidden from being encrypted from the
+        # content element to the stanza.
+        for child in list(cast(Iterator[domish.Element], content.elements())):
+            if (
+                child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
+                or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
+            ):
+                log.warning(
+                    f"An element that MUST be transferred in plaintext was found in an"
+                    f" SCE envelope: {child.toXml()}"
+                )
+            else:
+                # Remove the child from the content element
+                content_children.remove(child)
+                # Add the child to the stanza
+                stanza_children.append(child)
+        return SCEAffixValues(
+            rpad_value,
+            timestamp_value,
+            recipient_value,
+            sender_value,
+            custom_values
+        )