Mercurial > libervia-backend
changeset 3967:f461f11ea176
plugin XEP-0384: Implementation of Automatic Trust Management:
- Implementation of Trust Messages (XEP-0434)
- Implementation of Automatic Trust Management (XEP-0450)
- Implementations directly as part of the OMEMO plugin, since omemo:2 is the only protocol supported by ATM at the moment
- Trust system selection updated to allow choice between manual trust with ATM and BTBV
- dev-requirements.txt updated to include additional requirements for the e2e tests
fix 376
author | Syndace <me@syndace.dev> |
---|---|
date | Fri, 28 Oct 2022 18:50:06 +0200 |
parents | 9f85369294f3 |
children | 0dd79c6cc1d2 |
files | dev-requirements.txt requirements.txt sat/plugins/plugin_xep_0374.py sat/plugins/plugin_xep_0384.py sat/tools/xml_tools.py setup.py |
diffstat | 6 files changed, 819 insertions(+), 162 deletions(-) [+] |
line wrap: on
line diff
--- a/dev-requirements.txt Sun Oct 30 01:06:58 2022 +0200 +++ b/dev-requirements.txt Fri Oct 28 18:50:06 2022 +0200 @@ -6,3 +6,7 @@ pylint pytest pytest_twisted + +sh +libervia-templates @ hg+https://repos.goffi.org/sat_templates +libervia-web @ hg+https://repos.goffi.org/libervia-web
--- a/requirements.txt Sun Oct 30 01:06:58 2022 +0200 +++ b/requirements.txt Fri Oct 28 18:50:06 2022 +0200 @@ -19,7 +19,7 @@ hyperlink==21.0.0 idna==3.3 incremental==21.3.0 -Jinja2==3.1.2 +Jinja2==3.0.3 langid==1.1.6 lxml==4.8.0 Mako==1.2.0
--- a/sat/plugins/plugin_xep_0374.py Sun Oct 30 01:06:58 2022 +0200 +++ b/sat/plugins/plugin_xep_0374.py Fri Oct 28 18:50:06 2022 +0200 @@ -207,8 +207,6 @@ else: # I'm not sure why this check is required, this code is copied from XEP-0384 if sender_jid.userhostJID() == client.jid.userhostJID(): - # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems - # like "to" isn't always set. try: feedback_jid = jid.JID(message_elt["to"]) except KeyError:
--- a/sat/plugins/plugin_xep_0384.py Sun Oct 30 01:06:58 2022 +0200 +++ b/sat/plugins/plugin_xep_0384.py Fri Oct 28 18:50:06 2022 +0200 @@ -16,18 +16,20 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import base64 +from datetime import datetime import enum import logging import time -from typing import ( - Any, Callable, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast -) +from typing import \ + Any, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast import uuid import xml.etree.ElementTree as ET from xml.sax.saxutils import quoteattr -from typing_extensions import Never, assert_never +from typing_extensions import Final, Never, assert_never from wokkel import muc, pubsub # type: ignore[import] +import xmlschema from sat.core import exceptions from sat.core.constants import Const as C @@ -43,7 +45,8 @@ from sat.plugins.plugin_xep_0163 import XEP_0163 from sat.plugins.plugin_xep_0334 import XEP_0334 from sat.plugins.plugin_xep_0359 import XEP_0359 -from sat.plugins.plugin_xep_0420 import XEP_0420, SCEAffixPolicy, SCEProfile +from sat.plugins.plugin_xep_0420 import \ + XEP_0420, SCEAffixPolicy, SCEAffixValues, SCEProfile from sat.tools import xml_tools from twisted.internet import defer from twisted.words.protocols.jabber import error, jid @@ -78,9 +81,6 @@ log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] -string_to_domish = cast(Callable[[str], domish.Element], xml_tools.ElementParser()) - - PLUGIN_INFO = { C.PI_NAME: "OMEMO", C.PI_IMPORT_NAME: "XEP-0384", @@ -134,68 +134,10 @@ message_uid: str -# TODO: Convert without serialization/parsing -# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took -# around 0.6 seconds on my setup. -def etree_to_domish(element: ET.Element) -> domish.Element: - """ - @param element: An ElementTree element. - @return: The ElementTree element converted to a domish element. - """ - - return string_to_domish(ET.tostring(element, encoding="unicode")) - - -# TODO: Convert without serialization/parsing -# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took less -# than one second on my setup. -def domish_to_etree(element: domish.Element) -> ET.Element: - """ - @param element: A domish element. - @return: The domish element converted to an ElementTree element. - """ - - return ET.fromstring(element.toXml()) - - -def domish_to_etree2(element: domish.Element) -> ET.Element: - """ - WIP - """ - - element_name = element.name - if element.uri is not None: - element_name = "{" + element.uri + "}" + element_name - - attrib: Dict[str, str] = {} - for qname, value in element.attributes.items(): - attribute_name = qname[1] if isinstance(qname, tuple) else qname - attribute_namespace = qname[0] if isinstance(qname, tuple) else None - if attribute_namespace is not None: - attribute_name = "{" + attribute_namespace + "}" + attribute_name - - attrib[attribute_name] = value - - result = ET.Element(element_name, attrib) - - last_child: Optional[ET.Element] = None - for child in element.children: - if isinstance(child, str): - if last_child is None: - result.text = child - else: - last_child.tail = child - else: - last_child = domish_to_etree2(child) - result.append(last_child) - - return result - - @enum.unique class TrustLevel(enum.Enum): """ - The trust levels required for BTBV and manual trust. + The trust levels required for ATM and BTBV. """ TRUSTED: str = "TRUSTED" @@ -203,20 +145,6 @@ UNDECIDED: str = "UNDECIDED" DISTRUSTED: str = "DISTRUSTED" - def to_omemo_trust_level(self) -> omemo.TrustLevel: - """ - @return: This custom trust level evaluated to one of the OMEMO trust levels. - """ - - if self is TrustLevel.TRUSTED or self is TrustLevel.BLINDLY_TRUSTED: - return omemo.TrustLevel.TRUSTED - if self is TrustLevel.UNDECIDED: - return omemo.TrustLevel.UNDECIDED - if self is TrustLevel.DISTRUSTED: - return omemo.TrustLevel.DISTRUSTED - - return assert_never(self) - TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices" OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist" @@ -431,7 +359,8 @@ f" {namespace}: Unexpected number of items retrieved: {len(items)}." ) - element = next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None) + element = \ + next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None) if element is None: raise omemo.BundleDownloadFailed( f"Bundle download failed for {bare_jid}: {device_id} under namespace" @@ -447,6 +376,433 @@ ) from e +# ATM only supports protocols based on SCE, which is currently only omemo:2, and relies on +# so many implementation details of the encryption protocol that it makes more sense to +# add ATM to the OMEMO plugin directly instead of having it a separate Libervia plugin. +NS_TM: Final = "urn:xmpp:tm:1" +NS_ATM: Final = "urn:xmpp:atm:1" + + +TRUST_MESSAGE_SCHEMA = xmlschema.XMLSchema("""<?xml version='1.0' encoding='UTF-8'?> +<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema' + targetNamespace='urn:xmpp:tm:1' + xmlns='urn:xmpp:tm:1' + elementFormDefault='qualified'> + + <xs:element name='trust-message'> + <xs:complexType> + <xs:sequence> + <xs:element ref='key-owner' minOccurs='1' maxOccurs='unbounded'/> + </xs:sequence> + <xs:attribute name='usage' type='xs:string' use='required'/> + <xs:attribute name='encryption' type='xs:string' use='required'/> + </xs:complexType> + </xs:element> + + <xs:element name='key-owner'> + <xs:complexType> + <xs:sequence> + <xs:element + name='trust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/> + <xs:element + name='distrust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/> + </xs:sequence> + <xs:attribute name='jid' type='xs:string' use='required'/> + </xs:complexType> + </xs:element> +</xs:schema> +""") + + +# This is compatible with omemo:2's SCE profile +TM_SCE_PROFILE = SCEProfile( + rpad_policy=SCEAffixPolicy.REQUIRED, + time_policy=SCEAffixPolicy.REQUIRED, + to_policy=SCEAffixPolicy.OPTIONAL, + from_policy=SCEAffixPolicy.OPTIONAL, + custom_policies={} +) + + +class TrustUpdate(NamedTuple): + # pylint: disable=invalid-name + """ + An update to the trust status of an identity key, used by Automatic Trust Management. + """ + + target_jid: jid.JID + target_key: bytes + target_trust: bool + + +class TrustMessageCacheEntry(NamedTuple): + # pylint: disable=invalid-name + """ + An entry in the trust message cache used by ATM. + """ + + sender_jid: jid.JID + sender_key: bytes + timestamp: datetime + trust_update: TrustUpdate + + +class PartialTrustMessage(NamedTuple): + # pylint: disable=invalid-name + """ + A structure representing a partial trust message, used by :func:`send_trust_messages` + to build trust messages. + """ + + recipient_jid: jid.JID + updated_jid: jid.JID + trust_updates: FrozenSet[TrustUpdate] + + +async def manage_trust_message_cache( + client: SatXMPPClient, + session_manager: omemo.SessionManager, + applied_trust_updates: FrozenSet[TrustUpdate] +) -> None: + """Manage the ATM trust message cache after trust updates have been applied. + + @param client: The client this operation runs under. + @param session_manager: The session manager to use. + @param applied_trust_updates: The trust updates that have already been applied, + triggering this cache management run. + """ + + trust_message_cache = persistent.LazyPersistentBinaryDict( + "XEP-0384/TM", + client.profile + ) + + # Load cache entries + cache_entries = cast( + Set[TrustMessageCacheEntry], + await trust_message_cache.get("cache", set()) + ) + + # Expire cache entries that were overwritten by the applied trust updates + cache_entries_by_target = { + ( + cache_entry.trust_update.target_jid.userhostJID(), + cache_entry.trust_update.target_key + ): cache_entry + for cache_entry + in cache_entries + } + + for trust_update in applied_trust_updates: + cache_entry = cache_entries_by_target.get( + (trust_update.target_jid.userhostJID(), trust_update.target_key), + None + ) + + if cache_entry is not None: + cache_entries.remove(cache_entry) + + # Apply cached Trust Messages by newly trusted devices + new_trust_updates: Set[TrustUpdate] = set() + + for trust_update in applied_trust_updates: + if trust_update.target_trust: + # Iterate over a copy such that cache_entries can be modified + for cache_entry in set(cache_entries): + if ( + cache_entry.sender_jid.userhostJID() + == trust_update.target_jid.userhostJID() + and cache_entry.sender_key == trust_update.target_key + ): + trust_level = ( + TrustLevel.TRUSTED + if cache_entry.trust_update.target_trust + else TrustLevel.DISTRUSTED + ) + + # Apply the trust update + await session_manager.set_trust( + cache_entry.trust_update.target_jid.userhost(), + cache_entry.trust_update.target_key, + trust_level.name + ) + + # Track the fact that this trust update has been applied + new_trust_updates.add(cache_entry.trust_update) + + # Remove the corresponding cache entry + cache_entries.remove(cache_entry) + + # Store the updated cache entries + await trust_message_cache.force("cache", cache_entries) + + # TODO: Notify the user ("feedback") about automatically updated trust? + + if len(new_trust_updates) > 0: + # If any trust has been updated, recursively perform another run of cache + # management + await manage_trust_message_cache( + client, + session_manager, + frozenset(new_trust_updates) + ) + + +async def get_trust_as_trust_updates( + session_manager: omemo.SessionManager, + target_jid: jid.JID +) -> FrozenSet[TrustUpdate]: + """Get the trust status of all known keys of a JID as trust updates for use with ATM. + + @param session_manager: The session manager to load the trust from. + @param target_jid: The JID to load the trust for. + @return: The trust updates encoding the trust status of all known keys of the JID that + are either explicitly trusted or distrusted. Undecided keys are not included in + the trust updates. + """ + + devices = await session_manager.get_device_information(target_jid.userhost()) + + trust_updates: Set[TrustUpdate] = set() + + for device in devices: + trust_level = TrustLevel(device.trust_level_name) + target_trust: bool + + if trust_level is TrustLevel.TRUSTED: + target_trust = True + elif trust_level is TrustLevel.DISTRUSTED: + target_trust = False + else: + # Skip devices that are not explicitly trusted or distrusted + continue + + trust_updates.add(TrustUpdate( + target_jid=target_jid.userhostJID(), + target_key=device.identity_key, + target_trust=target_trust + )) + + return frozenset(trust_updates) + + +async def send_trust_messages( + client: SatXMPPClient, + session_manager: omemo.SessionManager, + applied_trust_updates: FrozenSet[TrustUpdate] +) -> None: + """Send information about updated trust to peers via ATM (XEP-0450). + + @param client: The client. + @param session_manager: The session manager. + @param applied_trust_updates: The trust updates that have already been applied, to + notify other peers about. + """ + # NOTE: This currently sends information about oldmemo trust too. This is not + # specified and experimental, but since twomemo and oldmemo share the same identity + # keys and trust systems, this could be a cool side effect. + + # Send Trust Messages for newly trusted and distrusted devices + own_jid = client.jid.userhostJID() + own_trust_updates = await get_trust_as_trust_updates(session_manager, own_jid) + + # JIDs of which at least one device's trust has been updated + updated_jids = frozenset({ + trust_update.target_jid.userhostJID() + for trust_update + in applied_trust_updates + }) + + trust_messages: Set[PartialTrustMessage] = set() + + for updated_jid in updated_jids: + # Get the trust updates for that JID + trust_updates = frozenset({ + trust_update for trust_update in applied_trust_updates + if trust_update.target_jid.userhostJID() == updated_jid + }) + + if updated_jid == own_jid: + # If the own JID is updated, _all_ peers have to be notified + # TODO: Using my author's privilege here to shamelessly access private fields + # and storage keys until I've added public API to get a list of peers to + # python-omemo. + storage: omemo.Storage = getattr(session_manager, "_SessionManager__storage") + peer_jids = frozenset({ + jid.JID(bare_jid).userhostJID() for bare_jid in (await storage.load_list( + f"/{OMEMO.NS_TWOMEMO}/bare_jids", + str + )).maybe([]) + }) + + if len(peer_jids) == 0: + # If there are no peers to notify, notify our other devices about the + # changes directly + trust_messages.add(PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=own_jid, + trust_updates=trust_updates + )) + else: + # Otherwise, notify all peers about the changes in trust and let carbons + # handle the copy to our own JID + for peer_jid in peer_jids: + trust_messages.add(PartialTrustMessage( + recipient_jid=peer_jid, + updated_jid=own_jid, + trust_updates=trust_updates + )) + + # Also send full trust information about _every_ peer to our newly + # trusted devices + peer_trust_updates = \ + await get_trust_as_trust_updates(session_manager, peer_jid) + + trust_messages.add(PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=peer_jid, + trust_updates=peer_trust_updates + )) + + # Send information about our own devices to our newly trusted devices + trust_messages.add(PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=own_jid, + trust_updates=own_trust_updates + )) + else: + # Notify our other devices about the changes in trust + trust_messages.add(PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=updated_jid, + trust_updates=trust_updates + )) + + # Send a summary of our own trust to newly trusted devices + trust_messages.add(PartialTrustMessage( + recipient_jid=updated_jid, + updated_jid=own_jid, + trust_updates=own_trust_updates + )) + + # All trust messages prepared. Merge all trust messages directed at the same + # recipient. + recipient_jids = { trust_message.recipient_jid for trust_message in trust_messages } + + for recipient_jid in recipient_jids: + updated: Dict[jid.JID, Set[TrustUpdate]] = {} + + for trust_message in trust_messages: + # Merge trust messages directed at that recipient + if trust_message.recipient_jid == recipient_jid: + # Merge the trust updates + updated[trust_message.updated_jid] = \ + updated.get(trust_message.updated_jid, set()) + + updated[trust_message.updated_jid] |= trust_message.trust_updates + + # Build the trust message + trust_message_elt = domish.Element((NS_TM, "trust-message")) + trust_message_elt["usage"] = NS_ATM + trust_message_elt["encryption"] = twomemo.twomemo.NAMESPACE + + for updated_jid, trust_updates in updated.items(): + key_owner_elt = trust_message_elt.addElement((NS_TM, "key-owner")) + key_owner_elt["jid"] = updated_jid.userhost() + + for trust_update in trust_updates: + serialized_identity_key = \ + base64.b64encode(trust_update.target_key).decode("ASCII") + + if trust_update.target_trust: + key_owner_elt.addElement( + (NS_TM, "trust"), + content=serialized_identity_key + ) + else: + key_owner_elt.addElement( + (NS_TM, "distrust"), + content=serialized_identity_key + ) + + # Finally, encrypt and send the trust message! + message_data = client.generateMessageXML(MessageData({ + "from": own_jid, + "to": recipient_jid, + "uid": str(uuid.uuid4()), + "message": {}, + "subject": {}, + "type": C.MESS_TYPE_CHAT, + "extra": {}, + "timestamp": time.time() + })) + + message_data["xml"].addChild(trust_message_elt) + + plaintext = XEP_0420.pack_stanza(TM_SCE_PROFILE, message_data["xml"]) + + feedback_jid = recipient_jid + + # TODO: The following is mostly duplicate code + try: + messages, encryption_errors = await session_manager.encrypt( + frozenset({ own_jid.userhost(), recipient_jid.userhost() }), + { OMEMO.NS_TWOMEMO: plaintext }, + backend_priority_order=[ OMEMO.NS_TWOMEMO ], + identifier=feedback_jid.userhost() + ) + except Exception as e: + msg = _( + # pylint: disable=consider-using-f-string + "Can't encrypt message for {entities}: {reason}".format( + entities=', '.join({ own_jid.userhost(), recipient_jid.userhost() }), + reason=e + ) + ) + log.warning(msg) + client.feedback(feedback_jid, msg, { + C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR + }) + raise e + + if len(encryption_errors) > 0: + log.warning( + f"Ignored the following non-critical encryption errors:" + f" {encryption_errors}" + ) + + encrypted_errors_stringified = ", ".join([ + f"device {err.device_id} of {err.bare_jid} under namespace" + f" {err.namespace}" + for err + in encryption_errors + ]) + + client.feedback( + feedback_jid, + D_( + "There were non-critical errors during encryption resulting in some" + " of your destinees' devices potentially not receiving the message." + " This happens when the encryption data/key material of a device is" + " incomplete or broken, which shouldn't happen for actively used" + " devices, and can usually be ignored. The following devices are" + f" affected: {encrypted_errors_stringified}." + ) + ) + + message = next( + message for message in messages + if message.namespace == OMEMO.NS_TWOMEMO + ) + + # Add the encrypted element + message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt( + twomemo.etree.serialize_message(message) + )) + + await client.a_send(message_data["xml"]) + + def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]: """ @param sat: The SAT instance. @@ -705,13 +1061,18 @@ if len(items) == 0: return {} - elif len(items) != 1: + + if len(items) != 1: raise omemo.DeviceListDownloadFailed( f"Device list download failed for {bare_jid} under namespace" f" {namespace}: Unexpected number of items retrieved: {len(items)}." ) - element = next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None) + element = next( + iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), + None + ) + if element is None: raise omemo.DeviceListDownloadFailed( f"Device list download failed for {bare_jid} under namespace" @@ -732,15 +1093,62 @@ raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") - @staticmethod - def _evaluate_custom_trust_level(trust_level_name: str) -> omemo.TrustLevel: + async def _evaluate_custom_trust_level( + self, + device: omemo.DeviceInformation + ) -> omemo.TrustLevel: + # Get the custom trust level try: - return TrustLevel(trust_level_name).to_omemo_trust_level() + trust_level = TrustLevel(device.trust_level_name) except ValueError as e: raise omemo.UnknownTrustLevel( - f"Unknown trust level name {trust_level_name}" + f"Unknown trust level name {device.trust_level_name}" ) from e + # The first three cases are a straight-forward mapping + if trust_level is TrustLevel.TRUSTED: + return omemo.TrustLevel.TRUSTED + if trust_level is TrustLevel.UNDECIDED: + return omemo.TrustLevel.UNDECIDED + if trust_level is TrustLevel.DISTRUSTED: + return omemo.TrustLevel.DISTRUSTED + + # The blindly trusted case is more complicated, since its evaluation depends + # on the trust system and phase + if trust_level is TrustLevel.BLINDLY_TRUSTED: + # Get the name of the active trust system + trust_system = cast(str, sat.memory.getParamA( + PARAM_NAME, + PARAM_CATEGORY, + profile_key=profile + )) + + # If the trust model is BTBV, blind trust is always enabled + if trust_system == "btbv": + return omemo.TrustLevel.TRUSTED + + # If the trust model is ATM, blind trust is disabled in the second phase + # and counts as undecided + if trust_system == "atm": + # Find out whether we are in phase one or two + devices = await self.get_device_information(device.bare_jid) + + phase_one = all(TrustLevel(device.trust_level_name) in { + TrustLevel.UNDECIDED, + TrustLevel.BLINDLY_TRUSTED + } for device in devices) + + if phase_one: + return omemo.TrustLevel.TRUSTED + + return omemo.TrustLevel.UNDECIDED + + raise exceptions.InternalError( + f"Unknown trust system active: {trust_system}" + ) + + assert_never(trust_level) + async def _make_trust_decision( self, undecided: FrozenSet[omemo.DeviceInformation], @@ -754,50 +1162,38 @@ # The feedback JID is transferred via the identifier feedback_jid = jid.JID(identifier).userhostJID() - # Get the name of the trust model to use - trust_model = cast(str, sat.memory.getParamA( - PARAM_NAME, - PARAM_CATEGORY, - profile_key=cast(str, client.profile) - )) - - # Under the BTBV trust model, if at least one device of a bare JID is manually - # trusted or distrusted, the trust model is "downgraded" to manual trust. - # Thus, we can separate bare JIDs into two pools here, one pool of bare JIDs - # for which BTBV is active, and one pool of bare JIDs for which manual trust - # is used. + # Both the ATM and the BTBV trust models work with blind trust before the + # first manual verification is performed. Thus, we can separate bare JIDs into + # two pools here, one pool of bare JIDs for which blind trust is active, and + # one pool of bare JIDs for which manual trust is used instead. bare_jids = { device.bare_jid for device in undecided } - btbv_bare_jids: Set[str] = set() + blind_trust_bare_jids: Set[str] = set() manual_trust_bare_jids: Set[str] = set() - if trust_model == "btbv": - # For each bare JID, decide whether BTBV or manual trust applies - for bare_jid in bare_jids: - # Get all known devices belonging to the bare JID - devices = await self.get_device_information(bare_jid) - - # If the trust levels of all devices correspond to those used by BTBV, - # BTBV applies. Otherwise, fall back to manual trust. - if all(TrustLevel(device.trust_level_name) in { - TrustLevel.UNDECIDED, - TrustLevel.BLINDLY_TRUSTED - } for device in devices): - btbv_bare_jids.add(bare_jid) - else: - manual_trust_bare_jids.add(bare_jid) - - if trust_model == "manual": - manual_trust_bare_jids = bare_jids + # For each bare JID, decide whether blind trust applies + for bare_jid in bare_jids: + # Get all known devices belonging to the bare JID + devices = await self.get_device_information(bare_jid) + + # If the trust levels of all devices correspond to those used by blind + # trust, blind trust applies. Otherwise, fall back to manual trust. + if all(TrustLevel(device.trust_level_name) in { + TrustLevel.UNDECIDED, + TrustLevel.BLINDLY_TRUSTED + } for device in devices): + blind_trust_bare_jids.add(bare_jid) + else: + manual_trust_bare_jids.add(bare_jid) # With the JIDs sorted into their respective pools, the undecided devices can # be categorized too blindly_trusted_devices = \ - { dev for dev in undecided if dev.bare_jid in btbv_bare_jids } + { dev for dev in undecided if dev.bare_jid in blind_trust_bare_jids } manually_trusted_devices = \ { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids } - # Blindly trust devices handled by BTBV + # Blindly trust devices handled by blind trust if len(blindly_trusted_devices) > 0: for device in blindly_trusted_devices: await self.set_trust( @@ -817,11 +1213,8 @@ feedback_jid, D_( "Not all destination devices are trusted, unknown devices will be" - " blindly trusted due to the Blind Trust Before Verification" - " policy. If you want a more secure workflow, please activate the" - " \"manual\" policy in the settings' \"Security\" tab.\nFollowing" - " devices have been automatically trusted:" - f" {blindly_trusted_devices_stringified}." + " blindly trusted.\nFollowing devices have been automatically" + f" trusted: {blindly_trusted_devices_stringified}." ) ) @@ -854,7 +1247,6 @@ if element is None: raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}") - # TODO: Untested message_data = client.generateMessageXML(MessageData({ "from": client.jid, "to": jid.JID(bare_jid), @@ -959,19 +1351,40 @@ trust_ui_result )) + trust_updates: Set[TrustUpdate] = set() + for key, value in data_form_result.items(): if not key.startswith("trust_"): continue device = undecided_ordered[int(key[len("trust_"):])] - trust = C.bool(value) + target_trust = C.bool(value) + trust_level = \ + TrustLevel.TRUSTED if target_trust else TrustLevel.DISTRUSTED await self.set_trust( device.bare_jid, device.identity_key, - TrustLevel.TRUSTED.name if trust else TrustLevel.DISTRUSTED.name + trust_level.name ) + trust_updates.add(TrustUpdate( + target_jid=jid.JID(device.bare_jid).userhostJID(), + target_key=device.identity_key, + target_trust=target_trust + )) + + # Check whether ATM is enabled and handle everything in case it is + trust_system = cast(str, sat.memory.getParamA( + PARAM_NAME, + PARAM_CATEGORY, + profile_key=profile + )) + + if trust_system == "atm": + await manage_trust_message_cache(client, self, frozenset(trust_updates)) + await send_trust_messages(client, self, frozenset(trust_updates)) + return SessionManagerImpl @@ -1086,7 +1499,8 @@ <param name="{PARAM_NAME}" label={quoteattr(D_('OMEMO default trust policy'))} type="list" security="3"> - <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} /> + <option value="atm" + label={quoteattr(D_('Automatic Trust Management (more secure)'))} /> <option value="btbv" label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))} selected="true" /> @@ -1103,7 +1517,7 @@ ``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl`` namespace. Both versions of the protocol are handled by this plugin and compatibility between the two is maintained. MUC messages are supported next to one to one messages. - For trust management, the two trust models "BTBV" and "manual" are supported. + For trust management, the two trust models "ATM" and "BTBV" are supported. """ NS_TWOMEMO = twomemo.twomemo.NAMESPACE NS_OLDMEMO = oldmemo.oldmemo.NAMESPACE @@ -1163,7 +1577,8 @@ self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {} # These triggers are used by oldmemo, which doesn't do SCE and only applies to - # messages + # messages. Temporarily, until a more fitting trigger for SCE-based encryption is + # added, the messageReceived trigger is also used for twomemo. sat.trigger.add( "messageReceived", self.__message_received_trigger, @@ -1298,7 +1713,7 @@ async def callback( data: Any, - profile: str # pylint: disable=unused-argument + profile: str ) -> Dict[Never, Never]: """ @param data: The XMLUI result produces by the trust UI form. @@ -1314,18 +1729,56 @@ Dict[str, str], xml_tools.XMLUIResult2DataFormResult(data) ) + + trust_updates: Set[TrustUpdate] = set() + for key, value in data_form_result.items(): if not key.startswith("trust_"): continue device = devices[int(key[len("trust_"):])] - trust = TrustLevel(value) - - if TrustLevel(device.trust_level_name) is not trust: + trust_level_name = value + + if device.trust_level_name != trust_level_name: await session_manager.set_trust( device.bare_jid, device.identity_key, - value + trust_level_name + ) + + target_trust: Optional[bool] = None + + if TrustLevel(trust_level_name) is TrustLevel.TRUSTED: + target_trust = True + if TrustLevel(trust_level_name) is TrustLevel.DISTRUSTED: + target_trust = False + + if target_trust is not None: + trust_updates.add(TrustUpdate( + target_jid=jid.JID(device.bare_jid).userhostJID(), + target_key=device.identity_key, + target_trust=target_trust + )) + + # Check whether ATM is enabled and handle everything in case it is + trust_system = cast(str, self.__sat.memory.getParamA( + PARAM_NAME, + PARAM_CATEGORY, + profile_key=profile + )) + + if trust_system == "atm": + if len(trust_updates) > 0: + await manage_trust_message_cache( + client, + session_manager, + frozenset(trust_updates) + ) + + await send_trust_messages( + client, + session_manager, + frozenset(trust_updates) ) return {} @@ -1341,13 +1794,15 @@ # pylint: disable=no-member trust_ui = cast(Any, result) trust_ui.addText(D_( - "This is OMEMO trusting system. You'll see below the devices of your " - "contacts, and a list selection to trust them or not. A trusted device " - "can read your messages in plain text, so be sure to only validate " - "devices that you are sure are belonging to your contact. It's better " - "to do this when you are next to your contact and their device, so " - "you can check the \"fingerprint\" (the number next to the device) " - "yourself. Do *not* validate a device if the fingerprint is wrong!" + "This is OMEMO trusting system. You'll see below the devices of your" + " contacts, and a list selection to trust them or not. A trusted device" + " can read your messages in plain text, so be sure to only validate" + " devices that you are sure are belonging to your contact. It's better" + " to do this when you are next to your contact and their device, so" + " you can check the \"fingerprint\" (the number next to the device)" + " yourself. Do *not* validate a device if the fingerprint is wrong!" + " Note that manually validating a fingerprint disables any form of automatic" + " trust." )) own_device, __ = await session_manager.get_own_device_information() @@ -1491,6 +1946,156 @@ return session_manager + async def __message_received_trigger_atm( + self, + client: SatXMPPClient, + message_elt: domish.Element, + session_manager: omemo.SessionManager, + sender_device_information: omemo.DeviceInformation, + timestamp: datetime + ) -> None: + """Check a newly decrypted message stanza for ATM content and perform ATM in case. + + @param client: The client which received the message. + @param message_elt: The message element. Can be modified. + @param session_manager: The session manager. + @param sender_device_information: Information about the device that sent/encrypted + the message. + @param timestamp: Timestamp extracted from the SCE time affix. + """ + + trust_message_cache = persistent.LazyPersistentBinaryDict( + "XEP-0384/TM", + client.profile + ) + + new_cache_entries: Set[TrustMessageCacheEntry] = set() + + for trust_message_elt in message_elt.elements(NS_TM, "trust-message"): + assert isinstance(trust_message_elt, domish.Element) + + try: + TRUST_MESSAGE_SCHEMA.validate(trust_message_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + "<trust-message/> element doesn't pass schema validation." + ) from e + + if trust_message_elt["usage"] != NS_ATM: + # Skip non-ATM trust message + continue + + if trust_message_elt["encryption"] != OMEMO.NS_TWOMEMO: + # Skip non-twomemo trust message + continue + + for key_owner_elt in trust_message_elt.elements(NS_TM, "key-owner"): + assert isinstance(key_owner_elt, domish.Element) + + key_owner_jid = jid.JID(key_owner_elt["jid"]).userhostJID() + + for trust_elt in key_owner_elt.elements(NS_TM, "trust"): + assert isinstance(trust_elt, domish.Element) + + new_cache_entries.add(TrustMessageCacheEntry( + sender_jid=jid.JID(sender_device_information.bare_jid), + sender_key=sender_device_information.identity_key, + timestamp=timestamp, + trust_update=TrustUpdate( + target_jid=key_owner_jid, + target_key=base64.b64decode(str(trust_elt)), + target_trust=True + ) + )) + + for distrust_elt in key_owner_elt.elements(NS_TM, "distrust"): + assert isinstance(distrust_elt, domish.Element) + + new_cache_entries.add(TrustMessageCacheEntry( + sender_jid=jid.JID(sender_device_information.bare_jid), + sender_key=sender_device_information.identity_key, + timestamp=timestamp, + trust_update=TrustUpdate( + target_jid=key_owner_jid, + target_key=base64.b64decode(str(distrust_elt)), + target_trust=False + ) + )) + + # Load existing cache entries + existing_cache_entries = cast( + Set[TrustMessageCacheEntry], + await trust_message_cache.get("cache", set()) + ) + + # Discard cache entries by timestamp comparison + existing_by_target = { + ( + cache_entry.trust_update.target_jid.userhostJID(), + cache_entry.trust_update.target_key + ): cache_entry + for cache_entry + in existing_cache_entries + } + + # Iterate over a copy here, such that new_cache_entries can be modified + for new_cache_entry in set(new_cache_entries): + existing_cache_entry = existing_by_target.get( + ( + new_cache_entry.trust_update.target_jid.userhostJID(), + new_cache_entry.trust_update.target_key + ), + None + ) + + if existing_cache_entry is not None: + if existing_cache_entry.timestamp > new_cache_entry.timestamp: + # If the existing cache entry is newer than the new cache entry, + # discard the new one in favor of the existing one + new_cache_entries.remove(new_cache_entry) + else: + # Otherwise, discard the existing cache entry. This includes the case + # when both cache entries have matching timestamps. + existing_cache_entries.remove(existing_cache_entry) + + # If the sending device is trusted, apply the new cache entries + applied_trust_updates: Set[TrustUpdate] = set() + + if TrustLevel(sender_device_information.trust_level_name) is TrustLevel.TRUSTED: + # Iterate over a copy such that new_cache_entries can be modified + for cache_entry in set(new_cache_entries): + trust_update = cache_entry.trust_update + + trust_level = ( + TrustLevel.TRUSTED + if trust_update.target_trust + else TrustLevel.DISTRUSTED + ) + + await session_manager.set_trust( + trust_update.target_jid.userhost(), + trust_update.target_key, + trust_level.name + ) + + applied_trust_updates.add(trust_update) + + new_cache_entries.remove(cache_entry) + + # Store the remaining existing and new cache entries + await trust_message_cache.force( + "cache", + existing_cache_entries | new_cache_entries + ) + + # If the trust of at least one device was modified, run the ATM cache update logic + if len(applied_trust_updates) > 0: + await manage_trust_message_cache( + client, + session_manager, + frozenset(applied_trust_updates) + ) + async def __message_received_trigger( self, client: SatXMPPClient, @@ -1506,6 +2111,7 @@ encrypted. @return: Whether to continue the message received flow. """ + muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None sender_jid = jid.JID(message_elt["from"]) @@ -1569,8 +2175,6 @@ # I'm not sure why this check is required, this code is copied from the old # plugin. if sender_jid.userhostJID() == client.jid.userhostJID(): - # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems - # like "to" isn't always set. try: feedback_jid = jid.JID(message_elt["to"]) except KeyError: @@ -1700,6 +2304,8 @@ # No point in further processing this message return False + affix_values: Optional[SCEAffixValues] = None + if message.namespace == twomemo.twomemo.NAMESPACE: if plaintext is not None: # XEP_0420.unpack_stanza handles the whole unpacking, including the @@ -1726,12 +2332,12 @@ ) # No point in further processing this message return False - - if affix_values.timestamp is not None: - # TODO: affix_values.timestamp contains the timestamp included in the - # encrypted element here. The XEP says it SHOULD be displayed with the - # plaintext by clients. - pass + else: + if affix_values.timestamp is not None: + # TODO: affix_values.timestamp contains the timestamp included in + # the encrypted element here. The XEP says it SHOULD be displayed + # with the plaintext by clients. + pass if message.namespace == oldmemo.oldmemo.NAMESPACE: # Remove all body elements from the original element, since those act as @@ -1746,7 +2352,8 @@ # Mark the message as trusted or untrusted. Undecided counts as untrusted here. trust_level = \ - TrustLevel(device_information.trust_level_name).to_omemo_trust_level() + await session_manager._evaluate_custom_trust_level(device_information) + if trust_level is omemo.TrustLevel.TRUSTED: post_treat.addCallback(client.encryption.markAsTrusted) else: @@ -1758,6 +2365,16 @@ namespace=message.namespace ) + # Handle potential ATM trust updates + if affix_values is not None and affix_values.timestamp is not None: + await self.__message_received_trigger_atm( + client, + message_elt, + session_manager, + device_information, + affix_values.timestamp + ) + # Message processed successfully, continue with the flow return True @@ -1769,7 +2386,7 @@ """ # SCE is only applicable to message and IQ stanzas # FIXME: temporary disabling IQ stanza encryption - if stanza.name not in { "message" }: # , "iq" }: + if stanza.name not in { "message" }: # , "iq" }: return True # Get the intended recipient @@ -2019,11 +2636,15 @@ if namespace == twomemo.twomemo.NAMESPACE: # Add the encrypted element - stanza.addChild(xml_tools.et_elt_2_domish_elt(twomemo.etree.serialize_message(message))) + stanza.addChild(xml_tools.et_elt_2_domish_elt( + twomemo.etree.serialize_message(message) + )) if namespace == oldmemo.oldmemo.NAMESPACE: # Add the encrypted element - stanza.addChild(xml_tools.et_elt_2_domish_elt(oldmemo.etree.serialize_message(message))) + stanza.addChild(xml_tools.et_elt_2_domish_elt( + oldmemo.etree.serialize_message(message) + )) if muc_plaintext_cache_key is not None: self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext
--- a/sat/tools/xml_tools.py Sun Oct 30 01:06:58 2022 +0200 +++ b/sat/tools/xml_tools.py Fri Oct 28 18:50:06 2022 +0200 @@ -20,7 +20,7 @@ from collections import OrderedDict import html.entities import re -from typing import Optional, Tuple, Union, Literal, overload +from typing import Dict, Optional, Tuple, Union, Literal, overload from xml.dom import NotFoundErr, minidom import xml.etree.ElementTree as ET from lxml import etree @@ -2030,3 +2030,37 @@ for child in elt.elements(): et_elt.append(domish_elt_2_et_elt(child, lxml=lxml)) return et_elt + + +def domish_elt_2_et_elt2(element: domish.Element) -> ET.Element: + """ + WIP, originally from the OMEMO plugin + """ + + element_name = element.name + if element.uri is not None: + element_name = "{" + element.uri + "}" + element_name + + attrib: Dict[str, str] = {} + for qname, value in element.attributes.items(): + attribute_name = qname[1] if isinstance(qname, tuple) else qname + attribute_namespace = qname[0] if isinstance(qname, tuple) else None + if attribute_namespace is not None: + attribute_name = "{" + attribute_namespace + "}" + attribute_name + + attrib[attribute_name] = value + + result = ET.Element(element_name, attrib) + + last_child: Optional[ET.Element] = None + for child in element.children: + if isinstance(child, str): + if last_child is None: + result.text = child + else: + last_child.tail = child + else: + last_child = domish_elt_2_et_elt2(child) + result.append(last_child) + + return result
--- a/setup.py Sun Oct 30 01:06:58 2022 +0200 +++ b/setup.py Fri Oct 28 18:50:06 2022 +0200 @@ -52,8 +52,8 @@ 'urwid-satext == 0.9.*', 'wokkel >= 18.0.0, < 19.0.0', 'omemo >= 1.0.0, < 2', - 'twomemo[xml] >= 1.0.0, < 2', - 'oldmemo[xml] >= 1.0.0, < 2', + 'twomemo >= 1.0.0, < 2', + 'oldmemo >= 1.0.0, < 2', 'pyyaml < 7.0.0', 'sqlalchemy >= 1.4', 'alembic',