Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0420.py @ 3877:00212260f659
plugin XEP-0420: Implementation of Stanza Content Encryption:
Includes implementation of XEP-0082 (XMPP date and time profiles) and tests for both new plugins.
Everything is type checked, linted, format checked and unit tested.
Adds new dependency xmlschema.
fix 377
author | Syndace <me@syndace.dev> |
---|---|
date | Tue, 23 Aug 2022 12:04:11 +0200 |
parents | |
children | 8289ac1b34f4 |
line wrap: on
line diff
#!/usr/bin/env python3

# Libervia plugin for Stanza Content Encryption
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Type-check with `mypy --strict --disable-error-code no-untyped-call`
# Lint with `pylint`

from abc import ABC, abstractmethod
from datetime import datetime
import enum
import secrets
import string
from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast

from lxml import etree

from sat.core.constants import Const as C
from sat.core.i18n import D_
from sat.core.log import Logger, getLogger
from sat.core.sat_main import SAT
from sat.tools.xml_tools import ElementParser
from sat.plugins.plugin_xep_0033 import NS_ADDRESS
from sat.plugins.plugin_xep_0082 import XEP_0082
from sat.plugins.plugin_xep_0334 import NS_HINTS
from sat.plugins.plugin_xep_0359 import NS_SID
from sat.plugins.plugin_xep_0380 import NS_EME
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish


__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "NS_SCE",
    "XEP_0420",
    "ProfileRequirementsNotMet",
    "AffixVerificationFailed",
    "SCECustomAffix",
    "SCEAffixPolicy",
    "SCEProfile",
    "SCEAffixValues"
]


log = cast(Logger, getLogger(__name__))


PLUGIN_INFO = {
    C.PI_NAME: "SCE",
    C.PI_IMPORT_NAME: "XEP-0420",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0420" ],
    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ],
    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ],
    C.PI_MAIN: "XEP_0420",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"),
}


NS_SCE = "urn:xmpp:sce:1"


class ProfileRequirementsNotMet(Exception):
    """
    Raised by :meth:`XEP_0420.unpack_stanza` in case the requirements formulated by the
    profile are not met.
    """


class AffixVerificationFailed(Exception):
    """
    Raised by :meth:`XEP_0420.unpack_stanza` in case of affix verification failure.
    """


class SCECustomAffix(ABC):
    """
    Interface for custom affixes of SCE profiles.
    """

    @property
    @abstractmethod
    def element_name(self) -> str:
        """
        @return: The name of the affix's XML element.
        """

    @property
    @abstractmethod
    def element_schema(self) -> str:
        """
        @return: The XML schema definition of the affix element's XML structure, i.e. the
            ``<xs:element/>`` schema element. This element will be referenced using
            ``<xs:element ref="{element_name}"/>``.
        """

    @abstractmethod
    def create(self, stanza: domish.Element) -> domish.Element:
        """
        @param stanza: The stanza element which has been processed by
            :meth:`XEP_0420.pack_stanza`, i.e. all encryptable children have been removed
            and only the root ``<message/>`` or ``<iq/>`` and unencryptable children
            remain. Do not modify.
        @return: An affix element to include in the envelope. The element must have the
            name :attr:`element_name` and must validate using :attr:`element_schema`.
        @raise ValueError: if the affix couldn't be built.
        """

    @abstractmethod
    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
        """
        @param stanza: The stanza element before being processed by
            :meth:`XEP_0420.unpack_stanza`, i.e. all encryptable children have been
            removed and only the root ``<message/>`` or ``<iq/>`` and unencryptable
            children remain. Do not modify.
        @param element: The affix element to verify.
        @raise AffixVerificationFailed: on verification failure.
        """


@enum.unique
class SCEAffixPolicy(enum.Enum):
    """
    Policy for the presence of an affix in an SCE envelope.
    """

    REQUIRED: str = "REQUIRED"
    OPTIONAL: str = "OPTIONAL"
    NOT_NEEDED: str = "NOT_NEEDED"


class SCEProfile(NamedTuple):
    # pylint: disable=invalid-name
    """
    An SCE profile, i.e. the definition which affixes are required, optional or not needed
    at all by an SCE-enabled encryption protocol.
    """

    rpad_policy: SCEAffixPolicy
    time_policy: SCEAffixPolicy
    to_policy: SCEAffixPolicy
    from_policy: SCEAffixPolicy
    custom_policies: Dict[SCECustomAffix, SCEAffixPolicy]


class SCEAffixValues(NamedTuple):
    # pylint: disable=invalid-name
    """
    Structure returned by :meth:`XEP_0420.unpack_stanza` with the parsed/processed values
    of all affixes included in the envelope. For custom affixes, the whole affix element
    is returned.
    """

    rpad: Optional[str]
    timestamp: Optional[datetime]
    recipient: Optional[jid.JID]
    sender: Optional[jid.JID]
    custom: Dict[SCECustomAffix, domish.Element]


# XML schema used to validate decrypted envelopes. The two placeholders are filled with
# the references to and the definitions of the custom affixes of the profile in use.
# Note that the ``stamp`` and ``jid`` attributes are not marked as required by the
# schema, thus their presence has to be checked by the code using the parsed envelope.
ENVELOPE_SCHEMA = """<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:sce:1"
    xmlns="urn:xmpp:sce:1">

    <xs:element name="envelope">
        <xs:complexType>
            <xs:all>
                <xs:element ref="content"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="time" minOccurs="0"/>
                <xs:element ref="to" minOccurs="0"/>
                <xs:element ref="from" minOccurs="0"/>
                {custom_affix_references}
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="content">
        <xs:complexType>
            <xs:sequence>
                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>

    <xs:element name="rpad" type="xs:string"/>

    <xs:element name="time">
        <xs:complexType>
            <xs:attribute name="stamp" type="xs:dateTime"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="to">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="from">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    {custom_affix_definitions}
</xs:schema>
"""


def _parse_jid_for_verification(value: str, description: str) -> jid.JID:
    """Parse a JID, reporting malformed input as an affix verification failure.

    @param value: The string to parse as a JID.
    @param description: Short description of where the value was taken from, used in the
        error message.
    @return: The parsed JID.
    @raise AffixVerificationFailed: if the value is not a valid JID.
    """

    try:
        return jid.JID(value)
    except jid.InvalidFormat as e:
        raise AffixVerificationFailed(
            f"Malformed JID in {description}: {value!r}"
        ) from e


class XEP_0420:  # pylint: disable=invalid-name
    """
    Implementation of XEP-0420: Stanza Content Encryption under namespace
    ``urn:xmpp:sce:1``.

    This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas
    actively, but offers API for other plugins to use.
    """

    # Set of namespaces whose elements are never allowed to be transferred in an encrypted
    # envelope.
    MUST_BE_PLAINTEXT_NAMESPACES: Set[str] = {
        NS_HINTS,
        NS_SID,  # TODO: Not sure whether this ban applies to both stanza-id and origin-id
        NS_ADDRESS,
        # Not part of the specification (yet), but just doesn't make sense in an encrypted
        # envelope:
        NS_EME
    }

    # Set of (namespace, element name) tuples that define elements which are never allowed
    # to be transferred in an encrypted envelope. If all elements under a certain
    # namespace are forbidden, the namespace can be added to
    # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead.
    # Note: only full namespaces are forbidden by the spec for now, the following is for
    # potential future use.
    MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set()

    def __init__(self, sat: SAT) -> None:
        """
        @param sat: The SAT instance.
        """

    @staticmethod
    def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes:
        """Pack a stanza according to Stanza Content Encryption.

        Removes all elements from the stanza except for a few exceptions that explicitly
        need to be transferred in plaintext, e.g. because they contain hints/instructions
        for the server on how to process the stanza. Together with the affix elements as
        requested by the profile, the removed elements are added to an envelope XML
        structure that builds the plaintext to be encrypted by the SCE-enabled encryption
        scheme. Optional affixes are always added to the structure, i.e. they are treated
        by the packing code as if they were required.

        Once built, the envelope structure is serialized to a byte string and returned for
        the encryption scheme to encrypt and add to the stanza.

        @param profile: The SCE profile, i.e. the definition of affixes to include in the
            envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @return: The serialized envelope structure that builds the plaintext for the
            encryption scheme to process.
        @raise ValueError: if the <to/> or <from/> affixes are requested but the stanza
            doesn't have the "to"/"from" attribute set to extract the value from. Can also
            be raised by custom affixes.

        @warning: It is up to the calling code to add a <store/> message processing hint
            if applicable.
        """

        # Prepare the envelope and content elements
        envelope = domish.Element((NS_SCE, "envelope"))
        content = envelope.addElement((NS_SCE, "content"))

        # Note the serialized byte size of the content element before adding any children
        empty_content_byte_size = len(content.toXml().encode("utf-8"))

        # Just for type safety
        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
        content_children = cast(List[Union[domish.Element, str]], content.children)

        # Move elements that are not explicitly forbidden from being encrypted from the
        # stanza to the content element. The element iterator is materialized first since
        # the list of children is modified while looping.
        for child in list(cast(Iterator[domish.Element], stanza.elements())):
            if (
                child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
            ):
                # Remove the child from the stanza
                stanza_children.remove(child)

                # A namespace of ``None`` can be used on domish elements to inherit the
                # namespace from the parent. When moving elements from the stanza root to
                # the content element, however, we don't want elements to inherit the
                # namespace of the content element. Thus, check for elements with ``None``
                # for their namespace and set the namespace to jabber:client, which is the
                # namespace of the parent element.
                if child.uri is None:
                    child.uri = C.NS_CLIENT
                    child.defaultUri = C.NS_CLIENT

                # Add the child with corrected namespaces to the content element
                content_children.append(child)

        # Add the affixes requested by the profile
        if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
            # The specification defines the rpad affix to contain "[...] a randomly
            # generated sequence of random length between 0 and 200 characters." This
            # implementation differs a bit from the specification in that a minimum size
            # other than 0 is chosen depending on the serialized size of the content
            # element. This is to prevent the scenario where the encrypted content is
            # short and the rpad is also randomly chosen to be short, which could allow
            # guessing the content of a short message. To do so, the rpad length is first
            # chosen to pad the content to at least 53 bytes, then afterwards another 0 to
            # 200 bytes are added. Note that single-byte characters are used by this
            # implementation, thus the number of characters equals the number of bytes.
            content_byte_size = len(content.toXml().encode("utf-8"))
            content_byte_size_diff = content_byte_size - empty_content_byte_size
            rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201)
            rpad_content = "".join(
                secrets.choice(string.digits + string.ascii_letters + string.punctuation)
                for __
                in range(rpad_length)
            )
            envelope.addElement((NS_SCE, "rpad"), content=rpad_content)

        if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED:
            time_element = envelope.addElement((NS_SCE, "time"))
            time_element["stamp"] = XEP_0082.format_datetime()

        if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED:
            recipient = cast(Optional[str], stanza.getAttribute("to", None))
            if recipient is None:
                raise ValueError(
                    "<to/> affix requested, but stanza doesn't have the 'to' attribute"
                    " set."
                )

            # Only the bare JID is included in the affix
            to_element = envelope.addElement((NS_SCE, "to"))
            to_element["jid"] = jid.JID(recipient).userhost()

        if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED:
            sender = cast(Optional[str], stanza.getAttribute("from", None))
            if sender is None:
                raise ValueError(
                    "<from/> affix requested, but stanza doesn't have the 'from'"
                    " attribute set."
                )

            # Only the bare JID is included in the affix
            from_element = envelope.addElement((NS_SCE, "from"))
            from_element["jid"] = jid.JID(sender).userhost()

        for affix, policy in profile.custom_policies.items():
            if policy is not SCEAffixPolicy.NOT_NEEDED:
                envelope.addChild(affix.create(stanza))

        return cast(str, envelope.toXml()).encode("utf-8")

    @staticmethod
    def unpack_stanza(
        profile: SCEProfile,
        stanza: domish.Element,
        envelope_serialized: bytes
    ) -> SCEAffixValues:
        """Unpack a stanza packed according to Stanza Content Encryption.

        Parses the serialized envelope as XML, verifies included affixes and makes sure
        the requirements of the profile are met, and restores the stanza by moving
        decrypted elements from the envelope back to the stanza top level.

        @param profile: The SCE profile, i.e. the definition of affixes that have to/may
            be included in the envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @param envelope_serialized: The serialized envelope, i.e. the plaintext produced
            by the decryption scheme utilizing SCE.
        @return: The parsed and processed values of all affixes that were present on the
            envelope, notably including the timestamp.
        @raise ValueError: if the serialized envelope element is malformed.
        @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
            are missing from the envelope.
        @raise AffixVerificationFailed: if an affix included in the envelope fails to
            validate, which includes affixes lacking their ``stamp``/``jid`` attribute
            and affixes/stanza attributes containing malformed JIDs. It doesn't matter
            whether the affix is required by the profile or not, all affixes included in
            the envelope are validated and cause this exception to be raised on failure.

        @warning: It is up to the calling code to verify the timestamp, if returned, since
            the requirements on the timestamp may vary between SCE-enabled protocols.
        """

        try:
            envelope_serialized_string = envelope_serialized.decode("utf-8")
        except UnicodeError as e:
            raise ValueError("Serialized envelope can't be parsed as utf-8.") from e

        custom_affixes = set(profile.custom_policies.keys())

        # Make sure the envelope adheres to the schema
        parser = etree.XMLParser(schema=etree.XMLSchema(etree.XML(ENVELOPE_SCHEMA.format(
            custom_affix_references="".join(
                f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>'
                for custom_affix
                in custom_affixes
            ),
            custom_affix_definitions="".join(
                custom_affix.element_schema
                for custom_affix
                in custom_affixes
            )
        ).encode("utf-8"))))

        try:
            etree.fromstring(envelope_serialized_string, parser)
        except etree.XMLSyntaxError as e:
            raise ValueError("Serialized envelope doesn't pass schema validation.") from e

        # Prepare the envelope and content elements
        envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
        content = cast(domish.Element, next(envelope.elements(NS_SCE, "content")))

        # Verify the affixes
        rpad_element = cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, "rpad"), None)
        )
        time_element = cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, "time"), None)
        )
        to_element = cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, "to"), None)
        )
        from_element = cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, "from"), None)
        )

        # The rpad doesn't need verification.
        rpad_value = None if rpad_element is None else str(rpad_element)

        # The time affix isn't verified other than that the timestamp is present and
        # parseable. The schema doesn't enforce the presence of the attribute, thus it
        # has to be checked here.
        timestamp_value: Optional[datetime] = None
        if time_element is not None:
            stamp = cast(Optional[str], time_element.getAttribute("stamp", None))
            if stamp is None:
                raise AffixVerificationFailed(
                    "Time affix is lacking the 'stamp' attribute."
                )

            try:
                timestamp_value = XEP_0082.parse_datetime(stamp)
            except ValueError as e:
                raise AffixVerificationFailed("Malformed time affix") from e

        # The to affix is verified by comparing the to attribute of the stanza with the
        # JID referenced by the affix. Note that only bare JIDs are compared as per the
        # specification.
        recipient_value: Optional[jid.JID] = None
        if to_element is not None:
            recipient_affix = cast(Optional[str], to_element.getAttribute("jid", None))
            if recipient_affix is None:
                raise AffixVerificationFailed(
                    "'To' affix is lacking the 'jid' attribute."
                )

            recipient_value = _parse_jid_for_verification(recipient_affix, "'to' affix")

            recipient_actual = cast(Optional[str], stanza.getAttribute("to", None))
            if recipient_actual is None:
                raise AffixVerificationFailed(
                    "'To' affix is included in the envelope, but the stanza is lacking a"
                    " 'to' attribute to compare the value to."
                )

            recipient_actual_bare_jid = _parse_jid_for_verification(
                recipient_actual,
                "'to' attribute of the stanza"
            ).userhost()
            recipient_target_bare_jid = recipient_value.userhost()

            if recipient_actual_bare_jid != recipient_target_bare_jid:
                raise AffixVerificationFailed(
                    f"Mismatch between actual and target recipient bare JIDs:"
                    f" {recipient_actual_bare_jid} vs {recipient_target_bare_jid}."
                )

        # The from affix is verified by comparing the from attribute of the stanza with
        # the JID referenced by the affix. Note that only bare JIDs are compared as per
        # the specification.
        sender_value: Optional[jid.JID] = None
        if from_element is not None:
            sender_affix = cast(Optional[str], from_element.getAttribute("jid", None))
            if sender_affix is None:
                raise AffixVerificationFailed(
                    "'From' affix is lacking the 'jid' attribute."
                )

            sender_value = _parse_jid_for_verification(sender_affix, "'from' affix")

            sender_actual = cast(Optional[str], stanza.getAttribute("from", None))
            if sender_actual is None:
                raise AffixVerificationFailed(
                    "'From' affix is included in the envelope, but the stanza is lacking"
                    " a 'from' attribute to compare the value to."
                )

            sender_actual_bare_jid = _parse_jid_for_verification(
                sender_actual,
                "'from' attribute of the stanza"
            ).userhost()
            sender_target_bare_jid = sender_value.userhost()

            if sender_actual_bare_jid != sender_target_bare_jid:
                raise AffixVerificationFailed(
                    f"Mismatch between actual and target sender bare JIDs:"
                    f" {sender_actual_bare_jid} vs {sender_target_bare_jid}."
                )

        # Find and verify custom affixes
        custom_values: Dict[SCECustomAffix, domish.Element] = {}
        for affix in custom_affixes:
            element_name = affix.element_name
            element = cast(
                Optional[domish.Element],
                next(envelope.elements(NS_SCE, element_name), None)
            )
            if element is not None:
                affix.verify(stanza, element)
                custom_values[affix] = element

        # Check whether all affixes required by the profile are present
        rpad_missing = \
            profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None
        time_missing = \
            profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None
        to_missing = \
            profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None
        from_missing = \
            profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None
        custom_missing = any(
            affix not in custom_values
            for affix, policy
            in profile.custom_policies.items()
            if policy is SCEAffixPolicy.REQUIRED
        )

        if rpad_missing or time_missing or to_missing or from_missing or custom_missing:
            custom_missing_string = ""
            for custom_affix in custom_affixes:
                value = "present" if custom_affix in custom_values else "missing"
                custom_missing_string += f", [custom]{custom_affix.element_name}={value}"

            # Report the actual presence of each affix in the envelope rather than the
            # "required but missing" flags, such that absent optional affixes are not
            # reported as present.
            raise ProfileRequirementsNotMet(
                f"SCE envelope is missing affixes required by the profile {profile}."
                f" Affix presence:"
                f" rpad={'missing' if rpad_element is None else 'present'}"
                f", time={'missing' if time_element is None else 'present'}"
                f", to={'missing' if to_element is None else 'present'}"
                f", from={'missing' if from_element is None else 'present'}"
                + custom_missing_string
            )

        # Just for type safety
        content_children = cast(List[Union[domish.Element, str]], content.children)
        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)

        # Move elements that are not explicitly forbidden from being encrypted from the
        # content element to the stanza. Forbidden elements are left in the content
        # element, i.e. they are dropped, and a warning is logged.
        for child in list(cast(Iterator[domish.Element], content.elements())):
            if (
                child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
            ):
                log.warning(
                    f"An element that MUST be transferred in plaintext was found in an"
                    f" SCE envelope: {child.toXml()}"
                )
            else:
                # Remove the child from the content element
                content_children.remove(child)

                # Add the child to the stanza
                stanza_children.append(child)

        return SCEAffixValues(
            rpad_value,
            timestamp_value,
            recipient_value,
            sender_value,
            custom_values
        )