# HG changeset patch # User Syndace # Date 1661249051 -7200 # Node ID 00212260f6590627ca597de46294d6669eb5c92d # Parent e3c1f4736ab267c8c10ed4e2b14bbba2169a373b plugin XEP-0420: Implementation of Stanza Content Encryption: Includes implementation of XEP-0082 (XMPP date and time profiles) and tests for both new plugins. Everything is type checked, linted, format checked and unit tested. Adds new dependency xmlschema. fix 377 diff -r e3c1f4736ab2 -r 00212260f659 .hgignore --- a/.hgignore Tue Aug 23 23:37:22 2022 +0200 +++ b/.hgignore Tue Aug 23 12:04:11 2022 +0200 @@ -18,3 +18,8 @@ Session.vim .build .pytest_cache +env/ +.eggs/ +libervia_backend.egg-info/ +.mypy_cache/ +twisted/plugins/dropin.cache diff -r e3c1f4736ab2 -r 00212260f659 dev-requirements.txt --- a/dev-requirements.txt Tue Aug 23 23:37:22 2022 +0200 +++ b/dev-requirements.txt Tue Aug 23 12:04:11 2022 +0200 @@ -1,4 +1,8 @@ -r requirements.txt +lxml-stubs + +mypy +pylint pytest pytest_twisted diff -r e3c1f4736ab2 -r 00212260f659 pylintrc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylintrc Tue Aug 23 12:04:11 2022 +0200 @@ -0,0 +1,498 @@ +[MASTER] + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. +extension-pkg-allow-list=lxml.etree + +# Add files or directories to the blacklist. They should be base names, not +# paths. +ignore=CVS + +# Add files or directories matching the regex patterns to the blacklist. The +# regex matches against base names, not paths. +ignore-patterns= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use. +jobs=1 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions. +limit-inference-results=100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +load-plugins= + +# Pickle collected data for later comparisons. +persistent=yes + +# Specify a configuration file. +#rcfile= + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED. +confidence= + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once). You can also use "--disable=all" to +# disable everything first and then reenable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". +disable=missing-module-docstring, + duplicate-code, + fixme, + logging-fstring-interpolation + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable=useless-suppression + + +[REPORTS] + +# Python expression which should return a score less than or equal to 10. You +# have access to the variables 'error', 'warning', 'refactor', and 'convention' +# which contain the number of messages in each category, as well as 'statement' +# which is the total number of statements analyzed. This score is used by the +# global evaluation report (RP0004). +evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +#msg-template= + +# Set the output format. Available formats are text, parseable, colorized, json +# and msvs (visual studio). You can also give a reporter class, e.g. +# mypackage.mymodule.MyReporterClass. +output-format=text + +# Tells whether to display a full report or only the messages. +reports=no + +# Activate the evaluation score. +score=yes + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never returns. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit + + +[SPELLING] + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=4 + +# Spelling dictionary name. Available dictionaries: none. To make it work, +# install the python-enchant package. +spelling-dict= + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# A path to a file that contains the private dictionary; one word per line. +spelling-private-dict-file= + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +spelling-store-unknown-words=no + + +[VARIABLES] + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=no + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_, + _cb + +# A regular expression matching the name of dummy variables (i.e. expected to +# not be used). +dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ + +# Argument names that match this expression will be ignored. Default to name +# with leading underscore. +ignored-argument-names=_.*|^ignored_|^unused_ + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io + + +[LOGGING] + +# Format style used to check logging format string. `old` means using % +# formatting, `new` is for `{}` formatting,and `fstr` is for f-strings. +logging-format-style=old + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules=logging + + +[FORMAT] + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )??$ + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Maximum number of characters on a single line. +max-line-length=90 + +# Maximum number of lines in a module. +max-module-lines=10000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=yes + + +[TYPECHECK] + +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators=contextlib.contextmanager + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members= + +# Tells whether missing members accessed in mixin class should be ignored. A +# mixin class is detected if its name ends with "mixin" (case insensitive). +ignore-mixin-members=yes + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=no + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=no + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=optparse.Values,thread._local,_thread._local + +# List of module names for which member attributes should not be checked +# (useful for modules/projects where namespaces are manipulated during runtime +# and thus existing member attributes cannot be deduced by static analysis). It +# supports qualified module names, as well as Unix pattern matching. +ignored-modules= + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +# List of decorators that change the signature of a decorated function. +signature-mutators= + + +[STRING] + +# This flag controls whether the implicit-str-concat-in-sequence should +# generate a warning on implicit string concatenation in sequences defined over +# several lines. +check-str-concat-over-line-jumps=no + + +[SIMILARITIES] + +# Ignore comments when computing similarities. +ignore-comments=yes + +# Ignore docstrings when computing similarities. +ignore-docstrings=yes + +# Ignore imports when computing similarities. +ignore-imports=no + +# Minimum lines number of a similarity. +min-similarity-lines=4 + + +[BASIC] + +# Naming style matching correct argument names. +argument-naming-style=snake_case + +# Regular expression matching correct argument names. Overrides argument- +# naming-style. +#argument-rgx= + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. +#attr-rgx= + +# Bad variable names which should always be refused, separated by a comma. +bad-names=foo, + bar, + baz, + toto, + tutu, + tata + +# Naming style matching correct class attribute names. +class-attribute-naming-style=UPPER_CASE + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. +#class-attribute-rgx= + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names. Overrides class-naming- +# style. +#class-rgx= + +# Naming style matching correct constant names. +const-naming-style=any + +# Regular expression matching correct constant names. Overrides const-naming- +# style. +#const-rgx= + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names. Overrides function- +# naming-style. +#function-rgx= + +# Good variable names which should always be accepted, separated by a comma. +good-names=i, + j, + e, # exceptions in except blocks + _ + +# Include a hint for the correct naming format with invalid-name. +include-naming-hint=no + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. +#inlinevar-rgx= + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names. Overrides method-naming- +# style. +#method-rgx= + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names. Overrides module-naming- +# style. +#module-rgx= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Regular expression which should only match function or class names that do +# not require a docstring. +no-docstring-rgx=^_ + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. +# These decorators are taken in consideration only for invalid-name. +property-classes=abc.abstractproperty + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names. Overrides variable- +# naming-style. +#variable-rgx= + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME, + XXX, + TODO + + +[IMPORTS] + +# List of modules that can be imported at any level, not just the top level +# one. +allow-any-import-level= + +# Allow wildcard imports from modules that define __all__. +allow-wildcard-with-all=no + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=yes + +# Deprecated modules which should not be used, separated by a comma. +deprecated-modules=optparse,tkinter.tix + +# Create a graph of external dependencies in the given file (report RP0402 must +# not be disabled). +ext-import-graph= + +# Create a graph of every (i.e. internal and external) dependencies in the +# given file (report RP0402 must not be disabled). +import-graph= + +# Create a graph of internal dependencies in the given file (report RP0402 must +# not be disabled). +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Couples of modules and preferred modules, separated by a comma. +preferred-modules= + + +[CLASSES] + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__, + __new__, + setUp, + __post_init__, + create + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict, + _fields, + _replace, + _source, + _make + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=cls + + +[DESIGN] + +# Maximum number of arguments for function / method. +max-args=100 + +# Maximum number of attributes for a class (see R0902). +max-attributes=100 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr=10 + +# Maximum number of branch for function / method body. +max-branches=100 + +# Maximum number of locals for function / method body. +max-locals=100 + +# Maximum number of parents for a class (see R0901). +max-parents=10 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=100 + +# Maximum number of return / yield for function / method body. +max-returns=100 + +# Maximum number of statements in function / method body. +max-statements=1000 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=0 + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when being caught. Defaults to +# "BaseException, Exception". +overgeneral-exceptions=BaseException, + Exception diff -r e3c1f4736ab2 -r 00212260f659 sat/core/i18n.py --- a/sat/core/i18n.py Tue Aug 23 23:37:22 2022 +0200 +++ b/sat/core/i18n.py Tue Aug 23 12:04:11 2022 +0200 @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from typing import Callable, cast from sat.core.log import getLogger @@ -40,10 +41,10 @@ except ImportError: log.warning("gettext support disabled") - _ = lambda msg: msg # Libervia doesn't support gettext + _ = cast(Callable[[str], str], lambda msg: msg) # Libervia doesn't support gettext def languageSwitch(lang=None): pass -D_ = lambda msg: msg # used for deferred translations +D_ = cast(Callable[[str], str], lambda msg: msg) # used for deferred translations diff -r e3c1f4736ab2 -r 00212260f659 sat/plugins/plugin_xep_0082.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0082.py Tue Aug 23 12:04:11 2022 +0200 @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 + +# Libervia plugin for XMPP Date and Time Profile formatting and parsing with Python's +# datetime package +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +# Type-check with `mypy --strict` +# Lint with `pylint` + +from sat.core.constants import Const as C +from sat.core.i18n import D_ +from sat.core.sat_main import SAT +from sat.tools import datetime + + +__all__ = [ # pylint: disable=unused-variable + "PLUGIN_INFO", + "XEP_0082" +] + + +PLUGIN_INFO = { + C.PI_NAME: "XMPP Date and Time Profiles", + C.PI_IMPORT_NAME: "XEP-0082", + C.PI_TYPE: C.PLUG_TYPE_MISC, + C.PI_PROTOCOLS: [ "XEP-0082" ], + C.PI_DEPENDENCIES: [], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0082", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: D_("Date and Time Profiles for XMPP"), +} + + +class XEP_0082: # pylint: disable=invalid-name + """ + Implementation of the date and time profiles specified in XEP-0082 using Python's + datetime module. The legacy format described in XEP-0082 section "4. Migration" is not + supported. Reexports of the functions in :mod:`sat.tools.datetime`. + + This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas + actively, but offers API for other plugins to use. + """ + + def __init__(self, sat: SAT) -> None: + """ + @param sat: The SAT instance. + """ + + format_date = staticmethod(datetime.format_date) + parse_date = staticmethod(datetime.parse_date) + format_datetime = staticmethod(datetime.format_datetime) + parse_datetime = staticmethod(datetime.parse_datetime) + format_time = staticmethod(datetime.format_time) + parse_time = staticmethod(datetime.parse_time) diff -r e3c1f4736ab2 -r 00212260f659 sat/plugins/plugin_xep_0420.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0420.py Tue Aug 23 12:04:11 2022 +0200 @@ -0,0 +1,582 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Stanza Content Encryption +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +# Type-check with `mypy --strict --disable-error-code no-untyped-call` +# Lint with `pylint` + +from abc import ABC, abstractmethod +from datetime import datetime +import enum +import secrets +import string +from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast + +from lxml import etree + +from sat.core.constants import Const as C +from sat.core.i18n import D_ +from sat.core.log import Logger, getLogger +from sat.core.sat_main import SAT +from sat.tools.xml_tools import ElementParser +from sat.plugins.plugin_xep_0033 import NS_ADDRESS +from sat.plugins.plugin_xep_0082 import XEP_0082 +from sat.plugins.plugin_xep_0334 import NS_HINTS +from sat.plugins.plugin_xep_0359 import NS_SID +from sat.plugins.plugin_xep_0380 import NS_EME +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish + + +__all__ = [ # pylint: disable=unused-variable + "PLUGIN_INFO", + "NS_SCE", + "XEP_0420", + "ProfileRequirementsNotMet", + "AffixVerificationFailed", + "SCECustomAffix", + "SCEAffixPolicy", + "SCEProfile", + "SCEAffixValues" +] + + +log = cast(Logger, getLogger(__name__)) + + +PLUGIN_INFO = { + C.PI_NAME: "SCE", + C.PI_IMPORT_NAME: "XEP-0420", + C.PI_TYPE: "SEC", + C.PI_PROTOCOLS: [ "XEP-0420" ], + C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ], + C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ], + C.PI_MAIN: "XEP_0420", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"), +} + + +NS_SCE = "urn:xmpp:sce:1" + + +class ProfileRequirementsNotMet(Exception): + """ + Raised by :meth:`XEP_0420.unpack_stanza` in case the requirements formulated by the + profile are not met. + """ + + +class AffixVerificationFailed(Exception): + """ + Raised by :meth:`XEP_0420.unpack_stanza` in case of affix verification failure. + """ + + +class SCECustomAffix(ABC): + """ + Interface for custom affixes of SCE profiles. + """ + + @property + @abstractmethod + def element_name(self) -> str: + """ + @return: The name of the affix's XML element. + """ + + @property + @abstractmethod + def element_schema(self) -> str: + """ + @return: The XML schema definition of the affix element's XML structure, i.e. the + ```` schema element. This element will be referenced using + ````. + """ + + @abstractmethod + def create(self, stanza: domish.Element) -> domish.Element: + """ + @param stanza: The stanza element which has been processed by + :meth:`XEP_0420.pack_stanza`, i.e. all encryptable children have been removed + and only the root ```` or ```` and unencryptable children + remain. Do not modify. + @return: An affix element to include in the envelope. The element must have the + name :attr:`element_name` and must validate using :attr:`element_schema`. + @raise ValueError: if the affix couldn't be built. + """ + + @abstractmethod + def verify(self, stanza: domish.Element, element: domish.Element) -> None: + """ + @param stanza: The stanza element before being processed by + :meth:`XEP_0420.unpack_stanza`, i.e. all encryptable children have been + removed and only the root ```` or ```` and unencryptable + children remain. Do not modify. + @param element: The affix element to verify. + @raise AffixVerificationFailed: on verification failure. + """ + + +@enum.unique +class SCEAffixPolicy(enum.Enum): + """ + Policy for the presence of an affix in an SCE envelope. + """ + + REQUIRED: str = "REQUIRED" + OPTIONAL: str = "OPTIONAL" + NOT_NEEDED: str = "NOT_NEEDED" + + +class SCEProfile(NamedTuple): + # pylint: disable=invalid-name + """ + An SCE profile, i.e. the definition which affixes are required, optional or not needed + at all by an SCE-enabled encryption protocol. + """ + + rpad_policy: SCEAffixPolicy + time_policy: SCEAffixPolicy + to_policy: SCEAffixPolicy + from_policy: SCEAffixPolicy + custom_policies: Dict[SCECustomAffix, SCEAffixPolicy] + + +class SCEAffixValues(NamedTuple): + # pylint: disable=invalid-name + """ + Structure returned by :meth:`XEP_0420.unpack_stanza` with the parsed/processes values + of all affixes included in the envelope. For custom affixes, the whole affix element + is returned. + """ + + rpad: Optional[str] + timestamp: Optional[datetime] + recipient: Optional[jid.JID] + sender: Optional[jid.JID] + custom: Dict[SCECustomAffix, domish.Element] + + +ENVELOPE_SCHEMA = """ + + + + + + + + + + + {custom_affix_references} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + {custom_affix_definitions} + +""" + + +class XEP_0420: # pylint: disable=invalid-name + """ + Implementation of XEP-0420: Stanza Content Encryption under namespace + ``urn:xmpp:sce:1``. + + This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas + actively, but offers API for other plugins to use. + """ + + # Set of namespaces whose elements are never allowed to be transferred in an encrypted + # envelope. + MUST_BE_PLAINTEXT_NAMESPACES: Set[str] = { + NS_HINTS, + NS_SID, # TODO: Not sure whether this ban applies to both stanza-id and origin-id + NS_ADDRESS, + # Not part of the specification (yet), but just doesn't make sense in an encrypted + # envelope: + NS_EME + } + + # Set of (namespace, element name) tuples that define elements which are never allowed + # to be transferred in an encrypted envelope. If all elements under a certain + # namespace are forbidden, the namespace can be added to + # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead. + # Note: only full namespaces are forbidden by the spec for now, the following is for + # potential future use. + MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set() + + def __init__(self, sat: SAT) -> None: + """ + @param sat: The SAT instance. + """ + + @staticmethod + def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes: + """Pack a stanza according to Stanza Content Encryption. + + Removes all elements from the stanza except for a few exceptions that explicitly + need to be transferred in plaintext, e.g. because they contain hints/instructions + for the server on how to process the stanza. Together with the affix elements as + requested by the profile, the removed elements are added to an envelope XML + structure that builds the plaintext to be encrypted by the SCE-enabled encryption + scheme. Optional affixes are always added to the structure, i.e. they are treated + by the packing code as if they were required. + + Once built, the envelope structure is serialized to a byte string and returned for + the encryption scheme to encrypt and add to the stanza. + + @param profile: The SCE profile, i.e. the definition of affixes to include in the + envelope. + @param stanza: The stanza to process. Will be modified by the call. + @return: The serialized envelope structure that builds the plaintext for the + encryption scheme to process. + @raise ValueError: if the or affixes are requested but the stanza + doesn't have the "to"/"from" attribute set to extract the value from. Can also + be raised by custom affixes. + + @warning: It is up to the calling code to add a message processing hint + if applicable. + """ + + # Prepare the envelope and content elements + envelope = domish.Element((NS_SCE, "envelope")) + content = envelope.addElement((NS_SCE, "content")) + + # Note the serialized byte size of the content element before adding any children + empty_content_byte_size = len(content.toXml().encode("utf-8")) + + # Just for type safety + stanza_children = cast(List[Union[domish.Element, str]], stanza.children) + content_children = cast(List[Union[domish.Element, str]], content.children) + + # Move elements that are not explicitly forbidden from being encrypted from the + # stanza to the content element. + for child in list(cast(Iterator[domish.Element], stanza.elements())): + if ( + child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES + and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS + ): + # Remove the child from the stanza + stanza_children.remove(child) + + # A namespace of ``None`` can be used on domish elements to inherit the + # namespace from the parent. When moving elements from the stanza root to + # the content element, however, we don't want elements to inherit the + # namespace of the content element. Thus, check for elements with ``None`` + # for their namespace and set the namespace to jabber:client, which is the + # namespace of the parent element. + if child.uri is None: + child.uri = C.NS_CLIENT + child.defaultUri = C.NS_CLIENT + + # Add the child with corrected namespaces to the content element + content_children.append(child) + + # Add the affixes requested by the profile + if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED: + # The specification defines the rpad affix to contain "[...] a randomly + # generated sequence of random length between 0 and 200 characters." This + # implementation differs a bit from the specification in that a minimum size + # other than 0 is chosen depending on the serialized size of the content + # element. This is to prevent the scenario where the encrypted content is + # short and the rpad is also randomly chosen to be short, which could allow + # guessing the content of a short message. To do so, the rpad length is first + # chosen to pad the content to at least 53 bytes, then afterwards another 0 to + # 200 bytes are added. Note that single-byte characters are used by this + # implementation, thus the number of characters equals the number of bytes. + content_byte_size = len(content.toXml().encode("utf-8")) + content_byte_size_diff = content_byte_size - empty_content_byte_size + rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201) + rpad_content = "".join( + secrets.choice(string.digits + string.ascii_letters + string.punctuation) + for __ + in range(rpad_length) + ) + envelope.addElement((NS_SCE, "rpad"), content=rpad_content) + + if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED: + time_element = envelope.addElement((NS_SCE, "time")) + time_element["stamp"] = XEP_0082.format_datetime() + + if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED: + recipient = cast(Optional[str], stanza.getAttribute("to", None)) + if recipient is None: + raise ValueError( + " affix requested, but stanza doesn't have the 'to' attribute" + " set." + ) + + to_element = envelope.addElement((NS_SCE, "to")) + to_element["jid"] = jid.JID(recipient).userhost() + + if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED: + sender = cast(Optional[str], stanza.getAttribute("from", None)) + if sender is None: + raise ValueError( + " affix requested, but stanza doesn't have the 'from'" + " attribute set." + ) + + from_element = envelope.addElement((NS_SCE, "from")) + from_element["jid"] = jid.JID(sender).userhost() + + for affix, policy in profile.custom_policies.items(): + if policy is not SCEAffixPolicy.NOT_NEEDED: + envelope.addChild(affix.create(stanza)) + + return cast(str, envelope.toXml()).encode("utf-8") + + @staticmethod + def unpack_stanza( + profile: SCEProfile, + stanza: domish.Element, + envelope_serialized: bytes + ) -> SCEAffixValues: + """Unpack a stanza packed according to Stanza Content Encryption. + + Parses the serialized envelope as XML, verifies included affixes and makes sure + the requirements of the profile are met, and restores the stanza by moving + decrypted elements from the envelope back to the stanza top level. + + @param profile: The SCE profile, i.e. the definition of affixes that have to/may + be included in the envelope. + @param stanza: The stanza to process. Will be modified by the call. + @param envelope_serialized: The serialized envelope, i.e. the plaintext produced + by the decryption scheme utilizing SCE. + @return: The parsed and processed values of all affixes that were present on the + envelope, notably including the timestamp. + @raise ValueError: if the serialized envelope element is malformed. + @raise ProfileRequirementsNotMet: if one or more affixes required by the profile + are missing from the envelope. + @raise AffixVerificationFailed: if an affix included in the envelope fails to + validate. It doesn't matter whether the affix is required by the profile or + not, all affixes included in the envelope are validated and cause this + exception to be raised on failure. + + @warning: It is up to the calling code to verify the timestamp, if returned, since + the requirements on the timestamp may vary between SCE-enabled protocols. + """ + + try: + envelope_serialized_string = envelope_serialized.decode("utf-8") + except UnicodeError as e: + raise ValueError("Serialized envelope can't bare parsed as utf-8.") from e + + custom_affixes = set(profile.custom_policies.keys()) + + # Make sure the envelope adheres to the schema + parser = etree.XMLParser(schema=etree.XMLSchema(etree.XML(ENVELOPE_SCHEMA.format( + custom_affix_references="".join( + f'' + for custom_affix + in custom_affixes + ), + custom_affix_definitions="".join( + custom_affix.element_schema + for custom_affix + in custom_affixes + ) + ).encode("utf-8")))) + + try: + etree.fromstring(envelope_serialized_string, parser) + except etree.XMLSyntaxError as e: + raise ValueError("Serialized envelope doesn't pass schema validation.") from e + + # Prepare the envelope and content elements + envelope = cast(domish.Element, ElementParser()(envelope_serialized_string)) + content = cast(domish.Element, next(envelope.elements(NS_SCE, "content"))) + + # Verify the affixes + rpad_element = cast( + Optional[domish.Element], + next(envelope.elements(NS_SCE, "rpad"), None) + ) + time_element = cast( + Optional[domish.Element], + next(envelope.elements(NS_SCE, "time"), None) + ) + to_element = cast( + Optional[domish.Element], + next(envelope.elements(NS_SCE, "to"), None) + ) + from_element = cast( + Optional[domish.Element], + next(envelope.elements(NS_SCE, "from"), None) + ) + + # The rpad doesn't need verification. + rpad_value = None if rpad_element is None else str(rpad_element) + + # The time affix isn't verified other than that the timestamp is parseable. + try: + timestamp_value = None if time_element is None else \ + XEP_0082.parse_datetime(time_element["stamp"]) + except ValueError as e: + raise AffixVerificationFailed("Malformed time affix") from e + + # The to affix is verified by comparing the to attribute of the stanza with the + # JID referenced by the affix. Note that only bare JIDs are compared as per the + # specification. + recipient_value: Optional[jid.JID] = None + if to_element is not None: + recipient_value = jid.JID(to_element["jid"]) + + recipient_actual = cast(Optional[str], stanza.getAttribute("to", None)) + if recipient_actual is None: + raise AffixVerificationFailed( + "'To' affix is included in the envelope, but the stanza is lacking a" + " 'to' attribute to compare the value to." + ) + + recipient_actual_bare_jid = jid.JID(recipient_actual).userhost() + recipient_target_bare_jid = recipient_value.userhost() + + if recipient_actual_bare_jid != recipient_target_bare_jid: + raise AffixVerificationFailed( + f"Mismatch between actual and target recipient bare JIDs:" + f" {recipient_actual_bare_jid} vs {recipient_target_bare_jid}." + ) + + # The from affix is verified by comparing the from attribute of the stanza with + # the JID referenced by the affix. Note that only bare JIDs are compared as per + # the specification. + sender_value: Optional[jid.JID] = None + if from_element is not None: + sender_value = jid.JID(from_element["jid"]) + + sender_actual = cast(Optional[str], stanza.getAttribute("from", None)) + if sender_actual is None: + raise AffixVerificationFailed( + "'From' affix is included in the envelope, but the stanza is lacking" + " a 'from' attribute to compare the value to." + ) + + sender_actual_bare_jid = jid.JID(sender_actual).userhost() + sender_target_bare_jid = sender_value.userhost() + + if sender_actual_bare_jid != sender_target_bare_jid: + raise AffixVerificationFailed( + f"Mismatch between actual and target sender bare JIDs:" + f" {sender_actual_bare_jid} vs {sender_target_bare_jid}." + ) + + # Find and verify custom affixes + custom_values: Dict[SCECustomAffix, domish.Element] = {} + for affix in custom_affixes: + element_name = affix.element_name + element = cast( + Optional[domish.Element], + next(envelope.elements(NS_SCE, element_name), None) + ) + if element is not None: + affix.verify(stanza, element) + custom_values[affix] = element + + # Check whether all affixes required by the profile are present + rpad_missing = \ + profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None + time_missing = \ + profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None + to_missing = \ + profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None + from_missing = \ + profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None + custom_missing = any( + affix not in custom_values + for affix, policy + in profile.custom_policies.items() + if policy is SCEAffixPolicy.REQUIRED + ) + + if rpad_missing or time_missing or to_missing or from_missing or custom_missing: + custom_missing_string = "" + for custom_affix in custom_affixes: + value = "present" if custom_affix in custom_values else "missing" + custom_missing_string += f", [custom]{custom_affix.element_name}={value}" + + raise ProfileRequirementsNotMet( + f"SCE envelope is missing affixes required by the profile {profile}." + f" Affix presence:" + f" rpad={'missing' if rpad_missing else 'present'}" + f", time={'missing' if time_missing else 'present'}" + f", to={'missing' if to_missing else 'present'}" + f", from={'missing' if from_missing else 'present'}" + + custom_missing_string + ) + + # Just for type safety + content_children = cast(List[Union[domish.Element, str]], content.children) + stanza_children = cast(List[Union[domish.Element, str]], stanza.children) + + # Move elements that are not explicitly forbidden from being encrypted from the + # content element to the stanza. + for child in list(cast(Iterator[domish.Element], content.elements())): + if ( + child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES + or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS + ): + log.warning( + f"An element that MUST be transferred in plaintext was found in an" + f" SCE envelope: {child.toXml()}" + ) + else: + # Remove the child from the content element + content_children.remove(child) + + # Add the child to the stanza + stanza_children.append(child) + + return SCEAffixValues( + rpad_value, + timestamp_value, + recipient_value, + sender_value, + custom_values + ) diff -r e3c1f4736ab2 -r 00212260f659 sat/tools/datetime.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/tools/datetime.py Tue Aug 23 12:04:11 2022 +0200 @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 + +# Libervia: XMPP Date and Time profiles as per XEP-0082 +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +# Type-check with `mypy --strict` +# Lint with `pylint` + +from datetime import date, datetime, time, timezone +import re +from typing import Optional, Tuple + + +__all__ = [ # pylint: disable=unused-variable + "format_date", + "parse_date", + "format_datetime", + "parse_datetime", + "format_time", + "parse_time" +] + + +def __parse_fraction_of_a_second(value: str) -> Tuple[str, Optional[int]]: + """ + datetime's strptime only supports up to six digits of the fraction of a seconds, while + the XEP-0082 specification allows for any number of digits. This function parses and + removes the optional fraction of a second from the input string. + + @param value: The input string, containing a section of the format [.sss]. + @return: The input string with the fraction of a second removed, and the fraction of a + second parsed with microsecond resolution. Returns the unaltered input string and + ``None`` if no fraction of a second was found in the input string. + """ + + # The following regex matches the optional fraction of a seconds for manual + # processing. + match = re.search(r"\.(\d*)", value) + microsecond: Optional[int] = None + if match is not None: + # Remove the fraction of a second from the input string + value = value[:match.start()] + value[match.end():] + + # datetime supports microsecond resolution for the fraction of a second, thus + # limit/pad the parsed fraction of a second to six digits + microsecond = int(match.group(1)[:6].ljust(6, '0')) + + return value, microsecond + + +def format_date(value: Optional[date] = None) -> str: + """ + @param value: The date for format. Defaults to the current date in the UTC timezone. + @return: The date formatted according to the Date profile specified in XEP-0082. + + @warning: Formatting of the current date in the local timezone may leak geographical + information of the sender. Thus, it is advised to only format the current date in + UTC. + """ + # CCYY-MM-DD + + # The Date profile of XEP-0082 is equal to the ISO 8601 format. + return (datetime.now(timezone.utc).date() if value is None else value).isoformat() + + +def parse_date(value: str) -> date: + """ + @param value: A string containing date information formatted according to the Date + profile specified in XEP-0082. + @return: The date parsed from the input string. + @raise ValueError: if the input string is not correctly formatted. + """ + # CCYY-MM-DD + + # The Date profile of XEP-0082 is equal to the ISO 8601 format. + return date.fromisoformat(value) + + +def format_datetime( + value: Optional[datetime] = None, + include_microsecond: bool = False +) -> str: + """ + @param value: The datetime to format. Defaults to the current datetime. + @param include_microsecond: Include the microsecond of the datetime in the output. + @return: The datetime formatted according to the DateTime profile specified in + XEP-0082. The datetime is always converted to UTC before formatting to avoid + leaking geographical information of the sender. + """ + # CCYY-MM-DDThh:mm:ss[.sss]TZD + + # We format the time in UTC, since the %z formatter of strftime doesn't include colons + # to separate hours and minutes which is required by XEP-0082. UTC allows us to put a + # simple letter 'Z' as the time zone definition. + value = ( + datetime.now(timezone.utc) + if value is None + else value.astimezone(timezone.utc) # pylint: disable=no-member + ) + + if include_microsecond: + return value.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + return value.strftime("%Y-%m-%dT%H:%M:%SZ") + + +def parse_datetime(value: str) -> datetime: + """ + @param value: A string containing datetime information formatted according to the + DateTime profile specified in XEP-0082. + @return: The datetime parsed from the input string. + @raise ValueError: if the input string is not correctly formatted. + """ + # CCYY-MM-DDThh:mm:ss[.sss]TZD + + value, microsecond = __parse_fraction_of_a_second(value) + + result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z") + + if microsecond is not None: + result = result.replace(microsecond=microsecond) + + return result + + +def format_time(value: Optional[time] = None, include_microsecond: bool = False) -> str: + """ + @param value: The time to format. Defaults to the current time in the UTC timezone. + @param include_microsecond: Include the microsecond of the time in the output. + @return: The time formatted according to the Time profile specified in XEP-0082. + + @warning: Since accurate timezone conversion requires the date to be known, this + function cannot convert input times to UTC before formatting. This means that + geographical information of the sender may be leaked if a time in local timezone + is formatted. Thus, when passing a time to format, it is advised to pass the time + in UTC if possible. + """ + # hh:mm:ss[.sss][TZD] + + if value is None: + # There is no time.now() method as one might expect, but the current time can be + # extracted from a datetime object including time zone information. + value = datetime.now(timezone.utc).timetz() + + # The format created by time.isoformat complies with the XEP-0082 Time profile. + return value.isoformat("auto" if include_microsecond else "seconds") + + +def parse_time(value: str) -> time: + """ + @param value: A string containing time information formatted according to the Time + profile specified in XEP-0082. + @return: The time parsed from the input string. + @raise ValueError: if the input string is not correctly formatted. + """ + # hh:mm:ss[.sss][TZD] + + value, microsecond = __parse_fraction_of_a_second(value) + + # The format parsed by time.fromisoformat mostly complies with the XEP-0082 Time + # profile, except that it doesn't handle the letter Z as time zone information for + # UTC. This can be fixed with a simple string replacement of 'Z' with "+00:00", which + # is another way to represent UTC. + result = time.fromisoformat(value.replace('Z', "+00:00")) + + if microsecond is not None: + result = result.replace(microsecond=microsecond) + + return result diff -r e3c1f4736ab2 -r 00212260f659 sat/tools/utils.py --- a/sat/tools/utils.py Tue Aug 23 23:37:22 2022 +0200 +++ b/sat/tools/utils.py Tue Aug 23 12:04:11 2022 +0200 @@ -34,6 +34,7 @@ from twisted.internet import defer from sat.core.constants import Const as C from sat.core.log import getLogger +from sat.tools.datetime import format_date, format_datetime log = getLogger(__name__) @@ -146,18 +147,16 @@ to avoid reveling the timezone, we always return UTC dates the string returned by this method is valid with RFC 3339 + this function redirects to the functions in the :mod:`sat.tools.datetime` module @param timestamp(None, float): posix timestamp. If None current time will be used @param with_time(bool): if True include the time @return(unicode): XEP-0082 formatted date and time """ - template_date = "%Y-%m-%d" - template_time = "%H:%M:%SZ" - template = ( - "{}T{}".format(template_date, template_time) if with_time else template_date + dtime = datetime.datetime.utcfromtimestamp( + time.time() if timestamp is None else timestamp ) - return datetime.datetime.utcfromtimestamp( - time.time() if timestamp is None else timestamp - ).strftime(template) + + return format_datetime(dtime) if with_time else format_date(dtime.date()) def generatePassword(vocabulary=None, size=20): diff -r e3c1f4736ab2 -r 00212260f659 tests/unit/test_plugin_xep_0082.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/unit/test_plugin_xep_0082.py Tue Aug 23 12:04:11 2022 +0200 @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 + +# Tests for Libervia's XMPP Date and Time Profile formatting and parsing plugin +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +# Type-check with `mypy --strict` +# Lint with `pylint` + +from datetime import date, datetime, time, timezone + +import pytest + +from sat.plugins.plugin_xep_0082 import XEP_0082 + + +__all__ = [ # pylint: disable=unused-variable + "test_date_formatting", + "test_date_parsing", + "test_datetime_formatting", + "test_datetime_parsing", + "test_time_formatting", + "test_time_parsing" +] + + +def test_date_formatting() -> None: # pylint: disable=missing-function-docstring + # The following tests don't check the output of format_date directly; instead, they + # assume that parse_date rejects incorrect formatting. + + # The date version 0.1 of XEP-0082 was published + value = date(2003, 4, 21) + + # Check that the parsed date equals the formatted date + assert XEP_0082.parse_date(XEP_0082.format_date(value)) == value + + # Check that a date instance is returned when format_date is called without an + # explicit input date + assert isinstance(XEP_0082.parse_date(XEP_0082.format_date()), date) + + +def test_date_parsing() -> None: # pylint: disable=missing-function-docstring + # There isn't really a point in testing much more than this + assert XEP_0082.parse_date("2003-04-21") == date(2003, 4, 21) + + +def test_datetime_formatting() -> None: # pylint: disable=missing-function-docstring + # The following tests don't check the output of format_datetime directly; instead, + # they assume that parse_datetime rejects incorrect formatting. + + # A datetime in UTC + value = datetime(1234, 5, 6, 7, 8, 9, 123456, timezone.utc) + + # Check that the parsed datetime equals the formatted datetime except for the + # microseconds + parsed = XEP_0082.parse_datetime(XEP_0082.format_datetime(value)) + assert parsed != value + assert parsed.replace(microsecond=123456) == value + + # Check that the parsed datetime equals the formatted datetime including microseconds + assert XEP_0082.parse_datetime(XEP_0082.format_datetime(value, True)) == value + + # A datetime in some other timezone, from the Python docs of the datetime module + value = datetime.fromisoformat("2011-11-04T00:05:23+04:00") + + # Check that the parsed datetime equals the formatted datetime with or without + # microseconds + assert XEP_0082.parse_datetime(XEP_0082.format_datetime(value)) == value + assert XEP_0082.parse_datetime(XEP_0082.format_datetime(value, True)) == value + + # Check that the datetime was converted to UTC while formatting + assert XEP_0082.parse_datetime(XEP_0082.format_datetime(value)).tzinfo == timezone.utc + + # Check that a datetime instance is returned when format_datetime is called without an + # explicit input + # datetime + assert isinstance(XEP_0082.parse_datetime(XEP_0082.format_datetime()), datetime) + assert isinstance(XEP_0082.parse_datetime(XEP_0082.format_datetime( + include_microsecond=True + )), datetime) + assert XEP_0082.parse_datetime(XEP_0082.format_datetime()).tzinfo == timezone.utc + + +def test_datetime_parsing() -> None: # pylint: disable=missing-function-docstring + # Datetime of the first human steps on the Moon (UTC) + value = datetime(1969, 7, 21, 2, 56, 15, tzinfo=timezone.utc) + + # With timezone 'Z', without a fraction of a second + assert XEP_0082.parse_datetime("1969-07-21T02:56:15Z") == value + + # With timezone '+04:00', without a fraction of a second + assert XEP_0082.parse_datetime("1969-07-21T06:56:15+04:00") == value + + # With timezone '-05:00', without a fraction of a second + assert XEP_0082.parse_datetime("1969-07-20T21:56:15-05:00") == value + + # Without timezone, without a fraction of a second + with pytest.raises(ValueError): + XEP_0082.parse_datetime("1969-07-21T02:56:15") + + # With timezone 'Z', with a fraction of a second consisting of two digits + assert XEP_0082.parse_datetime("1969-07-21T02:56:15.12Z") == \ + value.replace(microsecond=120000) + + # With timezone 'Z', with a fraction of a second consisting of nine digits + assert XEP_0082.parse_datetime("1969-07-21T02:56:15.123456789Z") == \ + value.replace(microsecond=123456) + + # With timezone '+04:00', with a fraction of a second consisting of six digits + assert XEP_0082.parse_datetime("1969-07-21T06:56:15.123456+04:00") == \ + value.replace(microsecond=123456) + + # With timezone '-05:00', with a fraction of a second consisting of zero digits + assert XEP_0082.parse_datetime("1969-07-20T21:56:15.-05:00") == value + + # Without timezone, with a fraction of a second consisting of six digits + with pytest.raises(ValueError): + XEP_0082.parse_datetime("1969-07-21T02:56:15.123456") + + +def test_time_formatting() -> None: # pylint: disable=missing-function-docstring + # The following tests don't check the output of format_time directly; instead, they + # assume that parse_time rejects incorrect formatting. + + # A time in UTC + value = time(12, 34, 56, 789012, timezone.utc) + + # Check that the parsed time equals the formatted time except for the microseconds + parsed = XEP_0082.parse_time(XEP_0082.format_time(value)) + assert parsed != value + assert parsed.replace(microsecond=789012) == value + + # Check that the parsed time equals the formatted time including microseconds + assert XEP_0082.parse_time(XEP_0082.format_time(value, True)) == value + + # A time in some other timezone, from the Python docs of the datetime module + value = time.fromisoformat("04:23:01+04:00") + + # Check that the parsed time equals the formatted time with or without microseconds + assert XEP_0082.parse_time(XEP_0082.format_time(value)) == value + assert XEP_0082.parse_time(XEP_0082.format_time(value, True)) == value + + # Check that the time has retained its timezone + assert XEP_0082.parse_time(XEP_0082.format_time(value)).tzinfo == value.tzinfo + + # A time without timezone information, from the Python docs of the datetime module + value = time.fromisoformat("04:23:01") + + # Check that the parsed time doesn't have timezone information either + assert XEP_0082.parse_time(XEP_0082.format_time(value)).tzinfo is None + + # Check that a time instance is returned when format_time is called without an + # explicit input date + assert isinstance(XEP_0082.parse_time(XEP_0082.format_time()), time) + assert isinstance(XEP_0082.parse_time(XEP_0082.format_time( + include_microsecond=True + )), time) + assert XEP_0082.parse_time(XEP_0082.format_time()).tzinfo == timezone.utc + + +def test_time_parsing() -> None: # pylint: disable=missing-function-docstring + # Time for tea + value = time(16, 0, 0, tzinfo=timezone.utc) + + # With timezone 'Z', without a fraction of a second + assert XEP_0082.parse_time("16:00:00Z") == value + + # With timezone '+04:00', without a fraction of a second + assert XEP_0082.parse_time("20:00:00+04:00") == value + + # With timezone '-05:00', without a fraction of a second + assert XEP_0082.parse_time("11:00:00-05:00") == value + + # Without a timezone, without a fraction of a second + assert XEP_0082.parse_time("16:00:00") == value.replace(tzinfo=None) + + # With timezone 'Z', with a fraction of a second consisting of two digits + assert XEP_0082.parse_time("16:00:00.12Z") == value.replace(microsecond=120000) + + # With timezone 'Z', with a fraction of a second consisting of nine digits + assert XEP_0082.parse_time("16:00:00.123456789Z") == value.replace(microsecond=123456) + + # With timezone '+04:00', with a fraction of a second consisting of six digits + assert XEP_0082.parse_time("20:00:00.123456+04:00") == \ + value.replace(microsecond=123456) + + # With timezone '-05:00', with a fraction of a second consisting of zero digits + assert XEP_0082.parse_time("11:00:00.-05:00") == value + + # Without a timezone, with a fraction of a second consisting of six digits + assert XEP_0082.parse_time("16:00:00.123456") == \ + value.replace(microsecond=123456, tzinfo=None) diff -r e3c1f4736ab2 -r 00212260f659 tests/unit/test_plugin_xep_0420.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/unit/test_plugin_xep_0420.py Tue Aug 23 12:04:11 2022 +0200 @@ -0,0 +1,581 @@ +#!/usr/bin/env python3 + +# Tests for Libervia's Stanza Content Encryption plugin +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +# Type-check with `mypy --strict --disable-error-code no-untyped-call` +# Lint with `pylint` + +from datetime import datetime, timezone +from typing import Callable, Iterator, Optional, cast + +import pytest + +from sat.plugins.plugin_xep_0334 import NS_HINTS +from sat.plugins.plugin_xep_0420 import ( + NS_SCE, XEP_0420, AffixVerificationFailed, ProfileRequirementsNotMet, SCEAffixPolicy, + SCECustomAffix, SCEProfile +) +from sat.tools.xml_tools import ElementParser +from twisted.words.xish import domish + + +__all__ = [ # pylint: disable=unused-variable + "test_unpack_matches_original", + "test_affixes_included", + "test_all_affixes_verified", + "test_incomplete_affixes", + "test_rpad_affix", + "test_time_affix", + "test_to_affix", + "test_from_affix", + "test_custom_affixes", + "test_namespace_conversion", + "test_non_encryptable_elements", + "test_schema_validation" +] + + +string_to_domish = cast(Callable[[str], domish.Element], ElementParser()) + + +class CustomAffixImpl(SCECustomAffix): + """ + A simple custom affix implementation for testing purposes. Verifies the full JIDs of + both sender and recipient. + + @warning: This is just an example, an affix element like this might not make sense due + to potentially allowed modifications of recipient/sender full JIDs (I don't know + enough about XMPP routing to know whether full JIDs are always left untouched by + the server). + """ + + @property + def element_name(self) -> str: + return "full-jids" + + @property + def element_schema(self) -> str: + return """ + + + + + """ + + def create(self, stanza: domish.Element) -> domish.Element: + recipient = cast(Optional[str], stanza.getAttribute("to", None)) + sender = cast(Optional[str], stanza.getAttribute("from", None)) + + if recipient is None or sender is None: + raise ValueError( + "Stanza doesn't have ``to`` and ``from`` attributes required by the" + " full-jids custom affix." + ) + + element = domish.Element((NS_SCE, "full-jids")) + element["recipient"] = recipient + element["sender"] = sender + return element + + def verify(self, stanza: domish.Element, element: domish.Element) -> None: + recipient_target = element["recipient"] + recipient_actual = stanza.getAttribute("to") + + sender_target = element["sender"] + sender_actual = stanza.getAttribute("from") + + if recipient_actual != recipient_target or sender_actual != sender_target: + raise AffixVerificationFailed( + f"Full JIDs differ. Recipient: actual={recipient_actual} vs." + f" target={recipient_target}; Sender: actual={sender_actual} vs." + f" target={sender_target}" + ) + + +def test_unpack_matches_original() -> None: # pylint: disable=missing-function-docstring + profile = SCEProfile( + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.OPTIONAL, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.OPTIONAL, + custom_policies={ CustomAffixImpl(): SCEAffixPolicy.NOT_NEEDED } + ) + + stanza_string = ( + 'Test with both a body' + ' and some other custom element.some more content' + ) + + stanza = string_to_domish(stanza_string) + + envelope_serialized = XEP_0420.pack_stanza(profile, stanza) + + # The stanza should not have child elements any more + assert len(list(stanza.elements())) == 0 + + XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + + # domish.Element doesn't override __eq__, thus we compare the .toXml() strings here in + # the hope that serialization for an example as small as this is unique enough to be + # compared that way. + assert stanza.toXml() == string_to_domish(stanza_string).toXml() + + +def test_affixes_included() -> None: # pylint: disable=missing-function-docstring + custom_affix = CustomAffixImpl() + + profile = SCEProfile( + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.OPTIONAL, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.OPTIONAL, + custom_policies={ custom_affix: SCEAffixPolicy.OPTIONAL } + ) + + stanza = string_to_domish(""" + + Make sure that both the REQUIRED and the OPTIONAL affixes are included. + + """) + + affix_values = XEP_0420.unpack_stanza( + profile, + stanza, + XEP_0420.pack_stanza(profile, stanza) + ) + + assert affix_values.rpad is not None + assert affix_values.timestamp is not None + assert affix_values.recipient is None + assert affix_values.sender is not None + assert custom_affix in affix_values.custom + + +def test_all_affixes_verified() -> None: # pylint: disable=missing-function-docstring + packing_profile = SCEProfile( + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.REQUIRED, + custom_policies={} + ) + + unpacking_profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + stanza = string_to_domish(""" + + When unpacking, all affixes are loaded, even those marked as NOT_NEEDED. + + """) + + envelope_serialized = XEP_0420.pack_stanza(packing_profile, stanza) + + affix_values = XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized) + + assert affix_values.rpad is not None + assert affix_values.timestamp is not None + assert affix_values.recipient is not None + assert affix_values.sender is not None + + # When unpacking, all affixes are verified, even if they are NOT_NEEDED by the profile + stanza = string_to_domish( + """""" + ) + + with pytest.raises(AffixVerificationFailed): + XEP_0420.unpack_stanza(unpacking_profile, stanza, envelope_serialized) + + +def test_incomplete_affixes() -> None: # pylint: disable=missing-function-docstring + packing_profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + unpacking_profile = SCEProfile( + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.REQUIRED, + custom_policies={} + ) + + stanza = string_to_domish(""" + Check that all affixes REQUIRED by the profile are present. + """) + + with pytest.raises(ProfileRequirementsNotMet): + XEP_0420.unpack_stanza( + unpacking_profile, + stanza, + XEP_0420.pack_stanza(packing_profile, stanza) + ) + + # Do the same but with a custom affix missing + custom_affix = CustomAffixImpl() + + packing_profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={ custom_affix: SCEAffixPolicy.NOT_NEEDED } + ) + + unpacking_profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={ custom_affix: SCEAffixPolicy.REQUIRED } + ) + + stanza = string_to_domish(""" + + Check that all affixes REQUIRED by the profile are present, including custom + affixes. + + """) + + with pytest.raises(ProfileRequirementsNotMet): + XEP_0420.unpack_stanza( + unpacking_profile, + stanza, + XEP_0420.pack_stanza(packing_profile, stanza) + ) + + +def test_rpad_affix() -> None: # pylint: disable=missing-function-docstring + profile = SCEProfile( + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + for _ in range(100): + stanza = string_to_domish(""" + OK + """) + + affix_values = XEP_0420.unpack_stanza( + profile, + stanza, + XEP_0420.pack_stanza(profile, stanza) + ) + + # Test that the rpad exists and that the content elements are always padded to at + # least 53 characters + assert affix_values.rpad is not None + assert len(affix_values.rpad) >= 53 - len("OK") + + +def test_time_affix() -> None: # pylint: disable=missing-function-docstring + profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + stanza = string_to_domish( + """""" + ) + + envelope_serialized = f""" + + + The time affix is only parsed and not otherwise verified. Not much to test + here. + + + """.encode("utf-8") + + affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + assert affix_values.timestamp == datetime(1969, 7, 21, 2, 56, 15, tzinfo=timezone.utc) + + +def test_to_affix() -> None: # pylint: disable=missing-function-docstring + profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.REQUIRED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + stanza = string_to_domish(""" + Check that the ``to`` affix is correctly added. + """) + + envelope_serialized = XEP_0420.pack_stanza(profile, stanza) + affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + assert affix_values.recipient is not None + assert affix_values.recipient.userhost() == "bar@example.com" + + # Check that a mismatch in recipient bare JID causes an exception to be raised + stanza = string_to_domish( + """""" + ) + + with pytest.raises(AffixVerificationFailed): + XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + + # Check that only the bare JID matters + stanza = string_to_domish( + """""" + ) + + affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + assert affix_values.recipient is not None + assert affix_values.recipient.userhost() == "bar@example.com" + + stanza = string_to_domish(""" + + Check that a missing "to" attribute on the stanza fails stanza packing. + + """) + + with pytest.raises(ValueError): + XEP_0420.pack_stanza(profile, stanza) + + +def test_from_affix() -> None: # pylint: disable=missing-function-docstring + profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.REQUIRED, + custom_policies={} + ) + + stanza = string_to_domish(""" + Check that the ``from`` affix is correctly added. + """) + + envelope_serialized = XEP_0420.pack_stanza(profile, stanza) + affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + assert affix_values.sender is not None + assert affix_values.sender.userhost() == "foo@example.com" + + # Check that a mismatch in sender bare JID causes an exception to be raised + stanza = string_to_domish( + """""" + ) + + with pytest.raises(AffixVerificationFailed): + XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + + # Check that only the bare JID matters + stanza = string_to_domish( + """""" + ) + + affix_values = XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + assert affix_values.sender is not None + assert affix_values.sender.userhost() == "foo@example.com" + + stanza = string_to_domish(""" + + Check that a missing "from" attribute on the stanza fails stanza packing. + + """) + + with pytest.raises(ValueError): + XEP_0420.pack_stanza(profile, stanza) + + +def test_custom_affixes() -> None: # pylint: disable=missing-function-docstring + custom_affix = CustomAffixImpl() + + packing_profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={ custom_affix: SCEAffixPolicy.REQUIRED } + ) + + unpacking_profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + stanza = string_to_domish(""" + + If a custom affix is included in the envelope, but not excpected by the + recipient, the schema validation should fail. + + """) + + with pytest.raises(ValueError): + XEP_0420.unpack_stanza( + unpacking_profile, + stanza, + XEP_0420.pack_stanza(packing_profile, stanza) + ) + + profile = packing_profile + + stanza = string_to_domish(""" + The affix element should be returned as part of the affix values. + """) + + affix_values = XEP_0420.unpack_stanza( + profile, + stanza, + XEP_0420.pack_stanza(profile, stanza) + ) + + assert custom_affix in affix_values.custom + assert affix_values.custom[custom_affix].getAttribute("recipient") == \ + "bar@example.com/Libervia.123" + assert affix_values.custom[custom_affix].getAttribute("sender") == \ + "foo@example.com/device0" + + +def test_namespace_conversion() -> None: # pylint: disable=missing-function-docstring + profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + stanza = domish.Element((None, "message")) + stanza["from"] = "foo@example.com" + stanza["to"] = "bar@example.com" + stanza.addElement( + "body", + content=( + "This body element has namespace ``None``, which has to be replaced with" + " jabber:client." + ) + ) + + envelope_serialized = XEP_0420.pack_stanza(profile, stanza) + envelope = string_to_domish(envelope_serialized.decode("utf-8")) + content = next(cast(Iterator[domish.Element], envelope.elements(NS_SCE, "content"))) + + # The body should have been assigned ``jabber:client`` as its namespace + assert next( + cast(Iterator[domish.Element], content.elements("jabber:client", "body")), + None + ) is not None + + XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + + # The body should still have ``jabber:client`` after unpacking + assert next( + cast(Iterator[domish.Element], stanza.elements("jabber:client", "body")), + None + ) is not None + + +def test_non_encryptable_elements() -> None: # pylint: disable=missing-function-docstring + profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + stanza = string_to_domish(""" + This stanza includes a store hint which must not be encrypted. + + """) + + envelope_serialized = XEP_0420.pack_stanza(profile, stanza) + envelope = string_to_domish(envelope_serialized.decode("utf-8")) + content = next(cast(Iterator[domish.Element], envelope.elements(NS_SCE, "content"))) + + # The store hint must not have been moved to the content element + assert next( + cast(Iterator[domish.Element], stanza.elements(NS_HINTS, "store")), + None + ) is not None + + assert next( + cast(Iterator[domish.Element], content.elements(NS_HINTS, "store")), + None + ) is None + + stanza = string_to_domish( + """""" + ) + + envelope_serialized = f""" + + + The store hint must not be moved to the stanza. + + + + """.encode("utf-8") + + XEP_0420.unpack_stanza(profile, stanza, envelope_serialized) + + assert next( + cast(Iterator[domish.Element], stanza.elements(NS_HINTS, "store")), + None + ) is None + + +def test_schema_validation() -> None: # pylint: disable=missing-function-docstring + profile = SCEProfile( + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + SCEAffixPolicy.NOT_NEEDED, + custom_policies={} + ) + + stanza = string_to_domish( + """""" + ) + + envelope_serialized = f""" + + + An unknwon affix should cause a schema validation error. + + + + + """.encode("utf-8") + + with pytest.raises(ValueError): + XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)