changeset 3933:cecf45416403

plugin XEP-0373 and XEP-0374: Implementation of OX and OXIM: GPGME is used as the GPG provider. rel 374
author Syndace <me@syndace.dev>
date Tue, 20 Sep 2022 16:22:18 +0200
parents 7af29260ecb8
children e345d93fb6e5
files sat/plugins/plugin_xep_0060.py sat/plugins/plugin_xep_0373.py sat/plugins/plugin_xep_0374.py sat/plugins/plugin_xep_0384.py sat/plugins/plugin_xep_0420.py sat/tools/xmpp_datetime.py setup.py tests/unit/test_plugin_xep_0082.py tests/unit/test_plugin_xep_0373.py tests/unit/test_plugin_xep_0420.py
diffstat 10 files changed, 2710 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0060.py	Mon Oct 10 15:23:59 2022 +0200
+++ b/sat/plugins/plugin_xep_0060.py	Tue Sep 20 16:22:18 2022 +0200
@@ -893,14 +893,20 @@
             client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
         )
 
-    def createNode(self, client, service, nodeIdentifier=None, options=None):
+    def createNode(
+        self,
+        client: SatXMPPClient,
+        service: jid.JID,
+        nodeIdentifier: Optional[str] = None,
+        options: Optional[Dict[str, str]] = None
+    ) -> str:
         """Create a new node
 
-        @param service(jid.JID): PubSub service,
-        @param NodeIdentifier(unicode, None): node name
-           use None to create instant node (identifier will be returned by this method)
-        @param option(dict[unicode, unicode], None): node configuration options
-        @return (unicode): identifier of the created node (may be different from requested name)
+        @param service: PubSub service,
+        @param NodeIdentifier: node name use None to create instant node (identifier will
+            be returned by this method)
+        @param option: node configuration options
+        @return: identifier of the created node (may be different from requested name)
         """
         # TODO: if pubsub service doesn't hande publish-options, configure it in a second time
         return client.pubsub_client.createNode(service, nodeIdentifier, options)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0373.py	Tue Sep 20 16:22:18 2022 +0200
@@ -0,0 +1,2085 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OpenPGP for XMPP
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from abc import ABC, abstractmethod
+import base64
+from datetime import datetime, timezone
+import enum
+import secrets
+import string
+from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast
+from xml.sax.saxutils import quoteattr
+
+from typing_extensions import Final, NamedTuple, Never, assert_never
+from wokkel import muc, pubsub
+from wokkel.disco import DiscoFeature, DiscoInfo
+import xmlschema
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _, D_
+from sat.core.log import getLogger, Logger
+from sat.core.sat_main import SAT
+from sat.core.xmpp import SatXMPPClient
+from sat.memory import persistent
+from sat.plugins.plugin_xep_0045 import XEP_0045
+from sat.plugins.plugin_xep_0060 import XEP_0060
+from sat.plugins.plugin_xep_0163 import XEP_0163
+from sat.tools.xmpp_datetime import format_datetime, parse_datetime
+from sat.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+try:
+    import gpg
+except ImportError as import_error:
+    raise exceptions.MissingModule(
+        "You are missing the 'gpg' package required by the OX plugin. The recommended"
+        " installation method is via your operating system's package manager, since the"
+        " version of the library has to match the version of your GnuPG installation. See"
+        " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme"
+    ) from import_error
+
+
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "NS_OX",
+    "XEP_0373",
+    "VerificationError",
+    "XMPPInteractionFailed",
+    "InvalidPacket",
+    "DecryptionFailed",
+    "VerificationFailed",
+    "UnknownKey",
+    "GPGProviderError",
+    "GPGPublicKey",
+    "GPGSecretKey",
+    "GPGProvider",
+    "PublicKeyMetadata",
+    "gpg_provider",
+    "TrustLevel"
+]
+
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "XEP-0373",
+    C.PI_IMPORT_NAME: "XEP-0373",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0373" ],
+    C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "XEP_0373",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
+}
+
+
+NS_OX: Final = "urn:xmpp:openpgp:0"
+
+
+PARAM_CATEGORY = "Security"
+PARAM_NAME = "ox_policy"
+
+
+class VerificationError(Exception):
+    """
+    Raised by verifying methods of :class:`XEP_0373` on semantical verification errors.
+    """
+
+
+class XMPPInteractionFailed(Exception):
+    """
+    Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this
+    exception exists is that the exceptions raised by XMPP interactions are not properly
+    documented for the most part, thus all exceptions are caught and wrapped in instances
+    of this class.
+    """
+
+
+class InvalidPacket(ValueError):
+    """
+    Raised by methods of :class:`GPGProvider` when an invalid packet is encountered.
+    """
+
+
+class DecryptionFailed(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on decryption failures.
+    """
+
+
+class VerificationFailed(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on verification failures.
+    """
+
+
+class UnknownKey(ValueError):
+    """
+    Raised by methods of :class:`GPGProvider` when an unknown key is referenced.
+    """
+
+
+class GPGProviderError(Exception):
+    """
+    Raised by methods of :class:`GPGProvider` on internal errors.
+    """
+
+
+class GPGPublicKey(ABC):
+    """
+    Interface describing a GPG public key.
+    """
+
+    @property
+    @abstractmethod
+    def fingerprint(self) -> str:
+        """
+        @return: The OpenPGP v4 fingerprint string of this public key.
+        """
+
+
+class GPGSecretKey(ABC):
+    """
+    Interface descibing a GPG secret key.
+    """
+
+    @property
+    @abstractmethod
+    def public_key(self) -> GPGPublicKey:
+        """
+        @return: The public key corresponding to this secret key.
+        """
+
+
+class GPGProvider(ABC):
+    """
+    Interface describing a GPG provider, i.e. a library or framework providing GPG
+    encryption, signing and key management.
+
+    All methods may raise :class:`GPGProviderError` in addition to those exception types
+    listed explicitly.
+
+    # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?)
+    """
+
+    @abstractmethod
+    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
+        """Export a public key in a key material packet according to RFC 4880 §5.5.
+
+        Do not use OpenPGP's ASCII Armor.
+
+        @param public_key: The public key to export.
+        @return: The packet containing the exported public key.
+        @raise UnknownKey: if the public key is not available.
+        """
+
+    @abstractmethod
+    def import_public_key(self, packet: bytes) -> GPGPublicKey:
+        """Import a public key from a key material packet according to RFC 4880 §5.5.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param packet: A packet containing an exported public key.
+        @return: The public key imported from the packet.
+        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
+            invalid.
+
+        @warning: Only packets of version 4 or higher may be accepted, packets below
+            version 4 MUST be rejected.
+        """
+
+    @abstractmethod
+    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
+        """Export a secret key for transfer according to RFC 4880 §11.1.
+
+        Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage
+        conventions to zero in the corresponding secret-key packet according to RFC 4880
+        §5.5.3. Do not use OpenPGP's ASCII Armor.
+
+        @param secret_key: The secret key to export.
+        @return: The binary blob containing the exported secret key.
+        @raise UnknownKey: if the secret key is not available.
+        """
+
+    @abstractmethod
+    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
+        """Restore secret keys exported for transfer according to RFC 4880 §11.1.
+
+        The secret data is not encrypted, i.e. the octet indicating string-to-key usage
+        conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3
+        are set to zero. OpenPGP's ASCII Armor is not used.
+
+        @param data: Concatenation of one or more secret keys exported for transfer.
+        @return: The secret keys imported from the data.
+        @raise InvalidPacket: if the data or one of the packets included in the data is
+            either syntactically or semantically deemed invalid.
+
+        @warning: Only packets of version 4 or higher may be accepted, packets below
+            version 4 MUST be rejected.
+        """
+
+    @abstractmethod
+    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
+        """Encrypt data symmetrically according to RFC 4880 §5.3.
+
+        The password is used to build a Symmetric-Key Encrypted Session Key packet which
+        precedes the Symmetrically Encrypted Data packet that holds the encrypted data.
+
+        @param plaintext: The data to encrypt.
+        @param password: The password to encrypt the data with.
+        @return: The encrypted data.
+        """
+
+    @abstractmethod
+    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
+        """Decrypt data symmetrically according to RFC 4880 §5.3.
+
+        The ciphertext consists of a Symmetrically Encrypted Data packet that holds the
+        encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the
+        password.
+
+        @param ciphertext: The ciphertext.
+        @param password: The password to decrypt the data with.
+        @return: The plaintext.
+        @raise DecryptionFailed: on decryption failure.
+        """
+
+    @abstractmethod
+    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Sign some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the signed data.
+        """
+
+    @abstractmethod
+    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        """Sign some data. Create the signature detached from the data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data to sign.
+        @param secret_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the detached signature.
+        """
+
+    @abstractmethod
+    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
+        """Verify signed data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param signed_data: The signed data as an OpenPGP message.
+        @param public_keys: The public keys to verify the signature with.
+        @return: The verified and unpacked data.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that a valid signature by one
+            of the public keys is available.
+        """
+
+    @abstractmethod
+    def verify_detached(
+        self,
+        data: bytes,
+        signature: bytes,
+        public_keys: Set[GPGPublicKey]
+    ) -> None:
+        """Verify signed data, where the signature was created detached from the data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param data: The data.
+        @param signature: The signature as an OpenPGP message.
+        @param public_keys: The public keys to verify the signature with.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that a valid signature by one
+            of the public keys is available.
+        """
+
+    @abstractmethod
+    def encrypt(
+        self,
+        plaintext: bytes,
+        public_keys: Set[GPGPublicKey],
+        signing_keys: Optional[Set[GPGSecretKey]] = None
+    ) -> bytes:
+        """Encrypt and optionally sign some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param plaintext: The data to encrypt and optionally sign.
+        @param public_keys: The public keys to encrypt the data for.
+        @param signing_keys: The secret keys to sign the data with.
+        @return: The OpenPGP message carrying the encrypted and optionally signed data.
+        """
+
+    @abstractmethod
+    def decrypt(
+        self,
+        ciphertext: bytes,
+        secret_keys: Set[GPGSecretKey],
+        public_keys: Optional[Set[GPGPublicKey]] = None
+    ) -> bytes:
+        """Decrypt and optionally verify some data.
+
+        OpenPGP's ASCII Armor is not used.
+
+        @param ciphertext: The encrypted and optionally signed data as an OpenPGP message.
+        @param secret_keys: The secret keys to attempt decryption with.
+        @param public_keys: The public keys to verify the optional signature with.
+        @return: The decrypted, optionally verified and unpacked data.
+        @raise DecryptionFailed: on decryption failure.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: For implementors: it has to be confirmed that the data was decrypted
+            using one of the secret keys and that a valid signature by one of the public
+            keys is available in case the data is signed.
+        """
+
+    @abstractmethod
+    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
+        """List public keys.
+
+        @param user_id: The user id.
+        @return: The set of public keys available for this user id.
+        """
+
+    @abstractmethod
+    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
+        """List secret keys.
+
+        @param user_id: The user id.
+        @return: The set of secret keys available for this user id.
+        """
+
+    @abstractmethod
+    def can_sign(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: Whether the public key belongs to a key pair capable of signing.
+        """
+
+    @abstractmethod
+    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
+        """
+        @return: Whether the public key belongs to a key pair capable of encryption.
+        """
+
+    @abstractmethod
+    def create_key(self, user_id: str) -> GPGSecretKey:
+        """Create a new GPG key, capable of signing and encryption.
+
+        The key is generated without password protection and without expiration. If a key
+        with the same user id already exists, a new key is created anyway.
+
+        @param user_id: The user id to assign to the new key.
+        @return: The new key.
+        """
+
+
+class GPGME_GPGPublicKey(GPGPublicKey):
+    """
+    GPG public key implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, key_obj: Any) -> None:
+        """
+        @param key_obj: The GPGME key object.
+        """
+
+        self.__key_obj = key_obj
+
+    @property
+    def fingerprint(self) -> str:
+        return self.__key_obj.fpr
+
+    @property
+    def key_obj(self) -> Any:
+        return self.__key_obj
+
+
+class GPGME_GPGSecretKey(GPGSecretKey):
+    """
+    GPG secret key implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, public_key: GPGME_GPGPublicKey) -> None:
+        """
+        @param public_key: The public key corresponding to this secret key.
+        """
+
+        self.__public_key = public_key
+
+    @property
+    def public_key(self) -> GPGME_GPGPublicKey:
+        return self.__public_key
+
+
+class GPGME_GPGProvider(GPGProvider):
+    """
+    GPG provider implementation based on GnuPG Made Easy (GPGME).
+    """
+
+    def __init__(self, home_dir: Optional[str] = None) -> None:
+        """
+        @param home_dir: Optional GPG home directory path to use for all operations.
+        """
+
+        self.__home_dir = home_dir
+
+    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        pattern = public_key.fingerprint
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_export_minimal(pattern)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if result is None:
+                raise UnknownKey(f"Public key {pattern} not found.")
+
+            return result
+
+    def import_public_key(self, packet: bytes) -> GPGPublicKey:
+        # TODO
+        # - Reject packets older than version 4
+        # - Check whether it's actually a public key (through packet inspection?)
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_import(packet)
+            except gpg.errors.GPGMEError as e:
+                # From looking at the code, `key_import` never raises. The documentation
+                # says it does though, so this is included for future-proofness.
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if not hasattr(result, "considered"):
+                raise InvalidPacket(
+                    f"Data not considered for public key import: {result}"
+                )
+
+            if len(result.imports) != 1:
+                raise InvalidPacket(
+                    "Public key packet does not contain exactly one public key (not"
+                    " counting subkeys)."
+                )
+
+            try:
+                key_obj = c.get_key(result.imports[0].fpr, secret=False)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.KeyError as e:
+                raise GPGProviderError("Newly imported public key not found") from e
+
+            return GPGME_GPGPublicKey(key_obj)
+
+    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
+        assert isinstance(secret_key, GPGME_GPGSecretKey)
+        # TODO
+        # - Handle password protection/pinentry
+        # - Make sure the key is exported unencrypted
+
+        pattern = secret_key.public_key.fingerprint
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_export_secret(pattern)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if result is None:
+                raise UnknownKey(f"Secret key {pattern} not found.")
+
+            return result
+
+    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
+        # TODO
+        # - Reject packets older than version 4
+        # - Check whether it's actually secret keys (through packet inspection?)
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.key_import(data)
+            except gpg.errors.GPGMEError as e:
+                # From looking at the code, `key_import` never raises. The documentation
+                # says it does though, so this is included for future-proofness.
+                raise GPGProviderError("Internal GPGME error") from e
+
+            if not hasattr(result, "considered"):
+                raise InvalidPacket(
+                    f"Data not considered for secret key import: {result}"
+                )
+
+            if len(result.imports) == 0:
+                raise InvalidPacket("Secret key packet does not contain a secret key.")
+
+            secret_keys = set()
+            for import_status in result.imports:
+                try:
+                    key_obj = c.get_key(import_status.fpr, secret=True)
+                except gpg.errors.GPGMEError as e:
+                    raise GPGProviderError("Internal GPGME error") from e
+                except gpg.errors.KeyError as e:
+                    raise GPGProviderError("Newly imported secret key not found") from e
+
+                secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))
+
+            return secret_keys
+
+    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+            return ciphertext
+
+    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                plaintext, __, __ = c.decrypt(
+                    ciphertext,
+                    passphrase=password,
+                    verify=False
+                )
+            except gpg.errors.GPGMEError as e:
+                # TODO: Find out what kind of error is raised if the password is wrong and
+                # re-raise it as DecryptionFailed instead.
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.UnsupportedAlgorithm as e:
+                raise DecryptionFailed("Unsupported algorithm") from e
+
+            return plaintext
+
+    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        signers = []
+        for secret_key in secret_keys:
+            assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+            signers.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                signed_data, __ = c.sign(data)
+            except gpg.error.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the secret keys is invalid for signing"
+                ) from e
+
+            return signed_data
+
+    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
+        signers = []
+        for secret_key in secret_keys:
+            assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+            signers.append(secret_key.public_key.key_obj)
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH)
+            except gpg.error.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the secret keys is invalid for signing"
+                ) from e
+
+            return signature
+
+    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                data, result = c.verify(signed_data)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.BadSignatures as e:
+                raise VerificationFailed("Bad signatures on signed data") from e
+
+            valid_signature_found = False
+            for public_key in public_keys:
+                assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                for subkey in public_key.key_obj.subkeys:
+                    for sig in result.signatures:
+                        if subkey.can_sign and subkey.fpr == sig.fpr:
+                            valid_signature_found = True
+
+            if not valid_signature_found:
+                raise VerificationFailed(
+                    "Data not signed by one of the expected public keys"
+                )
+
+            return data
+
+    def verify_detached(
+        self,
+        data: bytes,
+        signature: bytes,
+        public_keys: Set[GPGPublicKey]
+    ) -> None:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                __, result = c.verify(data, signature=signature)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.BadSignatures as e:
+                raise VerificationFailed("Bad signatures on signed data") from e
+
+            valid_signature_found = False
+            for public_key in public_keys:
+                assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                for subkey in public_key.key_obj.subkeys:
+                    for sig in result.signatures:
+                        if subkey.can_sign and subkey.fpr == sig.fpr:
+                            valid_signature_found = True
+
+            if not valid_signature_found:
+                raise VerificationFailed(
+                    "Data not signed by one of the expected public keys"
+                )
+
+    def encrypt(
+        self,
+        plaintext: bytes,
+        public_keys: Set[GPGPublicKey],
+        signing_keys: Optional[Set[GPGSecretKey]] = None
+    ) -> bytes:
+        recipients = []
+        for public_key in public_keys:
+            assert isinstance(public_key, GPGME_GPGPublicKey)
+
+            recipients.append(public_key.key_obj)
+
+        signers = []
+        if signing_keys is not None:
+            for secret_key in signing_keys:
+                assert isinstance(secret_key, GPGME_GPGSecretKey)
+
+                signers.append(secret_key.public_key.key_obj)
+
+        sign = signing_keys is not None
+
+        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
+            try:
+                ciphertext, __, __ = c.encrypt(
+                    plaintext,
+                    recipients=recipients,
+                    sign=sign,
+                    always_trust=True,
+                    add_encrypt_to=True
+                )
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.InvalidRecipients as e:
+                raise GPGProviderError(
+                    "At least one of the public keys is invalid for encryption"
+                ) from e
+            except gpg.errors.InvalidSigners as e:
+                raise GPGProviderError(
+                    "At least one of the signing keys is invalid for signing"
+                ) from e
+
+            return ciphertext
+
+    def decrypt(
+        self,
+        ciphertext: bytes,
+        secret_keys: Set[GPGSecretKey],
+        public_keys: Optional[Set[GPGPublicKey]] = None
+    ) -> bytes:
+        verify = public_keys is not None
+
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                plaintext, result, verify_result = c.decrypt(
+                    ciphertext,
+                    verify=verify
+                )
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.UnsupportedAlgorithm as e:
+                raise DecryptionFailed("Unsupported algorithm") from e
+
+            # TODO: Check whether the data was decrypted using one of the expected secret
+            # keys
+
+            if public_keys is not None:
+                valid_signature_found = False
+                for public_key in public_keys:
+                    assert isinstance(public_key, GPGME_GPGPublicKey)
+
+                    for subkey in public_key.key_obj.subkeys:
+                        for sig in verify_result.signatures:
+                            if subkey.can_sign and subkey.fpr == sig.fpr:
+                                valid_signature_found = True
+
+                if not valid_signature_found:
+                    raise VerificationFailed(
+                        "Data not signed by one of the expected public keys"
+                    )
+
+            return plaintext
+
+    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                return {
+                    GPGME_GPGPublicKey(key)
+                    for key
+                    in c.keylist(pattern=user_id, secret=False)
+                }
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                return {
+                    GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
+                    for key
+                    in c.keylist(pattern=user_id, secret=True)
+                }
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+
+    def can_sign(self, public_key: GPGPublicKey) -> bool:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        return any(subkey.can_sign for subkey in public_key.key_obj.subkeys)
+
+    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
+        assert isinstance(public_key, GPGME_GPGPublicKey)
+
+        return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys)
+
+    def create_key(self, user_id: str) -> GPGSecretKey:
+        with gpg.Context(home_dir=self.__home_dir) as c:
+            try:
+                result = c.create_key(
+                    user_id,
+                    expires=False,
+                    sign=True,
+                    encrypt=True,
+                    certify=False,
+                    authenticate=False,
+                    force=True
+                )
+
+                key_obj = c.get_key(result.fpr, secret=True)
+            except gpg.errors.GPGMEError as e:
+                raise GPGProviderError("Internal GPGME error") from e
+            except gpg.errors.KeyError as e:
+                raise GPGProviderError("Newly created key not found") from e
+
+            return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))
+
+
+class PublicKeyMetadata(NamedTuple):
+    """
+    Metadata about a published public key.
+    """
+
+    fingerprint: str
+    timestamp: datetime
+
+
+@enum.unique
+class TrustLevel(enum.Enum):
+    """
+    The trust levels required for BTBV and manual trust.
+    """
+
+    TRUSTED: str = "TRUSTED"
+    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
+    UNDECIDED: str = "UNDECIDED"
+    DISTRUSTED: str = "DISTRUSTED"
+
+
+OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="openpgp" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+# The following schema needs verion 1.1 of XML Schema, which is not supported by lxml.
+# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
+# implementation of XML Schema, including version 1.1.
+CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="signcrypt">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="sign">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="crypt">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/>
+                <xs:element ref="time"/>
+                <xs:element ref="rpad" minOccurs="0"/>
+                <xs:element ref="payload"/>
+            </xs:all>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="to">
+        <xs:complexType>
+            <xs:attribute name="jid" type="xs:string"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="time">
+        <xs:complexType>
+            <xs:attribute name="stamp" type="xs:dateTime"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="rpad" type="xs:string"/>
+
+    <xs:element name="payload">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+</xs:schema>
+""")
+
+
+PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"
+PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="public-keys-list">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="pubkey-metadata">
+        <xs:complexType>
+            <xs:attribute name="v4-fingerprint" type="xs:string"/>
+            <xs:attribute name="date" type="xs:dateTime"/>
+        </xs:complexType>
+    </xs:element>
+</xs:schema>
+""")
+
+
+PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="pubkey">
+        <xs:complexType>
+            <xs:all>
+                <xs:element ref="data"/>
+            </xs:all>
+            <xs:anyAttribute processContents="skip"/>
+        </xs:complexType>
+    </xs:element>
+
+    <xs:element name="data" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+    targetNamespace="urn:xmpp:openpgp:0"
+    xmlns="urn:xmpp:openpgp:0">
+
+    <xs:element name="secretkey" type="xs:base64Binary"/>
+</xs:schema>
+""")
+
+
+DEFAULT_TRUST_MODEL_PARAM = f"""
+<params>
+<individual>
+<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
+    <param name="{PARAM_NAME}"
+        label={quoteattr(D_('OMEMO default trust policy'))}
+        type="list" security="3">
+        <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
+        <option value="btbv"
+            label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
+            selected="true" />
+    </param>
+</category>
+</individual>
+</params>
+"""
+
+
+def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider:
+    """Get the GPG provider for a client.
+
+    @param sat: The SAT instance.
+    @param client: The client.
+    @return: The GPG provider specifically for that client.
+    """
+
+    return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home")))
+
+
+def generate_passphrase() -> str:
+    """Generate a secure passphrase for symmetric encryption.
+
+    @return: The passphrase.
+    """
+
+    return "-".join("".join(
+        secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4)
+    ) for __ in range(6))
+
+
+# TODO: Handle the user id mess
+class XEP_0373:
+    """
+    Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``.
+    """
+
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Add configuration option to choose between manual trust and BTBV as the trust
+        # model
+        sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM)
+
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
+
+        self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {}
+
+        xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"])
+        xep_0163.addPEPEvent(
+            "OX_PUBLIC_KEYS_LIST",
+            PUBLIC_KEYS_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_public_keys_list_update(items_event, profile)
+            )
+        )
+
+    async def profileConnected(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient
+    ) -> None:
+        """
+        @param client: The client.
+        """
+
+        profile = cast(str, client.profile)
+
+        if not profile in self.__storage:
+            self.__storage[profile] = \
+                persistent.LazyPersistentBinaryDict("XEP-0373", client.profile)
+
+        if len(self.list_secret_keys(client)) == 0:
+            log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
+            await self.create_key(client)
+
+    async def __on_public_keys_list_update(
+        self,
+        items_event: pubsub.ItemsEvent,
+        profile: str
+    ) -> None:
+        """Handle public keys list updates fired by PEP.
+
+        @param items_event: The event.
+        @param profile: The profile this event belongs to.
+        """
+
+        client = self.__sat.getClient(profile)
+
+        sender = cast(jid.JID, items_event.sender)
+        items = cast(List[domish.Element], items_event.items)
+
+        if len(items) > 1:
+            log.warning("Ignoring public keys list update with more than one element.")
+            return
+
+        item_elt = next(iter(items), None)
+        if item_elt is None:
+            log.debug("Ignoring empty public keys list update.")
+            return
+
+        public_keys_list_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+        )
+
+        pubkey_metadata_elts: Optional[List[domish.Element]] = None
+
+        if public_keys_list_elt is not None:
+            try:
+                PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
+            except xmlschema.XMLSchemaValidationError:
+                pass
+            else:
+                pubkey_metadata_elts = \
+                    list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata"))
+
+        if pubkey_metadata_elts is None:
+            log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
+            return
+
+        new_public_keys_metadata = { PublicKeyMetadata(
+            fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
+            timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"]))
+        ) for pubkey_metadata_elt in pubkey_metadata_elts }
+
+        storage_key = f"/public-keys-metadata/{sender.userhost()}"
+
+        local_public_keys_metadata = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[profile].get(storage_key, set())
+        )
+
+        unchanged_keys = new_public_keys_metadata & local_public_keys_metadata
+        changed_or_new_keys = new_public_keys_metadata - unchanged_keys
+        available_keys = self.list_public_keys(client, sender)
+
+        for key_metadata in changed_or_new_keys:
+            # Check whether the changed or new key has been imported before
+            if any(key.fingerprint == key_metadata.fingerprint for key in available_keys):
+                try:
+                    # If it has been imported before, try to update it
+                    await self.import_public_key(client, sender, key_metadata.fingerprint)
+                except Exception as e:
+                    log.warning(f"Public key import failed: {e}")
+
+                    # If the update fails, remove the key from the local metadata list
+                    # such that the update is attempted again next time
+                    new_public_keys_metadata.remove(key_metadata)
+
+        # Check whether this update was for our account and make sure all of our keys are
+        # included in the update
+        if sender.userhost() == client.jid.userhost():
+            secret_keys = self.list_secret_keys(client)
+            missing_keys = set(filter(lambda secret_key: all(
+                key_metadata.fingerprint != secret_key.public_key.fingerprint
+                for key_metadata
+                in new_public_keys_metadata
+            ), secret_keys))
+
+            if len(missing_keys) > 0:
+                log.warning(
+                    "Public keys list update did not contain at least one of our keys."
+                    f" {new_public_keys_metadata}"
+                )
+
+                for missing_key in missing_keys:
+                    log.warning(missing_key.public_key.fingerprint)
+                    new_public_keys_metadata.add(PublicKeyMetadata(
+                        fingerprint=missing_key.public_key.fingerprint,
+                        timestamp=datetime.now(timezone.utc)
+                    ))
+
+                await self.publish_public_keys_list(client, new_public_keys_metadata)
+
+        await self.__storage[profile].force(storage_key, new_public_keys_metadata)
+
+    def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
+        """List GPG public keys available for a JID.
+
+        @param client: The client to perform this operation with.
+        @param jid: The JID. Can be a bare JID.
+        @return: The set of public keys available for this JID.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}")
+
+    def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]:
+        """List GPG secret keys available for a JID.
+
+        @param client: The client to perform this operation with.
+        @return: The set of secret keys available for this JID.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}")
+
+    async def create_key(self, client: SatXMPPClient) -> GPGSecretKey:
+        """Create a new GPG key, capable of signing and encryption.
+
+        The key is generated without password protection and without expiration.
+
+        @param client: The client to perform this operation with.
+        @return: The new key.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}")
+
+        await self.publish_public_key(client, secret_key.public_key)
+
+        storage_key = f"/public-keys-metadata/{client.jid.userhost()}"
+
+        public_keys_list = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[client.profile].get(storage_key, set())
+        )
+
+        public_keys_list.add(PublicKeyMetadata(
+            fingerprint=secret_key.public_key.fingerprint,
+            timestamp=datetime.now(timezone.utc)
+        ))
+
+        await self.publish_public_keys_list(client, public_keys_list)
+
+        await self.__storage[client.profile].force(storage_key, public_keys_list)
+
+        return secret_key
+
+    @staticmethod
+    def __build_content_element(
+        element_name: Literal["signcrypt", "sign", "crypt"],
+        recipient_jids: Iterable[jid.JID],
+        include_rpad: bool
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a content element.
+
+        @param element_name: The name of the content element.
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @param include_rpad: Whether to include random-length random-content padding.
+        @return: The content element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        content_elt = domish.Element((NS_OX, element_name))
+
+        for recipient_jid in recipient_jids:
+            content_elt.addElement("to")["jid"] = recipient_jid.userhost()
+
+        content_elt.addElement("time")["stamp"] = format_datetime()
+
+        if include_rpad:
+            # XEP-0373 doesn't specify bounds for the length of the random padding. This
+            # uses the bounds specified in XEP-0420 for the closely related rpad affix.
+            rpad_length = secrets.randbelow(201)
+            rpad_content = "".join(
+                secrets.choice(string.digits + string.ascii_letters + string.punctuation)
+                for __
+                in range(rpad_length)
+            )
+            content_elt.addElement("rpad", content=rpad_content)
+
+        payload_elt = content_elt.addElement("payload")
+
+        return content_elt, payload_elt
+
+    @staticmethod
+    def build_signcrypt_element(
+        recipient_jids: Iterable[jid.JID]
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<signcrypt/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the
+            stanza extension elements to.
+        """
+
+        if len(recipient_jids) == 0:
+            raise ValueError("Recipient JIDs must be provided.")
+
+        return XEP_0373.__build_content_element("signcrypt", recipient_jids, True)
+
+    @staticmethod
+    def build_sign_element(
+        recipient_jids: Iterable[jid.JID],
+        include_rpad: bool
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<sign/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Can be
+            bare JIDs.
+        @param include_rpad: Whether to include random-length random-content padding,
+            which is OPTIONAL for the ``<sign/>`` content element.
+        @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        if len(recipient_jids) == 0:
+            raise ValueError("Recipient JIDs must be provided.")
+
+        return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad)
+
+    @staticmethod
+    def build_crypt_element(
+        recipient_jids: Iterable[jid.JID]
+    ) -> Tuple[domish.Element, domish.Element]:
+        """Build a ``<crypt/>`` content element.
+
+        @param recipient_jids: The intended recipients of this content element. Specifying
+            the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can
+            be bare JIDs.
+        @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza
+            extension elements to.
+        """
+
+        return XEP_0373.__build_content_element("crypt", recipient_jids, True)
+
+    async def build_openpgp_element(
+        self,
+        client: SatXMPPClient,
+        content_elt: domish.Element,
+        recipient_jids: Set[jid.JID]
+    ) -> domish.Element:
+        """Build an ``<openpgp/>`` element.
+
+        @param client: The client to perform this operation with.
+        @param content_elt: The content element to contain in the ``<openpgp/>`` element.
+        @param recipient_jids: The recipient's JIDs. Can be bare JIDs.
+        @return: The ``<openpgp/>`` element.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        # TODO: I'm not sure whether we want to sign with all keys by default or choose
+        # just one key/a subset of keys to sign with.
+        signing_keys = set(filter(
+            lambda secret_key: gpg_provider.can_sign(secret_key.public_key),
+            self.list_secret_keys(client)
+        ))
+
+        encryption_keys: Set[GPGPublicKey] = set()
+
+        for recipient_jid in recipient_jids:
+            # Import all keys of the recipient
+            all_public_keys = await self.import_all_public_keys(client, recipient_jid)
+
+            # Filter for keys that can encrypt
+            encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys))
+
+        # TODO: Handle trust
+
+        content = content_elt.toXml().encode("utf-8")
+        data: bytes
+
+        if content_elt.name == "signcrypt":
+            data = gpg_provider.encrypt(content, encryption_keys, signing_keys)
+        elif content_elt.name == "sign":
+            data = gpg_provider.sign(content, signing_keys)
+        elif content_elt.name == "crypt":
+            data = gpg_provider.encrypt(content, encryption_keys)
+        else:
+            raise ValueError(f"Unknown content element <{content_elt.name}/>")
+
+        openpgp_elt = domish.Element((NS_OX, "openpgp"))
+        openpgp_elt.addContent(base64.b64encode(data).decode("ASCII"))
+        return openpgp_elt
+
+    async def unpack_openpgp_element(
+        self,
+        client: SatXMPPClient,
+        openpgp_elt: domish.Element,
+        element_name: Literal["signcrypt", "sign", "crypt"],
+        sender_jid: jid.JID
+    ) -> Tuple[domish.Element, datetime]:
+        """Verify, decrypt and unpack an ``<openpgp/>`` element.
+
+        @param client: The client to perform this operation with.
+        @param openpgp_elt: The ``<openpgp/>`` element.
+        @param element_name: The name of the content element.
+        @param sender_jid: The sender's JID. Can be a bare JID.
+        @return: The ``<payload/>`` element containing the decrypted/verified stanza
+            extension elements carried by this ``<openpgp/>`` element, and the timestamp
+            contained in the content element.
+        @raise exceptions.ParsingError: on syntactical verification errors.
+        @raise VerificationError: on semantical verification errors accoding to XEP-0373.
+        @raise DecryptionFailed: on decryption failure.
+        @raise VerificationFailed: if the data could not be verified.
+
+        @warning: The timestamp is not verified for plausibility; this SHOULD be done by
+            the calling code.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        decryption_keys = set(filter(
+            lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
+            self.list_secret_keys(client)
+        ))
+
+        # Import all keys of the sender
+        all_public_keys = await self.import_all_public_keys(client, sender_jid)
+
+        # Filter for keys that can sign
+        verification_keys = set(filter(gpg_provider.can_sign, all_public_keys))
+
+        # TODO: Handle trust
+
+        try:
+            OPENPGP_SCHEMA.validate(openpgp_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                "<openpgp/> element doesn't pass schema validation."
+            ) from e
+
+        openpgp_message = base64.b64decode(str(openpgp_elt))
+        content: bytes
+
+        if element_name == "signcrypt":
+            content = gpg_provider.decrypt(
+                openpgp_message,
+                decryption_keys,
+                public_keys=verification_keys
+            )
+        elif element_name == "sign":
+            content = gpg_provider.verify(openpgp_message, verification_keys)
+        elif element_name == "crypt":
+            content = gpg_provider.decrypt(openpgp_message, decryption_keys)
+        else:
+            assert_never(element_name)
+
+        try:
+            content_elt = cast(
+                domish.Element,
+                xml_tools.ElementParser()(content.decode("utf-8"))
+            )
+        except UnicodeDecodeError as e:
+            raise exceptions.ParsingError("UTF-8 decoding error") from e
+
+        try:
+            CONTENT_SCHEMA.validate(content_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"<{element_name}/> element doesn't pass schema validation."
+            ) from e
+
+        if content_elt.name != element_name:
+            raise exceptions.ParsingError(f"Not a <{element_name}/> element.")
+
+        recipient_jids = \
+            { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") }
+
+        if (
+            client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids }
+            and element_name != "crypt"
+        ):
+            raise VerificationError(
+                f"Recipient list in <{element_name}/> element does not list our (bare)"
+                f" JID."
+            )
+
+        time_elt = next(content_elt.elements(NS_OX, "time"))
+
+        timestamp = parse_datetime(time_elt["stamp"])
+
+        payload_elt = next(content_elt.elements(NS_OX, "payload"))
+
+        return payload_elt, timestamp
+
+    async def publish_public_key(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey
+    ) -> None:
+        """Publish a public key.
+
+        @param client: The client.
+        @param public_key: The public key to publish.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        packet = gpg_provider.export_public_key(public_key)
+
+        node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}"
+
+        pubkey_elt = domish.Element((NS_OX, "pubkey"))
+
+        pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII"))
+
+        try:
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                node,
+                pubkey_elt,
+                format_datetime(),
+                extra={
+                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "open",
+                        XEP_0060.OPT_MAX_ITEMS: 1
+                    },
+                    # TODO: Do we really want publish_without_options here?
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                }
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the public key failed.") from e
+
+    async def import_all_public_keys(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID
+    ) -> Set[GPGPublicKey]:
+        """Import all public keys of a JID that have not been imported before.
+
+        @param client: The client.
+        @param jid: The JID. Can be a bare JID.
+        @return: The public keys.
+        @note: Failure to import a key simply results in the key not being included in the
+            result.
+        """
+
+        available_public_keys = self.list_public_keys(client, jid)
+
+        storage_key = f"/public-keys-metadata/{jid.userhost()}"
+
+        public_keys_metadata = cast(
+            Set[PublicKeyMetadata],
+            await self.__storage[client.profile].get(storage_key, set())
+        )
+
+        missing_keys = set(filter(lambda public_key_metadata: all(
+            public_key_metadata.fingerprint != public_key.fingerprint
+            for public_key
+            in available_public_keys
+        ), public_keys_metadata))
+
+        for missing_key in missing_keys:
+            try:
+                available_public_keys.add(
+                    await self.import_public_key(client, jid, missing_key.fingerprint)
+                )
+            except Exception as e:
+                log.warning(
+                    f"Import of public key {missing_key.fingerprint} owned by"
+                    f" {jid.userhost()} failed, ignoring: {e}"
+                )
+
+        return available_public_keys
+
+    async def import_public_key(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID,
+        fingerprint: str
+    ) -> GPGPublicKey:
+        """Import a public key.
+
+        @param client: The client.
+        @param jid: The JID owning the public key. Can be a bare JID.
+        @param fingerprint: The fingerprint of the public key.
+        @return: The public key.
+        @raise exceptions.NotFound: if the public key was not found.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise InvalidPacket: if the packet is either syntactically or semantically deemed
+            invalid.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"
+
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound as e:
+            raise exceptions.NotFound(
+                f"No public key with fingerprint {fingerprint} published by JID"
+                f" {jid.userhost()}."
+            ) from e
+        except Exception as e:
+            raise XMPPInteractionFailed("Fetching the public keys list failed.") from e
+
+        try:
+            item_elt = cast(domish.Element, items[0])
+        except IndexError as e:
+            raise exceptions.NotFound(
+                f"No public key with fingerprint {fingerprint} published by JID"
+                f" {jid.userhost()}."
+            ) from e
+
+        pubkey_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "pubkey"), None)
+        )
+
+        if pubkey_elt is None:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey"
+                f" element."
+            )
+
+        try:
+            PUBKEY_SCHEMA.validate(pubkey_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey"
+                f" schema validation."
+            ) from e
+
+        public_key = gpg_provider.import_public_key(base64.b64decode(str(
+            next(pubkey_elt.elements(NS_OX, "data"))
+        )))
+
+        return public_key
+
+    async def publish_public_keys_list(
+        self,
+        client: SatXMPPClient,
+        public_keys_list: Iterable[PublicKeyMetadata]
+    ) -> None:
+        """Publish/update the own public keys list.
+
+        @param client: The client.
+        @param public_keys_list: The public keys list.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+
+        @warning: All public keys referenced in the public keys list MUST be published
+            beforehand.
+        """
+
+        if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list):
+            raise ValueError("Public keys list contains duplicate fingerprints.")
+
+        node = "urn:xmpp:openpgp:0:public-keys"
+
+        public_keys_list_elt = domish.Element((NS_OX, "public-keys-list"))
+
+        for public_key_metadata in public_keys_list:
+            pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata")
+            pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint
+            pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp)
+
+        try:
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                node,
+                public_keys_list_elt,
+                item_id=XEP_0060.ID_SINGLETON,
+                extra={
+                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "open",
+                        XEP_0060.OPT_MAX_ITEMS: 1
+                    },
+                    # TODO: Do we really want publish_without_options here?
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                }
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the public keys list failed.") from e
+
+    async def download_public_keys_list(
+        self,
+        client: SatXMPPClient,
+        jid: jid.JID
+    ) -> Optional[Set[PublicKeyMetadata]]:
+        """Download the public keys list of a JID.
+
+        @param client: The client.
+        @param jid: The JID. Can be a bare JID.
+        @return: The public keys list or ``None`` if the JID hasn't published a public
+            keys list. An empty list means the JID has published an empty list.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        node = "urn:xmpp:openpgp:0:public-keys"
+
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound:
+            return None
+        except Exception as e:
+            raise XMPPInteractionFailed() from e
+
+        try:
+            item_elt = cast(domish.Element, items[0])
+        except IndexError:
+            return None
+
+        public_keys_list_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+        )
+
+        if public_keys_list_elt is None:
+            return None
+
+        try:
+            PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys"
+                f" list schema validation."
+            ) from e
+
+        return {
+            PublicKeyMetadata(
+                fingerprint=pubkey_metadata_elt["v4-fingerprint"],
+                timestamp=parse_datetime(pubkey_metadata_elt["date"])
+            )
+            for pubkey_metadata_elt
+            in public_keys_list_elt.elements(NS_OX, "pubkey-metadata")
+        }
+
+    async def __prepare_secret_key_synchronization(
+        self,
+        client: SatXMPPClient
+    ) -> Optional[domish.Element]:
+        """Prepare for secret key synchronization.
+
+        Makes sure the relative protocols and protocol extensions are supported by the
+        server and makes sure that the PEP node for secret synchronization exists and is
+        configured correctly. The node is created if necessary.
+
+        @param client: The client.
+        @return: As part of the preparations, the secret key synchronization PEP node is
+            fetched. The result of that fetch is returned here.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        try:
+            infos = cast(DiscoInfo, await self.__sat.memory.disco.getInfos(
+                client,
+                client.jid.userhostJID()
+            ))
+        except Exception as e:
+            raise XMPPInteractionFailed(
+                "Error performing service discovery on the own bare JID."
+            ) from e
+
+        identities = cast(Dict[Tuple[str, str], str], infos.identities)
+        features = cast(Set[DiscoFeature], infos.features)
+
+        if ("pubsub", "pep") not in identities:
+            raise exceptions.FeatureNotFound("Server doesn't support PEP.")
+
+        if "http://jabber.org/protocol/pubsub#access-whitelist" not in features:
+            raise exceptions.FeatureNotFound(
+                "Server doesn't support the whitelist access model."
+            )
+
+        persistent_items_supported = \
+            "http://jabber.org/protocol/pubsub#persistent-items" in features
+
+        # TODO: persistent-items is a SHOULD, how do we handle the feature missing?
+
+        node = "urn:xmpp:openpgp:0:secret-key"
+
+        try:
+            items, __ = await self.__xep_0060.getItems(
+                client,
+                client.jid.userhostJID(),
+                node,
+                max_items=1
+            )
+        except exceptions.NotFound:
+            try:
+                await self.__xep_0060.createNode(
+                    client,
+                    client.jid.userhostJID(),
+                    node,
+                    {
+                        XEP_0060.OPT_PERSIST_ITEMS: "true",
+                        XEP_0060.OPT_ACCESS_MODEL: "whitelist",
+                        XEP_0060.OPT_MAX_ITEMS: "1"
+                    }
+                )
+            except Exception as e:
+                raise XMPPInteractionFailed(
+                    "Error creating the secret key synchronization node."
+                ) from e
+        except Exception as e:
+            raise XMPPInteractionFailed(
+                "Error fetching the secret key synchronization node."
+            ) from e
+
+        try:
+            return cast(domish.Element, items[0])
+        except IndexError:
+            return None
+
+    async def export_secret_keys(
+        self,
+        client: SatXMPPClient,
+        secret_keys: Iterable[GPGSecretKey]
+    ) -> str:
+        """Export secret keys to synchronize them with other devices.
+
+        @param client: The client.
+        @param secret_keys: The secret keys to export.
+        @return: The backup code needed to decrypt the exported secret keys.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        await self.__prepare_secret_key_synchronization(client)
+
+        backup_code = generate_passphrase()
+
+        plaintext = b"".join(
+            gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys
+        )
+
+        ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code)
+
+        node = "urn:xmpp:openpgp:0:secret-key"
+
+        secretkey_elt = domish.Element((NS_OX, "secretkey"))
+        secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII"))
+
+        try:
+            await self.__xep_0060.sendItem(
+                client,
+                client.jid.userhostJID(),
+                node,
+                secretkey_elt
+            )
+        except Exception as e:
+            raise XMPPInteractionFailed("Publishing the secret keys failed.") from e
+
+        return backup_code
+
+    async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]:
+        """Download previously exported secret keys to import them in a second step.
+
+        The downloading and importing steps are separate since a backup code is required
+        for the import and it should be possible to try multiple backup codes without
+        redownloading the data every time. The second half of the import procedure is
+        provided by :meth:`import_secret_keys`.
+
+        @param client: The client.
+        @return: The encrypted secret keys previously exported, if any.
+        @raise exceptions.FeatureNotFound: if the server lacks support for the required
+            protocols or protocol extensions.
+        @raise exceptions.ParsingError: on XML-level parsing errors.
+        @raise XMPPInteractionFailed: if any interaction via XMPP failed.
+        """
+
+        item_elt = await self.__prepare_secret_key_synchronization(client)
+        if item_elt is None:
+            return None
+
+        secretkey_elt = cast(
+            Optional[domish.Element],
+            next(item_elt.elements(NS_OX, "secretkey"), None)
+        )
+
+        if secretkey_elt is None:
+            return None
+
+        try:
+            SECRETKEY_SCHEMA.validate(secretkey_elt.toXml())
+        except xmlschema.XMLSchemaValidationError as e:
+            raise exceptions.ParsingError(
+                "Publish-Subscribe item doesn't pass secretkey schema validation."
+            ) from e
+
+        return base64.b64decode(str(secretkey_elt))
+
+    def import_secret_keys(
+        self,
+        client: SatXMPPClient,
+        ciphertext: bytes,
+        backup_code: str
+    ) -> Set[GPGSecretKey]:
+        """Import previously downloaded secret keys.
+
+        The downloading and importing steps are separate since a backup code is required
+        for the import and it should be possible to try multiple backup codes without
+        redownloading the data every time. The first half of the import procedure is
+        provided by :meth:`download_secret_keys`.
+
+        @param client: The client to perform this operation with.
+        @param ciphertext: The ciphertext, i.e. the data returned by
+            :meth:`download_secret_keys`.
+        @param backup_code: The backup code needed to decrypt the data.
+        @raise InvalidPacket: if one of the GPG packets building the secret key data is
+            either syntactically or semantically deemed invalid.
+        @raise DecryptionFailed: on decryption failure.
+        """
+
+        gpg_provider = get_gpg_provider(self.__sat, client)
+
+        return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically(
+            ciphertext,
+            backup_code
+        ))
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[jid.JID]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+        # TODO: This should probably be a global helper somewhere
+
+        bare_jids: Set[jid.JID] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhostJID())
+
+        return bare_jids
+
+    async def get_trust(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey,
+        owner: jid.JID
+    ) -> TrustLevel:
+        """Query the trust level of a public key.
+
+        @param client: The client to perform this operation under.
+        @param public_key: The public key.
+        @param owner: The owner of the public key. Can be a bare JID.
+        @return: The trust level.
+        """
+
+        key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
+
+        try:
+            return TrustLevel(await self.__storage[client.profile][key])
+        except KeyError:
+            return TrustLevel.UNDECIDED
+
+    async def set_trust(
+        self,
+        client: SatXMPPClient,
+        public_key: GPGPublicKey,
+        owner: jid.JID,
+        trust_level: TrustLevel
+    ) -> None:
+        """Set the trust level of a public key.
+
+        @param client: The client to perform this operation under.
+        @param public_key: The public key.
+        @param owner: The owner of the public key. Can be a bare JID.
+        @param trust_leve: The trust level.
+        """
+
+        key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
+
+        await self.__storage[client.profile].force(key, trust_level.name)
+
+    async def getTrustUI(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        if entity.resource:
+            raise ValueError("A bare JID is expected.")
+
+        bare_jids: Set[jid.JID]
+        if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity):
+            bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
+        else:
+            bare_jids = { entity.userhostJID() }
+
+        all_public_keys = list({
+            bare_jid: list(self.list_public_keys(client, bare_jid))
+            for bare_jid
+            in bare_jids
+        }.items())
+
+        async def callback(
+            data: Any,
+            profile: str  # pylint: disable=unused-argument
+        ) -> Dict[Never, Never]:
+            """
+            @param data: The XMLUI result produces by the trust UI form.
+            @param profile: The profile.
+            @return: An empty dictionary. The type of the return value was chosen
+                conservatively since the exact options are neither known not needed here.
+            """
+
+            if C.bool(data.get("cancelled", "false")):
+                return {}
+
+            data_form_result = cast(
+                Dict[str, str],
+                xml_tools.XMLUIResult2DataFormResult(data)
+            )
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
+
+                outer_index, inner_index = key.split("_")[1:]
+
+                owner, public_keys = all_public_keys[int(outer_index)]
+                public_key = public_keys[int(inner_index)]
+                trust = TrustLevel(value)
+
+                if (await self.get_trust(client, public_key, owner)) is not trust:
+                    await self.set_trust(client, public_key, owner, value)
+
+            return {}
+
+        submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True)
+
+        result = xml_tools.XMLUI(
+            panel_type=C.XMLUI_FORM,
+            title=D_("OX trust management"),
+            submit_id=submit_id
+        )
+        # Casting this to Any, otherwise all calls on the variable cause type errors
+        # pylint: disable=no-member
+        trust_ui = cast(Any, result)
+        trust_ui.addText(D_(
+            "This is OX trusting system. You'll see below the GPG keys of your "
+            "contacts, and a list selection to trust them or not. A trusted key "
+            "can read your messages in plain text, so be sure to only validate "
+            "keys that you are sure are belonging to your contact. It's better "
+            "to do this when you are next to your contact, so "
+            "you can check the \"fingerprint\" of the key "
+            "yourself. Do *not* validate a key if the fingerprint is wrong!"
+        ))
+
+        own_secret_keys = self.list_secret_keys(client)
+
+        trust_ui.changeContainer("label")
+        for index, secret_key in enumerate(own_secret_keys):
+            trust_ui.addLabel(D_(f"Own secret key {index} fingerprint"))
+            trust_ui.addText(secret_key.public_key.fingerprint)
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+        for outer_index, [ owner, public_keys ] in enumerate(all_public_keys):
+            for inner_index, public_key in enumerate(public_keys):
+                trust_ui.addLabel(D_("Contact"))
+                trust_ui.addJid(jid.JID(owner))
+                trust_ui.addLabel(D_("Fingerprint"))
+                trust_ui.addText(public_key.fingerprint)
+                trust_ui.addLabel(D_("Trust this device?"))
+
+                current_trust_level = await self.get_trust(client, public_key, owner)
+                avaiable_trust_levels = \
+                    { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
+
+                trust_ui.addList(
+                    f"trust_{outer_index}_{inner_index}",
+                    options=[ trust_level.name for trust_level in avaiable_trust_levels ],
+                    selected=current_trust_level.name,
+                    styles=[ "inline" ]
+                )
+
+                trust_ui.addEmpty()
+                trust_ui.addEmpty()
+
+        return result
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0374.py	Tue Sep 20 16:22:18 2022 +0200
@@ -0,0 +1,421 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OpenPGP for XMPP Instant Messaging
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Dict, Optional, Set, cast
+
+from typing_extensions import Final
+from wokkel import muc  # type: ignore[import]
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _, D_
+from sat.core.log import getLogger, Logger
+from sat.core.sat_main import SAT
+from sat.core.xmpp import SatXMPPClient
+from sat.plugins.plugin_xep_0045 import XEP_0045
+from sat.plugins.plugin_xep_0334 import XEP_0334
+from sat.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel
+from sat.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "XEP_0374",
+    "NS_OXIM"
+]
+
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "OXIM",
+    C.PI_IMPORT_NAME: "XEP-0374",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0374" ],
+    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ],
+    C.PI_RECOMMENDATIONS: [ "XEP-0045" ],
+    C.PI_MAIN: "XEP_0374",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Implementation of OXIM"""),
+}
+
+
+# The disco feature
+NS_OXIM: Final = "urn:xmpp:openpgp:im:0"
+
+
+class XEP_0374:
+    """
+    Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0``
+    namespace. MUC messages are supported next to one to one messages. For trust
+    management, the two trust models "BTBV" and "manual" are supported.
+    """
+
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Plugins
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
+        self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"])
+
+        # Triggers
+        sat.trigger.add(
+            "messageReceived",
+            self.__message_received_trigger,
+            priority=100050
+        )
+        sat.trigger.add("send", self.__send_trigger, priority=0)
+
+        # Register the encryption plugin
+        sat.registerEncryptionPlugin(self, "OXIM", NS_OX, 102)
+
+    async def getTrustUI(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        return await self.__xep_0373.getTrustUI(client, entity)
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[jid.JID]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+
+        bare_jids: Set[jid.JID] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhostJID())
+
+        return bare_jids
+
+    async def __message_received_trigger(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        """
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
+            message has fully progressed through the message receiving flow. Can be used
+            to apply treatments to the fully processed message, like marking it as
+            encrypted.
+        @return: Whether to continue the message received flow.
+        """
+        sender_jid = jid.JID(message_elt["from"])
+        feedback_jid: jid.JID
+
+        message_type = message_elt.getAttribute("type", "unknown")
+        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                log.warning(
+                    "Ignoring MUC message since plugin XEP-0045 is not available."
+                )
+                # Can't handle a MUC message without XEP-0045, let the flow continue
+                # normally
+                return True
+
+            room_jid = feedback_jid = sender_jid.userhostJID()
+
+            try:
+                room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid))
+            except exceptions.NotFound:
+                log.warning(
+                    f"Ignoring MUC message from a room that has not been joined:"
+                    f" {room_jid}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
+            if sender_user is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's user"
+                    f" wasn't found {sender_jid.resource}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
+            if sender_user_jid is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
+                    f" JID couldn't be found from its user information: {sender_user}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_jid = sender_user_jid
+        else:
+            # I'm not sure why this check is required, this code is copied from XEP-0384
+            if sender_jid.userhostJID() == client.jid.userhostJID():
+                # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems
+                # like "to" isn't always set.
+                feedback_jid = jid.JID(message_elt["to"])
+            else:
+                feedback_jid = sender_jid
+
+        sender_bare_jid = sender_jid.userhost()
+
+        openpgp_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(NS_OX, "openpgp"),
+            None
+        ))
+
+        if openpgp_elt is None:
+            # None of our business, let the flow continue
+            return True
+
+        try:
+            payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element(
+                client,
+                openpgp_elt,
+                "signcrypt",
+                jid.JID(sender_bare_jid)
+            )
+        except Exception as e:
+            # TODO: More specific exception handling
+            log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
+                reason=e,
+                xml=message_elt.toXml()
+            ))
+            client.feedback(
+                feedback_jid,
+                D_(
+                    f"An OXIM message from {sender_jid.full()} can't be decrypted:"
+                    f" {e}"
+                ),
+                { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+            )
+            # No point in further processing this message
+            return False
+
+        message_elt.children.remove(openpgp_elt)
+
+        log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}")
+
+        # Remove all body elements from the original element, since those act as
+        # fallbacks in case the encryption protocol is not supported
+        for child in message_elt.elements():
+            if child.name == "body":
+                message_elt.children.remove(child)
+
+        # Move all extension elements from the payload to the stanza root
+        # TODO: There should probably be explicitly forbidden elements here too, just as
+        # for XEP-0420
+        for child in list(payload_elt.elements()):
+            # Remove the child from the content element
+            payload_elt.children.remove(child)
+
+            # Add the child to the stanza
+            message_elt.addChild(child)
+
+        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
+        trust_level = TrustLevel.UNDECIDED  # TODO: Load the actual trust level
+        if trust_level is TrustLevel.TRUSTED:
+            post_treat.addCallback(client.encryption.markAsTrusted)
+        else:
+            post_treat.addCallback(client.encryption.markAsUntrusted)
+
+        # Mark the message as originally encrypted
+        post_treat.addCallback(
+            client.encryption.markAsEncrypted,
+            namespace=NS_OX
+        )
+
+        # Message processed successfully, continue with the flow
+        return True
+
+    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
+        """
+        @param client: The client sending this message.
+        @param stanza: The stanza that is about to be sent. Can be modified.
+        @return: Whether the send message flow should continue or not.
+        """
+        # OXIM only handles message stanzas
+        if stanza.name != "message":
+            return True
+
+        # Get the intended recipient
+        recipient = stanza.getAttribute("to", None)
+        if recipient is None:
+            raise exceptions.InternalError(
+                f"Message without recipient encountered. Blocking further processing to"
+                f" avoid leaking plaintext data: {stanza.toXml()}"
+            )
+
+        # Parse the JID
+        recipient_bare_jid = jid.JID(recipient).userhostJID()
+
+        # Check whether encryption with OXIM is requested
+        encryption = client.encryption.getSession(recipient_bare_jid)
+
+        if encryption is None:
+            # Encryption is not requested for this recipient
+            return True
+
+        if encryption["plugin"].namespace != NS_OX:
+            # Encryption is requested for this recipient, but not with OXIM
+            return True
+
+        # All pre-checks done, we can start encrypting!
+        await self.__encrypt(
+            client,
+            stanza,
+            recipient_bare_jid,
+            stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT
+        )
+
+        # Add a store hint if this is a message stanza
+        self.__xep_0334.addHintElements(stanza, [ "store" ])
+
+        # Let the flow continue.
+        return True
+
+    async def __encrypt(
+        self,
+        client: SatXMPPClient,
+        stanza: domish.Element,
+        recipient_jid: jid.JID,
+        is_muc_message: bool
+    ) -> None:
+        """
+        @param client: The client.
+        @param stanza: The stanza, which is modified by this call.
+        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
+            but doesn't have to.
+        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
+
+        @warning: The calling code MUST take care of adding the store message processing
+            hint to the stanza if applicable! This can be done before or after this call,
+            the order doesn't matter.
+        """
+
+        recipient_bare_jids: Set[jid.JID]
+        feedback_jid: jid.JID
+
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
+                    " available."
+                )
+
+            room_jid = feedback_jid = recipient_jid.userhostJID()
+
+            recipient_bare_jids = self.__get_joined_muc_users(
+                client,
+                self.__xep_0045,
+                room_jid
+            )
+        else:
+            recipient_bare_jids = { recipient_jid.userhostJID() }
+            feedback_jid = recipient_jid.userhostJID()
+
+        log.debug(
+            f"Intercepting message that is to be encrypted by {NS_OX} for"
+            f" {recipient_bare_jids}"
+        )
+
+        signcrypt_elt, payload_elt = \
+            self.__xep_0373.build_signcrypt_element(recipient_bare_jids)
+
+        # Move elements from the stanza to the content element.
+        # TODO: There should probably be explicitly forbidden elements here too, just as
+        # for XEP-0420
+        for child in list(stanza.elements()):
+            # Remove the child from the stanza
+            stanza.children.remove(child)
+
+            # A namespace of ``None`` can be used on domish elements to inherit the
+            # namespace from the parent. When moving elements from the stanza root to
+            # the content element, however, we don't want elements to inherit the
+            # namespace of the content element. Thus, check for elements with ``None``
+            # for their namespace and set the namespace to jabber:client, which is the
+            # namespace of the parent element.
+            if child.uri is None:
+                child.uri = C.NS_CLIENT
+                child.defaultUri = C.NS_CLIENT
+
+            # Add the child with corrected namespaces to the content element
+            payload_elt.addChild(child)
+
+        try:
+            openpgp_elt = await self.__xep_0373.build_openpgp_element(
+                client,
+                signcrypt_elt,
+                recipient_bare_jids
+            )
+        except Exception as e:
+            msg = _(
+                # pylint: disable=consider-using-f-string
+                "Can't encrypt message for {entities}: {reason}".format(
+                    entities=', '.join(jid.userhost() for jid in recipient_bare_jids),
+                    reason=e
+                )
+            )
+            log.warning(msg)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
+            raise e
+
+        stanza.addChild(openpgp_elt)
--- a/sat/plugins/plugin_xep_0384.py	Mon Oct 10 15:23:59 2022 +0200
+++ b/sat/plugins/plugin_xep_0384.py	Tue Sep 20 16:22:18 2022 +0200
@@ -479,10 +479,10 @@
                         xml_tools.et_elt_2_domish_elt(element),
                         item_id=str(bundle.device_id),
                         extra={
-                            xep_0060.EXTRA_PUBLISH_OPTIONS: {
-                                xep_0060.OPT_MAX_ITEMS: "max"
+                            XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                                XEP_0060.OPT_MAX_ITEMS: "max"
                             },
-                            xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                            XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
                         }
                     )
                 except (error.StanzaError, Exception) as e:
@@ -519,8 +519,8 @@
                         xml_tools.et_elt_2_domish_elt(element),
                         item_id=xep_0060.ID_SINGLETON,
                         extra={
-                            xep_0060.EXTRA_PUBLISH_OPTIONS: { xep_0060.OPT_MAX_ITEMS: 1 },
-                            xep_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                            XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_MAX_ITEMS: 1 },
+                            XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
                         }
                     )
                 except Exception as e:
@@ -546,7 +546,6 @@
                         client,
                         jid.JID(bare_jid),
                         node,
-                        max_items=None,
                         item_ids=[ str(device_id) ]
                     )
                 except Exception as e:
@@ -653,11 +652,11 @@
                     xml_tools.et_elt_2_domish_elt(element),
                     item_id=xep_0060.ID_SINGLETON,
                     extra={
-                        xep_0060.EXTRA_PUBLISH_OPTIONS: {
-                            xep_0060.OPT_MAX_ITEMS: 1,
-                            xep_0060.OPT_ACCESS_MODEL: "open"
+                        XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                            XEP_0060.OPT_MAX_ITEMS: 1,
+                            XEP_0060.OPT_ACCESS_MODEL: "open"
                         },
-                        xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                        XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
                     }
                 )
             except (error.StanzaError, Exception) as e:
--- a/sat/plugins/plugin_xep_0420.py	Mon Oct 10 15:23:59 2022 +0200
+++ b/sat/plugins/plugin_xep_0420.py	Tue Sep 20 16:22:18 2022 +0200
@@ -22,8 +22,10 @@
 import secrets
 import string
 from typing import Dict, NamedTuple, Optional, Set, Tuple, cast
+from typing_extensions import Final
 
 from lxml import etree
+from sat.core import exceptions
 
 from sat.core.constants import Const as C
 from sat.core.i18n import D_
@@ -68,7 +70,7 @@
 }
 
 
-NS_SCE = "urn:xmpp:sce:1"
+NS_SCE: Final = "urn:xmpp:sce:1"
 
 
 class ProfileRequirementsNotMet(Exception):
@@ -114,7 +116,8 @@
             remain. Do not modify.
         @return: An affix element to include in the envelope. The element must have the
             name :attr:`element_name` and must validate using :attr:`element_schema`.
-        @raise ValueError: if the affix couldn't be built.
+        @raise ValueError: if the affix couldn't be built due to missing information on
+            the stanza.
         """
 
     @abstractmethod
@@ -384,7 +387,7 @@
             by the decryption scheme utilizing SCE.
         @return: The parsed and processed values of all affixes that were present on the
             envelope, notably including the timestamp.
-        @raise ValueError: if the serialized envelope element is malformed.
+        @raise exceptions.ParsingError: if the serialized envelope element is malformed.
         @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
             are missing from the envelope.
         @raise AffixVerificationFailed: if an affix included in the envelope fails to
@@ -399,7 +402,9 @@
         try:
             envelope_serialized_string = envelope_serialized.decode("utf-8")
         except UnicodeError as e:
-            raise ValueError("Serialized envelope can't bare parsed as utf-8.") from e
+            raise exceptions.ParsingError(
+                "Serialized envelope can't bare parsed as utf-8."
+            ) from e
 
         custom_affixes = set(profile.custom_policies.keys())
 
@@ -420,7 +425,9 @@
         try:
             etree.fromstring(envelope_serialized_string, parser)
         except etree.XMLSyntaxError as e:
-            raise ValueError("Serialized envelope doesn't pass schema validation.") from e
+            raise exceptions.ParsingError(
+                "Serialized envelope doesn't pass schema validation."
+            ) from e
 
         # Prepare the envelope and content elements
         envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
@@ -452,7 +459,7 @@
             timestamp_value = None if time_element is None else \
                 XEP_0082.parse_datetime(time_element["stamp"])
         except ValueError as e:
-            raise AffixVerificationFailed("Malformed time affix") from e
+            raise AffixVerificationFailed("Malformed time affix.") from e
 
         # The to affix is verified by comparing the to attribute of the stanza with the
         # JID referenced by the affix. Note that only bare JIDs are compared as per the
--- a/sat/tools/xmpp_datetime.py	Mon Oct 10 15:23:59 2022 +0200
+++ b/sat/tools/xmpp_datetime.py	Tue Sep 20 16:22:18 2022 +0200
@@ -80,12 +80,15 @@
     @param value: A string containing date information formatted according to the Date
         profile specified in XEP-0082.
     @return: The date parsed from the input string.
-    @raise ValueError: if the input string is not correctly formatted.
+    @raise exceptions.ParsingError: if the input string is not correctly formatted.
     """
     # CCYY-MM-DD
 
     # The Date profile of XEP-0082 is equal to the ISO 8601 format.
-    return date.fromisoformat(value)
+    try:
+        return date.fromisoformat(value)
+    except ValueError as e:
+        raise exceptions.ParsingError() from e
 
 
 def format_datetime(
@@ -125,13 +128,16 @@
     @param value: A string containing datetime information formatted according to the
         DateTime profile specified in XEP-0082.
     @return: The datetime parsed from the input string.
-    @raise ValueError: if the input string is not correctly formatted.
+    @raise exceptions.ParsingError: if the input string is not correctly formatted.
     """
     # CCYY-MM-DDThh:mm:ss[.sss]TZD
 
     value, microsecond = __parse_fraction_of_a_second(value)
 
-    result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
+    try:
+        result = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
+    except ValueError as e:
+        raise exceptions.ParsingError() from e
 
     if microsecond is not None:
         result = result.replace(microsecond=microsecond)
@@ -167,7 +173,7 @@
     @param value: A string containing time information formatted according to the Time
         profile specified in XEP-0082.
     @return: The time parsed from the input string.
-    @raise ValueError: if the input string is not correctly formatted.
+    @raise exceptions.ParsingError: if the input string is not correctly formatted.
     """
     # hh:mm:ss[.sss][TZD]
 
@@ -177,7 +183,10 @@
     # profile, except that it doesn't handle the letter Z as time zone information for
     # UTC. This can be fixed with a simple string replacement of 'Z' with "+00:00", which
     # is another way to represent UTC.
-    result = time.fromisoformat(value.replace('Z', "+00:00"))
+    try:
+        result = time.fromisoformat(value.replace('Z', "+00:00"))
+    except ValueError as e:
+        raise exceptions.ParsingError() from e
 
     if microsecond is not None:
         result = result.replace(microsecond=microsecond)
--- a/setup.py	Mon Oct 10 15:23:59 2022 +0200
+++ b/setup.py	Tue Sep 20 16:22:18 2022 +0200
@@ -52,13 +52,14 @@
     'urwid-satext == 0.9.*',
     'wokkel >= 18.0.0, < 19.0.0',
     'omemo >= 1.0.0, < 2',
-    'twomemo >= 1.0.0, < 2',
-    'oldmemo >= 1.0.0, < 2',
+    'twomemo[xml] >= 1.0.0, < 2',
+    'oldmemo[xml] >= 1.0.0, < 2',
     'pyyaml < 7.0.0',
     'sqlalchemy >= 1.4',
     'alembic',
     'aiosqlite',
     'txdbus',
+    'xmlschema',
 ]
 
 extras_require = {
--- a/tests/unit/test_plugin_xep_0082.py	Mon Oct 10 15:23:59 2022 +0200
+++ b/tests/unit/test_plugin_xep_0082.py	Tue Sep 20 16:22:18 2022 +0200
@@ -19,6 +19,7 @@
 from datetime import date, datetime, time, timezone
 
 import pytest
+from sat.core import exceptions
 
 from sat.plugins.plugin_xep_0082 import XEP_0082
 
@@ -104,7 +105,7 @@
     assert XEP_0082.parse_datetime("1969-07-20T21:56:15-05:00") == value
 
     # Without timezone, without a fraction of a second
-    with pytest.raises(ValueError):
+    with pytest.raises(exceptions.ParsingError):
         XEP_0082.parse_datetime("1969-07-21T02:56:15")
 
     # With timezone 'Z', with a fraction of a second consisting of two digits
@@ -123,7 +124,7 @@
     assert XEP_0082.parse_datetime("1969-07-20T21:56:15.-05:00") == value
 
     # Without timezone, with a fraction of a second consisting of six digits
-    with pytest.raises(ValueError):
+    with pytest.raises(exceptions.ParsingError):
         XEP_0082.parse_datetime("1969-07-21T02:56:15.123456")
 
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/unit/test_plugin_xep_0373.py	Tue Sep 20 16:22:18 2022 +0200
@@ -0,0 +1,146 @@
+from datetime import datetime, timedelta, timezone
+from sat.plugins.plugin_xep_0373 import XEP_0373, NS_OX
+from sat.tools.xmpp_datetime import parse_datetime
+
+import pytest
+from twisted.words.protocols.jabber import jid
+
+
+a = jid.JID("foo@example.com")
+b = jid.JID("bar@example.com")
+
+
+def test_signcrypt_element_args() -> None:
+    with pytest.raises(ValueError):
+        XEP_0373.build_signcrypt_element([])
+
+
+def test_signcrypt_element() -> None:
+    signcrypt_elt, payload_elt = XEP_0373.build_signcrypt_element([ a, b ])
+    payload_elt.addElement("signcrypt-test-content", content="signcrypt test content")
+
+    rpad_elt = next(signcrypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(signcrypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    signcrypt_elt.children.remove(rpad_elt)
+    signcrypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert signcrypt_elt.toXml() == (
+        "<signcrypt xmlns='urn:xmpp:openpgp:0'>"
+            "<to jid='foo@example.com'/>"
+            "<to jid='bar@example.com'/>"
+            "<payload>"
+                "<signcrypt-test-content>signcrypt test content</signcrypt-test-content>"
+            "</payload>"
+        "</signcrypt>"
+    )
+
+
+def test_sign_element_args() -> None:
+    with pytest.raises(ValueError):
+        XEP_0373.build_sign_element([], True)
+
+
+def test_sign_element_with_rpad() -> None:
+    sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], True)
+    payload_elt.addElement("sign-test-content", content="sign test content")
+
+    rpad_elt = next(sign_elt.elements(NS_OX, "rpad"))
+    time_elt = next(sign_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    sign_elt.children.remove(rpad_elt)
+    sign_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert sign_elt.toXml() == (
+        "<sign xmlns='urn:xmpp:openpgp:0'>"
+            "<to jid='foo@example.com'/>"
+            "<to jid='bar@example.com'/>"
+            "<payload>"
+                "<sign-test-content>sign test content</sign-test-content>"
+            "</payload>"
+        "</sign>"
+    )
+
+
+def test_sign_element_without_rpad() -> None:
+    sign_elt, payload_elt = XEP_0373.build_sign_element([ a, b ], False)
+    payload_elt.addElement("sign-test-content", content="sign test content")
+
+    rpad_elt = next(sign_elt.elements(NS_OX, "rpad"), None)
+    time_elt = next(sign_elt.elements(NS_OX, "time"))
+
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    sign_elt.children.remove(time_elt)
+
+    assert rpad_elt is None
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert sign_elt.toXml() == (
+        "<sign xmlns='urn:xmpp:openpgp:0'>"
+            "<to jid='foo@example.com'/>"
+            "<to jid='bar@example.com'/>"
+            "<payload>"
+                "<sign-test-content>sign test content</sign-test-content>"
+            "</payload>"
+        "</sign>"
+    )
+
+
+def test_crypt_element_with_recipients() -> None:
+    crypt_elt, payload_elt = XEP_0373.build_crypt_element([ a, b ])
+    payload_elt.addElement("crypt-test-content", content="crypt test content")
+
+    rpad_elt = next(crypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(crypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    crypt_elt.children.remove(rpad_elt)
+    crypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert crypt_elt.toXml() == (
+        "<crypt xmlns='urn:xmpp:openpgp:0'>"
+            "<to jid='foo@example.com'/>"
+            "<to jid='bar@example.com'/>"
+            "<payload>"
+                "<crypt-test-content>crypt test content</crypt-test-content>"
+            "</payload>"
+        "</crypt>"
+    )
+
+
+def test_crypt_element_without_recipients() -> None:
+    crypt_elt, payload_elt = XEP_0373.build_crypt_element([])
+    payload_elt.addElement("crypt-test-content", content="crypt test content")
+
+    rpad_elt = next(crypt_elt.elements(NS_OX, "rpad"))
+    time_elt = next(crypt_elt.elements(NS_OX, "time"))
+
+    rpad = str(rpad_elt)
+    timestamp = parse_datetime(time_elt["stamp"])
+
+    crypt_elt.children.remove(rpad_elt)
+    crypt_elt.children.remove(time_elt)
+
+    assert rpad
+    assert (datetime.now(timezone.utc) - timestamp) < timedelta(seconds=10)
+    assert crypt_elt.toXml() == (
+        "<crypt xmlns='urn:xmpp:openpgp:0'>"
+            "<payload>"
+                "<crypt-test-content>crypt test content</crypt-test-content>"
+            "</payload>"
+        "</crypt>"
+    )
--- a/tests/unit/test_plugin_xep_0420.py	Mon Oct 10 15:23:59 2022 +0200
+++ b/tests/unit/test_plugin_xep_0420.py	Tue Sep 20 16:22:18 2022 +0200
@@ -20,6 +20,7 @@
 from typing import Callable, cast
 
 import pytest
+from sat.core import exceptions
 
 from sat.plugins.plugin_xep_0334 import NS_HINTS
 from sat.plugins.plugin_xep_0420 import (
@@ -435,7 +436,7 @@
         </body>
     </message>""")
 
-    with pytest.raises(ValueError):
+    with pytest.raises(exceptions.ParsingError):
         XEP_0420.unpack_stanza(
             unpacking_profile,
             stanza,
@@ -558,5 +559,5 @@
         <unknown-affix unknown-attr="unknown"/>
     </envelope>""".encode("utf-8")
 
-    with pytest.raises(ValueError):
+    with pytest.raises(exceptions.ParsingError):
         XEP_0420.unpack_stanza(profile, stanza, envelope_serialized)