Mercurial > libervia-backend
view sat/plugins/plugin_xep_0420.py @ 3981:acc9dfc8ba8d
component AP gateway: parse body immediately on `POST` request:
The body is parsed immediately during a `POST` request: this avoids code duplication
and allows the body data to be checked before continuing (this will be used to filter some
requests in a future patch).
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 15 Nov 2022 18:07:34 +0100 |
parents | cecf45416403 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Stanza Content Encryption
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
from datetime import datetime
import enum
import secrets
import string
from typing import Dict, NamedTuple, Optional, Set, Tuple, cast
from typing_extensions import Final

from lxml import etree

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.i18n import D_
from sat.core.log import Logger, getLogger
from sat.core.sat_main import SAT
from sat.tools.xml_tools import ElementParser

from sat.plugins.plugin_xep_0033 import NS_ADDRESS
from sat.plugins.plugin_xep_0082 import XEP_0082
from sat.plugins.plugin_xep_0334 import NS_HINTS
from sat.plugins.plugin_xep_0359 import NS_SID
from sat.plugins.plugin_xep_0380 import NS_EME
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish


__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "NS_SCE",
    "XEP_0420",
    "ProfileRequirementsNotMet",
    "AffixVerificationFailed",
    "SCECustomAffix",
    "SCEAffixPolicy",
    "SCEProfile",
    "SCEAffixValues"
]


log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]


PLUGIN_INFO = {
    C.PI_NAME: "SCE",
    C.PI_IMPORT_NAME: "XEP-0420",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0420" ],
    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ],
    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ],
    C.PI_MAIN: "XEP_0420",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"),
}


NS_SCE: Final = "urn:xmpp:sce:1"


class ProfileRequirementsNotMet(Exception):
    """
    Raised by :meth:`XEP_0420.unpack_stanza` in case the requirements formulated by the
    profile are not met.
    """


class AffixVerificationFailed(Exception):
    """
    Raised by :meth:`XEP_0420.unpack_stanza` in case of affix verification failure.
    """


class SCECustomAffix(ABC):
    """
    Interface for custom affixes of SCE profiles.
    """

    @property
    @abstractmethod
    def element_name(self) -> str:
        """
        @return: The name of the affix's XML element.
        """

    @property
    @abstractmethod
    def element_schema(self) -> str:
        """
        @return: The XML schema definition of the affix element's XML structure, i.e. the
            ``<xs:element/>`` schema element. This element will be referenced using
            ``<xs:element ref="{element_name}"/>``.
        """

    @abstractmethod
    def create(self, stanza: domish.Element) -> domish.Element:
        """
        @param stanza: The stanza element which has been processed by
            :meth:`XEP_0420.pack_stanza`, i.e. all encryptable children have been removed
            and only the root ``<message/>`` or ``<iq/>`` and unencryptable children
            remain. Do not modify.
        @return: An affix element to include in the envelope. The element must have the
            name :attr:`element_name` and must validate using :attr:`element_schema`.
        @raise ValueError: if the affix couldn't be built due to missing information on
            the stanza.
        """

    @abstractmethod
    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
        """
        @param stanza: The stanza element before being processed by
            :meth:`XEP_0420.unpack_stanza`, i.e. all encryptable children have been
            removed and only the root ``<message/>`` or ``<iq/>`` and unencryptable
            children remain. Do not modify.
        @param element: The affix element to verify.
        @raise AffixVerificationFailed: on verification failure.
        """


@enum.unique
class SCEAffixPolicy(enum.Enum):
    """
    Policy for the presence of an affix in an SCE envelope.
    """

    REQUIRED: str = "REQUIRED"
    OPTIONAL: str = "OPTIONAL"
    NOT_NEEDED: str = "NOT_NEEDED"


class SCEProfile(NamedTuple):
    # pylint: disable=invalid-name
    """
    An SCE profile, i.e. the definition which affixes are required, optional or not needed
    at all by an SCE-enabled encryption protocol.
    """

    rpad_policy: SCEAffixPolicy
    time_policy: SCEAffixPolicy
    to_policy: SCEAffixPolicy
    from_policy: SCEAffixPolicy
    custom_policies: Dict[SCECustomAffix, SCEAffixPolicy]


class SCEAffixValues(NamedTuple):
    # pylint: disable=invalid-name
    """
    Structure returned by :meth:`XEP_0420.unpack_stanza` with the parsed/processed values
    of all affixes included in the envelope. For custom affixes, the whole affix element
    is returned.
    """

    rpad: Optional[str]
    timestamp: Optional[datetime]
    recipient: Optional[jid.JID]
    sender: Optional[jid.JID]
    custom: Dict[SCECustomAffix, domish.Element]


ENVELOPE_SCHEMA = """<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:sce:1"
    xmlns="urn:xmpp:sce:1">

    <xs:element name="envelope">
        <xs:complexType>
            <xs:all>
                <xs:element ref="content"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="time" minOccurs="0"/>
                <xs:element ref="to" minOccurs="0"/>
                <xs:element ref="from" minOccurs="0"/>
                {custom_affix_references}
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="content">
        <xs:complexType>
            <xs:sequence>
                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>

    <xs:element name="rpad" type="xs:string"/>

    <xs:element name="time">
        <xs:complexType>
            <xs:attribute name="stamp" type="xs:dateTime"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="to">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="from">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    {custom_affix_definitions}
</xs:schema>
"""


def _parse_jid_affix(element: domish.Element, affix_name: str) -> jid.JID:
    """Extract the JID referenced by a ``<to/>`` or ``<from/>`` affix element.

    The envelope schema declares the ``jid`` attribute as optional and as a plain
    string, thus both a missing attribute and an unparseable JID have to be expected
    from a remote peer.

    @param element: The affix element.
    @param affix_name: The name of the affix, used for the error message only.
    @return: The JID referenced by the affix.
    @raise AffixVerificationFailed: if the ``jid`` attribute is missing or doesn't
        contain a valid JID.
    """

    try:
        return jid.JID(element["jid"])
    except (KeyError, jid.InvalidFormat) as e:
        raise AffixVerificationFailed(f"Malformed {affix_name} affix.") from e


class XEP_0420:  # pylint: disable=invalid-name
    """
    Implementation of XEP-0420: Stanza Content Encryption under namespace
    ``urn:xmpp:sce:1``.

    This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas
    actively, but offers API for other plugins to use.
    """

    # Set of namespaces whose elements are never allowed to be transferred in an encrypted
    # envelope.
    MUST_BE_PLAINTEXT_NAMESPACES: Set[str] = {
        NS_HINTS,
        NS_SID,  # TODO: Not sure whether this ban applies to both stanza-id and origin-id
        NS_ADDRESS,
        # Not part of the specification (yet), but just doesn't make sense in an encrypted
        # envelope:
        NS_EME
    }

    # Set of (namespace, element name) tuples that define elements which are never allowed
    # to be transferred in an encrypted envelope. If all elements under a certain
    # namespace are forbidden, the namespace can be added to
    # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead.
    # Note: only full namespaces are forbidden by the spec for now, the following is for
    # potential future use.
    MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set()

    def __init__(self, sat: SAT) -> None:
        """
        @param sat: The SAT instance.
        """

    @staticmethod
    def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes:
        """Pack a stanza according to Stanza Content Encryption.

        Removes all elements from the stanza except for a few exceptions that explicitly
        need to be transferred in plaintext, e.g. because they contain hints/instructions
        for the server on how to process the stanza. Together with the affix elements as
        requested by the profile, the removed elements are added to an envelope XML
        structure that builds the plaintext to be encrypted by the SCE-enabled encryption
        scheme. Optional affixes are added to the structure whenever they can be built,
        i.e. they are treated by the packing code as if they were required, with the
        exception that an optional <to/> or <from/> affix is silently skipped if the
        stanza lacks the corresponding attribute. Once built, the envelope structure is
        serialized to a byte string and returned for the encryption scheme to encrypt and
        add to the stanza.

        @param profile: The SCE profile, i.e. the definition of affixes to include in the
            envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @return: The serialized envelope structure that builds the plaintext for the
            encryption scheme to process.
        @raise ValueError: if the <to/> or <from/> affixes are required but the stanza
            doesn't have the "to"/"from" attribute set to extract the value from. Can also
            be raised by custom affixes.

        @warning: It is up to the calling code to add a <store/> message processing hint
            if applicable.
        """

        # Prepare the envelope and content elements
        envelope = domish.Element((NS_SCE, "envelope"))
        content = envelope.addElement((NS_SCE, "content"))

        # Note the serialized byte size of the content element before adding any children
        empty_content_byte_size = len(content.toXml().encode("utf-8"))

        # Move elements that are not explicitly forbidden from being encrypted from the
        # stanza to the content element.
        for child in list(stanza.elements()):
            if (
                child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
            ):
                # Remove the child from the stanza
                stanza.children.remove(child)

                # A namespace of ``None`` can be used on domish elements to inherit the
                # namespace from the parent. When moving elements from the stanza root to
                # the content element, however, we don't want elements to inherit the
                # namespace of the content element. Thus, check for elements with ``None``
                # for their namespace and set the namespace to jabber:client, which is the
                # namespace of the parent element.
                if child.uri is None:
                    child.uri = C.NS_CLIENT
                    child.defaultUri = C.NS_CLIENT

                # Add the child with corrected namespaces to the content element
                content.addChild(child)

        # Add the affixes requested by the profile
        if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
            # The specification defines the rpad affix to contain "[...] a randomly
            # generated sequence of random length between 0 and 200 characters." This
            # implementation differs a bit from the specification in that a minimum size
            # other than 0 is chosen depending on the serialized size of the content
            # element. This is to prevent the scenario where the encrypted content is
            # short and the rpad is also randomly chosen to be short, which could allow
            # guessing the content of a short message. To do so, the rpad length is first
            # chosen to pad the content to at least 53 bytes, then afterwards another 0 to
            # 200 bytes are added. Note that single-byte characters are used by this
            # implementation, thus the number of characters equals the number of bytes.
            content_byte_size = len(content.toXml().encode("utf-8"))
            content_byte_size_diff = content_byte_size - empty_content_byte_size
            rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201)
            rpad_content = "".join(
                secrets.choice(string.digits + string.ascii_letters + string.punctuation)
                for __
                in range(rpad_length)
            )
            envelope.addElement((NS_SCE, "rpad"), content=rpad_content)

        if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED:
            time_element = envelope.addElement((NS_SCE, "time"))
            time_element["stamp"] = XEP_0082.format_datetime()

        if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED:
            recipient = stanza.getAttribute("to", None)
            if recipient is not None:
                to_element = envelope.addElement((NS_SCE, "to"))
                to_element["jid"] = jid.JID(recipient).userhost()
            elif profile.to_policy is SCEAffixPolicy.REQUIRED:
                raise ValueError(
                    "<to/> affix requested, but stanza doesn't have the 'to' attribute"
                    " set."
                )

        if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED:
            sender = stanza.getAttribute("from", None)
            if sender is not None:
                from_element = envelope.addElement((NS_SCE, "from"))
                from_element["jid"] = jid.JID(sender).userhost()
            elif profile.from_policy is SCEAffixPolicy.REQUIRED:
                raise ValueError(
                    "<from/> affix requested, but stanza doesn't have the 'from'"
                    " attribute set."
                )

        for affix, policy in profile.custom_policies.items():
            if policy is not SCEAffixPolicy.NOT_NEEDED:
                envelope.addChild(affix.create(stanza))

        return envelope.toXml().encode("utf-8")

    @staticmethod
    def unpack_stanza(
        profile: SCEProfile,
        stanza: domish.Element,
        envelope_serialized: bytes
    ) -> SCEAffixValues:
        """Unpack a stanza packed according to Stanza Content Encryption.

        Parses the serialized envelope as XML, verifies included affixes and makes sure
        the requirements of the profile are met, and restores the stanza by moving
        decrypted elements from the envelope back to the stanza top level.

        @param profile: The SCE profile, i.e. the definition of affixes that have to/may
            be included in the envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @param envelope_serialized: The serialized envelope, i.e. the plaintext produced
            by the decryption scheme utilizing SCE.
        @return: The parsed and processed values of all affixes that were present on the
            envelope, notably including the timestamp.
        @raise exceptions.ParsingError: if the serialized envelope element is malformed.
        @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
            are missing from the envelope.
        @raise AffixVerificationFailed: if an affix included in the envelope is malformed
            or fails to validate. It doesn't matter whether the affix is required by the
            profile or not, all affixes included in the envelope are validated and cause
            this exception to be raised on failure.

        @warning: It is up to the calling code to verify the timestamp, if returned, since
            the requirements on the timestamp may vary between SCE-enabled protocols.
        """

        try:
            envelope_serialized_string = envelope_serialized.decode("utf-8")
        except UnicodeError as e:
            raise exceptions.ParsingError(
                "Serialized envelope can't be parsed as utf-8."
            ) from e

        custom_affixes = set(profile.custom_policies.keys())

        # Make sure the envelope adheres to the schema
        schema_source = ENVELOPE_SCHEMA.format(
            custom_affix_references="".join(
                f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>'
                for custom_affix
                in custom_affixes
            ),
            custom_affix_definitions="".join(
                custom_affix.element_schema
                for custom_affix
                in custom_affixes
            )
        )

        # The envelope is data supplied by a remote peer, thus entities are not resolved
        # while validating it.
        parser = etree.XMLParser(
            schema=etree.XMLSchema(etree.XML(schema_source.encode("utf-8"))),
            resolve_entities=False
        )

        try:
            etree.fromstring(envelope_serialized_string, parser)
        except (etree.XMLSyntaxError, ValueError) as e:
            # Next to XMLSyntaxError for malformed/invalid XML, lxml raises ValueError if
            # a unicode string carrying an XML encoding declaration is passed.
            raise exceptions.ParsingError(
                "Serialized envelope doesn't pass schema validation."
            ) from e

        # Prepare the envelope and content elements
        envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
        content = next(envelope.elements(NS_SCE, "content"))

        # Verify the affixes
        rpad_element = cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, "rpad"), None)
        )
        time_element = cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, "time"), None)
        )
        to_element = cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, "to"), None)
        )
        from_element = cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, "from"), None)
        )

        # The rpad doesn't need verification.
        rpad_value = None if rpad_element is None else str(rpad_element)

        # The time affix isn't verified other than that the timestamp is parseable. The
        # schema doesn't require the stamp attribute to be present, thus a missing
        # attribute (KeyError) is handled like an unparseable one.
        try:
            timestamp_value = None if time_element is None else \
                XEP_0082.parse_datetime(time_element["stamp"])
        except (KeyError, ValueError) as e:
            raise AffixVerificationFailed("Malformed time affix.") from e

        # The to affix is verified by comparing the to attribute of the stanza with the
        # JID referenced by the affix. Note that only bare JIDs are compared as per the
        # specification.
        recipient_value: Optional[jid.JID] = None
        if to_element is not None:
            recipient_value = _parse_jid_affix(to_element, "to")

            recipient_actual = stanza.getAttribute("to", None)
            if recipient_actual is None:
                raise AffixVerificationFailed(
                    "'To' affix is included in the envelope, but the stanza is lacking a"
                    " 'to' attribute to compare the value to."
                )

            recipient_actual_bare_jid = jid.JID(recipient_actual).userhost()
            recipient_target_bare_jid = recipient_value.userhost()

            if recipient_actual_bare_jid != recipient_target_bare_jid:
                raise AffixVerificationFailed(
                    f"Mismatch between actual and target recipient bare JIDs:"
                    f" {recipient_actual_bare_jid} vs {recipient_target_bare_jid}."
                )

        # The from affix is verified by comparing the from attribute of the stanza with
        # the JID referenced by the affix. Note that only bare JIDs are compared as per
        # the specification.
        sender_value: Optional[jid.JID] = None
        if from_element is not None:
            sender_value = _parse_jid_affix(from_element, "from")

            sender_actual = stanza.getAttribute("from", None)
            if sender_actual is None:
                raise AffixVerificationFailed(
                    "'From' affix is included in the envelope, but the stanza is lacking"
                    " a 'from' attribute to compare the value to."
                )

            sender_actual_bare_jid = jid.JID(sender_actual).userhost()
            sender_target_bare_jid = sender_value.userhost()

            if sender_actual_bare_jid != sender_target_bare_jid:
                raise AffixVerificationFailed(
                    f"Mismatch between actual and target sender bare JIDs:"
                    f" {sender_actual_bare_jid} vs {sender_target_bare_jid}."
                )

        # Find and verify custom affixes
        custom_values: Dict[SCECustomAffix, domish.Element] = {}
        for affix in custom_affixes:
            element_name = affix.element_name
            element = cast(
                Optional[domish.Element],
                next(envelope.elements(NS_SCE, element_name), None)
            )
            if element is not None:
                affix.verify(stanza, element)
                custom_values[affix] = element

        # Check whether all affixes required by the profile are present
        rpad_missing = \
            profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None
        time_missing = \
            profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None
        to_missing = \
            profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None
        from_missing = \
            profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None
        custom_missing = any(
            affix not in custom_values
            for affix, policy
            in profile.custom_policies.items()
            if policy is SCEAffixPolicy.REQUIRED
        )

        if rpad_missing or time_missing or to_missing or from_missing or custom_missing:
            # Report the actual presence of every affix, independent of its policy, such
            # that absent optional affixes are not reported as "present".
            custom_presence_string = "".join(
                f", [custom]{custom_affix.element_name}="
                f"{'present' if custom_affix in custom_values else 'missing'}"
                for custom_affix
                in custom_affixes
            )

            raise ProfileRequirementsNotMet(
                f"SCE envelope is missing affixes required by the profile {profile}."
                f" Affix presence:"
                f" rpad={'missing' if rpad_element is None else 'present'}"
                f", time={'missing' if time_element is None else 'present'}"
                f", to={'missing' if to_element is None else 'present'}"
                f", from={'missing' if from_element is None else 'present'}"
                + custom_presence_string
            )

        # Move elements that are not explicitly forbidden from being encrypted from the
        # content element to the stanza.
        for child in list(content.elements()):
            if (
                child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
            ):
                # Forbidden elements are not restored to the stanza; they are only
                # reported.
                log.warning(
                    f"An element that MUST be transferred in plaintext was found in an"
                    f" SCE envelope: {child.toXml()}"
                )
            else:
                # Remove the child from the content element
                content.children.remove(child)

                # Add the child to the stanza
                stanza.addChild(child)

        return SCEAffixValues(
            rpad_value,
            timestamp_value,
            recipient_value,
            sender_value,
            custom_values
        )