diff libervia/backend/plugins/plugin_xep_0373.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0373.py@524856bd7b19
children 040095a5dc7f
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0373.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,2102 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OpenPGP for XMPP
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from abc import ABC, abstractmethod
+import base64
+from datetime import datetime, timezone
+import enum
+import secrets
+import string
+from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast
+from xml.sax.saxutils import quoteattr
+
+from typing_extensions import Final, NamedTuple, Never, assert_never
+from wokkel import muc, pubsub
+from wokkel.disco import DiscoFeature, DiscoInfo
+import xmlschema
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _, D_
+from libervia.backend.core.log import getLogger, Logger
+from libervia.backend.core.sat_main import SAT
+from libervia.backend.core.xmpp import SatXMPPClient
+from libervia.backend.memory import persistent
+from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
+from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
+from libervia.backend.plugins.plugin_xep_0163 import XEP_0163
+from libervia.backend.tools.xmpp_datetime import format_datetime, parse_datetime
+from libervia.backend.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+try:
+    import gpg
+except ImportError as import_error:
+    raise exceptions.MissingModule(
+        "You are missing the 'gpg' package required by the OX plugin. The recommended"
+        " installation method is via your operating system's package manager, since the"
+        " version of the library has to match the version of your GnuPG installation. See"
+        " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme"
+    ) from import_error
+
+
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "NS_OX",
+    "XEP_0373",
+    "VerificationError",
+    "XMPPInteractionFailed",
+    "InvalidPacket",
+    "DecryptionFailed",
+    "VerificationFailed",
+    "UnknownKey",
+    "GPGProviderError",
+    "GPGPublicKey",
+    "GPGSecretKey",
+    "GPGProvider",
+    "PublicKeyMetadata",
+    "gpg_provider",
+    "TrustLevel"
+]
+
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "XEP-0373",
+    C.PI_IMPORT_NAME: "XEP-0373",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0373" ],
+    C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "XEP_0373",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
+}
+
+
+NS_OX: Final = "urn:xmpp:openpgp:0"
+
+
+PARAM_CATEGORY = "Security"
+PARAM_NAME = "ox_policy"
+STR_KEY_PUBLIC_KEYS_METADATA = "/public-keys-metadata/{}"
+
+
+class VerificationError(Exception):
+    """
+    Raised by verifying methods of :class:`XEP_0373` on semantical verification errors.
+    """
+
+
+class XMPPInteractionFailed(Exception):
+    """
+    Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this
+    exception exists is that the exceptions raised by XMPP interactions are not properly
+    documented for the most part, thus all exceptions are caught and wrapped in instances
+    of this class.
+    """
+
+
+class InvalidPacket(ValueError):
+    """
+    Raised by methods of :class:`GPGProvider` when an invalid packet is encountered.
+    """
+
+
+class DecryptionFailed(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on decryption failures.
+    """
+
+
+class VerificationFailed(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on verification failures.
+    """
+
+
+class UnknownKey(ValueError):
+    """
+    Raised by methods of :class:`GPGProvider` when an unknown key is referenced.
+    """
+
+
+class GPGProviderError(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on internal errors.
+    """
+
+
+class GPGPublicKey(ABC):
+    """
+    Interface describing a GPG public key.
+    """
+
+    @property
+    @abstractmethod
+    def fingerprint(self) -> str:
+        """
+        @return: The OpenPGP v4 fingerprint string of this public key.
+        """
+
+
+class GPGSecretKey(ABC):
+    """
+    Interface descibing a GPG secret key.
+    """
+
+    @property
+    @abstractmethod
+    def public_key(self) -> GPGPublicKey:
+        """
+        @return: The public key corresponding to this secret key.
+        """
+
+
+class GPGProvider(ABC):
+    """
+    Interface describing a GPG provider, i.e. a library or framework providing GPG
+    encryption, signing and key management.
+
+    All methods may raise :class:`GPGProviderError` in addition to those exception types
+    listed explicitly.
+
+    # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?)
+    """
+
+    @abstractmethod
+    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
+        """Export a public key in a key material packet according to RFC 4880 §5.5.
+
+        Do not use OpenPGP's ASCII Armor.
+
+        @param public_key: The public key to export.
+        @return: The packet containing the exported public key.
+        @raise UnknownKey: if the public key is not available.
+        """
+
+    @abstractmethod
+    def import_public_key(self, packet: bytes) -> GPGPublicKey:
+        """import a public key from a key material packet according to RFC 4880 §5.5.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param packet: A packet containing an exported public key.
+        @return: The public key imported from the packet.
+        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
+            invalid.
+
+        @warning: Only packets of version 4 or higher may be accepted, packets below
+            version 4 MUST be rejected.
+        """
+
+    @abstractmethod
+    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
+        """Export a secret key for transfer according to RFC 4880 §11.1.
+
+        Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage
+        conventions to zero in the corresponding secret-key packet according to RFC 4880
+        §5.5.3. Do not use OpenPGP's ASCII Armor.
+
+        @param secret_key: The secret key to export.
+        @return: The binary blob containing the exported secret key.
+        @raise UnknownKey: if the secret key is not available.
+        """
+
+    @abstractmethod
+    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
+        """Restore secret keys exported for transfer according to RFC 4880 §11.1.
+
+        The secret data is not encrypted, i.e. the octet indicating string-to-key usage
+        conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3
+        are set to zero. OpenPGP's ASCII Armor is not used.
+
+        @param data: Concatenation of one or more secret keys exported for transfer.
+        @return: The secret keys imported from the data.
+        @raise InvalidPacket: if the data or one of the packets included in the data is
+            either syntactically or semantically deemed invalid.
+
+        @warning: Only packets of version 4 or higher may be accepted, packets below
+            version 4 MUST be rejected.
+        """
+
+    @abstractmethod
+    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
+        """Encrypt data symmetrically according to RFC 4880 §5.3.
+
+        The password is used to build a Symmetric-Key Encrypted Session Key packet which
+        precedes the Symmetrically Encrypted Data packet that holds the encrypted data.
+
+        @param plaintext: The data to encrypt.
+        @param password: The password to encrypt the data with.
+        @return: The encrypted data.
+        """
+
+    @abstractmethod
+    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
+        """Decrypt data symmetrically according to RFC 4880 §5.3.
+
+        The ciphertext consists of a Symmetrically Encrypted Data packet that holds the
+        encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the
+        password.
+
+        @param ciphertext: The ciphertext.
+        @param password: The password to decrypt the data with.
+        @return: The plaintext.
+        @raise DecryptionFailed: on decryption failure.
+        """
+
+    @abstractmethod
+    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Sign some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the signed data.
+        """
+
+    @abstractmethod
+    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Sign some data. Create the signature detached from the data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the detached signature.
+        """
+
+    @abstractmethod
+    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
+        """Verify signed data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param signed_data: The signed data as an OpenPGP message.
+        @param public_keys: The public keys to verify the signature with.
+        @return: The verified and unpacked data.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that a valid signature by one
+            of the public keys is available.
+        """
+
+    @abstractmethod
+    def verify_detached(
+        self,
+        data: bytes,
+        signature: bytes,
+        public_keys: Set[GPGPublicKey]
+    ) -> None:
+        """Verify signed data, where the signature was created detached from the data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data.
+        @param signature: The signature as an OpenPGP message.
+        @param public_keys: The public keys to verify the signature with.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that a valid signature by one
+            of the public keys is available.
+        """
+
+    @abstractmethod
+    def encrypt(
+        self,
+        plaintext: bytes,
+        public_keys: Set[GPGPublicKey],
+        signing_keys: Optional[Set[GPGSecretKey]] = None
+    ) -> bytes:
+        """Encrypt and optionally sign some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param plaintext: The data to encrypt and optionally sign.
+        @param public_keys: The public keys to encrypt the data for.
+        @param signing_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the encrypted and optionally signed data.
+        """
+
+    @abstractmethod
+    def decrypt(
+        self,
+        ciphertext: bytes,
+        secret_keys: Set[GPGSecretKey],
+        public_keys: Optional[Set[GPGPublicKey]] = None
+    ) -> bytes:
+        """Decrypt and optionally verify some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param ciphertext: The encrypted and optionally signed data as an OpenPGP message.
+        @param secret_keys: The secret keys to attempt decryption with.
+        @param public_keys: The public keys to verify the optional signature with.
+        @return: The decrypted, optionally verified and unpacked data.
+        @raise DecryptionFailed: on decryption failure.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that the data was decrypted
+            using one of the secret keys and that a valid signature by one of the public
+            keys is available in case the data is signed.
+        """
+
+    @abstractmethod
+    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
+        """List public keys.
+
+        @param user_id: The user id.
+        @return: The set of public keys available for this user id.
+        """
+
+    @abstractmethod
+    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
+        """List secret keys.
+
+        @param user_id: The user id.
+        @return: The set of secret keys available for this user id.
+        """
+
+    @abstractmethod
+    def can_sign(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: Whether the public key belongs to a key pair capable of signing.
+        """
+
+    @abstractmethod
+    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: Whether the public key belongs to a key pair capable of encryption.
+        """
+
+    @abstractmethod
+    def create_key(self, user_id: str) -> GPGSecretKey:
+        """Create a new GPG key, capable of signing and encryption.
+
+        The key is generated without password protection and without expiration. If a key
+        with the same user id already exists, a new key is created anyway.
+
+        @param user_id: The user id to assign to the new key.
+        @return: The new key.
+        """
+
+
+class GPGME_GPGPublicKey(GPGPublicKey):
+    """
+    GPG public key implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, key_obj: Any) -> None:
+        """
+        @param key_obj: The GPGME key object.
+        """
+
+        self.__key_obj = key_obj
+
+    @property
+    def fingerprint(self) -> str:
+        return self.__key_obj.fpr
+
+    @property
+    def key_obj(self) -> Any:
+        return self.__key_obj
+
+
+class GPGME_GPGSecretKey(GPGSecretKey):
+    """
+    GPG secret key implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, public_key: GPGME_GPGPublicKey) -> None:
+        """
+        @param public_key: The public key corresponding to this secret key.
+        """
+
+        self.__public_key = public_key
+
+    @property
+    def public_key(self) -> GPGME_GPGPublicKey:
+        return self.__public_key
+
+
+class GPGME_GPGProvider(GPGProvider):
+    """
+    GPG provider implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, home_dir: Optional[str] = None) -> None:
+        """
+        @param home_dir: Optional GPG home directory path to use for all operations.
+        """
+
+        self.__home_dir = home_dir
+
+    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        pattern = public_key.fingerprint
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_export_minimal(pattern)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if result is None:
+                raise UnknownKey(f"Public key {pattern} not found.")
+
+            return result
+
+    def import_public_key(self, packet: bytes) -> GPGPublicKey:
+        # TODO
+        # - Reject packets older than version 4
+        # - Check whether it's actually a public key (through packet inspection?)
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_import(packet)
+            except gpg.errors.GPGMEError as e:
+                # From looking at the code, `key_import` never raises. The documentation
+                # says it does though, so this is included for future-proofness.
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if not hasattr(result, "considered"):
+                raise InvalidPacket(
+                    f"Data not considered for public key import: {result}"
+                )
+
+            if len(result.imports) != 1:
+                raise InvalidPacket(
+                    "Public key packet does not contain exactly one public key (not"
+                    " counting subkeys)."
+                )
+
+            try:
+                key_obj = c.get_key(result.imports[0].fpr, secret=False)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.KeyError as e:
+                raise GPGProviderError("Newly imported public key not found") from e
+
+            return GPGME_GPGPublicKey(key_obj)
+
+    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
+        assert isinstance(secret_key, GPGME_GPGSecretKey)
+        # TODO
+        # - Handle password protection/pinentry
+        # - Make sure the key is exported unencrypted
+
+        pattern = secret_key.public_key.fingerprint
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_export_secret(pattern)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if result is None:
+                raise UnknownKey(f"Secret key {pattern} not found.")
+
+            return result
+
+    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
+        # TODO
+        # - Reject packets older than version 4
+        # - Check whether it's actually secret keys (through packet inspection?)
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_import(data)
+            except gpg.errors.GPGMEError as e:
+                # From looking at the code, `key_import` never raises. The documentation
+                # says it does though, so this is included for future-proofness.
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if not hasattr(result, "considered"):
+                raise InvalidPacket(
+                    f"Data not considered for secret key import: {result}"
+                )
+
+            if len(result.imports) == 0:
+                raise InvalidPacket("Secret key packet does not contain a secret key.")
+
+            secret_keys = set()
+            for import_status in result.imports:
+                try:
+                    key_obj = c.get_key(import_status.fpr, secret=True)
+                except gpg.errors.GPGMEError as e:
+                    raise GPGProviderError("Internal GPGME error") from e
+                except gpg.errors.KeyError as e:
+                    raise GPGProviderError("Newly imported secret key not found") from e
+
+                secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))
+
+            return secret_keys
+
+    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            return ciphertext
+
+    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                plaintext, __, __ = c.decrypt(
+                    ciphertext,
+                    passphrase=password,
+                    verify=False
+                )
+            except gpg.errors.GPGMEError as e:
+                # TODO: Find out what kind of error is raised if the password is wrong and
+                # re-raise it as DecryptionFailed instead.
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.UnsupportedAlgorithm as e:
+                raise DecryptionFailed("Unsupported algorithm") from e
+
+            return plaintext
+
+    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        signers = []
+        for secret_key in secret_keys:
+            assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+            signers.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                signed_data, __ = c.sign(data)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the secret keys is invalid for signing"
+                ) from e
+
+            return signed_data
+
+    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        signers = []
+        for secret_key in secret_keys:
+            assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+            signers.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the secret keys is invalid for signing"
+                ) from e
+
+            return signature
+
+    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                data, result = c.verify(signed_data)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.BadSignatures as e:
+                raise VerificationFailed("Bad signatures on signed data") from e
+
+            valid_signature_found = False
+            for public_key in public_keys:
+                assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                for subkey in public_key.key_obj.subkeys:
+                    for sig in result.signatures:
+                        if subkey.can_sign and subkey.fpr == sig.fpr:
+                            valid_signature_found = True
+
+            if not valid_signature_found:
+                raise VerificationFailed(
+                    "Data not signed by one of the expected public keys"
+                )
+
+            return data
+
+    def verify_detached(
+        self,
+        data: bytes,
+        signature: bytes,
+        public_keys: Set[GPGPublicKey]
+    ) -> None:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                __, result = c.verify(data, signature=signature)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.BadSignatures as e:
+                raise VerificationFailed("Bad signatures on signed data") from e
+
+            valid_signature_found = False
+            for public_key in public_keys:
+                assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                for subkey in public_key.key_obj.subkeys:
+                    for sig in result.signatures:
+                        if subkey.can_sign and subkey.fpr == sig.fpr:
+                            valid_signature_found = True
+
+            if not valid_signature_found:
+                raise VerificationFailed(
+                    "Data not signed by one of the expected public keys"
+                )
+
+    def encrypt(
+        self,
+        plaintext: bytes,
+        public_keys: Set[GPGPublicKey],
+        signing_keys: Optional[Set[GPGSecretKey]] = None
+    ) -> bytes:
+        recipients = []
+        for public_key in public_keys:
+            assert isinstance(public_key, GPGME_GPGPublicKey)
+
+            recipients.append(public_key.key_obj)
+
+        signers = []
+        if signing_keys is not None:
+            for secret_key in signing_keys:
+                assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+                signers.append(secret_key.public_key.key_obj)
+
+        sign = signing_keys is not None
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                ciphertext, __, __ = c.encrypt(
+                    plaintext,
+                    recipients=recipients,
+                    sign=sign,
+                    always_trust=True,
+                    add_encrypt_to=True
+                )
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidRecipients as e:
+                raise GPGProviderError(
+                    "At least one of the public keys is invalid for encryption"
+                ) from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the signing keys is invalid for signing"
+                ) from e
+
+            return ciphertext
+
+    def decrypt(
+        self,
+        ciphertext: bytes,
+        secret_keys: Set[GPGSecretKey],
+        public_keys: Optional[Set[GPGPublicKey]] = None
+    ) -> bytes:
+        verify = public_keys is not None
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                plaintext, result, verify_result = c.decrypt(
+                    ciphertext,
+                    verify=verify
+                )
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.UnsupportedAlgorithm as e:
+                raise DecryptionFailed("Unsupported algorithm") from e
+
+            # TODO: Check whether the data was decrypted using one of the expected secret
+            # keys
+
+            if public_keys is not None:
+                valid_signature_found = False
+                for public_key in public_keys:
+                    assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                    for subkey in public_key.key_obj.subkeys:
+                        for sig in verify_result.signatures:
+                            if subkey.can_sign and subkey.fpr == sig.fpr:
+                                valid_signature_found = True
+
+                if not valid_signature_found:
+                    raise VerificationFailed(
+                        "Data not signed by one of the expected public keys"
+                    )
+
+            return plaintext
+
+    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                return {
+                    GPGME_GPGPublicKey(key)
+                    for key
+                    in c.keylist(pattern=user_id, secret=False)
+                }
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                return {
+                    GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
+                    for key
+                    in c.keylist(pattern=user_id, secret=True)
+                }
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+    def can_sign(self, public_key: GPGPublicKey) -> bool:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        return any(subkey.can_sign for subkey in public_key.key_obj.subkeys)
+
+    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys)
+
+    def create_key(self, user_id: str) -> GPGSecretKey:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.create_key(
+                    user_id,
+                    expires=False,
+                    sign=True,
+                    encrypt=True,
+                    certify=False,
+                    authenticate=False,
+                    force=True
+                )
+
+                key_obj = c.get_key(result.fpr, secret=True)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.KeyError as e:
+                raise GPGProviderError("Newly created key not found") from e
+
+            return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))
+
+
+class PublicKeyMetadata(NamedTuple):
+    """
+    Metadata about a published public key.
+    """
+
+    fingerprint: str
+    timestamp: datetime
+
+
+@enum.unique
+class TrustLevel(enum.Enum):
+    """
+    The trust levels required for BTBV and manual trust.
+    """
+
+    TRUSTED: str = "TRUSTED"
+    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
+    UNDECIDED: str = "UNDECIDED"
+    DISTRUSTED: str = "DISTRUSTED"
+
+
+OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="openpgp" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+# The following schema needs verion 1.1 of XML Schema, which is not supported by lxml.
+# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
+# implementation of XML Schema, including version 1.1.
+CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="signcrypt">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="sign">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="crypt">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="to">
+        <xs:complexType>
+            <xs:attribute name="jid" type="xs:string"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="time">
+        <xs:complexType>
+            <xs:attribute name="stamp" type="xs:dateTime"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="rpad" type="xs:string"/>
+
+    <xs:element name="payload">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+</xs:schema>
+""")
+
+
+PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"
+PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="public-keys-list">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="pubkey-metadata">
+        <xs:complexType>
+            <xs:attribute name="v4-fingerprint" type="xs:string"/>
+            <xs:attribute name="date" type="xs:dateTime"/>
+        </xs:complexType>
+    </xs:element>
+</xs:schema>
+""")
+
+
+PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="pubkey">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="data"/>
+            </xs:all>
+            <xs:anyAttribute processContents="skip"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="data" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="secretkey" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+DEFAULT_TRUST_MODEL_PARAM = f"""
+<params>
+<individual>
+<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
+    <param name="{PARAM_NAME}"
+        label={quoteattr(D_('OMEMO default trust policy'))}
+        type="list" security="3">
+        <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
+        <option value="btbv"
+            label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
+            selected="true" />
+    </param>
+</category>
+</individual>
+</params>
+"""
+
+
+def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider:
+    """Get the GPG provider for a client.
+
+    @param sat: The SAT instance.
+    @param client: The client.
+    @return: The GPG provider specifically for that client.
+    """
+
+    return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home")))
+
+
+def generate_passphrase() -> str:
+    """Generate a secure passphrase for symmetric encryption.
+
+    @return: The passphrase.
+    """
+
+    return "-".join("".join(
+        secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4)
+    ) for __ in range(6))
+
+
+# TODO: Handle the user id mess
+class XEP_0373:
+    """
+    Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``.
+    """
+
+    def __init__(self, host: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.host = host
+
+        # Add configuration option to choose between manual trust and BTBV as the trust
+        # model
+        host.memory.update_params(DEFAULT_TRUST_MODEL_PARAM)
+
+        self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045"))
+        self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"])
+
+        self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {}
+
+        xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"])
+        xep_0163.add_pep_event(
+            "OX_PUBLIC_KEYS_LIST",
+            PUBLIC_KEYS_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_public_keys_list_update(items_event, profile)
+            )
+        )
+
+    async def profile_connecting(self, client):
+        client.gpg_provider = get_gpg_provider(self.host, client)
+
+    async def profile_connected(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient
+    ) -> None:
+        """
+        @param client: The client.
+        """
+
+        profile = cast(str, client.profile)
+
+        if not profile in self.__storage:
+            self.__storage[profile] = \
+                persistent.LazyPersistentBinaryDict("XEP-0373", client.profile)
+
+        if len(self.list_secret_keys(client)) == 0:
+            log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
+            await self.create_key(client)
+
+    async def __on_public_keys_list_update(
+        self,
+        items_event: pubsub.ItemsEvent,
+        profile: str
+    ) -> None:
+        """Handle public keys list updates fired by PEP.
+
+        @param items_event: The event.
+        @param profile: The profile this event belongs to.
+        """
+
+        client = self.host.get_client(profile)
+
+        sender = cast(jid.JID, items_event.sender)
+        items = cast(List[domish.Element], items_event.items)
+
+        if len(items) > 1:
+            log.warning("Ignoring public keys list update with more than one element.")
+            return
+
+        item_elt = next(iter(items), None)
+        if item_elt is None:
+            log.debug("Ignoring empty public keys list update.")
+            return
+
+        public_keys_list_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+        )
+
+        pubkey_metadata_elts: Optional[List[domish.Element]] = None
+
+        if public_keys_list_elt is not None:
+            try:
+                PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
+            except xmlschema.XMLSchemaValidationError:
+                pass
+            else:
+                pubkey_metadata_elts = \
+                    list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata"))
+
+        if pubkey_metadata_elts is None:
+            log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
+            return
+
+        new_public_keys_metadata = { PublicKeyMetadata(
+            fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
+            timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"]))
+        ) for pubkey_metadata_elt in pubkey_metadata_elts }
+
+        storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost())
+
+        local_public_keys_metadata = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[profile].get(storage_key, set())
+        )
+
+        unchanged_keys = new_public_keys_metadata & local_public_keys_metadata
+        changed_or_new_keys = new_public_keys_metadata - unchanged_keys
+        available_keys = self.list_public_keys(client, sender)
+
+        for key_metadata in changed_or_new_keys:
+            # Check whether the changed or new key has been imported before
+            if any(key.fingerprint == key_metadata.fingerprint for key in available_keys):
+                try:
+                    # If it has been imported before, try to update it
+                    await self.import_public_key(client, sender, key_metadata.fingerprint)
+                except Exception as e:
+                    log.warning(f"Public key import failed: {e}")
+
+                    # If the update fails, remove the key from the local metadata list
+                    # such that the update is attempted again next time
+                    new_public_keys_metadata.remove(key_metadata)
+
+        # Check whether this update was for our account and make sure all of our keys are
+        # included in the update
+        if sender.userhost() == client.jid.userhost():
+            secret_keys = self.list_secret_keys(client)
+            missing_keys = set(filter(lambda secret_key: all(
+                key_metadata.fingerprint != secret_key.public_key.fingerprint
+                for key_metadata
+                in new_public_keys_metadata
+            ), secret_keys))
+
+            if len(missing_keys) > 0:
+                log.warning(
+                    "Public keys list update did not contain at least one of our keys."
+                    f" {new_public_keys_metadata}"
+                )
+
+                for missing_key in missing_keys:
+                    log.warning(missing_key.public_key.fingerprint)
+                    new_public_keys_metadata.add(PublicKeyMetadata(
+                        fingerprint=missing_key.public_key.fingerprint,
+                        timestamp=datetime.now(timezone.utc)
+                    ))
+
+                await self.publish_public_keys_list(client, new_public_keys_metadata)
+
+        await self.__storage[profile].force(storage_key, new_public_keys_metadata)
+
+    def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
+        """List GPG public keys available for a JID.
+
+        @param client: The client to perform this operation with.
+        @param jid: The JID. Can be a bare JID.
+        @return: The set of public keys available for this JID.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}")
+
+    def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]:
+        """List GPG secret keys available for a JID.
+
+        @param client: The client to perform this operation with.
+        @return: The set of secret keys available for this JID.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}")
+
+    async def create_key(self, client: SatXMPPClient) -> GPGSecretKey:
+        """Create a new GPG key, capable of signing and encryption.
+
+        The key is generated without password protection and without expiration.
+
+        @param client: The client to perform this operation with.
+        @return: The new key.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}")
+
+        await self.publish_public_key(client, secret_key.public_key)
+
+        storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(client.jid.userhost())
+
+        public_keys_list = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[client.profile].get(storage_key, set())
+        )
+
+        public_keys_list.add(PublicKeyMetadata(
+            fingerprint=secret_key.public_key.fingerprint,
+            timestamp=datetime.now(timezone.utc)
+        ))
+
+        await self.publish_public_keys_list(client, public_keys_list)
+
+        await self.__storage[client.profile].force(storage_key, public_keys_list)
+
+        return secret_key
+
+    @staticmethod
+    def __build_content_element(
+        element_name: Literal["signcrypt", "sign", "crypt"],
+        recipient_jids: Iterable[jid.JID],
+        include_rpad: bool
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a content element.
+
+        @param element_name: The name of the content element.
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @param include_rpad: Whether to include random-length random-content padding.
+        @return: The content element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        content_elt = domish.Element((NS_OX, element_name))
+
+        for recipient_jid in recipient_jids:
+            content_elt.addElement("to")["jid"] = recipient_jid.userhost()
+
+        content_elt.addElement("time")["stamp"] = format_datetime()
+
+        if include_rpad:
+            # XEP-0373 doesn't specify bounds for the length of the random padding. This
+            # uses the bounds specified in XEP-0420 for the closely related rpad affix.
+            rpad_length = secrets.randbelow(201)
+            rpad_content = "".join(
+                secrets.choice(string.digits + string.ascii_letters + string.punctuation)
+                for __
+                in range(rpad_length)
+            )
+            content_elt.addElement("rpad", content=rpad_content)
+
+        payload_elt = content_elt.addElement("payload")
+
+        return content_elt, payload_elt
+
+    @staticmethod
+    def build_signcrypt_element(
+        recipient_jids: Iterable[jid.JID]
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<signcrypt/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the
+            stanza extension elements to.
+        """
+
+        if len(recipient_jids) == 0:
+            raise ValueError("Recipient JIDs must be provided.")
+
+        return XEP_0373.__build_content_element("signcrypt", recipient_jids, True)
+
+    @staticmethod
+    def build_sign_element(
+        recipient_jids: Iterable[jid.JID],
+        include_rpad: bool
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<sign/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @param include_rpad: Whether to include random-length random-content padding,
+            which is OPTIONAL for the ``<sign/>`` content element.
+        @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        if len(recipient_jids) == 0:
+            raise ValueError("Recipient JIDs must be provided.")
+
+        return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad)
+
+    @staticmethod
+    def build_crypt_element(
+        recipient_jids: Iterable[jid.JID]
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<crypt/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Specifying
+            the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can
+            be bare JIDs.
+        @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        return XEP_0373.__build_content_element("crypt", recipient_jids, True)
+
+    async def build_openpgp_element(
+        self,
+        client: SatXMPPClient,
+        content_elt: domish.Element,
+        recipient_jids: Set[jid.JID]
+    ) -> domish.Element:
+        """Build an ``<openpgp/>`` element.
+
+        @param client: The client to perform this operation with.
+        @param content_elt: The content element to contain in the ``<openpgp/>`` element.
+        @param recipient_jids: The recipient's JIDs. Can be bare JIDs.
+        @return: The ``<openpgp/>`` element.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        # TODO: I'm not sure whether we want to sign with all keys by default or choose
+        # just one key/a subset of keys to sign with.
+        signing_keys = set(filter(
+            lambda secret_key: gpg_provider.can_sign(secret_key.public_key),
+            self.list_secret_keys(client)
+        ))
+
+        encryption_keys: Set[GPGPublicKey] = set()
+
+        for recipient_jid in recipient_jids:
+            # import all keys of the recipient
+            all_public_keys = await self.import_all_public_keys(client, recipient_jid)
+
+            # Filter for keys that can encrypt
+            encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys))
+
+        # TODO: Handle trust
+
+        content = content_elt.toXml().encode("utf-8")
+        data: bytes
+
+        if content_elt.name == "signcrypt":
+            data = gpg_provider.encrypt(content, encryption_keys, signing_keys)
+        elif content_elt.name == "sign":
+            data = gpg_provider.sign(content, signing_keys)
+        elif content_elt.name == "crypt":
+            data = gpg_provider.encrypt(content, encryption_keys)
+        else:
+            raise ValueError(f"Unknown content element <{content_elt.name}/>")
+
+        openpgp_elt = domish.Element((NS_OX, "openpgp"))
+        openpgp_elt.addContent(base64.b64encode(data).decode("ASCII"))
+        return openpgp_elt
+
+    async def unpack_openpgp_element(
+        self,
+        client: SatXMPPClient,
+        openpgp_elt: domish.Element,
+        element_name: Literal["signcrypt", "sign", "crypt"],
+        sender_jid: jid.JID
+    ) -> Tuple[domish.Element, datetime]:
+        """Verify, decrypt and unpack an ``<openpgp/>`` element.
+
+        @param client: The client to perform this operation with.
+        @param openpgp_elt: The ``<openpgp/>`` element.
+        @param element_name: The name of the content element.
+        @param sender_jid: The sender's JID. Can be a bare JID.
+        @return: The ``<payload/>`` element containing the decrypted/verified stanza
+            extension elements carried by this ``<openpgp/>`` element, and the timestamp
+            contained in the content element.
+        @raise exceptions.ParsingError: on syntactical verification errors.
+        @raise VerificationError: on semantical verification errors accoding to XEP-0373.
+        @raise DecryptionFailed: on decryption failure.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: The timestamp is not verified for plausibility; this SHOULD be done by
+            the calling code.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        decryption_keys = set(filter(
+            lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
+            self.list_secret_keys(client)
+        ))
+
+        # import all keys of the sender
+        all_public_keys = await self.import_all_public_keys(client, sender_jid)
+
+        # Filter for keys that can sign
+        verification_keys = set(filter(gpg_provider.can_sign, all_public_keys))
+
+        # TODO: Handle trust
+
+        try:
+            OPENPGP_SCHEMA.validate(openpgp_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                "<openpgp/> element doesn't pass schema validation."
+            ) from e
+
+        openpgp_message = base64.b64decode(str(openpgp_elt))
+        content: bytes
+
+        if element_name == "signcrypt":
+            content = gpg_provider.decrypt(
+                openpgp_message,
+                decryption_keys,
+                public_keys=verification_keys
+            )
+        elif element_name == "sign":
+            content = gpg_provider.verify(openpgp_message, verification_keys)
+        elif element_name == "crypt":
+            content = gpg_provider.decrypt(openpgp_message, decryption_keys)
+        else:
+            assert_never(element_name)
+
+        try:
+            content_elt = cast(
+                domish.Element,
+                xml_tools.ElementParser()(content.decode("utf-8"))
+            )
+        except UnicodeDecodeError as e:
+            raise exceptions.ParsingError("UTF-8 decoding error") from e
+
+        try:
+            CONTENT_SCHEMA.validate(content_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"<{element_name}/> element doesn't pass schema validation."
+            ) from e
+
+        if content_elt.name != element_name:
+            raise exceptions.ParsingError(f"Not a <{element_name}/> element.")
+
+        recipient_jids = \
+            { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") }
+
+        if (
+            client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids }
+            and element_name != "crypt"
+        ):
+            raise VerificationError(
+                f"Recipient list in <{element_name}/> element does not list our (bare)"
+                f" JID."
+            )
+
+        time_elt = next(content_elt.elements(NS_OX, "time"))
+
+        timestamp = parse_datetime(time_elt["stamp"])
+
+        payload_elt = next(content_elt.elements(NS_OX, "payload"))
+
+        return payload_elt, timestamp
+
+    async def publish_public_key(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey
+    ) -> None:
+        """Publish a public key.
+
+        @param client: The client.
+        @param public_key: The public key to publish.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        packet = gpg_provider.export_public_key(public_key)
+
+        node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}"
+
+        pubkey_elt = domish.Element((NS_OX, "pubkey"))
+
+        pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII"))
+
+        try:
+            await self.__xep_0060.send_item(
+                client,
+                client.jid.userhostJID(),
+                node,
+                pubkey_elt,
+                format_datetime(),
+                extra={
+                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "open",
+                        XEP_0060.OPT_MAX_ITEMS: 1
+                    },
+                    # TODO: Do we really want publish_without_options here?
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                }
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the public key failed.") from e
+
+    async def import_all_public_keys(
+        self,
+        client: SatXMPPClient,
+        entity_jid: jid.JID
+    ) -> Set[GPGPublicKey]:
+        """import all public keys of a JID that have not been imported before.
+
+        @param client: The client.
+        @param jid: The JID. Can be a bare JID.
+        @return: The public keys.
+        @note: Failure to import a key simply results in the key not being included in the
+            result.
+        """
+
+        available_public_keys = self.list_public_keys(client, entity_jid)
+
+        storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(entity_jid.userhost())
+
+        public_keys_metadata = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[client.profile].get(storage_key, set())
+        )
+        if not public_keys_metadata:
+            public_keys_metadata = await self.download_public_keys_list(
+                client, entity_jid
+            )
+            if not public_keys_metadata:
+                raise exceptions.NotFound(
+                    f"Can't find public keys for {entity_jid}"
+                )
+            else:
+                await self.__storage[client.profile].aset(
+                    storage_key, public_keys_metadata
+                )
+
+
+        missing_keys = set(filter(lambda public_key_metadata: all(
+            public_key_metadata.fingerprint != public_key.fingerprint
+            for public_key
+            in available_public_keys
+        ), public_keys_metadata))
+
+        for missing_key in missing_keys:
+            try:
+                available_public_keys.add(
+                    await self.import_public_key(client, entity_jid, missing_key.fingerprint)
+                )
+            except Exception as e:
+                log.warning(
+                    f"import of public key {missing_key.fingerprint} owned by"
+                    f" {entity_jid.userhost()} failed, ignoring: {e}"
+                )
+
+        return available_public_keys
+
+    async def import_public_key(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID,
+        fingerprint: str
+    ) -> GPGPublicKey:
+        """import a public key.
+
+        @param client: The client.
+        @param jid: The JID owning the public key. Can be a bare JID.
+        @param fingerprint: The fingerprint of the public key.
+        @return: The public key.
+        @raise exceptions.NotFound: if the public key was not found.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
+            invalid.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"
+
+        try:
+            items, __ = await self.__xep_0060.get_items(
+                client,
+                jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound as e:
+            raise exceptions.NotFound(
+                f"No public key with fingerprint {fingerprint} published by JID"
+                f" {jid.userhost()}."
+            ) from e
+        except Exception as e:
+            raise XMPPInteractionFailed("Fetching the public keys list failed.") from e
+
+        try:
+            item_elt = cast(domish.Element, items[0])
+        except IndexError as e:
+            raise exceptions.NotFound(
+                f"No public key with fingerprint {fingerprint} published by JID"
+                f" {jid.userhost()}."
+            ) from e
+
+        pubkey_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "pubkey"), None)
+        )
+
+        if pubkey_elt is None:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey"
+                f" element."
+            )
+
+        try:
+            PUBKEY_SCHEMA.validate(pubkey_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey"
+                f" schema validation."
+            ) from e
+
+        public_key = gpg_provider.import_public_key(base64.b64decode(str(
+            next(pubkey_elt.elements(NS_OX, "data"))
+        )))
+
+        return public_key
+
+    async def publish_public_keys_list(
+        self,
+        client: SatXMPPClient,
+        public_keys_list: Iterable[PublicKeyMetadata]
+    ) -> None:
+        """Publish/update the own public keys list.
+
+        @param client: The client.
+        @param public_keys_list: The public keys list.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+
+        @warning: All public keys referenced in the public keys list MUST be published
+            beforehand.
+        """
+
+        if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list):
+            raise ValueError("Public keys list contains duplicate fingerprints.")
+
+        node = "urn:xmpp:openpgp:0:public-keys"
+
+        public_keys_list_elt = domish.Element((NS_OX, "public-keys-list"))
+
+        for public_key_metadata in public_keys_list:
+            pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata")
+            pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint
+            pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp)
+
+        try:
+            await self.__xep_0060.send_item(
+                client,
+                client.jid.userhostJID(),
+                node,
+                public_keys_list_elt,
+                item_id=XEP_0060.ID_SINGLETON,
+                extra={
+                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "open",
+                        XEP_0060.OPT_MAX_ITEMS: 1
+                    },
+                    # TODO: Do we really want publish_without_options here?
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                }
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the public keys list failed.") from e
+
+    async def download_public_keys_list(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID
+    ) -> Optional[Set[PublicKeyMetadata]]:
+        """Download the public keys list of a JID.
+
+        @param client: The client.
+        @param jid: The JID. Can be a bare JID.
+        @return: The public keys list or ``None`` if the JID hasn't published a public
+            keys list. An empty list means the JID has published an empty list.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        node = "urn:xmpp:openpgp:0:public-keys"
+
+        try:
+            items, __ = await self.__xep_0060.get_items(
+                client,
+                jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound:
+            return None
+        except Exception as e:
+            raise XMPPInteractionFailed() from e
+
+        try:
+            item_elt = cast(domish.Element, items[0])
+        except IndexError:
+            return None
+
+        public_keys_list_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+        )
+
+        if public_keys_list_elt is None:
+            return None
+
+        try:
+            PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys"
+                f" list schema validation."
+            ) from e
+
+        return {
+            PublicKeyMetadata(
+                fingerprint=pubkey_metadata_elt["v4-fingerprint"],
+                timestamp=parse_datetime(pubkey_metadata_elt["date"])
+            )
+            for pubkey_metadata_elt
+            in public_keys_list_elt.elements(NS_OX, "pubkey-metadata")
+        }
+
+    async def __prepare_secret_key_synchronization(
+        self,
+        client: SatXMPPClient
+    ) -> Optional[domish.Element]:
+        """Prepare for secret key synchronization.
+
+        Makes sure the relative protocols and protocol extensions are supported by the
+        server and makes sure that the PEP node for secret synchronization exists and is
+        configured correctly. The node is created if necessary.
+
+        @param client: The client.
+        @return: As part of the preparations, the secret key synchronization PEP node is
+            fetched. The result of that fetch is returned here.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        try:
+            infos = cast(DiscoInfo, await self.host.memory.disco.get_infos(
+                client,
+                client.jid.userhostJID()
+            ))
+        except Exception as e:
+            raise XMPPInteractionFailed(
+                "Error performing service discovery on the own bare JID."
+            ) from e
+
+        identities = cast(Dict[Tuple[str, str], str], infos.identities)
+        features = cast(Set[DiscoFeature], infos.features)
+
+        if ("pubsub", "pep") not in identities:
+            raise exceptions.FeatureNotFound("Server doesn't support PEP.")
+
+        if "http://jabber.org/protocol/pubsub#access-whitelist" not in features:
+            raise exceptions.FeatureNotFound(
+                "Server doesn't support the whitelist access model."
+            )
+
+        persistent_items_supported = \
+            "http://jabber.org/protocol/pubsub#persistent-items" in features
+
+        # TODO: persistent-items is a SHOULD, how do we handle the feature missing?
+
+        node = "urn:xmpp:openpgp:0:secret-key"
+
+        try:
+            items, __ = await self.__xep_0060.get_items(
+                client,
+                client.jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound:
+            try:
+                await self.__xep_0060.createNode(
+                    client,
+                    client.jid.userhostJID(),
+                    node,
+                    {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "whitelist",
+                        XEP_0060.OPT_MAX_ITEMS: "1"
+                    }
+                )
+            except Exception as e:
+                raise XMPPInteractionFailed(
+                    "Error creating the secret key synchronization node."
+                ) from e
+        except Exception as e:
+            raise XMPPInteractionFailed(
+                "Error fetching the secret key synchronization node."
+            ) from e
+
+        try:
+            return cast(domish.Element, items[0])
+        except IndexError:
+            return None
+
+    async def export_secret_keys(
+        self,
+        client: SatXMPPClient,
+        secret_keys: Iterable[GPGSecretKey]
+    ) -> str:
+        """Export secret keys to synchronize them with other devices.
+
+        @param client: The client.
+        @param secret_keys: The secret keys to export.
+        @return: The backup code needed to decrypt the exported secret keys.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        await self.__prepare_secret_key_synchronization(client)
+
+        backup_code = generate_passphrase()
+
+        plaintext = b"".join(
+            gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys
+        )
+
+        ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code)
+
+        node = "urn:xmpp:openpgp:0:secret-key"
+
+        secretkey_elt = domish.Element((NS_OX, "secretkey"))
+        secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII"))
+
+        try:
+            await self.__xep_0060.send_item(
+                client,
+                client.jid.userhostJID(),
+                node,
+                secretkey_elt
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the secret keys failed.") from e
+
+        return backup_code
+
+    async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]:
+        """Download previously exported secret keys to import them in a second step.
+
+        The downloading and importing steps are separate since a backup code is required
+        for the import and it should be possible to try multiple backup codes without
+        redownloading the data every time. The second half of the import procedure is
+        provided by :meth:`import_secret_keys`.
+
+        @param client: The client.
+        @return: The encrypted secret keys previously exported, if any.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        item_elt = await self.__prepare_secret_key_synchronization(client)
+        if item_elt is None:
+            return None
+
+        secretkey_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "secretkey"), None)
+        )
+
+        if secretkey_elt is None:
+            return None
+
+        try:
+            SECRETKEY_SCHEMA.validate(secretkey_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                "Publish-Subscribe item doesn't pass secretkey schema validation."
+            ) from e
+
+        return base64.b64decode(str(secretkey_elt))
+
+    def import_secret_keys(
+        self,
+        client: SatXMPPClient,
+        ciphertext: bytes,
+        backup_code: str
+    ) -> Set[GPGSecretKey]:
+        """import previously downloaded secret keys.
+
+        The downloading and importing steps are separate since a backup code is required
+        for the import and it should be possible to try multiple backup codes without
+        redownloading the data every time. The first half of the import procedure is
+        provided by :meth:`download_secret_keys`.
+
+        @param client: The client to perform this operation with.
+        @param ciphertext: The ciphertext, i.e. the data returned by
+            :meth:`download_secret_keys`.
+        @param backup_code: The backup code needed to decrypt the data.
+        @raise InvalidPacket: if one of the GPG packets building the secret key data is
+            either syntactically or semantically deemed invalid.
+        @raise DecryptionFailed: on decryption failure.
+        """
+
+        gpg_provider = get_gpg_provider(self.host, client)
+
+        return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically(
+            ciphertext,
+            backup_code
+        ))
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[jid.JID]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+        # TODO: This should probably be a global helper somewhere
+
+        bare_jids: Set[jid.JID] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.get_room(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhostJID())
+
+        return bare_jids
+
+    async def get_trust(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey,
+        owner: jid.JID
+    ) -> TrustLevel:
+        """Query the trust level of a public key.
+
+        @param client: The client to perform this operation under.
+        @param public_key: The public key.
+        @param owner: The owner of the public key. Can be a bare JID.
+        @return: The trust level.
+        """
+
+        key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
+
+        try:
+            return TrustLevel(await self.__storage[client.profile][key])
+        except KeyError:
+            return TrustLevel.UNDECIDED
+
+    async def set_trust(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey,
+        owner: jid.JID,
+        trust_level: TrustLevel
+    ) -> None:
+        """Set the trust level of a public key.
+
+        @param client: The client to perform this operation under.
+        @param public_key: The public key.
+        @param owner: The owner of the public key. Can be a bare JID.
+        @param trust_leve: The trust level.
+        """
+
+        key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
+
+        await self.__storage[client.profile].force(key, trust_level.name)
+
+    async def get_trust_ui(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        if entity.resource:
+            raise ValueError("A bare JID is expected.")
+
+        bare_jids: Set[jid.JID]
+        if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity):
+            bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
+        else:
+            bare_jids = { entity.userhostJID() }
+
+        all_public_keys = list({
+            bare_jid: list(self.list_public_keys(client, bare_jid))
+            for bare_jid
+            in bare_jids
+        }.items())
+
+        async def callback(
+            data: Any,
+            profile: str  # pylint: disable=unused-argument
+        ) -> Dict[Never, Never]:
+            """
+            @param data: The XMLUI result produces by the trust UI form.
+            @param profile: The profile.
+            @return: An empty dictionary. The type of the return value was chosen
+                conservatively since the exact options are neither known not needed here.
+            """
+
+            if C.bool(data.get("cancelled", "false")):
+                return {}
+
+            data_form_result = cast(
+                Dict[str, str],
+                xml_tools.xmlui_result_2_data_form_result(data)
+            )
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
+
+                outer_index, inner_index = key.split("_")[1:]
+
+                owner, public_keys = all_public_keys[int(outer_index)]
+                public_key = public_keys[int(inner_index)]
+                trust = TrustLevel(value)
+
+                if (await self.get_trust(client, public_key, owner)) is not trust:
+                    await self.set_trust(client, public_key, owner, value)
+
+            return {}
+
+        submit_id = self.host.register_callback(callback, with_data=True, one_shot=True)
+
+        result = xml_tools.XMLUI(
+            panel_type=C.XMLUI_FORM,
+            title=D_("OX trust management"),
+            submit_id=submit_id
+        )
+        # Casting this to Any, otherwise all calls on the variable cause type errors
+        # pylint: disable=no-member
+        trust_ui = cast(Any, result)
+        trust_ui.addText(D_(
+            "This is OX trusting system. You'll see below the GPG keys of your "
+            "contacts, and a list selection to trust them or not. A trusted key "
+            "can read your messages in plain text, so be sure to only validate "
+            "keys that you are sure are belonging to your contact. It's better "
+            "to do this when you are next to your contact, so "
+            "you can check the \"fingerprint\" of the key "
+            "yourself. Do *not* validate a key if the fingerprint is wrong!"
+        ))
+
+        own_secret_keys = self.list_secret_keys(client)
+
+        trust_ui.change_container("label")
+        for index, secret_key in enumerate(own_secret_keys):
+            trust_ui.addLabel(D_(f"Own secret key {index} fingerprint"))
+            trust_ui.addText(secret_key.public_key.fingerprint)
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+        for outer_index, [ owner, public_keys ] in enumerate(all_public_keys):
+            for inner_index, public_key in enumerate(public_keys):
+                trust_ui.addLabel(D_("Contact"))
+                trust_ui.addJid(jid.JID(owner))
+                trust_ui.addLabel(D_("Fingerprint"))
+                trust_ui.addText(public_key.fingerprint)
+                trust_ui.addLabel(D_("Trust this device?"))
+
+                current_trust_level = await self.get_trust(client, public_key, owner)
+                avaiable_trust_levels = \
+                    { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
+
+                trust_ui.addList(
+                    f"trust_{outer_index}_{inner_index}",
+                    options=[ trust_level.name for trust_level in avaiable_trust_levels ],
+                    selected=current_trust_level.name,
+                    styles=[ "inline" ]
+                )
+
+                trust_ui.addEmpty()
+                trust_ui.addEmpty()
+
+        return result