diff libervia/backend/plugins/plugin_xep_0384.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0384.py@c23cad65ae99
children 040095a5dc7f
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0384.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,2724 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OMEMO encryption
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import base64
+from datetime import datetime
+import enum
+import logging
+import time
+from typing import \
+    Any, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, Union, cast
+import uuid
+import xml.etree.ElementTree as ET
+from xml.sax.saxutils import quoteattr
+
+from typing_extensions import Final, Never, assert_never
+from wokkel import muc, pubsub  # type: ignore[import]
+import xmlschema
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import MessageData, SatXMPPEntity
+from libervia.backend.core.i18n import _, D_
+from libervia.backend.core.log import getLogger, Logger
+from libervia.backend.core.sat_main import SAT
+from libervia.backend.core.xmpp import SatXMPPClient
+from libervia.backend.memory import persistent
+from libervia.backend.plugins.plugin_misc_text_commands import TextCommands
+from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
+from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
+from libervia.backend.plugins.plugin_xep_0163 import XEP_0163
+from libervia.backend.plugins.plugin_xep_0334 import XEP_0334
+from libervia.backend.plugins.plugin_xep_0359 import XEP_0359
+from libervia.backend.plugins.plugin_xep_0420 import \
+    XEP_0420, SCEAffixPolicy, SCEAffixValues, SCEProfile
+from libervia.backend.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import error, jid
+from twisted.words.xish import domish
+
+try:
+    import omemo
+    import omemo.identity_key_pair
+    import twomemo
+    import twomemo.etree
+    import oldmemo
+    import oldmemo.etree
+    import oldmemo.migrations
+    from xmlschema import XMLSchemaValidationError
+
+    # An explicit version check of the OMEMO libraries should not be required here, since
+    # the stored data is fully versioned and the library will complain if a downgrade is
+    # attempted.
+except ImportError as import_error:
+    raise exceptions.MissingModule(
+        "You are missing one or more package required by the OMEMO plugin. Please"
+        " download/install the pip packages 'omemo', 'twomemo', 'oldmemo' and"
+        f" 'xmlschema'.\nexception: {import_error}"
+    ) from import_error
+
+
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "OMEMO"
+]
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "OMEMO",
+    C.PI_IMPORT_NAME: "XEP-0384",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0384" ],
+    C.PI_DEPENDENCIES: [ "XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060", "XEP-0420" ],
+    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0359", C.TEXT_CMDS ],
+    C.PI_MAIN: "OMEMO",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Implementation of OMEMO"""),
+}
+
+
+PARAM_CATEGORY = "Security"
+PARAM_NAME = "omemo_policy"
+
+
+class LogHandler(logging.Handler):
+    """
+    Redirect python-omemo's log output to Libervia's log system.
+    """
+
+    def emit(self, record: logging.LogRecord) -> None:
+        log.log(record.levelname, record.getMessage())
+
+
+sm_logger = logging.getLogger(omemo.SessionManager.LOG_TAG)
+sm_logger.setLevel(logging.DEBUG)
+sm_logger.propagate = False
+sm_logger.addHandler(LogHandler())
+
+
+ikp_logger = logging.getLogger(omemo.identity_key_pair.IdentityKeyPair.LOG_TAG)
+ikp_logger.setLevel(logging.DEBUG)
+ikp_logger.propagate = False
+ikp_logger.addHandler(LogHandler())
+
+
+# TODO: Add handling for device labels, i.e. show device labels in the trust UI and give
+# the user a way to change their own device label.
+
+
+class MUCPlaintextCacheKey(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    Structure identifying an encrypted message sent to a MUC.
+    """
+
+    client: SatXMPPClient
+    room_jid: jid.JID
+    message_uid: str
+
+
+@enum.unique
+class TrustLevel(enum.Enum):
+    """
+    The trust levels required for ATM and BTBV.
+    """
+
+    TRUSTED: str = "TRUSTED"
+    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
+    UNDECIDED: str = "UNDECIDED"
+    DISTRUSTED: str = "DISTRUSTED"
+
+
+TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices"
+OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist"
+
+
+class StorageImpl(omemo.Storage):
+    """
+    Storage implementation for OMEMO based on :class:`persistent.LazyPersistentBinaryDict`
+    """
+
+    def __init__(self, profile: str) -> None:
+        """
+        @param profile: The profile this OMEMO data belongs to.
+        """
+
+        # persistent.LazyPersistentBinaryDict does not cache at all, so keep the caching
+        # option of omemo.Storage enabled.
+        super().__init__()
+
+        self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile)
+
+    async def _load(self, key: str) -> omemo.Maybe[omemo.JSONType]:
+        try:
+            return omemo.Just(await self.__storage[key])
+        except KeyError:
+            return omemo.Nothing()
+        except Exception as e:
+            raise omemo.StorageException(f"Error while loading key {key}") from e
+
+    async def _store(self, key: str, value: omemo.JSONType) -> None:
+        try:
+            await self.__storage.force(key, value)
+        except Exception as e:
+            raise omemo.StorageException(f"Error while storing key {key}: {value}") from e
+
+    async def _delete(self, key: str) -> None:
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+        except Exception as e:
+            raise omemo.StorageException(f"Error while deleting key {key}") from e
+
+
+class LegacyStorageImpl(oldmemo.migrations.LegacyStorage):
+    """
+    Legacy storage implementation to migrate data from the old XEP-0384 plugin.
+    """
+
+    KEY_DEVICE_ID = "DEVICE_ID"
+    KEY_STATE = "STATE"
+    KEY_SESSION = "SESSION"
+    KEY_ACTIVE_DEVICES = "DEVICES"
+    KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES"
+    KEY_TRUST = "TRUST"
+    KEY_ALL_JIDS = "ALL_JIDS"
+
+    def __init__(self, profile: str, own_bare_jid: str) -> None:
+        """
+        @param profile: The profile this OMEMO data belongs to.
+        @param own_bare_jid: The own bare JID, to return by the :meth:`load_own_data` call.
+        """
+
+        self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile)
+        self.__own_bare_jid = own_bare_jid
+
+    async def loadOwnData(self) -> Optional[oldmemo.migrations.OwnData]:
+        own_device_id = await self.__storage.get(LegacyStorageImpl.KEY_DEVICE_ID, None)
+        if own_device_id is None:
+            return None
+
+        return oldmemo.migrations.OwnData(
+            own_bare_jid=self.__own_bare_jid,
+            own_device_id=own_device_id
+        )
+
+    async def deleteOwnData(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_DEVICE_ID)
+        except KeyError:
+            pass
+
+    async def loadState(self) -> Optional[oldmemo.migrations.State]:
+        return cast(
+            Optional[oldmemo.migrations.State],
+            await self.__storage.get(LegacyStorageImpl.KEY_STATE, None)
+        )
+
+    async def deleteState(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_STATE)
+        except KeyError:
+            pass
+
+    async def loadSession(
+        self,
+        bare_jid: str,
+        device_id: int
+    ) -> Optional[oldmemo.migrations.Session]:
+        key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ])
+
+        return cast(
+            Optional[oldmemo.migrations.Session],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteSession(self, bare_jid: str, device_id: int) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def loadActiveDevices(self, bare_jid: str) -> Optional[List[int]]:
+        key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ])
+
+        return cast(
+            Optional[List[int]],
+            await self.__storage.get(key, None)
+        )
+
+    async def loadInactiveDevices(self, bare_jid: str) -> Optional[Dict[int, int]]:
+        key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ])
+
+        return cast(
+            Optional[Dict[int, int]],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteActiveDevices(self, bare_jid: str) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def deleteInactiveDevices(self, bare_jid: str) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def loadTrust(
+        self,
+        bare_jid: str,
+        device_id: int
+    ) -> Optional[oldmemo.migrations.Trust]:
+        key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ])
+
+        return cast(
+            Optional[oldmemo.migrations.Trust],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteTrust(self, bare_jid: str, device_id: int) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def listJIDs(self) -> Optional[List[str]]:
+        bare_jids = await self.__storage.get(LegacyStorageImpl.KEY_ALL_JIDS, None)
+
+        return None if bare_jids is None else list(bare_jids)
+
+    async def deleteJIDList(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_ALL_JIDS)
+        except KeyError:
+            pass
+
+
+async def download_oldmemo_bundle(
+    client: SatXMPPClient,
+    xep_0060: XEP_0060,
+    bare_jid: str,
+    device_id: int
+) -> oldmemo.oldmemo.BundleImpl:
+    """Download the oldmemo bundle corresponding to a specific device.
+
+    @param client: The client.
+    @param xep_0060: The XEP-0060 plugin instance to use for pubsub interactions.
+    @param bare_jid: The bare JID the device belongs to.
+    @param device_id: The id of the device.
+    @return: The bundle.
+    @raise BundleDownloadFailed: if the download failed. Feel free to raise a subclass
+        instead.
+    """
+    # Bundle downloads are needed by the session manager and for migrations from legacy,
+    # thus it is made a separate function.
+
+    namespace = oldmemo.oldmemo.NAMESPACE
+    node = f"eu.siacs.conversations.axolotl.bundles:{device_id}"
+
+    try:
+        items, __ = await xep_0060.get_items(client, jid.JID(bare_jid), node, max_items=1)
+    except Exception as e:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}"
+        ) from e
+
+    if len(items) != 1:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}: Unexpected number of items retrieved: {len(items)}."
+        )
+
+    element = \
+        next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None)
+    if element is None:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}: Item download succeeded but parsing failed: {element}."
+        )
+
+    try:
+        return oldmemo.etree.parse_bundle(element, bare_jid, device_id)
+    except Exception as e:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle parsing failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}"
+        ) from e
+
+
+# ATM only supports protocols based on SCE, which is currently only omemo:2, and relies on
+# so many implementation details of the encryption protocol that it makes more sense to
+# add ATM to the OMEMO plugin directly instead of having it a separate Libervia plugin.
+NS_TM: Final = "urn:xmpp:tm:1"
+NS_ATM: Final = "urn:xmpp:atm:1"
+
+
+TRUST_MESSAGE_SCHEMA = xmlschema.XMLSchema("""<?xml version='1.0' encoding='UTF-8'?>
+<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'
+           targetNamespace='urn:xmpp:tm:1'
+           xmlns='urn:xmpp:tm:1'
+           elementFormDefault='qualified'>
+
+  <xs:element name='trust-message'>
+    <xs:complexType>
+      <xs:sequence>
+        <xs:element ref='key-owner' minOccurs='1' maxOccurs='unbounded'/>
+      </xs:sequence>
+      <xs:attribute name='usage' type='xs:string' use='required'/>
+      <xs:attribute name='encryption' type='xs:string' use='required'/>
+    </xs:complexType>
+  </xs:element>
+
+  <xs:element name='key-owner'>
+    <xs:complexType>
+      <xs:sequence>
+        <xs:element
+            name='trust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/>
+        <xs:element
+            name='distrust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/>
+      </xs:sequence>
+      <xs:attribute name='jid' type='xs:string' use='required'/>
+    </xs:complexType>
+  </xs:element>
+</xs:schema>
+""")
+
+
+# This is compatible with omemo:2's SCE profile
+TM_SCE_PROFILE = SCEProfile(
+    rpad_policy=SCEAffixPolicy.REQUIRED,
+    time_policy=SCEAffixPolicy.REQUIRED,
+    to_policy=SCEAffixPolicy.OPTIONAL,
+    from_policy=SCEAffixPolicy.OPTIONAL,
+    custom_policies={}
+)
+
+
+class TrustUpdate(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    An update to the trust status of an identity key, used by Automatic Trust Management.
+    """
+
+    target_jid: jid.JID
+    target_key: bytes
+    target_trust: bool
+
+
+class TrustMessageCacheEntry(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    An entry in the trust message cache used by ATM.
+    """
+
+    sender_jid: jid.JID
+    sender_key: bytes
+    timestamp: datetime
+    trust_update: TrustUpdate
+
+
+class PartialTrustMessage(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    A structure representing a partial trust message, used by :func:`send_trust_messages`
+    to build trust messages.
+    """
+
+    recipient_jid: jid.JID
+    updated_jid: jid.JID
+    trust_updates: FrozenSet[TrustUpdate]
+
+
+async def manage_trust_message_cache(
+    client: SatXMPPClient,
+    session_manager: omemo.SessionManager,
+    applied_trust_updates: FrozenSet[TrustUpdate]
+) -> None:
+    """Manage the ATM trust message cache after trust updates have been applied.
+
+    @param client: The client this operation runs under.
+    @param session_manager: The session manager to use.
+    @param applied_trust_updates: The trust updates that have already been applied,
+        triggering this cache management run.
+    """
+
+    trust_message_cache = persistent.LazyPersistentBinaryDict(
+        "XEP-0384/TM",
+        client.profile
+    )
+
+    # Load cache entries
+    cache_entries = cast(
+        Set[TrustMessageCacheEntry],
+        await trust_message_cache.get("cache", set())
+    )
+
+    # Expire cache entries that were overwritten by the applied trust updates
+    cache_entries_by_target = {
+        (
+            cache_entry.trust_update.target_jid.userhostJID(),
+            cache_entry.trust_update.target_key
+        ): cache_entry
+        for cache_entry
+        in cache_entries
+    }
+
+    for trust_update in applied_trust_updates:
+        cache_entry = cache_entries_by_target.get(
+            (trust_update.target_jid.userhostJID(), trust_update.target_key),
+            None
+        )
+
+        if cache_entry is not None:
+            cache_entries.remove(cache_entry)
+
+    # Apply cached Trust Messages by newly trusted devices
+    new_trust_updates: Set[TrustUpdate] = set()
+
+    for trust_update in applied_trust_updates:
+        if trust_update.target_trust:
+            # Iterate over a copy such that cache_entries can be modified
+            for cache_entry in set(cache_entries):
+                if (
+                    cache_entry.sender_jid.userhostJID()
+                    == trust_update.target_jid.userhostJID()
+                    and cache_entry.sender_key == trust_update.target_key
+                ):
+                    trust_level = (
+                        TrustLevel.TRUSTED
+                        if cache_entry.trust_update.target_trust
+                        else TrustLevel.DISTRUSTED
+                    )
+
+                    # Apply the trust update
+                    await session_manager.set_trust(
+                        cache_entry.trust_update.target_jid.userhost(),
+                        cache_entry.trust_update.target_key,
+                        trust_level.name
+                    )
+
+                    # Track the fact that this trust update has been applied
+                    new_trust_updates.add(cache_entry.trust_update)
+
+                    # Remove the corresponding cache entry
+                    cache_entries.remove(cache_entry)
+
+    # Store the updated cache entries
+    await trust_message_cache.force("cache", cache_entries)
+
+    # TODO: Notify the user ("feedback") about automatically updated trust?
+
+    if len(new_trust_updates) > 0:
+        # If any trust has been updated, recursively perform another run of cache
+        # management
+        await manage_trust_message_cache(
+            client,
+            session_manager,
+            frozenset(new_trust_updates)
+        )
+
+
+async def get_trust_as_trust_updates(
+    session_manager: omemo.SessionManager,
+    target_jid: jid.JID
+) -> FrozenSet[TrustUpdate]:
+    """Get the trust status of all known keys of a JID as trust updates for use with ATM.
+
+    @param session_manager: The session manager to load the trust from.
+    @param target_jid: The JID to load the trust for.
+    @return: The trust updates encoding the trust status of all known keys of the JID that
+        are either explicitly trusted or distrusted. Undecided keys are not included in
+        the trust updates.
+    """
+
+    devices = await session_manager.get_device_information(target_jid.userhost())
+
+    trust_updates: Set[TrustUpdate] = set()
+
+    for device in devices:
+        trust_level = TrustLevel(device.trust_level_name)
+        target_trust: bool
+
+        if trust_level is TrustLevel.TRUSTED:
+            target_trust = True
+        elif trust_level is TrustLevel.DISTRUSTED:
+            target_trust = False
+        else:
+            # Skip devices that are not explicitly trusted or distrusted
+            continue
+
+        trust_updates.add(TrustUpdate(
+            target_jid=target_jid.userhostJID(),
+            target_key=device.identity_key,
+            target_trust=target_trust
+        ))
+
+    return frozenset(trust_updates)
+
+
+async def send_trust_messages(
+    client: SatXMPPClient,
+    session_manager: omemo.SessionManager,
+    applied_trust_updates: FrozenSet[TrustUpdate]
+) -> None:
+    """Send information about updated trust to peers via ATM (XEP-0450).
+
+    @param client: The client.
+    @param session_manager: The session manager.
+    @param applied_trust_updates: The trust updates that have already been applied, to
+        notify other peers about.
+    """
+    # NOTE: This currently sends information about oldmemo trust too. This is not
+    # specified and experimental, but since twomemo and oldmemo share the same identity
+    # keys and trust systems, this could be a cool side effect.
+
+    # Send Trust Messages for newly trusted and distrusted devices
+    own_jid = client.jid.userhostJID()
+    own_trust_updates = await get_trust_as_trust_updates(session_manager, own_jid)
+
+    # JIDs of which at least one device's trust has been updated
+    updated_jids = frozenset({
+        trust_update.target_jid.userhostJID()
+        for trust_update
+        in applied_trust_updates
+    })
+
+    trust_messages: Set[PartialTrustMessage] = set()
+
+    for updated_jid in updated_jids:
+        # Get the trust updates for that JID
+        trust_updates = frozenset({
+            trust_update for trust_update in applied_trust_updates
+            if trust_update.target_jid.userhostJID() == updated_jid
+        })
+
+        if updated_jid == own_jid:
+            # If the own JID is updated, _all_ peers have to be notified
+            # TODO: Using my author's privilege here to shamelessly access private fields
+            # and storage keys until I've added public API to get a list of peers to
+            # python-omemo.
+            storage: omemo.Storage = getattr(session_manager, "_SessionManager__storage")
+            peer_jids = frozenset({
+                jid.JID(bare_jid).userhostJID() for bare_jid in (await storage.load_list(
+                    f"/{OMEMO.NS_TWOMEMO}/bare_jids",
+                    str
+                )).maybe([])
+            })
+
+            if len(peer_jids) == 0:
+                # If there are no peers to notify, notify our other devices about the
+                # changes directly
+                trust_messages.add(PartialTrustMessage(
+                    recipient_jid=own_jid,
+                    updated_jid=own_jid,
+                    trust_updates=trust_updates
+                ))
+            else:
+                # Otherwise, notify all peers about the changes in trust and let carbons
+                # handle the copy to our own JID
+                for peer_jid in peer_jids:
+                    trust_messages.add(PartialTrustMessage(
+                        recipient_jid=peer_jid,
+                        updated_jid=own_jid,
+                        trust_updates=trust_updates
+                    ))
+
+                    # Also send full trust information about _every_ peer to our newly
+                    # trusted devices
+                    peer_trust_updates = \
+                        await get_trust_as_trust_updates(session_manager, peer_jid)
+
+                    trust_messages.add(PartialTrustMessage(
+                        recipient_jid=own_jid,
+                        updated_jid=peer_jid,
+                        trust_updates=peer_trust_updates
+                    ))
+
+            # Send information about our own devices to our newly trusted devices
+            trust_messages.add(PartialTrustMessage(
+                recipient_jid=own_jid,
+                updated_jid=own_jid,
+                trust_updates=own_trust_updates
+            ))
+        else:
+            # Notify our other devices about the changes in trust
+            trust_messages.add(PartialTrustMessage(
+                recipient_jid=own_jid,
+                updated_jid=updated_jid,
+                trust_updates=trust_updates
+            ))
+
+            # Send a summary of our own trust to newly trusted devices
+            trust_messages.add(PartialTrustMessage(
+                recipient_jid=updated_jid,
+                updated_jid=own_jid,
+                trust_updates=own_trust_updates
+            ))
+
+    # All trust messages prepared. Merge all trust messages directed at the same
+    # recipient.
+    recipient_jids = { trust_message.recipient_jid for trust_message in trust_messages }
+
+    for recipient_jid in recipient_jids:
+        updated: Dict[jid.JID, Set[TrustUpdate]] = {}
+
+        for trust_message in trust_messages:
+            # Merge trust messages directed at that recipient
+            if trust_message.recipient_jid == recipient_jid:
+                # Merge the trust updates
+                updated[trust_message.updated_jid] = \
+                    updated.get(trust_message.updated_jid, set())
+
+                updated[trust_message.updated_jid] |= trust_message.trust_updates
+
+        # Build the trust message
+        trust_message_elt = domish.Element((NS_TM, "trust-message"))
+        trust_message_elt["usage"] = NS_ATM
+        trust_message_elt["encryption"] = twomemo.twomemo.NAMESPACE
+
+        for updated_jid, trust_updates in updated.items():
+            key_owner_elt = trust_message_elt.addElement((NS_TM, "key-owner"))
+            key_owner_elt["jid"] = updated_jid.userhost()
+
+            for trust_update in trust_updates:
+                serialized_identity_key = \
+                    base64.b64encode(trust_update.target_key).decode("ASCII")
+
+                if trust_update.target_trust:
+                    key_owner_elt.addElement(
+                        (NS_TM, "trust"),
+                        content=serialized_identity_key
+                    )
+                else:
+                    key_owner_elt.addElement(
+                        (NS_TM, "distrust"),
+                        content=serialized_identity_key
+                    )
+
+        # Finally, encrypt and send the trust message!
+        message_data = client.generate_message_xml(MessageData({
+            "from": own_jid,
+            "to": recipient_jid,
+            "uid": str(uuid.uuid4()),
+            "message": {},
+            "subject": {},
+            "type": C.MESS_TYPE_CHAT,
+            "extra": {},
+            "timestamp": time.time()
+        }))
+
+        message_data["xml"].addChild(trust_message_elt)
+
+        plaintext = XEP_0420.pack_stanza(TM_SCE_PROFILE, message_data["xml"])
+
+        feedback_jid = recipient_jid
+
+        # TODO: The following is mostly duplicate code
+        try:
+            messages, encryption_errors = await session_manager.encrypt(
+                frozenset({ own_jid.userhost(), recipient_jid.userhost() }),
+                { OMEMO.NS_TWOMEMO: plaintext },
+                backend_priority_order=[ OMEMO.NS_TWOMEMO ],
+                identifier=feedback_jid.userhost()
+            )
+        except Exception as e:
+            msg = _(
+                # pylint: disable=consider-using-f-string
+                "Can't encrypt message for {entities}: {reason}".format(
+                    entities=', '.join({ own_jid.userhost(), recipient_jid.userhost() }),
+                    reason=e
+                )
+            )
+            log.warning(msg)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
+            raise e
+
+        if len(encryption_errors) > 0:
+            log.warning(
+                f"Ignored the following non-critical encryption errors:"
+                f" {encryption_errors}"
+            )
+
+            encrypted_errors_stringified = ", ".join([
+                f"device {err.device_id} of {err.bare_jid} under namespace"
+                f" {err.namespace}"
+                for err
+                in encryption_errors
+            ])
+
+            client.feedback(
+                feedback_jid,
+                D_(
+                    "There were non-critical errors during encryption resulting in some"
+                    " of your destinees' devices potentially not receiving the message."
+                    " This happens when the encryption data/key material of a device is"
+                    " incomplete or broken, which shouldn't happen for actively used"
+                    " devices, and can usually be ignored. The following devices are"
+                    f" affected: {encrypted_errors_stringified}."
+                )
+            )
+
+        message = next(
+            message for message in messages
+            if message.namespace == OMEMO.NS_TWOMEMO
+        )
+
+        # Add the encrypted element
+        message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt(
+            twomemo.etree.serialize_message(message)
+        ))
+
+        await client.a_send(message_data["xml"])
+
+
+def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]:
+    """
+    @param sat: The SAT instance.
+    @param profile: The profile.
+    @return: A non-abstract subclass of :class:`~omemo.session_manager.SessionManager`
+        with XMPP interactions and trust handled via the SAT instance.
+    """
+
+    client = sat.get_client(profile)
+    xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
+
+    class SessionManagerImpl(omemo.SessionManager):
+        """
+        Session manager implementation handling XMPP interactions and trust via an
+        instance of :class:`~sat.core.sat_main.SAT`.
+        """
+
+        @staticmethod
+        async def _upload_bundle(bundle: omemo.Bundle) -> None:
+            if isinstance(bundle, twomemo.twomemo.BundleImpl):
+                element = twomemo.etree.serialize_bundle(bundle)
+
+                node = "urn:xmpp:omemo:2:bundles"
+                try:
+                    await xep_0060.send_item(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        xml_tools.et_elt_2_domish_elt(element),
+                        item_id=str(bundle.device_id),
+                        extra={
+                            XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                                XEP_0060.OPT_MAX_ITEMS: "max"
+                            },
+                            XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                        }
+                    )
+                except (error.StanzaError, Exception) as e:
+                    if (
+                        isinstance(e, error.StanzaError)
+                        and e.condition == "conflict"
+                        and e.appCondition is not None
+                        # pylint: disable=no-member
+                        and e.appCondition.name == "precondition-not-met"
+                    ):
+                        # publish options couldn't be set on the fly, manually reconfigure
+                        # the node and publish again
+                        raise omemo.BundleUploadFailed(
+                            f"precondition-not-met: {bundle}"
+                        ) from e
+                        # TODO: What can I do here? The correct node configuration is a
+                        # MUST in the XEP.
+
+                    raise omemo.BundleUploadFailed(
+                        f"Bundle upload failed: {bundle}"
+                    ) from e
+
+                return
+
+            if isinstance(bundle, oldmemo.oldmemo.BundleImpl):
+                element = oldmemo.etree.serialize_bundle(bundle)
+
+                node = f"eu.siacs.conversations.axolotl.bundles:{bundle.device_id}"
+                try:
+                    await xep_0060.send_item(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        xml_tools.et_elt_2_domish_elt(element),
+                        item_id=xep_0060.ID_SINGLETON,
+                        extra={
+                            XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_MAX_ITEMS: 1 },
+                            XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                        }
+                    )
+                except Exception as e:
+                    raise omemo.BundleUploadFailed(
+                        f"Bundle upload failed: {bundle}"
+                    ) from e
+
+                return
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {bundle.namespace}")
+
+        @staticmethod
+        async def _download_bundle(
+            namespace: str,
+            bare_jid: str,
+            device_id: int
+        ) -> omemo.Bundle:
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = "urn:xmpp:omemo:2:bundles"
+
+                try:
+                    items, __ = await xep_0060.get_items(
+                        client,
+                        jid.JID(bare_jid),
+                        node,
+                        item_ids=[ str(device_id) ]
+                    )
+                except Exception as e:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}"
+                    ) from e
+
+                if len(items) != 1:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}: Unexpected number of items retrieved:"
+                        f" {len(items)}."
+                    )
+
+                element = next(
+                    iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))),
+                    None
+                )
+                if element is None:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}: Item download succeeded but parsing"
+                        f" failed: {element}."
+                    )
+
+                try:
+                    return twomemo.etree.parse_bundle(element, bare_jid, device_id)
+                except Exception as e:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle parsing failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}"
+                    ) from e
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                return await download_oldmemo_bundle(
+                    client,
+                    xep_0060,
+                    bare_jid,
+                    device_id
+                )
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        @staticmethod
+        async def _delete_bundle(namespace: str, device_id: int) -> None:
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = "urn:xmpp:omemo:2:bundles"
+
+                try:
+                    await xep_0060.retract_items(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        [ str(device_id) ],
+                        notify=False
+                    )
+                except Exception as e:
+                    raise omemo.BundleDeletionFailed(
+                        f"Bundle deletion failed for {device_id} under namespace"
+                        f" {namespace}"
+                    ) from e
+
+                return
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                node = f"eu.siacs.conversations.axolotl.bundles:{device_id}"
+
+                try:
+                    await xep_0060.deleteNode(client, client.jid.userhostJID(), node)
+                except Exception as e:
+                    raise omemo.BundleDeletionFailed(
+                        f"Bundle deletion failed for {device_id} under namespace"
+                        f" {namespace}"
+                    ) from e
+
+                return
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        @staticmethod
+        async def _upload_device_list(
+            namespace: str,
+            device_list: Dict[int, Optional[str]]
+        ) -> None:
+            element: Optional[ET.Element] = None
+            node: Optional[str] = None
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                element = twomemo.etree.serialize_device_list(device_list)
+                node = TWOMEMO_DEVICE_LIST_NODE
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                element = oldmemo.etree.serialize_device_list(device_list)
+                node = OLDMEMO_DEVICE_LIST_NODE
+
+            if element is None or node is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+            try:
+                await xep_0060.send_item(
+                    client,
+                    client.jid.userhostJID(),
+                    node,
+                    xml_tools.et_elt_2_domish_elt(element),
+                    item_id=xep_0060.ID_SINGLETON,
+                    extra={
+                        XEP_0060.EXTRA_PUBLISH_OPTIONS: {
+                            XEP_0060.OPT_MAX_ITEMS: 1,
+                            XEP_0060.OPT_ACCESS_MODEL: "open"
+                        },
+                        XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                    }
+                )
+            except (error.StanzaError, Exception) as e:
+                if (
+                    isinstance(e, error.StanzaError)
+                    and e.condition == "conflict"
+                    and e.appCondition is not None
+                    # pylint: disable=no-member
+                    and e.appCondition.name == "precondition-not-met"
+                ):
+                    # publish options couldn't be set on the fly, manually reconfigure the
+                    # node and publish again
+                    raise omemo.DeviceListUploadFailed(
+                        f"precondition-not-met for namespace {namespace}"
+                    ) from e
+                    # TODO: What can I do here? The correct node configuration is a MUST
+                    # in the XEP.
+
+                raise omemo.DeviceListUploadFailed(
+                    f"Device list upload failed for namespace {namespace}"
+                ) from e
+
+        @staticmethod
+        async def _download_device_list(
+            namespace: str,
+            bare_jid: str
+        ) -> Dict[int, Optional[str]]:
+            node: Optional[str] = None
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = TWOMEMO_DEVICE_LIST_NODE
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                node = OLDMEMO_DEVICE_LIST_NODE
+
+            if node is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+            try:
+                items, __ = await xep_0060.get_items(client, jid.JID(bare_jid), node)
+            except exceptions.NotFound:
+                return {}
+            except Exception as e:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}"
+                ) from e
+
+            if len(items) == 0:
+                return {}
+
+            if len(items) != 1:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}: Unexpected number of items retrieved: {len(items)}."
+                )
+
+            element = next(
+                iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))),
+                None
+            )
+
+            if element is None:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}: Item download succeeded but parsing failed:"
+                    f" {element}."
+                )
+
+            try:
+                if namespace == twomemo.twomemo.NAMESPACE:
+                    return twomemo.etree.parse_device_list(element)
+                if namespace == oldmemo.oldmemo.NAMESPACE:
+                    return oldmemo.etree.parse_device_list(element)
+            except Exception as e:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}"
+                ) from e
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        async def _evaluate_custom_trust_level(
+            self,
+            device: omemo.DeviceInformation
+        ) -> omemo.TrustLevel:
+            # Get the custom trust level
+            try:
+                trust_level = TrustLevel(device.trust_level_name)
+            except ValueError as e:
+                raise omemo.UnknownTrustLevel(
+                    f"Unknown trust level name {device.trust_level_name}"
+                ) from e
+
+            # The first three cases are a straight-forward mapping
+            if trust_level is TrustLevel.TRUSTED:
+                return omemo.TrustLevel.TRUSTED
+            if trust_level is TrustLevel.UNDECIDED:
+                return omemo.TrustLevel.UNDECIDED
+            if trust_level is TrustLevel.DISTRUSTED:
+                return omemo.TrustLevel.DISTRUSTED
+
+            # The blindly trusted case is more complicated, since its evaluation depends
+            # on the trust system and phase
+            if trust_level is TrustLevel.BLINDLY_TRUSTED:
+                # Get the name of the active trust system
+                trust_system = cast(str, sat.memory.param_get_a(
+                    PARAM_NAME,
+                    PARAM_CATEGORY,
+                    profile_key=profile
+                ))
+
+                # If the trust model is BTBV, blind trust is always enabled
+                if trust_system == "btbv":
+                    return omemo.TrustLevel.TRUSTED
+
+                # If the trust model is ATM, blind trust is disabled in the second phase
+                # and counts as undecided
+                if trust_system == "atm":
+                    # Find out whether we are in phase one or two
+                    devices = await self.get_device_information(device.bare_jid)
+
+                    phase_one = all(TrustLevel(device.trust_level_name) in {
+                        TrustLevel.UNDECIDED,
+                        TrustLevel.BLINDLY_TRUSTED
+                    } for device in devices)
+
+                    if phase_one:
+                        return omemo.TrustLevel.TRUSTED
+
+                    return omemo.TrustLevel.UNDECIDED
+
+                raise exceptions.InternalError(
+                    f"Unknown trust system active: {trust_system}"
+                )
+
+            assert_never(trust_level)
+
+        async def _make_trust_decision(
+            self,
+            undecided: FrozenSet[omemo.DeviceInformation],
+            identifier: Optional[str]
+        ) -> None:
+            if identifier is None:
+                raise omemo.TrustDecisionFailed(
+                    "The identifier must contain the feedback JID."
+                )
+
+            # The feedback JID is transferred via the identifier
+            feedback_jid = jid.JID(identifier).userhostJID()
+
+            # Both the ATM and the BTBV trust models work with blind trust before the
+            # first manual verification is performed. Thus, we can separate bare JIDs into
+            # two pools here, one pool of bare JIDs for which blind trust is active, and
+            # one pool of bare JIDs for which manual trust is used instead.
+            bare_jids = { device.bare_jid for device in undecided }
+
+            blind_trust_bare_jids: Set[str] = set()
+            manual_trust_bare_jids: Set[str] = set()
+
+            # For each bare JID, decide whether blind trust applies
+            for bare_jid in bare_jids:
+                # Get all known devices belonging to the bare JID
+                devices = await self.get_device_information(bare_jid)
+
+                # If the trust levels of all devices correspond to those used by blind
+                # trust, blind trust applies. Otherwise, fall back to manual trust.
+                if all(TrustLevel(device.trust_level_name) in {
+                    TrustLevel.UNDECIDED,
+                    TrustLevel.BLINDLY_TRUSTED
+                } for device in devices):
+                    blind_trust_bare_jids.add(bare_jid)
+                else:
+                    manual_trust_bare_jids.add(bare_jid)
+
+            # With the JIDs sorted into their respective pools, the undecided devices can
+            # be categorized too
+            blindly_trusted_devices = \
+                { dev for dev in undecided if dev.bare_jid in blind_trust_bare_jids }
+            manually_trusted_devices = \
+                { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids }
+
+            # Blindly trust devices handled by blind trust
+            if len(blindly_trusted_devices) > 0:
+                for device in blindly_trusted_devices:
+                    await self.set_trust(
+                        device.bare_jid,
+                        device.identity_key,
+                        TrustLevel.BLINDLY_TRUSTED.name
+                    )
+
+                blindly_trusted_devices_stringified = ", ".join([
+                    f"device {device.device_id} of {device.bare_jid} under namespace"
+                    f" {device.namespaces}"
+                    for device
+                    in blindly_trusted_devices
+                ])
+
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        "Not all destination devices are trusted, unknown devices will be"
+                        " blindly trusted.\nFollowing devices have been automatically"
+                        f" trusted: {blindly_trusted_devices_stringified}."
+                    )
+                )
+
+            # Prompt the user for manual trust decisions on the devices handled by manual
+            # trust
+            if len(manually_trusted_devices) > 0:
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        "Not all destination devices are trusted, we can't encrypt"
+                        " message in such a situation. Please indicate if you trust"
+                        " those devices or not in the trust manager before we can"
+                        " send this message."
+                    )
+                )
+                await self.__prompt_manual_trust(
+                    frozenset(manually_trusted_devices),
+                    feedback_jid
+                )
+
+        @staticmethod
+        async def _send_message(message: omemo.Message, bare_jid: str) -> None:
+            element: Optional[ET.Element] = None
+
+            if message.namespace == twomemo.twomemo.NAMESPACE:
+                element = twomemo.etree.serialize_message(message)
+            if message.namespace == oldmemo.oldmemo.NAMESPACE:
+                element = oldmemo.etree.serialize_message(message)
+
+            if element is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}")
+
+            message_data = client.generate_message_xml(MessageData({
+                "from": client.jid,
+                "to": jid.JID(bare_jid),
+                "uid": str(uuid.uuid4()),
+                "message": {},
+                "subject": {},
+                "type": C.MESS_TYPE_CHAT,
+                "extra": {},
+                "timestamp": time.time()
+            }))
+
+            message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt(element))
+
+            try:
+                await client.a_send(message_data["xml"])
+            except Exception as e:
+                raise omemo.MessageSendingFailed() from e
+
+        async def __prompt_manual_trust(
+            self,
+            undecided: FrozenSet[omemo.DeviceInformation],
+            feedback_jid: jid.JID
+        ) -> None:
+            """Asks the user to decide on the manual trust level of a set of devices.
+
+            Blocks until the user has made a decision and updates the trust levels of all
+            devices using :meth:`set_trust`.
+
+            @param undecided: The set of devices to prompt manual trust for.
+            @param feedback_jid: The bare JID to redirect feedback to. In case of a one to
+                one message, the recipient JID. In case of a MUC message, the room JID.
+            @raise TrustDecisionFailed: if the user cancels the prompt.
+            """
+
+            # This session manager handles encryption with both twomemo and oldmemo, but
+            # both are currently registered as different plugins and the `defer_xmlui`
+            # below requires a single namespace identifying the encryption plugin. Thus,
+            # get the namespace of the requested encryption method from the encryption
+            # session using the feedback JID.
+            encryption = client.encryption.getSession(feedback_jid)
+            if encryption is None:
+                raise omemo.TrustDecisionFailed(
+                    f"Encryption not requested for {feedback_jid.userhost()}."
+                )
+
+            namespace = encryption["plugin"].namespace
+
+            # Casting this to Any, otherwise all calls on the variable cause type errors
+            # pylint: disable=no-member
+            trust_ui = cast(Any, xml_tools.XMLUI(
+                panel_type=C.XMLUI_FORM,
+                title=D_("OMEMO trust management"),
+                submit_id=""
+            ))
+            trust_ui.addText(D_(
+                "This is OMEMO trusting system. You'll see below the devices of your "
+                "contacts, and a checkbox to trust them or not. A trusted device "
+                "can read your messages in plain text, so be sure to only validate "
+                "devices that you are sure are belonging to your contact. It's better "
+                "to do this when you are next to your contact and their device, so "
+                "you can check the \"fingerprint\" (the number next to the device) "
+                "yourself. Do *not* validate a device if the fingerprint is wrong!"
+            ))
+
+            own_device, __ = await self.get_own_device_information()
+
+            trust_ui.change_container("label")
+            trust_ui.addLabel(D_("This device ID"))
+            trust_ui.addText(str(own_device.device_id))
+            trust_ui.addLabel(D_("This device's fingerprint"))
+            trust_ui.addText(" ".join(self.format_identity_key(own_device.identity_key)))
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+            # At least sort the devices by bare JID such that they aren't listed
+            # completely random
+            undecided_ordered = sorted(undecided, key=lambda device: device.bare_jid)
+
+            for index, device in enumerate(undecided_ordered):
+                trust_ui.addLabel(D_("Contact"))
+                trust_ui.addJid(jid.JID(device.bare_jid))
+                trust_ui.addLabel(D_("Device ID"))
+                trust_ui.addText(str(device.device_id))
+                trust_ui.addLabel(D_("Fingerprint"))
+                trust_ui.addText(" ".join(self.format_identity_key(device.identity_key)))
+                trust_ui.addLabel(D_("Trust this device?"))
+                trust_ui.addBool(f"trust_{index}", value=C.bool_const(False))
+                trust_ui.addEmpty()
+                trust_ui.addEmpty()
+
+            trust_ui_result = await xml_tools.defer_xmlui(
+                sat,
+                trust_ui,
+                action_extra={ "meta_encryption_trust": namespace },
+                profile=profile
+            )
+
+            if C.bool(trust_ui_result.get("cancelled", "false")):
+                raise omemo.TrustDecisionFailed("Trust UI cancelled.")
+
+            data_form_result = cast(Dict[str, str], xml_tools.xmlui_result_2_data_form_result(
+                trust_ui_result
+            ))
+
+            trust_updates: Set[TrustUpdate] = set()
+
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
+
+                device = undecided_ordered[int(key[len("trust_"):])]
+                target_trust = C.bool(value)
+                trust_level = \
+                    TrustLevel.TRUSTED if target_trust else TrustLevel.DISTRUSTED
+
+                await self.set_trust(
+                    device.bare_jid,
+                    device.identity_key,
+                    trust_level.name
+                )
+
+                trust_updates.add(TrustUpdate(
+                    target_jid=jid.JID(device.bare_jid).userhostJID(),
+                    target_key=device.identity_key,
+                    target_trust=target_trust
+                ))
+
+            # Check whether ATM is enabled and handle everything in case it is
+            trust_system = cast(str, sat.memory.param_get_a(
+                PARAM_NAME,
+                PARAM_CATEGORY,
+                profile_key=profile
+            ))
+
+            if trust_system == "atm":
+                await manage_trust_message_cache(client, self, frozenset(trust_updates))
+                await send_trust_messages(client, self, frozenset(trust_updates))
+
+    return SessionManagerImpl
+
+
+async def prepare_for_profile(
+    sat: SAT,
+    profile: str,
+    initial_own_label: Optional[str],
+    signed_pre_key_rotation_period: int = 7 * 24 * 60 * 60,
+    pre_key_refill_threshold: int = 99,
+    max_num_per_session_skipped_keys: int = 1000,
+    max_num_per_message_skipped_keys: Optional[int] = None
+) -> omemo.SessionManager:
+    """Prepare the OMEMO library (storage, backends, core) for a specific profile.
+
+    @param sat: The SAT instance.
+    @param profile: The profile.
+    @param initial_own_label: The initial (optional) label to assign to this device if
+        supported by any of the backends.
+    @param signed_pre_key_rotation_period: The rotation period for the signed pre key, in
+        seconds. The rotation period is recommended to be between one week (the default)
+        and one month.
+    @param pre_key_refill_threshold: The number of pre keys that triggers a refill to 100.
+        Defaults to 99, which means that each pre key gets replaced with a new one right
+        away. The threshold can not be configured to lower than 25.
+    @param max_num_per_session_skipped_keys: The maximum number of skipped message keys to
+        keep around per session. Once the maximum is reached, old message keys are deleted
+        to make space for newer ones. Accessible via
+        :attr:`max_num_per_session_skipped_keys`.
+    @param max_num_per_message_skipped_keys: The maximum number of skipped message keys to
+        accept in a single message. When set to ``None`` (the default), this parameter
+        defaults to the per-session maximum (i.e. the value of the
+        ``max_num_per_session_skipped_keys`` parameter). This parameter may only be 0 if
+        the per-session maximum is 0, otherwise it must be a number between 1 and the
+        per-session maximum. Accessible via :attr:`max_num_per_message_skipped_keys`.
+    @return: A session manager with ``urn:xmpp:omemo:2`` and
+        ``eu.siacs.conversations.axolotl`` capabilities, specifically for the given
+        profile.
+    @raise BundleUploadFailed: if a bundle upload failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise BundleDownloadFailed: if a bundle download failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise BundleDeletionFailed: if a bundle deletion failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise DeviceListUploadFailed: if a device list upload failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise DeviceListDownloadFailed: if a device list download failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    """
+
+    client = sat.get_client(profile)
+    xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
+
+    storage = StorageImpl(profile)
+
+    # TODO: Untested
+    await oldmemo.migrations.migrate(
+        LegacyStorageImpl(profile, client.jid.userhost()),
+        storage,
+        # TODO: Do we want BLINDLY_TRUSTED or TRUSTED here?
+        TrustLevel.BLINDLY_TRUSTED.name,
+        TrustLevel.UNDECIDED.name,
+        TrustLevel.DISTRUSTED.name,
+        lambda bare_jid, device_id: download_oldmemo_bundle(
+            client,
+            xep_0060,
+            bare_jid,
+            device_id
+        )
+    )
+
+    session_manager = await make_session_manager(sat, profile).create(
+        [
+            twomemo.Twomemo(
+                storage,
+                max_num_per_session_skipped_keys,
+                max_num_per_message_skipped_keys
+            ),
+            oldmemo.Oldmemo(
+                storage,
+                max_num_per_session_skipped_keys,
+                max_num_per_message_skipped_keys
+            )
+        ],
+        storage,
+        client.jid.userhost(),
+        initial_own_label,
+        TrustLevel.UNDECIDED.value,
+        signed_pre_key_rotation_period,
+        pre_key_refill_threshold,
+        omemo.AsyncFramework.TWISTED
+    )
+
+    # This shouldn't hurt here since we're not running on overly constrainted devices.
+    # TODO: Consider ensuring data consistency regularly/in response to certain events
+    await session_manager.ensure_data_consistency()
+
+    # TODO: Correct entering/leaving of the history synchronization mode isn't terribly
+    # important for now, since it only prevents an extremely unlikely race condition of
+    # multiple devices choosing the same pre key for new sessions while the device was
+    # offline. I don't believe other clients seriously defend against that race condition
+    # either. In the long run, it might still be cool to have triggers for when history
+    # sync starts and ends (MAM, MUC catch-up, etc.) and to react to those triggers.
+    await session_manager.after_history_sync()
+
+    return session_manager
+
+
+DEFAULT_TRUST_MODEL_PARAM = f"""
+<params>
+<individual>
+<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
+    <param name="{PARAM_NAME}"
+        label={quoteattr(D_('OMEMO default trust policy'))}
+        type="list" security="3">
+        <option value="atm"
+            label={quoteattr(D_('Automatic Trust Management (more secure)'))} />
+        <option value="btbv"
+            label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
+            selected="true" />
+    </param>
+</category>
+</individual>
+</params>
+"""
+
+
+class OMEMO:
+    """
+    Plugin equipping Libervia with OMEMO capabilities under the (modern)
+    ``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl``
+    namespace. Both versions of the protocol are handled by this plugin and compatibility
+    between the two is maintained. MUC messages are supported next to one to one messages.
+    For trust management, the two trust models "ATM" and "BTBV" are supported.
+    """
+    NS_TWOMEMO = twomemo.twomemo.NAMESPACE
+    NS_OLDMEMO = oldmemo.oldmemo.NAMESPACE
+
+    # For MUC/MIX message stanzas, the <to/> affix is a MUST
+    SCE_PROFILE_GROUPCHAT = SCEProfile(
+        rpad_policy=SCEAffixPolicy.REQUIRED,
+        time_policy=SCEAffixPolicy.OPTIONAL,
+        to_policy=SCEAffixPolicy.REQUIRED,
+        from_policy=SCEAffixPolicy.OPTIONAL,
+        custom_policies={}
+    )
+
+    # For everything but MUC/MIX message stanzas, the <to/> affix is a MAY
+    SCE_PROFILE = SCEProfile(
+        rpad_policy=SCEAffixPolicy.REQUIRED,
+        time_policy=SCEAffixPolicy.OPTIONAL,
+        to_policy=SCEAffixPolicy.OPTIONAL,
+        from_policy=SCEAffixPolicy.OPTIONAL,
+        custom_policies={}
+    )
+
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Add configuration option to choose between manual trust and BTBV as the trust
+        # model
+        sat.memory.update_params(DEFAULT_TRUST_MODEL_PARAM)
+
+        # Plugins
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
+        self.__xep_0359 = cast(Optional[XEP_0359], sat.plugins.get("XEP-0359"))
+        self.__xep_0420 = cast(XEP_0420, sat.plugins["XEP-0420"])
+
+        # In contrast to one to one messages, MUC messages are reflected to the sender.
+        # Thus, the sender does not add messages to their local message log when sending
+        # them, but when the reflection is received. This approach does not pair well with
+        # OMEMO, since for security reasons it is forbidden to encrypt messages for the
+        # own device. Thus, when the reflection of an OMEMO message is received, it can't
+        # be decrypted and added to the local message log as usual. To counteract this,
+        # the plaintext of encrypted messages sent to MUCs are cached in this field, such
+        # that when the reflection is received, the plaintext can be looked up from the
+        # cache and added to the local message log.
+        # TODO: The old plugin expired this cache after some time. I'm not sure that's
+        # really necessary.
+        self.__muc_plaintext_cache: Dict[MUCPlaintextCacheKey, bytes] = {}
+
+        # Mapping from profile name to corresponding session manager
+        self.__session_managers: Dict[str, omemo.SessionManager] = {}
+
+        # Calls waiting for a specific session manager to be built
+        self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {}
+
+        # These triggers are used by oldmemo, which doesn't do SCE and only applies to
+        # messages. Temporarily, until a more fitting trigger for SCE-based encryption is
+        # added, the message_received trigger is also used for twomemo.
+        sat.trigger.add(
+            "message_received",
+            self._message_received_trigger,
+            priority=100050
+        )
+        sat.trigger.add(
+            "send_message_data",
+            self.__send_message_data_trigger,
+            priority=100050
+        )
+
+        # These triggers are used by twomemo, which does do SCE
+        sat.trigger.add("send", self.__send_trigger, priority=0)
+        # TODO: Add new triggers here for freshly received and about-to-be-sent stanzas,
+        # including IQs.
+
+        # Give twomemo a (slightly) higher priority than oldmemo
+        sat.register_encryption_plugin(self, "TWOMEMO", twomemo.twomemo.NAMESPACE, 101)
+        sat.register_encryption_plugin(self, "OLDMEMO", oldmemo.oldmemo.NAMESPACE, 100)
+
+        xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"])
+        xep_0163.add_pep_event(
+            "TWOMEMO_DEVICES",
+            TWOMEMO_DEVICE_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_device_list_update(items_event, profile)
+            )
+        )
+        xep_0163.add_pep_event(
+            "OLDMEMO_DEVICES",
+            OLDMEMO_DEVICE_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_device_list_update(items_event, profile)
+            )
+        )
+
+        try:
+            self.__text_commands = cast(TextCommands, sat.plugins[C.TEXT_CMDS])
+        except KeyError:
+            log.info(_("Text commands not available"))
+        else:
+            self.__text_commands.register_text_commands(self)
+
+    def profile_connected(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient
+    ) -> None:
+        """
+        @param client: The client.
+        """
+
+        defer.ensureDeferred(self.get_session_manager(
+            cast(str, client.profile)
+        ))
+
+    async def cmd_omemo_reset(
+        self,
+        client: SatXMPPClient,
+        mess_data: MessageData
+    ) -> Literal[False]:
+        """Reset all sessions of devices that belong to the recipient of ``mess_data``.
+
+        This must only be callable manually by the user. Use this when a session is
+        apparently broken, i.e. sending and receiving encrypted messages doesn't work and
+        something being wrong has been confirmed manually with the recipient.
+
+        @param client: The client.
+        @param mess_data: The message data, whose ``to`` attribute will be the bare JID to
+            reset all sessions with.
+        @return: The constant value ``False``, indicating to the text commands plugin that
+            the message is not supposed to be sent.
+        """
+
+        twomemo_requested = \
+            client.encryption.is_encryption_requested(mess_data, twomemo.twomemo.NAMESPACE)
+        oldmemo_requested = \
+            client.encryption.is_encryption_requested(mess_data, oldmemo.oldmemo.NAMESPACE)
+
+        if not (twomemo_requested or oldmemo_requested):
+            self.__text_commands.feed_back(
+                client,
+                _("You need to have OMEMO encryption activated to reset the session"),
+                mess_data
+            )
+            return False
+
+        bare_jid = mess_data["to"].userhost()
+
+        session_manager = await self.get_session_manager(client.profile)
+        devices = await session_manager.get_device_information(bare_jid)
+
+        for device in devices:
+            log.debug(f"Replacing sessions with device {device}")
+            await session_manager.replace_sessions(device)
+
+        self.__text_commands.feed_back(
+            client,
+            _("OMEMO session has been reset"),
+            mess_data
+        )
+
+        return False
+
+    async def get_trust_ui(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        if entity.resource:
+            raise ValueError("A bare JID is expected.")
+
+        bare_jids: Set[str]
+        if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity):
+            bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
+        else:
+            bare_jids = { entity.userhost() }
+
+        session_manager = await self.get_session_manager(client.profile)
+
+        # At least sort the devices by bare JID such that they aren't listed completely
+        # random
+        devices = sorted(cast(Set[omemo.DeviceInformation], set()).union(*[
+            await session_manager.get_device_information(bare_jid)
+            for bare_jid
+            in bare_jids
+        ]), key=lambda device: device.bare_jid)
+
+        async def callback(
+            data: Any,
+            profile: str
+        ) -> Dict[Never, Never]:
+            """
+            @param data: The XMLUI result produces by the trust UI form.
+            @param profile: The profile.
+            @return: An empty dictionary. The type of the return value was chosen
+                conservatively since the exact options are neither known not needed here.
+            """
+
+            if C.bool(data.get("cancelled", "false")):
+                return {}
+
+            data_form_result = cast(
+                Dict[str, str],
+                xml_tools.xmlui_result_2_data_form_result(data)
+            )
+
+            trust_updates: Set[TrustUpdate] = set()
+
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
+
+                device = devices[int(key[len("trust_"):])]
+                trust_level_name = value
+
+                if device.trust_level_name != trust_level_name:
+                    await session_manager.set_trust(
+                        device.bare_jid,
+                        device.identity_key,
+                        trust_level_name
+                    )
+
+                    target_trust: Optional[bool] = None
+
+                    if TrustLevel(trust_level_name) is TrustLevel.TRUSTED:
+                        target_trust = True
+                    if TrustLevel(trust_level_name) is TrustLevel.DISTRUSTED:
+                        target_trust = False
+
+                    if target_trust is not None:
+                        trust_updates.add(TrustUpdate(
+                            target_jid=jid.JID(device.bare_jid).userhostJID(),
+                            target_key=device.identity_key,
+                            target_trust=target_trust
+                        ))
+
+            # Check whether ATM is enabled and handle everything in case it is
+            trust_system = cast(str, self.__sat.memory.param_get_a(
+                PARAM_NAME,
+                PARAM_CATEGORY,
+                profile_key=profile
+            ))
+
+            if trust_system == "atm":
+                if len(trust_updates) > 0:
+                    await manage_trust_message_cache(
+                        client,
+                        session_manager,
+                        frozenset(trust_updates)
+                    )
+
+                    await send_trust_messages(
+                        client,
+                        session_manager,
+                        frozenset(trust_updates)
+                    )
+
+            return {}
+
+        submit_id = self.__sat.register_callback(callback, with_data=True, one_shot=True)
+
+        result = xml_tools.XMLUI(
+            panel_type=C.XMLUI_FORM,
+            title=D_("OMEMO trust management"),
+            submit_id=submit_id
+        )
+        # Casting this to Any, otherwise all calls on the variable cause type errors
+        # pylint: disable=no-member
+        trust_ui = cast(Any, result)
+        trust_ui.addText(D_(
+            "This is OMEMO trusting system. You'll see below the devices of your"
+            " contacts, and a list selection to trust them or not. A trusted device"
+            " can read your messages in plain text, so be sure to only validate"
+            " devices that you are sure are belonging to your contact. It's better"
+            " to do this when you are next to your contact and their device, so"
+            " you can check the \"fingerprint\" (the number next to the device)"
+            " yourself. Do *not* validate a device if the fingerprint is wrong!"
+            " Note that manually validating a fingerprint disables any form of automatic"
+            " trust."
+        ))
+
+        own_device, __ = await session_manager.get_own_device_information()
+
+        trust_ui.change_container("label")
+        trust_ui.addLabel(D_("This device ID"))
+        trust_ui.addText(str(own_device.device_id))
+        trust_ui.addLabel(D_("This device's fingerprint"))
+        trust_ui.addText(" ".join(session_manager.format_identity_key(
+            own_device.identity_key
+        )))
+        trust_ui.addEmpty()
+        trust_ui.addEmpty()
+
+        for index, device in enumerate(devices):
+            trust_ui.addLabel(D_("Contact"))
+            trust_ui.addJid(jid.JID(device.bare_jid))
+            trust_ui.addLabel(D_("Device ID"))
+            trust_ui.addText(str(device.device_id))
+            trust_ui.addLabel(D_("Fingerprint"))
+            trust_ui.addText(" ".join(session_manager.format_identity_key(
+                device.identity_key
+            )))
+            trust_ui.addLabel(D_("Trust this device?"))
+
+            current_trust_level = TrustLevel(device.trust_level_name)
+            avaiable_trust_levels = \
+                { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
+
+            trust_ui.addList(
+                f"trust_{index}",
+                options=[ trust_level.name for trust_level in avaiable_trust_levels ],
+                selected=current_trust_level.name,
+                styles=[ "inline" ]
+            )
+
+            twomemo_active = dict(device.active).get(twomemo.twomemo.NAMESPACE)
+            if twomemo_active is None:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(not available for Twomemo)"))
+            if twomemo_active is False:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(inactive for Twomemo)"))
+
+            oldmemo_active = dict(device.active).get(oldmemo.oldmemo.NAMESPACE)
+            if oldmemo_active is None:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(not available for Oldmemo)"))
+            if oldmemo_active is False:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(inactive for Oldmemo)"))
+
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+        return result
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[str]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+
+        bare_jids: Set[str] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.get_room(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhost())
+
+        return bare_jids
+
+    async def get_session_manager(self, profile: str) -> omemo.SessionManager:
+        """
+        @param profile: The profile to prepare for.
+        @return: A session manager instance for this profile. Creates a new instance if
+            none was prepared before.
+        """
+
+        try:
+            # Try to return the session manager
+            return self.__session_managers[profile]
+        except KeyError:
+            # If a session manager for that profile doesn't exist yet, check whether it is
+            # currently being built. A session manager being built is signified by the
+            # profile key existing on __session_manager_waiters.
+            if profile in self.__session_manager_waiters:
+                # If the session manager is being built, add ourselves to the waiting
+                # queue
+                deferred = defer.Deferred()
+                self.__session_manager_waiters[profile].append(deferred)
+                return cast(omemo.SessionManager, await deferred)
+
+            # If the session manager is not being built, do so here.
+            self.__session_manager_waiters[profile] = []
+
+            # Build and store the session manager
+            try:
+                session_manager = await prepare_for_profile(
+                    self.__sat,
+                    profile,
+                    initial_own_label="Libervia"
+                )
+            except Exception as e:
+                # In case of an error during initalization, notify the waiters accordingly
+                # and delete them
+                for waiter in self.__session_manager_waiters[profile]:
+                    waiter.errback(e)
+                del self.__session_manager_waiters[profile]
+
+                # Re-raise the exception
+                raise
+
+            self.__session_managers[profile] = session_manager
+
+            # Notify the waiters and delete them
+            for waiter in self.__session_manager_waiters[profile]:
+                waiter.callback(session_manager)
+            del self.__session_manager_waiters[profile]
+
+            return session_manager
+
+    async def __message_received_trigger_atm(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        session_manager: omemo.SessionManager,
+        sender_device_information: omemo.DeviceInformation,
+        timestamp: datetime
+    ) -> None:
+        """Check a newly decrypted message stanza for ATM content and perform ATM in case.
+
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param session_manager: The session manager.
+        @param sender_device_information: Information about the device that sent/encrypted
+            the message.
+        @param timestamp: Timestamp extracted from the SCE time affix.
+        """
+
+        trust_message_cache = persistent.LazyPersistentBinaryDict(
+            "XEP-0384/TM",
+            client.profile
+        )
+
+        new_cache_entries: Set[TrustMessageCacheEntry] = set()
+
+        for trust_message_elt in message_elt.elements(NS_TM, "trust-message"):
+            assert isinstance(trust_message_elt, domish.Element)
+
+            try:
+                TRUST_MESSAGE_SCHEMA.validate(trust_message_elt.toXml())
+            except xmlschema.XMLSchemaValidationError as e:
+                raise exceptions.ParsingError(
+                    "<trust-message/> element doesn't pass schema validation."
+                ) from e
+
+            if trust_message_elt["usage"] != NS_ATM:
+                # Skip non-ATM trust message
+                continue
+
+            if trust_message_elt["encryption"] != OMEMO.NS_TWOMEMO:
+                # Skip non-twomemo trust message
+                continue
+
+            for key_owner_elt in trust_message_elt.elements(NS_TM, "key-owner"):
+                assert isinstance(key_owner_elt, domish.Element)
+
+                key_owner_jid = jid.JID(key_owner_elt["jid"]).userhostJID()
+
+                for trust_elt in key_owner_elt.elements(NS_TM, "trust"):
+                    assert isinstance(trust_elt, domish.Element)
+
+                    new_cache_entries.add(TrustMessageCacheEntry(
+                        sender_jid=jid.JID(sender_device_information.bare_jid),
+                        sender_key=sender_device_information.identity_key,
+                        timestamp=timestamp,
+                        trust_update=TrustUpdate(
+                            target_jid=key_owner_jid,
+                            target_key=base64.b64decode(str(trust_elt)),
+                            target_trust=True
+                        )
+                    ))
+
+                for distrust_elt in key_owner_elt.elements(NS_TM, "distrust"):
+                    assert isinstance(distrust_elt, domish.Element)
+
+                    new_cache_entries.add(TrustMessageCacheEntry(
+                        sender_jid=jid.JID(sender_device_information.bare_jid),
+                        sender_key=sender_device_information.identity_key,
+                        timestamp=timestamp,
+                        trust_update=TrustUpdate(
+                            target_jid=key_owner_jid,
+                            target_key=base64.b64decode(str(distrust_elt)),
+                            target_trust=False
+                        )
+                    ))
+
+        # Load existing cache entries
+        existing_cache_entries = cast(
+            Set[TrustMessageCacheEntry],
+            await trust_message_cache.get("cache", set())
+        )
+
+        # Discard cache entries by timestamp comparison
+        existing_by_target = {
+            (
+                cache_entry.trust_update.target_jid.userhostJID(),
+                cache_entry.trust_update.target_key
+            ): cache_entry
+            for cache_entry
+            in existing_cache_entries
+        }
+
+        # Iterate over a copy here, such that new_cache_entries can be modified
+        for new_cache_entry in set(new_cache_entries):
+            existing_cache_entry = existing_by_target.get(
+                (
+                    new_cache_entry.trust_update.target_jid.userhostJID(),
+                    new_cache_entry.trust_update.target_key
+                ),
+                None
+            )
+
+            if existing_cache_entry is not None:
+                if existing_cache_entry.timestamp > new_cache_entry.timestamp:
+                    # If the existing cache entry is newer than the new cache entry,
+                    # discard the new one in favor of the existing one
+                    new_cache_entries.remove(new_cache_entry)
+                else:
+                    # Otherwise, discard the existing cache entry. This includes the case
+                    # when both cache entries have matching timestamps.
+                    existing_cache_entries.remove(existing_cache_entry)
+
+        # If the sending device is trusted, apply the new cache entries
+        applied_trust_updates: Set[TrustUpdate] = set()
+
+        if TrustLevel(sender_device_information.trust_level_name) is TrustLevel.TRUSTED:
+            # Iterate over a copy such that new_cache_entries can be modified
+            for cache_entry in set(new_cache_entries):
+                trust_update = cache_entry.trust_update
+
+                trust_level = (
+                    TrustLevel.TRUSTED
+                    if trust_update.target_trust
+                    else TrustLevel.DISTRUSTED
+                )
+
+                await session_manager.set_trust(
+                    trust_update.target_jid.userhost(),
+                    trust_update.target_key,
+                    trust_level.name
+                )
+
+                applied_trust_updates.add(trust_update)
+
+                new_cache_entries.remove(cache_entry)
+
+        # Store the remaining existing and new cache entries
+        await trust_message_cache.force(
+            "cache",
+            existing_cache_entries | new_cache_entries
+        )
+
+        # If the trust of at least one device was modified, run the ATM cache update logic
+        if len(applied_trust_updates) > 0:
+            await manage_trust_message_cache(
+                client,
+                session_manager,
+                frozenset(applied_trust_updates)
+            )
+
+    async def _message_received_trigger(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        """
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
+            message has fully progressed through the message receiving flow. Can be used
+            to apply treatments to the fully processed message, like marking it as
+            encrypted.
+        @return: Whether to continue the message received flow.
+        """
+        if client.is_component:
+            return True
+        muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None
+
+        sender_jid = jid.JID(message_elt["from"])
+        feedback_jid: jid.JID
+
+        message_type = message_elt.getAttribute("type", C.MESS_TYPE_NORMAL)
+        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                log.warning(
+                    "Ignoring MUC message since plugin XEP-0045 is not available."
+                )
+                # Can't handle a MUC message without XEP-0045, let the flow continue
+                # normally
+                return True
+
+            room_jid = feedback_jid = sender_jid.userhostJID()
+
+            try:
+                room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid))
+            except exceptions.NotFound:
+                log.warning(
+                    f"Ignoring MUC message from a room that has not been joined:"
+                    f" {room_jid}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
+            if sender_user is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's user"
+                    f" wasn't found {sender_jid.resource}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
+            if sender_user_jid is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
+                    f" JID couldn't be found from its user information: {sender_user}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_jid = sender_user_jid
+
+            message_uid: Optional[str] = None
+            if self.__xep_0359 is not None:
+                message_uid = self.__xep_0359.get_origin_id(message_elt)
+            if message_uid is None:
+                message_uid = message_elt.getAttribute("id")
+            if message_uid is not None:
+                muc_plaintext_cache_key = MUCPlaintextCacheKey(
+                    client,
+                    room_jid,
+                    message_uid
+                )
+        else:
+            # I'm not sure why this check is required, this code is copied from the old
+            # plugin.
+            if sender_jid.userhostJID() == client.jid.userhostJID():
+                try:
+                    feedback_jid = jid.JID(message_elt["to"])
+                except KeyError:
+                    feedback_jid = client.server_jid
+            else:
+                feedback_jid = sender_jid
+
+        sender_bare_jid = sender_jid.userhost()
+
+        message: Optional[omemo.Message] = None
+        encrypted_elt: Optional[domish.Element] = None
+
+        twomemo_encrypted_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(twomemo.twomemo.NAMESPACE, "encrypted"),
+            None
+        ))
+
+        oldmemo_encrypted_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(oldmemo.oldmemo.NAMESPACE, "encrypted"),
+            None
+        ))
+
+        try:
+            session_manager = await self.get_session_manager(cast(str, client.profile))
+        except Exception as e:
+            log.error(f"error while preparing profile for {client.profile}: {e}")
+            # we don't want to block the workflow
+            return True
+
+        if twomemo_encrypted_elt is not None:
+            try:
+                message = twomemo.etree.parse_message(
+                    xml_tools.domish_elt_2_et_elt(twomemo_encrypted_elt),
+                    sender_bare_jid
+                )
+            except (ValueError, XMLSchemaValidationError):
+                log.warning(
+                    f"Ingoring malformed encrypted message for namespace"
+                    f" {twomemo.twomemo.NAMESPACE}: {twomemo_encrypted_elt.toXml()}"
+                )
+            else:
+                encrypted_elt = twomemo_encrypted_elt
+
+        if oldmemo_encrypted_elt is not None:
+            try:
+                message = await oldmemo.etree.parse_message(
+                    xml_tools.domish_elt_2_et_elt(oldmemo_encrypted_elt),
+                    sender_bare_jid,
+                    client.jid.userhost(),
+                    session_manager
+                )
+            except (ValueError, XMLSchemaValidationError):
+                log.warning(
+                    f"Ingoring malformed encrypted message for namespace"
+                    f" {oldmemo.oldmemo.NAMESPACE}: {oldmemo_encrypted_elt.toXml()}"
+                )
+            except omemo.SenderNotFound:
+                log.warning(
+                    f"Ingoring encrypted message for namespace"
+                    f" {oldmemo.oldmemo.NAMESPACE} by unknown sender:"
+                    f" {oldmemo_encrypted_elt.toXml()}"
+                )
+            else:
+                encrypted_elt = oldmemo_encrypted_elt
+
+        if message is None or encrypted_elt is None:
+            # None of our business, let the flow continue
+            return True
+
+        message_elt.children.remove(encrypted_elt)
+
+        log.debug(
+            f"{message.namespace} message of type {message_type} received from"
+            f" {sender_bare_jid}"
+        )
+
+        plaintext: Optional[bytes]
+        device_information: omemo.DeviceInformation
+
+        if (
+            muc_plaintext_cache_key is not None
+            and muc_plaintext_cache_key in self.__muc_plaintext_cache
+        ):
+            # Use the cached plaintext
+            plaintext = self.__muc_plaintext_cache.pop(muc_plaintext_cache_key)
+
+            # Since this message was sent by us, use the own device information here
+            device_information, __ = await session_manager.get_own_device_information()
+        else:
+            try:
+                plaintext, device_information, __ = await session_manager.decrypt(message)
+            except omemo.MessageNotForUs:
+                # The difference between this being a debug or a warning is whether there
+                # is a body included in the message. Without a body, we can assume that
+                # it's an empty OMEMO message used for protocol stability reasons, which
+                # is not expected to be sent to all devices of all recipients. If a body
+                # is included, we can assume that the message carries content and we
+                # missed out on something.
+                if len(list(message_elt.elements(C.NS_CLIENT, "body"))) > 0:
+                    client.feedback(
+                        feedback_jid,
+                        D_(
+                            f"An OMEMO message from {sender_jid.full()} has not been"
+                            f" encrypted for our device, we can't decrypt it."
+                        ),
+                        { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                    )
+                    log.warning("Message not encrypted for us.")
+                else:
+                    log.debug("Message not encrypted for us.")
+
+                # No point in further processing this message.
+                return False
+            except Exception as e:
+                log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
+                    reason=e,
+                    xml=message_elt.toXml()
+                ))
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        f"An OMEMO message from {sender_jid.full()} can't be decrypted:"
+                        f" {e}"
+                    ),
+                    { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                )
+                # No point in further processing this message
+                return False
+
+        affix_values: Optional[SCEAffixValues] = None
+
+        if message.namespace == twomemo.twomemo.NAMESPACE:
+            if plaintext is not None:
+                # XEP_0420.unpack_stanza handles the whole unpacking, including the
+                # relevant modifications to the element
+                sce_profile = \
+                    OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE
+                try:
+                    affix_values = self.__xep_0420.unpack_stanza(
+                        sce_profile,
+                        message_elt,
+                        plaintext
+                    )
+                except Exception as e:
+                    log.warning(D_(
+                        f"Error unpacking SCE-encrypted message: {e}\n{plaintext}"
+                    ))
+                    client.feedback(
+                        feedback_jid,
+                        D_(
+                            f"An OMEMO message from {sender_jid.full()} was rejected:"
+                            f" {e}"
+                        ),
+                        { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                    )
+                    # No point in further processing this message
+                    return False
+                else:
+                    if affix_values.timestamp is not None:
+                        # TODO: affix_values.timestamp contains the timestamp included in
+                        # the encrypted element here. The XEP says it SHOULD be displayed
+                        # with the plaintext by clients.
+                        pass
+
+        if message.namespace == oldmemo.oldmemo.NAMESPACE:
+            # Remove all body elements from the original element, since those act as
+            # fallbacks in case the encryption protocol is not supported
+            for child in message_elt.elements():
+                if child.name == "body":
+                    message_elt.children.remove(child)
+
+            if plaintext is not None:
+                # Add the decrypted body
+                message_elt.addElement("body", content=plaintext.decode("utf-8"))
+
+        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
+        trust_level = \
+            await session_manager._evaluate_custom_trust_level(device_information)
+
+        if trust_level is omemo.TrustLevel.TRUSTED:
+            post_treat.addCallback(client.encryption.mark_as_trusted)
+        else:
+            post_treat.addCallback(client.encryption.mark_as_untrusted)
+
+        # Mark the message as originally encrypted
+        post_treat.addCallback(
+            client.encryption.mark_as_encrypted,
+            namespace=message.namespace
+        )
+
+        # Handle potential ATM trust updates
+        if affix_values is not None and affix_values.timestamp is not None:
+            await self.__message_received_trigger_atm(
+                client,
+                message_elt,
+                session_manager,
+                device_information,
+                affix_values.timestamp
+            )
+
+        # Message processed successfully, continue with the flow
+        return True
+
+    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
+        """
+        @param client: The client sending this message.
+        @param stanza: The stanza that is about to be sent. Can be modified.
+        @return: Whether the send message flow should continue or not.
+        """
+        # SCE is only applicable to message and IQ stanzas
+        # FIXME: temporary disabling IQ stanza encryption
+        if stanza.name not in { "message" }:  # , "iq" }:
+            return True
+
+        # Get the intended recipient
+        recipient = stanza.getAttribute("to", None)
+        if recipient is None:
+            if stanza.name == "message":
+                # Message stanzas must have a recipient
+                raise exceptions.InternalError(
+                    f"Message without recipient encountered. Blocking further processing"
+                    f" to avoid leaking plaintext data: {stanza.toXml()}"
+                )
+
+            # IQs without a recipient are a thing, I believe those simply target the
+            # server and are thus not eligible for e2ee anyway.
+            return True
+
+        # Parse the JID
+        recipient_bare_jid = jid.JID(recipient).userhostJID()
+
+        # Check whether encryption with twomemo is requested
+        encryption = client.encryption.getSession(recipient_bare_jid)
+
+        if encryption is None:
+            # Encryption is not requested for this recipient
+            return True
+
+        if encryption["plugin"].namespace != twomemo.twomemo.NAMESPACE:
+            # Encryption is requested for this recipient, but not with twomemo
+            return True
+
+        # All pre-checks done, we can start encrypting!
+        await self.encrypt(
+            client,
+            twomemo.twomemo.NAMESPACE,
+            stanza,
+            recipient_bare_jid,
+            stanza.getAttribute("type", C.MESS_TYPE_NORMAL) == C.MESS_TYPE_GROUPCHAT,
+            stanza.getAttribute("id", None)
+        )
+
+        # Add a store hint if this is a message stanza
+        if stanza.name == "message":
+            self.__xep_0334.add_hint_elements(stanza, [ "store" ])
+
+        # Let the flow continue.
+        return True
+
+    async def __send_message_data_trigger(
+        self,
+        client: SatXMPPClient,
+        mess_data: MessageData
+    ) -> None:
+        """
+        @param client: The client sending this message.
+        @param mess_data: The message data that is about to be sent. Can be modified.
+        """
+
+        # Check whether encryption is requested for this message
+        try:
+            namespace = mess_data[C.MESS_KEY_ENCRYPTION]["plugin"].namespace
+        except KeyError:
+            return
+
+        # If encryption is requested, check whether it's oldmemo
+        if namespace != oldmemo.oldmemo.NAMESPACE:
+            return
+
+        # All pre-checks done, we can start encrypting!
+        stanza = mess_data["xml"]
+        recipient_jid = mess_data["to"]
+        is_muc_message = mess_data["type"] == C.MESS_TYPE_GROUPCHAT
+        stanza_id = mess_data["uid"]
+
+        await self.encrypt(
+            client,
+            oldmemo.oldmemo.NAMESPACE,
+            stanza,
+            recipient_jid,
+            is_muc_message,
+            stanza_id
+        )
+
+        # Add a store hint
+        self.__xep_0334.add_hint_elements(stanza, [ "store" ])
+
+    async def encrypt(
+        self,
+        client: SatXMPPClient,
+        namespace: Literal["urn:xmpp:omemo:2", "eu.siacs.conversations.axolotl"],
+        stanza: domish.Element,
+        recipient_jids: Union[jid.JID, Set[jid.JID]],
+        is_muc_message: bool,
+        stanza_id: Optional[str]
+    ) -> None:
+        """
+        @param client: The client.
+        @param namespace: The namespace of the OMEMO version to use.
+        @param stanza: The stanza. Twomemo will encrypt the whole stanza using SCE,
+            oldmemo will encrypt only the body. The stanza is modified by this call.
+        @param recipient_jid: The JID of the recipients.
+            Can be a bare (aka "userhost") JIDs but doesn't have to.
+            A single JID can be used.
+        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
+        @param stanza_id: The id of this stanza. Especially relevant for message stanzas
+            to MUC rooms such that the outgoing plaintext can be cached for MUC message
+            reflection handling.
+
+        @warning: The calling code MUST take care of adding the store message processing
+            hint to the stanza if applicable! This can be done before or after this call,
+            the order doesn't matter.
+        """
+        if isinstance(recipient_jids, jid.JID):
+            recipient_jids = {recipient_jids}
+        if not recipient_jids:
+            raise exceptions.InternalError("At least one JID must be specified")
+        recipient_jid = next(iter(recipient_jids))
+
+        muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None
+
+        recipient_bare_jids: Set[str]
+        feedback_jid: jid.JID
+
+        if is_muc_message:
+            if len(recipient_jids) != 1:
+                raise exceptions.InternalError(
+                    'Only one JID can be set when "is_muc_message" is set'
+                )
+            if self.__xep_0045 is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
+                    " available."
+                )
+
+            if stanza_id is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but stanza id not available."
+                )
+
+            room_jid = feedback_jid = recipient_jid.userhostJID()
+
+            recipient_bare_jids = self.__get_joined_muc_users(
+                client,
+                self.__xep_0045,
+                room_jid
+            )
+
+            muc_plaintext_cache_key = MUCPlaintextCacheKey(
+                client=client,
+                room_jid=room_jid,
+                message_uid=stanza_id
+            )
+        else:
+            recipient_bare_jids = {r.userhost() for r in recipient_jids}
+            feedback_jid = recipient_jid.userhostJID()
+
+        log.debug(
+            f"Intercepting message that is to be encrypted by {namespace} for"
+            f" {recipient_bare_jids}"
+        )
+
+        def prepare_stanza() -> Optional[bytes]:
+            """Prepares the stanza for encryption.
+
+            Does so by removing all parts that are not supposed to be sent in plain. Also
+            extracts/prepares the plaintext to encrypt.
+
+            @return: The plaintext to encrypt. Returns ``None`` in case body-only
+                encryption is requested and no body was found. The function should
+                gracefully return in that case, i.e. it's not a critical error that should
+                abort the message sending flow.
+            """
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                return self.__xep_0420.pack_stanza(
+                    OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE,
+                    stanza
+                )
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                plaintext: Optional[bytes] = None
+
+                for child in stanza.elements():
+                    if child.name == "body" and plaintext is None:
+                        plaintext = str(child).encode("utf-8")
+
+                    # Any other sensitive elements to remove here?
+                    if child.name in { "body", "html" }:
+                        stanza.children.remove(child)
+
+                if plaintext is None:
+                    log.warning(
+                        "No body found in intercepted message to be encrypted with"
+                        " oldmemo."
+                    )
+
+                return plaintext
+
+            return assert_never(namespace)
+
+        # The stanza/plaintext preparation was moved into its own little function for type
+        # safety reasons.
+        plaintext = prepare_stanza()
+        if plaintext is None:
+            return
+
+        log.debug(f"Plaintext to encrypt: {plaintext}")
+
+        session_manager = await self.get_session_manager(client.profile)
+
+        try:
+            messages, encryption_errors = await session_manager.encrypt(
+                frozenset(recipient_bare_jids),
+                { namespace: plaintext },
+                backend_priority_order=[ namespace ],
+                identifier=feedback_jid.userhost()
+            )
+        except Exception as e:
+            msg = _(
+                # pylint: disable=consider-using-f-string
+                "Can't encrypt message for {entities}: {reason}".format(
+                    entities=', '.join(recipient_bare_jids),
+                    reason=e
+                )
+            )
+            log.warning(msg)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
+            raise e
+
+        if len(encryption_errors) > 0:
+            log.warning(
+                f"Ignored the following non-critical encryption errors:"
+                f" {encryption_errors}"
+            )
+
+            encrypted_errors_stringified = ", ".join([
+                f"device {err.device_id} of {err.bare_jid} under namespace"
+                f" {err.namespace}"
+                for err
+                in encryption_errors
+            ])
+
+            client.feedback(
+                feedback_jid,
+                D_(
+                    "There were non-critical errors during encryption resulting in some"
+                    " of your destinees' devices potentially not receiving the message."
+                    " This happens when the encryption data/key material of a device is"
+                    " incomplete or broken, which shouldn't happen for actively used"
+                    " devices, and can usually be ignored. The following devices are"
+                    f" affected: {encrypted_errors_stringified}."
+                )
+            )
+
+        message = next(message for message in messages if message.namespace == namespace)
+
+        if namespace == twomemo.twomemo.NAMESPACE:
+            # Add the encrypted element
+            stanza.addChild(xml_tools.et_elt_2_domish_elt(
+                twomemo.etree.serialize_message(message)
+            ))
+
+        if namespace == oldmemo.oldmemo.NAMESPACE:
+            # Add the encrypted element
+            stanza.addChild(xml_tools.et_elt_2_domish_elt(
+                oldmemo.etree.serialize_message(message)
+            ))
+
+        if muc_plaintext_cache_key is not None:
+            self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext
+
+    async def __on_device_list_update(
+        self,
+        items_event: pubsub.ItemsEvent,
+        profile: str
+    ) -> None:
+        """Handle device list updates fired by PEP.
+
+        @param items_event: The event.
+        @param profile: The profile this event belongs to.
+        """
+
+        sender = cast(jid.JID, items_event.sender)
+        items = cast(List[domish.Element], items_event.items)
+
+        if len(items) > 1:
+            log.warning("Ignoring device list update with more than one element.")
+            return
+
+        item = next(iter(items), None)
+        if item is None:
+            log.debug("Ignoring empty device list update.")
+            return
+
+        item_elt = xml_tools.domish_elt_2_et_elt(item)
+
+        device_list: Dict[int, Optional[str]] = {}
+        namespace: Optional[str] = None
+
+        list_elt = item_elt.find(f"{{{twomemo.twomemo.NAMESPACE}}}devices")
+        if list_elt is not None:
+            try:
+                device_list = twomemo.etree.parse_device_list(list_elt)
+            except XMLSchemaValidationError:
+                pass
+            else:
+                namespace = twomemo.twomemo.NAMESPACE
+
+        list_elt = item_elt.find(f"{{{oldmemo.oldmemo.NAMESPACE}}}list")
+        if list_elt is not None:
+            try:
+                device_list = oldmemo.etree.parse_device_list(list_elt)
+            except XMLSchemaValidationError:
+                pass
+            else:
+                namespace = oldmemo.oldmemo.NAMESPACE
+
+        if namespace is None:
+            log.warning(
+                f"Malformed device list update item:"
+                f" {ET.tostring(item_elt, encoding='unicode')}"
+            )
+            return
+
+        session_manager = await self.get_session_manager(profile)
+
+        await session_manager.update_device_list(
+            namespace,
+            sender.userhost(),
+            device_list
+        )