view libervia/backend/plugins/plugin_xep_0373.py @ 4259:49019947cc76

component AP Gateway: implement HTTP GET signature.
author Goffi <goffi@goffi.org>
date Wed, 05 Jun 2024 22:34:09 +0200
parents b53b6dc1f929
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for OpenPGP for XMPP
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
import base64
from datetime import datetime, timezone
import enum
import json
import secrets
import string
from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast
from xml.sax.saxutils import quoteattr

from typing import Final, NamedTuple, Never, assert_never
from wokkel import muc, pubsub
from wokkel.disco import DiscoFeature, DiscoInfo
import xmlschema

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _, D_
from libervia.backend.core.log import getLogger, Logger
from libervia.backend.core.main import LiberviaBackend
from libervia.backend.core.xmpp import SatXMPPClient
from libervia.backend.memory import persistent
from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
from libervia.backend.plugins.plugin_xep_0163 import XEP_0163
from libervia.backend.tools.xmpp_datetime import format_datetime, parse_datetime
from libervia.backend.tools import xml_tools
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish

try:
    import gpg
except ImportError as import_error:
    raise exceptions.MissingModule(
        "You are missing the 'gpg' package required by the OX plugin. The recommended"
        " installation method is via your operating system's package manager, since the"
        " version of the library has to match the version of your GnuPG installation. See"
        " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme"
    ) from import_error


# Public API of this module: the names bound by a star-import of it.
__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "NS_OX",
    "XEP_0373",
    "VerificationError",
    "XMPPInteractionFailed",
    "InvalidPacket",
    "DecryptionFailed",
    "VerificationFailed",
    "UnknownKey",
    "GPGProviderError",
    "GPGPublicKey",
    "GPGSecretKey",
    "GPGProvider",
    "PublicKeyMetadata",
    "gpg_provider",
    "TrustLevel"
]


# Module-level logger. The cast gives the result of the untyped getLogger a static type.
log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]


# Plugin metadata: declares the implemented protocol, the plugins this one depends on
# (XEP-0060 and XEP-0163) and the name of the main plugin class.
# NOTE(review): presumably consumed by the Libervia plugin loader - not visible here.
PLUGIN_INFO = {
    C.PI_NAME: "XEP-0373",
    C.PI_IMPORT_NAME: "XEP-0373",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0373" ],
    C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0373",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
}


# XML namespace of the OpenPGP for XMPP (XEP-0373) elements.
NS_OX: Final = "urn:xmpp:openpgp:0"


# Category and name of the "ox_policy" parameter.
# NOTE(review): where this parameter is registered and read is not visible here.
PARAM_CATEGORY = "Security"
PARAM_NAME = "ox_policy"
# Key template; "{}" is a placeholder to be filled in with str.format().
# NOTE(review): presumably a storage key filled in per entity - confirm against callers.
STR_KEY_PUBLIC_KEYS_METADATA = "/public-keys-metadata/{}"


class VerificationError(Exception):
    """
    Raised by verifying methods of :class:`XEP_0373` on semantical verification errors.

    Not to be confused with :class:`VerificationFailed`, which is raised by methods of
    :class:`GPGProvider` on verification failures.
    """


class XMPPInteractionFailed(Exception):
    """
    Raised by methods of :class:`XEP_0373` on XMPP interaction failure.

    The reason this exception exists is that the exceptions raised by XMPP interactions
    are not properly documented for the most part, thus all exceptions are caught and
    wrapped in instances of this class.
    """


class InvalidPacket(ValueError):
    """
    Raised by methods of :class:`GPGProvider` when an invalid packet is encountered.

    Derives from :class:`ValueError`, so handlers catching :class:`ValueError` catch this
    exception as well.
    """


class DecryptionFailed(Exception):
    """
    Raised by methods of :class:`GPGProvider` on decryption failures, i.e. when a
    ciphertext could not be turned back into its plaintext.
    """


class VerificationFailed(Exception):
    """
    Raised by methods of :class:`GPGProvider` on verification failures, i.e. when signed
    data could not be verified.
    """


class UnknownKey(ValueError):
    """
    Raised by methods of :class:`GPGProvider` when an unknown key is referenced.

    Derives from :class:`ValueError`, so handlers catching :class:`ValueError` catch this
    exception as well.
    """


class GPGProviderError(Exception):
    """
    Raised by methods of :class:`GPGProvider` on internal errors, i.e. failures of the
    underlying GPG library or framework itself.
    """


class GPGPublicKey(ABC):
    """
    Interface describing a GPG public key.

    The only thing an implementation has to expose is the fingerprint of the key.
    """

    @property
    @abstractmethod
    def fingerprint(self) -> str:
        """
        @return: The OpenPGP v4 fingerprint string of this public key.
        """


class GPGSecretKey(ABC):
    """
    Interface describing a GPG secret key.

    The only thing an implementation has to expose is the corresponding public key.
    """

    @property
    @abstractmethod
    def public_key(self) -> GPGPublicKey:
        """
        @return: The public key corresponding to this secret key.
        """


class GPGProvider(ABC):
    """
    Interface describing a GPG provider, i.e. a library or framework providing GPG
    encryption, signing and key management.

    All methods may raise :class:`GPGProviderError` in addition to those exception types
    listed explicitly.

    All binary inputs and outputs are raw OpenPGP data; OpenPGP's ASCII Armor is never
    used by the methods that document so.

    # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?)
    """

    @abstractmethod
    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
        """Export a public key in a key material packet according to RFC 4880 §5.5.

        Do not use OpenPGP's ASCII Armor.

        @param public_key: The public key to export.
        @return: The packet containing the exported public key.
        @raise UnknownKey: if the public key is not available.
        """

    @abstractmethod
    def import_public_key(self, packet: bytes) -> GPGPublicKey:
        """Import a public key from a key material packet according to RFC 4880 §5.5.

        OpenPGP's ASCII Armor is not used.

        @param packet: A packet containing an exported public key.
        @return: The public key imported from the packet.
        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
            invalid.

        @warning: Only packets of version 4 or higher may be accepted, packets below
            version 4 MUST be rejected.
        """

    @abstractmethod
    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
        """Export a secret key for transfer according to RFC 4880 §11.1.

        Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage
        conventions to zero in the corresponding secret-key packet according to RFC 4880
        §5.5.3. Do not use OpenPGP's ASCII Armor.

        @param secret_key: The secret key to export.
        @return: The binary blob containing the exported secret key.
        @raise UnknownKey: if the secret key is not available.
        """

    @abstractmethod
    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
        """Restore secret keys exported for transfer according to RFC 4880 §11.1.

        The secret data is not encrypted, i.e. the octet indicating string-to-key usage
        conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3
        are set to zero. OpenPGP's ASCII Armor is not used.

        @param data: Concatenation of one or more secret keys exported for transfer.
        @return: The secret keys imported from the data.
        @raise InvalidPacket: if the data or one of the packets included in the data is
            either syntactically or semantically deemed invalid.

        @warning: Only packets of version 4 or higher may be accepted, packets below
            version 4 MUST be rejected.
        """

    @abstractmethod
    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
        """Encrypt data symmetrically according to RFC 4880 §5.3.

        The password is used to build a Symmetric-Key Encrypted Session Key packet which
        precedes the Symmetrically Encrypted Data packet that holds the encrypted data.

        @param plaintext: The data to encrypt.
        @param password: The password to encrypt the data with.
        @return: The encrypted data.
        """

    @abstractmethod
    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
        """Decrypt data symmetrically according to RFC 4880 §5.3.

        The ciphertext consists of a Symmetrically Encrypted Data packet that holds the
        encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the
        password.

        @param ciphertext: The ciphertext.
        @param password: The password to decrypt the data with.
        @return: The plaintext.
        @raise DecryptionFailed: on decryption failure.
        """

    @abstractmethod
    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Sign some data.

        OpenPGP's ASCII Armor is not used.

        @param data: The data to sign.
        @param secret_keys: The secret keys to sign the data with.
        @return: The OpenPGP message carrying the signed data.
        """

    @abstractmethod
    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Sign some data. Create the signature detached from the data.

        OpenPGP's ASCII Armor is not used.

        @param data: The data to sign.
        @param secret_keys: The secret keys to sign the data with.
        @return: The OpenPGP message carrying the detached signature.
        """

    @abstractmethod
    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
        """Verify signed data.

        OpenPGP's ASCII Armor is not used.

        @param signed_data: The signed data as an OpenPGP message.
        @param public_keys: The public keys to verify the signature with.
        @return: The verified and unpacked data.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: it has to be confirmed that a valid signature by one
            of the public keys is available.
        """

    @abstractmethod
    def verify_detached(
        self,
        data: bytes,
        signature: bytes,
        public_keys: Set[GPGPublicKey]
    ) -> None:
        """Verify signed data, where the signature was created detached from the data.

        OpenPGP's ASCII Armor is not used.

        @param data: The data.
        @param signature: The signature as an OpenPGP message.
        @param public_keys: The public keys to verify the signature with.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: it has to be confirmed that a valid signature by one
            of the public keys is available.
        """

    @abstractmethod
    def encrypt(
        self,
        plaintext: bytes,
        public_keys: Set[GPGPublicKey],
        signing_keys: Optional[Set[GPGSecretKey]] = None
    ) -> bytes:
        """Encrypt and optionally sign some data.

        OpenPGP's ASCII Armor is not used.

        @param plaintext: The data to encrypt and optionally sign.
        @param public_keys: The public keys to encrypt the data for.
        @param signing_keys: The secret keys to sign the data with.
        @return: The OpenPGP message carrying the encrypted and optionally signed data.
        """

    @abstractmethod
    def decrypt(
        self,
        ciphertext: bytes,
        secret_keys: Set[GPGSecretKey],
        public_keys: Optional[Set[GPGPublicKey]] = None
    ) -> bytes:
        """Decrypt and optionally verify some data.

        OpenPGP's ASCII Armor is not used.

        @param ciphertext: The encrypted and optionally signed data as an OpenPGP message.
        @param secret_keys: The secret keys to attempt decryption with.
        @param public_keys: The public keys to verify the optional signature with.
        @return: The decrypted, optionally verified and unpacked data.
        @raise DecryptionFailed: on decryption failure.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: it has to be confirmed that the data was decrypted
            using one of the secret keys and that a valid signature by one of the public
            keys is available in case the data is signed.
        """

    @abstractmethod
    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
        """List public keys.

        @param user_id: The user id.
        @return: The set of public keys available for this user id.
        """

    @abstractmethod
    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
        """List secret keys.

        @param user_id: The user id.
        @return: The set of secret keys available for this user id.
        """

    @abstractmethod
    def can_sign(self, public_key: GPGPublicKey) -> bool:
        """
        @param public_key: The public key to check.
        @return: Whether the public key belongs to a key pair capable of signing.
        """

    @abstractmethod
    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
        """
        @param public_key: The public key to check.
        @return: Whether the public key belongs to a key pair capable of encryption.
        """

    @abstractmethod
    def create_key(self, user_id: str) -> GPGSecretKey:
        """Create a new GPG key, capable of signing and encryption.

        The key is generated without password protection and without expiration. If a key
        with the same user id already exists, a new key is created anyway.

        @param user_id: The user id to assign to the new key.
        @return: The new key.
        """


class GPGME_GPGPublicKey(GPGPublicKey):
    """
    GPG public key implementation based on GnuPG Made Easy (GPGME).

    Thin wrapper around a GPGME key object.
    """

    def __init__(self, key_obj: Any) -> None:
        """
        @param key_obj: The GPGME key object.
        """

        self.__key_obj = key_obj

    @property
    def fingerprint(self) -> str:
        """
        @return: The fingerprint of the wrapped GPGME key object (its ``fpr`` attribute).
        """

        return self.__key_obj.fpr

    @property
    def key_obj(self) -> Any:
        """
        @return: The wrapped GPGME key object, as passed to the constructor.
        """

        return self.__key_obj


class GPGME_GPGSecretKey(GPGSecretKey):
    """
    GPG secret key implementation based on GnuPG Made Easy (GPGME).

    Holds no key material of its own; it only references the corresponding public key
    wrapper.
    """

    def __init__(self, public_key: GPGME_GPGPublicKey) -> None:
        """
        @param public_key: The public key corresponding to this secret key.
        """

        self.__public_key = public_key

    @property
    def public_key(self) -> GPGME_GPGPublicKey:
        """
        @return: The public key corresponding to this secret key, as passed to the
            constructor.
        """

        return self.__public_key


class GPGME_GPGProvider(GPGProvider):
    """
    GPG provider implementation based on GnuPG Made Easy (GPGME).

    Each operation runs in its own GPGME context bound to the configured home directory.
    Exceptions raised by GPGME are translated into the exception types declared by
    :class:`GPGProvider`.
    """

    def __init__(self, home_dir: Optional[str] = None) -> None:
        """
        @param home_dir: Optional GPG home directory path to use for all operations.
        """

        self.__home_dir = home_dir

    @staticmethod
    def _signer_key_objs(secret_keys: Iterable[GPGSecretKey]) -> List[Any]:
        """Collect the GPGME key objects to pass as signers to a GPGME context.

        @param secret_keys: The secret keys to sign with.
        @return: The GPGME key objects wrapped by the secret keys.
        """

        signers = []
        for secret_key in secret_keys:
            assert isinstance(secret_key, GPGME_GPGSecretKey)

            signers.append(secret_key.public_key.key_obj)

        return signers

    @staticmethod
    def _require_signature_from(
        public_keys: Iterable[GPGPublicKey],
        signatures: Iterable[Any]
    ) -> None:
        """Confirm that one of the signatures was made by one of the public keys.

        A signature matches a public key if its fingerprint equals the fingerprint of one
        of the signing-capable subkeys of that public key.

        @param public_keys: The public keys one of which is expected to have signed.
        @param signatures: The signatures reported by GPGME (objects with an ``fpr``
            attribute).
        @raise VerificationFailed: if none of the signatures matches.
        """

        signature_fingerprints = {sig.fpr for sig in signatures}

        for public_key in public_keys:
            assert isinstance(public_key, GPGME_GPGPublicKey)

            if any(
                subkey.can_sign and subkey.fpr in signature_fingerprints
                for subkey in public_key.key_obj.subkeys
            ):
                return

        raise VerificationFailed("Data not signed by one of the expected public keys")

    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
        """Export a public key in minimal form, looked up by its fingerprint.

        @param public_key: The public key to export.
        @return: The exported key data.
        @raise UnknownKey: if GPGME exported nothing for the fingerprint.
        @raise GPGProviderError: on internal GPGME errors.
        """

        assert isinstance(public_key, GPGME_GPGPublicKey)

        pattern = public_key.fingerprint

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_export_minimal(pattern)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            if result is None:
                raise UnknownKey(f"Public key {pattern} not found.")

            return result

    def import_public_key(self, packet: bytes) -> GPGPublicKey:
        """Import exactly one public key into the keyring.

        @param packet: The packet containing the exported public key.
        @return: The imported public key.
        @raise InvalidPacket: if the data was not considered for import or does not
            contain exactly one public key.
        @raise GPGProviderError: on internal GPGME errors, or if the imported key cannot
            be found afterwards.
        """

        # TODO
        # - Reject packets older than version 4
        # - Check whether it's actually a public key (through packet inspection?)

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_import(packet)
            except gpg.errors.GPGMEError as e:
                # From looking at the code, `key_import` never raises. The documentation
                # says it does though, so this is included for future-proofness.
                raise GPGProviderError("Internal GPGME error") from e

            # Only a proper import result carries the "considered" attribute.
            if not hasattr(result, "considered"):
                raise InvalidPacket(
                    f"Data not considered for public key import: {result}"
                )

            if len(result.imports) != 1:
                raise InvalidPacket(
                    "Public key packet does not contain exactly one public key (not"
                    " counting subkeys)."
                )

            try:
                key_obj = c.get_key(result.imports[0].fpr, secret=False)
            except KeyError as e:
                # The "key not found" error of the GPGME bindings is both a KeyError and
                # a GPGMEError, thus KeyError has to be handled first.
                raise GPGProviderError("Newly imported public key not found") from e
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return GPGME_GPGPublicKey(key_obj)

    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
        """Export a secret key, looked up by the fingerprint of its public key.

        @param secret_key: The secret key to export.
        @return: The exported key data.
        @raise UnknownKey: if GPGME exported nothing for the fingerprint.
        @raise GPGProviderError: on internal GPGME errors.
        """

        assert isinstance(secret_key, GPGME_GPGSecretKey)
        # TODO
        # - Handle password protection/pinentry
        # - Make sure the key is exported unencrypted

        pattern = secret_key.public_key.fingerprint

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_export_secret(pattern)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            if result is None:
                raise UnknownKey(f"Secret key {pattern} not found.")

            return result

    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
        """Import one or more secret keys into the keyring.

        @param data: The exported secret key data.
        @return: The imported secret keys.
        @raise InvalidPacket: if the data was not considered for import or contains no
            key.
        @raise GPGProviderError: on internal GPGME errors, or if an imported key cannot be
            found afterwards.
        """

        # TODO
        # - Reject packets older than version 4
        # - Check whether it's actually secret keys (through packet inspection?)

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_import(data)
            except gpg.errors.GPGMEError as e:
                # From looking at the code, `key_import` never raises. The documentation
                # says it does though, so this is included for future-proofness.
                raise GPGProviderError("Internal GPGME error") from e

            # Only a proper import result carries the "considered" attribute.
            if not hasattr(result, "considered"):
                raise InvalidPacket(
                    f"Data not considered for secret key import: {result}"
                )

            if len(result.imports) == 0:
                raise InvalidPacket("Secret key packet does not contain a secret key.")

            secret_keys = set()
            for import_status in result.imports:
                try:
                    key_obj = c.get_key(import_status.fpr, secret=True)
                except KeyError as e:
                    # See import_public_key: KeyError has to be handled before
                    # GPGMEError for the "not found" case to be reported as such.
                    raise GPGProviderError("Newly imported secret key not found") from e
                except gpg.errors.GPGMEError as e:
                    raise GPGProviderError("Internal GPGME error") from e

                secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))

            return secret_keys

    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
        """Encrypt data with a password, without signing it.

        @param plaintext: The data to encrypt.
        @param password: The password to encrypt the data with.
        @return: The encrypted data.
        @raise GPGProviderError: on internal GPGME errors.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return ciphertext

    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
        """Decrypt data with a password, without verifying signatures.

        @param ciphertext: The ciphertext.
        @param password: The password to decrypt the data with.
        @return: The plaintext.
        @raise DecryptionFailed: if GPGME reports an unsupported algorithm.
        @raise GPGProviderError: on internal GPGME errors.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                plaintext, __, __ = c.decrypt(
                    ciphertext,
                    passphrase=password,
                    verify=False
                )
            except gpg.errors.GPGMEError as e:
                # TODO: Find out what kind of error is raised if the password is wrong and
                # re-raise it as DecryptionFailed instead.
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.UnsupportedAlgorithm as e:
                raise DecryptionFailed("Unsupported algorithm") from e

            return plaintext

    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Sign some data with the given secret keys.

        @param data: The data to sign.
        @param secret_keys: The secret keys to sign the data with.
        @return: The signed data.
        @raise GPGProviderError: on internal GPGME errors or invalid signers.
        """

        signers = self._signer_key_objs(secret_keys)

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                signed_data, __ = c.sign(data)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the secret keys is invalid for signing"
                ) from e

            return signed_data

    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Create a detached signature of some data with the given secret keys.

        @param data: The data to sign.
        @param secret_keys: The secret keys to sign the data with.
        @return: The detached signature.
        @raise GPGProviderError: on internal GPGME errors or invalid signers.
        """

        signers = self._signer_key_objs(secret_keys)

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the secret keys is invalid for signing"
                ) from e

            return signature

    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
        """Verify signed data and confirm it was signed by one of the public keys.

        @param signed_data: The signed data.
        @param public_keys: The public keys one of which is expected to have signed.
        @return: The verified and unpacked data.
        @raise VerificationFailed: on bad signatures, or if none of the signatures was
            made by one of the public keys.
        @raise GPGProviderError: on internal GPGME errors.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                data, result = c.verify(signed_data)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.BadSignatures as e:
                raise VerificationFailed("Bad signatures on signed data") from e

            self._require_signature_from(public_keys, result.signatures)

            return data

    def verify_detached(
        self,
        data: bytes,
        signature: bytes,
        public_keys: Set[GPGPublicKey]
    ) -> None:
        """Verify a detached signature and confirm it was made by one of the public keys.

        @param data: The data.
        @param signature: The detached signature.
        @param public_keys: The public keys one of which is expected to have signed.
        @raise VerificationFailed: on bad signatures, or if none of the signatures was
            made by one of the public keys.
        @raise GPGProviderError: on internal GPGME errors.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                __, result = c.verify(data, signature=signature)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.BadSignatures as e:
                raise VerificationFailed("Bad signatures on signed data") from e

            self._require_signature_from(public_keys, result.signatures)

    def encrypt(
        self,
        plaintext: bytes,
        public_keys: Set[GPGPublicKey],
        signing_keys: Optional[Set[GPGSecretKey]] = None
    ) -> bytes:
        """Encrypt data for the public keys and sign it if signing keys are given.

        Recipient keys are always trusted for the purpose of encryption
        (``always_trust=True``).

        @param plaintext: The data to encrypt and optionally sign.
        @param public_keys: The public keys to encrypt the data for.
        @param signing_keys: The secret keys to sign the data with. Signing is requested
            whenever this is not ``None``.
        @return: The encrypted and optionally signed data.
        @raise GPGProviderError: on internal GPGME errors, invalid recipients or invalid
            signers.
        """

        recipients = []
        for public_key in public_keys:
            assert isinstance(public_key, GPGME_GPGPublicKey)

            recipients.append(public_key.key_obj)

        sign = signing_keys is not None
        signers = self._signer_key_objs(signing_keys) if signing_keys is not None else []

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                ciphertext, __, __ = c.encrypt(
                    plaintext,
                    recipients=recipients,
                    sign=sign,
                    always_trust=True,
                    add_encrypt_to=True
                )
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidRecipients as e:
                raise GPGProviderError(
                    "At least one of the public keys is invalid for encryption"
                ) from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the signing keys is invalid for signing"
                ) from e

            return ciphertext

    def decrypt(
        self,
        ciphertext: bytes,
        secret_keys: Set[GPGSecretKey],
        public_keys: Optional[Set[GPGPublicKey]] = None
    ) -> bytes:
        """Decrypt data and verify its signature if public keys are given.

        @param ciphertext: The encrypted and optionally signed data.
        @param secret_keys: The secret keys to attempt decryption with. Currently not
            checked, see the TODO below.
        @param public_keys: The public keys one of which is expected to have signed.
            Verification is requested whenever this is not ``None``.
        @return: The decrypted, optionally verified and unpacked data.
        @raise DecryptionFailed: if GPGME reports an unsupported algorithm.
        @raise VerificationFailed: on bad signatures, or if none of the signatures was
            made by one of the public keys.
        @raise GPGProviderError: on internal GPGME errors.
        """

        verify = public_keys is not None

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                plaintext, __, verify_result = c.decrypt(
                    ciphertext,
                    verify=verify
                )
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.UnsupportedAlgorithm as e:
                raise DecryptionFailed("Unsupported algorithm") from e
            except gpg.errors.BadSignatures as e:
                # With verification requested, bad signatures are reported by an
                # exception; translate it like verify and verify_detached do.
                raise VerificationFailed("Bad signatures on signed data") from e

            # TODO: Check whether the data was decrypted using one of the expected secret
            # keys

            if public_keys is not None:
                self._require_signature_from(public_keys, verify_result.signatures)

            return plaintext

    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
        """List the public keys in the keyring matching a user id.

        @param user_id: The user id, passed to GPGME as the key listing pattern.
        @return: The set of matching public keys.
        @raise GPGProviderError: on internal GPGME errors.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                return {
                    GPGME_GPGPublicKey(key)
                    for key
                    in c.keylist(pattern=user_id, secret=False)
                }
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
        """List the secret keys in the keyring matching a user id.

        @param user_id: The user id, passed to GPGME as the key listing pattern.
        @return: The set of matching secret keys.
        @raise GPGProviderError: on internal GPGME errors.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                return {
                    GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
                    for key
                    in c.keylist(pattern=user_id, secret=True)
                }
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

    def can_sign(self, public_key: GPGPublicKey) -> bool:
        """
        @param public_key: The public key to check.
        @return: Whether at least one subkey of the public key is capable of signing.
        """

        assert isinstance(public_key, GPGME_GPGPublicKey)

        return any(subkey.can_sign for subkey in public_key.key_obj.subkeys)

    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
        """
        @param public_key: The public key to check.
        @return: Whether at least one subkey of the public key is capable of encryption.
        """

        assert isinstance(public_key, GPGME_GPGPublicKey)

        return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys)

    def create_key(self, user_id: str) -> GPGSecretKey:
        """Create a new key capable of signing and encryption, without expiration.

        @param user_id: The user id to assign to the new key.
        @return: The new key.
        @raise GPGProviderError: on internal GPGME errors, or if the new key cannot be
            found afterwards.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.create_key(
                    user_id,
                    expires=False,
                    sign=True,
                    encrypt=True,
                    certify=False,
                    authenticate=False,
                    force=True
                )

                key_obj = c.get_key(result.fpr, secret=True)
            except KeyError as e:
                # See import_public_key: KeyError has to be handled before GPGMEError
                # for the "not found" case to be reported as such.
                raise GPGProviderError("Newly created key not found") from e
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))


class PublicKeyMetadata(NamedTuple):
    """
    Metadata about a published public key.

    Instances can be converted to and from plain dictionaries whose timestamp is an ISO
    8601 string, see :meth:`to_dict` and :meth:`from_dict`.
    """

    # The fingerprint of the public key.
    fingerprint: str
    # The timestamp associated with the public key.
    timestamp: datetime

    def to_dict(self) -> dict:
        """Convert the instance to a dictionary.

        @return: A new dictionary holding the fields of this instance, with the timestamp
            serialised to its ISO 8601 string representation.
        """

        data = self._asdict()
        data['timestamp'] = self.timestamp.isoformat()
        return data

    @staticmethod
    def from_dict(data: dict) -> 'PublicKeyMetadata':
        """Load an instance from a dictionary as created by :meth:`to_dict`.

        @param data: The serialised dictionary. It is left unmodified.
        @return: The instance built from the dictionary.
        """

        # Work on a copy: replacing the timestamp in the caller's dictionary would
        # silently alter their data and make a second load of the same dictionary fail.
        fields = dict(data)
        fields['timestamp'] = datetime.fromisoformat(fields['timestamp'])
        return PublicKeyMetadata(**fields)


@enum.unique
class TrustLevel(enum.Enum):
    """
    The trust levels required for BTBV and manual trust.

    The value of each member equals its name.
    """

    # Members are left unannotated on purpose: the type of a member is TrustLevel, not
    # str, so a "str" annotation would be wrong and is rejected by type checkers.
    TRUSTED = "TRUSTED"
    BLINDLY_TRUSTED = "BLINDLY_TRUSTED"
    UNDECIDED = "UNDECIDED"
    DISTRUSTED = "DISTRUSTED"


# Schema for the ``<openpgp/>`` element, whose text content must be valid base64.
OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="openpgp" type="xs:base64Binary"/>
</xs:schema>
""")


# Schema for the content elements ``<signcrypt/>``, ``<sign/>`` and ``<crypt/>``.
#
# The following schema needs version 1.1 of XML Schema, which is not supported by lxml.
# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
# implementation of XML Schema, including version 1.1.
#
# The "jid" attribute of ``<to/>`` and the "stamp" attribute of ``<time/>`` are marked as
# required, since the code consuming validated content elements reads both
# unconditionally; without the requirement, an element lacking one of them would pass
# validation and then fail with a KeyError instead of a parsing error.
CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="signcrypt">
        <xs:complexType>
            <xs:all>
                <xs:element ref="to" maxOccurs="unbounded"/>
                <xs:element ref="time"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="payload"/>
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="sign">
        <xs:complexType>
            <xs:all>
                <xs:element ref="to" maxOccurs="unbounded"/>
                <xs:element ref="time"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="payload"/>
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="crypt">
        <xs:complexType>
            <xs:all>
                <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/>
                <xs:element ref="time"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="payload"/>
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="to">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="time">
        <xs:complexType>
            <xs:attribute name="stamp" type="xs:dateTime" use="required"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="rpad" type="xs:string"/>

    <xs:element name="payload">
        <xs:complexType>
            <xs:sequence>
                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>
</xs:schema>
""")


# PEP node holding the public keys list of an entity.
PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"

# Schema for the ``<public-keys-list/>`` element published on that node.
#
# Both attributes of ``<pubkey-metadata/>`` are marked as required, since the code
# consuming validated lists reads both unconditionally; without the requirement, an
# entry lacking one of them would pass validation and then fail with a KeyError.
PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="public-keys-list">
        <xs:complexType>
            <xs:sequence>
                <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>

    <xs:element name="pubkey-metadata">
        <xs:complexType>
            <xs:attribute name="v4-fingerprint" type="xs:string" use="required"/>
            <xs:attribute name="date" type="xs:dateTime" use="required"/>
        </xs:complexType>
    </xs:element>
</xs:schema>
""")


# Schema for the ``<pubkey/>`` element: exactly one ``<data/>`` child carrying base64
# content, while arbitrary attributes on ``<pubkey/>`` itself are accepted unchecked.
PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="pubkey">
        <xs:complexType>
            <xs:all>
                <xs:element ref="data"/>
            </xs:all>
            <xs:anyAttribute processContents="skip"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="data" type="xs:base64Binary"/>
</xs:schema>
""")


# Schema for the ``<secretkey/>`` element, whose text content must be valid base64.
SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="secretkey" type="xs:base64Binary"/>
</xs:schema>
""")


# Parameter definition letting the user choose the default trust policy: either manual
# trust or BTBV, the latter being preselected. The label names OpenPGP, as this parameter
# belongs to the OpenPGP plugin (it used to read "OMEMO").
DEFAULT_TRUST_MODEL_PARAM = f"""
<params>
<individual>
<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
    <param name="{PARAM_NAME}"
        label={quoteattr(D_('OpenPGP default trust policy'))}
        type="list" security="3">
        <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
        <option value="btbv"
            label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
            selected="true" />
    </param>
</category>
</individual>
</params>
"""


def get_gpg_provider(sat: LiberviaBackend, client: SatXMPPClient) -> GPGProvider:
    """Build the GPG provider to use for a client.

    @param sat: The SAT instance.
    @param client: The client.
    @return: A GPG provider operating on the "gnupg-home" local path of that client.
    """

    gnupg_home = sat.get_local_path(client, "gnupg-home")

    return GPGME_GPGProvider(str(gnupg_home))


def generate_passphrase() -> str:
    """Generate a random passphrase for symmetric encryption.

    The passphrase consists of six groups of four symbols each, joined by dashes. The
    symbols are digits and upper case letters, without ``0`` and ``O``.

    @return: The passphrase.
    """

    alphabet = "123456789ABCDEFGHIJKLMNPQRSTUVWXYZ"

    groups = []
    for _group_index in range(6):
        symbols = [ secrets.choice(alphabet) for _symbol_index in range(4) ]
        groups.append("".join(symbols))

    return "-".join(groups)


# TODO: Handle the user id mess
class XEP_0373:
    """
    Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``.
    """

    def __init__(self, host: LiberviaBackend) -> None:
        """
        @param host: The backend instance this plugin is loaded into.
        """

        self.host = host

        # Register the parameter that lets the user choose between manual trust and BTBV
        # as the trust model.
        host.memory.update_params(DEFAULT_TRUST_MODEL_PARAM)

        # XEP-0045 is looked up leniently and may be absent, XEP-0060 is mandatory.
        self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045"))
        self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"])

        # Per-profile persistent storage, filled when a profile connects.
        self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {}

        def on_public_keys_list_event(items_event, profile):
            # The handler is a coroutine function, wrap its result into a Deferred.
            return defer.ensureDeferred(
                self.__on_public_keys_list_update(items_event, profile)
            )

        xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"])
        xep_0163.add_pep_event(
            "OX_PUBLIC_KEYS_LIST",
            PUBLIC_KEYS_LIST_NODE,
            on_public_keys_list_event
        )

    async def profile_connecting(self, client):
        """Attach a GPG provider to the client while its profile is connecting.

        @param client: The client.
        """

        provider = get_gpg_provider(self.host, client)
        client.gpg_provider = provider

    async def profile_connected(  # pylint: disable=invalid-name
        self,
        client: SatXMPPClient
    ) -> None:
        """Set up the storage of the profile and make sure that a secret key exists.

        @param client: The client.
        """

        profile = cast(str, client.profile)

        # Create the persistent storage of this profile the first time it connects.
        if profile not in self.__storage:
            self.__storage[profile] = persistent.LazyPersistentBinaryDict(
                "XEP-0373",
                client.profile
            )

        # Nothing left to do if there is a secret key already.
        if self.list_secret_keys(client):
            return

        log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
        await self.create_key(client)

    async def __on_public_keys_list_update(
        self,
        items_event: pubsub.ItemsEvent,
        profile: str
    ) -> None:
        """Handle public keys list updates fired by PEP.

        Updates carrying no item, more than one item or a malformed public keys list are
        ignored. Otherwise, keys whose metadata is new or changed and which are present
        in the local keyring already are imported again, the own public keys list is
        republished if the update came from the own bare JID and lacks one of the own
        secret keys, and the resulting metadata is stored for the sender.

        @param items_event: The event.
        @param profile: The profile this event belongs to.
        """

        client = self.host.get_client(profile)

        sender = cast(jid.JID, items_event.sender)
        items = cast(List[domish.Element], items_event.items)

        # Exactly one item is expected per update.
        if len(items) > 1:
            log.warning("Ignoring public keys list update with more than one element.")
            return

        item_elt = next(iter(items), None)
        if item_elt is None:
            log.debug("Ignoring empty public keys list update.")
            return

        public_keys_list_elt = cast(
            Optional[domish.Element],
            next(item_elt.elements(NS_OX, "public-keys-list"), None)
        )

        # Stays None if the list element is missing or fails schema validation; both
        # cases are reported as a malformed item below.
        pubkey_metadata_elts: Optional[List[domish.Element]] = None

        if public_keys_list_elt is not None:
            try:
                PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
            except xmlschema.XMLSchemaValidationError:
                pass
            else:
                pubkey_metadata_elts = \
                    list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata"))

        if pubkey_metadata_elts is None:
            log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
            return

        # The metadata announced by this update
        new_public_keys_metadata = { PublicKeyMetadata(
            fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
            timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"]))
        ) for pubkey_metadata_elt in pubkey_metadata_elts }

        storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost())

        # The metadata stored for the sender by a previous update or download
        local_public_keys_metadata = {
            PublicKeyMetadata.from_dict(pkm)
            for pkm in await self.__storage[profile].get(storage_key, [])
        }

        # Entries are compared as a whole, i.e. a known fingerprint with a different
        # timestamp counts as changed.
        unchanged_keys = new_public_keys_metadata & local_public_keys_metadata
        changed_or_new_keys = new_public_keys_metadata - unchanged_keys
        available_keys = self.list_public_keys(client, sender)

        for key_metadata in changed_or_new_keys:
            # Check whether the changed or new key has been imported before
            if any(key.fingerprint == key_metadata.fingerprint for key in available_keys):
                try:
                    # If it has been imported before, try to update it
                    await self.import_public_key(client, sender, key_metadata.fingerprint)
                except Exception as e:
                    log.warning(f"Public key import failed: {e}")

                    # If the update fails, remove the key from the local metadata list
                    # such that the update is attempted again next time
                    new_public_keys_metadata.remove(key_metadata)

        # Check whether this update was for our account and make sure all of our keys are
        # included in the update
        if sender.userhost() == client.jid.userhost():
            secret_keys = self.list_secret_keys(client)
            # Own secret keys whose fingerprint is not listed in the update
            missing_keys = set(filter(lambda secret_key: all(
                key_metadata.fingerprint != secret_key.public_key.fingerprint
                for key_metadata
                in new_public_keys_metadata
            ), secret_keys))

            if len(missing_keys) > 0:
                log.warning(
                    "Public keys list update did not contain at least one of our keys."
                    f" {new_public_keys_metadata}"
                )

                # Add the missing keys with the current time as their timestamp and
                # publish the completed list.
                for missing_key in missing_keys:
                    log.warning(missing_key.public_key.fingerprint)
                    new_public_keys_metadata.add(PublicKeyMetadata(
                        fingerprint=missing_key.public_key.fingerprint,
                        timestamp=datetime.now(timezone.utc)
                    ))

                await self.publish_public_keys_list(client, new_public_keys_metadata)

        # Remember the (possibly adjusted) metadata for the next update.
        await self.__storage[profile].force(
            storage_key,
            [pkm.to_dict() for pkm in new_public_keys_metadata]
        )

    def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
        """List the GPG public keys available locally for a JID.

        @param client: The client to perform this operation with.
        @param jid: The JID. Can be a bare JID.
        @return: The set of public keys available for this JID.
        """

        provider = get_gpg_provider(self.host, client)

        # Keys are looked up via a user id built from the bare JID.
        user_id = f"xmpp:{jid.userhost()}"

        return provider.list_public_keys(user_id)

    def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]:
        """List the GPG secret keys available locally for the JID of the client.

        @param client: The client to perform this operation with.
        @return: The set of secret keys available for the bare JID of the client.
        """

        provider = get_gpg_provider(self.host, client)

        # Keys are looked up via a user id built from the own bare JID.
        user_id = f"xmpp:{client.jid.userhost()}"

        return provider.list_secret_keys(user_id)

    async def create_key(self, client: SatXMPPClient) -> GPGSecretKey:
        """Create a new GPG key and announce it.

        The public part of the new key is published, then the key is added to the own
        public keys list, which is published and stored locally.

        @param client: The client to perform this operation with.
        @return: The new key.
        """

        provider = get_gpg_provider(self.host, client)
        own_bare_jid = client.jid.userhost()

        new_key = provider.create_key(f"xmpp:{own_bare_jid}")

        # The key itself has to be published before the list referencing it.
        await self.publish_public_key(client, new_key.public_key)

        storage = self.__storage[client.profile]
        storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(own_bare_jid)

        stored_entries = await storage.get(storage_key, [])
        keys_metadata = { PublicKeyMetadata.from_dict(entry) for entry in stored_entries }
        keys_metadata.add(PublicKeyMetadata(
            fingerprint=new_key.public_key.fingerprint,
            timestamp=datetime.now(timezone.utc)
        ))

        await self.publish_public_keys_list(client, keys_metadata)

        serialised_entries = [ entry.to_dict() for entry in keys_metadata ]
        await storage.force(storage_key, serialised_entries)

        return new_key

    @staticmethod
    def __build_content_element(
        element_name: Literal["signcrypt", "sign", "crypt"],
        recipient_jids: Iterable[jid.JID],
        include_rpad: bool
    ) -> Tuple[domish.Element, domish.Element]:
        """Build a content element with its affixes and an empty payload.

        @param element_name: The name of the content element.
        @param recipient_jids: The intended recipients of this content element. Can be
            bare JIDs; one ``<to/>`` element with the bare JID is added per recipient.
        @param include_rpad: Whether to include random-length random-content padding.
        @return: The content element and the ``<payload/>`` element to add the stanza
            extension elements to.
        """

        content_elt = domish.Element((NS_OX, element_name))

        for recipient_jid in recipient_jids:
            to_elt = content_elt.addElement("to")
            to_elt["jid"] = recipient_jid.userhost()

        time_elt = content_elt.addElement("time")
        time_elt["stamp"] = format_datetime()

        if include_rpad:
            # XEP-0373 doesn't specify bounds for the length of the random padding. This
            # uses the bounds specified in XEP-0420 for the closely related rpad affix,
            # i.e. between 0 and 200 characters.
            rpad_alphabet = string.digits + string.ascii_letters + string.punctuation
            rpad_length = secrets.randbelow(201)
            rpad_symbols = [ secrets.choice(rpad_alphabet) for __ in range(rpad_length) ]
            content_elt.addElement("rpad", content="".join(rpad_symbols))

        return content_elt, content_elt.addElement("payload")

    @staticmethod
    def build_signcrypt_element(
        recipient_jids: Iterable[jid.JID]
    ) -> Tuple[domish.Element, domish.Element]:
        """Build a ``<signcrypt/>`` content element.

        @param recipient_jids: The intended recipients of this content element. Can be
            bare JIDs. Any iterable is accepted, including generators.
        @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the
            stanza extension elements to.
        @raise ValueError: if no recipient JID is given.
        """

        # The parameter is declared as a plain iterable, which has to support neither
        # len() nor more than one pass. Materialise it once before checking it.
        recipients = list(recipient_jids)

        if not recipients:
            raise ValueError("Recipient JIDs must be provided.")

        # Random padding is always included for <signcrypt/>.
        return XEP_0373.__build_content_element("signcrypt", recipients, True)

    @staticmethod
    def build_sign_element(
        recipient_jids: Iterable[jid.JID],
        include_rpad: bool
    ) -> Tuple[domish.Element, domish.Element]:
        """Build a ``<sign/>`` content element.

        @param recipient_jids: The intended recipients of this content element. Can be
            bare JIDs. Any iterable is accepted, including generators.
        @param include_rpad: Whether to include random-length random-content padding,
            which is OPTIONAL for the ``<sign/>`` content element.
        @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza
            extension elements to.
        @raise ValueError: if no recipient JID is given.
        """

        # The parameter is declared as a plain iterable, which has to support neither
        # len() nor more than one pass. Materialise it once before checking it.
        recipients = list(recipient_jids)

        if not recipients:
            raise ValueError("Recipient JIDs must be provided.")

        return XEP_0373.__build_content_element("sign", recipients, include_rpad)

    @staticmethod
    def build_crypt_element(
        recipient_jids: Iterable[jid.JID]
    ) -> Tuple[domish.Element, domish.Element]:
        """Build a ``<crypt/>`` content element.

        @param recipient_jids: The intended recipients of this content element, which are
            OPTIONAL for the ``<crypt/>`` content element and thus not checked for
            emptiness. Can be bare JIDs.
        @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza
            extension elements to.
        """

        # Random padding is always included for <crypt/>.
        content_elt, payload_elt = XEP_0373.__build_content_element(
            element_name="crypt",
            recipient_jids=recipient_jids,
            include_rpad=True
        )

        return content_elt, payload_elt

    async def build_openpgp_element(
        self,
        client: SatXMPPClient,
        content_elt: domish.Element,
        recipient_jids: Set[jid.JID]
    ) -> domish.Element:
        """Sign and/or encrypt a content element and wrap it in an ``<openpgp/>`` element.

        @param client: The client to perform this operation with.
        @param content_elt: The content element to contain in the ``<openpgp/>`` element.
            Its name decides whether the content is signed, encrypted or both.
        @param recipient_jids: The recipient's JIDs. Can be bare JIDs.
        @return: The ``<openpgp/>`` element, carrying the base64 encoded OpenPGP message.
        @raise ValueError: if the content element is neither ``<signcrypt/>``,
            ``<sign/>`` nor ``<crypt/>``.
        """

        gpg_provider = get_gpg_provider(self.host, client)

        # TODO: I'm not sure whether we want to sign with all keys by default or choose
        # just one key/a subset of keys to sign with.
        signing_keys = {
            secret_key
            for secret_key in self.list_secret_keys(client)
            if gpg_provider.can_sign(secret_key.public_key)
        }

        encryption_keys: Set[GPGPublicKey] = set()

        for recipient_jid in recipient_jids:
            # Import all keys of the recipient and keep those that can encrypt
            recipient_keys = await self.import_all_public_keys(client, recipient_jid)
            encryption_keys.update(
                key for key in recipient_keys if gpg_provider.can_encrypt(key)
            )

        # TODO: Handle trust

        content = content_elt.toXml().encode("utf-8")
        content_name = content_elt.name

        data: bytes
        if content_name == "signcrypt":
            data = gpg_provider.encrypt(content, encryption_keys, signing_keys)
        elif content_name == "sign":
            data = gpg_provider.sign(content, signing_keys)
        elif content_name == "crypt":
            data = gpg_provider.encrypt(content, encryption_keys)
        else:
            raise ValueError(f"Unknown content element <{content_elt.name}/>")

        encoded_data = base64.b64encode(data).decode("ASCII")

        openpgp_elt = domish.Element((NS_OX, "openpgp"))
        openpgp_elt.addContent(encoded_data)
        return openpgp_elt

    async def unpack_openpgp_element(
        self,
        client: SatXMPPClient,
        openpgp_elt: domish.Element,
        element_name: Literal["signcrypt", "sign", "crypt"],
        sender_jid: jid.JID
    ) -> Tuple[domish.Element, datetime]:
        """Verify, decrypt and unpack an ``<openpgp/>`` element.

        @param client: The client to perform this operation with.
        @param openpgp_elt: The ``<openpgp/>`` element.
        @param element_name: The name of the content element expected inside.
        @param sender_jid: The sender's JID. Can be a bare JID.
        @return: The ``<payload/>`` element containing the decrypted/verified stanza
            extension elements carried by this ``<openpgp/>`` element, and the timestamp
            contained in the content element.
        @raise exceptions.ParsingError: on syntactical verification errors.
        @raise VerificationError: on semantical verification errors according to
            XEP-0373.
        @raise DecryptionFailed: on decryption failure.
        @raise VerificationFailed: if the data could not be verified.

        @warning: The timestamp is not verified for plausibility; this SHOULD be done by
            the calling code.
        """

        gpg_provider = get_gpg_provider(self.host, client)

        # Our own secret keys that are capable of encryption are used for decryption.
        decryption_keys = {
            secret_key
            for secret_key in self.list_secret_keys(client)
            if gpg_provider.can_encrypt(secret_key.public_key)
        }

        # Import all keys of the sender and keep those that can sign
        sender_public_keys = await self.import_all_public_keys(client, sender_jid)
        verification_keys = {
            key for key in sender_public_keys if gpg_provider.can_sign(key)
        }

        # TODO: Handle trust

        try:
            OPENPGP_SCHEMA.validate(openpgp_elt.toXml())
        except xmlschema.XMLSchemaValidationError as e:
            raise exceptions.ParsingError(
                "<openpgp/> element doesn't pass schema validation."
            ) from e

        openpgp_message = base64.b64decode(str(openpgp_elt))

        content: bytes
        if element_name == "signcrypt":
            content = gpg_provider.decrypt(
                openpgp_message,
                decryption_keys,
                public_keys=verification_keys
            )
        elif element_name == "sign":
            content = gpg_provider.verify(openpgp_message, verification_keys)
        elif element_name == "crypt":
            content = gpg_provider.decrypt(openpgp_message, decryption_keys)
        else:
            assert_never(element_name)

        try:
            parse_xml = xml_tools.ElementParser()
            content_elt = cast(domish.Element, parse_xml(content.decode("utf-8")))
        except UnicodeDecodeError as e:
            raise exceptions.ParsingError("UTF-8 decoding error") from e

        try:
            CONTENT_SCHEMA.validate(content_elt.toXml())
        except xmlschema.XMLSchemaValidationError as e:
            raise exceptions.ParsingError(
                f"<{element_name}/> element doesn't pass schema validation."
            ) from e

        if content_elt.name != element_name:
            raise exceptions.ParsingError(f"Not a <{element_name}/> element.")

        # The bare JIDs of all recipients listed in the content element
        recipient_bare_jids = {
            jid.JID(to_elt["jid"]).userhostJID()
            for to_elt in content_elt.elements(NS_OX, "to")
        }

        # The recipient list is only enforced for signed content elements.
        if (
            element_name != "crypt"
            and client.jid.userhostJID() not in recipient_bare_jids
        ):
            raise VerificationError(
                f"Recipient list in <{element_name}/> element does not list our (bare)"
                f" JID."
            )

        time_elt = next(content_elt.elements(NS_OX, "time"))
        payload_elt = next(content_elt.elements(NS_OX, "payload"))

        return payload_elt, parse_datetime(time_elt["stamp"])

    async def publish_public_key(
        self,
        client: SatXMPPClient,
        public_key: GPGPublicKey
    ) -> None:
        """Publish a public key on the own bare JID.

        The key is published as a ``<pubkey/>`` element on a node named after the
        fingerprint of the key.

        @param client: The client.
        @param public_key: The public key to publish.
        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
        """

        gpg_provider = get_gpg_provider(self.host, client)

        packet = gpg_provider.export_public_key(public_key)
        encoded_packet = base64.b64encode(packet).decode("ASCII")

        node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}"

        pubkey_elt = domish.Element((NS_OX, "pubkey"))
        pubkey_elt.addElement("data", content=encoded_packet)

        try:
            publish_extra = {
                XEP_0060.EXTRA_PUBLISH_OPTIONS: {
                    XEP_0060.OPT_PERSIST_ITEMS: "true",
                    XEP_0060.OPT_ACCESS_MODEL: "open",
                    XEP_0060.OPT_MAX_ITEMS: 1
                },
                # TODO: Do we really want publish_without_options here?
                XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
            }

            # The current timestamp is passed in the position following the payload.
            await self.__xep_0060.send_item(
                client,
                client.jid.userhostJID(),
                node,
                pubkey_elt,
                format_datetime(),
                extra=publish_extra
            )
        except Exception as e:
            raise XMPPInteractionFailed("Publishing the public key failed.") from e

    async def import_all_public_keys(
        self,
        client: SatXMPPClient,
        entity_jid: jid.JID
    ) -> Set[GPGPublicKey]:
        """Import all public keys of a JID that have not been imported before.

        The public keys list is taken from the local storage, or downloaded and stored if
        there is no (or an empty) stored list.

        @param client: The client.
        @param entity_jid: The JID. Can be a bare JID.
        @return: The public keys, i.e. the ones available before plus the ones imported
            successfully.
        @raise exceptions.NotFound: if no public keys list is found for the JID.
        @note: Failure to import a key simply results in the key not being included in the
            result.
        """

        available_public_keys = self.list_public_keys(client, entity_jid)

        storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(entity_jid.userhost())
        storage = self.__storage[client.profile]

        public_keys_metadata = {
            PublicKeyMetadata.from_dict(entry)
            for entry in await storage.get(storage_key, [])
        }

        if not public_keys_metadata:
            # Nothing stored locally, ask the entity for its public keys list.
            public_keys_metadata = await self.download_public_keys_list(
                client, entity_jid
            )
            if not public_keys_metadata:
                raise exceptions.NotFound(
                    f"Can't find public keys for {entity_jid}"
                )

            await storage.aset(
                storage_key,
                [entry.to_dict() for entry in public_keys_metadata]
            )

        # Listed keys whose fingerprint is not in the local keyring yet
        known_fingerprints = { key.fingerprint for key in available_public_keys }
        missing_keys = {
            entry
            for entry in public_keys_metadata
            if entry.fingerprint not in known_fingerprints
        }

        for missing_key in missing_keys:
            try:
                imported_key = await self.import_public_key(
                    client, entity_jid, missing_key.fingerprint
                )
                available_public_keys.add(imported_key)
            except Exception as e:
                log.warning(
                    f"import of public key {missing_key.fingerprint} owned by"
                    f" {entity_jid.userhost()} failed, ignoring: {e}"
                )

        return available_public_keys

    async def import_public_key(
        self,
        client: SatXMPPClient,
        jid: jid.JID,
        fingerprint: str
    ) -> GPGPublicKey:
        """Fetch a published public key and import it into the local keyring.

        @param client: The client.
        @param jid: The JID owning the public key. Can be a bare JID.
        @param fingerprint: The fingerprint of the public key.
        @return: The public key.
        @raise exceptions.NotFound: if the public key was not found.
        @raise exceptions.ParsingError: on XML-level parsing errors.
        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
            invalid.
        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
        """

        gpg_provider = get_gpg_provider(self.host, client)

        # Each public key lives on its own node, named after its fingerprint.
        node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"

        try:
            items, __ = await self.__xep_0060.get_items(
                client,
                jid.userhostJID(),
                node,
                max_items=1
            )
        except exceptions.NotFound as e:
            raise exceptions.NotFound(
                f"No public key with fingerprint {fingerprint} published by JID"
                f" {jid.userhost()}."
            ) from e
        except Exception as e:
            # It is a single public key that is fetched here, not the public keys list.
            raise XMPPInteractionFailed("Fetching the public key failed.") from e

        # An existing but empty node is treated like a missing key.
        try:
            item_elt = cast(domish.Element, items[0])
        except IndexError as e:
            raise exceptions.NotFound(
                f"No public key with fingerprint {fingerprint} published by JID"
                f" {jid.userhost()}."
            ) from e

        pubkey_elt = cast(
            Optional[domish.Element],
            next(item_elt.elements(NS_OX, "pubkey"), None)
        )

        if pubkey_elt is None:
            raise exceptions.ParsingError(
                f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey"
                f" element."
            )

        try:
            PUBKEY_SCHEMA.validate(pubkey_elt.toXml())
        except xmlschema.XMLSchemaValidationError as e:
            raise exceptions.ParsingError(
                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey"
                f" schema validation."
            ) from e

        # NOTE(review): the fingerprint of the imported key is not compared with the
        # requested one in this method; confirm whether the provider takes care of that.
        public_key = gpg_provider.import_public_key(base64.b64decode(str(
            next(pubkey_elt.elements(NS_OX, "data"))
        )))

        return public_key

    async def publish_public_keys_list(
        self,
        client: SatXMPPClient,
        public_keys_list: Iterable[PublicKeyMetadata]
    ) -> None:
        """Publish/update the own public keys list.

        @param client: The client.
        @param public_keys_list: The public keys list. Any iterable is accepted, including
            generators.
        @raise ValueError: if the public keys list contains the same fingerprint more than
            once.
        @raise XMPPInteractionFailed: if any interaction via XMPP failed.

        @warning: All public keys referenced in the public keys list MUST be published
            beforehand.
        """

        # The parameter is declared as a plain iterable, which has to support neither
        # len() nor more than one pass, while the entries are needed twice below.
        # Materialise it once.
        public_keys = list(public_keys_list)

        if len({ pkm.fingerprint for pkm in public_keys }) != len(public_keys):
            raise ValueError("Public keys list contains duplicate fingerprints.")

        node = "urn:xmpp:openpgp:0:public-keys"

        public_keys_list_elt = domish.Element((NS_OX, "public-keys-list"))

        for public_key_metadata in public_keys:
            pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata")
            pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint
            pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp)

        try:
            # The list is a singleton item, i.e. each publication replaces the last one.
            await self.__xep_0060.send_item(
                client,
                client.jid.userhostJID(),
                node,
                public_keys_list_elt,
                item_id=XEP_0060.ID_SINGLETON,
                extra={
                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
                        XEP_0060.OPT_PERSIST_ITEMS: "true",
                        XEP_0060.OPT_ACCESS_MODEL: "open",
                        XEP_0060.OPT_MAX_ITEMS: 1
                    },
                    # TODO: Do we really want publish_without_options here?
                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
                }
            )
        except Exception as e:
            raise XMPPInteractionFailed("Publishing the public keys list failed.") from e

    async def download_public_keys_list(
        self,
        client: SatXMPPClient,
        jid: jid.JID
    ) -> Optional[Set[PublicKeyMetadata]]:
        """Download the public keys list of a JID.

        @param client: The client.
        @param jid: The JID. Can be a bare JID.
        @return: The public keys list or ``None`` if the JID hasn't published a public
            keys list. An empty set means the JID has published an empty list.
        @raise exceptions.ParsingError: on XML-level parsing errors.
        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
        """

        node = "urn:xmpp:openpgp:0:public-keys"

        try:
            items, __ = await self.__xep_0060.get_items(
                client,
                jid.userhostJID(),
                node,
                max_items=1
            )
        except exceptions.NotFound:
            # No such node, thus no public keys list
            return None
        except Exception as e:
            raise XMPPInteractionFailed() from e

        # A node without items is treated like a missing node.
        try:
            item_elt = cast(domish.Element, items[0])
        except IndexError:
            return None

        public_keys_list_elt = cast(
            Optional[domish.Element],
            next(item_elt.elements(NS_OX, "public-keys-list"), None)
        )
        if public_keys_list_elt is None:
            return None

        try:
            PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
        except xmlschema.XMLSchemaValidationError as e:
            raise exceptions.ParsingError(
                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys"
                f" list schema validation."
            ) from e

        # Convert each <pubkey-metadata/> entry of the list
        public_keys_metadata: Set[PublicKeyMetadata] = set()
        for pubkey_metadata_elt in public_keys_list_elt.elements(NS_OX, "pubkey-metadata"):
            public_keys_metadata.add(PublicKeyMetadata(
                fingerprint=pubkey_metadata_elt["v4-fingerprint"],
                timestamp=parse_datetime(pubkey_metadata_elt["date"])
            ))

        return public_keys_metadata

    async def __prepare_secret_key_synchronization(
        self,
        client: SatXMPPClient
    ) -> Optional[domish.Element]:
        """Prepare for secret key synchronization.

        Checks via service discovery on the own bare JID that PEP and the whitelist
        access model are supported, then fetches the secret key synchronization PEP
        node. If the node doesn't exist yet, it is created with persistent items, the
        whitelist access model and a maximum of one item.

        @param client: The client.
        @return: The item fetched from the secret key synchronization node, or ``None``
            if the node holds no item or had to be created by this call.
        @raise exceptions.FeatureNotFound: if the server lacks support for the required
            protocols or protocol extensions.
        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
        """

        try:
            infos = cast(DiscoInfo, await self.host.memory.disco.get_infos(
                client,
                client.jid.userhostJID()
            ))
        except Exception as e:
            raise XMPPInteractionFailed(
                "Error performing service discovery on the own bare JID."
            ) from e

        identities = cast(Dict[Tuple[str, str], str], infos.identities)
        features = cast(Set[DiscoFeature], infos.features)

        if ("pubsub", "pep") not in identities:
            raise exceptions.FeatureNotFound("Server doesn't support PEP.")

        if "http://jabber.org/protocol/pubsub#access-whitelist" not in features:
            raise exceptions.FeatureNotFound(
                "Server doesn't support the whitelist access model."
            )

        # TODO: persistent-items ("http://jabber.org/protocol/pubsub#persistent-items")
        # is a SHOULD, how do we handle the feature missing?

        node = "urn:xmpp:openpgp:0:secret-key"

        try:
            items, __ = await self.__xep_0060.get_items(
                client,
                client.jid.userhostJID(),
                node,
                max_items=1
            )
        except exceptions.NotFound:
            try:
                await self.__xep_0060.createNode(
                    client,
                    client.jid.userhostJID(),
                    node,
                    {
                        XEP_0060.OPT_PERSIST_ITEMS: "true",
                        XEP_0060.OPT_ACCESS_MODEL: "whitelist",
                        XEP_0060.OPT_MAX_ITEMS: "1"
                    }
                )
            except Exception as e:
                raise XMPPInteractionFailed(
                    "Error creating the secret key synchronization node."
                ) from e

            # The fetch failed, so ``items`` was never bound. The node was only just
            # created, hence there is no item to hand back to the caller.
            return None
        except Exception as e:
            raise XMPPInteractionFailed(
                "Error fetching the secret key synchronization node."
            ) from e

        try:
            return cast(domish.Element, items[0])
        except IndexError:
            # The node exists but is empty.
            return None

    async def export_secret_keys(
        self,
        client: SatXMPPClient,
        secret_keys: Iterable[GPGSecretKey]
    ) -> str:
        """Encrypt the given secret keys and publish them for other devices.

        The keys are serialized by the GPG provider, concatenated, symmetrically
        encrypted with a freshly generated backup code and published base64-encoded to
        the secret key synchronization node of the own bare JID.

        @param client: The client.
        @param secret_keys: The secret keys to export.
        @return: The backup code needed to decrypt the exported secret keys.
        @raise exceptions.FeatureNotFound: if the server lacks support for the required
            protocols or protocol extensions.
        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
        """

        provider = get_gpg_provider(self.host, client)

        # Only the side effects (feature checks, node creation) are needed here; the
        # item that may currently be stored on the node is of no interest.
        await self.__prepare_secret_key_synchronization(client)

        passphrase = generate_passphrase()

        serialized_keys = [
            provider.backup_secret_key(secret_key) for secret_key in secret_keys
        ]
        encrypted_backup = provider.encrypt_symmetrically(
            b"".join(serialized_keys),
            passphrase
        )
        encoded_backup = base64.b64encode(encrypted_backup).decode("ASCII")

        payload_elt = domish.Element((NS_OX, "secretkey"))
        payload_elt.addContent(encoded_backup)

        try:
            await self.__xep_0060.send_item(
                client,
                client.jid.userhostJID(),
                "urn:xmpp:openpgp:0:secret-key",
                payload_elt
            )
        except Exception as e:
            raise XMPPInteractionFailed("Publishing the secret keys failed.") from e

        return passphrase

    async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]:
        """Fetch the encrypted secret keys that were exported earlier.

        Downloading is kept apart from importing because the import needs a backup
        code, and trying several backup codes should not require fetching the data
        again each time. The decryption/import half lives in
        :meth:`import_secret_keys`.

        @param client: The client.
        @return: The encrypted secret keys previously exported, or ``None`` if no item
            or no ``secretkey`` element is available.
        @raise exceptions.FeatureNotFound: if the server lacks support for the required
            protocols or protocol extensions.
        @raise exceptions.ParsingError: on XML-level parsing errors.
        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
        """

        stored_item_elt = await self.__prepare_secret_key_synchronization(client)
        if stored_item_elt is None:
            return None

        candidates = stored_item_elt.elements(NS_OX, "secretkey")
        secretkey_elt = cast(Optional[domish.Element], next(candidates, None))
        if secretkey_elt is None:
            return None

        try:
            SECRETKEY_SCHEMA.validate(secretkey_elt.toXml())
        except xmlschema.XMLSchemaValidationError as e:
            raise exceptions.ParsingError(
                "Publish-Subscribe item doesn't pass secretkey schema validation."
            ) from e

        # The text content of the element is the base64-encoded ciphertext.
        encoded_ciphertext = str(secretkey_elt)
        return base64.b64decode(encoded_ciphertext)

    def import_secret_keys(
        self,
        client: SatXMPPClient,
        ciphertext: bytes,
        backup_code: str
    ) -> Set[GPGSecretKey]:
        """Decrypt and restore secret keys that were downloaded beforehand.

        Importing is kept apart from downloading because a backup code is required
        here, and trying several backup codes should not require fetching the data
        again each time. The download half lives in :meth:`download_secret_keys`.

        @param client: The client to perform this operation with.
        @param ciphertext: The ciphertext, i.e. the data returned by
            :meth:`download_secret_keys`.
        @param backup_code: The backup code needed to decrypt the data.
        @return: The secret keys restored by the GPG provider.
        @raise InvalidPacket: if one of the GPG packets building the secret key data is
            either syntactically or semantically deemed invalid.
        @raise DecryptionFailed: on decryption failure.
        """

        provider = get_gpg_provider(self.host, client)

        decrypted_backup = provider.decrypt_symmetrically(ciphertext, backup_code)
        return provider.restore_secret_keys(decrypted_backup)

    @staticmethod
    def __get_joined_muc_users(
        client: SatXMPPClient,
        xep_0045: XEP_0045,
        room_jid: jid.JID
    ) -> Set[jid.JID]:
        """Collect the bare JIDs of everybody in the roster of a joined MUC.

        @param client: The client.
        @param xep_0045: A MUC plugin instance.
        @param room_jid: The room JID.
        @return: A set containing the bare JIDs of the MUC participants.
        @raise InternalError: if the MUC is not joined or the entity information of a
            participant isn't available.
        """
        # TODO: This should probably be a global helper somewhere

        try:
            room = cast(muc.Room, xep_0045.get_room(client, room_jid))
        except exceptions.NotFound as e:
            raise exceptions.InternalError(
                "Participant list of unjoined MUC requested."
            ) from e

        roster = cast(Dict[str, muc.User], room.roster)

        participants: Set[jid.JID] = set()
        for occupant in roster.values():
            occupant_entity = cast(Optional[SatXMPPEntity], occupant.entity)
            if occupant_entity is not None:
                participants.add(occupant_entity.jid.userhostJID())
                continue

            # Without the entity there is no real JID to derive the bare JID from.
            raise exceptions.InternalError(
                f"Participant list of MUC requested, but the entity information of"
                f" the participant {occupant} is not available."
            )

        return participants

    async def get_trust(
        self,
        client: SatXMPPClient,
        public_key: GPGPublicKey,
        owner: jid.JID
    ) -> TrustLevel:
        """Look up the stored trust level of a public key.

        @param client: The client to perform this operation under.
        @param public_key: The public key.
        @param owner: The owner of the public key. Can be a bare JID.
        @return: The stored trust level, or ``TrustLevel.UNDECIDED`` if the storage
            lookup raises ``KeyError``.
        """

        storage_key = "/trust/{}/{}".format(owner.userhost(), public_key.fingerprint)

        try:
            stored_level = await self.__storage[client.profile][storage_key]
            return TrustLevel(stored_level)
        except KeyError:
            return TrustLevel.UNDECIDED

    async def set_trust(
        self,
        client: SatXMPPClient,
        public_key: GPGPublicKey,
        owner: jid.JID,
        trust_level: TrustLevel
    ) -> None:
        """Store the trust level of a public key.

        The name of the trust level is written to the storage of the client's profile,
        under a key built from the owner's bare JID and the key's fingerprint.

        @param client: The client to perform this operation under.
        @param public_key: The public key.
        @param owner: The owner of the public key. Can be a bare JID.
        @param trust_level: The trust level.
        """

        storage_key = "/trust/{}/{}".format(owner.userhost(), public_key.fingerprint)
        profile_storage = self.__storage[client.profile]

        await profile_storage.force(storage_key, trust_level.name)

    async def get_trust_ui(  # pylint: disable=invalid-name
        self,
        client: SatXMPPClient,
        entity: jid.JID
    ) -> xml_tools.XMLUI:
        """Build the form used to manage the trust level of an entity's public keys.

        If the entity is a joined MUC room, the public keys of all of its participants
        are listed; otherwise only those of the entity itself. Submitting the form
        stores every trust level that differs from the currently stored one.

        @param client: The client.
        @param entity: The entity whose device trust levels to manage.
        @return: An XMLUI instance which opens a form to manage the trust level of all
            devices belonging to the entity.
        @raise ValueError: if ``entity`` is not a bare JID.
        """

        if entity.resource:
            raise ValueError("A bare JID is expected.")

        bare_jids: Set[jid.JID]
        if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity):
            bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
        else:
            bare_jids = { entity.userhostJID() }

        # Frozen into a list so that the indices embedded in the form field names can be
        # mapped back to the owner and public key when the form is submitted.
        all_public_keys = [
            (bare_jid, list(self.list_public_keys(client, bare_jid)))
            for bare_jid
            in bare_jids
        ]

        async def callback(
            data: Any,
            profile: str  # pylint: disable=unused-argument
        ) -> Dict[Never, Never]:
            """
            @param data: The XMLUI result produced by the trust UI form.
            @param profile: The profile.
            @return: An empty dictionary. The type of the return value was chosen
                conservatively since the exact options are neither known nor needed here.
            """

            if C.bool(data.get("cancelled", "false")):
                return {}

            data_form_result = cast(
                Dict[str, str],
                xml_tools.xmlui_result_2_data_form_result(data)
            )
            for key, value in data_form_result.items():
                if not key.startswith("trust_"):
                    continue

                # Field names have the shape "trust_<outer index>_<inner index>".
                outer_index, inner_index = key.split("_")[1:]

                owner, public_keys = all_public_keys[int(outer_index)]
                public_key = public_keys[int(inner_index)]
                trust = TrustLevel(value)

                if (await self.get_trust(client, public_key, owner)) is not trust:
                    # set_trust expects the TrustLevel member, not the raw form string.
                    await self.set_trust(client, public_key, owner, trust)

            return {}

        submit_id = self.host.register_callback(callback, with_data=True, one_shot=True)

        result = xml_tools.XMLUI(
            panel_type=C.XMLUI_FORM,
            title=D_("OX trust management"),
            submit_id=submit_id
        )
        # Casting this to Any, otherwise all calls on the variable cause type errors
        # pylint: disable=no-member
        trust_ui = cast(Any, result)
        trust_ui.addText(D_(
            "This is OX trusting system. You'll see below the GPG keys of your "
            "contacts, and a list selection to trust them or not. A trusted key "
            "can read your messages in plain text, so be sure to only validate "
            "keys that you are sure are belonging to your contact. It's better "
            "to do this when you are next to your contact, so "
            "you can check the \"fingerprint\" of the key "
            "yourself. Do *not* validate a key if the fingerprint is wrong!"
        ))

        own_secret_keys = self.list_secret_keys(client)

        trust_ui.change_container("label")
        for index, secret_key in enumerate(own_secret_keys):
            # The literal template is marked for translation and filled in afterwards;
            # an f-string would hide the message from the translation tooling.
            trust_ui.addLabel(
                D_("Own secret key {index} fingerprint").format(index=index)
            )
            trust_ui.addText(secret_key.public_key.fingerprint)
            trust_ui.addEmpty()
            trust_ui.addEmpty()

        for outer_index, [ owner, public_keys ] in enumerate(all_public_keys):
            for inner_index, public_key in enumerate(public_keys):
                trust_ui.addLabel(D_("Contact"))
                # NOTE(review): owner is already a jid.JID at this point - confirm that
                # jid.JID accepts a JID instance, or pass owner directly.
                trust_ui.addJid(jid.JID(owner))
                trust_ui.addLabel(D_("Fingerprint"))
                trust_ui.addText(public_key.fingerprint)
                trust_ui.addLabel(D_("Trust this device?"))

                current_trust_level = await self.get_trust(client, public_key, owner)

                # A list keeps the options in a stable order; the current level is only
                # appended when it is not one of the two selectable defaults.
                available_trust_levels = [ TrustLevel.DISTRUSTED, TrustLevel.TRUSTED ]
                if current_trust_level not in available_trust_levels:
                    available_trust_levels.append(current_trust_level)

                trust_ui.addList(
                    f"trust_{outer_index}_{inner_index}",
                    options=[
                        trust_level.name for trust_level in available_trust_levels
                    ],
                    selected=current_trust_level.name,
                    styles=[ "inline" ]
                )

                trust_ui.addEmpty()
                trust_ui.addEmpty()

        return result