Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0373.py @ 4336:6e0918e638ee
plugin XEP-0498: "Pubsub File Sharing" implementation:
Partial implementation of XEP-0498, necessary to implement the service part in email
gateway.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:23 +0100 |
parents | 111dce64dcb5 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for OpenPGP for XMPP
# Copyright (C) 2022-2024 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
import base64
from datetime import datetime, timezone
import enum
import json
import secrets
import string
from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast
from xml.sax.saxutils import quoteattr

from typing import Final, NamedTuple, Never, assert_never
from wokkel import muc, pubsub
from wokkel.disco import DiscoFeature, DiscoInfo
import xmlschema

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _, D_
from libervia.backend.core.log import getLogger, Logger
from libervia.backend.core.main import LiberviaBackend
from libervia.backend.core.xmpp import SatXMPPClient
from libervia.backend.memory import persistent
from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
from libervia.backend.plugins.plugin_xep_0163 import XEP_0163
from libervia.backend.tools.xmpp_datetime import format_datetime, parse_datetime
from libervia.backend.tools import xml_tools
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish

try:
    import gpg
except ImportError as import_error:
    raise exceptions.MissingModule(
        "You are missing the 'gpg' package required by the OX plugin. The recommended"
        " installation method is via your operating system's package manager, since the"
        " version of the library has to match the version of your GnuPG installation. See"
        " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme"
    ) from import_error


# NOTE(review): "gpg_provider" is exported below, but no module-level name
# `gpg_provider` is defined in the portion of the file visible here (only a function
# called `get_gpg_provider`). Confirm it exists further down, otherwise
# `from ... import *` will fail on this entry.
__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "NS_OX",
    "XEP_0373",
    "VerificationError",
    "XMPPInteractionFailed",
    "InvalidPacket",
    "DecryptionFailed",
    "VerificationFailed",
    "UnknownKey",
    "GPGProviderError",
    "GPGPublicKey",
    "GPGSecretKey",
    "GPGProvider",
    "PublicKeyMetadata",
    "gpg_provider",
    "TrustLevel",
]


log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]


PLUGIN_INFO = {
    C.PI_NAME: "XEP-0373",
    C.PI_IMPORT_NAME: "XEP-0373",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: ["XEP-0373"],
    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0163"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0373",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
}


NS_OX: Final = "urn:xmpp:openpgp:0"


PARAM_CATEGORY = "Security"
PARAM_NAME = "ox_policy"
# Storage key template; formatted with the bare JID the metadata belongs to.
STR_KEY_PUBLIC_KEYS_METADATA = "/public-keys-metadata/{}"


class VerificationError(Exception):
    """
    Raised by verifying methods of :class:`XEP_0373` on semantical verification errors.
    """


class XMPPInteractionFailed(Exception):
    """
    Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this
    exception exists is that the exceptions raised by XMPP interactions are not properly
    documented for the most part, thus all exceptions are caught and wrapped in instances
    of this class.
    """


class InvalidPacket(ValueError):
    """
    Raised by methods of :class:`GPGProvider` when an invalid packet is encountered.
    """


class DecryptionFailed(Exception):
    """
    Raised by methods of :class:`GPGProvider` on decryption failures.
    """


class VerificationFailed(Exception):
    """
    Raised by methods of :class:`GPGProvider` on verification failures.
    """


class UnknownKey(ValueError):
    """
    Raised by methods of :class:`GPGProvider` when an unknown key is referenced.
    """


class GPGProviderError(Exception):
    """
    Raised by methods of :class:`GPGProvider` on internal errors.
    """


class GPGPublicKey(ABC):
    """
    Interface describing a GPG public key.
    """

    @property
    @abstractmethod
    def fingerprint(self) -> str:
        """
        @return: The OpenPGP v4 fingerprint string of this public key.
        """


class GPGSecretKey(ABC):
    """
    Interface describing a GPG secret key.
    """

    @property
    @abstractmethod
    def public_key(self) -> GPGPublicKey:
        """
        @return: The public key corresponding to this secret key.
        """


class GPGProvider(ABC):
    """
    Interface describing a GPG provider, i.e. a library or framework providing GPG
    encryption, signing and key management.

    All methods may raise :class:`GPGProviderError` in addition to those exception types
    listed explicitly.

    # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?)
    """

    @abstractmethod
    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
        """Export a public key in a key material packet according to RFC 4880 §5.5.

        Do not use OpenPGP's ASCII Armor.

        @param public_key: The public key to export.
        @return: The packet containing the exported public key.
        @raise UnknownKey: if the public key is not available.
        """

    @abstractmethod
    def import_public_key(self, packet: bytes) -> GPGPublicKey:
        """import a public key from a key material packet according to RFC 4880 §5.5.

        OpenPGP's ASCII Armor is not used.

        @param packet: A packet containing an exported public key.
        @return: The public key imported from the packet.
        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
            invalid.

        @warning: Only packets of version 4 or higher may be accepted, packets below
            version 4 MUST be rejected.
        """

    @abstractmethod
    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
        """Export a secret key for transfer according to RFC 4880 §11.1.

        Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage
        conventions to zero in the corresponding secret-key packet according to RFC 4880
        §5.5.3. Do not use OpenPGP's ASCII Armor.

        @param secret_key: The secret key to export.
        @return: The binary blob containing the exported secret key.
        @raise UnknownKey: if the secret key is not available.
        """

    @abstractmethod
    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
        """Restore secret keys exported for transfer according to RFC 4880 §11.1.

        The secret data is not encrypted, i.e. the octet indicating string-to-key usage
        conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3
        are set to zero. OpenPGP's ASCII Armor is not used.

        @param data: Concatenation of one or more secret keys exported for transfer.
        @return: The secret keys imported from the data.
        @raise InvalidPacket: if the data or one of the packets included in the data is
            either syntactically or semantically deemed invalid.

        @warning: Only packets of version 4 or higher may be accepted, packets below
            version 4 MUST be rejected.
        """

    @abstractmethod
    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
        """Encrypt data symmetrically according to RFC 4880 §5.3.

        The password is used to build a Symmetric-Key Encrypted Session Key packet which
        precedes the Symmetrically Encrypted Data packet that holds the encrypted data.

        @param plaintext: The data to encrypt.
        @param password: The password to encrypt the data with.
        @return: The encrypted data.
        """

    @abstractmethod
    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
        """Decrypt data symmetrically according to RFC 4880 §5.3.

        The ciphertext consists of a Symmetrically Encrypted Data packet that holds the
        encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the
        password.

        @param ciphertext: The ciphertext.
        @param password: The password to decrypt the data with.
        @return: The plaintext.
        @raise DecryptionFailed: on decryption failure.
        """

    @abstractmethod
    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Sign some data.

        OpenPGP's ASCII Armor is not used.

        @param data: The data to sign.
        @param secret_keys: The secret keys to sign the data with.
        @return: The OpenPGP message carrying the signed data.
        """

    @abstractmethod
    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Sign some data. Create the signature detached from the data.

        OpenPGP's ASCII Armor is not used.

        @param data: The data to sign.
        @param secret_keys: The secret keys to sign the data with.
        @return: The OpenPGP message carrying the detached signature.
        """

    @abstractmethod
    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
        """Verify signed data.

        OpenPGP's ASCII Armor is not used.

        @param signed_data: The signed data as an OpenPGP message.
        @param public_keys: The public keys to verify the signature with.
        @return: The verified and unpacked data.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: it has to be confirmed that a valid signature by one
            of the public keys is available.
        """

    @abstractmethod
    def verify_detached(
        self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey]
    ) -> None:
        """Verify signed data, where the signature was created detached from the data.

        OpenPGP's ASCII Armor is not used.

        @param data: The data.
        @param signature: The signature as an OpenPGP message.
        @param public_keys: The public keys to verify the signature with.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: it has to be confirmed that a valid signature by one
            of the public keys is available.
        """

    @abstractmethod
    def encrypt(
        self,
        plaintext: bytes,
        public_keys: Set[GPGPublicKey],
        signing_keys: Optional[Set[GPGSecretKey]] = None,
    ) -> bytes:
        """Encrypt and optionally sign some data.

        OpenPGP's ASCII Armor is not used.

        @param plaintext: The data to encrypt and optionally sign.
        @param public_keys: The public keys to encrypt the data for.
        @param signing_keys: The secret keys to sign the data with.
        @return: The OpenPGP message carrying the encrypted and optionally signed data.
        """

    @abstractmethod
    def decrypt(
        self,
        ciphertext: bytes,
        secret_keys: Set[GPGSecretKey],
        public_keys: Optional[Set[GPGPublicKey]] = None,
    ) -> bytes:
        """Decrypt and optionally verify some data.

        OpenPGP's ASCII Armor is not used.

        @param ciphertext: The encrypted and optionally signed data as an OpenPGP message.
        @param secret_keys: The secret keys to attempt decryption with.
        @param public_keys: The public keys to verify the optional signature with.
        @return: The decrypted, optionally verified and unpacked data.
        @raise DecryptionFailed: on decryption failure.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: it has to be confirmed that the data was decrypted
            using one of the secret keys and that a valid signature by one of the public
            keys is available in case the data is signed.
        """

    @abstractmethod
    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
        """List public keys.

        @param user_id: The user id.
        @return: The set of public keys available for this user id.
        """

    @abstractmethod
    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
        """List secret keys.

        @param user_id: The user id.
        @return: The set of secret keys available for this user id.
        """

    @abstractmethod
    def can_sign(self, public_key: GPGPublicKey) -> bool:
        """
        @return: Whether the public key belongs to a key pair capable of signing.
        """

    @abstractmethod
    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
        """
        @return: Whether the public key belongs to a key pair capable of encryption.
        """

    @abstractmethod
    def create_key(self, user_id: str) -> GPGSecretKey:
        """Create a new GPG key, capable of signing and encryption.

        The key is generated without password protection and without expiration. If a key
        with the same user id already exists, a new key is created anyway.

        @param user_id: The user id to assign to the new key.
        @return: The new key.
        """


class GPGME_GPGPublicKey(GPGPublicKey):
    """
    GPG public key implementation based on GnuPG Made Easy (GPGME).
    """

    def __init__(self, key_obj: Any) -> None:
        """
        @param key_obj: The GPGME key object.
        """

        self.__key_obj = key_obj

    @property
    def fingerprint(self) -> str:
        return self.__key_obj.fpr

    @property
    def key_obj(self) -> Any:
        """
        @return: The wrapped GPGME key object.
        """

        return self.__key_obj


class GPGME_GPGSecretKey(GPGSecretKey):
    """
    GPG secret key implementation based on GnuPG Made Easy (GPGME).
    """

    def __init__(self, public_key: GPGME_GPGPublicKey) -> None:
        """
        @param public_key: The public key corresponding to this secret key.
        """

        self.__public_key = public_key

    @property
    def public_key(self) -> GPGME_GPGPublicKey:
        return self.__public_key


def _signed_by_expected_key(
    signatures: Iterable[Any], public_keys: Iterable[GPGPublicKey]
) -> bool:
    """Check whether a signature was made by a signing subkey of an expected key.

    @param signatures: The signature entries of a GPGME verification result. Only their
        ``fpr`` attribute is read.
    @param public_keys: The public keys a signature is expected from. All of them must be
        :class:`GPGME_GPGPublicKey` instances.
    @return: Whether at least one signature fingerprint equals the fingerprint of a
        subkey with ``can_sign`` set on one of the public keys.
    """

    expected_fingerprints: Set[str] = set()
    for public_key in public_keys:
        assert isinstance(public_key, GPGME_GPGPublicKey)

        expected_fingerprints.update(
            subkey.fpr for subkey in public_key.key_obj.subkeys if subkey.can_sign
        )

    return any(sig.fpr in expected_fingerprints for sig in signatures)


class GPGME_GPGProvider(GPGProvider):
    """
    GPG provider implementation based on GnuPG Made Easy (GPGME).

    Every operation opens its own ``gpg.Context`` on the configured home directory and
    wraps GPGME errors in the exception types documented on :class:`GPGProvider`.
    """

    # Algorithm string handed to GPGME when creating the primary key and its subkeys.
    ALGORITHM = "ed25519/cert,sign+cv25519/encr"

    def __init__(self, home_dir: Optional[str] = None) -> None:
        """
        @param home_dir: Optional GPG home directory path to use for all operations.
        """

        self.__home_dir = home_dir

    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
        assert isinstance(public_key, GPGME_GPGPublicKey)

        pattern = public_key.fingerprint

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_export_minimal(pattern)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            if result is None:
                raise UnknownKey(f"Public key {pattern} not found.")

            return result

    def import_public_key(self, packet: bytes) -> GPGPublicKey:
        # TODO
        # - Reject packets older than version 4
        # - Check whether it's actually a public key (through packet inspection?)

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_import(packet)
            except gpg.errors.GPGMEError as e:
                # From looking at the code, `key_import` never raises. The documentation
                # says it does though, so this is included for future-proofness.
                raise GPGProviderError("Internal GPGME error") from e

            if not hasattr(result, "considered"):
                raise InvalidPacket(
                    f"Data not considered for public key import: {result}"
                )

            if len(result.imports) != 1:
                raise InvalidPacket(
                    "Public key packet does not contain exactly one public key (not"
                    " counting subkeys)."
                )

            try:
                key_obj = c.get_key(result.imports[0].fpr, secret=False)
            except gpg.errors.KeyNotFound as e:
                # KeyNotFound derives from GPGMEError, so it has to be handled before the
                # generic GPGMEError clause to be reachable at all.
                raise GPGProviderError("Newly imported public key not found") from e
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return GPGME_GPGPublicKey(key_obj)

    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
        assert isinstance(secret_key, GPGME_GPGSecretKey)
        # TODO
        # - Handle password protection/pinentry
        # - Make sure the key is exported unencrypted

        pattern = secret_key.public_key.fingerprint

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_export_secret(pattern)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            if result is None:
                raise UnknownKey(f"Secret key {pattern} not found.")

            return result

    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
        # TODO
        # - Reject packets older than version 4
        # - Check whether it's actually secret keys (through packet inspection?)

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_import(data)
            except gpg.errors.GPGMEError as e:
                # From looking at the code, `key_import` never raises. The documentation
                # says it does though, so this is included for future-proofness.
                raise GPGProviderError("Internal GPGME error") from e

            if not hasattr(result, "considered"):
                raise InvalidPacket(
                    f"Data not considered for secret key import: {result}"
                )

            if len(result.imports) == 0:
                raise InvalidPacket("Secret key packet does not contain a secret key.")

            secret_keys = set()
            for import_status in result.imports:
                try:
                    key_obj = c.get_key(import_status.fpr, secret=True)
                except gpg.errors.KeyNotFound as e:
                    # Must precede the GPGMEError clause, see import_public_key.
                    raise GPGProviderError("Newly imported secret key not found") from e
                except gpg.errors.GPGMEError as e:
                    raise GPGProviderError("Internal GPGME error") from e

                secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))

            return secret_keys

    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return ciphertext

    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                plaintext, __, __ = c.decrypt(
                    ciphertext, passphrase=password, verify=False
                )
            except gpg.errors.GPGMEError as e:
                # TODO: Find out what kind of error is raised if the password is wrong and
                # re-raise it as DecryptionFailed instead.
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.UnsupportedAlgorithm as e:
                # The exception class lives in gpg.errors.
                raise DecryptionFailed("Unsupported algorithm") from e

            return plaintext

    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        signers = []
        for secret_key in secret_keys:
            assert isinstance(secret_key, GPGME_GPGSecretKey)

            signers.append(secret_key.public_key.key_obj)

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                signed_data, __ = c.sign(data)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the secret keys is invalid for signing"
                ) from e

            return signed_data

    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        signers = []
        for secret_key in secret_keys:
            assert isinstance(secret_key, GPGME_GPGSecretKey)

            signers.append(secret_key.public_key.key_obj)

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the secret keys is invalid for signing"
                ) from e

            return signature

    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                data, result = c.verify(signed_data)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.BadSignatures as e:
                raise VerificationFailed("Bad signatures on signed data") from e

            if not _signed_by_expected_key(result.signatures, public_keys):
                raise VerificationFailed(
                    "Data not signed by one of the expected public keys"
                )

            return data

    def verify_detached(
        self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey]
    ) -> None:
        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                __, result = c.verify(data, signature=signature)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.BadSignatures as e:
                raise VerificationFailed("Bad signatures on signed data") from e

            if not _signed_by_expected_key(result.signatures, public_keys):
                raise VerificationFailed(
                    "Data not signed by one of the expected public keys"
                )

    def encrypt(
        self,
        plaintext: bytes,
        public_keys: Set[GPGPublicKey],
        signing_keys: Optional[Set[GPGSecretKey]] = None,
    ) -> bytes:
        recipients = []
        for public_key in public_keys:
            assert isinstance(public_key, GPGME_GPGPublicKey)

            recipients.append(public_key.key_obj)

        signers = []
        if signing_keys is not None:
            for secret_key in signing_keys:
                assert isinstance(secret_key, GPGME_GPGSecretKey)

                signers.append(secret_key.public_key.key_obj)

        # Signing is requested whenever a set of signing keys is passed, even an empty one.
        sign = signing_keys is not None

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                ciphertext, __, __ = c.encrypt(
                    plaintext,
                    recipients=recipients,
                    sign=sign,
                    always_trust=True,
                    add_encrypt_to=True,
                )
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidRecipients as e:
                raise GPGProviderError(
                    "At least one of the public keys is invalid for encryption"
                ) from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the signing keys is invalid for signing"
                ) from e

            return ciphertext

    def decrypt(
        self,
        ciphertext: bytes,
        secret_keys: Set[GPGSecretKey],
        public_keys: Optional[Set[GPGPublicKey]] = None,
    ) -> bytes:
        verify = public_keys is not None

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                plaintext, result, verify_result = c.decrypt(ciphertext, verify=verify)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.UnsupportedAlgorithm as e:
                # The exception class lives in gpg.errors.
                raise DecryptionFailed("Unsupported algorithm") from e
            except gpg.errors.BadSignatures as e:
                # Map bad signatures to the documented VerificationFailed, like verify().
                raise VerificationFailed("Bad signatures on signed data") from e

            # TODO: Check whether the data was decrypted using one of the expected secret
            # keys

            if public_keys is not None:
                if not _signed_by_expected_key(verify_result.signatures, public_keys):
                    raise VerificationFailed(
                        "Data not signed by one of the expected public keys"
                    )

            return plaintext

    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                return {
                    GPGME_GPGPublicKey(key)
                    for key in c.keylist(pattern=user_id, secret=False)
                }
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                return {
                    GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
                    for key in c.keylist(pattern=user_id, secret=True)
                }
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

    def can_sign(self, public_key: GPGPublicKey) -> bool:
        assert isinstance(public_key, GPGME_GPGPublicKey)

        return any(subkey.can_sign for subkey in public_key.key_obj.subkeys)

    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
        assert isinstance(public_key, GPGME_GPGPublicKey)

        return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys)

    def create_key(self, user_id: str) -> GPGSecretKey:
        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                # A single ECC (primary) key with signing and encryption capabilities is
                # not supported, a subkey is required for encryption.

                # Create a primary key only capable of certifying subkeys
                primary_create_result = c.create_key(
                    user_id,
                    algorithm=self.ALGORITHM,
                    expires=False,
                    certify=True,
                    force=True,
                )

                primary_key_obj = c.get_key(primary_create_result.fpr, secret=True)

                c.create_subkey(
                    primary_key_obj, algorithm=self.ALGORITHM, expires=False, sign=True
                )

                c.create_subkey(
                    primary_key_obj, algorithm=self.ALGORITHM, expires=False, encrypt=True
                )
            except gpg.errors.KeyNotFound as e:
                # KeyNotFound derives from GPGMEError, so it has to be handled first.
                raise GPGProviderError("Newly created key not found") from e
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return GPGME_GPGSecretKey(GPGME_GPGPublicKey(primary_key_obj))


class PublicKeyMetadata(NamedTuple):
    """
    Metadata about a published public key.
    """

    fingerprint: str
    timestamp: datetime

    def to_dict(self) -> dict:
        """Convert this instance to a plain dictionary.

        @return: A dictionary with the fields of this instance, where the timestamp is
            serialised to its ISO 8601 string form.
        """

        data = self._asdict()
        data["timestamp"] = self.timestamp.isoformat()
        return data

    @staticmethod
    def from_dict(data: dict) -> "PublicKeyMetadata":
        """Load an instance from a dictionary as produced by :meth:`to_dict`.

        @param data: The serialised dictionary. It is not modified.
        @return: The deserialised instance.
        """

        # Work on a copy so the caller's dictionary keeps its serialised timestamp.
        fields = dict(data)
        fields["timestamp"] = datetime.fromisoformat(fields["timestamp"])
        return PublicKeyMetadata(**fields)


@enum.unique
class TrustLevel(enum.Enum):
    """
    The trust levels required for BTBV and manual trust.
    """

    TRUSTED: str = "TRUSTED"
    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
    UNDECIDED: str = "UNDECIDED"
    DISTRUSTED: str = "DISTRUSTED"


OPENPGP_SCHEMA = xmlschema.XMLSchema(
    """<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="openpgp" type="xs:base64Binary"/>
</xs:schema>
"""
)


# The following schema needs version 1.1 of XML Schema, which is not supported by lxml.
# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
# implementation of XML Schema, including version 1.1.
CONTENT_SCHEMA = xmlschema.XMLSchema11( """<?xml version="1.1" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> <xs:element name="signcrypt"> <xs:complexType> <xs:all> <xs:element ref="to" maxOccurs="unbounded"/> <xs:element ref="time"/> <xs:element ref="rpad" minOccurs="0"/> <xs:element ref="payload"/> </xs:all> </xs:complexType> </xs:element> <xs:element name="sign"> <xs:complexType> <xs:all> <xs:element ref="to" maxOccurs="unbounded"/> <xs:element ref="time"/> <xs:element ref="rpad" minOccurs="0"/> <xs:element ref="payload"/> </xs:all> </xs:complexType> </xs:element> <xs:element name="crypt"> <xs:complexType> <xs:all> <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/> <xs:element ref="time"/> <xs:element ref="rpad" minOccurs="0"/> <xs:element ref="payload"/> </xs:all> </xs:complexType> </xs:element> <xs:element name="to"> <xs:complexType> <xs:attribute name="jid" type="xs:string"/> </xs:complexType> </xs:element> <xs:element name="time"> <xs:complexType> <xs:attribute name="stamp" type="xs:dateTime"/> </xs:complexType> </xs:element> <xs:element name="rpad" type="xs:string"/> <xs:element name="payload"> <xs:complexType> <xs:sequence> <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> </xs:sequence> </xs:complexType> </xs:element> </xs:schema> """ ) PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema( """<?xml version="1.0" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> <xs:element name="public-keys-list"> <xs:complexType> <xs:sequence> <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/> </xs:sequence> </xs:complexType> </xs:element> <xs:element name="pubkey-metadata"> <xs:complexType> <xs:attribute name="v4-fingerprint" type="xs:string"/> <xs:attribute name="date" type="xs:dateTime"/> 
</xs:complexType> </xs:element> </xs:schema> """ ) PUBKEY_SCHEMA = xmlschema.XMLSchema( """<?xml version="1.0" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> <xs:element name="pubkey"> <xs:complexType> <xs:all> <xs:element ref="data"/> </xs:all> <xs:anyAttribute processContents="skip"/> </xs:complexType> </xs:element> <xs:element name="data" type="xs:base64Binary"/> </xs:schema> """ ) SECRETKEY_SCHEMA = xmlschema.XMLSchema( """<?xml version="1.0" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> <xs:element name="secretkey" type="xs:base64Binary"/> </xs:schema> """ ) DEFAULT_TRUST_MODEL_PARAM = f""" <params> <individual> <category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}> <param name="{PARAM_NAME}" label={quoteattr(D_('OMEMO default trust policy'))} type="list" security="3"> <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} /> <option value="btbv" label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))} selected="true" /> </param> </category> </individual> </params> """ def get_gpg_provider(sat: LiberviaBackend, client: SatXMPPClient) -> GPGProvider: """Get the GPG provider for a client. @param sat: The SAT instance. @param client: The client. @return: The GPG provider specifically for that client. """ return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home"))) def generate_passphrase() -> str: """Generate a secure passphrase for symmetric encryption. @return: The passphrase. """ return "-".join( "".join(secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4)) for __ in range(6) ) # TODO: Handle the user id mess class XEP_0373: """ Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``. 
""" def __init__(self, host: LiberviaBackend) -> None: """ @param sat: The SAT instance. """ self.host = host # Add configuration option to choose between manual trust and BTBV as the trust # model host.memory.update_params(DEFAULT_TRUST_MODEL_PARAM) self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045")) self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"]) self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"]) xep_0163.add_pep_event( "OX_PUBLIC_KEYS_LIST", PUBLIC_KEYS_LIST_NODE, lambda items_event, profile: defer.ensureDeferred( self.__on_public_keys_list_update(items_event, profile) ), ) async def profile_connecting(self, client): client.gpg_provider = get_gpg_provider(self.host, client) async def profile_connected( # pylint: disable=invalid-name self, client: SatXMPPClient ) -> None: """ @param client: The client. """ profile = cast(str, client.profile) if not profile in self.__storage: self.__storage[profile] = persistent.LazyPersistentBinaryDict( "XEP-0373", client.profile ) if len(self.list_secret_keys(client)) == 0: log.debug(f"Generating first GPG key for {client.jid.userhost()}.") await self.create_key(client) async def __on_public_keys_list_update( self, items_event: pubsub.ItemsEvent, profile: str ) -> None: """Handle public keys list updates fired by PEP. @param items_event: The event. @param profile: The profile this event belongs to. 
""" client = self.host.get_client(profile) sender = cast(jid.JID, items_event.sender) items = cast(List[domish.Element], items_event.items) if len(items) > 1: log.warning("Ignoring public keys list update with more than one element.") return item_elt = next(iter(items), None) if item_elt is None: log.debug("Ignoring empty public keys list update.") return public_keys_list_elt = cast( Optional[domish.Element], next(item_elt.elements(NS_OX, "public-keys-list"), None), ) pubkey_metadata_elts: Optional[List[domish.Element]] = None if public_keys_list_elt is not None: try: PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) except xmlschema.XMLSchemaValidationError: pass else: pubkey_metadata_elts = list( public_keys_list_elt.elements(NS_OX, "pubkey-metadata") ) if pubkey_metadata_elts is None: log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") return new_public_keys_metadata = { PublicKeyMetadata( fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])), ) for pubkey_metadata_elt in pubkey_metadata_elts } storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost()) local_public_keys_metadata = { PublicKeyMetadata.from_dict(pkm) for pkm in await self.__storage[profile].get(storage_key, []) } unchanged_keys = new_public_keys_metadata & local_public_keys_metadata changed_or_new_keys = new_public_keys_metadata - unchanged_keys available_keys = self.list_public_keys(client, sender) for key_metadata in changed_or_new_keys: # Check whether the changed or new key has been imported before if any(key.fingerprint == key_metadata.fingerprint for key in available_keys): try: # If it has been imported before, try to update it await self.import_public_key(client, sender, key_metadata.fingerprint) except Exception as e: log.warning(f"Public key import failed: {e}") # If the update fails, remove the key from the local metadata list # such that the update is attempted again 
next time new_public_keys_metadata.remove(key_metadata) # Check whether this update was for our account and make sure all of our keys are # included in the update if sender.userhost() == client.jid.userhost(): secret_keys = self.list_secret_keys(client) missing_keys = set( filter( lambda secret_key: all( key_metadata.fingerprint != secret_key.public_key.fingerprint for key_metadata in new_public_keys_metadata ), secret_keys, ) ) if len(missing_keys) > 0: log.warning( "Public keys list update did not contain at least one of our keys." f" {new_public_keys_metadata}" ) for missing_key in missing_keys: log.warning(missing_key.public_key.fingerprint) new_public_keys_metadata.add( PublicKeyMetadata( fingerprint=missing_key.public_key.fingerprint, timestamp=datetime.now(timezone.utc), ) ) await self.publish_public_keys_list(client, new_public_keys_metadata) await self.__storage[profile].force( storage_key, [pkm.to_dict() for pkm in new_public_keys_metadata] ) def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: """List GPG public keys available for a JID. @param client: The client to perform this operation with. @param jid: The JID. Can be a bare JID. @return: The set of public keys available for this JID. """ gpg_provider = get_gpg_provider(self.host, client) return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}") def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]: """List GPG secret keys available for a JID. @param client: The client to perform this operation with. @return: The set of secret keys available for this JID. """ gpg_provider = get_gpg_provider(self.host, client) return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}") async def create_key(self, client: SatXMPPClient) -> GPGSecretKey: """Create a new GPG key, capable of signing and encryption. The key is generated without password protection and without expiration. @param client: The client to perform this operation with. 
@return: The new key. """ gpg_provider = get_gpg_provider(self.host, client) secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}") await self.publish_public_key(client, secret_key.public_key) storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(client.jid.userhost()) public_keys_list = { PublicKeyMetadata.from_dict(pkm) for pkm in await self.__storage[client.profile].get(storage_key, []) } public_keys_list.add( PublicKeyMetadata( fingerprint=secret_key.public_key.fingerprint, timestamp=datetime.now(timezone.utc), ) ) await self.publish_public_keys_list(client, public_keys_list) await self.__storage[client.profile].force( storage_key, [pkm.to_dict() for pkm in public_keys_list] ) return secret_key @staticmethod def __build_content_element( element_name: Literal["signcrypt", "sign", "crypt"], recipient_jids: Iterable[jid.JID], include_rpad: bool, ) -> Tuple[domish.Element, domish.Element]: """Build a content element. @param element_name: The name of the content element. @param recipient_jids: The intended recipients of this content element. Can be bare JIDs. @param include_rpad: Whether to include random-length random-content padding. @return: The content element and the ``<payload/>`` element to add the stanza extension elements to. """ content_elt = domish.Element((NS_OX, element_name)) for recipient_jid in recipient_jids: content_elt.addElement("to")["jid"] = recipient_jid.userhost() content_elt.addElement("time")["stamp"] = format_datetime() if include_rpad: # XEP-0373 doesn't specify bounds for the length of the random padding. This # uses the bounds specified in XEP-0420 for the closely related rpad affix. 
rpad_length = secrets.randbelow(201) rpad_content = "".join( secrets.choice(string.digits + string.ascii_letters + string.punctuation) for __ in range(rpad_length) ) content_elt.addElement("rpad", content=rpad_content) payload_elt = content_elt.addElement("payload") return content_elt, payload_elt @staticmethod def build_signcrypt_element( recipient_jids: Iterable[jid.JID], ) -> Tuple[domish.Element, domish.Element]: """Build a ``<signcrypt/>`` content element. @param recipient_jids: The intended recipients of this content element. Can be bare JIDs. @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the stanza extension elements to. """ if len(recipient_jids) == 0: raise ValueError("Recipient JIDs must be provided.") return XEP_0373.__build_content_element("signcrypt", recipient_jids, True) @staticmethod def build_sign_element( recipient_jids: Iterable[jid.JID], include_rpad: bool ) -> Tuple[domish.Element, domish.Element]: """Build a ``<sign/>`` content element. @param recipient_jids: The intended recipients of this content element. Can be bare JIDs. @param include_rpad: Whether to include random-length random-content padding, which is OPTIONAL for the ``<sign/>`` content element. @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza extension elements to. """ if len(recipient_jids) == 0: raise ValueError("Recipient JIDs must be provided.") return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad) @staticmethod def build_crypt_element( recipient_jids: Iterable[jid.JID], ) -> Tuple[domish.Element, domish.Element]: """Build a ``<crypt/>`` content element. @param recipient_jids: The intended recipients of this content element. Specifying the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can be bare JIDs. @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza extension elements to. 
""" return XEP_0373.__build_content_element("crypt", recipient_jids, True) async def build_openpgp_element( self, client: SatXMPPClient, content_elt: domish.Element, recipient_jids: Set[jid.JID], ) -> domish.Element: """Build an ``<openpgp/>`` element. @param client: The client to perform this operation with. @param content_elt: The content element to contain in the ``<openpgp/>`` element. @param recipient_jids: The recipient's JIDs. Can be bare JIDs. @return: The ``<openpgp/>`` element. """ gpg_provider = get_gpg_provider(self.host, client) # TODO: I'm not sure whether we want to sign with all keys by default or choose # just one key/a subset of keys to sign with. signing_keys = set( filter( lambda secret_key: gpg_provider.can_sign(secret_key.public_key), self.list_secret_keys(client), ) ) encryption_keys: Set[GPGPublicKey] = set() for recipient_jid in recipient_jids: # import all keys of the recipient all_public_keys = await self.import_all_public_keys(client, recipient_jid) # Filter for keys that can encrypt encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys)) # TODO: Handle trust content = content_elt.toXml().encode("utf-8") data: bytes if content_elt.name == "signcrypt": data = gpg_provider.encrypt(content, encryption_keys, signing_keys) elif content_elt.name == "sign": data = gpg_provider.sign(content, signing_keys) elif content_elt.name == "crypt": data = gpg_provider.encrypt(content, encryption_keys) else: raise ValueError(f"Unknown content element <{content_elt.name}/>") openpgp_elt = domish.Element((NS_OX, "openpgp")) openpgp_elt.addContent(base64.b64encode(data).decode("ASCII")) return openpgp_elt async def unpack_openpgp_element( self, client: SatXMPPClient, openpgp_elt: domish.Element, element_name: Literal["signcrypt", "sign", "crypt"], sender_jid: jid.JID, ) -> Tuple[domish.Element, datetime]: """Verify, decrypt and unpack an ``<openpgp/>`` element. @param client: The client to perform this operation with. 
@param openpgp_elt: The ``<openpgp/>`` element. @param element_name: The name of the content element. @param sender_jid: The sender's JID. Can be a bare JID. @return: The ``<payload/>`` element containing the decrypted/verified stanza extension elements carried by this ``<openpgp/>`` element, and the timestamp contained in the content element. @raise exceptions.ParsingError: on syntactical verification errors. @raise VerificationError: on semantical verification errors accoding to XEP-0373. @raise DecryptionFailed: on decryption failure. @raise VerificationFailed: if the data could not be verified. @warning: The timestamp is not verified for plausibility; this SHOULD be done by the calling code. """ gpg_provider = get_gpg_provider(self.host, client) decryption_keys = set( filter( lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), self.list_secret_keys(client), ) ) # import all keys of the sender all_public_keys = await self.import_all_public_keys(client, sender_jid) # Filter for keys that can sign verification_keys = set(filter(gpg_provider.can_sign, all_public_keys)) # TODO: Handle trust try: OPENPGP_SCHEMA.validate(openpgp_elt.toXml()) except xmlschema.XMLSchemaValidationError as e: raise exceptions.ParsingError( "<openpgp/> element doesn't pass schema validation." 
) from e openpgp_message = base64.b64decode(str(openpgp_elt)) content: bytes if element_name == "signcrypt": content = gpg_provider.decrypt( openpgp_message, decryption_keys, public_keys=verification_keys ) elif element_name == "sign": content = gpg_provider.verify(openpgp_message, verification_keys) elif element_name == "crypt": content = gpg_provider.decrypt(openpgp_message, decryption_keys) else: assert_never(element_name) try: content_elt = cast( domish.Element, xml_tools.ElementParser()(content.decode("utf-8")) ) except UnicodeDecodeError as e: raise exceptions.ParsingError("UTF-8 decoding error") from e try: CONTENT_SCHEMA.validate(content_elt.toXml()) except xmlschema.XMLSchemaValidationError as e: raise exceptions.ParsingError( f"<{element_name}/> element doesn't pass schema validation." ) from e if content_elt.name != element_name: raise exceptions.ParsingError(f"Not a <{element_name}/> element.") recipient_jids = { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") } if ( client.jid.userhostJID() not in {jid.userhostJID() for jid in recipient_jids} and element_name != "crypt" ): raise VerificationError( f"Recipient list in <{element_name}/> element does not list our (bare)" f" JID." ) time_elt = next(content_elt.elements(NS_OX, "time")) timestamp = parse_datetime(time_elt["stamp"]) payload_elt = next(content_elt.elements(NS_OX, "payload")) return payload_elt, timestamp async def publish_public_key( self, client: SatXMPPClient, public_key: GPGPublicKey ) -> None: """Publish a public key. @param client: The client. @param public_key: The public key to publish. @raise XMPPInteractionFailed: if any interaction via XMPP failed. 
""" gpg_provider = get_gpg_provider(self.host, client) packet = gpg_provider.export_public_key(public_key) node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}" pubkey_elt = domish.Element((NS_OX, "pubkey")) pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII")) try: await self.__xep_0060.send_item( client, client.jid.userhostJID(), node, pubkey_elt, format_datetime(), extra={ XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_PERSIST_ITEMS: "true", XEP_0060.OPT_ACCESS_MODEL: "open", XEP_0060.OPT_MAX_ITEMS: 1, }, # TODO: Do we really want publish_without_options here? XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", }, ) except Exception as e: raise XMPPInteractionFailed("Publishing the public key failed.") from e async def import_all_public_keys( self, client: SatXMPPClient, entity_jid: jid.JID ) -> Set[GPGPublicKey]: """import all public keys of a JID that have not been imported before. @param client: The client. @param jid: The JID. Can be a bare JID. @return: The public keys. @note: Failure to import a key simply results in the key not being included in the result. 
""" available_public_keys = self.list_public_keys(client, entity_jid) storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(entity_jid.userhost()) public_keys_metadata = { PublicKeyMetadata.from_dict(pkm) for pkm in await self.__storage[client.profile].get(storage_key, []) } if not public_keys_metadata: public_keys_metadata = await self.download_public_keys_list( client, entity_jid ) if not public_keys_metadata: raise exceptions.NotFound(f"Can't find public keys for {entity_jid}") else: await self.__storage[client.profile].aset( storage_key, [pkm.to_dict() for pkm in public_keys_metadata] ) missing_keys = set( filter( lambda public_key_metadata: all( public_key_metadata.fingerprint != public_key.fingerprint for public_key in available_public_keys ), public_keys_metadata, ) ) for missing_key in missing_keys: try: available_public_keys.add( await self.import_public_key( client, entity_jid, missing_key.fingerprint ) ) except Exception as e: log.warning( f"import of public key {missing_key.fingerprint} owned by" f" {entity_jid.userhost()} failed, ignoring: {e}" ) return available_public_keys async def import_public_key( self, client: SatXMPPClient, jid: jid.JID, fingerprint: str ) -> GPGPublicKey: """import a public key. @param client: The client. @param jid: The JID owning the public key. Can be a bare JID. @param fingerprint: The fingerprint of the public key. @return: The public key. @raise exceptions.NotFound: if the public key was not found. @raise exceptions.ParsingError: on XML-level parsing errors. @raise InvalidPacket: if the packet is either syntactically or semantically deemed invalid. @raise XMPPInteractionFailed: if any interaction via XMPP failed. 
""" gpg_provider = get_gpg_provider(self.host, client) node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" try: items, __ = await self.__xep_0060.get_items( client, jid.userhostJID(), node, max_items=1 ) except exceptions.NotFound as e: raise exceptions.NotFound( f"No public key with fingerprint {fingerprint} published by JID" f" {jid.userhost()}." ) from e except Exception as e: raise XMPPInteractionFailed("Fetching the public keys list failed.") from e try: item_elt = cast(domish.Element, items[0]) except IndexError as e: raise exceptions.NotFound( f"No public key with fingerprint {fingerprint} published by JID" f" {jid.userhost()}." ) from e pubkey_elt = cast( Optional[domish.Element], next(item_elt.elements(NS_OX, "pubkey"), None) ) if pubkey_elt is None: raise exceptions.ParsingError( f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey" f" element." ) try: PUBKEY_SCHEMA.validate(pubkey_elt.toXml()) except xmlschema.XMLSchemaValidationError as e: raise exceptions.ParsingError( f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey" f" schema validation." ) from e public_key = gpg_provider.import_public_key( base64.b64decode(str(next(pubkey_elt.elements(NS_OX, "data")))) ) return public_key async def publish_public_keys_list( self, client: SatXMPPClient, public_keys_list: Iterable[PublicKeyMetadata] ) -> None: """Publish/update the own public keys list. @param client: The client. @param public_keys_list: The public keys list. @raise XMPPInteractionFailed: if any interaction via XMPP failed. @warning: All public keys referenced in the public keys list MUST be published beforehand. 
""" if len({pkm.fingerprint for pkm in public_keys_list}) != len(public_keys_list): raise ValueError("Public keys list contains duplicate fingerprints.") node = "urn:xmpp:openpgp:0:public-keys" public_keys_list_elt = domish.Element((NS_OX, "public-keys-list")) for public_key_metadata in public_keys_list: pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata") pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp) try: await self.__xep_0060.send_item( client, client.jid.userhostJID(), node, public_keys_list_elt, item_id=XEP_0060.ID_SINGLETON, extra={ XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_PERSIST_ITEMS: "true", XEP_0060.OPT_ACCESS_MODEL: "open", XEP_0060.OPT_MAX_ITEMS: 1, }, # TODO: Do we really want publish_without_options here? XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", }, ) except Exception as e: raise XMPPInteractionFailed("Publishing the public keys list failed.") from e async def download_public_keys_list( self, client: SatXMPPClient, jid: jid.JID ) -> Optional[Set[PublicKeyMetadata]]: """Download the public keys list of a JID. @param client: The client. @param jid: The JID. Can be a bare JID. @return: The public keys list or ``None`` if the JID hasn't published a public keys list. An empty list means the JID has published an empty list. @raise exceptions.ParsingError: on XML-level parsing errors. @raise XMPPInteractionFailed: if any interaction via XMPP failed. 
""" node = "urn:xmpp:openpgp:0:public-keys" try: items, __ = await self.__xep_0060.get_items( client, jid.userhostJID(), node, max_items=1 ) except exceptions.NotFound: return None except Exception as e: raise XMPPInteractionFailed() from e try: item_elt = cast(domish.Element, items[0]) except IndexError: return None public_keys_list_elt = cast( Optional[domish.Element], next(item_elt.elements(NS_OX, "public-keys-list"), None), ) if public_keys_list_elt is None: return None try: PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) except xmlschema.XMLSchemaValidationError as e: raise exceptions.ParsingError( f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys" f" list schema validation." ) from e return { PublicKeyMetadata( fingerprint=pubkey_metadata_elt["v4-fingerprint"], timestamp=parse_datetime(pubkey_metadata_elt["date"]), ) for pubkey_metadata_elt in public_keys_list_elt.elements( NS_OX, "pubkey-metadata" ) } async def __prepare_secret_key_synchronization( self, client: SatXMPPClient ) -> Optional[domish.Element]: """Prepare for secret key synchronization. Makes sure the relative protocols and protocol extensions are supported by the server and makes sure that the PEP node for secret synchronization exists and is configured correctly. The node is created if necessary. @param client: The client. @return: As part of the preparations, the secret key synchronization PEP node is fetched. The result of that fetch is returned here. @raise exceptions.FeatureNotFound: if the server lacks support for the required protocols or protocol extensions. @raise XMPPInteractionFailed: if any interaction via XMPP failed. """ try: infos = cast( DiscoInfo, await self.host.memory.disco.get_infos(client, client.jid.userhostJID()), ) except Exception as e: raise XMPPInteractionFailed( "Error performing service discovery on the own bare JID." 
) from e identities = cast(Dict[Tuple[str, str], str], infos.identities) features = cast(Set[DiscoFeature], infos.features) if ("pubsub", "pep") not in identities: raise exceptions.FeatureNotFound("Server doesn't support PEP.") if "http://jabber.org/protocol/pubsub#access-whitelist" not in features: raise exceptions.FeatureNotFound( "Server doesn't support the whitelist access model." ) persistent_items_supported = ( "http://jabber.org/protocol/pubsub#persistent-items" in features ) # TODO: persistent-items is a SHOULD, how do we handle the feature missing? node = "urn:xmpp:openpgp:0:secret-key" try: items, __ = await self.__xep_0060.get_items( client, client.jid.userhostJID(), node, max_items=1 ) except exceptions.NotFound: try: await self.__xep_0060.createNode( client, client.jid.userhostJID(), node, { XEP_0060.OPT_PERSIST_ITEMS: "true", XEP_0060.OPT_ACCESS_MODEL: "whitelist", XEP_0060.OPT_MAX_ITEMS: "1", }, ) except Exception as e: raise XMPPInteractionFailed( "Error creating the secret key synchronization node." ) from e except Exception as e: raise XMPPInteractionFailed( "Error fetching the secret key synchronization node." ) from e try: return cast(domish.Element, items[0]) except IndexError: return None async def export_secret_keys( self, client: SatXMPPClient, secret_keys: Iterable[GPGSecretKey] ) -> str: """Export secret keys to synchronize them with other devices. @param client: The client. @param secret_keys: The secret keys to export. @return: The backup code needed to decrypt the exported secret keys. @raise exceptions.FeatureNotFound: if the server lacks support for the required protocols or protocol extensions. @raise XMPPInteractionFailed: if any interaction via XMPP failed. 
""" gpg_provider = get_gpg_provider(self.host, client) await self.__prepare_secret_key_synchronization(client) backup_code = generate_passphrase() plaintext = b"".join( gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys ) ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code) node = "urn:xmpp:openpgp:0:secret-key" secretkey_elt = domish.Element((NS_OX, "secretkey")) secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) try: await self.__xep_0060.send_item( client, client.jid.userhostJID(), node, secretkey_elt ) except Exception as e: raise XMPPInteractionFailed("Publishing the secret keys failed.") from e return backup_code async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]: """Download previously exported secret keys to import them in a second step. The downloading and importing steps are separate since a backup code is required for the import and it should be possible to try multiple backup codes without redownloading the data every time. The second half of the import procedure is provided by :meth:`import_secret_keys`. @param client: The client. @return: The encrypted secret keys previously exported, if any. @raise exceptions.FeatureNotFound: if the server lacks support for the required protocols or protocol extensions. @raise exceptions.ParsingError: on XML-level parsing errors. @raise XMPPInteractionFailed: if any interaction via XMPP failed. """ item_elt = await self.__prepare_secret_key_synchronization(client) if item_elt is None: return None secretkey_elt = cast( Optional[domish.Element], next(item_elt.elements(NS_OX, "secretkey"), None) ) if secretkey_elt is None: return None try: SECRETKEY_SCHEMA.validate(secretkey_elt.toXml()) except xmlschema.XMLSchemaValidationError as e: raise exceptions.ParsingError( "Publish-Subscribe item doesn't pass secretkey schema validation." 
) from e return base64.b64decode(str(secretkey_elt)) def import_secret_keys( self, client: SatXMPPClient, ciphertext: bytes, backup_code: str ) -> Set[GPGSecretKey]: """import previously downloaded secret keys. The downloading and importing steps are separate since a backup code is required for the import and it should be possible to try multiple backup codes without redownloading the data every time. The first half of the import procedure is provided by :meth:`download_secret_keys`. @param client: The client to perform this operation with. @param ciphertext: The ciphertext, i.e. the data returned by :meth:`download_secret_keys`. @param backup_code: The backup code needed to decrypt the data. @raise InvalidPacket: if one of the GPG packets building the secret key data is either syntactically or semantically deemed invalid. @raise DecryptionFailed: on decryption failure. """ gpg_provider = get_gpg_provider(self.host, client) return gpg_provider.restore_secret_keys( gpg_provider.decrypt_symmetrically(ciphertext, backup_code) ) @staticmethod def __get_joined_muc_users( client: SatXMPPClient, xep_0045: XEP_0045, room_jid: jid.JID ) -> Set[jid.JID]: """ @param client: The client. @param xep_0045: A MUC plugin instance. @param room_jid: The room JID. @return: A set containing the bare JIDs of the MUC participants. @raise InternalError: if the MUC is not joined or the entity information of a participant isn't available. """ # TODO: This should probably be a global helper somewhere bare_jids: Set[jid.JID] = set() try: room = cast(muc.Room, xep_0045.get_room(client, room_jid)) except exceptions.NotFound as e: raise exceptions.InternalError( "Participant list of unjoined MUC requested." ) from e for user in cast(Dict[str, muc.User], room.roster).values(): entity = cast(Optional[SatXMPPEntity], user.entity) if entity is None: raise exceptions.InternalError( f"Participant list of MUC requested, but the entity information of" f" the participant {user} is not available." 
) bare_jids.add(entity.jid.userhostJID()) return bare_jids async def get_trust( self, client: SatXMPPClient, public_key: GPGPublicKey, owner: jid.JID ) -> TrustLevel: """Query the trust level of a public key. @param client: The client to perform this operation under. @param public_key: The public key. @param owner: The owner of the public key. Can be a bare JID. @return: The trust level. """ key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" try: return TrustLevel(await self.__storage[client.profile][key]) except KeyError: return TrustLevel.UNDECIDED async def set_trust( self, client: SatXMPPClient, public_key: GPGPublicKey, owner: jid.JID, trust_level: TrustLevel, ) -> None: """Set the trust level of a public key. @param client: The client to perform this operation under. @param public_key: The public key. @param owner: The owner of the public key. Can be a bare JID. @param trust_leve: The trust level. """ key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" await self.__storage[client.profile].force(key, trust_level.name) async def get_trust_ui( # pylint: disable=invalid-name self, client: SatXMPPClient, entity: jid.JID ) -> xml_tools.XMLUI: """ @param client: The client. @param entity: The entity whose device trust levels to manage. @return: An XMLUI instance which opens a form to manage the trust level of all devices belonging to the entity. """ if entity.resource: raise ValueError("A bare JID is expected.") bare_jids: Set[jid.JID] if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) else: bare_jids = {entity.userhostJID()} all_public_keys = list( { bare_jid: list(self.list_public_keys(client, bare_jid)) for bare_jid in bare_jids }.items() ) async def callback( data: Any, profile: str # pylint: disable=unused-argument ) -> Dict[Never, Never]: """ @param data: The XMLUI result produces by the trust UI form. @param profile: The profile. 
@return: An empty dictionary. The type of the return value was chosen conservatively since the exact options are neither known not needed here. """ if C.bool(data.get("cancelled", "false")): return {} data_form_result = cast( Dict[str, str], xml_tools.xmlui_result_2_data_form_result(data) ) for key, value in data_form_result.items(): if not key.startswith("trust_"): continue outer_index, inner_index = key.split("_")[1:] owner, public_keys = all_public_keys[int(outer_index)] public_key = public_keys[int(inner_index)] trust = TrustLevel(value) if (await self.get_trust(client, public_key, owner)) is not trust: await self.set_trust(client, public_key, owner, value) return {} submit_id = self.host.register_callback(callback, with_data=True, one_shot=True) result = xml_tools.XMLUI( panel_type=C.XMLUI_FORM, title=D_("OX trust management"), submit_id=submit_id ) # Casting this to Any, otherwise all calls on the variable cause type errors # pylint: disable=no-member trust_ui = cast(Any, result) trust_ui.addText( D_( "This is OX trusting system. You'll see below the GPG keys of your " "contacts, and a list selection to trust them or not. A trusted key " "can read your messages in plain text, so be sure to only validate " "keys that you are sure are belonging to your contact. It's better " "to do this when you are next to your contact, so " 'you can check the "fingerprint" of the key ' "yourself. Do *not* validate a key if the fingerprint is wrong!" 
) ) own_secret_keys = self.list_secret_keys(client) trust_ui.change_container("label") for index, secret_key in enumerate(own_secret_keys): trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) trust_ui.addText(secret_key.public_key.fingerprint) trust_ui.addEmpty() trust_ui.addEmpty() for outer_index, [owner, public_keys] in enumerate(all_public_keys): for inner_index, public_key in enumerate(public_keys): trust_ui.addLabel(D_("Contact")) trust_ui.addJid(jid.JID(owner)) trust_ui.addLabel(D_("Fingerprint")) trust_ui.addText(public_key.fingerprint) trust_ui.addLabel(D_("Trust this device?")) current_trust_level = await self.get_trust(client, public_key, owner) avaiable_trust_levels = { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level, } trust_ui.addList( f"trust_{outer_index}_{inner_index}", options=[trust_level.name for trust_level in avaiable_trust_levels], selected=current_trust_level.name, styles=["inline"], ) trust_ui.addEmpty() trust_ui.addEmpty() return result