diff sat/plugins/plugin_xep_0384.py @ 3911:8289ac1b34f4

plugin XEP-0384: Fully reworked to adjust to the reworked python-omemo: - support for both (modern) OMEMO under the `urn:xmpp:omemo:2` namespace and (legacy) OMEMO under the `eu.siacs.conversations.axolotl` namespace - maintains one identity across both versions of OMEMO - migrates data from the old plugin - includes more features for protocol stability - uses SCE for modern OMEMO - fully type-checked, linted and format-checked - added type hints to various pieces of backend code used by the plugin - added stubs for some Twisted APIs used by the plugin under stubs/ (use `export MYPYPATH=stubs/` before running mypy) - core (xmpp): enabled `send` trigger and made it an asyncPoint fix 375
author Syndace <me@syndace.dev>
date Tue, 23 Aug 2022 21:06:24 +0200
parents cc653b2685f0
children 4cb38c8312a1
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0384.py	Thu Sep 22 12:03:12 2022 +0200
+++ b/sat/plugins/plugin_xep_0384.py	Tue Aug 23 21:06:24 2022 +0200
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
-# SAT plugin for OMEMO encryption
-# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+# Libervia plugin for OMEMO encryption
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
 
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -16,1431 +16,2055 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import enum
 import logging
-import random
-import base64
-from functools import partial
+import time
+from typing import (
+    Any, Callable, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast
+)
+import uuid
+import xml.etree.ElementTree as ET
 from xml.sax.saxutils import quoteattr
-from sat.core.i18n import _, D_
+
+from typing_extensions import Never, assert_never
+from wokkel import muc, pubsub  # type: ignore[import]
+
+from sat.core import exceptions
 from sat.core.constants import Const as C
-from sat.core.log import getLogger
-from sat.core import exceptions
-from twisted.internet import defer, reactor
+from sat.core.core_types import MessageData, SatXMPPEntity
+from sat.core.i18n import _, D_
+from sat.core.log import getLogger, Logger
+from sat.core.sat_main import SAT
+from sat.core.xmpp import SatXMPPClient
+from sat.memory import persistent
+from sat.plugins.plugin_misc_text_commands import TextCommands
+from sat.plugins.plugin_xep_0045 import XEP_0045
+from sat.plugins.plugin_xep_0060 import XEP_0060
+from sat.plugins.plugin_xep_0163 import XEP_0163
+from sat.plugins.plugin_xep_0334 import XEP_0334
+from sat.plugins.plugin_xep_0359 import XEP_0359
+from sat.plugins.plugin_xep_0420 import XEP_0420, SCEAffixPolicy, SCEProfile
+from sat.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import error, jid
 from twisted.words.xish import domish
-from twisted.words.protocols.jabber import jid
-from twisted.words.protocols.jabber import error as jabber_error
-from sat.memory import persistent
-from sat.tools import xml_tools
+
 try:
     import omemo
-    from omemo import exceptions as omemo_excpt
-    from omemo.extendedpublicbundle import ExtendedPublicBundle
-except ImportError:
+    import omemo.identity_key_pair
+    import twomemo
+    import twomemo.etree
+    import oldmemo
+    import oldmemo.etree
+    import oldmemo.migrations
+    from xmlschema import XMLSchemaValidationError
+
+    # An explicit version check of the OMEMO libraries should not be required here, since
+    # the stored data is fully versioned and the library will complain if a downgrade is
+    # attempted.
+except ImportError as import_error:
     raise exceptions.MissingModule(
-        'Missing module omemo, please download/install it. You can use '
-        '"pip install omemo"'
-    )
-try:
-    from omemo_backend_signal import BACKEND as omemo_backend
-except ImportError:
-    raise exceptions.MissingModule(
-        'Missing module omemo-backend-signal, please download/install it. You can use '
-        '"pip install omemo-backend-signal"'
-    )
+        "You are missing one or more package required by the OMEMO plugin. Please"
+        " download/install the pip packages 'omemo', 'twomemo', 'oldmemo' and"
+        " 'xmlschema'."
+    ) from import_error
+
 
-log = getLogger(__name__)
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "OMEMO"
+]
+
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+string_to_domish = cast(Callable[[str], domish.Element], xml_tools.ElementParser())
+
 
 PLUGIN_INFO = {
     C.PI_NAME: "OMEMO",
     C.PI_IMPORT_NAME: "XEP-0384",
     C.PI_TYPE: "SEC",
-    C.PI_PROTOCOLS: ["XEP-0384"],
-    C.PI_DEPENDENCIES: ["XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060"],
-    C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0359", C.TEXT_CMDS],
+    C.PI_PROTOCOLS: [ "XEP-0384" ],
+    C.PI_DEPENDENCIES: [ "XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060", "XEP-0420" ],
+    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0359", C.TEXT_CMDS ],
     C.PI_MAIN: "OMEMO",
     C.PI_HANDLER: "no",
     C.PI_DESCRIPTION: _("""Implementation of OMEMO"""),
 }
 
-OMEMO_MIN_VER = (0, 11, 0)
-NS_OMEMO = "eu.siacs.conversations.axolotl"
-NS_OMEMO_DEVICES = NS_OMEMO + ".devicelist"
-NS_OMEMO_BUNDLE = NS_OMEMO + ".bundles:{device_id}"
-KEY_STATE = "STATE"
-KEY_DEVICE_ID = "DEVICE_ID"
-KEY_SESSION = "SESSION"
-KEY_TRUST = "TRUST"
-# devices which have been automatically trusted by policy like BTBV
-KEY_AUTO_TRUST = "AUTO_TRUST"
-# list of peer bare jids where trust UI has been used at least once
-# this is useful to activate manual trust with BTBV policy
-KEY_MANUAL_TRUST = "MANUAL_TRUST"
-KEY_ACTIVE_DEVICES = "DEVICES"
-KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES"
-KEY_ALL_JIDS = "ALL_JIDS"
-# time before plaintext cache for MUC is expired
-# expressed in seconds, reset on each new MUC message
-MUC_CACHE_TTL = 60 * 5
 
 PARAM_CATEGORY = "Security"
 PARAM_NAME = "omemo_policy"
 
 
-# we want to manage log emitted by omemo module ourselves
+class LogHandler(logging.Handler):
+    """
+    Redirect python-omemo's log output to Libervia's log system.
+    """
 
-class SatHandler(logging.Handler):
-
-    def emit(self, record):
+    def emit(self, record: logging.LogRecord) -> None:
         log.log(record.levelname, record.getMessage())
 
-    @staticmethod
-    def install():
-        omemo_sm_logger = logging.getLogger("omemo.SessionManager")
-        omemo_sm_logger.propagate = False
-        omemo_sm_logger.addHandler(SatHandler())
+
+sm_logger = logging.getLogger(omemo.SessionManager.LOG_TAG)
+sm_logger.setLevel(logging.DEBUG)
+sm_logger.propagate = False
+sm_logger.addHandler(LogHandler())
+
+
+ikp_logger = logging.getLogger(omemo.identity_key_pair.IdentityKeyPair.LOG_TAG)
+ikp_logger.setLevel(logging.DEBUG)
+ikp_logger.propagate = False
+ikp_logger.addHandler(LogHandler())
+
+
+# TODO: Add handling for device labels, i.e. show device labels in the trust UI and give
+# the user a way to change their own device label.
 
 
-SatHandler.install()
+class MUCPlaintextCacheKey(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    Structure identifying an encrypted message sent to a MUC.
+    """
+
+    client: SatXMPPClient
+    room_jid: jid.JID
+    message_uid: str
+
+
+# TODO: Convert without serialization/parsing
+# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took
+# around 0.6 seconds on my setup.
+def etree_to_domish(element: ET.Element) -> domish.Element:
+    """
+    @param element: An ElementTree element.
+    @return: The ElementTree element converted to a domish element.
+    """
+
+    return string_to_domish(ET.tostring(element, encoding="unicode"))
+
+
+# TODO: Convert without serialization/parsing
+# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took less
+# than one second on my setup.
+def domish_to_etree(element: domish.Element) -> ET.Element:
+    """
+    @param element: A domish element.
+    @return: The domish element converted to an ElementTree element.
+    """
+
+    return ET.fromstring(element.toXml())
 
 
-def b64enc(data):
-    return base64.b64encode(bytes(bytearray(data))).decode("US-ASCII")
+def domish_to_etree2(element: domish.Element) -> ET.Element:
+    """
+    WIP
+    """
+
+    element_name = element.name
+    if element.uri is not None:
+        element_name = "{" + element.uri + "}" + element_name
+
+    attrib: Dict[str, str] = {}
+    for qname, value in element.attributes.items():
+        attribute_name = qname[1] if isinstance(qname, tuple) else qname
+        attribute_namespace = qname[0] if isinstance(qname, tuple) else None
+        if attribute_namespace is not None:
+            attribute_name = "{" + attribute_namespace + "}" + attribute_name
+
+        attrib[attribute_name] = value
+
+    result = ET.Element(element_name, attrib)
+
+    last_child: Optional[ET.Element] = None
+    for child in element.children:
+        if isinstance(child, str):
+            if last_child is None:
+                result.text = child
+            else:
+                last_child.tail = child
+        else:
+            last_child = domish_to_etree2(child)
+            result.append(last_child)
+
+    return result
 
 
-def promise2Deferred(promise_):
-    """Create a Deferred and fire it when promise is resolved
+@enum.unique
+class TrustLevel(enum.Enum):
+    """
+    The trust levels required for BTBV and manual trust.
+    """
+
+    TRUSTED: str = "TRUSTED"
+    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
+    UNDECIDED: str = "UNDECIDED"
+    DISTRUSTED: str = "DISTRUSTED"
+
+    def to_omemo_trust_level(self) -> omemo.TrustLevel:
+        """
+        @return: This custom trust level evaluated to one of the OMEMO trust levels.
+        """
 
-    @param promise_(promise.Promise): promise to convert
-    @return (defer.Deferred): deferred instance linked to the promise
+        if self is TrustLevel.TRUSTED or self is TrustLevel.BLINDLY_TRUSTED:
+            return omemo.TrustLevel.TRUSTED
+        if self is TrustLevel.UNDECIDED:
+            return omemo.TrustLevel.UNDECIDED
+        if self is TrustLevel.DISTRUSTED:
+            return omemo.TrustLevel.DISTRUSTED
+
+        return assert_never(self)
+
+
+TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices"
+OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist"
+
+
+class StorageImpl(omemo.Storage):
+    """
+    Storage implementation for OMEMO based on :class:`persistent.LazyPersistentBinaryDict`
     """
-    d = defer.Deferred()
-    promise_.then(
-        lambda result: reactor.callFromThread(d.callback, result),
-        lambda exc: reactor.callFromThread(d.errback, exc)
-    )
-    return d
+
+    def __init__(self, profile: str) -> None:
+        """
+        @param profile: The profile this OMEMO data belongs to.
+        """
+
+        # persistent.LazyPersistentBinaryDict does not cache at all, so keep the caching
+        # option of omemo.Storage enabled.
+        super().__init__()
+
+        self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile)
+
+    async def _load(self, key: str) -> omemo.Maybe[omemo.JSONType]:
+        try:
+            return omemo.Just(await self.__storage[key])
+        except KeyError:
+            return omemo.Nothing()
+        except Exception as e:
+            raise omemo.StorageException(f"Error while loading key {key}") from e
+
+    async def _store(self, key: str, value: omemo.JSONType) -> None:
+        try:
+            await self.__storage.force(key, value)
+        except Exception as e:
+            raise omemo.StorageException(f"Error while storing key {key}: {value}") from e
+
+    async def _delete(self, key: str) -> None:
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+        except Exception as e:
+            raise omemo.StorageException(f"Error while deleting key {key}") from e
 
 
-class OmemoStorage(omemo.Storage):
-
-    def __init__(self, client, device_id, all_jids):
-        self.own_bare_jid_s = client.jid.userhost()
-        self.device_id = device_id
-        self.all_jids = all_jids
-        self.data = client._xep_0384_data
+class LegacyStorageImpl(oldmemo.migrations.LegacyStorage):
+    """
+    Legacy storage implementation to migrate data from the old XEP-0384 plugin.
+    """
 
-    @property
-    def is_async(self):
-        return True
+    KEY_DEVICE_ID = "DEVICE_ID"
+    KEY_STATE = "STATE"
+    KEY_SESSION = "SESSION"
+    KEY_ACTIVE_DEVICES = "DEVICES"
+    KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES"
+    KEY_TRUST = "TRUST"
+    KEY_ALL_JIDS = "ALL_JIDS"
 
-    def setCb(self, deferred, callback):
-        """Associate Deferred and callback
-
-        callback of omemo.Storage expect a boolean with success state then result
-        Deferred on the other hand use 2 methods for callback and errback
-        This method use partial to call callback with boolean then result when
-        Deferred is called
+    def __init__(self, profile: str, own_bare_jid: str) -> None:
+        """
+        @param profile: The profile this OMEMO data belongs to.
+        @param own_bare_jid: The own bare JID, to return by the :meth:`loadOwnData` call.
         """
-        deferred.addCallback(partial(callback, True))
-        deferred.addErrback(partial(callback, False))
+
+        self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile)
+        self.__own_bare_jid = own_bare_jid
+
+    async def loadOwnData(self) -> Optional[oldmemo.migrations.OwnData]:
+        own_device_id = await self.__storage.get(LegacyStorageImpl.KEY_DEVICE_ID, None)
+        if own_device_id is None:
+            return None
+
+        return oldmemo.migrations.OwnData(
+            own_bare_jid=self.__own_bare_jid,
+            own_device_id=own_device_id
+        )
+
+    async def deleteOwnData(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_DEVICE_ID)
+        except KeyError:
+            pass
+
+    async def loadState(self) -> Optional[oldmemo.migrations.State]:
+        return cast(
+            Optional[oldmemo.migrations.State],
+            await self.__storage.get(LegacyStorageImpl.KEY_STATE, None)
+        )
+
+    async def deleteState(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_STATE)
+        except KeyError:
+            pass
+
+    async def loadSession(
+        self,
+        bare_jid: str,
+        device_id: int
+    ) -> Optional[oldmemo.migrations.Session]:
+        key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ])
 
-    def _callMainThread(self, callback, method, *args, check_jid=None):
-        if check_jid is None:
-            d = method(*args)
-        else:
-            check_jid_d = self._checkJid(check_jid)
-            check_jid_d.addCallback(lambda __: method(*args))
-            d = check_jid_d
+        return cast(
+            Optional[oldmemo.migrations.Session],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteSession(self, bare_jid: str, device_id: int) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def loadActiveDevices(self, bare_jid: str) -> Optional[List[int]]:
+        key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ])
+
+        return cast(
+            Optional[List[int]],
+            await self.__storage.get(key, None)
+        )
+
+    async def loadInactiveDevices(self, bare_jid: str) -> Optional[Dict[int, int]]:
+        key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ])
+
+        return cast(
+            Optional[Dict[int, int]],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteActiveDevices(self, bare_jid: str) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def deleteInactiveDevices(self, bare_jid: str) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ])
 
-        if callback is not None:
-            d.addCallback(partial(callback, True))
-            d.addErrback(partial(callback, False))
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def loadTrust(
+        self,
+        bare_jid: str,
+        device_id: int
+    ) -> Optional[oldmemo.migrations.Trust]:
+        key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ])
+
+        return cast(
+            Optional[oldmemo.migrations.Trust],
+            await self.__storage.get(key, None)
+        )
+
+    async def deleteTrust(self, bare_jid: str, device_id: int) -> None:
+        key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ])
+
+        try:
+            await self.__storage.remove(key)
+        except KeyError:
+            pass
+
+    async def listJIDs(self) -> Optional[List[str]]:
+        bare_jids = await self.__storage.get(LegacyStorageImpl.KEY_ALL_JIDS, None)
+
+        return None if bare_jids is None else list(bare_jids)
+
+    async def deleteJIDList(self) -> None:
+        try:
+            await self.__storage.remove(LegacyStorageImpl.KEY_ALL_JIDS)
+        except KeyError:
+            pass
+
 
-    def _call(self, callback, method, *args, check_jid=None):
-        """Create Deferred and add Promise callback to it
+async def download_oldmemo_bundle(
+    client: SatXMPPClient,
+    xep_0060: XEP_0060,
+    bare_jid: str,
+    device_id: int
+) -> oldmemo.oldmemo.BundleImpl:
+    """Download the oldmemo bundle corresponding to a specific device.
+
+    @param client: The client.
+    @param xep_0060: The XEP-0060 plugin instance to use for pubsub interactions.
+    @param bare_jid: The bare JID the device belongs to.
+    @param device_id: The id of the device.
+    @return: The bundle.
+    @raise BundleDownloadFailed: if the download failed. Feel free to raise a subclass
+        instead.
+    """
+    # Bundle downloads are needed by the session manager and for migrations from legacy,
+    # thus it is made a separate function.
 
-        This method use reactor.callLater to launch Deferred in main thread
-        @param check_jid: run self._checkJid before method
-        """
-        reactor.callFromThread(
-            self._callMainThread, callback, method, *args, check_jid=check_jid
+    namespace = oldmemo.oldmemo.NAMESPACE
+    node = f"eu.siacs.conversations.axolotl.bundles:{device_id}"
+
+    try:
+        items, __ = await xep_0060.getItems(client, jid.JID(bare_jid), node)
+    except Exception as e:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}"
+        ) from e
+
+    if len(items) != 1:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}: Unexpected number of items retrieved: {len(items)}."
+        )
+
+    element = next(iter(domish_to_etree(cast(domish.Element, items[0]))), None)
+    if element is None:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}: Item download succeeded but parsing failed: {element}."
         )
 
-    def _checkJid(self, bare_jid):
-        """Check if jid is known, and store it if not
-
-        @param bare_jid(unicode): bare jid to check
-        @return (D): Deferred fired when jid is stored
-        """
-        if bare_jid in self.all_jids:
-            return defer.succeed(None)
-        else:
-            self.all_jids.add(bare_jid)
-            d = self.data.force(KEY_ALL_JIDS, self.all_jids)
-            return d
-
-    def loadOwnData(self, callback):
-        callback(True, {'own_bare_jid': self.own_bare_jid_s,
-                        'own_device_id': self.device_id})
-
-    def storeOwnData(self, callback, own_bare_jid, own_device_id):
-        if own_bare_jid != self.own_bare_jid_s or own_device_id != self.device_id:
-            raise exceptions.InternalError('bare jid or device id inconsistency!')
-        callback(True, None)
-
-    def loadState(self, callback):
-        self._call(callback, self.data.get, KEY_STATE)
-
-    def storeState(self, callback, state):
-        self._call(callback, self.data.force, KEY_STATE, state)
-
-    def loadSession(self, callback, bare_jid, device_id):
-        key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        self._call(callback, self.data.get, key)
-
-    def storeSession(self, callback, bare_jid, device_id, session):
-        key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        self._call(callback, self.data.force, key, session)
-
-    def deleteSession(self, callback, bare_jid, device_id):
-        key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        self._call(callback, self.data.remove, key)
-
-    def loadActiveDevices(self, callback, bare_jid):
-        key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
-        self._call(callback, self.data.get, key, {})
-
-    def loadInactiveDevices(self, callback, bare_jid):
-        key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
-        self._call(callback, self.data.get, key, {})
-
-    def storeActiveDevices(self, callback, bare_jid, devices):
-        key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
-        self._call(callback, self.data.force, key, devices, check_jid=bare_jid)
-
-    def storeInactiveDevices(self, callback, bare_jid, devices):
-        key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
-        self._call(callback, self.data.force, key, devices, check_jid=bare_jid)
-
-    def storeTrust(self, callback, bare_jid, device_id, trust):
-        key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)])
-        self._call(callback, self.data.force, key, trust)
-
-    def loadTrust(self, callback, bare_jid, device_id):
-        key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)])
-        self._call(callback, self.data.get, key)
-
-    def listJIDs(self, callback):
-        if callback is not None:
-            callback(True, self.all_jids)
-
-    def _deleteJID_logResults(self, results):
-        failed = [success for success, __ in results if not success]
-        if failed:
-            log.warning(
-                "delete JID failed for {failed_count} on {total_count} operations"
-                .format(failed_count=len(failed), total_count=len(results)))
-        else:
-            log.info(
-                "Delete JID operation succeed ({total_count} operations)."
-                .format(total_count=len(results)))
-
-    def _deleteJID_gotDevices(self, results, bare_jid):
-        assert len(results) == 2
-        active_success, active_devices = results[0]
-        inactive_success, inactive_devices = results[0]
-        d_list = []
-        for success, devices in results:
-            if not success:
-                log.warning("Can't retrieve devices for {bare_jid}: {reason}"
-                    .format(bare_jid=bare_jid, reason=active_devices))
-            else:
-                for device_id in devices:
-                    for key in (KEY_SESSION, KEY_TRUST):
-                        k = '\n'.join([key, bare_jid, str(device_id)])
-                        d_list.append(self.data.remove(k))
-
-        d_list.append(self.data.remove(KEY_ACTIVE_DEVICES, bare_jid))
-        d_list.append(self.data.remove(KEY_INACTIVE_DEVICES, bare_jid))
-        d_list.append(lambda __: self.all_jids.discard(bare_jid))
-        # FIXME: there is a risk of race condition here,
-        #        if self.all_jids is modified between discard and force)
-        d_list.append(lambda __: self.data.force(KEY_ALL_JIDS, self.all_jids))
-        d = defer.DeferredList(d_list)
-        d.addCallback(self._deleteJID_logResults)
-        return d
-
-    def _deleteJID(self, callback, bare_jid):
-        d_list = []
-
-        key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
-        d_list.append(self.data.get(key, []))
-
-        key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
-        d_inactive = self.data.get(key, {})
-        # inactive devices are returned as a dict mapping from devices_id to timestamp
-        # but we only need devices ids
-        d_inactive.addCallback(lambda devices: [k for k, __ in devices])
-
-        d_list.append(d_inactive)
-        d = defer.DeferredList(d_list)
-        d.addCallback(self._deleteJID_gotDevices, bare_jid)
-        if callback is not None:
-            self.setCb(d, callback)
-
-    def deleteJID(self, callback, bare_jid):
-        """Retrieve all (in)actives devices of bare_jid, and delete all related keys"""
-        reactor.callFromThread(self._deleteJID, callback, bare_jid)
+    try:
+        return oldmemo.etree.parse_bundle(element, bare_jid, device_id)
+    except Exception as e:
+        raise omemo.BundleDownloadFailed(
+            f"Bundle parsing failed for {bare_jid}: {device_id} under namespace"
+            f" {namespace}"
+        ) from e
 
 
-class SatOTPKPolicy(omemo.DefaultOTPKPolicy):
-    pass
+def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]:
+    """
+    @param sat: The SAT instance.
+    @param profile: The profile.
+    @return: A non-abstract subclass of :class:`~omemo.session_manager.SessionManager`
+        with XMPP interactions and trust handled via the SAT instance.
+    """
+
+    client = sat.getClient(profile)
+    xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
+
+    class SessionManagerImpl(omemo.SessionManager):
+        """
+        Session manager implementation handling XMPP interactions and trust via an
+        instance of :class:`~sat.core.sat_main.SAT`.
+        """
+
+        @staticmethod
+        async def _upload_bundle(bundle: omemo.Bundle) -> None:
+            if isinstance(bundle, twomemo.twomemo.BundleImpl):
+                element = twomemo.etree.serialize_bundle(bundle)
+
+                node = "urn:xmpp:omemo:2:bundles"
+                try:
+                    await xep_0060.sendItem(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        etree_to_domish(element),
+                        item_id=str(bundle.device_id),
+                        extra={
+                            xep_0060.EXTRA_PUBLISH_OPTIONS: {
+                                xep_0060.OPT_MAX_ITEMS: "max"
+                            },
+                            xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                        }
+                    )
+                except (error.StanzaError, Exception) as e:
+                    if (
+                        isinstance(e, error.StanzaError)
+                        and e.condition == "conflict"
+                        and e.appCondition is not None
+                        # pylint: disable=no-member
+                        and e.appCondition.name == "precondition-not-met"
+                    ):
+                        # publish options couldn't be set on the fly, manually reconfigure
+                        # the node and publish again
+                        raise omemo.BundleUploadFailed(
+                            f"precondition-not-met: {bundle}"
+                        ) from e
+                        # TODO: What can I do here? The correct node configuration is a
+                        # MUST in the XEP.
+
+                    raise omemo.BundleUploadFailed(
+                        f"Bundle upload failed: {bundle}"
+                    ) from e
+
+                return
+
+            if isinstance(bundle, oldmemo.oldmemo.BundleImpl):
+                element = oldmemo.etree.serialize_bundle(bundle)
+
+                node = f"eu.siacs.conversations.axolotl.bundles:{bundle.device_id}"
+                try:
+                    await xep_0060.sendItem(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        etree_to_domish(element),
+                        item_id=xep_0060.ID_SINGLETON,
+                        extra={
+                            xep_0060.EXTRA_PUBLISH_OPTIONS: { xep_0060.OPT_MAX_ITEMS: 1 },
+                            xep_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
+                        }
+                    )
+                except Exception as e:
+                    raise omemo.BundleUploadFailed(
+                        f"Bundle upload failed: {bundle}"
+                    ) from e
+
+                return
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {bundle.namespace}")
+
+        @staticmethod
+        async def _download_bundle(
+            namespace: str,
+            bare_jid: str,
+            device_id: int
+        ) -> omemo.Bundle:
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = "urn:xmpp:omemo:2:bundles"
+
+                try:
+                    items, __ = await xep_0060.getItems(
+                        client,
+                        jid.JID(bare_jid),
+                        node,
+                        max_items=None,
+                        item_ids=[ str(device_id) ]
+                    )
+                except Exception as e:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}"
+                    ) from e
+
+                if len(items) != 1:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}: Unexpected number of items retrieved:"
+                        f" {len(items)}."
+                    )
+
+                element = next(
+                    iter(domish_to_etree(cast(domish.Element, items[0]))),
+                    None
+                )
+                if element is None:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle download failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}: Item download succeeded but parsing"
+                        f" failed: {element}."
+                    )
+
+                try:
+                    return twomemo.etree.parse_bundle(element, bare_jid, device_id)
+                except Exception as e:
+                    raise omemo.BundleDownloadFailed(
+                        f"Bundle parsing failed for {bare_jid}: {device_id} under"
+                        f" namespace {namespace}"
+                    ) from e
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                return await download_oldmemo_bundle(
+                    client,
+                    xep_0060,
+                    bare_jid,
+                    device_id
+                )
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        @staticmethod
+        async def _delete_bundle(namespace: str, device_id: int) -> None:
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = "urn:xmpp:omemo:2:bundles"
+
+                try:
+                    await xep_0060.retractItems(
+                        client,
+                        client.jid.userhostJID(),
+                        node,
+                        [ str(device_id) ],
+                        notify=False
+                    )
+                except Exception as e:
+                    raise omemo.BundleDeletionFailed(
+                        f"Bundle deletion failed for {device_id} under namespace"
+                        f" {namespace}"
+                    ) from e
+
+                return
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                node = f"eu.siacs.conversations.axolotl.bundles:{device_id}"
+
+                try:
+                    await xep_0060.deleteNode(client, client.jid.userhostJID(), node)
+                except Exception as e:
+                    raise omemo.BundleDeletionFailed(
+                        f"Bundle deletion failed for {device_id} under namespace"
+                        f" {namespace}"
+                    ) from e
+
+                return
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        @staticmethod
+        async def _upload_device_list(
+            namespace: str,
+            device_list: Dict[int, Optional[str]]
+        ) -> None:
+            element: Optional[ET.Element] = None
+            node: Optional[str] = None
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                element = twomemo.etree.serialize_device_list(device_list)
+                node = TWOMEMO_DEVICE_LIST_NODE
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                element = oldmemo.etree.serialize_device_list(device_list)
+                node = OLDMEMO_DEVICE_LIST_NODE
+
+            if element is None or node is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+            try:
+                await xep_0060.sendItem(
+                    client,
+                    client.jid.userhostJID(),
+                    node,
+                    etree_to_domish(element),
+                    item_id=xep_0060.ID_SINGLETON,
+                    extra={
+                        xep_0060.EXTRA_PUBLISH_OPTIONS: {
+                            xep_0060.OPT_MAX_ITEMS: 1,
+                            xep_0060.OPT_ACCESS_MODEL: "open"
+                        },
+                        xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
+                    }
+                )
+            except (error.StanzaError, Exception) as e:
+                if (
+                    isinstance(e, error.StanzaError)
+                    and e.condition == "conflict"
+                    and e.appCondition is not None
+                    # pylint: disable=no-member
+                    and e.appCondition.name == "precondition-not-met"
+                ):
+                    # publish options couldn't be set on the fly, manually reconfigure the
+                    # node and publish again
+                    raise omemo.DeviceListUploadFailed(
+                        f"precondition-not-met for namespace {namespace}"
+                    ) from e
+                    # TODO: What can I do here? The correct node configuration is a MUST
+                    # in the XEP.
+
+                raise omemo.DeviceListUploadFailed(
+                    f"Device list upload failed for namespace {namespace}"
+                ) from e
+
+        @staticmethod
+        async def _download_device_list(
+            namespace: str,
+            bare_jid: str
+        ) -> Dict[int, Optional[str]]:
+            node: Optional[str] = None
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                node = TWOMEMO_DEVICE_LIST_NODE
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                node = OLDMEMO_DEVICE_LIST_NODE
+
+            if node is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+            try:
+                items, __ = await xep_0060.getItems(client, jid.JID(bare_jid), node)
+            except exceptions.NotFound:
+                return {}
+            except Exception as e:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}"
+                ) from e
+
+            if len(items) != 1:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}: Unexpected number of items retrieved: {len(items)}."
+                )
+
+            element = next(iter(domish_to_etree(cast(domish.Element, items[0]))), None)
+            if element is None:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}: Item download succeeded but parsing failed:"
+                    f" {element}."
+                )
+
+            try:
+                if namespace == twomemo.twomemo.NAMESPACE:
+                    return twomemo.etree.parse_device_list(element)
+                if namespace == oldmemo.oldmemo.NAMESPACE:
+                    return oldmemo.etree.parse_device_list(element)
+            except Exception as e:
+                raise omemo.DeviceListDownloadFailed(
+                    f"Device list download failed for {bare_jid} under namespace"
+                    f" {namespace}"
+                ) from e
+
+            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
+
+        @staticmethod
+        def _evaluate_custom_trust_level(trust_level_name: str) -> omemo.TrustLevel:
+            try:
+                return TrustLevel(trust_level_name).to_omemo_trust_level()
+            except ValueError as e:
+                raise omemo.UnknownTrustLevel(
+                    f"Unknown trust level name {trust_level_name}"
+                ) from e
+
+        async def _make_trust_decision(
+            self,
+            undecided: FrozenSet[omemo.DeviceInformation],
+            identifier: Optional[str]
+        ) -> None:
+            if identifier is None:
+                raise omemo.TrustDecisionFailed(
+                    "The identifier must contain the feedback JID."
+                )
+
+            # The feedback JID is transferred via the identifier
+            feedback_jid = jid.JID(identifier).userhostJID()
+
+            # Get the name of the trust model to use
+            trust_model = cast(str, sat.memory.getParamA(
+                PARAM_NAME,
+                PARAM_CATEGORY,
+                profile_key=cast(str, client.profile)
+            ))
+
+            # Under the BTBV trust model, if at least one device of a bare JID is manually
+            # trusted or distrusted, the trust model is "downgraded" to manual trust.
+            # Thus, we can separate bare JIDs into two pools here, one pool of bare JIDs
+            # for which BTBV is active, and one pool of bare JIDs for which manual trust
+            # is used.
+            bare_jids = { device.bare_jid for device in undecided }
+
+            btbv_bare_jids: Set[str] = set()
+            manual_trust_bare_jids: Set[str] = set()
+
+            if trust_model == "btbv":
+                # For each bare JID, decide whether BTBV or manual trust applies
+                for bare_jid in bare_jids:
+                    # Get all known devices belonging to the bare JID
+                    devices = await self.get_device_information(bare_jid)
+
+                    # If the trust levels of all devices correspond to those used by BTBV,
+                    # BTBV applies. Otherwise, fall back to manual trust.
+                    if all(TrustLevel(device.trust_level_name) in {
+                        TrustLevel.UNDECIDED,
+                        TrustLevel.BLINDLY_TRUSTED
+                    } for device in devices):
+                        btbv_bare_jids.add(bare_jid)
+                    else:
+                        manual_trust_bare_jids.add(bare_jid)
+
+            if trust_model == "manual":
+                manual_trust_bare_jids = bare_jids
+
+            # With the JIDs sorted into their respective pools, the undecided devices can
+            # be categorized too
+            blindly_trusted_devices = \
+                { dev for dev in undecided if dev.bare_jid in btbv_bare_jids }
+            manually_trusted_devices = \
+                { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids }
+
+            # Blindly trust devices handled by BTBV
+            if len(blindly_trusted_devices) > 0:
+                for device in blindly_trusted_devices:
+                    await self.set_trust(
+                        device.bare_jid,
+                        device.identity_key,
+                        TrustLevel.BLINDLY_TRUSTED.name
+                    )
+
+                blindly_trusted_devices_stringified = ", ".join([
+                    f"device {device.device_id} of {device.bare_jid} under namespace"
+                    f" {device.namespaces}"
+                    for device
+                    in blindly_trusted_devices
+                ])
+
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        "Not all destination devices are trusted, unknown devices will be"
+                        " blindly trusted due to the Blind Trust Before Verification"
+                        " policy. If you want a more secure workflow, please activate the"
+                        " \"manual\" policy in the settings' \"Security\" tab.\nFollowing"
+                        " devices have been automatically trusted:"
+                        f" {blindly_trusted_devices_stringified}."
+                    )
+                )
+
+            # Prompt the user for manual trust decisions on the devices handled by manual
+            # trust
+            if len(manually_trusted_devices) > 0:
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        "Not all destination devices are trusted, we can't encrypt"
+                        " message in such a situation. Please indicate if you trust"
+                        " those devices or not in the trust manager before we can"
+                        " send this message."
+                    )
+                )
+                await self.__prompt_manual_trust(
+                    frozenset(manually_trusted_devices),
+                    feedback_jid
+                )
+
+        @staticmethod
+        async def _send_message(message: omemo.Message, bare_jid: str) -> None:
+            element: Optional[ET.Element] = None
+
+            if message.namespace == twomemo.twomemo.NAMESPACE:
+                element = twomemo.etree.serialize_message(message)
+            if message.namespace == oldmemo.oldmemo.NAMESPACE:
+                element = oldmemo.etree.serialize_message(message)
+
+            if element is None:
+                raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}")
+
+            # TODO: Untested
+            message_data = client.generateMessageXML(MessageData({
+                "from": client.jid,
+                "to": jid.JID(bare_jid),
+                "uid": str(uuid.uuid4()),
+                "message": {},
+                "subject": {},
+                "type": C.MESS_TYPE_CHAT,
+                "extra": {},
+                "timestamp": time.time()
+            }))
+
+            message_data["xml"].addChild(etree_to_domish(element))
+
+            try:
+                await client.send(message_data["xml"])
+            except Exception as e:
+                raise omemo.MessageSendingFailed() from e
+
+        async def __prompt_manual_trust(
+            self,
+            undecided: FrozenSet[omemo.DeviceInformation],
+            feedback_jid: jid.JID
+        ) -> None:
+            """Asks the user to decide on the manual trust level of a set of devices.
+
+            Blocks until the user has made a decision and updates the trust levels of all
+            devices using :meth:`set_trust`.
+
+            @param undecided: The set of devices to prompt manual trust for.
+            @param feedback_jid: The bare JID to redirect feedback to. In case of a one to
+                one message, the recipient JID. In case of a MUC message, the room JID.
+            @raise TrustDecisionFailed: if the user cancels the prompt.
+            """
+
+            # This session manager handles encryption with both twomemo and oldmemo, but
+            # both are currently registered as different plugins and the `deferXMLUI`
+            # below requires a single namespace identifying the encryption plugin. Thus,
+            # get the namespace of the requested encryption method from the encryption
+            # session using the feedback JID.
+            encryption = client.encryption.getSession(feedback_jid)
+            if encryption is None:
+                raise omemo.TrustDecisionFailed(
+                    f"Encryption not requested for {feedback_jid.userhost()}."
+                )
+
+            namespace = encryption["plugin"].namespace
+
+            # Casting this to Any, otherwise all calls on the variable cause type errors
+            # pylint: disable=no-member
+            trust_ui = cast(Any, xml_tools.XMLUI(
+                panel_type=C.XMLUI_FORM,
+                title=D_("OMEMO trust management"),
+                submit_id=""
+            ))
+            trust_ui.addText(D_(
+                "This is OMEMO trusting system. You'll see below the devices of your "
+                "contacts, and a checkbox to trust them or not. A trusted device "
+                "can read your messages in plain text, so be sure to only validate "
+                "devices that you are sure are belonging to your contact. It's better "
+                "to do this when you are next to your contact and their device, so "
+                "you can check the \"fingerprint\" (the number next to the device) "
+                "yourself. Do *not* validate a device if the fingerprint is wrong!"
+            ))
+
+            own_device, __ = await self.get_own_device_information()
+
+            trust_ui.changeContainer("label")
+            trust_ui.addLabel(D_("This device ID"))
+            trust_ui.addText(str(own_device.device_id))
+            trust_ui.addLabel(D_("This device's fingerprint"))
+            trust_ui.addText(" ".join(self.format_identity_key(own_device.identity_key)))
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+            # At least sort the devices by bare JID such that they aren't listed
+            # completely random
+            undecided_ordered = sorted(undecided, key=lambda device: device.bare_jid)
+
+            for index, device in enumerate(undecided_ordered):
+                trust_ui.addLabel(D_("Contact"))
+                trust_ui.addJid(jid.JID(device.bare_jid))
+                trust_ui.addLabel(D_("Device ID"))
+                trust_ui.addText(str(device.device_id))
+                trust_ui.addLabel(D_("Fingerprint"))
+                trust_ui.addText(" ".join(self.format_identity_key(device.identity_key)))
+                trust_ui.addLabel(D_("Trust this device?"))
+                trust_ui.addBool(f"trust_{index}", value=C.boolConst(False))
+                trust_ui.addEmpty()
+                trust_ui.addEmpty()
+
+            trust_ui_result = await xml_tools.deferXMLUI(
+                sat,
+                trust_ui,
+                action_extra={ "meta_encryption_trust": namespace },
+                profile=profile
+            )
+
+            if C.bool(trust_ui_result.get("cancelled", "false")):
+                raise omemo.TrustDecisionFailed("Trust UI cancelled.")
+
+            data_form_result = cast(Dict[str, str], xml_tools.XMLUIResult2DataFormResult(
+                trust_ui_result
+            ))
+
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
+
+                device = undecided_ordered[int(key[len("trust_"):])]
+                trust = C.bool(value)
+
+                await self.set_trust(
+                    device.bare_jid,
+                    device.identity_key,
+                    TrustLevel.TRUSTED.name if trust else TrustLevel.DISTRUSTED.name
+                )
+
+    return SessionManagerImpl
 
 
-class OmemoSession:
-    """Wrapper to use omemo.OmemoSession with Deferred"""
-
-    def __init__(self, session):
-        self._session = session
-
-    @property
-    def republish_bundle(self):
-        return self._session.republish_bundle
-
-    @property
-    def public_bundle(self):
-        return self._session.public_bundle
+async def prepare_for_profile(
+    sat: SAT,
+    profile: str,
+    initial_own_label: Optional[str],
+    signed_pre_key_rotation_period: int = 7 * 24 * 60 * 60,
+    pre_key_refill_threshold: int = 99,
+    max_num_per_session_skipped_keys: int = 1000,
+    max_num_per_message_skipped_keys: Optional[int] = None
+) -> omemo.SessionManager:
+    """Prepare the OMEMO library (storage, backends, core) for a specific profile.
 
-    @classmethod
-    def create(cls, client, storage, my_device_id = None):
-        omemo_session_p = omemo.SessionManager.create(
-            storage,
-            SatOTPKPolicy,
-            omemo_backend,
-            client.jid.userhost(),
-            my_device_id)
-        d = promise2Deferred(omemo_session_p)
-        d.addCallback(lambda session: cls(session))
-        return d
-
-    def newDeviceList(self, jid, devices):
-        jid = jid.userhost()
-        new_device_p = self._session.newDeviceList(jid, devices)
-        return promise2Deferred(new_device_p)
+    @param sat: The SAT instance.
+    @param profile: The profile.
+    @param initial_own_label: The initial (optional) label to assign to this device if
+        supported by any of the backends.
+    @param signed_pre_key_rotation_period: The rotation period for the signed pre key, in
+        seconds. The rotation period is recommended to be between one week (the default)
+        and one month.
+    @param pre_key_refill_threshold: The number of pre keys that triggers a refill to 100.
+        Defaults to 99, which means that each pre key gets replaced with a new one right
+        away. The threshold can not be configured to lower than 25.
+    @param max_num_per_session_skipped_keys: The maximum number of skipped message keys to
+        keep around per session. Once the maximum is reached, old message keys are deleted
+        to make space for newer ones. Accessible via
+        :attr:`max_num_per_session_skipped_keys`.
+    @param max_num_per_message_skipped_keys: The maximum number of skipped message keys to
+        accept in a single message. When set to ``None`` (the default), this parameter
+        defaults to the per-session maximum (i.e. the value of the
+        ``max_num_per_session_skipped_keys`` parameter). This parameter may only be 0 if
+        the per-session maximum is 0, otherwise it must be a number between 1 and the
+        per-session maximum. Accessible via :attr:`max_num_per_message_skipped_keys`.
+    @return: A session manager with ``urn:xmpp:omemo:2`` and
+        ``eu.siacs.conversations.axolotl`` capabilities, specifically for the given
+        profile.
+    @raise BundleUploadFailed: if a bundle upload failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise BundleDownloadFailed: if a bundle download failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise BundleDeletionFailed: if a bundle deletion failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise DeviceListUploadFailed: if a device list upload failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    @raise DeviceListDownloadFailed: if a device list download failed. Forwarded from
+        :meth:`~omemo.session_manager.SessionManager.create`.
+    """
 
-    def getDevices(self, bare_jid=None):
-        bare_jid = bare_jid.userhost()
-        get_devices_p = self._session.getDevices(bare_jid=bare_jid)
-        return promise2Deferred(get_devices_p)
+    client = sat.getClient(profile)
+    xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
 
-    def buildSession(self, bare_jid, device, bundle):
-        bare_jid = bare_jid.userhost()
-        build_session_p = self._session.buildSession(bare_jid, int(device), bundle)
-        return promise2Deferred(build_session_p)
-
-    def deleteSession(self, bare_jid, device):
-        bare_jid = bare_jid.userhost()
-        delete_session_p = self._session.deleteSession(
-            bare_jid=bare_jid, device=int(device))
-        return promise2Deferred(delete_session_p)
-
-    def encryptMessage(self, bare_jids, message, bundles=None, expect_problems=None):
-        """Encrypt a message
+    storage = StorageImpl(profile)
 
-        @param bare_jids(iterable[jid.JID]): destinees of the message
-        @param message(unicode): message to encode
-        @param bundles(dict[jid.JID, dict[int, ExtendedPublicBundle]):
-            entities => devices => bundles map
-        @return D(dict): encryption data
-        """
-        bare_jids = [e.userhost() for e in bare_jids]
-        if bundles is not None:
-            bundles = {e.userhost(): v for e, v in bundles.items()}
-        encrypt_mess_p = self._session.encryptMessage(
-            bare_jids=bare_jids,
-            plaintext=message.encode(),
-            bundles=bundles,
-            expect_problems=expect_problems)
-        return promise2Deferred(encrypt_mess_p)
+    # TODO: Untested
+    await oldmemo.migrations.migrate(
+        LegacyStorageImpl(profile, client.jid.userhost()),
+        storage,
+        # TODO: Do we want BLINDLY_TRUSTED or TRUSTED here?
+        TrustLevel.BLINDLY_TRUSTED.name,
+        TrustLevel.UNDECIDED.name,
+        TrustLevel.DISTRUSTED.name,
+        lambda bare_jid, device_id: download_oldmemo_bundle(
+            client,
+            xep_0060,
+            bare_jid,
+            device_id
+        )
+    )
 
-    def encryptRatchetForwardingMessage(
-        self, bare_jids, bundles=None, expect_problems=None):
-        bare_jids = [e.userhost() for e in bare_jids]
-        if bundles is not None:
-            bundles = {e.userhost(): v for e, v in bundles.items()}
-        encrypt_ratchet_fwd_p = self._session.encryptRatchetForwardingMessage(
-            bare_jids=bare_jids,
-            bundles=bundles,
-            expect_problems=expect_problems)
-        return promise2Deferred(encrypt_ratchet_fwd_p)
-
-    def decryptMessage(self, bare_jid, device, iv, message, is_pre_key_message,
-                       ciphertext, additional_information=None, allow_untrusted=False):
-        bare_jid = bare_jid.userhost()
-        decrypt_mess_p = self._session.decryptMessage(
-            bare_jid=bare_jid,
-            device=int(device),
-            iv=iv,
-            message=message,
-            is_pre_key_message=is_pre_key_message,
-            ciphertext=ciphertext,
-            additional_information=additional_information,
-            allow_untrusted=allow_untrusted
+    session_manager = await make_session_manager(sat, profile).create(
+        [
+            twomemo.Twomemo(
+                storage,
+                max_num_per_session_skipped_keys,
+                max_num_per_message_skipped_keys
+            ),
+            oldmemo.Oldmemo(
+                storage,
+                max_num_per_session_skipped_keys,
+                max_num_per_message_skipped_keys
             )
-        return promise2Deferred(decrypt_mess_p)
+        ],
+        storage,
+        client.jid.userhost(),
+        initial_own_label,
+        TrustLevel.UNDECIDED.value,
+        signed_pre_key_rotation_period,
+        pre_key_refill_threshold,
+        omemo.AsyncFramework.TWISTED
+    )
 
-    def decryptRatchetForwardingMessage(
-        self, bare_jid, device, iv, message, is_pre_key_message,
-        additional_information=None, allow_untrusted=False):
-        bare_jid = bare_jid.userhost()
-        decrypt_ratchet_fwd_p = self._session.decryptRatchetForwardingMessage(
-            bare_jid=bare_jid,
-            device=int(device),
-            iv=iv,
-            message=message,
-            is_pre_key_message=is_pre_key_message,
-            additional_information=additional_information,
-            allow_untrusted=allow_untrusted
-            )
-        return promise2Deferred(decrypt_ratchet_fwd_p)
+    # This shouldn't hurt here since we're not running on overly constrainted devices.
+    # TODO: Consider ensuring data consistency regularly/in response to certain events
+    await session_manager.ensure_data_consistency()
 
-    def setTrust(self, bare_jid, device, key, trusted):
-        bare_jid = bare_jid.userhost()
-        setTrust_p = self._session.setTrust(
-            bare_jid=bare_jid,
-            device=int(device),
-            key=key,
-            trusted=trusted,
-        )
-        return promise2Deferred(setTrust_p)
+    # TODO: Correct entering/leaving of the history synchronization mode isn't terribly
+    # important for now, since it only prevents an extremely unlikely race condition of
+    # multiple devices choosing the same pre key for new sessions while the device was
+    # offline. I don't believe other clients seriously defend against that race condition
+    # either. In the long run, it might still be cool to have triggers for when history
+    # sync starts and ends (MAM, MUC catch-up, etc.) and to react to those triggers.
+    await session_manager.after_history_sync()
+
+    return session_manager
+
 
-    def resetTrust(self, bare_jid, device):
-        bare_jid = bare_jid.userhost()
-        resetTrust_p = self._session.resetTrust(
-            bare_jid=bare_jid,
-            device=int(device),
-        )
-        return promise2Deferred(resetTrust_p)
-
-    def getTrustForJID(self, bare_jid):
-        bare_jid = bare_jid.userhost()
-        get_trust_p = self._session.getTrustForJID(bare_jid=bare_jid)
-        return promise2Deferred(get_trust_p)
+DEFAULT_TRUST_MODEL_PARAM = f"""
+<params>
+<individual>
+<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
+    <param name="{PARAM_NAME}"
+        label={quoteattr(D_('OMEMO default trust policy'))}
+        type="list" security="3">
+        <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
+        <option value="btbv"
+            label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
+            selected="true" />
+    </param>
+</category>
+</individual>
+</params>
+"""
 
 
 class OMEMO:
+    """
+    Plugin equipping Libervia with OMEMO capabilities under the (modern)
+    ``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl``
+    namespace. Both versions of the protocol are handled by this plugin and compatibility
+    between the two is maintained. MUC messages are supported next to one to one messages.
+    For trust management, the two trust models "BTBV" and "manual" are supported.
+    """
 
-    params = """
-    <params>
-    <individual>
-    <category name="{category_name}" label="{category_label}">
-        <param name="{param_name}" label={param_label} type="list" security="3">
-            <option value="manual" label={opt_manual_lbl} />
-            <option value="btbv" label={opt_btbv_lbl} selected="true" />
-        </param>
-     </category>
-    </individual>
-    </params>
-    """.format(
-        category_name=PARAM_CATEGORY,
-        category_label=D_("Security"),
-        param_name=PARAM_NAME,
-        param_label=quoteattr(D_("OMEMO default trust policy")),
-        opt_manual_lbl=quoteattr(D_("Manual trust (more secure)")),
-        opt_btbv_lbl=quoteattr(
-            D_("Blind Trust Before Verification (more user friendly)")),
+    # For MUC/MIX message stanzas, the <to/> affix is a MUST
+    SCE_PROFILE_GROUPCHAT = SCEProfile(
+        rpad_policy=SCEAffixPolicy.REQUIRED,
+        time_policy=SCEAffixPolicy.OPTIONAL,
+        to_policy=SCEAffixPolicy.REQUIRED,
+        from_policy=SCEAffixPolicy.OPTIONAL,
+        custom_policies={}
+    )
+
+    # For everything but MUC/MIX message stanzas, the <to/> affix is a MAY
+    SCE_PROFILE = SCEProfile(
+        rpad_policy=SCEAffixPolicy.REQUIRED,
+        time_policy=SCEAffixPolicy.OPTIONAL,
+        to_policy=SCEAffixPolicy.OPTIONAL,
+        from_policy=SCEAffixPolicy.OPTIONAL,
+        custom_policies={}
     )
 
-    def __init__(self, host):
-        log.info(_("OMEMO plugin initialization (omemo module v{version})").format(
-            version=omemo.__version__))
-        version = tuple(map(int, omemo.__version__.split('.')[:3]))
-        if version < OMEMO_MIN_VER:
-            log.warning(_(
-                "Your version of omemo module is too old: {v[0]}.{v[1]}.{v[2]} is "
-                "minimum required, please update.").format(v=OMEMO_MIN_VER))
-            raise exceptions.CancelError("module is too old")
-        self.host = host
-        host.memory.updateParams(self.params)
-        self._p_hints = host.plugins["XEP-0334"]
-        self._p_carbons = host.plugins["XEP-0280"]
-        self._p = host.plugins["XEP-0060"]
-        self._m = host.plugins.get("XEP-0045")
-        self._sid = host.plugins.get("XEP-0359")
-        host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=100050)
-        host.trigger.add("sendMessageData", self._sendMessageDataTrigger)
-        self.host.registerEncryptionPlugin(self, "OMEMO", NS_OMEMO, 100)
-        pep = host.plugins['XEP-0163']
-        pep.addPEPEvent(
-            "OMEMO_DEVICES", NS_OMEMO_DEVICES,
-            lambda itemsEvent, profile: defer.ensureDeferred(
-                self.onNewDevices(itemsEvent, profile))
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Add configuration option to choose between manual trust and BTBV as the trust
+        # model
+        sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM)
+
+        # Plugins
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
+        self.__xep_0359 = cast(Optional[XEP_0359], sat.plugins.get("XEP-0359"))
+        self.__xep_0420 = cast(XEP_0420, sat.plugins["XEP-0420"])
+
+        # In contrast to one to one messages, MUC messages are reflected to the sender.
+        # Thus, the sender does not add messages to their local message log when sending
+        # them, but when the reflection is received. This approach does not pair well with
+        # OMEMO, since for security reasons it is forbidden to encrypt messages for the
+        # own device. Thus, when the reflection of an OMEMO message is received, it can't
+        # be decrypted and added to the local message log as usual. To counteract this,
+        # the plaintext of encrypted messages sent to MUCs are cached in this field, such
+        # that when the reflection is received, the plaintext can be looked up from the
+        # cache and added to the local message log.
+        # TODO: The old plugin expired this cache after some time. I'm not sure that's
+        # really necessary.
+        self.__muc_plaintext_cache: Dict[MUCPlaintextCacheKey, bytes] = {}
+
+        # Mapping from profile name to corresponding session manager
+        self.__session_managers: Dict[str, omemo.SessionManager] = {}
+
+        # Calls waiting for a specific session manager to be built
+        self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {}
+
+        # These triggers are used by oldmemo, which doesn't do SCE and only applies to
+        # messages
+        sat.trigger.add(
+            "messageReceived",
+            self.__message_received_trigger,
+            priority=100050
         )
+        sat.trigger.add(
+            "sendMessageData",
+            self.__send_message_data_trigger,
+            priority=100050
+        )
+
+        # These triggers are used by twomemo, which does do SCE
+        sat.trigger.add("send", self.__send_trigger, priority=0)
+        # TODO: Add new triggers here for freshly received and about-to-be-sent stanzas,
+        # including IQs.
+
+        # Give twomemo a (slightly) higher priority than oldmemo
+        sat.registerEncryptionPlugin(self, "TWOMEMO", twomemo.twomemo.NAMESPACE, 101)
+        sat.registerEncryptionPlugin(self, "OLDMEMO", oldmemo.oldmemo.NAMESPACE, 100)
+
+        xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"])
+        xep_0163.addPEPEvent(
+            "TWOMEMO_DEVICES",
+            TWOMEMO_DEVICE_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_device_list_update(items_event, profile)
+            )
+        )
+        xep_0163.addPEPEvent(
+            "OLDMEMO_DEVICES",
+            OLDMEMO_DEVICE_LIST_NODE,
+            lambda items_event, profile: defer.ensureDeferred(
+                self.__on_device_list_update(items_event, profile)
+            )
+        )
+
         try:
-            self.text_cmds = self.host.plugins[C.TEXT_CMDS]
+            self.__text_commands = cast(TextCommands, sat.plugins[C.TEXT_CMDS])
         except KeyError:
             log.info(_("Text commands not available"))
         else:
-            self.text_cmds.registerTextCommands(self)
+            self.__text_commands.registerTextCommands(self)
 
-    # Text commands #
+    async def profileConnected(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient
+    ) -> None:
+        """
+        @param client: The client.
+        """
 
-    async def cmd_omemo_reset(self, client, mess_data):
-        """reset OMEMO session (use only if encryption is broken)
+        await self.__prepare_for_profile(cast(str, client.profile))
+
+    async def cmd_omemo_reset(
+        self,
+        client: SatXMPPClient,
+        mess_data: MessageData
+    ) -> Literal[False]:
+        """Reset all sessions of devices that belong to the recipient of ``mess_data``.
 
-        @command(one2one):
+        This must only be callable manually by the user. Use this when a session is
+        apparently broken, i.e. sending and receiving encrypted messages doesn't work and
+        something being wrong has been confirmed manually with the recipient.
+
+        @param client: The client.
+        @param mess_data: The message data, whose ``to`` attribute will be the bare JID to
+            reset all sessions with.
+        @return: The constant value ``False``, indicating to the text commands plugin that
+            the message is not supposed to be sent.
         """
-        if not client.encryption.isEncryptionRequested(mess_data, NS_OMEMO):
-            feedback = _(
-                "You need to have OMEMO encryption activated to reset the session")
-            self.text_cmds.feedBack(client, feedback, mess_data)
+
+        twomemo_requested = \
+            client.encryption.isEncryptionRequested(mess_data, twomemo.twomemo.NAMESPACE)
+        oldmemo_requested = \
+            client.encryption.isEncryptionRequested(mess_data, oldmemo.oldmemo.NAMESPACE)
+
+        if not (twomemo_requested or oldmemo_requested):
+            self.__text_commands.feedBack(
+                client,
+                _("You need to have OMEMO encryption activated to reset the session"),
+                mess_data
+            )
             return False
-        to_jid = mess_data["to"].userhostJID()
-        session = client._xep_0384_session
-        devices = await session.getDevices(to_jid)
+
+        bare_jid = mess_data["to"].userhost()
+
+        session_manager = await self.__prepare_for_profile(client.profile)
+        devices = await session_manager.get_device_information(bare_jid)
 
-        for device in devices['active']:
-            log.debug(f"deleting session for device {device}")
-            await session.deleteSession(to_jid, device=device)
+        for device in devices:
+            log.debug(f"Replacing sessions with device {device}")
+            await session_manager.replace_sessions(device)
 
-        log.debug("Sending an empty message to trigger key exchange")
-        await client.sendMessage(to_jid, {'': ''})
+        self.__text_commands.feedBack(
+            client,
+            _("OMEMO session has been reset"),
+            mess_data
+        )
 
-        feedback = _("OMEMO session has been reset")
-        self.text_cmds.feedBack(client, feedback, mess_data)
         return False
 
-    async def trustUICb(
-            self, xmlui_data, trust_data, expect_problems=None, profile=C.PROF_KEY_NONE):
-        if C.bool(xmlui_data.get('cancelled', 'false')):
-            return {}
-        client = self.host.getClient(profile)
-        session = client._xep_0384_session
-        stored_data = client._xep_0384_data
-        manual_trust = await stored_data.get(KEY_MANUAL_TRUST, set())
-        auto_trusted_cache = {}
-        answer = xml_tools.XMLUIResult2DataFormResult(xmlui_data)
-        blind_trust = C.bool(answer.get('blind_trust', C.BOOL_FALSE))
-        for key, value in answer.items():
-            if key.startswith('trust_'):
-                trust_id = key[6:]
-            else:
-                continue
-            data = trust_data[trust_id]
-            if blind_trust:
-                # user request to restore blind trust for this entity
-                # so if the entity is present in manual trust, we remove it
-                if data["jid"].full() in manual_trust:
-                    manual_trust.remove(data["jid"].full())
-                    await stored_data.aset(KEY_MANUAL_TRUST, manual_trust)
-            elif data["jid"].full() not in manual_trust:
-                # validating this trust UI implies that we activate manual mode for
-                # this entity (used for BTBV policy)
-                manual_trust.add(data["jid"].full())
-                await stored_data.aset(KEY_MANUAL_TRUST, manual_trust)
-            trust = C.bool(value)
+    async def getTrustUI(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        if entity.resource:
+            raise ValueError("A bare JID is expected.")
 
-            if not trust:
-                # if device is not trusted, we check if it must be removed from auto
-                # trusted devices list
-                bare_jid_s = data['jid'].userhost()
-                key = f"{KEY_AUTO_TRUST}\n{bare_jid_s}"
-                if bare_jid_s not in auto_trusted_cache:
-                    auto_trusted_cache[bare_jid_s] = await stored_data.get(
-                        key, default=set())
-                auto_trusted = auto_trusted_cache[bare_jid_s]
-                if data['device'] in auto_trusted:
-                    # as we don't trust this device anymore, we can remove it from the
-                    # list of automatically trusted devices
-                    auto_trusted.remove(data['device'])
-                    await stored_data.aset(key, auto_trusted)
-                    log.info(D_(
-                        "device {device} from {peer_jid} is not an auto-trusted device "
-                        "anymore").format(device=data['device'], peer_jid=bare_jid_s))
+        bare_jids: Set[str]
+        if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity):
+            bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
+        else:
+            bare_jids = { entity.userhost() }
+
+        session_manager = await self.__prepare_for_profile(client.profile)
 
-            await session.setTrust(
-                data["jid"],
-                data["device"],
-                data["ik"],
-                trusted=trust,
-            )
-            if not trust and expect_problems is not None:
-                expect_problems.setdefault(data['jid'].userhost(), set()).add(
-                    data['device']
-                )
-        return {}
-
-    async def getTrustUI(self, client, entity_jid=None, trust_data=None, submit_id=None):
-        """Generate a XMLUI to manage trust
+        # At least sort the devices by bare JID such that they aren't listed completely
+        # random
+        devices = sorted(cast(Set[omemo.DeviceInformation], set()).union(*[
+            await session_manager.get_device_information(bare_jid)
+            for bare_jid
+            in bare_jids
+        ]), key=lambda device: device.bare_jid)
 
-        @param entity_jid(None, jid.JID): jid of entity to manage
-            None to use trust_data
-        @param trust_data(None, dict): devices data:
-            None to use entity_jid
-            else a dict mapping from trust ids (unicode) to devices data,
-            where a device data must have the following keys:
-                - jid(jid.JID): bare jid of the device owner
-                - device(int): device id
-                - ik(bytes): identity key
-            and may have the following key:
-                - trusted(bool): True if device is trusted
-        @param submit_id(None, unicode): submit_id to use
-            if None set UI callback to trustUICb
-        @return D(xmlui): trust management form
-        """
-        # we need entity_jid xor trust_data
-        assert entity_jid and not trust_data or not entity_jid and trust_data
-        if entity_jid and entity_jid.resource:
-            raise ValueError("A bare jid is expected")
+        async def callback(
+            data: Any,
+            profile: str  # pylint: disable=unused-argument
+        ) -> Dict[Never, Never]:
+            """
+            @param data: The XMLUI result produces by the trust UI form.
+            @param profile: The profile.
+            @return: An empty dictionary. The type of the return value was chosen
+                conservatively since the exact options are neither known not needed here.
+            """
 
-        session = client._xep_0384_session
-        stored_data = client._xep_0384_data
+            if C.bool(data.get("cancelled", "false")):
+                return {}
+
+            data_form_result = cast(
+                Dict[str, str],
+                xml_tools.XMLUIResult2DataFormResult(data)
+            )
+            for key, value in data_form_result.items():
+                if not key.startswith("trust_"):
+                    continue
 
-        if trust_data is None:
-            cache = client._xep_0384_cache.setdefault(entity_jid, {})
-            trust_data = {}
-            if self._m is not None and self._m.isJoinedRoom(client, entity_jid):
-                trust_jids = self.getJIDsForRoom(client, entity_jid)
-            else:
-                trust_jids = [entity_jid]
-            for trust_jid in trust_jids:
-                trust_session_data = await session.getTrustForJID(trust_jid)
-                bare_jid_s = trust_jid.userhost()
-                for device_id, trust_info in trust_session_data['active'].items():
-                    if trust_info is None:
-                        # device has never been (un)trusted, we have to retrieve its
-                        # fingerprint (i.e. identity key or "ik") through public bundle
-                        if device_id not in cache:
-                            bundles, missing = await self.getBundles(client,
-                                                                     trust_jid,
-                                                                     [device_id])
-                            if device_id not in bundles:
-                                log.warning(_(
-                                    "Can't find bundle for device {device_id} of user "
-                                    "{bare_jid}, ignoring").format(device_id=device_id,
-                                                                    bare_jid=bare_jid_s))
-                                continue
-                            cache[device_id] = bundles[device_id]
-                        # TODO: replace False below by None when undecided
-                        #       trusts are handled
-                        trust_info = {
-                            "key": cache[device_id].ik,
-                            "trusted": False
-                        }
+                device = devices[int(key[len("trust_"):])]
+                trust = TrustLevel(value)
+
+                if TrustLevel(device.trust_level_name) is not trust:
+                    await session_manager.set_trust(
+                        device.bare_jid,
+                        device.identity_key,
+                        value
+                    )
+
+            return {}
 
-                    ik = trust_info["key"]
-                    trust_id = str(hash((bare_jid_s, device_id, ik)))
-                    trust_data[trust_id] = {
-                        "jid": trust_jid,
-                        "device": device_id,
-                        "ik": ik,
-                        "trusted": trust_info["trusted"],
-                        }
+        submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True)
 
-        if submit_id is None:
-            submit_id = self.host.registerCallback(
-                lambda data, profile: defer.ensureDeferred(
-                    self.trustUICb(data, trust_data=trust_data, profile=profile)),
-                with_data=True,
-                one_shot=True)
-        xmlui = xml_tools.XMLUI(
-            panel_type = C.XMLUI_FORM,
-            title = D_("OMEMO trust management"),
-            submit_id = submit_id
+        result = xml_tools.XMLUI(
+            panel_type=C.XMLUI_FORM,
+            title=D_("OMEMO trust management"),
+            submit_id=submit_id
         )
-        xmlui.addText(D_(
+        # Casting this to Any, otherwise all calls on the variable cause type errors
+        # pylint: disable=no-member
+        trust_ui = cast(Any, result)
+        trust_ui.addText(D_(
             "This is OMEMO trusting system. You'll see below the devices of your "
-            "contacts, and a checkbox to trust them or not. A trusted device "
+            "contacts, and a list selection to trust them or not. A trusted device "
             "can read your messages in plain text, so be sure to only validate "
             "devices that you are sure are belonging to your contact. It's better "
-            "to do this when you are next to your contact and her/his device, so "
+            "to do this when you are next to your contact and their device, so "
             "you can check the \"fingerprint\" (the number next to the device) "
-            "yourself. Do *not* validate a device if the fingerprint is wrong!"))
+            "yourself. Do *not* validate a device if the fingerprint is wrong!"
+        ))
+
+        own_device, __ = await session_manager.get_own_device_information()
 
-        xmlui.changeContainer("label")
-        xmlui.addLabel(D_("This device ID"))
-        xmlui.addText(str(client._xep_0384_device_id))
-        xmlui.addLabel(D_("This device fingerprint"))
-        ik_hex = session.public_bundle.ik.hex().upper()
-        fp_human = ' '.join([ik_hex[i:i+8] for i in range(0, len(ik_hex), 8)])
-        xmlui.addText(fp_human)
-        xmlui.addEmpty()
-        xmlui.addEmpty()
-
-        if entity_jid is not None:
-            omemo_policy = self.host.memory.getParamA(
-                PARAM_NAME, PARAM_CATEGORY, profile_key=client.profile
-            )
-            if omemo_policy == 'btbv':
-                xmlui.addLabel(D_("Automatically trust new devices?"))
-                # blind trust is always disabled when UI is requested
-                # as submitting UI is a verification which should disable it.
-                xmlui.addBool("blind_trust", value=C.BOOL_FALSE)
-                xmlui.addEmpty()
-                xmlui.addEmpty()
+        trust_ui.changeContainer("label")
+        trust_ui.addLabel(D_("This device ID"))
+        trust_ui.addText(str(own_device.device_id))
+        trust_ui.addLabel(D_("This device's fingerprint"))
+        trust_ui.addText(" ".join(session_manager.format_identity_key(
+            own_device.identity_key
+        )))
+        trust_ui.addEmpty()
+        trust_ui.addEmpty()
 
-        auto_trust_cache = {}
+        for index, device in enumerate(devices):
+            trust_ui.addLabel(D_("Contact"))
+            trust_ui.addJid(jid.JID(device.bare_jid))
+            trust_ui.addLabel(D_("Device ID"))
+            trust_ui.addText(str(device.device_id))
+            trust_ui.addLabel(D_("Fingerprint"))
+            trust_ui.addText(" ".join(session_manager.format_identity_key(
+                device.identity_key
+            )))
+            trust_ui.addLabel(D_("Trust this device?"))
 
-        for trust_id, data in trust_data.items():
-            bare_jid_s = data['jid'].userhost()
-            if bare_jid_s not in auto_trust_cache:
-                key = f"{KEY_AUTO_TRUST}\n{bare_jid_s}"
-                auto_trust_cache[bare_jid_s] = await stored_data.get(key, set())
-            xmlui.addLabel(D_("Contact"))
-            xmlui.addJid(data['jid'])
-            xmlui.addLabel(D_("Device ID"))
-            xmlui.addText(str(data['device']))
-            xmlui.addLabel(D_("Fingerprint"))
-            ik_hex = data['ik'].hex().upper()
-            fp_human = ' '.join([ik_hex[i:i+8] for i in range(0, len(ik_hex), 8)])
-            xmlui.addText(fp_human)
-            xmlui.addLabel(D_("Trust this device?"))
-            xmlui.addBool("trust_{}".format(trust_id),
-                          value=C.boolConst(data.get('trusted', False)))
-            if data['device'] in auto_trust_cache[bare_jid_s]:
-                xmlui.addEmpty()
-                xmlui.addLabel(D_("(automatically trusted)"))
+            current_trust_level = TrustLevel(device.trust_level_name)
+            avaiable_trust_levels = \
+                { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
 
-
-            xmlui.addEmpty()
-            xmlui.addEmpty()
-
-        return xmlui
+            trust_ui.addList(
+                f"trust_{index}",
+                options=[ trust_level.name for trust_level in avaiable_trust_levels ],
+                selected=current_trust_level.name,
+                styles=[ "inline" ]
+            )
 
-    async def profileConnected(self, client):
-        if self._m is not None:
-            # we keep plain text message for MUC messages we send
-            # as we can't encrypt for our own device
-            client._xep_0384_muc_cache = {}
-            # and we keep them only for some time, in case something goes wrong
-            # with the MUC
-            client._xep_0384_muc_cache_timer = None
+            twomemo_active = dict(device.active).get(twomemo.twomemo.NAMESPACE)
+            if twomemo_active is None:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(not available for Twomemo)"))
+            if twomemo_active is False:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(inactive for Twomemo)"))
 
-        # FIXME: is _xep_0384_ready needed? can we use profileConnecting?
-        #        Workflow should be checked
-        client._xep_0384_ready = defer.Deferred()
-        # we first need to get devices ids (including our own)
-        persistent_dict = persistent.LazyPersistentBinaryDict("XEP-0384", client.profile)
-        client._xep_0384_data = persistent_dict
-        # all known devices of profile
-        devices = await self.getDevices(client)
-        # and our own device id
-        device_id = await persistent_dict.get(KEY_DEVICE_ID)
-        if device_id is None:
-            log.info(_("We have no identity for this device yet, let's generate one"))
-            # we have a new device, we create device_id
-            device_id = random.randint(1, 2**31-1)
-            # we check that it's really unique
-            while device_id in devices:
-                device_id = random.randint(1, 2**31-1)
-            # and we save it
-            await persistent_dict.aset(KEY_DEVICE_ID, device_id)
+            oldmemo_active = dict(device.active).get(oldmemo.oldmemo.NAMESPACE)
+            if oldmemo_active is None:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(not available for Oldmemo)"))
+            if oldmemo_active is False:
+                trust_ui.addEmpty()
+                trust_ui.addLabel(D_("(inactive for Oldmemo)"))
+
+            trust_ui.addEmpty()
+            trust_ui.addEmpty()
+
+        return result
 
-        log.debug(f"our OMEMO device id is {device_id}")
-
-        if device_id not in devices:
-            log.debug(f"our device id ({device_id}) is not in the list, adding it")
-            devices.add(device_id)
-            await defer.ensureDeferred(self.setDevices(client, devices))
-
-        all_jids = await persistent_dict.get(KEY_ALL_JIDS, set())
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[str]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
 
-        omemo_storage = OmemoStorage(client, device_id, all_jids)
-        omemo_session = await OmemoSession.create(client, omemo_storage, device_id)
-        client._xep_0384_cache = {}
-        client._xep_0384_session = omemo_session
-        client._xep_0384_device_id = device_id
-        await omemo_session.newDeviceList(client.jid, devices)
-        if omemo_session.republish_bundle:
-            log.info(_("Saving public bundle for this device ({device_id})").format(
-                device_id=device_id))
-            await defer.ensureDeferred(
-                self.setBundle(client, omemo_session.public_bundle, device_id)
-            )
-        client._xep_0384_ready.callback(None)
-        del client._xep_0384_ready
+        bare_jids: Set[str] = set()
 
-
-    ## XMPP PEP nodes manipulation
-
-    # devices
-
-    def parseDevices(self, items):
-        """Parse devices found in items
+        try:
+            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
 
-        @param items(iterable[domish.Element]): items as retrieved by getItems
-        @return set[int]: parsed devices
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhost())
+
+        return bare_jids
+
+    async def __prepare_for_profile(self, profile: str) -> omemo.SessionManager:
         """
-        devices = set()
-        if len(items) > 1:
-            log.warning(_("OMEMO devices list is stored in more that one items, "
-                          "this is not expected"))
-        if items:
-            try:
-                list_elt = next(items[0].elements(NS_OMEMO, 'list'))
-            except StopIteration:
-                log.warning(_("no list element found in OMEMO devices list"))
-                return devices
-            for device_elt in list_elt.elements(NS_OMEMO, 'device'):
-                try:
-                    device_id = int(device_elt['id'])
-                except KeyError:
-                    log.warning(_('device element is missing "id" attribute: {elt}')
-                                .format(elt=device_elt.toXml()))
-                except ValueError:
-                    log.warning(_('invalid device id: {device_id}').format(
-                        device_id=device_elt['id']))
-                else:
-                    devices.add(device_id)
-        return devices
+        @param profile: The profile to prepare for.
+        @return: A session manager instance for this profile. Creates a new instance if
+            none was prepared before.
+        """
 
-    async def getDevices(self, client, entity_jid=None):
-        """Retrieve list of registered OMEMO devices
-
-        @param entity_jid(jid.JID, None): get devices from this entity
-            None to get our own devices
-        @return (set(int)): list of devices
-        """
-        if entity_jid is not None:
-            assert not entity_jid.resource
-        try:
-            items, metadata = await self._p.getItems(client, entity_jid, NS_OMEMO_DEVICES)
-        except exceptions.NotFound:
-            log.info(_("there is no node to handle OMEMO devices"))
-            return set()
-
-        devices = self.parseDevices(items)
-        return devices
-
-    async def setDevices(self, client, devices):
-        log.debug(f"setting devices with {', '.join(str(d) for d in devices)}")
-        list_elt = domish.Element((NS_OMEMO, 'list'))
-        for device in devices:
-            device_elt = list_elt.addElement('device')
-            device_elt['id'] = str(device)
         try:
-            await self._p.sendItem(
-                client, None, NS_OMEMO_DEVICES, list_elt,
-                item_id=self._p.ID_SINGLETON,
-                extra={
-                    self._p.EXTRA_PUBLISH_OPTIONS: {self._p.OPT_MAX_ITEMS: 1},
-                    self._p.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options",
-                }
-            )
-        except Exception as e:
-            log.warning(_("Can't set devices: {reason}").format(reason=e))
+            # Try to return the session manager
+            return self.__session_managers[profile]
+        except KeyError:
+            # If a session manager for that profile doesn't exist yet, check whether it is
+            # currently being built. A session manager being built is signified by the
+            # profile key existing on __session_manager_waiters.
+            if profile in self.__session_manager_waiters:
+                # If the session manager is being built, add ourselves to the waiting
+                # queue
+                deferred = defer.Deferred()
+                self.__session_manager_waiters[profile].append(deferred)
+                return cast(omemo.SessionManager, await deferred)
+
+            # If the session manager is not being built, do so here.
+            self.__session_manager_waiters[profile] = []
 
-    # bundles
-
-    async def getBundles(self, client, entity_jid, devices_ids):
-        """Retrieve public bundles of an entity devices
+            # Build and store the session manager
+            session_manager = await prepare_for_profile(
+                self.__sat,
+                profile,
+                initial_own_label="Libervia"
+            )
+            self.__session_managers[profile] = session_manager
 
-        @param entity_jid(jid.JID): bare jid of entity
-        @param devices_id(iterable[int]): ids of the devices bundles to retrieve
-        @return (tuple(dict[int, ExtendedPublicBundle], list(int))):
-            - bundles collection:
-                * key is device_id
-                * value is parsed bundle
-            - set of bundles not found
+            # Notify the waiters and delete them
+            for waiter in self.__session_manager_waiters[profile]:
+                waiter.callback(session_manager)
+            del self.__session_manager_waiters[profile]
+
+            return session_manager
+
+    async def __message_received_trigger(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
         """
-        assert not entity_jid.resource
-        bundles = {}
-        missing = set()
-        for device_id in devices_ids:
-            node = NS_OMEMO_BUNDLE.format(device_id=device_id)
-            try:
-                items, metadata = await self._p.getItems(client, entity_jid, node)
-            except exceptions.NotFound:
-                log.warning(_("Bundle missing for device {device_id}")
-                    .format(device_id=device_id))
-                missing.add(device_id)
-                continue
-            except jabber_error.StanzaError as e:
-                log.warning(_("Can't get bundle for device {device_id}: {reason}")
-                    .format(device_id=device_id, reason=e))
-                continue
-            if not items:
-                log.warning(_("no item found in node {node}, can't get public bundle "
-                              "for device {device_id}").format(node=node,
-                                                                device_id=device_id))
-                continue
-            if len(items) > 1:
-                log.warning(_("more than one item found in {node}, "
-                              "this is not expected").format(node=node))
-            item = items[0]
-            try:
-                bundle_elt = next(item.elements(NS_OMEMO, 'bundle'))
-                signedPreKeyPublic_elt = next(bundle_elt.elements(
-                    NS_OMEMO, 'signedPreKeyPublic'))
-                signedPreKeySignature_elt = next(bundle_elt.elements(
-                    NS_OMEMO, 'signedPreKeySignature'))
-                identityKey_elt = next(bundle_elt.elements(
-                    NS_OMEMO, 'identityKey'))
-                prekeys_elt =  next(bundle_elt.elements(
-                    NS_OMEMO, 'prekeys'))
-            except StopIteration:
-                log.warning(_("invalid bundle for device {device_id}, ignoring").format(
-                    device_id=device_id))
-                continue
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
+            message has fully progressed through the message receiving flow. Can be used
+            to apply treatments to the fully processed message, like marking it as
+            encrypted.
+        @return: Whether to continue the message received flow.
+        """
+
+        muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None
+
+        sender_jid = jid.JID(message_elt["from"])
+        feedback_jid: jid.JID
+
+        message_type = message_elt.getAttribute("type", "unknown")
+        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                log.warning(
+                    "Ignoring MUC message since plugin XEP-0045 is not available."
+                )
+                # Can't handle a MUC message without XEP-0045, let the flow continue
+                # normally
+                return True
+
+            room_jid = feedback_jid = sender_jid.userhostJID()
 
             try:
-                spkPublic = base64.b64decode(str(signedPreKeyPublic_elt))
-                spkSignature = base64.b64decode(
-                    str(signedPreKeySignature_elt))
+                room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid))
+            except exceptions.NotFound:
+                log.warning(
+                    f"Ignoring MUC message from a room that has not been joined:"
+                    f" {room_jid}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
+            if sender_user is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's user"
+                    f" wasn't found {sender_jid.resource}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
+            if sender_user_jid is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
+                    f" JID couldn't be found from its user information: {sender_user}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_jid = sender_user_jid
+
+            message_uid: Optional[str] = None
+            if self.__xep_0359 is not None:
+                message_uid = self.__xep_0359.getOriginId(message_elt)
+            if message_uid is None:
+                message_uid = message_elt.getAttribute("id")
+            if message_uid is not None:
+                muc_plaintext_cache_key = MUCPlaintextCacheKey(
+                    client,
+                    room_jid,
+                    message_uid
+                )
+        else:
+            # I'm not sure why this check is required, this code is copied from the old
+            # plugin.
+            if sender_jid.userhostJID() == client.jid.userhostJID():
+                # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems
+                # like "to" isn't always set.
+                feedback_jid = jid.JID(message_elt["to"])
+            else:
+                feedback_jid = sender_jid
+
+        sender_bare_jid = sender_jid.userhost()
+
+        message: Optional[omemo.Message] = None
+        encrypted_elt: Optional[domish.Element] = None
+
+        twomemo_encrypted_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(twomemo.twomemo.NAMESPACE, "encrypted"),
+            None
+        ))
+
+        oldmemo_encrypted_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(oldmemo.oldmemo.NAMESPACE, "encrypted"),
+            None
+        ))
+
+        session_manager = await self.__prepare_for_profile(cast(str, client.profile))
+
+        if twomemo_encrypted_elt is not None:
+            try:
+                message = twomemo.etree.parse_message(
+                    domish_to_etree(twomemo_encrypted_elt),
+                    sender_bare_jid
+                )
+            except (ValueError, XMLSchemaValidationError):
+                log.warning(
+                    f"Ingoring malformed encrypted message for namespace"
+                    f" {twomemo.twomemo.NAMESPACE}: {twomemo_encrypted_elt.toXml()}"
+                )
+            else:
+                encrypted_elt = twomemo_encrypted_elt
+
+        if oldmemo_encrypted_elt is not None:
+            try:
+                message = await oldmemo.etree.parse_message(
+                    domish_to_etree(oldmemo_encrypted_elt),
+                    sender_bare_jid,
+                    client.jid.userhost(),
+                    session_manager
+                )
+            except (ValueError, XMLSchemaValidationError):
+                log.warning(
+                    f"Ingoring malformed encrypted message for namespace"
+                    f" {oldmemo.oldmemo.NAMESPACE}: {oldmemo_encrypted_elt.toXml()}"
+                )
+            except omemo.SenderNotFound:
+                log.warning(
+                    f"Ingoring encrypted message for namespace"
+                    f" {oldmemo.oldmemo.NAMESPACE} by unknown sender:"
+                    f" {oldmemo_encrypted_elt.toXml()}"
+                )
+            else:
+                encrypted_elt = oldmemo_encrypted_elt
+
+        if message is None or encrypted_elt is None:
+            # None of our business, let the flow continue
+            return True
+
+        message_elt.children.remove(encrypted_elt)
 
-                ik = base64.b64decode(str(identityKey_elt))
-                spk = {
-                    "key": spkPublic,
-                    "id": int(signedPreKeyPublic_elt['signedPreKeyId'])
-                }
-                otpks = []
-                for preKeyPublic_elt in prekeys_elt.elements(NS_OMEMO, 'preKeyPublic'):
-                    preKeyPublic = base64.b64decode(str(preKeyPublic_elt))
-                    otpk = {
-                        "key": preKeyPublic,
-                        "id": int(preKeyPublic_elt['preKeyId'])
-                    }
-                    otpks.append(otpk)
+        log.debug(
+            f"{message.namespace} message of type {message_type} received from"
+            f" {sender_bare_jid}"
+        )
+
+        plaintext: Optional[bytes]
+        device_information: omemo.DeviceInformation
+
+        if (
+            muc_plaintext_cache_key is not None
+            and muc_plaintext_cache_key in self.__muc_plaintext_cache
+        ):
+            # Use the cached plaintext
+            plaintext = self.__muc_plaintext_cache.pop(muc_plaintext_cache_key)
+
+            # Since this message was sent by us, use the own device information here
+            device_information, __ = await session_manager.get_own_device_information()
+        else:
+            try:
+                plaintext, device_information = await session_manager.decrypt(message)
+            except omemo.MessageNotForUs:
+                # The difference between this being a debug or a warning is whether there
+                # is a body included in the message. Without a body, we can assume that
+                # it's an empty OMEMO message used for protocol stability reasons, which
+                # is not expected to be sent to all devices of all recipients. If a body
+                # is included, we can assume that the message carries content and we
+                # missed out on something.
+                if len(list(message_elt.elements(C.NS_CLIENT, "body"))) > 0:
+                    client.feedback(
+                        feedback_jid,
+                        D_(
+                            f"An OMEMO message from {sender_jid.full()} has not been"
+                            f" encrypted for our device, we can't decrypt it."
+                        ),
+                        { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                    )
+                    log.warning("Message not encrypted for us.")
+                else:
+                    log.debug("Message not encrypted for us.")
 
+                # No point in further processing this message.
+                return False
             except Exception as e:
-                log.warning(_("error while decoding key for device {device_id}: {msg}")
-                            .format(device_id=device_id, msg=e))
-                continue
+                log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
+                    reason=e,
+                    xml=message_elt.toXml()
+                ))
+                client.feedback(
+                    feedback_jid,
+                    D_(
+                        f"An OMEMO message from {sender_jid.full()} can't be decrypted:"
+                        f" {e}"
+                    ),
+                    { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                )
+                # No point in further processing this message
+                return False
 
-            bundles[device_id] = ExtendedPublicBundle.parse(omemo_backend, ik, spk,
-                                                            spkSignature, otpks)
+        if message.namespace == twomemo.twomemo.NAMESPACE:
+            if plaintext is not None:
+                # XEP_0420.unpack_stanza handles the whole unpacking, including the
+                # relevant modifications to the element
+                sce_profile = \
+                    OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE
+                try:
+                    affix_values = self.__xep_0420.unpack_stanza(
+                        sce_profile,
+                        message_elt,
+                        plaintext
+                    )
+                except Exception as e:
+                    log.warning(D_(
+                        f"Error unpacking SCE-encrypted message: {e}\n{plaintext}"
+                    ))
+                    client.feedback(
+                        feedback_jid,
+                        D_(
+                            f"An OMEMO message from {sender_jid.full()} was rejected:"
+                            f" {e}"
+                        ),
+                        { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+                    )
+                    # No point in further processing this message
+                    return False
 
-        return (bundles, missing)
+                if affix_values.timestamp is not None:
+                    # TODO: affix_values.timestamp contains the timestamp included in the
+                    # encrypted element here. The XEP says it SHOULD be displayed with the
+                    # plaintext by clients.
+                    pass
 
-    async def setBundle(self, client, bundle, device_id):
-        """Set public bundle for this device.
+        if message.namespace == oldmemo.oldmemo.NAMESPACE:
+            # Remove all body elements from the original element, since those act as
+            # fallbacks in case the encryption protocol is not supported
+            for child in message_elt.elements():
+                if child.name == "body":
+                    message_elt.children.remove(child)
+
+            if plaintext is not None:
+                # Add the decrypted body
+                message_elt.addElement("body", content=plaintext.decode("utf-8"))
+
+        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
+        trust_level = \
+            TrustLevel(device_information.trust_level_name).to_omemo_trust_level()
+        if trust_level is omemo.TrustLevel.TRUSTED:
+            post_treat.addCallback(client.encryption.markAsTrusted)
+        else:
+            post_treat.addCallback(client.encryption.markAsUntrusted)
 
-        @param bundle(ExtendedPublicBundle): bundle to publish
+        # Mark the message as originally encrypted
+        post_treat.addCallback(
+            client.encryption.markAsEncrypted,
+            namespace=message.namespace
+        )
+
+        # Message processed successfully, continue with the flow
+        return True
+
+    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
+        """
+        @param client: The client sending this message.
+        @param stanza: The stanza that is about to be sent. Can be modified.
+        @return: Whether the send message flow should continue or not.
         """
-        log.debug(_("updating bundle for {device_id}").format(device_id=device_id))
-        bundle = bundle.serialize(omemo_backend)
-        bundle_elt = domish.Element((NS_OMEMO, 'bundle'))
-        signedPreKeyPublic_elt = bundle_elt.addElement(
-            "signedPreKeyPublic",
-            content=b64enc(bundle["spk"]['key']))
-        signedPreKeyPublic_elt['signedPreKeyId'] = str(bundle["spk"]['id'])
+
+        # SCE is only applicable to message and IQ stanzas
+        if stanza.name not in { "message", "iq" }:
+            return True
+
+        # Get the intended recipient
+        recipient = stanza.getAttribute("to", None)
+        if recipient is None:
+            if stanza.name == "message":
+                # Message stanzas must have a recipient
+                raise exceptions.InternalError(
+                    f"Message without recipient encountered. Blocking further processing"
+                    f" to avoid leaking plaintext data: {stanza.toXml()}"
+                )
+
+            # IQs without a recipient are a thing, I believe those simply target the
+            # server and are thus not eligible for e2ee anyway.
+            return True
+
+        # Parse the JID
+        recipient_bare_jid = jid.JID(recipient).userhostJID()
+
+        # Check whether encryption with twomemo is requested
+        encryption = client.encryption.getSession(recipient_bare_jid)
+
+        if encryption is None:
+            # Encryption is not requested for this recipient
+            return True
+
+        if encryption["plugin"].namespace != twomemo.twomemo.NAMESPACE:
+            # Encryption is requested for this recipient, but not with twomemo
+            return True
+
+        # All pre-checks done, we can start encrypting!
+        await self.__encrypt(
+            client,
+            twomemo.twomemo.NAMESPACE,
+            stanza,
+            recipient_bare_jid,
+            stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT,
+            stanza.getAttribute("id", None)
+        )
 
-        bundle_elt.addElement(
-            "signedPreKeySignature",
-            content=b64enc(bundle["spk_signature"]))
+        # Add a store hint if this is a message stanza
+        if stanza.name == "message":
+            self.__xep_0334.addHintElements(stanza, [ "store" ])
+
+        # Let the flow continue.
+        return True
+
+    async def __send_message_data_trigger(
+        self,
+        client: SatXMPPClient,
+        mess_data: MessageData
+    ) -> None:
+        """
+        @param client: The client sending this message.
+        @param mess_data: The message data that is about to be sent. Can be modified.
+        """
+
+        # Check whether encryption is requested for this message
+        try:
+            namespace = mess_data[C.MESS_KEY_ENCRYPTION]["plugin"].namespace
+        except KeyError:
+            return
+
+        # If encryption is requested, check whether it's oldmemo
+        if namespace != oldmemo.oldmemo.NAMESPACE:
+            return
+
+        # All pre-checks done, we can start encrypting!
+        stanza = mess_data["xml"]
+        recipient_jid = mess_data["to"]
+        is_muc_message = mess_data["type"] == C.MESS_TYPE_GROUPCHAT
+        stanza_id = mess_data["uid"]
+
+        await self.__encrypt(
+            client,
+            oldmemo.oldmemo.NAMESPACE,
+            stanza,
+            recipient_jid,
+            is_muc_message,
+            stanza_id
+        )
+
+        # Add a store hint
+        self.__xep_0334.addHintElements(stanza, [ "store" ])
 
-        bundle_elt.addElement(
-            "identityKey",
-            content=b64enc(bundle["ik"]))
+    async def __encrypt(
+        self,
+        client: SatXMPPClient,
+        namespace: Literal["urn:xmpp:omemo:2", "eu.siacs.conversations.axolotl"],
+        stanza: domish.Element,
+        recipient_jid: jid.JID,
+        is_muc_message: bool,
+        stanza_id: Optional[str]
+    ) -> None:
+        """
+        @param client: The client.
+        @param namespace: The namespace of the OMEMO version to use.
+        @param stanza: The stanza. Twomemo will encrypt the whole stanza using SCE,
+            oldmemo will encrypt only the body. The stanza is modified by this call.
+        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
+            but doesn't have to.
+        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
+        @param stanza_id: The id of this stanza. Especially relevant for message stanzas
+            to MUC rooms such that the outgoing plaintext can be cached for MUC message
+            reflection handling.
+
+        @warning: The calling code MUST take care of adding the store message processing
+            hint to the stanza if applicable! This can be done before or after this call,
+            the order doesn't matter.
+        """
+
+        muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None
+
+        recipient_bare_jids: Set[str]
+        feedback_jid: jid.JID
+
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
+                    " available."
+                )
+
+            if stanza_id is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but stanza id not available."
+                )
+
+            room_jid = feedback_jid = recipient_jid.userhostJID()
+
+            recipient_bare_jids = self.__get_joined_muc_users(
+                client,
+                self.__xep_0045,
+                room_jid
+            )
+
+            muc_plaintext_cache_key = MUCPlaintextCacheKey(
+                client=client,
+                room_jid=room_jid,
+                message_uid=stanza_id
+            )
+        else:
+            recipient_bare_jids = { recipient_jid.userhost() }
+            feedback_jid = recipient_jid.userhostJID()
 
-        prekeys_elt = bundle_elt.addElement('prekeys')
-        for otpk in bundle["otpks"]:
-            preKeyPublic_elt = prekeys_elt.addElement(
-                'preKeyPublic',
-                content=b64enc(otpk["key"]))
-            preKeyPublic_elt['preKeyId'] = str(otpk['id'])
+        log.debug(
+            f"Intercepting message that is to be encrypted by {namespace} for"
+            f" {recipient_bare_jids}"
+        )
+
+        def prepare_stanza() -> Optional[bytes]:
+            """Prepares the stanza for encryption.
+
+            Does so by removing all parts that are not supposed to be sent in plain. Also
+            extracts/prepares the plaintext to encrypt.
+
+            @return: The plaintext to encrypt. Returns ``None`` in case body-only
+                encryption is requested and no body was found. The function should
+                gracefully return in that case, i.e. it's not a critical error that should
+                abort the message sending flow.
+            """
+
+            if namespace == twomemo.twomemo.NAMESPACE:
+                return self.__xep_0420.pack_stanza(
+                    OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE,
+                    stanza
+                )
+
+            if namespace == oldmemo.oldmemo.NAMESPACE:
+                plaintext: Optional[bytes] = None
+
+                for child in stanza.elements():
+                    if child.name == "body" and plaintext is None:
+                        plaintext = str(child).encode("utf-8")
 
-        node = NS_OMEMO_BUNDLE.format(device_id=device_id)
+                    # Any other sensitive elements to remove here?
+                    if child.name in { "body", "html" }:
+                        stanza.children.remove(child)
+
+                if plaintext is None:
+                    log.warning(
+                        "No body found in intercepted message to be encrypted with"
+                        " oldmemo."
+                    )
+
+                return plaintext
+
+            return assert_never(namespace)
+
+        # The stanza/plaintext preparation was moved into its own little function for type
+        # safety reasons.
+        plaintext = prepare_stanza()
+        if plaintext is None:
+            return
+
+        log.debug(f"Plaintext to encrypt: {plaintext}")
+
+        session_manager = await self.__prepare_for_profile(client.profile)
+
         try:
-            await self._p.sendItem(
-                client, None, node, bundle_elt, item_id=self._p.ID_SINGLETON,
-                extra={
-                    self._p.EXTRA_PUBLISH_OPTIONS: {self._p.OPT_MAX_ITEMS: 1},
-                    self._p.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options",
-                }
+            messages, encryption_errors = await session_manager.encrypt(
+                frozenset(recipient_bare_jids),
+                { namespace: plaintext },
+                backend_priority_order=[ namespace ],
+                identifier=feedback_jid.userhost()
             )
         except Exception as e:
-            log.warning(_("Can't set bundle: {reason}").format(reason=e))
-
-    ## PEP node events callbacks
-
-    async def onNewDevices(self, itemsEvent, profile):
-        log.debug("devices list has been updated")
-        client = self.host.getClient(profile)
-        try:
-            omemo_session = client._xep_0384_session
-        except AttributeError:
-            await client._xep_0384_ready
-            omemo_session = client._xep_0384_session
-        entity = itemsEvent.sender
-
-        devices = self.parseDevices(itemsEvent.items)
-        await omemo_session.newDeviceList(entity, devices)
-
-        if entity == client.jid.userhostJID():
-            own_device = client._xep_0384_device_id
-            if own_device not in devices:
-                log.warning(_("Our own device is missing from devices list, fixing it"))
-                devices.add(own_device)
-                await self.setDevices(client, devices)
-
-    ## triggers
-
-    async def policyBTBV(self, client, feedback_jid, expect_problems, undecided):
-        session = client._xep_0384_session
-        stored_data = client._xep_0384_data
-        for pb in undecided.values():
-            peer_jid = jid.JID(pb.bare_jid)
-            device = pb.device
-            ik = pb.ik
-            key = f"{KEY_AUTO_TRUST}\n{pb.bare_jid}"
-            auto_trusted = await stored_data.get(key, default=set())
-            auto_trusted.add(device)
-            await stored_data.aset(key, auto_trusted)
-            await session.setTrust(peer_jid, device, ik, True)
-
-        user_msg =  D_(
-            "Not all destination devices are trusted, unknown devices will be blind "
-            "trusted due to the OMEMO Blind Trust Before Verification policy. If you "
-            "want a more secure workflow, please activate \"manual\" OMEMO policy in "
-            "settings' \"Security\" tab.\nFollowing fingerprint have been automatically "
-            "trusted:\n{devices}"
-        ).format(
-            devices = ', '.join(
-                f"- {pb.device} ({pb.bare_jid}): {pb.ik.hex().upper()}"
-                for pb in undecided.values()
+            msg = _(
+                # pylint: disable=consider-using-f-string
+                "Can't encrypt message for {entities}: {reason}".format(
+                    entities=', '.join(recipient_bare_jids),
+                    reason=e
+                )
             )
-        )
-        client.feedback(feedback_jid, user_msg)
-
-    async def policyManual(self, client, feedback_jid, expect_problems, undecided):
-        trust_data = {}
-        for trust_id, data in undecided.items():
-            trust_data[trust_id] = {
-                'jid': jid.JID(data.bare_jid),
-                'device':  data.device,
-                'ik': data.ik}
-
-        user_msg =  D_("Not all destination devices are trusted, we can't encrypt "
-                       "message in such a situation. Please indicate if you trust "
-                       "those devices or not in the trust manager before we can "
-                       "send this message")
-        client.feedback(feedback_jid, user_msg)
-        xmlui = await self.getTrustUI(client, trust_data=trust_data, submit_id="")
-
-        answer = await xml_tools.deferXMLUI(
-            self.host,
-            xmlui,
-            action_extra={
-                "meta_encryption_trust": NS_OMEMO,
-            },
-            profile=client.profile)
-        await self.trustUICb(answer, trust_data, expect_problems, client.profile)
-
-    async def handleProblems(
-        self, client, feedback_jid, bundles, expect_problems, problems):
-        """Try to solve problems found by EncryptMessage
-
-        @param feedback_jid(jid.JID): bare jid where the feedback message must be sent
-        @param bundles(dict): bundles data as used in EncryptMessage
-            already filled with known bundles, missing bundles
-            need to be added to it
-            This dict is updated
-        @param problems(list): exceptions raised by EncryptMessage
-        @param expect_problems(dict): known problems to expect, used in encryptMessage
-            This dict will list devices where problems can be ignored
-            (those devices won't receive the encrypted data)
-            This dict is updated
-        """
-        # FIXME: not all problems are handled yet
-        undecided = {}
-        missing_bundles = {}
-        found_bundles = None
-        cache = client._xep_0384_cache
-        for problem in problems:
-            if isinstance(problem, omemo_excpt.TrustException):
-                if problem.problem == 'undecided':
-                    undecided[str(hash(problem))] = problem
-                elif problem.problem == 'untrusted':
-                    expect_problems.setdefault(problem.bare_jid, set()).add(
-                        problem.device)
-                    log.info(_(
-                        "discarding untrusted device {device_id} with key {device_key} "
-                        "for {entity}").format(
-                            device_id=problem.device,
-                            device_key=problem.ik.hex().upper(),
-                            entity=problem.bare_jid,
-                        )
-                    )
-                else:
-                    log.error(
-                        f"Unexpected trust problem: {problem.problem!r} for device "
-                        f"{problem.device} for {problem.bare_jid}, ignoring device")
-                    expect_problems.setdefault(problem.bare_jid, set()).add(
-                        problem.device)
-            elif isinstance(problem, omemo_excpt.MissingBundleException):
-                pb_entity = jid.JID(problem.bare_jid)
-                entity_cache = cache.setdefault(pb_entity, {})
-                entity_bundles = bundles.setdefault(pb_entity, {})
-                if problem.device in entity_cache:
-                    entity_bundles[problem.device] = entity_cache[problem.device]
-                else:
-                    found_bundles, missing = await self.getBundles(
-                        client, pb_entity, [problem.device])
-                    entity_cache.update(bundles)
-                    entity_bundles.update(found_bundles)
-                    if problem.device in missing:
-                        missing_bundles.setdefault(pb_entity, set()).add(
-                            problem.device)
-                        expect_problems.setdefault(problem.bare_jid, set()).add(
-                            problem.device)
-            elif isinstance(problem, omemo_excpt.NoEligibleDevicesException):
-                if undecided or found_bundles:
-                    # we may have new devices after this run, so let's continue for now
-                    continue
-                else:
-                    raise problem
-            else:
-                raise problem
-
-        for peer_jid, devices in missing_bundles.items():
-            devices_s = [str(d) for d in devices]
-            log.warning(
-                _("Can't retrieve bundle for device(s) {devices} of entity {peer}, "
-                  "the message will not be readable on this/those device(s)").format(
-                    devices=", ".join(devices_s), peer=peer_jid.full()))
-            client.feedback(
-                feedback_jid,
-                D_("You're destinee {peer} has missing encryption data on some of "
-                   "his/her device(s) (bundle on device {devices}), the message won't  "
-                   "be readable on this/those device.").format(
-                   peer=peer_jid.full(), devices=", ".join(devices_s)))
-
-        if undecided:
-            omemo_policy = self.host.memory.getParamA(
-                PARAM_NAME, PARAM_CATEGORY, profile_key=client.profile
-            )
-            if omemo_policy == 'btbv':
-                # we first separate entities which have been trusted manually
-                manual_trust = await client._xep_0384_data.get(KEY_MANUAL_TRUST)
-                if manual_trust:
-                    manual_undecided = {}
-                    for hash_, pb in undecided.items():
-                        if pb.bare_jid in manual_trust:
-                            manual_undecided[hash_] = pb
-                    for hash_ in manual_undecided:
-                        del undecided[hash_]
-                else:
-                    manual_undecided = None
-
-                if undecided:
-                    # we do the automatic trust here
-                    await self.policyBTBV(
-                        client, feedback_jid, expect_problems, undecided)
-                if manual_undecided:
-                    # here user has to manually trust new devices from entities already
-                    # verified
-                    await self.policyManual(
-                        client, feedback_jid, expect_problems, manual_undecided)
-            elif omemo_policy == 'manual':
-                await self.policyManual(
-                    client, feedback_jid, expect_problems, undecided)
-            else:
-                raise exceptions.InternalError(f"Unexpected OMEMO policy: {omemo_policy}")
-
-    async def encryptMessage(self, client, entity_bare_jids, message, feedback_jid=None):
-        if feedback_jid is None:
-            if len(entity_bare_jids) != 1:
-                log.error(
-                    "feedback_jid must be provided when message is encrypted for more "
-                    "than one entities")
-                feedback_jid = entity_bare_jids[0]
-        omemo_session = client._xep_0384_session
-        expect_problems = {}
-        bundles = {}
-        loop_idx = 0
-        try:
-            while True:
-                if loop_idx > 10:
-                    msg = _("Too many iterations in encryption loop")
-                    log.error(msg)
-                    raise exceptions.InternalError(msg)
-                # encryptMessage may fail, in case of e.g. trust issue or missing bundle
-                try:
-                    if not message:
-                        encrypted = await omemo_session.encryptRatchetForwardingMessage(
-                            entity_bare_jids,
-                            bundles,
-                            expect_problems = expect_problems)
-                    else:
-                        encrypted = await omemo_session.encryptMessage(
-                            entity_bare_jids,
-                            message,
-                            bundles,
-                            expect_problems = expect_problems)
-                except omemo_excpt.EncryptionProblemsException as e:
-                    # we know the problem to solve, we can try to fix them
-                    await self.handleProblems(
-                        client,
-                        feedback_jid=feedback_jid,
-                        bundles=bundles,
-                        expect_problems=expect_problems,
-                        problems=e.problems)
-                    loop_idx += 1
-                else:
-                    break
-        except Exception as e:
-            msg = _("Can't encrypt message for {entities}: {reason}".format(
-                entities=', '.join(e.full() for e in entity_bare_jids), reason=e))
             log.warning(msg)
-            extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR}
-            client.feedback(feedback_jid, msg, extra)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
             raise e
 
-        defer.returnValue(encrypted)
-
-    @defer.inlineCallbacks
-    def _messageReceivedTrigger(self, client, message_elt, post_treat):
-        try:
-            encrypted_elt = next(message_elt.elements(NS_OMEMO, "encrypted"))
-        except StopIteration:
-            # no OMEMO message here
-            defer.returnValue(True)
-
-        # we have an encrypted message let's decrypt it
-
-        from_jid = jid.JID(message_elt['from'])
+        if len(encryption_errors) > 0:
+            log.warning(
+                f"Ignored the following non-critical encryption errors:"
+                f" {encryption_errors}"
+            )
 
-        if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT:
-            # with group chat, we must get the real jid for decryption
-            # and use the room as feedback_jid
-
-            if self._m is None:
-                # plugin XEP-0045 (MUC) is not available
-                defer.returnValue(True)
-
-            room_jid = from_jid.userhostJID()
-            feedback_jid = room_jid
-            if self._sid is not None:
-                mess_id = self._sid.getOriginId(message_elt)
-            else:
-                mess_id = None
-
-            if mess_id is None:
-                mess_id = message_elt.getAttribute('id')
-            cache_key = (room_jid, mess_id)
+            encrypted_errors_stringified = ", ".join([
+                f"device {err.device_id} of {err.bare_jid} under namespace"
+                f" {err.namespace}"
+                for err
+                in encryption_errors
+            ])
 
-            try:
-                room = self._m.getRoom(client, room_jid)
-            except exceptions.NotFound:
-                log.warning(
-                    f"Received an OMEMO encrypted msg from a room {room_jid} which has "
-                    f"not been joined, ignoring")
-                defer.returnValue(True)
-
-            user = room.getUser(from_jid.resource)
-            if user is None:
-                log.warning(f"Can't find user {user} in room {room_jid}, ignoring")
-                defer.returnValue(True)
-            if not user.entity:
-                log.warning(
-                    f"Real entity of user {user} in room {room_jid} can't be established,"
-                    f" OMEMO encrypted message can't be decrypted")
-                defer.returnValue(True)
-
-            # now we have real jid of the entity, we use it instead of from_jid
-            from_jid = user.entity.userhostJID()
+            client.feedback(
+                feedback_jid,
+                D_(
+                    "There were non-critical errors during encryption resulting in some"
+                    " of your destinees' devices potentially not receiving the message."
+                    " This happens when the encryption data/key material of a device is"
+                    " incomplete or broken, which shouldn't happen for actively used"
+                    " devices, and can usually be ignored. The following devices are"
+                    f" affected: {encrypted_errors_stringified}."
+                )
+            )
 
-        else:
-            # we have a one2one message, we can user "from" and "to" normally
-
-            if from_jid.userhostJID() == client.jid.userhostJID():
-                feedback_jid = jid.JID(message_elt['to'])
-            else:
-                feedback_jid = from_jid
-
-
-        if (message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT
-            and mess_id is not None
-            and cache_key in client._xep_0384_muc_cache):
-            plaintext = client._xep_0384_muc_cache.pop(cache_key)
-            if not client._xep_0384_muc_cache:
-                client._xep_0384_muc_cache_timer.cancel()
-                client._xep_0384_muc_cache_timer = None
-        else:
-            try:
-                omemo_session = client._xep_0384_session
-            except AttributeError:
-                # on startup, message can ve received before session actually exists
-                # so we need to synchronise here
-                yield client._xep_0384_ready
-                omemo_session = client._xep_0384_session
+        message = next(message for message in messages if message.namespace == namespace)
 
-            device_id = client._xep_0384_device_id
-            try:
-                header_elt = next(encrypted_elt.elements(NS_OMEMO, 'header'))
-                iv_elt = next(header_elt.elements(NS_OMEMO, 'iv'))
-            except StopIteration:
-                log.warning(_("Invalid OMEMO encrypted stanza, ignoring: {xml}")
-                    .format(xml=message_elt.toXml()))
-                defer.returnValue(False)
-            try:
-                s_device_id = header_elt['sid']
-            except KeyError:
-                log.warning(_("Invalid OMEMO encrypted stanza, missing sender device ID, "
-                              "ignoring: {xml}")
-                    .format(xml=message_elt.toXml()))
-                defer.returnValue(False)
-            try:
-                key_elt = next((e for e in header_elt.elements(NS_OMEMO, 'key')
-                                if int(e['rid']) == device_id))
-            except StopIteration:
-                log.warning(_("This OMEMO encrypted stanza has not been encrypted "
-                              "for our device (device_id: {device_id}, fingerprint: "
-                              "{fingerprint}): {xml}").format(
-                              device_id=device_id,
-                              fingerprint=omemo_session.public_bundle.ik.hex().upper(),
-                              xml=encrypted_elt.toXml()))
-                user_msg = (D_("An OMEMO message from {sender} has not been encrypted for "
-                               "our device, we can't decrypt it").format(
-                               sender=from_jid.full()))
-                extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR}
-                client.feedback(feedback_jid, user_msg, extra)
-                defer.returnValue(False)
-            except ValueError as e:
-                log.warning(_("Invalid recipient ID: {msg}".format(msg=e)))
-                defer.returnValue(False)
-            is_pre_key = C.bool(key_elt.getAttribute('prekey', 'false'))
-            payload_elt = next(encrypted_elt.elements(NS_OMEMO, 'payload'), None)
-            additional_information = {
-                "from_storage": bool(message_elt.delay)
-            }
+        if namespace == twomemo.twomemo.NAMESPACE:
+            # Add the encrypted element
+            stanza.addChild(etree_to_domish(twomemo.etree.serialize_message(message)))
 
-            kwargs = {
-                "bare_jid": from_jid.userhostJID(),
-                "device": s_device_id,
-                "iv": base64.b64decode(bytes(iv_elt)),
-                "message": base64.b64decode(bytes(key_elt)),
-                "is_pre_key_message": is_pre_key,
-                "additional_information":  additional_information,
-            }
+        if namespace == oldmemo.oldmemo.NAMESPACE:
+            # Add the encrypted element
+            stanza.addChild(etree_to_domish(oldmemo.etree.serialize_message(message)))
+
+        if muc_plaintext_cache_key is not None:
+            self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext
 
-            try:
-                if payload_elt is None:
-                    omemo_session.decryptRatchetForwardingMessage(**kwargs)
-                    plaintext = None
-                else:
-                    kwargs["ciphertext"] = base64.b64decode(bytes(payload_elt))
-                    try:
-                        plaintext = yield omemo_session.decryptMessage(**kwargs)
-                    except omemo_excpt.TrustException:
-                        post_treat.addCallback(client.encryption.markAsUntrusted)
-                        kwargs['allow_untrusted'] = True
-                        plaintext = yield omemo_session.decryptMessage(**kwargs)
-                    else:
-                        post_treat.addCallback(client.encryption.markAsTrusted)
-                    plaintext = plaintext.decode()
-            except Exception as e:
-                log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
-                    reason=e, xml=message_elt.toXml()))
-                user_msg = (D_(
-                    "An OMEMO message from {sender} can't be decrypted: {reason}")
-                    .format(sender=from_jid.full(), reason=e))
-                extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR}
-                client.feedback(feedback_jid, user_msg, extra)
-                defer.returnValue(False)
-            finally:
-                if omemo_session.republish_bundle:
-                    # we don't wait for the Deferred (i.e. no yield) on purpose
-                    # there is no need to block the whole message workflow while
-                    # updating the bundle
-                    defer.ensureDeferred(
-                        self.setBundle(client, omemo_session.public_bundle, device_id)
-                    )
-
-        message_elt.children.remove(encrypted_elt)
-        if plaintext:
-            message_elt.addElement("body", content=plaintext)
-        post_treat.addCallback(client.encryption.markAsEncrypted, namespace=NS_OMEMO)
-        defer.returnValue(True)
+    async def __on_device_list_update(
+        self,
+        items_event: pubsub.ItemsEvent,
+        profile: str
+    ) -> None:
+        """Handle device list updates fired by PEP.
 
-    def getJIDsForRoom(self, client, room_jid):
-        if self._m is None:
-            exceptions.InternalError("XEP-0045 plugin missing, can't encrypt for group chat")
-        room = self._m.getRoom(client, room_jid)
-        return [u.entity.userhostJID() for u in room.roster.values()]
-
-    def _expireMUCCache(self, client):
-        client._xep_0384_muc_cache_timer = None
-        for (room_jid, uid), msg in client._xep_0384_muc_cache.items():
-            client.feedback(
-                room_jid,
-                D_("Our message with UID {uid} has not been received in time, it has "
-                   "probably been lost. The message was: {msg!r}").format(
-                    uid=uid, msg=str(msg)))
-
-        client._xep_0384_muc_cache.clear()
-        log.warning("Cache for OMEMO MUC has expired")
+        @param items_event: The event.
+        @param profile: The profile this event belongs to.
+        """
 
-    @defer.inlineCallbacks
-    def _sendMessageDataTrigger(self, client, mess_data):
-        encryption = mess_data.get(C.MESS_KEY_ENCRYPTION)
-        if encryption is None or encryption['plugin'].namespace != NS_OMEMO:
-            return
-        message_elt = mess_data["xml"]
-        if mess_data['type'] == C.MESS_TYPE_GROUPCHAT:
-            feedback_jid = room_jid = mess_data['to']
-            to_jids = self.getJIDsForRoom(client, room_jid)
-        else:
-            feedback_jid = to_jid = mess_data["to"].userhostJID()
-            to_jids = [to_jid]
-        log.debug("encrypting message")
-        body = None
-        for child in list(message_elt.children):
-            if child.name == "body":
-                # we remove all unencrypted body,
-                # and will only encrypt the first one
-                if body is None:
-                    body = child
-                message_elt.children.remove(child)
-            elif child.name == "html":
-                # we don't want any XHTML-IM element
-                message_elt.children.remove(child)
+        sender = cast(jid.JID, items_event.sender)
+        items = cast(List[domish.Element], items_event.items)
 
-        if body is None:
-            log.warning("No message found")
+        if len(items) > 1:
+            log.warning("Ignoring device list update with more than one element.")
             return
 
-        body = str(body)
+        item = next(iter(items), None)
+        if item is None:
+            log.debug("Ignoring empty device list update.")
+            return
+
+        item_elt = domish_to_etree(item)
 
-        if mess_data['type'] == C.MESS_TYPE_GROUPCHAT:
-            key = (room_jid, mess_data['uid'])
-            # XXX: we can't encrypt message for our own device for security reason
-            #      so we keep the plain text version in cache until we receive the
-            #      message. We don't send it directly to bridge to keep a workflow
-            #      similar to plain text MUC, so when we see it in frontend we know
-            #      that it has been sent correctly.
-            client._xep_0384_muc_cache[key] = body
-            timer = client._xep_0384_muc_cache_timer
-            if timer is None:
-                client._xep_0384_muc_cache_timer = reactor.callLater(
-                    MUC_CACHE_TTL, self._expireMUCCache, client)
+        device_list: Dict[int, Optional[str]] = {}
+        namespace: Optional[str] = None
+
+        list_elt = item_elt.find(f"{{{twomemo.twomemo.NAMESPACE}}}devices")
+        if list_elt is not None:
+            try:
+                device_list = twomemo.etree.parse_device_list(list_elt)
+            except XMLSchemaValidationError:
+                pass
             else:
-                timer.reset(MUC_CACHE_TTL)
-
-        encryption_data = yield defer.ensureDeferred(self.encryptMessage(
-            client, to_jids, body, feedback_jid=feedback_jid))
+                namespace = twomemo.twomemo.NAMESPACE
 
-        encrypted_elt = message_elt.addElement((NS_OMEMO, 'encrypted'))
-        header_elt = encrypted_elt.addElement('header')
-        header_elt['sid'] = str(encryption_data['sid'])
+        list_elt = item_elt.find(f"{{{oldmemo.oldmemo.NAMESPACE}}}list")
+        if list_elt is not None:
+            try:
+                device_list = oldmemo.etree.parse_device_list(list_elt)
+            except XMLSchemaValidationError:
+                pass
+            else:
+                namespace = oldmemo.oldmemo.NAMESPACE
 
-        for key_data in encryption_data['keys'].values():
-            for rid, data in key_data.items():
-                key_elt = header_elt.addElement(
-                    'key',
-                    content=b64enc(data['data'])
-                )
-                key_elt['rid'] = str(rid)
-                if data['pre_key']:
-                    key_elt['prekey'] = 'true'
+        if namespace is None:
+            log.warning(
+                f"Malformed device list update item:"
+                f" {ET.tostring(item_elt, encoding='unicode')}"
+            )
+            return
 
-        header_elt.addElement(
-            'iv',
-            content=b64enc(encryption_data['iv']))
-        try:
-            encrypted_elt.addElement(
-                'payload',
-                content=b64enc(encryption_data['payload']))
-        except KeyError:
-            pass
+        session_manager = await self.__prepare_for_profile(profile)
+
+        await session_manager.update_device_list(
+            namespace,
+            sender.userhost(),
+            device_list
+        )