diff sat/plugins/plugin_xep_0384.py @ 3967:f461f11ea176

plugin XEP-0384: Implementation of Automatic Trust Management: - Implementation of Trust Messages (XEP-0434) - Implementation of Automatic Trust Management (XEP-0450) - Implementations directly as part of the OMEMO plugin, since omemo:2 is the only protocol supported by ATM at the moment - Trust system selection updated to allow choice between manual trust with ATM and BTBV - dev-requirements.txt updated to include additional requirements for the e2e tests fix 376
author Syndace <me@syndace.dev>
date Fri, 28 Oct 2022 18:50:06 +0200
parents 748094d5a74d
children 8e7d5796fb23
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0384.py	Sun Oct 30 01:06:58 2022 +0200
+++ b/sat/plugins/plugin_xep_0384.py	Fri Oct 28 18:50:06 2022 +0200
@@ -16,18 +16,20 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import base64
+from datetime import datetime
 import enum
 import logging
 import time
-from typing import (
-    Any, Callable, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast
-)
+from typing import \
+    Any, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast
 import uuid
 import xml.etree.ElementTree as ET
 from xml.sax.saxutils import quoteattr
 
-from typing_extensions import Never, assert_never
+from typing_extensions import Final, Never, assert_never
 from wokkel import muc, pubsub  # type: ignore[import]
+import xmlschema
 
 from sat.core import exceptions
 from sat.core.constants import Const as C
@@ -43,7 +45,8 @@
 from sat.plugins.plugin_xep_0163 import XEP_0163
 from sat.plugins.plugin_xep_0334 import XEP_0334
 from sat.plugins.plugin_xep_0359 import XEP_0359
-from sat.plugins.plugin_xep_0420 import XEP_0420, SCEAffixPolicy, SCEProfile
+from sat.plugins.plugin_xep_0420 import \
+    XEP_0420, SCEAffixPolicy, SCEAffixValues, SCEProfile
 from sat.tools import xml_tools
 from twisted.internet import defer
 from twisted.words.protocols.jabber import error, jid
@@ -78,9 +81,6 @@
 log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
 
 
-string_to_domish = cast(Callable[[str], domish.Element], xml_tools.ElementParser())
-
-
 PLUGIN_INFO = {
     C.PI_NAME: "OMEMO",
     C.PI_IMPORT_NAME: "XEP-0384",
@@ -134,68 +134,10 @@
     message_uid: str
 
 
-# TODO: Convert without serialization/parsing
-# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took
-# around 0.6 seconds on my setup.
-def etree_to_domish(element: ET.Element) -> domish.Element:
-    """
-    @param element: An ElementTree element.
-    @return: The ElementTree element converted to a domish element.
-    """
-
-    return string_to_domish(ET.tostring(element, encoding="unicode"))
-
-
-# TODO: Convert without serialization/parsing
-# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took less
-# than one second on my setup.
-def domish_to_etree(element: domish.Element) -> ET.Element:
-    """
-    @param element: A domish element.
-    @return: The domish element converted to an ElementTree element.
-    """
-
-    return ET.fromstring(element.toXml())
-
-
-def domish_to_etree2(element: domish.Element) -> ET.Element:
-    """
-    WIP
-    """
-
-    element_name = element.name
-    if element.uri is not None:
-        element_name = "{" + element.uri + "}" + element_name
-
-    attrib: Dict[str, str] = {}
-    for qname, value in element.attributes.items():
-        attribute_name = qname[1] if isinstance(qname, tuple) else qname
-        attribute_namespace = qname[0] if isinstance(qname, tuple) else None
-        if attribute_namespace is not None:
-            attribute_name = "{" + attribute_namespace + "}" + attribute_name
-
-        attrib[attribute_name] = value
-
-    result = ET.Element(element_name, attrib)
-
-    last_child: Optional[ET.Element] = None
-    for child in element.children:
-        if isinstance(child, str):
-            if last_child is None:
-                result.text = child
-            else:
-                last_child.tail = child
-        else:
-            last_child = domish_to_etree2(child)
-            result.append(last_child)
-
-    return result
-
-
 @enum.unique
 class TrustLevel(enum.Enum):
     """
-    The trust levels required for BTBV and manual trust.
+    The trust levels required for ATM and BTBV.
     """
 
     TRUSTED: str = "TRUSTED"
@@ -203,20 +145,6 @@
     UNDECIDED: str = "UNDECIDED"
     DISTRUSTED: str = "DISTRUSTED"
 
-    def to_omemo_trust_level(self) -> omemo.TrustLevel:
-        """
-        @return: This custom trust level evaluated to one of the OMEMO trust levels.
-        """
-
-        if self is TrustLevel.TRUSTED or self is TrustLevel.BLINDLY_TRUSTED:
-            return omemo.TrustLevel.TRUSTED
-        if self is TrustLevel.UNDECIDED:
-            return omemo.TrustLevel.UNDECIDED
-        if self is TrustLevel.DISTRUSTED:
-            return omemo.TrustLevel.DISTRUSTED
-
-        return assert_never(self)
-
 
 TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices"
 OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist"
@@ -431,7 +359,8 @@
             f" {namespace}: Unexpected number of items retrieved: {len(items)}."
         )
 
-    element = next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None)
+    element = \
+        next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None)
     if element is None:
         raise omemo.BundleDownloadFailed(
             f"Bundle download failed for {bare_jid}: {device_id} under namespace"
@@ -447,6 +376,433 @@
         ) from e
 
 
+# ATM only supports protocols based on SCE, which is currently only omemo:2, and relies on
+# so many implementation details of the encryption protocol that it makes more sense to
+# add ATM to the OMEMO plugin directly instead of having it a separate Libervia plugin.
+NS_TM: Final = "urn:xmpp:tm:1"
+NS_ATM: Final = "urn:xmpp:atm:1"
+
+
+TRUST_MESSAGE_SCHEMA = xmlschema.XMLSchema("""<?xml version='1.0' encoding='UTF-8'?>
+<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'
+           targetNamespace='urn:xmpp:tm:1'
+           xmlns='urn:xmpp:tm:1'
+           elementFormDefault='qualified'>
+
+  <xs:element name='trust-message'>
+    <xs:complexType>
+      <xs:sequence>
+        <xs:element ref='key-owner' minOccurs='1' maxOccurs='unbounded'/>
+      </xs:sequence>
+      <xs:attribute name='usage' type='xs:string' use='required'/>
+      <xs:attribute name='encryption' type='xs:string' use='required'/>
+    </xs:complexType>
+  </xs:element>
+
+  <xs:element name='key-owner'>
+    <xs:complexType>
+      <xs:sequence>
+        <xs:element
+            name='trust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/>
+        <xs:element
+            name='distrust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/>
+      </xs:sequence>
+      <xs:attribute name='jid' type='xs:string' use='required'/>
+    </xs:complexType>
+  </xs:element>
+</xs:schema>
+""")
+
+
+# This is compatible with omemo:2's SCE profile
+TM_SCE_PROFILE = SCEProfile(
+    rpad_policy=SCEAffixPolicy.REQUIRED,
+    time_policy=SCEAffixPolicy.REQUIRED,
+    to_policy=SCEAffixPolicy.OPTIONAL,
+    from_policy=SCEAffixPolicy.OPTIONAL,
+    custom_policies={}
+)
+
+
+class TrustUpdate(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    An update to the trust status of an identity key, used by Automatic Trust Management.
+    """
+
+    target_jid: jid.JID
+    target_key: bytes
+    target_trust: bool
+
+
+class TrustMessageCacheEntry(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    An entry in the trust message cache used by ATM.
+    """
+
+    sender_jid: jid.JID
+    sender_key: bytes
+    timestamp: datetime
+    trust_update: TrustUpdate
+
+
+class PartialTrustMessage(NamedTuple):
+    # pylint: disable=invalid-name
+    """
+    A structure representing a partial trust message, used by :func:`send_trust_messages`
+    to build trust messages.
+    """
+
+    recipient_jid: jid.JID
+    updated_jid: jid.JID
+    trust_updates: FrozenSet[TrustUpdate]
+
+
+async def manage_trust_message_cache(
+    client: SatXMPPClient,
+    session_manager: omemo.SessionManager,
+    applied_trust_updates: FrozenSet[TrustUpdate]
+) -> None:
+    """Manage the ATM trust message cache after trust updates have been applied.
+
+    @param client: The client this operation runs under.
+    @param session_manager: The session manager to use.
+    @param applied_trust_updates: The trust updates that have already been applied,
+        triggering this cache management run.
+    """
+
+    trust_message_cache = persistent.LazyPersistentBinaryDict(
+        "XEP-0384/TM",
+        client.profile
+    )
+
+    # Load cache entries
+    cache_entries = cast(
+        Set[TrustMessageCacheEntry],
+        await trust_message_cache.get("cache", set())
+    )
+
+    # Expire cache entries that were overwritten by the applied trust updates
+    cache_entries_by_target = {
+        (
+            cache_entry.trust_update.target_jid.userhostJID(),
+            cache_entry.trust_update.target_key
+        ): cache_entry
+        for cache_entry
+        in cache_entries
+    }
+
+    for trust_update in applied_trust_updates:
+        cache_entry = cache_entries_by_target.get(
+            (trust_update.target_jid.userhostJID(), trust_update.target_key),
+            None
+        )
+
+        if cache_entry is not None:
+            cache_entries.remove(cache_entry)
+
+    # Apply cached Trust Messages by newly trusted devices
+    new_trust_updates: Set[TrustUpdate] = set()
+
+    for trust_update in applied_trust_updates:
+        if trust_update.target_trust:
+            # Iterate over a copy such that cache_entries can be modified
+            for cache_entry in set(cache_entries):
+                if (
+                    cache_entry.sender_jid.userhostJID()
+                    == trust_update.target_jid.userhostJID()
+                    and cache_entry.sender_key == trust_update.target_key
+                ):
+                    trust_level = (
+                        TrustLevel.TRUSTED
+                        if cache_entry.trust_update.target_trust
+                        else TrustLevel.DISTRUSTED
+                    )
+
+                    # Apply the trust update
+                    await session_manager.set_trust(
+                        cache_entry.trust_update.target_jid.userhost(),
+                        cache_entry.trust_update.target_key,
+                        trust_level.name
+                    )
+
+                    # Track the fact that this trust update has been applied
+                    new_trust_updates.add(cache_entry.trust_update)
+
+                    # Remove the corresponding cache entry
+                    cache_entries.remove(cache_entry)
+
+    # Store the updated cache entries
+    await trust_message_cache.force("cache", cache_entries)
+
+    # TODO: Notify the user ("feedback") about automatically updated trust?
+
+    if len(new_trust_updates) > 0:
+        # If any trust has been updated, recursively perform another run of cache
+        # management
+        await manage_trust_message_cache(
+            client,
+            session_manager,
+            frozenset(new_trust_updates)
+        )
+
+
+async def get_trust_as_trust_updates(
+    session_manager: omemo.SessionManager,
+    target_jid: jid.JID
+) -> FrozenSet[TrustUpdate]:
+    """Get the trust status of all known keys of a JID as trust updates for use with ATM.
+
+    @param session_manager: The session manager to load the trust from.
+    @param target_jid: The JID to load the trust for.
+    @return: The trust updates encoding the trust status of all known keys of the JID that
+        are either explicitly trusted or distrusted. Undecided keys are not included in
+        the trust updates.
+    """
+
+    devices = await session_manager.get_device_information(target_jid.userhost())
+
+    trust_updates: Set[TrustUpdate] = set()
+
+    for device in devices:
+        trust_level = TrustLevel(device.trust_level_name)
+        target_trust: bool
+
+        if trust_level is TrustLevel.TRUSTED:
+            target_trust = True
+        elif trust_level is TrustLevel.DISTRUSTED:
+            target_trust = False
+        else:
+            # Skip devices that are not explicitly trusted or distrusted
+            continue
+
+        trust_updates.add(TrustUpdate(
+            target_jid=target_jid.userhostJID(),
+            target_key=device.identity_key,
+            target_trust=target_trust
+        ))
+
+    return frozenset(trust_updates)
+
+
+async def send_trust_messages(
+    client: SatXMPPClient,
+    session_manager: omemo.SessionManager,
+    applied_trust_updates: FrozenSet[TrustUpdate]
+) -> None:
+    """Send information about updated trust to peers via ATM (XEP-0450).
+
+    @param client: The client.
+    @param session_manager: The session manager.
+    @param applied_trust_updates: The trust updates that have already been applied, to
+        notify other peers about.
+    """
+    # NOTE: This currently sends information about oldmemo trust too. This is not
+    # specified and experimental, but since twomemo and oldmemo share the same identity
+    # keys and trust systems, this could be a cool side effect.
+
+    # Send Trust Messages for newly trusted and distrusted devices
+    own_jid = client.jid.userhostJID()
+    own_trust_updates = await get_trust_as_trust_updates(session_manager, own_jid)
+
+    # JIDs of which at least one device's trust has been updated
+    updated_jids = frozenset({
+        trust_update.target_jid.userhostJID()
+        for trust_update
+        in applied_trust_updates
+    })
+
+    trust_messages: Set[PartialTrustMessage] = set()
+
+    for updated_jid in updated_jids:
+        # Get the trust updates for that JID
+        trust_updates = frozenset({
+            trust_update for trust_update in applied_trust_updates
+            if trust_update.target_jid.userhostJID() == updated_jid
+        })
+
+        if updated_jid == own_jid:
+            # If the own JID is updated, _all_ peers have to be notified
+            # TODO: Using my author's privilege here to shamelessly access private fields
+            # and storage keys until I've added public API to get a list of peers to
+            # python-omemo.
+            storage: omemo.Storage = getattr(session_manager, "_SessionManager__storage")
+            peer_jids = frozenset({
+                jid.JID(bare_jid).userhostJID() for bare_jid in (await storage.load_list(
+                    f"/{OMEMO.NS_TWOMEMO}/bare_jids",
+                    str
+                )).maybe([])
+            })
+
+            if len(peer_jids) == 0:
+                # If there are no peers to notify, notify our other devices about the
+                # changes directly
+                trust_messages.add(PartialTrustMessage(
+                    recipient_jid=own_jid,
+                    updated_jid=own_jid,
+                    trust_updates=trust_updates
+                ))
+            else:
+                # Otherwise, notify all peers about the changes in trust and let carbons
+                # handle the copy to our own JID
+                for peer_jid in peer_jids:
+                    trust_messages.add(PartialTrustMessage(
+                        recipient_jid=peer_jid,
+                        updated_jid=own_jid,
+                        trust_updates=trust_updates
+                    ))
+
+                    # Also send full trust information about _every_ peer to our newly
+                    # trusted devices
+                    peer_trust_updates = \
+                        await get_trust_as_trust_updates(session_manager, peer_jid)
+
+                    trust_messages.add(PartialTrustMessage(
+                        recipient_jid=own_jid,
+                        updated_jid=peer_jid,
+                        trust_updates=peer_trust_updates
+                    ))
+
+            # Send information about our own devices to our newly trusted devices
+            trust_messages.add(PartialTrustMessage(
+                recipient_jid=own_jid,
+                updated_jid=own_jid,
+                trust_updates=own_trust_updates
+            ))
+        else:
+            # Notify our other devices about the changes in trust
+            trust_messages.add(PartialTrustMessage(
+                recipient_jid=own_jid,
+                updated_jid=updated_jid,
+                trust_updates=trust_updates
+            ))
+
+            # Send a summary of our own trust to newly trusted devices
+            trust_messages.add(PartialTrustMessage(
+                recipient_jid=updated_jid,
+                updated_jid=own_jid,
+                trust_updates=own_trust_updates
+            ))
+
+    # All trust messages prepared. Merge all trust messages directed at the same
+    # recipient.
+    recipient_jids = { trust_message.recipient_jid for trust_message in trust_messages }
+
+    for recipient_jid in recipient_jids:
+        updated: Dict[jid.JID, Set[TrustUpdate]] = {}
+
+        for trust_message in trust_messages:
+            # Merge trust messages directed at that recipient
+            if trust_message.recipient_jid == recipient_jid:
+                # Merge the trust updates
+                updated[trust_message.updated_jid] = \
+                    updated.get(trust_message.updated_jid, set())
+
+                updated[trust_message.updated_jid] |= trust_message.trust_updates
+
+        # Build the trust message
+        trust_message_elt = domish.Element((NS_TM, "trust-message"))
+        trust_message_elt["usage"] = NS_ATM
+        trust_message_elt["encryption"] = twomemo.twomemo.NAMESPACE
+
+        for updated_jid, trust_updates in updated.items():
+            key_owner_elt = trust_message_elt.addElement((NS_TM, "key-owner"))
+            key_owner_elt["jid"] = updated_jid.userhost()
+
+            for trust_update in trust_updates:
+                serialized_identity_key = \
+                    base64.b64encode(trust_update.target_key).decode("ASCII")
+
+                if trust_update.target_trust:
+                    key_owner_elt.addElement(
+                        (NS_TM, "trust"),
+                        content=serialized_identity_key
+                    )
+                else:
+                    key_owner_elt.addElement(
+                        (NS_TM, "distrust"),
+                        content=serialized_identity_key
+                    )
+
+        # Finally, encrypt and send the trust message!
+        message_data = client.generateMessageXML(MessageData({
+            "from": own_jid,
+            "to": recipient_jid,
+            "uid": str(uuid.uuid4()),
+            "message": {},
+            "subject": {},
+            "type": C.MESS_TYPE_CHAT,
+            "extra": {},
+            "timestamp": time.time()
+        }))
+
+        message_data["xml"].addChild(trust_message_elt)
+
+        plaintext = XEP_0420.pack_stanza(TM_SCE_PROFILE, message_data["xml"])
+
+        feedback_jid = recipient_jid
+
+        # TODO: The following is mostly duplicate code
+        try:
+            messages, encryption_errors = await session_manager.encrypt(
+                frozenset({ own_jid.userhost(), recipient_jid.userhost() }),
+                { OMEMO.NS_TWOMEMO: plaintext },
+                backend_priority_order=[ OMEMO.NS_TWOMEMO ],
+                identifier=feedback_jid.userhost()
+            )
+        except Exception as e:
+            msg = _(
+                # pylint: disable=consider-using-f-string
+                "Can't encrypt message for {entities}: {reason}".format(
+                    entities=', '.join({ own_jid.userhost(), recipient_jid.userhost() }),
+                    reason=e
+                )
+            )
+            log.warning(msg)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
+            raise e
+
+        if len(encryption_errors) > 0:
+            log.warning(
+                f"Ignored the following non-critical encryption errors:"
+                f" {encryption_errors}"
+            )
+
+            encrypted_errors_stringified = ", ".join([
+                f"device {err.device_id} of {err.bare_jid} under namespace"
+                f" {err.namespace}"
+                for err
+                in encryption_errors
+            ])
+
+            client.feedback(
+                feedback_jid,
+                D_(
+                    "There were non-critical errors during encryption resulting in some"
+                    " of your destinees' devices potentially not receiving the message."
+                    " This happens when the encryption data/key material of a device is"
+                    " incomplete or broken, which shouldn't happen for actively used"
+                    " devices, and can usually be ignored. The following devices are"
+                    f" affected: {encrypted_errors_stringified}."
+                )
+            )
+
+        message = next(
+            message for message in messages
+            if message.namespace == OMEMO.NS_TWOMEMO
+        )
+
+        # Add the encrypted element
+        message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt(
+            twomemo.etree.serialize_message(message)
+        ))
+
+        await client.a_send(message_data["xml"])
+
+
 def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]:
     """
     @param sat: The SAT instance.
@@ -705,13 +1061,18 @@
 
             if len(items) == 0:
                 return {}
-            elif len(items) != 1:
+
+            if len(items) != 1:
                 raise omemo.DeviceListDownloadFailed(
                     f"Device list download failed for {bare_jid} under namespace"
                     f" {namespace}: Unexpected number of items retrieved: {len(items)}."
                 )
 
-            element = next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None)
+            element = next(
+                iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))),
+                None
+            )
+
             if element is None:
                 raise omemo.DeviceListDownloadFailed(
                     f"Device list download failed for {bare_jid} under namespace"
@@ -732,15 +1093,62 @@
 
             raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
 
-        @staticmethod
-        def _evaluate_custom_trust_level(trust_level_name: str) -> omemo.TrustLevel:
+        async def _evaluate_custom_trust_level(
+            self,
+            device: omemo.DeviceInformation
+        ) -> omemo.TrustLevel:
+            # Get the custom trust level
             try:
-                return TrustLevel(trust_level_name).to_omemo_trust_level()
+                trust_level = TrustLevel(device.trust_level_name)
             except ValueError as e:
                 raise omemo.UnknownTrustLevel(
-                    f"Unknown trust level name {trust_level_name}"
+                    f"Unknown trust level name {device.trust_level_name}"
                 ) from e
 
+            # The first three cases are a straight-forward mapping
+            if trust_level is TrustLevel.TRUSTED:
+                return omemo.TrustLevel.TRUSTED
+            if trust_level is TrustLevel.UNDECIDED:
+                return omemo.TrustLevel.UNDECIDED
+            if trust_level is TrustLevel.DISTRUSTED:
+                return omemo.TrustLevel.DISTRUSTED
+
+            # The blindly trusted case is more complicated, since its evaluation depends
+            # on the trust system and phase
+            if trust_level is TrustLevel.BLINDLY_TRUSTED:
+                # Get the name of the active trust system
+                trust_system = cast(str, sat.memory.getParamA(
+                    PARAM_NAME,
+                    PARAM_CATEGORY,
+                    profile_key=profile
+                ))
+
+                # If the trust model is BTBV, blind trust is always enabled
+                if trust_system == "btbv":
+                    return omemo.TrustLevel.TRUSTED
+
+                # If the trust model is ATM, blind trust is disabled in the second phase
+                # and counts as undecided
+                if trust_system == "atm":
+                    # Find out whether we are in phase one or two
+                    devices = await self.get_device_information(device.bare_jid)
+
+                    phase_one = all(TrustLevel(device.trust_level_name) in {
+                        TrustLevel.UNDECIDED,
+                        TrustLevel.BLINDLY_TRUSTED
+                    } for device in devices)
+
+                    if phase_one:
+                        return omemo.TrustLevel.TRUSTED
+
+                    return omemo.TrustLevel.UNDECIDED
+
+                raise exceptions.InternalError(
+                    f"Unknown trust system active: {trust_system}"
+                )
+
+            assert_never(trust_level)
+
         async def _make_trust_decision(
             self,
             undecided: FrozenSet[omemo.DeviceInformation],
@@ -754,50 +1162,38 @@
             # The feedback JID is transferred via the identifier
             feedback_jid = jid.JID(identifier).userhostJID()
 
-            # Get the name of the trust model to use
-            trust_model = cast(str, sat.memory.getParamA(
-                PARAM_NAME,
-                PARAM_CATEGORY,
-                profile_key=cast(str, client.profile)
-            ))
-
-            # Under the BTBV trust model, if at least one device of a bare JID is manually
-            # trusted or distrusted, the trust model is "downgraded" to manual trust.
-            # Thus, we can separate bare JIDs into two pools here, one pool of bare JIDs
-            # for which BTBV is active, and one pool of bare JIDs for which manual trust
-            # is used.
+            # Both the ATM and the BTBV trust models work with blind trust before the
+            # first manual verification is performed. Thus, we can separate bare JIDs into
+            # two pools here, one pool of bare JIDs for which blind trust is active, and
+            # one pool of bare JIDs for which manual trust is used instead.
             bare_jids = { device.bare_jid for device in undecided }
 
-            btbv_bare_jids: Set[str] = set()
+            blind_trust_bare_jids: Set[str] = set()
             manual_trust_bare_jids: Set[str] = set()
 
-            if trust_model == "btbv":
-                # For each bare JID, decide whether BTBV or manual trust applies
-                for bare_jid in bare_jids:
-                    # Get all known devices belonging to the bare JID
-                    devices = await self.get_device_information(bare_jid)
-
-                    # If the trust levels of all devices correspond to those used by BTBV,
-                    # BTBV applies. Otherwise, fall back to manual trust.
-                    if all(TrustLevel(device.trust_level_name) in {
-                        TrustLevel.UNDECIDED,
-                        TrustLevel.BLINDLY_TRUSTED
-                    } for device in devices):
-                        btbv_bare_jids.add(bare_jid)
-                    else:
-                        manual_trust_bare_jids.add(bare_jid)
-
-            if trust_model == "manual":
-                manual_trust_bare_jids = bare_jids
+            # For each bare JID, decide whether blind trust applies
+            for bare_jid in bare_jids:
+                # Get all known devices belonging to the bare JID
+                devices = await self.get_device_information(bare_jid)
+
+                # If the trust levels of all devices correspond to those used by blind
+                # trust, blind trust applies. Otherwise, fall back to manual trust.
+                if all(TrustLevel(device.trust_level_name) in {
+                    TrustLevel.UNDECIDED,
+                    TrustLevel.BLINDLY_TRUSTED
+                } for device in devices):
+                    blind_trust_bare_jids.add(bare_jid)
+                else:
+                    manual_trust_bare_jids.add(bare_jid)
 
             # With the JIDs sorted into their respective pools, the undecided devices can
             # be categorized too
             blindly_trusted_devices = \
-                { dev for dev in undecided if dev.bare_jid in btbv_bare_jids }
+                { dev for dev in undecided if dev.bare_jid in blind_trust_bare_jids }
             manually_trusted_devices = \
                 { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids }
 
-            # Blindly trust devices handled by BTBV
+            # Blindly trust devices handled by blind trust
             if len(blindly_trusted_devices) > 0:
                 for device in blindly_trusted_devices:
                     await self.set_trust(
@@ -817,11 +1213,8 @@
                     feedback_jid,
                     D_(
                         "Not all destination devices are trusted, unknown devices will be"
-                        " blindly trusted due to the Blind Trust Before Verification"
-                        " policy. If you want a more secure workflow, please activate the"
-                        " \"manual\" policy in the settings' \"Security\" tab.\nFollowing"
-                        " devices have been automatically trusted:"
-                        f" {blindly_trusted_devices_stringified}."
+                        " blindly trusted.\nFollowing devices have been automatically"
+                        f" trusted: {blindly_trusted_devices_stringified}."
                     )
                 )
 
@@ -854,7 +1247,6 @@
             if element is None:
                 raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}")
 
-            # TODO: Untested
             message_data = client.generateMessageXML(MessageData({
                 "from": client.jid,
                 "to": jid.JID(bare_jid),
@@ -959,19 +1351,40 @@
                 trust_ui_result
             ))
 
+            trust_updates: Set[TrustUpdate] = set()
+
             for key, value in data_form_result.items():
                 if not key.startswith("trust_"):
                     continue
 
                 device = undecided_ordered[int(key[len("trust_"):])]
-                trust = C.bool(value)
+                target_trust = C.bool(value)
+                trust_level = \
+                    TrustLevel.TRUSTED if target_trust else TrustLevel.DISTRUSTED
 
                 await self.set_trust(
                     device.bare_jid,
                     device.identity_key,
-                    TrustLevel.TRUSTED.name if trust else TrustLevel.DISTRUSTED.name
+                    trust_level.name
                 )
 
+                trust_updates.add(TrustUpdate(
+                    target_jid=jid.JID(device.bare_jid).userhostJID(),
+                    target_key=device.identity_key,
+                    target_trust=target_trust
+                ))
+
+            # Check whether ATM is enabled and handle everything in case it is
+            trust_system = cast(str, sat.memory.getParamA(
+                PARAM_NAME,
+                PARAM_CATEGORY,
+                profile_key=profile
+            ))
+
+            if trust_system == "atm":
+                await manage_trust_message_cache(client, self, frozenset(trust_updates))
+                await send_trust_messages(client, self, frozenset(trust_updates))
+
     return SessionManagerImpl
 
 
@@ -1086,7 +1499,8 @@
     <param name="{PARAM_NAME}"
         label={quoteattr(D_('OMEMO default trust policy'))}
         type="list" security="3">
-        <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
+        <option value="atm"
+            label={quoteattr(D_('Automatic Trust Management (more secure)'))} />
         <option value="btbv"
             label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
             selected="true" />
@@ -1103,7 +1517,7 @@
     ``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl``
     namespace. Both versions of the protocol are handled by this plugin and compatibility
     between the two is maintained. MUC messages are supported next to one to one messages.
-    For trust management, the two trust models "BTBV" and "manual" are supported.
+    For trust management, the two trust models "ATM" and "BTBV" are supported.
     """
     NS_TWOMEMO = twomemo.twomemo.NAMESPACE
     NS_OLDMEMO = oldmemo.oldmemo.NAMESPACE
@@ -1163,7 +1577,8 @@
         self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {}
 
         # These triggers are used by oldmemo, which doesn't do SCE and only applies to
-        # messages
+        # messages. Temporarily, until a more fitting trigger for SCE-based encryption is
+        # added, the messageReceived trigger is also used for twomemo.
         sat.trigger.add(
             "messageReceived",
             self.__message_received_trigger,
@@ -1298,7 +1713,7 @@
 
         async def callback(
             data: Any,
-            profile: str  # pylint: disable=unused-argument
+            profile: str
         ) -> Dict[Never, Never]:
             """
             @param data: The XMLUI result produces by the trust UI form.
@@ -1314,18 +1729,56 @@
                 Dict[str, str],
                 xml_tools.XMLUIResult2DataFormResult(data)
             )
+
+            trust_updates: Set[TrustUpdate] = set()
+
             for key, value in data_form_result.items():
                 if not key.startswith("trust_"):
                     continue
 
                 device = devices[int(key[len("trust_"):])]
-                trust = TrustLevel(value)
-
-                if TrustLevel(device.trust_level_name) is not trust:
+                trust_level_name = value
+
+                if device.trust_level_name != trust_level_name:
                     await session_manager.set_trust(
                         device.bare_jid,
                         device.identity_key,
-                        value
+                        trust_level_name
+                    )
+
+                    target_trust: Optional[bool] = None
+
+                    if TrustLevel(trust_level_name) is TrustLevel.TRUSTED:
+                        target_trust = True
+                    if TrustLevel(trust_level_name) is TrustLevel.DISTRUSTED:
+                        target_trust = False
+
+                    if target_trust is not None:
+                        trust_updates.add(TrustUpdate(
+                            target_jid=jid.JID(device.bare_jid).userhostJID(),
+                            target_key=device.identity_key,
+                            target_trust=target_trust
+                        ))
+
+            # Check whether ATM is enabled and handle everything in case it is
+            trust_system = cast(str, self.__sat.memory.getParamA(
+                PARAM_NAME,
+                PARAM_CATEGORY,
+                profile_key=profile
+            ))
+
+            if trust_system == "atm":
+                if len(trust_updates) > 0:
+                    await manage_trust_message_cache(
+                        client,
+                        session_manager,
+                        frozenset(trust_updates)
+                    )
+
+                    await send_trust_messages(
+                        client,
+                        session_manager,
+                        frozenset(trust_updates)
                     )
 
             return {}
@@ -1341,13 +1794,15 @@
         # pylint: disable=no-member
         trust_ui = cast(Any, result)
         trust_ui.addText(D_(
-            "This is OMEMO trusting system. You'll see below the devices of your "
-            "contacts, and a list selection to trust them or not. A trusted device "
-            "can read your messages in plain text, so be sure to only validate "
-            "devices that you are sure are belonging to your contact. It's better "
-            "to do this when you are next to your contact and their device, so "
-            "you can check the \"fingerprint\" (the number next to the device) "
-            "yourself. Do *not* validate a device if the fingerprint is wrong!"
+            "This is OMEMO trusting system. You'll see below the devices of your"
+            " contacts, and a list selection to trust them or not. A trusted device"
+            " can read your messages in plain text, so be sure to only validate"
+            " devices that you are sure are belonging to your contact. It's better"
+            " to do this when you are next to your contact and their device, so"
+            " you can check the \"fingerprint\" (the number next to the device)"
+            " yourself. Do *not* validate a device if the fingerprint is wrong!"
+            " Note that manually validating a fingerprint disables any form of automatic"
+            " trust."
         ))
 
         own_device, __ = await session_manager.get_own_device_information()
@@ -1491,6 +1946,156 @@
 
             return session_manager
 
+    async def __message_received_trigger_atm(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        session_manager: omemo.SessionManager,
+        sender_device_information: omemo.DeviceInformation,
+        timestamp: datetime
+    ) -> None:
+        """Check a newly decrypted message stanza for ATM content and perform ATM in case.
+
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param session_manager: The session manager.
+        @param sender_device_information: Information about the device that sent/encrypted
+            the message.
+        @param timestamp: Timestamp extracted from the SCE time affix.
+        """
+
+        trust_message_cache = persistent.LazyPersistentBinaryDict(
+            "XEP-0384/TM",
+            client.profile
+        )
+
+        new_cache_entries: Set[TrustMessageCacheEntry] = set()
+
+        for trust_message_elt in message_elt.elements(NS_TM, "trust-message"):
+            assert isinstance(trust_message_elt, domish.Element)
+
+            try:
+                TRUST_MESSAGE_SCHEMA.validate(trust_message_elt.toXml())
+            except xmlschema.XMLSchemaValidationError as e:
+                raise exceptions.ParsingError(
+                    "<trust-message/> element doesn't pass schema validation."
+                ) from e
+
+            if trust_message_elt["usage"] != NS_ATM:
+                # Skip non-ATM trust message
+                continue
+
+            if trust_message_elt["encryption"] != OMEMO.NS_TWOMEMO:
+                # Skip non-twomemo trust message
+                continue
+
+            for key_owner_elt in trust_message_elt.elements(NS_TM, "key-owner"):
+                assert isinstance(key_owner_elt, domish.Element)
+
+                key_owner_jid = jid.JID(key_owner_elt["jid"]).userhostJID()
+
+                for trust_elt in key_owner_elt.elements(NS_TM, "trust"):
+                    assert isinstance(trust_elt, domish.Element)
+
+                    new_cache_entries.add(TrustMessageCacheEntry(
+                        sender_jid=jid.JID(sender_device_information.bare_jid),
+                        sender_key=sender_device_information.identity_key,
+                        timestamp=timestamp,
+                        trust_update=TrustUpdate(
+                            target_jid=key_owner_jid,
+                            target_key=base64.b64decode(str(trust_elt)),
+                            target_trust=True
+                        )
+                    ))
+
+                for distrust_elt in key_owner_elt.elements(NS_TM, "distrust"):
+                    assert isinstance(distrust_elt, domish.Element)
+
+                    new_cache_entries.add(TrustMessageCacheEntry(
+                        sender_jid=jid.JID(sender_device_information.bare_jid),
+                        sender_key=sender_device_information.identity_key,
+                        timestamp=timestamp,
+                        trust_update=TrustUpdate(
+                            target_jid=key_owner_jid,
+                            target_key=base64.b64decode(str(distrust_elt)),
+                            target_trust=False
+                        )
+                    ))
+
+        # Load existing cache entries
+        existing_cache_entries = cast(
+            Set[TrustMessageCacheEntry],
+            await trust_message_cache.get("cache", set())
+        )
+
+        # Discard cache entries by timestamp comparison
+        existing_by_target = {
+            (
+                cache_entry.trust_update.target_jid.userhostJID(),
+                cache_entry.trust_update.target_key
+            ): cache_entry
+            for cache_entry
+            in existing_cache_entries
+        }
+
+        # Iterate over a copy here, such that new_cache_entries can be modified
+        for new_cache_entry in set(new_cache_entries):
+            existing_cache_entry = existing_by_target.get(
+                (
+                    new_cache_entry.trust_update.target_jid.userhostJID(),
+                    new_cache_entry.trust_update.target_key
+                ),
+                None
+            )
+
+            if existing_cache_entry is not None:
+                if existing_cache_entry.timestamp > new_cache_entry.timestamp:
+                    # If the existing cache entry is newer than the new cache entry,
+                    # discard the new one in favor of the existing one
+                    new_cache_entries.remove(new_cache_entry)
+                else:
+                    # Otherwise, discard the existing cache entry. This includes the case
+                    # when both cache entries have matching timestamps.
+                    existing_cache_entries.remove(existing_cache_entry)
+
+        # If the sending device is trusted, apply the new cache entries
+        applied_trust_updates: Set[TrustUpdate] = set()
+
+        if TrustLevel(sender_device_information.trust_level_name) is TrustLevel.TRUSTED:
+            # Iterate over a copy such that new_cache_entries can be modified
+            for cache_entry in set(new_cache_entries):
+                trust_update = cache_entry.trust_update
+
+                trust_level = (
+                    TrustLevel.TRUSTED
+                    if trust_update.target_trust
+                    else TrustLevel.DISTRUSTED
+                )
+
+                await session_manager.set_trust(
+                    trust_update.target_jid.userhost(),
+                    trust_update.target_key,
+                    trust_level.name
+                )
+
+                applied_trust_updates.add(trust_update)
+
+                new_cache_entries.remove(cache_entry)
+
+        # Store the remaining existing and new cache entries
+        await trust_message_cache.force(
+            "cache",
+            existing_cache_entries | new_cache_entries
+        )
+
+        # If the trust of at least one device was modified, run the ATM cache update logic
+        if len(applied_trust_updates) > 0:
+            await manage_trust_message_cache(
+                client,
+                session_manager,
+                frozenset(applied_trust_updates)
+            )
+
     async def __message_received_trigger(
         self,
         client: SatXMPPClient,
@@ -1506,6 +2111,7 @@
             encrypted.
         @return: Whether to continue the message received flow.
         """
+
         muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None
 
         sender_jid = jid.JID(message_elt["from"])
@@ -1569,8 +2175,6 @@
             # I'm not sure why this check is required, this code is copied from the old
             # plugin.
             if sender_jid.userhostJID() == client.jid.userhostJID():
-                # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems
-                # like "to" isn't always set.
                 try:
                     feedback_jid = jid.JID(message_elt["to"])
                 except KeyError:
@@ -1700,6 +2304,8 @@
                 # No point in further processing this message
                 return False
 
+        affix_values: Optional[SCEAffixValues] = None
+
         if message.namespace == twomemo.twomemo.NAMESPACE:
             if plaintext is not None:
                 # XEP_0420.unpack_stanza handles the whole unpacking, including the
@@ -1726,12 +2332,12 @@
                     )
                     # No point in further processing this message
                     return False
-
-                if affix_values.timestamp is not None:
-                    # TODO: affix_values.timestamp contains the timestamp included in the
-                    # encrypted element here. The XEP says it SHOULD be displayed with the
-                    # plaintext by clients.
-                    pass
+                else:
+                    if affix_values.timestamp is not None:
+                        # TODO: affix_values.timestamp contains the timestamp included in
+                        # the encrypted element here. The XEP says it SHOULD be displayed
+                        # with the plaintext by clients.
+                        pass
 
         if message.namespace == oldmemo.oldmemo.NAMESPACE:
             # Remove all body elements from the original element, since those act as
@@ -1746,7 +2352,8 @@
 
         # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
         trust_level = \
-            TrustLevel(device_information.trust_level_name).to_omemo_trust_level()
+            await session_manager._evaluate_custom_trust_level(device_information)
+
         if trust_level is omemo.TrustLevel.TRUSTED:
             post_treat.addCallback(client.encryption.markAsTrusted)
         else:
@@ -1758,6 +2365,16 @@
             namespace=message.namespace
         )
 
+        # Handle potential ATM trust updates
+        if affix_values is not None and affix_values.timestamp is not None:
+            await self.__message_received_trigger_atm(
+                client,
+                message_elt,
+                session_manager,
+                device_information,
+                affix_values.timestamp
+            )
+
         # Message processed successfully, continue with the flow
         return True
 
@@ -1769,7 +2386,7 @@
         """
         # SCE is only applicable to message and IQ stanzas
         # FIXME: temporary disabling IQ stanza encryption
-        if stanza.name not in { "message" }: # , "iq" }:
+        if stanza.name not in { "message" }:  # , "iq" }:
             return True
 
         # Get the intended recipient
@@ -2019,11 +2636,15 @@
 
         if namespace == twomemo.twomemo.NAMESPACE:
             # Add the encrypted element
-            stanza.addChild(xml_tools.et_elt_2_domish_elt(twomemo.etree.serialize_message(message)))
+            stanza.addChild(xml_tools.et_elt_2_domish_elt(
+                twomemo.etree.serialize_message(message)
+            ))
 
         if namespace == oldmemo.oldmemo.NAMESPACE:
             # Add the encrypted element
-            stanza.addChild(xml_tools.et_elt_2_domish_elt(oldmemo.etree.serialize_message(message)))
+            stanza.addChild(xml_tools.et_elt_2_domish_elt(
+                oldmemo.etree.serialize_message(message)
+            ))
 
         if muc_plaintext_cache_key is not None:
             self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext