Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0373.py @ 3933:cecf45416403
plugin XEP-0373 and XEP-0374: Implementation of OX and OXIM:
GPGME is used as the GPG provider.
rel 374
author | Syndace <me@syndace.dev> |
---|---|
date | Tue, 20 Sep 2022 16:22:18 +0200 |
parents | |
children | a92eef737703 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0373.py Tue Sep 20 16:22:18 2022 +0200 @@ -0,0 +1,2085 @@ +#!/usr/bin/env python3 + +# Libervia plugin for OpenPGP for XMPP +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from abc import ABC, abstractmethod +import base64 +from datetime import datetime, timezone +import enum +import secrets +import string +from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast +from xml.sax.saxutils import quoteattr + +from typing_extensions import Final, NamedTuple, Never, assert_never +from wokkel import muc, pubsub +from wokkel.disco import DiscoFeature, DiscoInfo +import xmlschema + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _, D_ +from sat.core.log import getLogger, Logger +from sat.core.sat_main import SAT +from sat.core.xmpp import SatXMPPClient +from sat.memory import persistent +from sat.plugins.plugin_xep_0045 import XEP_0045 +from sat.plugins.plugin_xep_0060 import XEP_0060 +from sat.plugins.plugin_xep_0163 import XEP_0163 +from sat.tools.xmpp_datetime import format_datetime, parse_datetime +from sat.tools import xml_tools +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish + +try: + import gpg +except ImportError as import_error: + raise exceptions.MissingModule( + "You are missing the 'gpg' package required by the OX plugin. The recommended" + " installation method is via your operating system's package manager, since the" + " version of the library has to match the version of your GnuPG installation. See" + " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme" + ) from import_error + + +__all__ = [ # pylint: disable=unused-variable + "PLUGIN_INFO", + "NS_OX", + "XEP_0373", + "VerificationError", + "XMPPInteractionFailed", + "InvalidPacket", + "DecryptionFailed", + "VerificationFailed", + "UnknownKey", + "GPGProviderError", + "GPGPublicKey", + "GPGSecretKey", + "GPGProvider", + "PublicKeyMetadata", + "gpg_provider", + "TrustLevel" +] + + +log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] + + +PLUGIN_INFO = { + C.PI_NAME: "XEP-0373", + C.PI_IMPORT_NAME: "XEP-0373", + C.PI_TYPE: "SEC", + C.PI_PROTOCOLS: [ "XEP-0373" ], + C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0373", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"), +} + + +NS_OX: Final = "urn:xmpp:openpgp:0" + + +PARAM_CATEGORY = "Security" +PARAM_NAME = "ox_policy" + + +class VerificationError(Exception): + """ + Raised by verifying methods of :class:`XEP_0373` on semantical verification errors. + """ + + +class XMPPInteractionFailed(Exception): + """ + Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this + exception exists is that the exceptions raised by XMPP interactions are not properly + documented for the most part, thus all exceptions are caught and wrapped in instances + of this class. + """ + + +class InvalidPacket(ValueError): + """ + Raised by methods of :class:`GPGProvider` when an invalid packet is encountered. + """ + + +class DecryptionFailed(Exception): + """ + Raised by methods of :class:`GPGProvider` on decryption failures. + """ + + +class VerificationFailed(Exception): + """ + Raised by methods of :class:`GPGProvider` on verification failures. + """ + + +class UnknownKey(ValueError): + """ + Raised by methods of :class:`GPGProvider` when an unknown key is referenced. + """ + + +class GPGProviderError(Exception): + """ + Raised by methods of :class:`GPGProvider` on internal errors. + """ + + +class GPGPublicKey(ABC): + """ + Interface describing a GPG public key. + """ + + @property + @abstractmethod + def fingerprint(self) -> str: + """ + @return: The OpenPGP v4 fingerprint string of this public key. + """ + + +class GPGSecretKey(ABC): + """ + Interface descibing a GPG secret key. + """ + + @property + @abstractmethod + def public_key(self) -> GPGPublicKey: + """ + @return: The public key corresponding to this secret key. + """ + + +class GPGProvider(ABC): + """ + Interface describing a GPG provider, i.e. a library or framework providing GPG + encryption, signing and key management. + + All methods may raise :class:`GPGProviderError` in addition to those exception types + listed explicitly. + + # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?) + """ + + @abstractmethod + def export_public_key(self, public_key: GPGPublicKey) -> bytes: + """Export a public key in a key material packet according to RFC 4880 §5.5. + + Do not use OpenPGP's ASCII Armor. + + @param public_key: The public key to export. + @return: The packet containing the exported public key. + @raise UnknownKey: if the public key is not available. + """ + + @abstractmethod + def import_public_key(self, packet: bytes) -> GPGPublicKey: + """Import a public key from a key material packet according to RFC 4880 §5.5. + + OpenPGP's ASCII Armor is not used. + + @param packet: A packet containing an exported public key. + @return: The public key imported from the packet. + @raise InvalidPacket: if the packet is either syntactically or semantically deemed + invalid. + + @warning: Only packets of version 4 or higher may be accepted, packets below + version 4 MUST be rejected. + """ + + @abstractmethod + def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes: + """Export a secret key for transfer according to RFC 4880 §11.1. + + Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage + conventions to zero in the corresponding secret-key packet according to RFC 4880 + §5.5.3. Do not use OpenPGP's ASCII Armor. + + @param secret_key: The secret key to export. + @return: The binary blob containing the exported secret key. + @raise UnknownKey: if the secret key is not available. + """ + + @abstractmethod + def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]: + """Restore secret keys exported for transfer according to RFC 4880 §11.1. + + The secret data is not encrypted, i.e. the octet indicating string-to-key usage + conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3 + are set to zero. OpenPGP's ASCII Armor is not used. + + @param data: Concatenation of one or more secret keys exported for transfer. + @return: The secret keys imported from the data. + @raise InvalidPacket: if the data or one of the packets included in the data is + either syntactically or semantically deemed invalid. + + @warning: Only packets of version 4 or higher may be accepted, packets below + version 4 MUST be rejected. + """ + + @abstractmethod + def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes: + """Encrypt data symmetrically according to RFC 4880 §5.3. + + The password is used to build a Symmetric-Key Encrypted Session Key packet which + precedes the Symmetrically Encrypted Data packet that holds the encrypted data. + + @param plaintext: The data to encrypt. + @param password: The password to encrypt the data with. + @return: The encrypted data. + """ + + @abstractmethod + def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: + """Decrypt data symmetrically according to RFC 4880 §5.3. + + The ciphertext consists of a Symmetrically Encrypted Data packet that holds the + encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the + password. + + @param ciphertext: The ciphertext. + @param password: The password to decrypt the data with. + @return: The plaintext. + @raise DecryptionFailed: on decryption failure. + """ + + @abstractmethod + def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: + """Sign some data. + + OpenPGP's ASCII Armor is not used. + + @param data: The data to sign. + @param secret_keys: The secret keys to sign the data with. + @return: The OpenPGP message carrying the signed data. + """ + + @abstractmethod + def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: + """Sign some data. Create the signature detached from the data. + + OpenPGP's ASCII Armor is not used. + + @param data: The data to sign. + @param secret_keys: The secret keys to sign the data with. + @return: The OpenPGP message carrying the detached signature. + """ + + @abstractmethod + def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes: + """Verify signed data. + + OpenPGP's ASCII Armor is not used. + + @param signed_data: The signed data as an OpenPGP message. + @param public_keys: The public keys to verify the signature with. + @return: The verified and unpacked data. + @raise VerificationFailed: if the data could not be verified. + + @warning: For implementors: it has to be confirmed that a valid signature by one + of the public keys is available. + """ + + @abstractmethod + def verify_detached( + self, + data: bytes, + signature: bytes, + public_keys: Set[GPGPublicKey] + ) -> None: + """Verify signed data, where the signature was created detached from the data. + + OpenPGP's ASCII Armor is not used. + + @param data: The data. + @param signature: The signature as an OpenPGP message. + @param public_keys: The public keys to verify the signature with. + @raise VerificationFailed: if the data could not be verified. + + @warning: For implementors: it has to be confirmed that a valid signature by one + of the public keys is available. + """ + + @abstractmethod + def encrypt( + self, + plaintext: bytes, + public_keys: Set[GPGPublicKey], + signing_keys: Optional[Set[GPGSecretKey]] = None + ) -> bytes: + """Encrypt and optionally sign some data. + + OpenPGP's ASCII Armor is not used. + + @param plaintext: The data to encrypt and optionally sign. + @param public_keys: The public keys to encrypt the data for. + @param signing_keys: The secret keys to sign the data with. + @return: The OpenPGP message carrying the encrypted and optionally signed data. + """ + + @abstractmethod + def decrypt( + self, + ciphertext: bytes, + secret_keys: Set[GPGSecretKey], + public_keys: Optional[Set[GPGPublicKey]] = None + ) -> bytes: + """Decrypt and optionally verify some data. + + OpenPGP's ASCII Armor is not used. + + @param ciphertext: The encrypted and optionally signed data as an OpenPGP message. + @param secret_keys: The secret keys to attempt decryption with. + @param public_keys: The public keys to verify the optional signature with. + @return: The decrypted, optionally verified and unpacked data. + @raise DecryptionFailed: on decryption failure. + @raise VerificationFailed: if the data could not be verified. + + @warning: For implementors: it has to be confirmed that the data was decrypted + using one of the secret keys and that a valid signature by one of the public + keys is available in case the data is signed. + """ + + @abstractmethod + def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: + """List public keys. + + @param user_id: The user id. + @return: The set of public keys available for this user id. + """ + + @abstractmethod + def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: + """List secret keys. + + @param user_id: The user id. + @return: The set of secret keys available for this user id. + """ + + @abstractmethod + def can_sign(self, public_key: GPGPublicKey) -> bool: + """ + @return: Whether the public key belongs to a key pair capable of signing. + """ + + @abstractmethod + def can_encrypt(self, public_key: GPGPublicKey) -> bool: + """ + @return: Whether the public key belongs to a key pair capable of encryption. + """ + + @abstractmethod + def create_key(self, user_id: str) -> GPGSecretKey: + """Create a new GPG key, capable of signing and encryption. + + The key is generated without password protection and without expiration. If a key + with the same user id already exists, a new key is created anyway. + + @param user_id: The user id to assign to the new key. + @return: The new key. + """ + + +class GPGME_GPGPublicKey(GPGPublicKey): + """ + GPG public key implementation based on GnuPG Made Easy (GPGME). + """ + + def __init__(self, key_obj: Any) -> None: + """ + @param key_obj: The GPGME key object. + """ + + self.__key_obj = key_obj + + @property + def fingerprint(self) -> str: + return self.__key_obj.fpr + + @property + def key_obj(self) -> Any: + return self.__key_obj + + +class GPGME_GPGSecretKey(GPGSecretKey): + """ + GPG secret key implementation based on GnuPG Made Easy (GPGME). + """ + + def __init__(self, public_key: GPGME_GPGPublicKey) -> None: + """ + @param public_key: The public key corresponding to this secret key. + """ + + self.__public_key = public_key + + @property + def public_key(self) -> GPGME_GPGPublicKey: + return self.__public_key + + +class GPGME_GPGProvider(GPGProvider): + """ + GPG provider implementation based on GnuPG Made Easy (GPGME). + """ + + def __init__(self, home_dir: Optional[str] = None) -> None: + """ + @param home_dir: Optional GPG home directory path to use for all operations. + """ + + self.__home_dir = home_dir + + def export_public_key(self, public_key: GPGPublicKey) -> bytes: + assert isinstance(public_key, GPGME_GPGPublicKey) + + pattern = public_key.fingerprint + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.key_export_minimal(pattern) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + if result is None: + raise UnknownKey(f"Public key {pattern} not found.") + + return result + + def import_public_key(self, packet: bytes) -> GPGPublicKey: + # TODO + # - Reject packets older than version 4 + # - Check whether it's actually a public key (through packet inspection?) + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.key_import(packet) + except gpg.errors.GPGMEError as e: + # From looking at the code, `key_import` never raises. The documentation + # says it does though, so this is included for future-proofness. + raise GPGProviderError("Internal GPGME error") from e + + if not hasattr(result, "considered"): + raise InvalidPacket( + f"Data not considered for public key import: {result}" + ) + + if len(result.imports) != 1: + raise InvalidPacket( + "Public key packet does not contain exactly one public key (not" + " counting subkeys)." + ) + + try: + key_obj = c.get_key(result.imports[0].fpr, secret=False) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.KeyError as e: + raise GPGProviderError("Newly imported public key not found") from e + + return GPGME_GPGPublicKey(key_obj) + + def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes: + assert isinstance(secret_key, GPGME_GPGSecretKey) + # TODO + # - Handle password protection/pinentry + # - Make sure the key is exported unencrypted + + pattern = secret_key.public_key.fingerprint + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.key_export_secret(pattern) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + if result is None: + raise UnknownKey(f"Secret key {pattern} not found.") + + return result + + def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]: + # TODO + # - Reject packets older than version 4 + # - Check whether it's actually secret keys (through packet inspection?) + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.key_import(data) + except gpg.errors.GPGMEError as e: + # From looking at the code, `key_import` never raises. The documentation + # says it does though, so this is included for future-proofness. + raise GPGProviderError("Internal GPGME error") from e + + if not hasattr(result, "considered"): + raise InvalidPacket( + f"Data not considered for secret key import: {result}" + ) + + if len(result.imports) == 0: + raise InvalidPacket("Secret key packet does not contain a secret key.") + + secret_keys = set() + for import_status in result.imports: + try: + key_obj = c.get_key(import_status.fpr, secret=True) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.KeyError as e: + raise GPGProviderError("Newly imported secret key not found") from e + + secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))) + + return secret_keys + + def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + return ciphertext + + def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + plaintext, __, __ = c.decrypt( + ciphertext, + passphrase=password, + verify=False + ) + except gpg.errors.GPGMEError as e: + # TODO: Find out what kind of error is raised if the password is wrong and + # re-raise it as DecryptionFailed instead. + raise GPGProviderError("Internal GPGME error") from e + except gpg.UnsupportedAlgorithm as e: + raise DecryptionFailed("Unsupported algorithm") from e + + return plaintext + + def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: + signers = [] + for secret_key in secret_keys: + assert isinstance(secret_key, GPGME_GPGSecretKey) + + signers.append(secret_key.public_key.key_obj) + + with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: + try: + signed_data, __ = c.sign(data) + except gpg.error.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.InvalidSigners as e: + raise GPGProviderError( + "At least one of the secret keys is invalid for signing" + ) from e + + return signed_data + + def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: + signers = [] + for secret_key in secret_keys: + assert isinstance(secret_key, GPGME_GPGSecretKey) + + signers.append(secret_key.public_key.key_obj) + + with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: + try: + signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH) + except gpg.error.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.InvalidSigners as e: + raise GPGProviderError( + "At least one of the secret keys is invalid for signing" + ) from e + + return signature + + def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + data, result = c.verify(signed_data) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.BadSignatures as e: + raise VerificationFailed("Bad signatures on signed data") from e + + valid_signature_found = False + for public_key in public_keys: + assert isinstance(public_key, GPGME_GPGPublicKey) + + for subkey in public_key.key_obj.subkeys: + for sig in result.signatures: + if subkey.can_sign and subkey.fpr == sig.fpr: + valid_signature_found = True + + if not valid_signature_found: + raise VerificationFailed( + "Data not signed by one of the expected public keys" + ) + + return data + + def verify_detached( + self, + data: bytes, + signature: bytes, + public_keys: Set[GPGPublicKey] + ) -> None: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + __, result = c.verify(data, signature=signature) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.BadSignatures as e: + raise VerificationFailed("Bad signatures on signed data") from e + + valid_signature_found = False + for public_key in public_keys: + assert isinstance(public_key, GPGME_GPGPublicKey) + + for subkey in public_key.key_obj.subkeys: + for sig in result.signatures: + if subkey.can_sign and subkey.fpr == sig.fpr: + valid_signature_found = True + + if not valid_signature_found: + raise VerificationFailed( + "Data not signed by one of the expected public keys" + ) + + def encrypt( + self, + plaintext: bytes, + public_keys: Set[GPGPublicKey], + signing_keys: Optional[Set[GPGSecretKey]] = None + ) -> bytes: + recipients = [] + for public_key in public_keys: + assert isinstance(public_key, GPGME_GPGPublicKey) + + recipients.append(public_key.key_obj) + + signers = [] + if signing_keys is not None: + for secret_key in signing_keys: + assert isinstance(secret_key, GPGME_GPGSecretKey) + + signers.append(secret_key.public_key.key_obj) + + sign = signing_keys is not None + + with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: + try: + ciphertext, __, __ = c.encrypt( + plaintext, + recipients=recipients, + sign=sign, + always_trust=True, + add_encrypt_to=True + ) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.InvalidRecipients as e: + raise GPGProviderError( + "At least one of the public keys is invalid for encryption" + ) from e + except gpg.errors.InvalidSigners as e: + raise GPGProviderError( + "At least one of the signing keys is invalid for signing" + ) from e + + return ciphertext + + def decrypt( + self, + ciphertext: bytes, + secret_keys: Set[GPGSecretKey], + public_keys: Optional[Set[GPGPublicKey]] = None + ) -> bytes: + verify = public_keys is not None + + with gpg.Context(home_dir=self.__home_dir) as c: + try: + plaintext, result, verify_result = c.decrypt( + ciphertext, + verify=verify + ) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.UnsupportedAlgorithm as e: + raise DecryptionFailed("Unsupported algorithm") from e + + # TODO: Check whether the data was decrypted using one of the expected secret + # keys + + if public_keys is not None: + valid_signature_found = False + for public_key in public_keys: + assert isinstance(public_key, GPGME_GPGPublicKey) + + for subkey in public_key.key_obj.subkeys: + for sig in verify_result.signatures: + if subkey.can_sign and subkey.fpr == sig.fpr: + valid_signature_found = True + + if not valid_signature_found: + raise VerificationFailed( + "Data not signed by one of the expected public keys" + ) + + return plaintext + + def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + return { + GPGME_GPGPublicKey(key) + for key + in c.keylist(pattern=user_id, secret=False) + } + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + return { + GPGME_GPGSecretKey(GPGME_GPGPublicKey(key)) + for key + in c.keylist(pattern=user_id, secret=True) + } + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + + def can_sign(self, public_key: GPGPublicKey) -> bool: + assert isinstance(public_key, GPGME_GPGPublicKey) + + return any(subkey.can_sign for subkey in public_key.key_obj.subkeys) + + def can_encrypt(self, public_key: GPGPublicKey) -> bool: + assert isinstance(public_key, GPGME_GPGPublicKey) + + return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys) + + def create_key(self, user_id: str) -> GPGSecretKey: + with gpg.Context(home_dir=self.__home_dir) as c: + try: + result = c.create_key( + user_id, + expires=False, + sign=True, + encrypt=True, + certify=False, + authenticate=False, + force=True + ) + + key_obj = c.get_key(result.fpr, secret=True) + except gpg.errors.GPGMEError as e: + raise GPGProviderError("Internal GPGME error") from e + except gpg.errors.KeyError as e: + raise GPGProviderError("Newly created key not found") from e + + return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)) + + +class PublicKeyMetadata(NamedTuple): + """ + Metadata about a published public key. + """ + + fingerprint: str + timestamp: datetime + + +@enum.unique +class TrustLevel(enum.Enum): + """ + The trust levels required for BTBV and manual trust. + """ + + TRUSTED: str = "TRUSTED" + BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" + UNDECIDED: str = "UNDECIDED" + DISTRUSTED: str = "DISTRUSTED" + + +OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="openpgp" type="xs:base64Binary"/> +</xs:schema> +""") + + +# The following schema needs verion 1.1 of XML Schema, which is not supported by lxml. +# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform +# implementation of XML Schema, including version 1.1. +CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="signcrypt"> + <xs:complexType> + <xs:all> + <xs:element ref="to" maxOccurs="unbounded"/> + <xs:element ref="time"/> + <xs:element ref="rpad" minOccurs="0"/> + <xs:element ref="payload"/> + </xs:all> + </xs:complexType> + </xs:element> + + <xs:element name="sign"> + <xs:complexType> + <xs:all> + <xs:element ref="to" maxOccurs="unbounded"/> + <xs:element ref="time"/> + <xs:element ref="rpad" minOccurs="0"/> + <xs:element ref="payload"/> + </xs:all> + </xs:complexType> + </xs:element> + + <xs:element name="crypt"> + <xs:complexType> + <xs:all> + <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/> + <xs:element ref="time"/> + <xs:element ref="rpad" minOccurs="0"/> + <xs:element ref="payload"/> + </xs:all> + </xs:complexType> + </xs:element> + + <xs:element name="to"> + <xs:complexType> + <xs:attribute name="jid" type="xs:string"/> + </xs:complexType> + </xs:element> + + <xs:element name="time"> + <xs:complexType> + <xs:attribute name="stamp" type="xs:dateTime"/> + </xs:complexType> + </xs:element> + + <xs:element name="rpad" type="xs:string"/> + + <xs:element name="payload"> + <xs:complexType> + <xs:sequence> + <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> + </xs:sequence> + </xs:complexType> + </xs:element> +</xs:schema> +""") + + +PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" +PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="public-keys-list"> + <xs:complexType> + <xs:sequence> + <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + </xs:element> + + <xs:element name="pubkey-metadata"> + <xs:complexType> + <xs:attribute name="v4-fingerprint" type="xs:string"/> + <xs:attribute name="date" type="xs:dateTime"/> + </xs:complexType> + </xs:element> +</xs:schema> +""") + + +PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="pubkey"> + <xs:complexType> + <xs:all> + <xs:element ref="data"/> + </xs:all> + <xs:anyAttribute processContents="skip"/> + </xs:complexType> + </xs:element> + + <xs:element name="data" type="xs:base64Binary"/> +</xs:schema> +""") + + +SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" + targetNamespace="urn:xmpp:openpgp:0" + xmlns="urn:xmpp:openpgp:0"> + + <xs:element name="secretkey" type="xs:base64Binary"/> +</xs:schema> +""") + + +DEFAULT_TRUST_MODEL_PARAM = f""" +<params> +<individual> +<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}> + <param name="{PARAM_NAME}" + label={quoteattr(D_('OMEMO default trust policy'))} + type="list" security="3"> + <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} /> + <option value="btbv" + label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))} + selected="true" /> + </param> +</category> +</individual> +</params> +""" + + +def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider: + """Get the GPG provider for a client. + + @param sat: The SAT instance. + @param client: The client. + @return: The GPG provider specifically for that client. + """ + + return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home"))) + + +def generate_passphrase() -> str: + """Generate a secure passphrase for symmetric encryption. + + @return: The passphrase. + """ + + return "-".join("".join( + secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4) + ) for __ in range(6)) + + +# TODO: Handle the user id mess +class XEP_0373: + """ + Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``. + """ + + def __init__(self, sat: SAT) -> None: + """ + @param sat: The SAT instance. + """ + + self.__sat = sat + + # Add configuration option to choose between manual trust and BTBV as the trust + # model + sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) + + self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) + self.__xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) + + self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} + + xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"]) + xep_0163.addPEPEvent( + "OX_PUBLIC_KEYS_LIST", + PUBLIC_KEYS_LIST_NODE, + lambda items_event, profile: defer.ensureDeferred( + self.__on_public_keys_list_update(items_event, profile) + ) + ) + + async def profileConnected( # pylint: disable=invalid-name + self, + client: SatXMPPClient + ) -> None: + """ + @param client: The client. + """ + + profile = cast(str, client.profile) + + if not profile in self.__storage: + self.__storage[profile] = \ + persistent.LazyPersistentBinaryDict("XEP-0373", client.profile) + + if len(self.list_secret_keys(client)) == 0: + log.debug(f"Generating first GPG key for {client.jid.userhost()}.") + await self.create_key(client) + + async def __on_public_keys_list_update( + self, + items_event: pubsub.ItemsEvent, + profile: str + ) -> None: + """Handle public keys list updates fired by PEP. + + @param items_event: The event. + @param profile: The profile this event belongs to. + """ + + client = self.__sat.getClient(profile) + + sender = cast(jid.JID, items_event.sender) + items = cast(List[domish.Element], items_event.items) + + if len(items) > 1: + log.warning("Ignoring public keys list update with more than one element.") + return + + item_elt = next(iter(items), None) + if item_elt is None: + log.debug("Ignoring empty public keys list update.") + return + + public_keys_list_elt = cast( + Optional[domish.Element], + next(item_elt.elements(NS_OX, "public-keys-list"), None) + ) + + pubkey_metadata_elts: Optional[List[domish.Element]] = None + + if public_keys_list_elt is not None: + try: + PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) + except xmlschema.XMLSchemaValidationError: + pass + else: + pubkey_metadata_elts = \ + list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata")) + + if pubkey_metadata_elts is None: + log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") + return + + new_public_keys_metadata = { PublicKeyMetadata( + fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), + timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])) + ) for pubkey_metadata_elt in pubkey_metadata_elts } + + storage_key = f"/public-keys-metadata/{sender.userhost()}" + + local_public_keys_metadata = cast( + Set[PublicKeyMetadata], + await self.__storage[profile].get(storage_key, set()) + ) + + unchanged_keys = new_public_keys_metadata & local_public_keys_metadata + changed_or_new_keys = new_public_keys_metadata - unchanged_keys + available_keys = self.list_public_keys(client, sender) + + for key_metadata in changed_or_new_keys: + # Check whether the changed or new key has been imported before + if any(key.fingerprint == key_metadata.fingerprint for key in available_keys): + try: + # If it has been imported before, try to update it + await self.import_public_key(client, sender, key_metadata.fingerprint) + except Exception as e: + log.warning(f"Public key import failed: {e}") + + # If the update fails, remove the key from the local metadata list + # such that the update is attempted again next time + new_public_keys_metadata.remove(key_metadata) + + # Check whether this update was for our account and make sure all of our keys are + # included in the update + if sender.userhost() == client.jid.userhost(): + secret_keys = self.list_secret_keys(client) + missing_keys = set(filter(lambda secret_key: all( + key_metadata.fingerprint != secret_key.public_key.fingerprint + for key_metadata + in new_public_keys_metadata + ), secret_keys)) + + if len(missing_keys) > 0: + log.warning( + "Public keys list update did not contain at least one of our keys." + f" {new_public_keys_metadata}" + ) + + for missing_key in missing_keys: + log.warning(missing_key.public_key.fingerprint) + new_public_keys_metadata.add(PublicKeyMetadata( + fingerprint=missing_key.public_key.fingerprint, + timestamp=datetime.now(timezone.utc) + )) + + await self.publish_public_keys_list(client, new_public_keys_metadata) + + await self.__storage[profile].force(storage_key, new_public_keys_metadata) + + def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: + """List GPG public keys available for a JID. + + @param client: The client to perform this operation with. + @param jid: The JID. Can be a bare JID. + @return: The set of public keys available for this JID. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}") + + def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]: + """List GPG secret keys available for a JID. + + @param client: The client to perform this operation with. + @return: The set of secret keys available for this JID. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}") + + async def create_key(self, client: SatXMPPClient) -> GPGSecretKey: + """Create a new GPG key, capable of signing and encryption. + + The key is generated without password protection and without expiration. + + @param client: The client to perform this operation with. + @return: The new key. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}") + + await self.publish_public_key(client, secret_key.public_key) + + storage_key = f"/public-keys-metadata/{client.jid.userhost()}" + + public_keys_list = cast( + Set[PublicKeyMetadata], + await self.__storage[client.profile].get(storage_key, set()) + ) + + public_keys_list.add(PublicKeyMetadata( + fingerprint=secret_key.public_key.fingerprint, + timestamp=datetime.now(timezone.utc) + )) + + await self.publish_public_keys_list(client, public_keys_list) + + await self.__storage[client.profile].force(storage_key, public_keys_list) + + return secret_key + + @staticmethod + def __build_content_element( + element_name: Literal["signcrypt", "sign", "crypt"], + recipient_jids: Iterable[jid.JID], + include_rpad: bool + ) -> Tuple[domish.Element, domish.Element]: + """Build a content element. + + @param element_name: The name of the content element. + @param recipient_jids: The intended recipients of this content element. Can be + bare JIDs. + @param include_rpad: Whether to include random-length random-content padding. + @return: The content element and the ``<payload/>`` element to add the stanza + extension elements to. + """ + + content_elt = domish.Element((NS_OX, element_name)) + + for recipient_jid in recipient_jids: + content_elt.addElement("to")["jid"] = recipient_jid.userhost() + + content_elt.addElement("time")["stamp"] = format_datetime() + + if include_rpad: + # XEP-0373 doesn't specify bounds for the length of the random padding. This + # uses the bounds specified in XEP-0420 for the closely related rpad affix. + rpad_length = secrets.randbelow(201) + rpad_content = "".join( + secrets.choice(string.digits + string.ascii_letters + string.punctuation) + for __ + in range(rpad_length) + ) + content_elt.addElement("rpad", content=rpad_content) + + payload_elt = content_elt.addElement("payload") + + return content_elt, payload_elt + + @staticmethod + def build_signcrypt_element( + recipient_jids: Iterable[jid.JID] + ) -> Tuple[domish.Element, domish.Element]: + """Build a ``<signcrypt/>`` content element. + + @param recipient_jids: The intended recipients of this content element. Can be + bare JIDs. + @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the + stanza extension elements to. + """ + + if len(recipient_jids) == 0: + raise ValueError("Recipient JIDs must be provided.") + + return XEP_0373.__build_content_element("signcrypt", recipient_jids, True) + + @staticmethod + def build_sign_element( + recipient_jids: Iterable[jid.JID], + include_rpad: bool + ) -> Tuple[domish.Element, domish.Element]: + """Build a ``<sign/>`` content element. + + @param recipient_jids: The intended recipients of this content element. Can be + bare JIDs. + @param include_rpad: Whether to include random-length random-content padding, + which is OPTIONAL for the ``<sign/>`` content element. + @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza + extension elements to. + """ + + if len(recipient_jids) == 0: + raise ValueError("Recipient JIDs must be provided.") + + return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad) + + @staticmethod + def build_crypt_element( + recipient_jids: Iterable[jid.JID] + ) -> Tuple[domish.Element, domish.Element]: + """Build a ``<crypt/>`` content element. + + @param recipient_jids: The intended recipients of this content element. Specifying + the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can + be bare JIDs. + @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza + extension elements to. + """ + + return XEP_0373.__build_content_element("crypt", recipient_jids, True) + + async def build_openpgp_element( + self, + client: SatXMPPClient, + content_elt: domish.Element, + recipient_jids: Set[jid.JID] + ) -> domish.Element: + """Build an ``<openpgp/>`` element. + + @param client: The client to perform this operation with. + @param content_elt: The content element to contain in the ``<openpgp/>`` element. + @param recipient_jids: The recipient's JIDs. Can be bare JIDs. + @return: The ``<openpgp/>`` element. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + # TODO: I'm not sure whether we want to sign with all keys by default or choose + # just one key/a subset of keys to sign with. + signing_keys = set(filter( + lambda secret_key: gpg_provider.can_sign(secret_key.public_key), + self.list_secret_keys(client) + )) + + encryption_keys: Set[GPGPublicKey] = set() + + for recipient_jid in recipient_jids: + # Import all keys of the recipient + all_public_keys = await self.import_all_public_keys(client, recipient_jid) + + # Filter for keys that can encrypt + encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys)) + + # TODO: Handle trust + + content = content_elt.toXml().encode("utf-8") + data: bytes + + if content_elt.name == "signcrypt": + data = gpg_provider.encrypt(content, encryption_keys, signing_keys) + elif content_elt.name == "sign": + data = gpg_provider.sign(content, signing_keys) + elif content_elt.name == "crypt": + data = gpg_provider.encrypt(content, encryption_keys) + else: + raise ValueError(f"Unknown content element <{content_elt.name}/>") + + openpgp_elt = domish.Element((NS_OX, "openpgp")) + openpgp_elt.addContent(base64.b64encode(data).decode("ASCII")) + return openpgp_elt + + async def unpack_openpgp_element( + self, + client: SatXMPPClient, + openpgp_elt: domish.Element, + element_name: Literal["signcrypt", "sign", "crypt"], + sender_jid: jid.JID + ) -> Tuple[domish.Element, datetime]: + """Verify, decrypt and unpack an ``<openpgp/>`` element. + + @param client: The client to perform this operation with. + @param openpgp_elt: The ``<openpgp/>`` element. + @param element_name: The name of the content element. + @param sender_jid: The sender's JID. Can be a bare JID. + @return: The ``<payload/>`` element containing the decrypted/verified stanza + extension elements carried by this ``<openpgp/>`` element, and the timestamp + contained in the content element. + @raise exceptions.ParsingError: on syntactical verification errors. + @raise VerificationError: on semantical verification errors accoding to XEP-0373. + @raise DecryptionFailed: on decryption failure. + @raise VerificationFailed: if the data could not be verified. + + @warning: The timestamp is not verified for plausibility; this SHOULD be done by + the calling code. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + decryption_keys = set(filter( + lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), + self.list_secret_keys(client) + )) + + # Import all keys of the sender + all_public_keys = await self.import_all_public_keys(client, sender_jid) + + # Filter for keys that can sign + verification_keys = set(filter(gpg_provider.can_sign, all_public_keys)) + + # TODO: Handle trust + + try: + OPENPGP_SCHEMA.validate(openpgp_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + "<openpgp/> element doesn't pass schema validation." + ) from e + + openpgp_message = base64.b64decode(str(openpgp_elt)) + content: bytes + + if element_name == "signcrypt": + content = gpg_provider.decrypt( + openpgp_message, + decryption_keys, + public_keys=verification_keys + ) + elif element_name == "sign": + content = gpg_provider.verify(openpgp_message, verification_keys) + elif element_name == "crypt": + content = gpg_provider.decrypt(openpgp_message, decryption_keys) + else: + assert_never(element_name) + + try: + content_elt = cast( + domish.Element, + xml_tools.ElementParser()(content.decode("utf-8")) + ) + except UnicodeDecodeError as e: + raise exceptions.ParsingError("UTF-8 decoding error") from e + + try: + CONTENT_SCHEMA.validate(content_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + f"<{element_name}/> element doesn't pass schema validation." + ) from e + + if content_elt.name != element_name: + raise exceptions.ParsingError(f"Not a <{element_name}/> element.") + + recipient_jids = \ + { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") } + + if ( + client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids } + and element_name != "crypt" + ): + raise VerificationError( + f"Recipient list in <{element_name}/> element does not list our (bare)" + f" JID." + ) + + time_elt = next(content_elt.elements(NS_OX, "time")) + + timestamp = parse_datetime(time_elt["stamp"]) + + payload_elt = next(content_elt.elements(NS_OX, "payload")) + + return payload_elt, timestamp + + async def publish_public_key( + self, + client: SatXMPPClient, + public_key: GPGPublicKey + ) -> None: + """Publish a public key. + + @param client: The client. + @param public_key: The public key to publish. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + packet = gpg_provider.export_public_key(public_key) + + node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}" + + pubkey_elt = domish.Element((NS_OX, "pubkey")) + + pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII")) + + try: + await self.__xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + pubkey_elt, + format_datetime(), + extra={ + XEP_0060.EXTRA_PUBLISH_OPTIONS: { + XEP_0060.OPT_PERSIST_ITEMS: "true", + XEP_0060.OPT_ACCESS_MODEL: "open", + XEP_0060.OPT_MAX_ITEMS: 1 + }, + # TODO: Do we really want publish_without_options here? + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" + } + ) + except Exception as e: + raise XMPPInteractionFailed("Publishing the public key failed.") from e + + async def import_all_public_keys( + self, + client: SatXMPPClient, + jid: jid.JID + ) -> Set[GPGPublicKey]: + """Import all public keys of a JID that have not been imported before. + + @param client: The client. + @param jid: The JID. Can be a bare JID. + @return: The public keys. + @note: Failure to import a key simply results in the key not being included in the + result. + """ + + available_public_keys = self.list_public_keys(client, jid) + + storage_key = f"/public-keys-metadata/{jid.userhost()}" + + public_keys_metadata = cast( + Set[PublicKeyMetadata], + await self.__storage[client.profile].get(storage_key, set()) + ) + + missing_keys = set(filter(lambda public_key_metadata: all( + public_key_metadata.fingerprint != public_key.fingerprint + for public_key + in available_public_keys + ), public_keys_metadata)) + + for missing_key in missing_keys: + try: + available_public_keys.add( + await self.import_public_key(client, jid, missing_key.fingerprint) + ) + except Exception as e: + log.warning( + f"Import of public key {missing_key.fingerprint} owned by" + f" {jid.userhost()} failed, ignoring: {e}" + ) + + return available_public_keys + + async def import_public_key( + self, + client: SatXMPPClient, + jid: jid.JID, + fingerprint: str + ) -> GPGPublicKey: + """Import a public key. + + @param client: The client. + @param jid: The JID owning the public key. Can be a bare JID. + @param fingerprint: The fingerprint of the public key. + @return: The public key. + @raise exceptions.NotFound: if the public key was not found. + @raise exceptions.ParsingError: on XML-level parsing errors. + @raise InvalidPacket: if the packet is either syntactically or semantically deemed + invalid. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" + + try: + items, __ = await self.__xep_0060.getItems( + client, + jid.userhostJID(), + node, + max_items=1 + ) + except exceptions.NotFound as e: + raise exceptions.NotFound( + f"No public key with fingerprint {fingerprint} published by JID" + f" {jid.userhost()}." + ) from e + except Exception as e: + raise XMPPInteractionFailed("Fetching the public keys list failed.") from e + + try: + item_elt = cast(domish.Element, items[0]) + except IndexError as e: + raise exceptions.NotFound( + f"No public key with fingerprint {fingerprint} published by JID" + f" {jid.userhost()}." + ) from e + + pubkey_elt = cast( + Optional[domish.Element], + next(item_elt.elements(NS_OX, "pubkey"), None) + ) + + if pubkey_elt is None: + raise exceptions.ParsingError( + f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey" + f" element." + ) + + try: + PUBKEY_SCHEMA.validate(pubkey_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey" + f" schema validation." + ) from e + + public_key = gpg_provider.import_public_key(base64.b64decode(str( + next(pubkey_elt.elements(NS_OX, "data")) + ))) + + return public_key + + async def publish_public_keys_list( + self, + client: SatXMPPClient, + public_keys_list: Iterable[PublicKeyMetadata] + ) -> None: + """Publish/update the own public keys list. + + @param client: The client. + @param public_keys_list: The public keys list. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + + @warning: All public keys referenced in the public keys list MUST be published + beforehand. + """ + + if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list): + raise ValueError("Public keys list contains duplicate fingerprints.") + + node = "urn:xmpp:openpgp:0:public-keys" + + public_keys_list_elt = domish.Element((NS_OX, "public-keys-list")) + + for public_key_metadata in public_keys_list: + pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata") + pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint + pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp) + + try: + await self.__xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + public_keys_list_elt, + item_id=XEP_0060.ID_SINGLETON, + extra={ + XEP_0060.EXTRA_PUBLISH_OPTIONS: { + XEP_0060.OPT_PERSIST_ITEMS: "true", + XEP_0060.OPT_ACCESS_MODEL: "open", + XEP_0060.OPT_MAX_ITEMS: 1 + }, + # TODO: Do we really want publish_without_options here? + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" + } + ) + except Exception as e: + raise XMPPInteractionFailed("Publishing the public keys list failed.") from e + + async def download_public_keys_list( + self, + client: SatXMPPClient, + jid: jid.JID + ) -> Optional[Set[PublicKeyMetadata]]: + """Download the public keys list of a JID. + + @param client: The client. + @param jid: The JID. Can be a bare JID. + @return: The public keys list or ``None`` if the JID hasn't published a public + keys list. An empty list means the JID has published an empty list. + @raise exceptions.ParsingError: on XML-level parsing errors. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + node = "urn:xmpp:openpgp:0:public-keys" + + try: + items, __ = await self.__xep_0060.getItems( + client, + jid.userhostJID(), + node, + max_items=1 + ) + except exceptions.NotFound: + return None + except Exception as e: + raise XMPPInteractionFailed() from e + + try: + item_elt = cast(domish.Element, items[0]) + except IndexError: + return None + + public_keys_list_elt = cast( + Optional[domish.Element], + next(item_elt.elements(NS_OX, "public-keys-list"), None) + ) + + if public_keys_list_elt is None: + return None + + try: + PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys" + f" list schema validation." + ) from e + + return { + PublicKeyMetadata( + fingerprint=pubkey_metadata_elt["v4-fingerprint"], + timestamp=parse_datetime(pubkey_metadata_elt["date"]) + ) + for pubkey_metadata_elt + in public_keys_list_elt.elements(NS_OX, "pubkey-metadata") + } + + async def __prepare_secret_key_synchronization( + self, + client: SatXMPPClient + ) -> Optional[domish.Element]: + """Prepare for secret key synchronization. + + Makes sure the relative protocols and protocol extensions are supported by the + server and makes sure that the PEP node for secret synchronization exists and is + configured correctly. The node is created if necessary. + + @param client: The client. + @return: As part of the preparations, the secret key synchronization PEP node is + fetched. The result of that fetch is returned here. + @raise exceptions.FeatureNotFound: if the server lacks support for the required + protocols or protocol extensions. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + try: + infos = cast(DiscoInfo, await self.__sat.memory.disco.getInfos( + client, + client.jid.userhostJID() + )) + except Exception as e: + raise XMPPInteractionFailed( + "Error performing service discovery on the own bare JID." + ) from e + + identities = cast(Dict[Tuple[str, str], str], infos.identities) + features = cast(Set[DiscoFeature], infos.features) + + if ("pubsub", "pep") not in identities: + raise exceptions.FeatureNotFound("Server doesn't support PEP.") + + if "http://jabber.org/protocol/pubsub#access-whitelist" not in features: + raise exceptions.FeatureNotFound( + "Server doesn't support the whitelist access model." + ) + + persistent_items_supported = \ + "http://jabber.org/protocol/pubsub#persistent-items" in features + + # TODO: persistent-items is a SHOULD, how do we handle the feature missing? + + node = "urn:xmpp:openpgp:0:secret-key" + + try: + items, __ = await self.__xep_0060.getItems( + client, + client.jid.userhostJID(), + node, + max_items=1 + ) + except exceptions.NotFound: + try: + await self.__xep_0060.createNode( + client, + client.jid.userhostJID(), + node, + { + XEP_0060.OPT_PERSIST_ITEMS: "true", + XEP_0060.OPT_ACCESS_MODEL: "whitelist", + XEP_0060.OPT_MAX_ITEMS: "1" + } + ) + except Exception as e: + raise XMPPInteractionFailed( + "Error creating the secret key synchronization node." + ) from e + except Exception as e: + raise XMPPInteractionFailed( + "Error fetching the secret key synchronization node." + ) from e + + try: + return cast(domish.Element, items[0]) + except IndexError: + return None + + async def export_secret_keys( + self, + client: SatXMPPClient, + secret_keys: Iterable[GPGSecretKey] + ) -> str: + """Export secret keys to synchronize them with other devices. + + @param client: The client. + @param secret_keys: The secret keys to export. + @return: The backup code needed to decrypt the exported secret keys. + @raise exceptions.FeatureNotFound: if the server lacks support for the required + protocols or protocol extensions. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + await self.__prepare_secret_key_synchronization(client) + + backup_code = generate_passphrase() + + plaintext = b"".join( + gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys + ) + + ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code) + + node = "urn:xmpp:openpgp:0:secret-key" + + secretkey_elt = domish.Element((NS_OX, "secretkey")) + secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) + + try: + await self.__xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + secretkey_elt + ) + except Exception as e: + raise XMPPInteractionFailed("Publishing the secret keys failed.") from e + + return backup_code + + async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]: + """Download previously exported secret keys to import them in a second step. + + The downloading and importing steps are separate since a backup code is required + for the import and it should be possible to try multiple backup codes without + redownloading the data every time. The second half of the import procedure is + provided by :meth:`import_secret_keys`. + + @param client: The client. + @return: The encrypted secret keys previously exported, if any. + @raise exceptions.FeatureNotFound: if the server lacks support for the required + protocols or protocol extensions. + @raise exceptions.ParsingError: on XML-level parsing errors. + @raise XMPPInteractionFailed: if any interaction via XMPP failed. + """ + + item_elt = await self.__prepare_secret_key_synchronization(client) + if item_elt is None: + return None + + secretkey_elt = cast( + Optional[domish.Element], + next(item_elt.elements(NS_OX, "secretkey"), None) + ) + + if secretkey_elt is None: + return None + + try: + SECRETKEY_SCHEMA.validate(secretkey_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + "Publish-Subscribe item doesn't pass secretkey schema validation." + ) from e + + return base64.b64decode(str(secretkey_elt)) + + def import_secret_keys( + self, + client: SatXMPPClient, + ciphertext: bytes, + backup_code: str + ) -> Set[GPGSecretKey]: + """Import previously downloaded secret keys. + + The downloading and importing steps are separate since a backup code is required + for the import and it should be possible to try multiple backup codes without + redownloading the data every time. The first half of the import procedure is + provided by :meth:`download_secret_keys`. + + @param client: The client to perform this operation with. + @param ciphertext: The ciphertext, i.e. the data returned by + :meth:`download_secret_keys`. + @param backup_code: The backup code needed to decrypt the data. + @raise InvalidPacket: if one of the GPG packets building the secret key data is + either syntactically or semantically deemed invalid. + @raise DecryptionFailed: on decryption failure. + """ + + gpg_provider = get_gpg_provider(self.__sat, client) + + return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( + ciphertext, + backup_code + )) + + @staticmethod + def __get_joined_muc_users( + client: SatXMPPClient, + xep_0045: XEP_0045, + room_jid: jid.JID + ) -> Set[jid.JID]: + """ + @param client: The client. + @param xep_0045: A MUC plugin instance. + @param room_jid: The room JID. + @return: A set containing the bare JIDs of the MUC participants. + @raise InternalError: if the MUC is not joined or the entity information of a + participant isn't available. + """ + # TODO: This should probably be a global helper somewhere + + bare_jids: Set[jid.JID] = set() + + try: + room = cast(muc.Room, xep_0045.getRoom(client, room_jid)) + except exceptions.NotFound as e: + raise exceptions.InternalError( + "Participant list of unjoined MUC requested." + ) from e + + for user in cast(Dict[str, muc.User], room.roster).values(): + entity = cast(Optional[SatXMPPEntity], user.entity) + if entity is None: + raise exceptions.InternalError( + f"Participant list of MUC requested, but the entity information of" + f" the participant {user} is not available." + ) + + bare_jids.add(entity.jid.userhostJID()) + + return bare_jids + + async def get_trust( + self, + client: SatXMPPClient, + public_key: GPGPublicKey, + owner: jid.JID + ) -> TrustLevel: + """Query the trust level of a public key. + + @param client: The client to perform this operation under. + @param public_key: The public key. + @param owner: The owner of the public key. Can be a bare JID. + @return: The trust level. + """ + + key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" + + try: + return TrustLevel(await self.__storage[client.profile][key]) + except KeyError: + return TrustLevel.UNDECIDED + + async def set_trust( + self, + client: SatXMPPClient, + public_key: GPGPublicKey, + owner: jid.JID, + trust_level: TrustLevel + ) -> None: + """Set the trust level of a public key. + + @param client: The client to perform this operation under. + @param public_key: The public key. + @param owner: The owner of the public key. Can be a bare JID. + @param trust_leve: The trust level. + """ + + key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" + + await self.__storage[client.profile].force(key, trust_level.name) + + async def getTrustUI( # pylint: disable=invalid-name + self, + client: SatXMPPClient, + entity: jid.JID + ) -> xml_tools.XMLUI: + """ + @param client: The client. + @param entity: The entity whose device trust levels to manage. + @return: An XMLUI instance which opens a form to manage the trust level of all + devices belonging to the entity. + """ + + if entity.resource: + raise ValueError("A bare JID is expected.") + + bare_jids: Set[jid.JID] + if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity): + bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) + else: + bare_jids = { entity.userhostJID() } + + all_public_keys = list({ + bare_jid: list(self.list_public_keys(client, bare_jid)) + for bare_jid + in bare_jids + }.items()) + + async def callback( + data: Any, + profile: str # pylint: disable=unused-argument + ) -> Dict[Never, Never]: + """ + @param data: The XMLUI result produces by the trust UI form. + @param profile: The profile. + @return: An empty dictionary. The type of the return value was chosen + conservatively since the exact options are neither known not needed here. + """ + + if C.bool(data.get("cancelled", "false")): + return {} + + data_form_result = cast( + Dict[str, str], + xml_tools.XMLUIResult2DataFormResult(data) + ) + for key, value in data_form_result.items(): + if not key.startswith("trust_"): + continue + + outer_index, inner_index = key.split("_")[1:] + + owner, public_keys = all_public_keys[int(outer_index)] + public_key = public_keys[int(inner_index)] + trust = TrustLevel(value) + + if (await self.get_trust(client, public_key, owner)) is not trust: + await self.set_trust(client, public_key, owner, value) + + return {} + + submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True) + + result = xml_tools.XMLUI( + panel_type=C.XMLUI_FORM, + title=D_("OX trust management"), + submit_id=submit_id + ) + # Casting this to Any, otherwise all calls on the variable cause type errors + # pylint: disable=no-member + trust_ui = cast(Any, result) + trust_ui.addText(D_( + "This is OX trusting system. You'll see below the GPG keys of your " + "contacts, and a list selection to trust them or not. A trusted key " + "can read your messages in plain text, so be sure to only validate " + "keys that you are sure are belonging to your contact. It's better " + "to do this when you are next to your contact, so " + "you can check the \"fingerprint\" of the key " + "yourself. Do *not* validate a key if the fingerprint is wrong!" + )) + + own_secret_keys = self.list_secret_keys(client) + + trust_ui.changeContainer("label") + for index, secret_key in enumerate(own_secret_keys): + trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) + trust_ui.addText(secret_key.public_key.fingerprint) + trust_ui.addEmpty() + trust_ui.addEmpty() + + for outer_index, [ owner, public_keys ] in enumerate(all_public_keys): + for inner_index, public_key in enumerate(public_keys): + trust_ui.addLabel(D_("Contact")) + trust_ui.addJid(jid.JID(owner)) + trust_ui.addLabel(D_("Fingerprint")) + trust_ui.addText(public_key.fingerprint) + trust_ui.addLabel(D_("Trust this device?")) + + current_trust_level = await self.get_trust(client, public_key, owner) + avaiable_trust_levels = \ + { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } + + trust_ui.addList( + f"trust_{outer_index}_{inner_index}", + options=[ trust_level.name for trust_level in avaiable_trust_levels ], + selected=current_trust_level.name, + styles=[ "inline" ] + ) + + trust_ui.addEmpty() + trust_ui.addEmpty() + + return result