Mercurial > libervia-backend
changeset 3933:cecf45416403
plugin XEP-0373 and XEP-0374: Implementation of OX and OXIM:
GPGME is used as the GPG provider.
rel 374
author | Syndace <me@syndace.dev> |
---|---|
date | Tue, 20 Sep 2022 16:22:18 +0200 |
parents | 7af29260ecb8 |
children | e345d93fb6e5 |
files | sat/plugins/plugin_xep_0060.py sat/plugins/plugin_xep_0373.py sat/plugins/plugin_xep_0374.py sat/plugins/plugin_xep_0384.py sat/plugins/plugin_xep_0420.py sat/tools/xmpp_datetime.py setup.py tests/unit/test_plugin_xep_0082.py tests/unit/test_plugin_xep_0373.py tests/unit/test_plugin_xep_0420.py |
diffstat | 10 files changed, 2710 insertions(+), 34 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0060.py Mon Oct 10 15:23:59 2022 +0200 +++ b/sat/plugins/plugin_xep_0060.py Tue Sep 20 16:22:18 2022 +0200 @@ -893,14 +893,20 @@ client, jid.JID(service_s) if service_s else None, nodeIdentifier, options ) - def createNode(self, client, service, nodeIdentifier=None, options=None): + def createNode( + self, + client: SatXMPPClient, + service: jid.JID, + nodeIdentifier: Optional[str] = None, + options: Optional[Dict[str, str]] = None + ) -> str: """Create a new node - @param service(jid.JID): PubSub service, - @param NodeIdentifier(unicode, None): node name - use None to create instant node (identifier will be returned by this method) - @param option(dict[unicode, unicode], None): node configuration options - @return (unicode): identifier of the created node (may be different from requested name) + @param service: PubSub service, + @param NodeIdentifier: node name use None to create instant node (identifier will + be returned by this method) + @param option: node configuration options + @return: identifier of the created node (may be different from requested name) """ # TODO: if pubsub service doesn't hande publish-options, configure it in a second time return client.pubsub_client.createNode(service, nodeIdentifier, options)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0373.py Tue Sep 20 16:22:18 2022 +0200 @@ -0,0 +1,2085 @@ +#!/usr/bin/env python3 + +# Libervia plugin for OpenPGP for XMPP +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from abc import ABC, abstractmethod +import base64 +from datetime import datetime, timezone +import enum +import secrets +import string +from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast +from xml.sax.saxutils import quoteattr + +from typing_extensions import Final, NamedTuple, Never, assert_never +from wokkel import muc, pubsub +from wokkel.disco import DiscoFeature, DiscoInfo +import xmlschema + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _, D_ +from sat.core.log import getLogger, Logger +from sat.core.sat_main import SAT +from sat.core.xmpp import SatXMPPClient +from sat.memory import persistent +from sat.plugins.plugin_xep_0045 import XEP_0045 +from sat.plugins.plugin_xep_0060 import XEP_0060 +from sat.plugins.plugin_xep_0163 import XEP_0163 +from sat.tools.xmpp_datetime import format_datetime, parse_datetime +from sat.tools import xml_tools +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish + +try: + import gpg +except ImportError as import_error: + raise exceptions.MissingModule( + "You are missing the 'gpg' package required by the OX plugin. The recommended" + " installation method is via your operating system's package manager, since the" + " version of the library has to match the version of your GnuPG installation. See" + " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme" + ) from import_error + + +__all__ = [ # pylint: disable=unused-variable + "PLUGIN_INFO", + "NS_OX", + "XEP_0373", + "VerificationError", + "XMPPInteractionFailed", + "InvalidPacket", + "DecryptionFailed", + "VerificationFailed", + "UnknownKey", + "GPGProviderError", + "GPGPublicKey", + "GPGSecretKey", + "GPGProvider", + "PublicKeyMetadata", + "gpg_provider", + "TrustLevel" +] + + +log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] + + +PLUGIN_INFO = { + C.PI_NAME: "XEP-0373", + C.PI_IMPORT_NAME: "XEP-0373", + C.PI_TYPE: "SEC", + C.PI_PROTOCOLS: [ "XEP-0373" ], + C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0373", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"), +} + + +NS_OX: Final = "urn:xmpp:openpgp:0" + + +PARAM_CATEGORY = "Security" +PARAM_NAME = "ox_policy" + + +class VerificationError(Exception): + """ + Raised by verifying methods of :class:`XEP_0373` on semantical verification errors. + """ + + +class XMPPInteractionFailed(Exception): + """ + Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this + exception exists is that the exceptions raised by XMPP interactions are not properly + documented for the most part, thus all exceptions are caught and wrapped in instances + of this class. + """ + + +class InvalidPacket(ValueError): + """ + Raised by methods of :class:`GPGProvider` when an invalid packet is encountered. + """ + + +class DecryptionFailed(Exception): + """ + Raised by methods of :class:`GPGProvider` on decryption failures. + """ + + +class VerificationFailed(Exception): + """ + Raised by methods of :class:`GPGProvider` on verification failures. + """ + + +class UnknownKey(ValueError): + """ + Raised by methods of :class:`GPGProvider` when an unknown key is referenced. + """ + + +class GPGProviderError(Exception): + """ + Raised by methods of :class:`GPGProvider` on internal errors. + """ + + +class GPGPublicKey(ABC): + """ + Interface describing a GPG public key. + """ + + @property + @abstractmethod + def fingerprint(self) -> str: + """ + @return: The OpenPGP v4 fingerprint string of this public key. + """ + + +class GPGSecretKey(ABC): + """ + Interface descibing a GPG secret key. + """ + + @property + @abstractmethod + def public_key(self) -> GPGPublicKey: + """ + @return: The public key corresponding to this secret key. + """ + + +class GPGProvider(ABC): + """ + Interface describing a GPG provider, i.e. a library or framework providing GPG + encryption, signing and key management. + + All methods may raise :class:`GPGProviderError` in addition to those exception types + listed explicitly. + + # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?) + """ + + @abstractmethod + def export_public_key(self, public_key: GPGPublicKey) -> bytes: + """Export a public key in a key material packet according to RFC 4880 §5.5. + + Do not use OpenPGP's ASCII Armor. + + @param public_key: The public key to export. + @return: The packet containing the exported public key. + @raise UnknownKey: if the public key is not available. + """ + + @abstractmethod + def import_public_key(self, packet: bytes) -> GPGPublicKey: + """Import a public key from a key material packet according to RFC 4880 §5.5. + + OpenPGP's ASCII Armor is not used. + + @param packet: A packet containing an exported public key. + @return: The public key imported from the packet. + @raise InvalidPacket: if the packet is either syntactically or semantically deemed + invalid. + + @warning: Only packets of version 4 or higher may be accepted, packets below + version 4 MUST be rejected. + """ + + @abstractmethod + def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes: + """Export a secret key for transfer according to RFC 4880 §11.1. + + Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage + conventions to zero in the corresponding secret-key packet according to RFC 4880 + §5.5.3. Do not use OpenPGP's ASCII Armor. + + @param secret_key: The secret key to export. + @return: The binary blob containing the exported secret key. + @raise UnknownKey: if the secret key is not available. + """ + + @abstractmethod + def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]: + """Restore secret keys exported for transfer according to RFC 4880 §11.1. + + The secret data is not encrypted, i.e. the octet indicating string-to-key usage + conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3 + are set to zero. OpenPGP's ASCII Armor is not used. + + @param data: Concatenation of one or more secret keys exported for transfer. + @return: The secret keys imported from the data. + @raise InvalidPacket: if the data or one of the packets included in the data is + either syntactically or semantically deemed invalid. + + @warning: Only packets of version 4 or higher may be accepted, packets below + version 4 MUST be rejected. + """ + + @abstractmethod + def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes: + """Encrypt data symmetrically according to RFC 4880 §5.3. + + The password is used to build a Symmetric-Key Encrypted Session Key packet which + precedes the Symmetrically Encrypted Data packet that holds the encrypted data. + + @param plaintext: The data to encrypt. + @param password: The password to encrypt the data with. + @return: The encrypted data. + """ + + @abstractmethod + def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: + """Decrypt data symmetrically according to RFC 4880 §5.3. + + The ciphertext consists of a Symmetrically Encrypted Data packet that holds the + encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the + password. + + @param ciphertext: The ciphertext. + @param password: The password to decrypt the data with. + @return: The plaintext. + @raise DecryptionFailed: on decryption failure. + """ + + @abstractmethod + def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: + """Sign some data. + + OpenPGP's ASCII Armor is not used. + + @param data: The data to sign. + @param secret_keys: The secret keys to sign the data with. + @return: The OpenPGP message carrying the signed data. + """ + + @abstractmethod + def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: + """Sign some data. Create the signature detached from the data. + + OpenPGP's ASCII Armor is not used. + + @param data: The data to sign. + @param secret_keys: The secret keys to sign the data with. + @return: The OpenPGP message carrying the detached signature. + """ + + @abstractmethod + def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes: + """Verify signed data. + + OpenPGP's ASCII Armor is not used. + + @param signed_data: The signed data as an OpenPGP message. + @param public_keys: The public keys to verify the signature with. + @return: The verified and unpacked data. + @raise VerificationFailed: if the data could not be verified. + + @warning: For implementors: it has to be confirmed that a valid signature by one + of the public keys is available. + """ + + @abstractmethod + def verify_detached( + self, + data: bytes, + signature: bytes, + public_keys: Set[GPGPublicKey] + ) -> None: + """Verify signed data, where the signature was created detached from the data. + + OpenPGP's ASCII Armor is not used. + + @param data: The data. + @param signature: The signature as an OpenPGP message. + @param public_keys: The public keys to verify the signature with. + @raise VerificationFailed: if the data could not be verified. + + @warning: For implementors: it has to be confirmed that a valid signature by one + of the public keys is available. + """ + + @abstractmethod + def encrypt( + self, + plaintext: bytes, + public_keys: Set[GPGPublicKey], + signing_keys: Optional[Set[GPGSecretKey]] = None + ) -> bytes: + """Encrypt and optionally sign some data. + + OpenPGP's ASCII Armor is not used. + + @param plaintext: The data to encrypt and optionally sign. + @param public_keys: The public keys to encrypt the data for. + @param signing_keys: The secret keys to sign the data with. + @return: The OpenPGP message carrying the encrypted and optionally signed data. + """ + + @abstractmethod + def decrypt( + self, + ciphertext: bytes, + secret_keys: Set[GPGSecretKey], + public_keys: Optional[Set[GPGPublicKey]] = None + ) -> bytes: + """Decrypt and optionally verify some data. + + OpenPGP's ASCII Armor is not used. + + @param ciphertext: The encrypted and optionally signed data as an OpenPGP message. + @param secret_keys: The secret keys to attempt decryption with. + @param public_keys: The public keys to verify the optional signature with. + @return: The decrypted, optionally verified and unpacked data. + @raise DecryptionFailed: on decryption failure. + @raise VerificationFailed: if the data could not be verified. + + @warning: For implementors: it has to be confirmed that the data was decrypted + using one of the secret keys and that a valid signature by one of the public + keys is available in case the data is signed. + """ + + @abstractmethod + def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: + """List public keys. + + @param user_id: The user id. + @return: The set of public keys available for this user id. + """ + + @abstractmethod + def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: + """List secret keys. + + @param user_id: The user id. + @return: The set of secret keys available for this user id. + """ + + @abstractmethod + def can_sign(self, public_key: GPGPublicKey) -> bool: + """ + @return: Whether the public key belongs to a key pair capable of signing. + """ + + @abstractmethod + def can_encrypt(self, public_key: GPGPublicKey) -> bool: + """ + @return: Whether the public key belongs to a key pair capable of encryption. + """ + + @abstractmethod + def create_key(self, user_id: str) -> GPGSecretKey: + """Create a new GPG key, capable of signing and encryption. + + The key is generated without password protection and without expiration. If a key + with the same user id already exists, a new key is created anyway. + + @param user_id: The user id to assign to the new key. + @return: The new key. + """ + + +class GPGME_GPGPublicKey(GPGPublicKey): + """ + GPG public key implementation based on GnuPG Made Easy (GPGME). + """ + + def __init__(self, key_obj: Any) -> None: + """ + @param key_obj: The GPGME key object. + """ + + self.__key_obj = key_obj + + @property + def fingerprint(self) -> str: + return self.__key_obj.fpr + + @property + def key_obj(self) -> Any: + return self.__key_obj + + +class GPGME_GPGSecretKey(GPGSecretKey): + """ + GPG secret key implementation based on GnuPG Made Easy (GPGME). + """ + + def __init__(self, public_key: GPGME_GPGPublicKey) -> None: + """ + @param public_key: The public key corresponding to this secret key. + """ + + self.__public_key = public_key + + @property + def public_key(self) -> GPGME_GPGPublicKey: + return self.__public_key + + +class GPGME_GPGProvider(GPGProvider): + """ + GPG provider implementation based on GnuPG Made Easy (GPGME). + """ + + def __init__(self, home_dir: Optional[str] = None) -> None: + """ + @param home_dir: Optional GPG home directory path to use for all operations. + """ + + self.__home_dir = home_dir + + def export_public_key(self, public_key: GPGPublicKey) -> bytes: + assert isinstance(public_key, GPGME_GPGPublicKey) + + pattern = public_key.fingerprint + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.key_export_minimal(pattern) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + if result is None: + raise UnknownKey(f"Public key {pattern} not found.") + + return result + + def import_public_key(self, packet: bytes) -> GPGPublicKey: + # TODO + # - Reject packets older than version 4 + # - Check whether it's actually a public key (through packet inspection?) + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.key_import(packet) + except gpg.errors.GPGMEError as e: + # From looking at the code, `key_import` never raises. The documentation + # says it does though, so this is included for future-proofness. + raise GPGProviderError("Internal GPGME error") from e + + if not hasattr(result, "considered"): + raise InvalidPacket( + f"Data not considered for public key import: {result}" + ) + + if len(result.imports) != 1: + raise InvalidPacket( + "Public key packet does not contain exactly one public key (not" + " counting subkeys)." + ) + + try: + key_obj = c.get_key(result.imports[0].fpr, secret=False) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.KeyError as e: + raise GPGProviderError("Newly imported public key not found") from e + + return GPGME_GPGPublicKey(key_obj) + + def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes: + assert isinstance(secret_key, GPGME_GPGSecretKey) + # TODO + # - Handle password protection/pinentry + # - Make sure the key is exported unencrypted + + pattern = secret_key.public_key.fingerprint + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.key_export_secret(pattern) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + if result is None: + raise UnknownKey(f"Secret key {pattern} not found.") + + return result + + def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]: + # TODO + # - Reject packets older than version 4 + # - Check whether it's actually secret keys (through packet inspection?) + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.key_import(data) + except gpg.errors.GPGMEError as e: + # From looking at the code, `key_import` never raises. The documentation + # says it does though, so this is included for future-proofness. + raise GPGProviderError("Internal GPGME error") from e + + if not hasattr(result, "considered"): + raise InvalidPacket( + f"Data not considered for secret key import: {result}" + ) + + if len(result.imports) == 0: + raise InvalidPacket("Secret key packet does not contain a secret key.") + + secret_keys = set() + for import_status in result.imports: + try: + key_obj = c.get_key(import_status.fpr, secret=True) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.KeyError as e: + raise GPGProviderError("Newly imported secret key not found") from e + + secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))) + + return secret_keys + + def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + return ciphertext + + def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + plaintext, __, __ = c.decrypt( + ciphertext, + passphrase=password, + verify=False + ) + except gpg.errors.GPGMEError as e: + # TODO: Find out what kind of error is raised if the password is wrong and + # re-raise it as DecryptionFailed instead. + raise GPGProviderError("Internal GPGME error") from e + except gpg.UnsupportedAlgorithm as e: + raise DecryptionFailed("Unsupported algorithm") from e + + return plaintext + + def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: + signers = [] + for secret_key in secret_keys: + assert isinstance(secret_key, GPGME_GPGSecretKey) + + signers.append(secret_key.public_key.key_obj) + + with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: + try: + signed_data, __ = c.sign(data) + except gpg.error.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.InvalidSigners as e: + raise GPGProviderError( + "At least one of the secret keys is invalid for signing" + ) from e + + return signed_data + + def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: + signers = [] + for secret_key in secret_keys: + assert isinstance(secret_key, GPGME_GPGSecretKey) + + signers.append(secret_key.public_key.key_obj) + + with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: + try: + signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH) + except gpg.error.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.InvalidSigners as e: + raise GPGProviderError( + "At least one of the secret keys is invalid for signing" + ) from e + + return signature + + def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + data, result = c.verify(signed_data) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.BadSignatures as e: + raise VerificationFailed("Bad signatures on signed data") from e + + valid_signature_found = False + for public_key in public_keys: + assert isinstance(public_key, GPGME_GPGPublicKey) + + for subkey in public_key.key_obj.subkeys: + for sig in result.signatures: + if subkey.can_sign and subkey.fpr == sig.fpr: + valid_signature_found = True + + if not valid_signature_found: + raise VerificationFailed( + "Data not signed by one of the expected public keys" + ) + + return data + + def verify_detached( + self, + data: bytes, + signature: bytes, + public_keys: Set[GPGPublicKey] + ) -> None: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + __, result = c.verify(data, signature=signature) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.BadSignatures as e: + raise VerificationFailed("Bad signatures on signed data") from e + + valid_signature_found = False + for public_key in public_keys: + assert isinstance(public_key, GPGME_GPGPublicKey) + + for subkey in public_key.key_obj.subkeys: + for sig in result.signatures: + if subkey.can_sign and subkey.fpr == sig.fpr: + valid_signature_found = True + + if not valid_signature_found: + raise VerificationFailed( + "Data not signed by one of the expected public keys" + ) + + def encrypt( + self, + plaintext: bytes, + public_keys: Set[GPGPublicKey], + signing_keys: Optional[Set[GPGSecretKey]] = None + ) -> bytes: + recipients = [] + for public_key in public_keys: + assert isinstance(public_key, GPGME_GPGPublicKey) + + recipients.append(public_key.key_obj) + + signers = [] + if signing_keys is not None: + for secret_key in signing_keys: + assert isinstance(secret_key, GPGME_GPGSecretKey) + + signers.append(secret_key.public_key.key_obj) + + sign = signing_keys is not None + + with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: + try: + ciphertext, __, __ = c.encrypt( + plaintext, + recipients=recipients, + sign=sign, + always_trust=True, + add_encrypt_to=True + ) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.InvalidRecipients as e: + raise GPGProviderError( + "At least one of the public keys is invalid for encryption" + ) from e + except gpg.errors.InvalidSigners as e: + raise GPGProviderError( + "At least one of the signing keys is invalid for signing" + ) from e + + return ciphertext + + def decrypt( + self, + ciphertext: bytes, + secret_keys: Set[GPGSecretKey], + public_keys: Optional[Set[GPGPublicKey]] = None + ) -> bytes: + verify = public_keys is not None + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + plaintext, result, verify_result = c.decrypt( + ciphertext, + verify=verify + ) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.UnsupportedAlgorithm as e: + raise DecryptionFailed("Unsupported algorithm") from e + + # TODO: Check whether the data was decrypted using one of the expected secret + # keys + + if public_keys is not None: + valid_signature_found = False + for public_key in public_keys: + assert isinstance(public_key, GPGME_GPGPublicKey) + + for subkey in public_key.key_obj.subkeys: + for sig in verify_result.signatures: + if subkey.can_sign and subkey.fpr == sig.fpr: + valid_signature_found = True + + if not valid_signature_found: + raise VerificationFailed( + "Data not signed by one of the expected public keys" + ) + + return plaintext + + def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + return { + GPGME_GPGPublicKey(key) + for key + in c.keylist(pattern=user_id, secret=False) + } + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + return { + GPGME_GPGSecretKey(GPGME_GPGPublicKey(key)) + for key + in c.keylist(pattern=user_id, secret=True) + } + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + def can_sign(self, public_key: GPGPublicKey) -> bool: + assert isinstance(public_key, GPGME_GPGPublicKey) + + return any(subkey.can_sign for subkey in public_key.key_obj.subkeys) + + def can_encrypt(self, public_key: GPGPublicKey) -> bool: + assert isinstance(public_key, GPGME_GPGPublicKey) + + return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys) + + def create_key(self, user_id: str) -> GPGSecretKey: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.create_key( + user_id, + expires=False, + sign=True, + encrypt=True, + certify=False, + authenticate=False, + force=True + ) + + key_obj = c.get_key(result.fpr, secret=True) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.KeyError as e: + raise GPGProviderError("Newly created key not found") from e + + return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)) + + +class PublicKeyMetadata(NamedTuple): + """ + Metadata about a published public key. + """ + + fingerprint: str + timestamp: datetime + + +@enum.unique +class TrustLevel(enum.Enum): + """ + The trust levels required for BTBV and manual trust. + """ + + TRUSTED: str = "TRUSTED" + BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" + UNDECIDED: str = "UNDECIDED" + DISTRUSTED: str = "DISTRUSTED" + + +OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="openpgp" type="xs:base64Binary"/> +</xs:schema> +""") + + +# The following schema needs verion 1.1 of XML Schema, which is not supported by lxml. +# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform +# implementation of XML Schema, including version 1.1. +CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="signcrypt"> + <xs:complexType> + <xs:all> + <xs:element ref="to" maxOccurs="unbounded"/> + <xs:element ref="time"/> + <xs:element ref="rpad" minOccurs="0"/> + <xs:element ref="payload"/> + </xs:all> + </xs:complexType> + </xs:element> + + <xs:element name="sign"> + <xs:complexType> + <xs:all> + <xs:element ref="to" maxOccurs="unbounded"/> + <xs:element ref="time"/> + <xs:element ref="rpad" minOccurs="0"/> + <xs:element ref="payload"/> + </xs:all> + </xs:complexType> + </xs:element> + + <xs:element name="crypt"> + <xs:complexType> + <xs:all> + <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/> + <xs:element ref="time"/> + <xs:element ref="rpad" minOccurs="0"/> + <xs:element ref="payload"/> + </xs:all> + </xs:complexType> + </xs:element> + + <xs:element name="to"> + <xs:complexType> + <xs:attribute name="jid" type="xs:string"/> + </xs:complexType> + </xs:element> + + <xs:element name="time"> + <xs:complexType> + <xs:attribute name="stamp" type="xs:dateTime"/> + </xs:complexType> + </xs:element> + + <xs:element name="rpad" type="xs:string"/> + + <xs:element name="payload"> + <xs:complexType> + <xs:sequence> + <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> + </xs:sequence> + </xs:complexType> + </xs:element> +</xs:schema> +""") + + +PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" +PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="public-keys-list"> + <xs:complexType> + <xs:sequence> + <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + </xs:element> + + <xs:element name="pubkey-metadata"> + <xs:complexType> + <xs:attribute name="v4-fingerprint" type="xs:string"/> + <xs:attribute name="date" type="xs:dateTime"/> + </xs:complexType> + </xs:element> +</xs:schema> +""") + + +PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="pubkey"> + <xs:complexType> + <xs:all> + <xs:element ref="data"/> + </xs:all> + <xs:anyAttribute processContents="skip"/> + </xs:complexType> + </xs:element> + + <xs:element name="data" type="xs:base64Binary"/> +</xs:schema> +""") + + +SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="secretkey" type="xs:base64Binary"/> +</xs:schema> +""") + + +DEFAULT_TRUST_MODEL_PARAM = f""" +<params> +<individual> +<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}> + <param name="{PARAM_NAME}" + label={quoteattr(D_('OMEMO default trust policy'))} + type="list" security="3"> + <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} /> + <option value="btbv" + label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))} + selected="true" /> + </param> +</category> +</individual> +</params> +""" + + +def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider: + """Get the GPG provider for a client. + + @param sat: The SAT instance. + @param client: The client. + @return: The GPG provider specifically for that client. + """ + + return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home"))) + + +def generate_passphrase() -> str: + """Generate a secure passphrase for symmetric encryption. + + @return: The passphrase. + """ + + return "-".join("".join( + secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4) + ) for __ in range(6)) + + +# TODO: Handle the user id mess +class XEP_0373: + """ + Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``. + """ + + def __init__(self, sat: SAT) -> None: + """ + @param sat: The SAT instance. + """ + + self.__sat = sat + + # Add configuration option to choose between manual trust and BTBV as the trust + # model + sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) + + self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) + self.__xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) + + self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} + + xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"]) + xep_0163.addPEPEvent( + "OX_PUBLIC_KEYS_LIST", + PUBLIC_KEYS_LIST_NODE, + lambda items_event, profile: defer.ensureDeferred( + self.__on_public_keys_list_update(items_event, profile) + ) + ) + + async def profileConnected( # pylint: disable=invalid-name + self, + client: SatXMPPClient + ) -> None: + """ + @param client: The client. + """ + + profile = cast(str, client.profile) + + if not profile in self.__storage: + self.__storage[profile] = \ + persistent.LazyPersistentBinaryDict("XEP-0373", client.profile) + + if len(self.list_secret_keys(client)) == 0: + log.debug(f"Generating first GPG key for {client.jid.userhost()}.") + await self.create_key(client) + + async def __on_public_keys_list_update( + self, + items_event: pubsub.ItemsEvent, + profile: str + ) -> None: + """Handle public keys list updates fired by PEP. + + @param items_event: The event. + @param profile: The profile this event belongs to. + """ + + client = self.__sat.getClient(profile) + + sender = cast(jid.JID, items_event.sender) + items = cast(List[domish.Element], items_event.items) + + if len(items) > 1: + log.warning("Ignoring public keys list update with more than one element.") + return + + item_elt = next(iter(items), None) + if item_elt is None: + log.debug("Ignoring empty public keys list update.") + return + + public_keys_list_elt = cast( + Optional[domish.Element], + next(item_elt.elements(NS_OX, "public-keys-list"), None) + ) + + pubkey_metadata_elts: Optional[List[domish.Element]] = None + + if public_keys_list_elt is not None: + try: + PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) + except xmlschema.XMLSchemaValidationError: + pass + else: + pubkey_metadata_elts = \ + list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata")) + + if pubkey_metadata_elts is None: + log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") + return + + new_public_keys_metadata = { PublicKeyMetadata( + fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), + timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])) + ) for pubkey_metadata_elt in pubkey_metadata_elts } + + storage_key = f"/public-keys-metadata/{sender.userhost()}" + + local_public_keys_metadata = cast( + Set[PublicKeyMetadata], + await self.__storage[profile].get(storage_key, set()) + ) + + unchanged_keys = new_public_keys_metadata & local_public_keys_metadata + changed_or_new_keys = new_public_keys_metadata - unchanged_keys + available_keys = self.list_public_keys(client, sender) + + for key_metadata in changed_or_new_keys: + # Check whether the changed or new key has been imported before + if any(key.fingerprint == key_metadata.fingerprint for key in available_keys): + try: + # If it has been imported before, try to update it + await self.import_public_key(client, sender, key_metadata.fingerprint) + except Exception as e: + log.warning(f"Public key import failed: {e}") + + # If the update fails, remove the key from the local metadata list + # such that the update is attempted again next time + new_public_keys_metadata.remove(key_metadata) + + # Check whether this update was for our account and make sure all of our keys are + # included in the update + if sender.userhost() == client.jid.userhost(): + secret_keys = self.list_secret_keys(client) + missing_keys = set(filter(lambda secret_key: all( + key_metadata.fingerprint != secret_key.public_key.fingerprint + for key_metadata + in new_public_keys_metadata + ), secret_keys)) + + if len(missing_keys) > 0: + log.warning( + "Public keys list update did not contain at least one of our keys." + f" {new_public_keys_metadata}" + ) + + for missing_key in missing_keys: + log.warning(missing_key.public_key.fingerprint) + new_public_keys_metadata.add(PublicKeyMetadata( + fingerprint=missing_key.public_key.fingerprint, + timestamp=datetime.now(timezone.utc) + )) + + await self.publish_public_keys_list(client, new_public_keys_metadata) + + await self.__storage[profile].force(storage_key, new_public_keys_metadata) + + def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: + """List GPG public keys available for a JID. + + @param client: The client to perform this operation with. + @param jid: The JID. Can be a bare JID. + @return: The set of public keys available for this JID. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}") + + def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]: + """List GPG secret keys available for a JID. + + @param client: The client to perform this operation with. + @return: The set of secret keys available for this JID. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}") + + async def create_key(self, client: SatXMPPClient) -> GPGSecretKey: + """Create a new GPG key, capable of signing and encryption. + + The key is generated without password protection and without expiration. + + @param client: The client to perform this operation with. + @return: The new key. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}") + + await self.publish_public_key(client, secret_key.public_key) + + storage_key = f"/public-keys-metadata/{client.jid.userhost()}" + + public_keys_list = cast( + Set[PublicKeyMetadata], + await self.__storage[client.profile].get(storage_key, set()) + ) + + public_keys_list.add(PublicKeyMetadata( + fingerprint=secret_key.public_key.fingerprint, + timestamp=datetime.now(timezone.utc) + )) + + await self.publish_public_keys_list(client, public_keys_list) + + await self.__storage[client.profile].force(storage_key, public_keys_list) + + return secret_key + + @staticmethod + def __build_content_element( + element_name: Literal["signcrypt", "sign", "crypt"], + recipient_jids: Iterable[jid.JID], + include_rpad: bool + ) -> Tuple[domish.Element, domish.Element]: + """Build a content element. + + @param element_name: The name of the content element. + @param recipient_jids: The intended recipients of this content element. Can be + bare JIDs. + @param include_rpad: Whether to include random-length random-content padding. + @return: The content element and the ``<payload/>`` element to add the stanza + extension elements to. + """ + + content_elt = domish.Element((NS_OX, element_name)) + + for recipient_jid in recipient_jids: + content_elt.addElement("to")["jid"] = recipient_jid.userhost() + + content_elt.addElement("time")["stamp"] = format_datetime() + + if include_rpad: + # XEP-0373 doesn't specify bounds for the length of the random padding. This + # uses the bounds specified in XEP-0420 for the closely related rpad affix. + rpad_length = secrets.randbelow(201) + rpad_content = "".join( + secrets.choice(string.digits + string.ascii_letters + string.punctuation) + for __ + in range(rpad_length) + ) + content_elt.addElement("rpad", content=rpad_content) + + payload_elt = content_elt.addElement("payload") + + return content_elt, payload_elt + + @staticmethod + def build_signcrypt_element( + recipient_jids: Iterable[jid.JID] + ) -> Tuple[domish.Element, domish.Element]: + """Build a ``<signcrypt/>`` content element. + + @param recipient_jids: The intended recipients of this content element. Can be + bare JIDs. + @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the + stanza extension elements to. + """ + + if len(recipient_jids) == 0: + raise ValueError("Recipient JIDs must be provided.") + + return XEP_0373.__build_content_element("signcrypt", recipient_jids, True) + + @staticmethod + def build_sign_element( + recipient_jids: Iterable[jid.JID], + include_rpad: bool + ) -> Tuple[domish.Element, domish.Element]: + """Build a ``<sign/>`` content element. + + @param recipient_jids: The intended recipients of this content element. Can be + bare JIDs. + @param include_rpad: Whether to include random-length random-content padding, + which is OPTIONAL for the ``<sign/>`` content element. + @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza + extension elements to. + """ + + if len(recipient_jids) == 0: + raise ValueError("Recipient JIDs must be provided.") + + return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad) + + @staticmethod + def build_crypt_element( + recipient_jids: Iterable[jid.JID] + ) -> Tuple[domish.Element, domish.Element]: + """Build a ``<crypt/>`` content element. + + @param recipient_jids: The intended recipients of this content element. Specifying + the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can + be bare JIDs. + @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza + extension elements to. + """ + + return XEP_0373.__build_content_element("crypt", recipient_jids, True) + + async def build_openpgp_element( + self, + client: SatXMPPClient, + content_elt: domish.Element, + recipient_jids: Set[jid.JID] + ) -> domish.Element: + """Build an ``<openpgp/>`` element. + + @param client: The client to perform this operation with. + @param content_elt: The content element to contain in the ``<openpgp/>`` element. + @param recipient_jids: The recipient's JIDs. Can be bare JIDs. + @return: The ``<openpgp/>`` element. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + # TODO: I'm not sure whether we want to sign with all keys by default or choose + # just one key/a subset of keys to sign with. + signing_keys = set(filter( + lambda secret_key: gpg_provider.can_sign(secret_key.public_key), + self.list_secret_keys(client) + )) + + encryption_keys: Set[GPGPublicKey] = set() + + for recipient_jid in recipient_jids: + # Import all keys of the recipient + all_public_keys = await self.import_all_public_keys(client, recipient_jid) + + # Filter for keys that can encrypt + encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys)) + + # TODO: Handle trust + + content = content_elt.toXml().encode("utf-8") + data: bytes + + if content_elt.name == "signcrypt": + data = gpg_provider.encrypt(content, encryption_keys, signing_keys) + elif content_elt.name == "sign": + data = gpg_provider.sign(content, signing_keys) + elif content_elt.name == "crypt": + data = gpg_provider.encrypt(content, encryption_keys) + else: + raise ValueError(f"Unknown content element <{content_elt.name}/>") + + openpgp_elt = domish.Element((NS_OX, "openpgp")) + openpgp_elt.addContent(base64.b64encode(data).decode("ASCII")) + return openpgp_elt + + async def unpack_openpgp_element( + self, + client: SatXMPPClient, + openpgp_elt: domish.Element, + element_name: Literal["signcrypt", "sign", "crypt"], + sender_jid: jid.JID + ) -> Tuple[domish.Element, datetime]: + """Verify, decrypt and unpack an ``<openpgp/>`` element. + + @param client: The client to perform this operation with. + @param openpgp_elt: The ``<openpgp/>`` element. + @param element_name: The name of the content element. + @param sender_jid: The sender's JID. Can be a bare JID. + @return: The ``<payload/>`` element containing the decrypted/verified stanza + extension elements carried by this ``<openpgp/>`` element, and the timestamp + contained in the content element. + @raise exceptions.ParsingError: on syntactical verification errors. + @raise VerificationError: on semantical verification errors accoding to XEP-0373. + @raise DecryptionFailed: on decryption failure. + @raise VerificationFailed: if the data could not be verified. + + @warning: The timestamp is not verified for plausibility; this SHOULD be done by + the calling code. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + decryption_keys = set(filter( + lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), + self.list_secret_keys(client) + )) + + # Import all keys of the sender + all_public_keys = await self.import_all_public_keys(client, sender_jid) + + # Filter for keys that can sign + verification_keys = set(filter(gpg_provider.can_sign, all_public_keys)) + + # TODO: Handle trust + + try: + OPENPGP_SCHEMA.validate(openpgp_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + "<openpgp/> element doesn't pass schema validation." + ) from e + + openpgp_message = base64.b64decode(str(openpgp_elt)) + content: bytes + + if element_name == "signcrypt": + content = gpg_provider.decrypt( + openpgp_message, + decryption_keys, + public_keys=verification_keys + ) + elif element_name == "sign": + content = gpg_provider.verify(openpgp_message, verification_keys) + elif element_name == "crypt": + content = gpg_provider.decrypt(openpgp_message, decryption_keys) + else: + assert_never(element_name) + + try: + content_elt = cast( + domish.Element, + xml_tools.ElementParser()(content.decode("utf-8")) + ) + except UnicodeDecodeError as e: + raise exceptions.ParsingError("UTF-8 decoding error") from e + + try: + CONTENT_SCHEMA.validate(content_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + f"<{element_name}/> element doesn't pass schema validation." + ) from e + + if content_elt.name != element_name: + raise exceptions.ParsingError(f"Not a <{element_name}/> element.") + + recipient_jids = \ + { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") } + + if ( + client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids } + and element_name != "crypt" + ): + raise VerificationError( + f"Recipient list in <{element_name}/> element does not list our (bare)" + f" JID." + ) + + time_elt = next(content_elt.elements(NS_OX, "time")) + + timestamp = parse_datetime(time_elt["stamp"]) + + payload_elt = next(content_elt.elements(NS_OX, "payload")) + + return payload_elt, timestamp + + async def publish_public_key( + self, + client: SatXMPPClient, + public_key: GPGPublicKey + ) -> None: + """Publish a public key. + + @param client: The client. + @param public_key: The public key to publish. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + packet = gpg_provider.export_public_key(public_key) + + node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}" + + pubkey_elt = domish.Element((NS_OX, "pubkey")) + + pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII")) + + try: + await self.__xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + pubkey_elt, + format_datetime(), + extra={ + XEP_0060.EXTRA_PUBLISH_OPTIONS: { + XEP_0060.OPT_PERSIST_ITEMS: "true", + XEP_0060.OPT_ACCESS_MODEL: "open", + XEP_0060.OPT_MAX_ITEMS: 1 + }, + # TODO: Do we really want publish_without_options here? + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" + } + ) + except Exception as e: + raise XMPPInteractionFailed("Publishing the public key failed.") from e + + async def import_all_public_keys( + self, + client: SatXMPPClient, + jid: jid.JID + ) -> Set[GPGPublicKey]: + """Import all public keys of a JID that have not been imported before. + + @param client: The client. + @param jid: The JID. Can be a bare JID. + @return: The public keys. + @note: Failure to import a key simply results in the key not being included in the + result. + """ + + available_public_keys = self.list_public_keys(client, jid) + + storage_key = f"/public-keys-metadata/{jid.userhost()}" + + public_keys_metadata = cast( + Set[PublicKeyMetadata], + await self.__storage[client.profile].get(storage_key, set()) + ) + + missing_keys = set(filter(lambda public_key_metadata: all( + public_key_metadata.fingerprint != public_key.fingerprint + for public_key + in available_public_keys + ), public_keys_metadata)) + + for missing_key in missing_keys: + try: + available_public_keys.add( + await self.import_public_key(client, jid, missing_key.fingerprint) + ) + except Exception as e: + log.warning( + f"Import of public key {missing_key.fingerprint} owned by" + f" {jid.userhost()} failed, ignoring: {e}" + ) + + return available_public_keys + + async def import_public_key( + self, + client: SatXMPPClient, + jid: jid.JID, + fingerprint: str + ) -> GPGPublicKey: + """Import a public key. + + @param client: The client. + @param jid: The JID owning the public key. Can be a bare JID. + @param fingerprint: The fingerprint of the public key. + @return: The public key. + @raise exceptions.NotFound: if the public key was not found. + @raise exceptions.ParsingError: on XML-level parsing errors. + @raise InvalidPacket: if the packet is either syntactically or semantically deemed + invalid. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" + + try: + items, __ = await self.__xep_0060.getItems( + client, + jid.userhostJID(), + node, + max_items=1 + ) + except exceptions.NotFound as e: + raise exceptions.NotFound( + f"No public key with fingerprint {fingerprint} published by JID" + f" {jid.userhost()}." + ) from e + except Exception as e: + raise XMPPInteractionFailed("Fetching the public keys list failed.") from e + + try: + item_elt = cast(domish.Element, items[0]) + except IndexError as e: + raise exceptions.NotFound( + f"No public key with fingerprint {fingerprint} published by JID" + f" {jid.userhost()}." + ) from e + + pubkey_elt = cast( + Optional[domish.Element], + next(item_elt.elements(NS_OX, "pubkey"), None) + ) + + if pubkey_elt is None: + raise exceptions.ParsingError( + f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey" + f" element." + ) + + try: + PUBKEY_SCHEMA.validate(pubkey_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey" + f" schema validation." + ) from e + + public_key = gpg_provider.import_public_key(base64.b64decode(str( + next(pubkey_elt.elements(NS_OX, "data")) + ))) + + return public_key + + async def publish_public_keys_list( + self, + client: SatXMPPClient, + public_keys_list: Iterable[PublicKeyMetadata] + ) -> None: + """Publish/update the own public keys list. + + @param client: The client. + @param public_keys_list: The public keys list. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + + @warning: All public keys referenced in the public keys list MUST be published + beforehand. + """ + + if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list): + raise ValueError("Public keys list contains duplicate fingerprints.") + + node = "urn:xmpp:openpgp:0:public-keys" + + public_keys_list_elt = domish.Element((NS_OX, "public-keys-list")) + + for public_key_metadata in public_keys_list: + pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata") + pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint + pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp) + + try: + await self.__xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + public_keys_list_elt, + item_id=XEP_0060.ID_SINGLETON, + extra={ + XEP_0060.EXTRA_PUBLISH_OPTIONS: { + XEP_0060.OPT_PERSIST_ITEMS: "true", + XEP_0060.OPT_ACCESS_MODEL: "open", + XEP_0060.OPT_MAX_ITEMS: 1 + }, + # TODO: Do we really want publish_without_options here? + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" + } + ) + except Exception as e: + raise XMPPInteractionFailed("Publishing the public keys list failed.") from e + + async def download_public_keys_list( + self, + client: SatXMPPClient, + jid: jid.JID + ) -> Optional[Set[PublicKeyMetadata]]: + """Download the public keys list of a JID. + + @param client: The client. + @param jid: The JID. Can be a bare JID. + @return: The public keys list or ``None`` if the JID hasn't published a public + keys list. An empty list means the JID has published an empty list. + @raise exceptions.ParsingError: on XML-level parsing errors. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + node = "urn:xmpp:openpgp:0:public-keys" + + try: + items, __ = await self.__xep_0060.getItems( + client, + jid.userhostJID(), + node, + max_items=1 + ) + except exceptions.NotFound: + return None + except Exception as e: + raise XMPPInteractionFailed() from e + + try: + item_elt = cast(domish.Element, items[0]) + except IndexError: + return None + + public_keys_list_elt = cast( + Optional[domish.Element], + next(item_elt.elements(NS_OX, "public-keys-list"), None) + ) + + if public_keys_list_elt is None: + return None + + try: + PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys" + f" list schema validation." + ) from e + + return { + PublicKeyMetadata( + fingerprint=pubkey_metadata_elt["v4-fingerprint"], + timestamp=parse_datetime(pubkey_metadata_elt["date"]) + ) + for pubkey_metadata_elt + in public_keys_list_elt.elements(NS_OX, "pubkey-metadata") + } + + async def __prepare_secret_key_synchronization( + self, + client: SatXMPPClient + ) -> Optional[domish.Element]: + """Prepare for secret key synchronization. + + Makes sure the relative protocols and protocol extensions are supported by the + server and makes sure that the PEP node for secret synchronization exists and is + configured correctly. The node is created if necessary. + + @param client: The client. + @return: As part of the preparations, the secret key synchronization PEP node is + fetched. The result of that fetch is returned here. + @raise exceptions.FeatureNotFound: if the server lacks support for the required + protocols or protocol extensions. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + try: + infos = cast(DiscoInfo, await self.__sat.memory.disco.getInfos( + client, + client.jid.userhostJID() + )) + except Exception as e: + raise XMPPInteractionFailed( + "Error performing service discovery on the own bare JID." + ) from e + + identities = cast(Dict[Tuple[str, str], str], infos.identities) + features = cast(Set[DiscoFeature], infos.features) + + if ("pubsub", "pep") not in identities: + raise exceptions.FeatureNotFound("Server doesn't support PEP.") + + if "http://jabber.org/protocol/pubsub#access-whitelist" not in features: + raise exceptions.FeatureNotFound( + "Server doesn't support the whitelist access model." + ) + + persistent_items_supported = \ + "http://jabber.org/protocol/pubsub#persistent-items" in features + + # TODO: persistent-items is a SHOULD, how do we handle the feature missing? + + node = "urn:xmpp:openpgp:0:secret-key" + + try: + items, __ = await self.__xep_0060.getItems( + client, + client.jid.userhostJID(), + node, + max_items=1 + ) + except exceptions.NotFound: + try: + await self.__xep_0060.createNode( + client, + client.jid.userhostJID(), + node, + { + XEP_0060.OPT_PERSIST_ITEMS: "true", + XEP_0060.OPT_ACCESS_MODEL: "whitelist", + XEP_0060.OPT_MAX_ITEMS: "1" + } + ) + except Exception as e: + raise XMPPInteractionFailed( + "Error creating the secret key synchronization node." + ) from e + except Exception as e: + raise XMPPInteractionFailed( + "Error fetching the secret key synchronization node." + ) from e + + try: + return cast(domish.Element, items[0]) + except IndexError: + return None + + async def export_secret_keys( + self, + client: SatXMPPClient, + secret_keys: Iterable[GPGSecretKey] + ) -> str: + """Export secret keys to synchronize them with other devices. + + @param client: The client. + @param secret_keys: The secret keys to export. + @return: The backup code needed to decrypt the exported secret keys. + @raise exceptions.FeatureNotFound: if the server lacks support for the required + protocols or protocol extensions. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + await self.__prepare_secret_key_synchronization(client) + + backup_code = generate_passphrase() + + plaintext = b"".join( + gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys + ) + + ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code) + + node = "urn:xmpp:openpgp:0:secret-key" + + secretkey_elt = domish.Element((NS_OX, "secretkey")) + secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) + + try: + await self.__xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + secretkey_elt + ) + except Exception as e: + raise XMPPInteractionFailed("Publishing the secret keys failed.") from e + + return backup_code + + async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]: + """Download previously exported secret keys to import them in a second step. + + The downloading and importing steps are separate since a backup code is required + for the import and it should be possible to try multiple backup codes without + redownloading the data every time. The second half of the import procedure is + provided by :meth:`import_secret_keys`. + + @param client: The client. + @return: The encrypted secret keys previously exported, if any. + @raise exceptions.FeatureNotFound: if the server lacks support for the required + protocols or protocol extensions. + @raise exceptions.ParsingError: on XML-level parsing errors. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + item_elt = await self.__prepare_secret_key_synchronization(client) + if item_elt is None: + return None + + secretkey_elt = cast( + Optional[domish.Element], + next(item_elt.elements(NS_OX, "secretkey"), None) + ) + + if secretkey_elt is None: + return None + + try: + SECRETKEY_SCHEMA.validate(secretkey_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + "Publish-Subscribe item doesn't pass secretkey schema validation." + ) from e + + return base64.b64decode(str(secretkey_elt)) + + def import_secret_keys( + self, + client: SatXMPPClient, + ciphertext: bytes, + backup_code: str + ) -> Set[GPGSecretKey]: + """Import previously downloaded secret keys. + + The downloading and importing steps are separate since a backup code is required + for the import and it should be possible to try multiple backup codes without + redownloading the data every time. The first half of the import procedure is + provided by :meth:`download_secret_keys`. + + @param client: The client to perform this operation with. + @param ciphertext: The ciphertext, i.e. the data returned by + :meth:`download_secret_keys`. + @param backup_code: The backup code needed to decrypt the data. + @raise InvalidPacket: if one of the GPG packets building the secret key data is + either syntactically or semantically deemed invalid. + @raise DecryptionFailed: on decryption failure. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( + ciphertext, + backup_code + )) + + @staticmethod + def __get_joined_muc_users( + client: SatXMPPClient, + xep_0045: XEP_0045, + room_jid: jid.JID + ) -> Set[jid.JID]: + """ + @param client: The client. + @param xep_0045: A MUC plugin instance. + @param room_jid: The room JID. + @return: A set containing the bare JIDs of the MUC participants. + @raise InternalError: if the MUC is not joined or the entity information of a + participant isn't available. + """ + # TODO: This should probably be a global helper somewhere + + bare_jids: Set[jid.JID] = set() + + try: + room = cast(muc.Room, xep_0045.getRoom(client, room_jid)) + except exceptions.NotFound as e: + raise exceptions.InternalError( + "Participant list of unjoined MUC requested." + ) from e + + for user in cast(Dict[str, muc.User], room.roster).values(): + entity = cast(Optional[SatXMPPEntity], user.entity) + if entity is None: + raise exceptions.InternalError( + f"Participant list of MUC requested, but the entity information of" + f" the participant {user} is not available." + ) + + bare_jids.add(entity.jid.userhostJID()) + + return bare_jids + + async def get_trust( + self, + client: SatXMPPClient, + public_key: GPGPublicKey, + owner: jid.JID + ) -> TrustLevel: + """Query the trust level of a public key. + + @param client: The client to perform this operation under. + @param public_key: The public key. + @param owner: The owner of the public key. Can be a bare JID. + @return: The trust level. + """ + + key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" + + try: + return TrustLevel(await self.__storage[client.profile][key]) + except KeyError: + return TrustLevel.UNDECIDED + + async def set_trust( + self, + client: SatXMPPClient, + public_key: GPGPublicKey, + owner: jid.JID, + trust_level: TrustLevel + ) -> None: + """Set the trust level of a public key. + + @param client: The client to perform this operation under. + @param public_key: The public key. + @param owner: The owner of the public key. Can be a bare JID. + @param trust_leve: The trust level. + """ + + key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" + + await self.__storage[client.profile].force(key, trust_level.name) + + async def getTrustUI( # pylint: disable=invalid-name + self, + client: SatXMPPClient, + entity: jid.JID + ) -> xml_tools.XMLUI: + """ + @param client: The client. + @param entity: The entity whose device trust levels to manage. + @return: An XMLUI instance which opens a form to manage the trust level of all + devices belonging to the entity. + """ + + if entity.resource: + raise ValueError("A bare JID is expected.") + + bare_jids: Set[jid.JID] + if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity): + bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) + else: + bare_jids = { entity.userhostJID() } + + all_public_keys = list({ + bare_jid: list(self.list_public_keys(client, bare_jid)) + for bare_jid + in bare_jids + }.items()) + + async def callback( + data: Any, + profile: str # pylint: disable=unused-argument + ) -> Dict[Never, Never]: + """ + @param data: The XMLUI result produces by the trust UI form. + @param profile: The profile. + @return: An empty dictionary. The type of the return value was chosen + conservatively since the exact options are neither known not needed here. + """ + + if C.bool(data.get("cancelled", "false")): + return {} + + data_form_result = cast( + Dict[str, str], + xml_tools.XMLUIResult2DataFormResult(data) + ) + for key, value in data_form_result.items(): + if not key.startswith("trust_"): + continue + + outer_index, inner_index = key.split("_")[1:] + + owner, public_keys = all_public_keys[int(outer_index)] + public_key = public_keys[int(inner_index)] + trust = TrustLevel(value) + + if (await self.get_trust(client, public_key, owner)) is not trust: + await self.set_trust(client, public_key, owner, value) + + return {} + + submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True) + + result = xml_tools.XMLUI( + panel_type=C.XMLUI_FORM, + title=D_("OX trust management"), + submit_id=submit_id + ) + # Casting this to Any, otherwise all calls on the variable cause type errors + # pylint: disable=no-member + trust_ui = cast(Any, result) + trust_ui.addText(D_( + "This is OX trusting system. You'll see below the GPG keys of your " + "contacts, and a list selection to trust them or not. A trusted key " + "can read your messages in plain text, so be sure to only validate " + "keys that you are sure are belonging to your contact. It's better " + "to do this when you are next to your contact, so " + "you can check the \"fingerprint\" of the key " + "yourself. Do *not* validate a key if the fingerprint is wrong!" + )) + + own_secret_keys = self.list_secret_keys(client) + + trust_ui.changeContainer("label") + for index, secret_key in enumerate(own_secret_keys): + trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) + trust_ui.addText(secret_key.public_key.fingerprint) + trust_ui.addEmpty() + trust_ui.addEmpty() + + for outer_index, [ owner, public_keys ] in enumerate(all_public_keys): + for inner_index, public_key in enumerate(public_keys): + trust_ui.addLabel(D_("Contact")) + trust_ui.addJid(jid.JID(owner)) + trust_ui.addLabel(D_("Fingerprint")) + trust_ui.addText(public_key.fingerprint) + trust_ui.addLabel(D_("Trust this device?")) + + current_trust_level = await self.get_trust(client, public_key, owner) + avaiable_trust_levels = \ + { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } + + trust_ui.addList( + f"trust_{outer_index}_{inner_index}", + options=[ trust_level.name for trust_level in avaiable_trust_levels ], + selected=current_trust_level.name, + styles=[ "inline" ] + ) + + trust_ui.addEmpty() + trust_ui.addEmpty() + + return result
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0374.py Tue Sep 20 16:22:18 2022 +0200 @@ -0,0 +1,421 @@ +#!/usr/bin/env python3 + +# Libervia plugin for OpenPGP for XMPP Instant Messaging +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Dict, Optional, Set, cast + +from typing_extensions import Final +from wokkel import muc # type: ignore[import] + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _, D_ +from sat.core.log import getLogger, Logger +from sat.core.sat_main import SAT +from sat.core.xmpp import SatXMPPClient +from sat.plugins.plugin_xep_0045 import XEP_0045 +from sat.plugins.plugin_xep_0334 import XEP_0334 +from sat.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel +from sat.tools import xml_tools +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish + + +__all__ = [ # pylint: disable=unused-variable + "PLUGIN_INFO", + "XEP_0374", + "NS_OXIM" +] + + +log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] + + +PLUGIN_INFO = { + C.PI_NAME: "OXIM", + C.PI_IMPORT_NAME: "XEP-0374", + C.PI_TYPE: "SEC", + C.PI_PROTOCOLS: [ "XEP-0374" ], + C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ], + C.PI_RECOMMENDATIONS: [ "XEP-0045" ], + C.PI_MAIN: "XEP_0374", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Implementation of OXIM"""), +} + + +# The disco feature +NS_OXIM: Final = "urn:xmpp:openpgp:im:0" + + +class XEP_0374: + """ + Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0`` + namespace. MUC messages are supported next to one to one messages. For trust + management, the two trust models "BTBV" and "manual" are supported. + """ + + def __init__(self, sat: SAT) -> None: + """ + @param sat: The SAT instance. + """ + + self.__sat = sat + + # Plugins + self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) + self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"]) + self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"]) + + # Triggers + sat.trigger.add( + "messageReceived", + self.__message_received_trigger, + priority=100050 + ) + sat.trigger.add("send", self.__send_trigger, priority=0) + + # Register the encryption plugin + sat.registerEncryptionPlugin(self, "OXIM", NS_OX, 102) + + async def getTrustUI( # pylint: disable=invalid-name + self, + client: SatXMPPClient, + entity: jid.JID + ) -> xml_tools.XMLUI: + """ + @param client: The client. + @param entity: The entity whose device trust levels to manage. + @return: An XMLUI instance which opens a form to manage the trust level of all + devices belonging to the entity. + """ + + return await self.__xep_0373.getTrustUI(client, entity) + + @staticmethod + def __get_joined_muc_users( + client: SatXMPPClient, + xep_0045: XEP_0045, + room_jid: jid.JID + ) -> Set[jid.JID]: + """ + @param client: The client. + @param xep_0045: A MUC plugin instance. + @param room_jid: The room JID. + @return: A set containing the bare JIDs of the MUC participants. + @raise InternalError: if the MUC is not joined or the entity information of a + participant isn't available. + """ + + bare_jids: Set[jid.JID] = set() + + try: + room = cast(muc.Room, xep_0045.getRoom(client, room_jid)) + except exceptions.NotFound as e: + raise exceptions.InternalError( + "Participant list of unjoined MUC requested." + ) from e + + for user in cast(Dict[str, muc.User], room.roster).values(): + entity = cast(Optional[SatXMPPEntity], user.entity) + if entity is None: + raise exceptions.InternalError( + f"Participant list of MUC requested, but the entity information of" + f" the participant {user} is not available." + ) + + bare_jids.add(entity.jid.userhostJID()) + + return bare_jids + + async def __message_received_trigger( + self, + client: SatXMPPClient, + message_elt: domish.Element, + post_treat: defer.Deferred + ) -> bool: + """ + @param client: The client which received the message. + @param message_elt: The message element. Can be modified. + @param post_treat: A deferred which evaluates to a :class:`MessageData` once the + message has fully progressed through the message receiving flow. Can be used + to apply treatments to the fully processed message, like marking it as + encrypted. + @return: Whether to continue the message received flow. + """ + sender_jid = jid.JID(message_elt["from"]) + feedback_jid: jid.JID + + message_type = message_elt.getAttribute("type", "unknown") + is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT + if is_muc_message: + if self.__xep_0045 is None: + log.warning( + "Ignoring MUC message since plugin XEP-0045 is not available." + ) + # Can't handle a MUC message without XEP-0045, let the flow continue + # normally + return True + + room_jid = feedback_jid = sender_jid.userhostJID() + + try: + room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid)) + except exceptions.NotFound: + log.warning( + f"Ignoring MUC message from a room that has not been joined:" + f" {room_jid}" + ) + # Whatever, let the flow continue + return True + + sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource)) + if sender_user is None: + log.warning( + f"Ignoring MUC message from room {room_jid} since the sender's user" + f" wasn't found {sender_jid.resource}" + ) + # Whatever, let the flow continue + return True + + sender_user_jid = cast(Optional[jid.JID], sender_user.entity) + if sender_user_jid is None: + log.warning( + f"Ignoring MUC message from room {room_jid} since the sender's bare" + f" JID couldn't be found from its user information: {sender_user}" + ) + # Whatever, let the flow continue + return True + + sender_jid = sender_user_jid + else: + # I'm not sure why this check is required, this code is copied from XEP-0384 + if sender_jid.userhostJID() == client.jid.userhostJID(): + # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems + # like "to" isn't always set. + feedback_jid = jid.JID(message_elt["to"]) + else: + feedback_jid = sender_jid + + sender_bare_jid = sender_jid.userhost() + + openpgp_elt = cast(Optional[domish.Element], next( + message_elt.elements(NS_OX, "openpgp"), + None + )) + + if openpgp_elt is None: + # None of our business, let the flow continue + return True + + try: + payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element( + client, + openpgp_elt, + "signcrypt", + jid.JID(sender_bare_jid) + ) + except Exception as e: + # TODO: More specific exception handling + log.warning(_("Can't decrypt message: {reason}\n{xml}").format( + reason=e, + xml=message_elt.toXml() + )) + client.feedback( + feedback_jid, + D_( + f"An OXIM message from {sender_jid.full()} can't be decrypted:" + f" {e}" + ), + { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + ) + # No point in further processing this message + return False + + message_elt.children.remove(openpgp_elt) + + log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}") + + # Remove all body elements from the original element, since those act as + # fallbacks in case the encryption protocol is not supported + for child in message_elt.elements(): + if child.name == "body": + message_elt.children.remove(child) + + # Move all extension elements from the payload to the stanza root + # TODO: There should probably be explicitly forbidden elements here too, just as + # for XEP-0420 + for child in list(payload_elt.elements()): + # Remove the child from the content element + payload_elt.children.remove(child) + + # Add the child to the stanza + message_elt.addChild(child) + + # Mark the message as trusted or untrusted. Undecided counts as untrusted here. + trust_level = TrustLevel.UNDECIDED # TODO: Load the actual trust level + if trust_level is TrustLevel.TRUSTED: + post_treat.addCallback(client.encryption.markAsTrusted) + else: + post_treat.addCallback(client.encryption.markAsUntrusted) + + # Mark the message as originally encrypted + post_treat.addCallback( + client.encryption.markAsEncrypted, + namespace=NS_OX + ) + + # Message processed successfully, continue with the flow + return True + + async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool: + """ + @param client: The client sending this message. + @param stanza: The stanza that is about to be sent. Can be modified. + @return: Whether the send message flow should continue or not. + """ + # OXIM only handles message stanzas + if stanza.name != "message": + return True + + # Get the intended recipient + recipient = stanza.getAttribute("to", None) + if recipient is None: + raise exceptions.InternalError( + f"Message without recipient encountered. Blocking further processing to" + f" avoid leaking plaintext data: {stanza.toXml()}" + ) + + # Parse the JID + recipient_bare_jid = jid.JID(recipient).userhostJID() + + # Check whether encryption with OXIM is requested + encryption = client.encryption.getSession(recipient_bare_jid) + + if encryption is None: + # Encryption is not requested for this recipient + return True + + if encryption["plugin"].namespace != NS_OX: + # Encryption is requested for this recipient, but not with OXIM + return True + + # All pre-checks done, we can start encrypting! + await self.__encrypt( + client, + stanza, + recipient_bare_jid, + stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT + ) + + # Add a store hint if this is a message stanza + self.__xep_0334.addHintElements(stanza, [ "store" ]) + + # Let the flow continue. + return True + + async def __encrypt( + self, + client: SatXMPPClient, + stanza: domish.Element, + recipient_jid: jid.JID, + is_muc_message: bool + ) -> None: + """ + @param client: The client. + @param stanza: The stanza, which is modified by this call. + @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID + but doesn't have to. + @param is_muc_message: Whether the stanza is a message stanza to a MUC room. + + @warning: The calling code MUST take care of adding the store message processing + hint to the stanza if applicable! This can be done before or after this call, + the order doesn't matter. + """ + + recipient_bare_jids: Set[jid.JID] + feedback_jid: jid.JID + + if is_muc_message: + if self.__xep_0045 is None: + raise exceptions.InternalError( + "Encryption of MUC message requested, but plugin XEP-0045 is not" + " available." + ) + + room_jid = feedback_jid = recipient_jid.userhostJID() + + recipient_bare_jids = self.__get_joined_muc_users( + client, + self.__xep_0045, + room_jid + ) + else: + recipient_bare_jids = { recipient_jid.userhostJID() } + feedback_jid = recipient_jid.userhostJID() + + log.debug( + f"Intercepting message that is to be encrypted by {NS_OX} for" + f" {recipient_bare_jids}" + ) + + signcrypt_elt, payload_elt = \ + self.__xep_0373.build_signcrypt_element(recipient_bare_jids) + + # Move elements from the stanza to the content element. + # TODO: There should probably be explicitly forbidden elements here too, just as + # for XEP-0420 + for child in list(stanza.elements()): + # Remove the child from the stanza + stanza.children.remove(child) + + # A namespace of ``None`` can be used on domish elements to inherit the + # namespace from the parent. When moving elements from the stanza root to + # the content element, however, we don't want elements to inherit the + # namespace of the content element. Thus, check for elements with ``None`` + # for their namespace and set the namespace to jabber:client, which is the + # namespace of the parent element. + if child.uri is None: + child.uri = C.NS_CLIENT + child.defaultUri = C.NS_CLIENT + + # Add the child with corrected namespaces to the content element + payload_elt.addChild(child) + + try: + openpgp_elt = await self.__xep_0373.build_openpgp_element( + client, + signcrypt_elt, + recipient_bare_jids + ) + except Exception as e: + msg = _( + # pylint: disable=consider-using-f-string + "Can't encrypt message for {entities}: {reason}".format( + entities=', '.join(jid.userhost() for jid in recipient_bare_jids), + reason=e + ) + ) + log.warning(msg) + client.feedback(feedback_jid, msg, { + C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR + }) + raise e + + stanza.addChild(openpgp_elt)
--- a/sat/plugins/plugin_xep_0384.py Mon Oct 10 15:23:59 2022 +0200 +++ b/sat/plugins/plugin_xep_0384.py Tue Sep 20 16:22:18 2022 +0200 @@ -479,10 +479,10 @@ xml_tools.et_elt_2_domish_elt(element), item_id=str(bundle.device_id), extra={ - xep_0060.EXTRA_PUBLISH_OPTIONS: { - xep_0060.OPT_MAX_ITEMS: "max" + XEP_0060.EXTRA_PUBLISH_OPTIONS: { + XEP_0060.OPT_MAX_ITEMS: "max" }, - xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" } ) except (error.StanzaError, Exception) as e: @@ -519,8 +519,8 @@ xml_tools.et_elt_2_domish_elt(element), item_id=xep_0060.ID_SINGLETON, extra={ - xep_0060.EXTRA_PUBLISH_OPTIONS: { xep_0060.OPT_MAX_ITEMS: 1 }, - xep_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" + XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_MAX_ITEMS: 1 }, + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" } ) except Exception as e: @@ -546,7 +546,6 @@ client, jid.JID(bare_jid), node, - max_items=None, item_ids=[ str(device_id) ] ) except Exception as e: @@ -653,11 +652,11 @@ xml_tools.et_elt_2_domish_elt(element), item_id=xep_0060.ID_SINGLETON, extra={ - xep_0060.EXTRA_PUBLISH_OPTIONS: { - xep_0060.OPT_MAX_ITEMS: 1, - xep_0060.OPT_ACCESS_MODEL: "open" + XEP_0060.EXTRA_PUBLISH_OPTIONS: { + XEP_0060.OPT_MAX_ITEMS: 1, + XEP_0060.OPT_ACCESS_MODEL: "open" }, - xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" } ) except (error.StanzaError, Exception) as e:
--- a/sat/plugins/plugin_xep_0420.py Mon Oct 10 15:23:59 2022 +0200 +++ b/sat/plugins/plugin_xep_0420.py Tue Sep 20 16:22:18 2022 +0200 @@ -22,8 +22,10 @@ import secrets import string from typing import Dict, NamedTuple, Optional, Set, Tuple, cast +from typing_extensions import Final from lxml import etree +from sat.core import exceptions from sat.core.constants import Const as C from sat.core.i18n import D_ @@ -68,7 +70,7 @@ } -NS_SCE = "urn:xmpp:sce:1" +NS_SCE: Final = "urn:xmpp:sce:1" class ProfileRequirementsNotMet(Exception): @@ -114,7 +116,8 @@ remain. Do not modify. @return: An affix element to include in the envelope. The element must have the name :attr:`element_name` and must validate using :attr:`element_schema`. - @raise ValueError: if the affix couldn't be built. + @raise ValueError: if the affix couldn't be built due to missing information on + the stanza. """ @abstractmethod @@ -384,7 +387,7 @@ by the decryption scheme utilizing SCE. @return: The parsed and processed values of all affixes that were present on the envelope, notably including the timestamp. - @raise ValueError: if the serialized envelope element is malformed. + @raise exceptions.ParsingError: if the serialized envelope element is malformed. @raise ProfileRequirementsNotMet: if one or more affixes required by the profile are missing from the envelope. @raise AffixVerificationFailed: if an affix included in the envelope fails to @@ -399,7 +402,9 @@ try: envelope_serialized_string = envelope_serialized.decode("utf-8") except UnicodeError as e: - raise ValueError("Serialized envelope can't bare parsed as utf-8.") from e + raise exceptions.ParsingError( + "Serialized envelope can't bare parsed as utf-8." + ) from e custom_affixes = set(profile.custom_policies.keys()) @@ -420,7 +425,9 @@ try: etree.fromstring(envelope_serialized_string, parser) except etree.XMLSyntaxError as e: - raise ValueError("Serialized envelope doesn't pass schema validation.") from e + raise exceptions.ParsingError( + "Serialized envelope doesn't pass schema validation." + ) from e # Prepare the envelope and content elements envelope = cast(domish.Element, ElementParser()(envelope_serialized_string)) @@ -452,7 +459,7 @@ timestamp_value = None if time_element is None else \ XEP_0082.parse_datetime(time_element["stamp"]) except ValueError as e: - raise AffixVerificationFailed("Malformed time affix") from e + raise AffixVerificationFailed("Malformed time affix.") from e # The to affix is verified by comparing the to attribute of the stanza with the # JID referenced by the affix. Note that only bare JIDs are compared as per the
--- a/sat/tools/xmpp_datetime.py Mon Oct 10 15:23:59 2022 +0200 +++ b/sat/tools/xmpp_datetime.py Tue Sep 20 16:22:18 2022 +0200 @@ -80,12 +80,15 @@ @param value: A string containing date information formatted according to the Date profile specified in XEP-0082. @return: The date parsed from the input string. - @raise ValueError: if the input string is not correctly formatted. + @raise exceptions.ParsingError: if the input string is not correctly formatted. """ # CCYY-MM-DD # The Date profile of XEP-0082 is equal to the ISO 8601 format. - return date.fromisoformat(value) + try: + return date.fromisoformat(value) + except ValueError as e: + raise exceptions.ParsingError() from e def format_datetime( @@ -125,13 +128,16 @@ @param value: A string containing datetime information formatted according to the DateTime profile specified in XEP-0082. @return: The datetime parsed from the input string. - @raise ValueError: if the input string is not correctly formatted. + @raise exceptions.ParsingError: if the input string is not correctly formatted. """ # CCYY-MM-DDThh:mm:ss[.sss]TZD value, microsecond = __parse_fraction_of_a_second(value) - result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z") + try: + result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z") + except ValueError as e: + raise exceptions.ParsingError() from e if microsecond is not None: result = result.replace(microsecond=microsecond) @@ -167,7 +173,7 @@ @param value: A string containing time information formatted according to the Time profile specified in XEP-0082. @return: The time parsed from the input string. - @raise ValueError: if the input string is not correctly formatted. + @raise exceptions.ParsingError: if the input string is not correctly formatted. """ # hh:mm:ss[.sss][TZD] @@ -177,7 +183,10 @@ # profile, except that it doesn't handle the letter Z as time zone information for # UTC. This can be fixed with a simple string replacement of 'Z' with "+00:00", which # is another way to represent UTC. - result = time.fromisoformat(value.replace('Z', "+00:00")) + try: + result = time.fromisoformat(value.replace('Z', "+00:00")) + except ValueError as e: + raise exceptions.ParsingError() from e if microsecond is not None: result = result.replace(microsecond=microsecond)
--- a/setup.py Mon Oct 10 15:23:59 2022 +0200 +++ b/setup.py Tue Sep 20 16:22:18 2022 +0200 @@ -52,13 +52,14 @@ 'urwid-satext == 0.9.*', 'wokkel >= 18.0.0, < 19.0.0', 'omemo >= 1.0.0, < 2', - 'twomemo >= 1.0.0, < 2', - 'oldmemo >= 1.0.0, < 2', + 'twomemo[xml] >= 1.0.0, < 2', + 'oldmemo[xml] >= 1.0.0, < 2', 'pyyaml < 7.0.0', 'sqlalchemy >= 1.4', 'alembic', 'aiosqlite', 'txdbus', + 'xmlschema', ] extras_require = {
--- a/tests/unit/test_plugin_xep_0082.py Mon Oct 10 15:23:59 2022 +0200 +++ b/tests/unit/test_plugin_xep_0082.py Tue Sep 20 16:22:18 2022 +0200 @@ -19,6 +19,7 @@ from datetime import date, datetime, time, timezone import pytest +from sat.core import exceptions from sat.plugins.plugin_xep_0082 import XEP_0082 @@ -104,7 +105,7 @@ assert XEP_0082.parse_datetime("1969-07-20T21:56:15-05:00") == value # Without timezone, without a fraction of a second - with pytest.raises(ValueError): + with pytest.raises(exceptions.ParsingError): XEP_0082.parse_datetime("1969-07-21T02:56:15") # With timezone 'Z', with a fraction of a second consisting of two digits @@ -123,7 +124,7 @@ assert XEP_0082.parse_datetime("1969-07-20T21:56:15.-05:00") == value # Without timezone, with a fraction of a second consisting of six digits - with pytest.raises(ValueError): + with pytest.raises(exceptions.ParsingError): XEP_0082.parse_datetime("1969-07-21T02:56:15.123456")
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/unit/test_plugin_xep_0373.py Tue Sep 20 16:22:18 2022 +0200 @@ -0,0 +1,146 @@ +from datetime import datetime, timedelta, timezone +from sat.plugins.plugin_xep_0373 import XEP_0373, NS_OX +from sat.tools.xmpp_datetime import parse_datetime + +import pytest +from twisted.words.protocols.jabber import jid + + +a = jid.JID("foo@example.com") +b = jid.JID("bar@example.com") + + +def test_signcrypt_element_args() -> None: + with pytest.raises(ValueError): + XEP_0373.build_signcrypt_element([]) + + +def test_signcrypt_element() -> None: + signcrypt_elt, payload_elt = XEP_0373.build_signcrypt_element([ a, b ]) + payload_elt.addElement("signcrypt-test-content", content="signcrypt test content") + + rpad_elt = next(signcrypt_elt.elements(NS_OX, "rpad")) + time_elt = next(signcrypt_elt.elements(NS_OX, "time")) + + rpad = str(rpad_elt) + timestamp = parse_datetime(time_elt["stamp"]) + + signcrypt_elt.children.remove(rpad_elt) + signcrypt_elt.children.remove(time_elt) + + assert rpad + assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) + assert signcrypt_elt.toXml() == ( + "<signcrypt xmlns='urn:xmpp:openpgp:0'>" + "<to jid='foo@example.com'/>" + "<to jid='bar@example.com'/>" + "<payload>" + "<signcrypt-test-content>signcrypt test content</signcrypt-test-content>" + "</payload>" + "</signcrypt>" + ) + + +def test_sign_element_args() -> None: + with pytest.raises(ValueError): + XEP_0373.build_sign_element([], True) + + +def test_sign_element_with_rpad() -> None: + sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], True) + payload_elt.addElement("sign-test-content", content="sign test content") + + rpad_elt = next(sign_elt.elements(NS_OX, "rpad")) + time_elt = next(sign_elt.elements(NS_OX, "time")) + + rpad = str(rpad_elt) + timestamp = parse_datetime(time_elt["stamp"]) + + sign_elt.children.remove(rpad_elt) + sign_elt.children.remove(time_elt) + + assert rpad + assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) + assert sign_elt.toXml() == ( + "<sign xmlns='urn:xmpp:openpgp:0'>" + "<to jid='foo@example.com'/>" + "<to jid='bar@example.com'/>" + "<payload>" + "<sign-test-content>sign test content</sign-test-content>" + "</payload>" + "</sign>" + ) + + +def test_sign_element_without_rpad() -> None: + sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], False) + payload_elt.addElement("sign-test-content", content="sign test content") + + rpad_elt = next(sign_elt.elements(NS_OX, "rpad"), None) + time_elt = next(sign_elt.elements(NS_OX, "time")) + + timestamp = parse_datetime(time_elt["stamp"]) + + sign_elt.children.remove(time_elt) + + assert rpad_elt is None + assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) + assert sign_elt.toXml() == ( + "<sign xmlns='urn:xmpp:openpgp:0'>" + "<to jid='foo@example.com'/>" + "<to jid='bar@example.com'/>" + "<payload>" + "<sign-test-content>sign test content</sign-test-content>" + "</payload>" + "</sign>" + ) + + +def test_crypt_element_with_recipients() -> None: + crypt_elt, payload_elt = XEP_0373.build_crypt_element([ a, b ]) + payload_elt.addElement("crypt-test-content", content="crypt test content") + + rpad_elt = next(crypt_elt.elements(NS_OX, "rpad")) + time_elt = next(crypt_elt.elements(NS_OX, "time")) + + rpad = str(rpad_elt) + timestamp = parse_datetime(time_elt["stamp"]) + + crypt_elt.children.remove(rpad_elt) + crypt_elt.children.remove(time_elt) + + assert rpad + assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) + assert crypt_elt.toXml() == ( + "<crypt xmlns='urn:xmpp:openpgp:0'>" + "<to jid='foo@example.com'/>" + "<to jid='bar@example.com'/>" + "<payload>" + "<crypt-test-content>crypt test content</crypt-test-content>" + "</payload>" + "</crypt>" + ) + + +def test_crypt_element_without_recipients() -> None: + crypt_elt, payload_elt = XEP_0373.build_crypt_element([]) + payload_elt.addElement("crypt-test-content", content="crypt test content") + + rpad_elt = next(crypt_elt.elements(NS_OX, "rpad")) + time_elt = next(crypt_elt.elements(NS_OX, "time")) + + rpad = str(rpad_elt) + timestamp = parse_datetime(time_elt["stamp"]) + + crypt_elt.children.remove(rpad_elt) + crypt_elt.children.remove(time_elt) + + assert rpad + assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10) + assert crypt_elt.toXml() == ( + "<crypt xmlns='urn:xmpp:openpgp:0'>" + "<payload>" + "<crypt-test-content>crypt test content</crypt-test-content>" + "</payload>" + "</crypt>" + )
--- a/tests/unit/test_plugin_xep_0420.py Mon Oct 10 15:23:59 2022 +0200 +++ b/tests/unit/test_plugin_xep_0420.py Tue Sep 20 16:22:18 2022 +0200 @@ -20,6 +20,7 @@ from typing import Callable, cast import pytest +from sat.core import exceptions from sat.plugins.plugin_xep_0334 import NS_HINTS from sat.plugins.plugin_xep_0420 import ( @@ -435,7 +436,7 @@ </body> </message>""") - with pytest.raises(ValueError): + with pytest.raises(exceptions.ParsingError): XEP_0420.unpack_stanza( unpacking_profile, stanza, @@ -558,5 +559,5 @@ <unknown-affix unknown-attr="unknown"/> </envelope>""".encode("utf-8") - with pytest.raises(ValueError): + with pytest.raises(exceptions.ParsingError): XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)