Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0384.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0384.py@c23cad65ae99 |
children | 040095a5dc7f |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0384.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,2724 @@ +#!/usr/bin/env python3 + +# Libervia plugin for OMEMO encryption +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import base64 +from datetime import datetime +import enum +import logging +import time +from typing import \ + Any, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, Union, cast +import uuid +import xml.etree.ElementTree as ET +from xml.sax.saxutils import quoteattr + +from typing_extensions import Final, Never, assert_never +from wokkel import muc, pubsub # type: ignore[import] +import xmlschema + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import MessageData, SatXMPPEntity +from libervia.backend.core.i18n import _, D_ +from libervia.backend.core.log import getLogger, Logger +from libervia.backend.core.sat_main import SAT +from libervia.backend.core.xmpp import SatXMPPClient +from libervia.backend.memory import persistent +from libervia.backend.plugins.plugin_misc_text_commands import TextCommands +from libervia.backend.plugins.plugin_xep_0045 import XEP_0045 +from libervia.backend.plugins.plugin_xep_0060 import XEP_0060 +from libervia.backend.plugins.plugin_xep_0163 import XEP_0163 +from libervia.backend.plugins.plugin_xep_0334 import XEP_0334 +from libervia.backend.plugins.plugin_xep_0359 import XEP_0359 +from libervia.backend.plugins.plugin_xep_0420 import \ + XEP_0420, SCEAffixPolicy, SCEAffixValues, SCEProfile +from libervia.backend.tools import xml_tools +from twisted.internet import defer +from twisted.words.protocols.jabber import error, jid +from twisted.words.xish import domish + +try: + import omemo + import omemo.identity_key_pair + import twomemo + import twomemo.etree + import oldmemo + import oldmemo.etree + import oldmemo.migrations + from xmlschema import XMLSchemaValidationError + + # An explicit version check of the OMEMO libraries should not be required here, since + # the stored data is fully versioned and the library will complain if a downgrade is + # attempted. +except ImportError as import_error: + raise exceptions.MissingModule( + "You are missing one or more package required by the OMEMO plugin. Please" + " download/install the pip packages 'omemo', 'twomemo', 'oldmemo' and" + f" 'xmlschema'.\nexception: {import_error}" + ) from import_error + + +__all__ = [ # pylint: disable=unused-variable + "PLUGIN_INFO", + "OMEMO" +] + +log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] + + +PLUGIN_INFO = { + C.PI_NAME: "OMEMO", + C.PI_IMPORT_NAME: "XEP-0384", + C.PI_TYPE: "SEC", + C.PI_PROTOCOLS: [ "XEP-0384" ], + C.PI_DEPENDENCIES: [ "XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060", "XEP-0420" ], + C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0359", C.TEXT_CMDS ], + C.PI_MAIN: "OMEMO", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Implementation of OMEMO"""), +} + + +PARAM_CATEGORY = "Security" +PARAM_NAME = "omemo_policy" + + +class LogHandler(logging.Handler): + """ + Redirect python-omemo's log output to Libervia's log system. + """ + + def emit(self, record: logging.LogRecord) -> None: + log.log(record.levelname, record.getMessage()) + + +sm_logger = logging.getLogger(omemo.SessionManager.LOG_TAG) +sm_logger.setLevel(logging.DEBUG) +sm_logger.propagate = False +sm_logger.addHandler(LogHandler()) + + +ikp_logger = logging.getLogger(omemo.identity_key_pair.IdentityKeyPair.LOG_TAG) +ikp_logger.setLevel(logging.DEBUG) +ikp_logger.propagate = False +ikp_logger.addHandler(LogHandler()) + + +# TODO: Add handling for device labels, i.e. show device labels in the trust UI and give +# the user a way to change their own device label. + + +class MUCPlaintextCacheKey(NamedTuple): + # pylint: disable=invalid-name + """ + Structure identifying an encrypted message sent to a MUC. + """ + + client: SatXMPPClient + room_jid: jid.JID + message_uid: str + + +@enum.unique +class TrustLevel(enum.Enum): + """ + The trust levels required for ATM and BTBV. + """ + + TRUSTED: str = "TRUSTED" + BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" + UNDECIDED: str = "UNDECIDED" + DISTRUSTED: str = "DISTRUSTED" + + +TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices" +OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist" + + +class StorageImpl(omemo.Storage): + """ + Storage implementation for OMEMO based on :class:`persistent.LazyPersistentBinaryDict` + """ + + def __init__(self, profile: str) -> None: + """ + @param profile: The profile this OMEMO data belongs to. + """ + + # persistent.LazyPersistentBinaryDict does not cache at all, so keep the caching + # option of omemo.Storage enabled. + super().__init__() + + self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile) + + async def _load(self, key: str) -> omemo.Maybe[omemo.JSONType]: + try: + return omemo.Just(await self.__storage[key]) + except KeyError: + return omemo.Nothing() + except Exception as e: + raise omemo.StorageException(f"Error while loading key {key}") from e + + async def _store(self, key: str, value: omemo.JSONType) -> None: + try: + await self.__storage.force(key, value) + except Exception as e: + raise omemo.StorageException(f"Error while storing key {key}: {value}") from e + + async def _delete(self, key: str) -> None: + try: + await self.__storage.remove(key) + except KeyError: + pass + except Exception as e: + raise omemo.StorageException(f"Error while deleting key {key}") from e + + +class LegacyStorageImpl(oldmemo.migrations.LegacyStorage): + """ + Legacy storage implementation to migrate data from the old XEP-0384 plugin. + """ + + KEY_DEVICE_ID = "DEVICE_ID" + KEY_STATE = "STATE" + KEY_SESSION = "SESSION" + KEY_ACTIVE_DEVICES = "DEVICES" + KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES" + KEY_TRUST = "TRUST" + KEY_ALL_JIDS = "ALL_JIDS" + + def __init__(self, profile: str, own_bare_jid: str) -> None: + """ + @param profile: The profile this OMEMO data belongs to. + @param own_bare_jid: The own bare JID, to return by the :meth:`load_own_data` call. + """ + + self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile) + self.__own_bare_jid = own_bare_jid + + async def loadOwnData(self) -> Optional[oldmemo.migrations.OwnData]: + own_device_id = await self.__storage.get(LegacyStorageImpl.KEY_DEVICE_ID, None) + if own_device_id is None: + return None + + return oldmemo.migrations.OwnData( + own_bare_jid=self.__own_bare_jid, + own_device_id=own_device_id + ) + + async def deleteOwnData(self) -> None: + try: + await self.__storage.remove(LegacyStorageImpl.KEY_DEVICE_ID) + except KeyError: + pass + + async def loadState(self) -> Optional[oldmemo.migrations.State]: + return cast( + Optional[oldmemo.migrations.State], + await self.__storage.get(LegacyStorageImpl.KEY_STATE, None) + ) + + async def deleteState(self) -> None: + try: + await self.__storage.remove(LegacyStorageImpl.KEY_STATE) + except KeyError: + pass + + async def loadSession( + self, + bare_jid: str, + device_id: int + ) -> Optional[oldmemo.migrations.Session]: + key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ]) + + return cast( + Optional[oldmemo.migrations.Session], + await self.__storage.get(key, None) + ) + + async def deleteSession(self, bare_jid: str, device_id: int) -> None: + key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ]) + + try: + await self.__storage.remove(key) + except KeyError: + pass + + async def loadActiveDevices(self, bare_jid: str) -> Optional[List[int]]: + key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ]) + + return cast( + Optional[List[int]], + await self.__storage.get(key, None) + ) + + async def loadInactiveDevices(self, bare_jid: str) -> Optional[Dict[int, int]]: + key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ]) + + return cast( + Optional[Dict[int, int]], + await self.__storage.get(key, None) + ) + + async def deleteActiveDevices(self, bare_jid: str) -> None: + key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ]) + + try: + await self.__storage.remove(key) + except KeyError: + pass + + async def deleteInactiveDevices(self, bare_jid: str) -> None: + key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ]) + + try: + await self.__storage.remove(key) + except KeyError: + pass + + async def loadTrust( + self, + bare_jid: str, + device_id: int + ) -> Optional[oldmemo.migrations.Trust]: + key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ]) + + return cast( + Optional[oldmemo.migrations.Trust], + await self.__storage.get(key, None) + ) + + async def deleteTrust(self, bare_jid: str, device_id: int) -> None: + key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ]) + + try: + await self.__storage.remove(key) + except KeyError: + pass + + async def listJIDs(self) -> Optional[List[str]]: + bare_jids = await self.__storage.get(LegacyStorageImpl.KEY_ALL_JIDS, None) + + return None if bare_jids is None else list(bare_jids) + + async def deleteJIDList(self) -> None: + try: + await self.__storage.remove(LegacyStorageImpl.KEY_ALL_JIDS) + except KeyError: + pass + + +async def download_oldmemo_bundle( + client: SatXMPPClient, + xep_0060: XEP_0060, + bare_jid: str, + device_id: int +) -> oldmemo.oldmemo.BundleImpl: + """Download the oldmemo bundle corresponding to a specific device. + + @param client: The client. + @param xep_0060: The XEP-0060 plugin instance to use for pubsub interactions. + @param bare_jid: The bare JID the device belongs to. + @param device_id: The id of the device. + @return: The bundle. + @raise BundleDownloadFailed: if the download failed. Feel free to raise a subclass + instead. + """ + # Bundle downloads are needed by the session manager and for migrations from legacy, + # thus it is made a separate function. + + namespace = oldmemo.oldmemo.NAMESPACE + node = f"eu.siacs.conversations.axolotl.bundles:{device_id}" + + try: + items, __ = await xep_0060.get_items(client, jid.JID(bare_jid), node, max_items=1) + except Exception as e: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under namespace" + f" {namespace}" + ) from e + + if len(items) != 1: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under namespace" + f" {namespace}: Unexpected number of items retrieved: {len(items)}." + ) + + element = \ + next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None) + if element is None: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under namespace" + f" {namespace}: Item download succeeded but parsing failed: {element}." + ) + + try: + return oldmemo.etree.parse_bundle(element, bare_jid, device_id) + except Exception as e: + raise omemo.BundleDownloadFailed( + f"Bundle parsing failed for {bare_jid}: {device_id} under namespace" + f" {namespace}" + ) from e + + +# ATM only supports protocols based on SCE, which is currently only omemo:2, and relies on +# so many implementation details of the encryption protocol that it makes more sense to +# add ATM to the OMEMO plugin directly instead of having it a separate Libervia plugin. +NS_TM: Final = "urn:xmpp:tm:1" +NS_ATM: Final = "urn:xmpp:atm:1" + + +TRUST_MESSAGE_SCHEMA = xmlschema.XMLSchema("""<?xml version='1.0' encoding='UTF-8'?> +<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema' + targetNamespace='urn:xmpp:tm:1' + xmlns='urn:xmpp:tm:1' + elementFormDefault='qualified'> + + <xs:element name='trust-message'> + <xs:complexType> + <xs:sequence> + <xs:element ref='key-owner' minOccurs='1' maxOccurs='unbounded'/> + </xs:sequence> + <xs:attribute name='usage' type='xs:string' use='required'/> + <xs:attribute name='encryption' type='xs:string' use='required'/> + </xs:complexType> + </xs:element> + + <xs:element name='key-owner'> + <xs:complexType> + <xs:sequence> + <xs:element + name='trust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/> + <xs:element + name='distrust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/> + </xs:sequence> + <xs:attribute name='jid' type='xs:string' use='required'/> + </xs:complexType> + </xs:element> +</xs:schema> +""") + + +# This is compatible with omemo:2's SCE profile +TM_SCE_PROFILE = SCEProfile( + rpad_policy=SCEAffixPolicy.REQUIRED, + time_policy=SCEAffixPolicy.REQUIRED, + to_policy=SCEAffixPolicy.OPTIONAL, + from_policy=SCEAffixPolicy.OPTIONAL, + custom_policies={} +) + + +class TrustUpdate(NamedTuple): + # pylint: disable=invalid-name + """ + An update to the trust status of an identity key, used by Automatic Trust Management. + """ + + target_jid: jid.JID + target_key: bytes + target_trust: bool + + +class TrustMessageCacheEntry(NamedTuple): + # pylint: disable=invalid-name + """ + An entry in the trust message cache used by ATM. + """ + + sender_jid: jid.JID + sender_key: bytes + timestamp: datetime + trust_update: TrustUpdate + + +class PartialTrustMessage(NamedTuple): + # pylint: disable=invalid-name + """ + A structure representing a partial trust message, used by :func:`send_trust_messages` + to build trust messages. + """ + + recipient_jid: jid.JID + updated_jid: jid.JID + trust_updates: FrozenSet[TrustUpdate] + + +async def manage_trust_message_cache( + client: SatXMPPClient, + session_manager: omemo.SessionManager, + applied_trust_updates: FrozenSet[TrustUpdate] +) -> None: + """Manage the ATM trust message cache after trust updates have been applied. + + @param client: The client this operation runs under. + @param session_manager: The session manager to use. + @param applied_trust_updates: The trust updates that have already been applied, + triggering this cache management run. + """ + + trust_message_cache = persistent.LazyPersistentBinaryDict( + "XEP-0384/TM", + client.profile + ) + + # Load cache entries + cache_entries = cast( + Set[TrustMessageCacheEntry], + await trust_message_cache.get("cache", set()) + ) + + # Expire cache entries that were overwritten by the applied trust updates + cache_entries_by_target = { + ( + cache_entry.trust_update.target_jid.userhostJID(), + cache_entry.trust_update.target_key + ): cache_entry + for cache_entry + in cache_entries + } + + for trust_update in applied_trust_updates: + cache_entry = cache_entries_by_target.get( + (trust_update.target_jid.userhostJID(), trust_update.target_key), + None + ) + + if cache_entry is not None: + cache_entries.remove(cache_entry) + + # Apply cached Trust Messages by newly trusted devices + new_trust_updates: Set[TrustUpdate] = set() + + for trust_update in applied_trust_updates: + if trust_update.target_trust: + # Iterate over a copy such that cache_entries can be modified + for cache_entry in set(cache_entries): + if ( + cache_entry.sender_jid.userhostJID() + == trust_update.target_jid.userhostJID() + and cache_entry.sender_key == trust_update.target_key + ): + trust_level = ( + TrustLevel.TRUSTED + if cache_entry.trust_update.target_trust + else TrustLevel.DISTRUSTED + ) + + # Apply the trust update + await session_manager.set_trust( + cache_entry.trust_update.target_jid.userhost(), + cache_entry.trust_update.target_key, + trust_level.name + ) + + # Track the fact that this trust update has been applied + new_trust_updates.add(cache_entry.trust_update) + + # Remove the corresponding cache entry + cache_entries.remove(cache_entry) + + # Store the updated cache entries + await trust_message_cache.force("cache", cache_entries) + + # TODO: Notify the user ("feedback") about automatically updated trust? + + if len(new_trust_updates) > 0: + # If any trust has been updated, recursively perform another run of cache + # management + await manage_trust_message_cache( + client, + session_manager, + frozenset(new_trust_updates) + ) + + +async def get_trust_as_trust_updates( + session_manager: omemo.SessionManager, + target_jid: jid.JID +) -> FrozenSet[TrustUpdate]: + """Get the trust status of all known keys of a JID as trust updates for use with ATM. + + @param session_manager: The session manager to load the trust from. + @param target_jid: The JID to load the trust for. + @return: The trust updates encoding the trust status of all known keys of the JID that + are either explicitly trusted or distrusted. Undecided keys are not included in + the trust updates. + """ + + devices = await session_manager.get_device_information(target_jid.userhost()) + + trust_updates: Set[TrustUpdate] = set() + + for device in devices: + trust_level = TrustLevel(device.trust_level_name) + target_trust: bool + + if trust_level is TrustLevel.TRUSTED: + target_trust = True + elif trust_level is TrustLevel.DISTRUSTED: + target_trust = False + else: + # Skip devices that are not explicitly trusted or distrusted + continue + + trust_updates.add(TrustUpdate( + target_jid=target_jid.userhostJID(), + target_key=device.identity_key, + target_trust=target_trust + )) + + return frozenset(trust_updates) + + +async def send_trust_messages( + client: SatXMPPClient, + session_manager: omemo.SessionManager, + applied_trust_updates: FrozenSet[TrustUpdate] +) -> None: + """Send information about updated trust to peers via ATM (XEP-0450). + + @param client: The client. + @param session_manager: The session manager. + @param applied_trust_updates: The trust updates that have already been applied, to + notify other peers about. + """ + # NOTE: This currently sends information about oldmemo trust too. This is not + # specified and experimental, but since twomemo and oldmemo share the same identity + # keys and trust systems, this could be a cool side effect. + + # Send Trust Messages for newly trusted and distrusted devices + own_jid = client.jid.userhostJID() + own_trust_updates = await get_trust_as_trust_updates(session_manager, own_jid) + + # JIDs of which at least one device's trust has been updated + updated_jids = frozenset({ + trust_update.target_jid.userhostJID() + for trust_update + in applied_trust_updates + }) + + trust_messages: Set[PartialTrustMessage] = set() + + for updated_jid in updated_jids: + # Get the trust updates for that JID + trust_updates = frozenset({ + trust_update for trust_update in applied_trust_updates + if trust_update.target_jid.userhostJID() == updated_jid + }) + + if updated_jid == own_jid: + # If the own JID is updated, _all_ peers have to be notified + # TODO: Using my author's privilege here to shamelessly access private fields + # and storage keys until I've added public API to get a list of peers to + # python-omemo. + storage: omemo.Storage = getattr(session_manager, "_SessionManager__storage") + peer_jids = frozenset({ + jid.JID(bare_jid).userhostJID() for bare_jid in (await storage.load_list( + f"/{OMEMO.NS_TWOMEMO}/bare_jids", + str + )).maybe([]) + }) + + if len(peer_jids) == 0: + # If there are no peers to notify, notify our other devices about the + # changes directly + trust_messages.add(PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=own_jid, + trust_updates=trust_updates + )) + else: + # Otherwise, notify all peers about the changes in trust and let carbons + # handle the copy to our own JID + for peer_jid in peer_jids: + trust_messages.add(PartialTrustMessage( + recipient_jid=peer_jid, + updated_jid=own_jid, + trust_updates=trust_updates + )) + + # Also send full trust information about _every_ peer to our newly + # trusted devices + peer_trust_updates = \ + await get_trust_as_trust_updates(session_manager, peer_jid) + + trust_messages.add(PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=peer_jid, + trust_updates=peer_trust_updates + )) + + # Send information about our own devices to our newly trusted devices + trust_messages.add(PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=own_jid, + trust_updates=own_trust_updates + )) + else: + # Notify our other devices about the changes in trust + trust_messages.add(PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=updated_jid, + trust_updates=trust_updates + )) + + # Send a summary of our own trust to newly trusted devices + trust_messages.add(PartialTrustMessage( + recipient_jid=updated_jid, + updated_jid=own_jid, + trust_updates=own_trust_updates + )) + + # All trust messages prepared. Merge all trust messages directed at the same + # recipient. + recipient_jids = { trust_message.recipient_jid for trust_message in trust_messages } + + for recipient_jid in recipient_jids: + updated: Dict[jid.JID, Set[TrustUpdate]] = {} + + for trust_message in trust_messages: + # Merge trust messages directed at that recipient + if trust_message.recipient_jid == recipient_jid: + # Merge the trust updates + updated[trust_message.updated_jid] = \ + updated.get(trust_message.updated_jid, set()) + + updated[trust_message.updated_jid] |= trust_message.trust_updates + + # Build the trust message + trust_message_elt = domish.Element((NS_TM, "trust-message")) + trust_message_elt["usage"] = NS_ATM + trust_message_elt["encryption"] = twomemo.twomemo.NAMESPACE + + for updated_jid, trust_updates in updated.items(): + key_owner_elt = trust_message_elt.addElement((NS_TM, "key-owner")) + key_owner_elt["jid"] = updated_jid.userhost() + + for trust_update in trust_updates: + serialized_identity_key = \ + base64.b64encode(trust_update.target_key).decode("ASCII") + + if trust_update.target_trust: + key_owner_elt.addElement( + (NS_TM, "trust"), + content=serialized_identity_key + ) + else: + key_owner_elt.addElement( + (NS_TM, "distrust"), + content=serialized_identity_key + ) + + # Finally, encrypt and send the trust message! + message_data = client.generate_message_xml(MessageData({ + "from": own_jid, + "to": recipient_jid, + "uid": str(uuid.uuid4()), + "message": {}, + "subject": {}, + "type": C.MESS_TYPE_CHAT, + "extra": {}, + "timestamp": time.time() + })) + + message_data["xml"].addChild(trust_message_elt) + + plaintext = XEP_0420.pack_stanza(TM_SCE_PROFILE, message_data["xml"]) + + feedback_jid = recipient_jid + + # TODO: The following is mostly duplicate code + try: + messages, encryption_errors = await session_manager.encrypt( + frozenset({ own_jid.userhost(), recipient_jid.userhost() }), + { OMEMO.NS_TWOMEMO: plaintext }, + backend_priority_order=[ OMEMO.NS_TWOMEMO ], + identifier=feedback_jid.userhost() + ) + except Exception as e: + msg = _( + # pylint: disable=consider-using-f-string + "Can't encrypt message for {entities}: {reason}".format( + entities=', '.join({ own_jid.userhost(), recipient_jid.userhost() }), + reason=e + ) + ) + log.warning(msg) + client.feedback(feedback_jid, msg, { + C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR + }) + raise e + + if len(encryption_errors) > 0: + log.warning( + f"Ignored the following non-critical encryption errors:" + f" {encryption_errors}" + ) + + encrypted_errors_stringified = ", ".join([ + f"device {err.device_id} of {err.bare_jid} under namespace" + f" {err.namespace}" + for err + in encryption_errors + ]) + + client.feedback( + feedback_jid, + D_( + "There were non-critical errors during encryption resulting in some" + " of your destinees' devices potentially not receiving the message." + " This happens when the encryption data/key material of a device is" + " incomplete or broken, which shouldn't happen for actively used" + " devices, and can usually be ignored. The following devices are" + f" affected: {encrypted_errors_stringified}." + ) + ) + + message = next( + message for message in messages + if message.namespace == OMEMO.NS_TWOMEMO + ) + + # Add the encrypted element + message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt( + twomemo.etree.serialize_message(message) + )) + + await client.a_send(message_data["xml"]) + + +def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]: + """ + @param sat: The SAT instance. + @param profile: The profile. + @return: A non-abstract subclass of :class:`~omemo.session_manager.SessionManager` + with XMPP interactions and trust handled via the SAT instance. + """ + + client = sat.get_client(profile) + xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) + + class SessionManagerImpl(omemo.SessionManager): + """ + Session manager implementation handling XMPP interactions and trust via an + instance of :class:`~sat.core.sat_main.SAT`. + """ + + @staticmethod + async def _upload_bundle(bundle: omemo.Bundle) -> None: + if isinstance(bundle, twomemo.twomemo.BundleImpl): + element = twomemo.etree.serialize_bundle(bundle) + + node = "urn:xmpp:omemo:2:bundles" + try: + await xep_0060.send_item( + client, + client.jid.userhostJID(), + node, + xml_tools.et_elt_2_domish_elt(element), + item_id=str(bundle.device_id), + extra={ + XEP_0060.EXTRA_PUBLISH_OPTIONS: { + XEP_0060.OPT_MAX_ITEMS: "max" + }, + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" + } + ) + except (error.StanzaError, Exception) as e: + if ( + isinstance(e, error.StanzaError) + and e.condition == "conflict" + and e.appCondition is not None + # pylint: disable=no-member + and e.appCondition.name == "precondition-not-met" + ): + # publish options couldn't be set on the fly, manually reconfigure + # the node and publish again + raise omemo.BundleUploadFailed( + f"precondition-not-met: {bundle}" + ) from e + # TODO: What can I do here? The correct node configuration is a + # MUST in the XEP. + + raise omemo.BundleUploadFailed( + f"Bundle upload failed: {bundle}" + ) from e + + return + + if isinstance(bundle, oldmemo.oldmemo.BundleImpl): + element = oldmemo.etree.serialize_bundle(bundle) + + node = f"eu.siacs.conversations.axolotl.bundles:{bundle.device_id}" + try: + await xep_0060.send_item( + client, + client.jid.userhostJID(), + node, + xml_tools.et_elt_2_domish_elt(element), + item_id=xep_0060.ID_SINGLETON, + extra={ + XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_MAX_ITEMS: 1 }, + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" + } + ) + except Exception as e: + raise omemo.BundleUploadFailed( + f"Bundle upload failed: {bundle}" + ) from e + + return + + raise omemo.UnknownNamespace(f"Unknown namespace: {bundle.namespace}") + + @staticmethod + async def _download_bundle( + namespace: str, + bare_jid: str, + device_id: int + ) -> omemo.Bundle: + if namespace == twomemo.twomemo.NAMESPACE: + node = "urn:xmpp:omemo:2:bundles" + + try: + items, __ = await xep_0060.get_items( + client, + jid.JID(bare_jid), + node, + item_ids=[ str(device_id) ] + ) + except Exception as e: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under" + f" namespace {namespace}" + ) from e + + if len(items) != 1: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under" + f" namespace {namespace}: Unexpected number of items retrieved:" + f" {len(items)}." + ) + + element = next( + iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), + None + ) + if element is None: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under" + f" namespace {namespace}: Item download succeeded but parsing" + f" failed: {element}." + ) + + try: + return twomemo.etree.parse_bundle(element, bare_jid, device_id) + except Exception as e: + raise omemo.BundleDownloadFailed( + f"Bundle parsing failed for {bare_jid}: {device_id} under" + f" namespace {namespace}" + ) from e + + if namespace == oldmemo.oldmemo.NAMESPACE: + return await download_oldmemo_bundle( + client, + xep_0060, + bare_jid, + device_id + ) + + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + @staticmethod + async def _delete_bundle(namespace: str, device_id: int) -> None: + if namespace == twomemo.twomemo.NAMESPACE: + node = "urn:xmpp:omemo:2:bundles" + + try: + await xep_0060.retract_items( + client, + client.jid.userhostJID(), + node, + [ str(device_id) ], + notify=False + ) + except Exception as e: + raise omemo.BundleDeletionFailed( + f"Bundle deletion failed for {device_id} under namespace" + f" {namespace}" + ) from e + + return + + if namespace == oldmemo.oldmemo.NAMESPACE: + node = f"eu.siacs.conversations.axolotl.bundles:{device_id}" + + try: + await xep_0060.deleteNode(client, client.jid.userhostJID(), node) + except Exception as e: + raise omemo.BundleDeletionFailed( + f"Bundle deletion failed for {device_id} under namespace" + f" {namespace}" + ) from e + + return + + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + @staticmethod + async def _upload_device_list( + namespace: str, + device_list: Dict[int, Optional[str]] + ) -> None: + element: Optional[ET.Element] = None + node: Optional[str] = None + + if namespace == twomemo.twomemo.NAMESPACE: + element = twomemo.etree.serialize_device_list(device_list) + node = TWOMEMO_DEVICE_LIST_NODE + if namespace == oldmemo.oldmemo.NAMESPACE: + element = oldmemo.etree.serialize_device_list(device_list) + node = OLDMEMO_DEVICE_LIST_NODE + + if element is None or node is None: + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + try: + await xep_0060.send_item( + client, + client.jid.userhostJID(), + node, + xml_tools.et_elt_2_domish_elt(element), + item_id=xep_0060.ID_SINGLETON, + extra={ + XEP_0060.EXTRA_PUBLISH_OPTIONS: { + XEP_0060.OPT_MAX_ITEMS: 1, + XEP_0060.OPT_ACCESS_MODEL: "open" + }, + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" + } + ) + except (error.StanzaError, Exception) as e: + if ( + isinstance(e, error.StanzaError) + and e.condition == "conflict" + and e.appCondition is not None + # pylint: disable=no-member + and e.appCondition.name == "precondition-not-met" + ): + # publish options couldn't be set on the fly, manually reconfigure the + # node and publish again + raise omemo.DeviceListUploadFailed( + f"precondition-not-met for namespace {namespace}" + ) from e + # TODO: What can I do here? The correct node configuration is a MUST + # in the XEP. + + raise omemo.DeviceListUploadFailed( + f"Device list upload failed for namespace {namespace}" + ) from e + + @staticmethod + async def _download_device_list( + namespace: str, + bare_jid: str + ) -> Dict[int, Optional[str]]: + node: Optional[str] = None + + if namespace == twomemo.twomemo.NAMESPACE: + node = TWOMEMO_DEVICE_LIST_NODE + if namespace == oldmemo.oldmemo.NAMESPACE: + node = OLDMEMO_DEVICE_LIST_NODE + + if node is None: + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + try: + items, __ = await xep_0060.get_items(client, jid.JID(bare_jid), node) + except exceptions.NotFound: + return {} + except Exception as e: + raise omemo.DeviceListDownloadFailed( + f"Device list download failed for {bare_jid} under namespace" + f" {namespace}" + ) from e + + if len(items) == 0: + return {} + + if len(items) != 1: + raise omemo.DeviceListDownloadFailed( + f"Device list download failed for {bare_jid} under namespace" + f" {namespace}: Unexpected number of items retrieved: {len(items)}." + ) + + element = next( + iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), + None + ) + + if element is None: + raise omemo.DeviceListDownloadFailed( + f"Device list download failed for {bare_jid} under namespace" + f" {namespace}: Item download succeeded but parsing failed:" + f" {element}." + ) + + try: + if namespace == twomemo.twomemo.NAMESPACE: + return twomemo.etree.parse_device_list(element) + if namespace == oldmemo.oldmemo.NAMESPACE: + return oldmemo.etree.parse_device_list(element) + except Exception as e: + raise omemo.DeviceListDownloadFailed( + f"Device list download failed for {bare_jid} under namespace" + f" {namespace}" + ) from e + + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + async def _evaluate_custom_trust_level( + self, + device: omemo.DeviceInformation + ) -> omemo.TrustLevel: + # Get the custom trust level + try: + trust_level = TrustLevel(device.trust_level_name) + except ValueError as e: + raise omemo.UnknownTrustLevel( + f"Unknown trust level name {device.trust_level_name}" + ) from e + + # The first three cases are a straight-forward mapping + if trust_level is TrustLevel.TRUSTED: + return omemo.TrustLevel.TRUSTED + if trust_level is TrustLevel.UNDECIDED: + return omemo.TrustLevel.UNDECIDED + if trust_level is TrustLevel.DISTRUSTED: + return omemo.TrustLevel.DISTRUSTED + + # The blindly trusted case is more complicated, since its evaluation depends + # on the trust system and phase + if trust_level is TrustLevel.BLINDLY_TRUSTED: + # Get the name of the active trust system + trust_system = cast(str, sat.memory.param_get_a( + PARAM_NAME, + PARAM_CATEGORY, + profile_key=profile + )) + + # If the trust model is BTBV, blind trust is always enabled + if trust_system == "btbv": + return omemo.TrustLevel.TRUSTED + + # If the trust model is ATM, blind trust is disabled in the second phase + # and counts as undecided + if trust_system == "atm": + # Find out whether we are in phase one or two + devices = await self.get_device_information(device.bare_jid) + + phase_one = all(TrustLevel(device.trust_level_name) in { + TrustLevel.UNDECIDED, + TrustLevel.BLINDLY_TRUSTED + } for device in devices) + + if phase_one: + return omemo.TrustLevel.TRUSTED + + return omemo.TrustLevel.UNDECIDED + + raise exceptions.InternalError( + f"Unknown trust system active: {trust_system}" + ) + + assert_never(trust_level) + + async def _make_trust_decision( + self, + undecided: FrozenSet[omemo.DeviceInformation], + identifier: Optional[str] + ) -> None: + if identifier is None: + raise omemo.TrustDecisionFailed( + "The identifier must contain the feedback JID." + ) + + # The feedback JID is transferred via the identifier + feedback_jid = jid.JID(identifier).userhostJID() + + # Both the ATM and the BTBV trust models work with blind trust before the + # first manual verification is performed. Thus, we can separate bare JIDs into + # two pools here, one pool of bare JIDs for which blind trust is active, and + # one pool of bare JIDs for which manual trust is used instead. + bare_jids = { device.bare_jid for device in undecided } + + blind_trust_bare_jids: Set[str] = set() + manual_trust_bare_jids: Set[str] = set() + + # For each bare JID, decide whether blind trust applies + for bare_jid in bare_jids: + # Get all known devices belonging to the bare JID + devices = await self.get_device_information(bare_jid) + + # If the trust levels of all devices correspond to those used by blind + # trust, blind trust applies. Otherwise, fall back to manual trust. + if all(TrustLevel(device.trust_level_name) in { + TrustLevel.UNDECIDED, + TrustLevel.BLINDLY_TRUSTED + } for device in devices): + blind_trust_bare_jids.add(bare_jid) + else: + manual_trust_bare_jids.add(bare_jid) + + # With the JIDs sorted into their respective pools, the undecided devices can + # be categorized too + blindly_trusted_devices = \ + { dev for dev in undecided if dev.bare_jid in blind_trust_bare_jids } + manually_trusted_devices = \ + { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids } + + # Blindly trust devices handled by blind trust + if len(blindly_trusted_devices) > 0: + for device in blindly_trusted_devices: + await self.set_trust( + device.bare_jid, + device.identity_key, + TrustLevel.BLINDLY_TRUSTED.name + ) + + blindly_trusted_devices_stringified = ", ".join([ + f"device {device.device_id} of {device.bare_jid} under namespace" + f" {device.namespaces}" + for device + in blindly_trusted_devices + ]) + + client.feedback( + feedback_jid, + D_( + "Not all destination devices are trusted, unknown devices will be" + " blindly trusted.\nFollowing devices have been automatically" + f" trusted: {blindly_trusted_devices_stringified}." + ) + ) + + # Prompt the user for manual trust decisions on the devices handled by manual + # trust + if len(manually_trusted_devices) > 0: + client.feedback( + feedback_jid, + D_( + "Not all destination devices are trusted, we can't encrypt" + " message in such a situation. Please indicate if you trust" + " those devices or not in the trust manager before we can" + " send this message." + ) + ) + await self.__prompt_manual_trust( + frozenset(manually_trusted_devices), + feedback_jid + ) + + @staticmethod + async def _send_message(message: omemo.Message, bare_jid: str) -> None: + element: Optional[ET.Element] = None + + if message.namespace == twomemo.twomemo.NAMESPACE: + element = twomemo.etree.serialize_message(message) + if message.namespace == oldmemo.oldmemo.NAMESPACE: + element = oldmemo.etree.serialize_message(message) + + if element is None: + raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}") + + message_data = client.generate_message_xml(MessageData({ + "from": client.jid, + "to": jid.JID(bare_jid), + "uid": str(uuid.uuid4()), + "message": {}, + "subject": {}, + "type": C.MESS_TYPE_CHAT, + "extra": {}, + "timestamp": time.time() + })) + + message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt(element)) + + try: + await client.a_send(message_data["xml"]) + except Exception as e: + raise omemo.MessageSendingFailed() from e + + async def __prompt_manual_trust( + self, + undecided: FrozenSet[omemo.DeviceInformation], + feedback_jid: jid.JID + ) -> None: + """Asks the user to decide on the manual trust level of a set of devices. + + Blocks until the user has made a decision and updates the trust levels of all + devices using :meth:`set_trust`. + + @param undecided: The set of devices to prompt manual trust for. + @param feedback_jid: The bare JID to redirect feedback to. In case of a one to + one message, the recipient JID. In case of a MUC message, the room JID. + @raise TrustDecisionFailed: if the user cancels the prompt. + """ + + # This session manager handles encryption with both twomemo and oldmemo, but + # both are currently registered as different plugins and the `defer_xmlui` + # below requires a single namespace identifying the encryption plugin. Thus, + # get the namespace of the requested encryption method from the encryption + # session using the feedback JID. + encryption = client.encryption.getSession(feedback_jid) + if encryption is None: + raise omemo.TrustDecisionFailed( + f"Encryption not requested for {feedback_jid.userhost()}." + ) + + namespace = encryption["plugin"].namespace + + # Casting this to Any, otherwise all calls on the variable cause type errors + # pylint: disable=no-member + trust_ui = cast(Any, xml_tools.XMLUI( + panel_type=C.XMLUI_FORM, + title=D_("OMEMO trust management"), + submit_id="" + )) + trust_ui.addText(D_( + "This is OMEMO trusting system. You'll see below the devices of your " + "contacts, and a checkbox to trust them or not. A trusted device " + "can read your messages in plain text, so be sure to only validate " + "devices that you are sure are belonging to your contact. It's better " + "to do this when you are next to your contact and their device, so " + "you can check the \"fingerprint\" (the number next to the device) " + "yourself. Do *not* validate a device if the fingerprint is wrong!" + )) + + own_device, __ = await self.get_own_device_information() + + trust_ui.change_container("label") + trust_ui.addLabel(D_("This device ID")) + trust_ui.addText(str(own_device.device_id)) + trust_ui.addLabel(D_("This device's fingerprint")) + trust_ui.addText(" ".join(self.format_identity_key(own_device.identity_key))) + trust_ui.addEmpty() + trust_ui.addEmpty() + + # At least sort the devices by bare JID such that they aren't listed + # completely random + undecided_ordered = sorted(undecided, key=lambda device: device.bare_jid) + + for index, device in enumerate(undecided_ordered): + trust_ui.addLabel(D_("Contact")) + trust_ui.addJid(jid.JID(device.bare_jid)) + trust_ui.addLabel(D_("Device ID")) + trust_ui.addText(str(device.device_id)) + trust_ui.addLabel(D_("Fingerprint")) + trust_ui.addText(" ".join(self.format_identity_key(device.identity_key))) + trust_ui.addLabel(D_("Trust this device?")) + trust_ui.addBool(f"trust_{index}", value=C.bool_const(False)) + trust_ui.addEmpty() + trust_ui.addEmpty() + + trust_ui_result = await xml_tools.defer_xmlui( + sat, + trust_ui, + action_extra={ "meta_encryption_trust": namespace }, + profile=profile + ) + + if C.bool(trust_ui_result.get("cancelled", "false")): + raise omemo.TrustDecisionFailed("Trust UI cancelled.") + + data_form_result = cast(Dict[str, str], xml_tools.xmlui_result_2_data_form_result( + trust_ui_result + )) + + trust_updates: Set[TrustUpdate] = set() + + for key, value in data_form_result.items(): + if not key.startswith("trust_"): + continue + + device = undecided_ordered[int(key[len("trust_"):])] + target_trust = C.bool(value) + trust_level = \ + TrustLevel.TRUSTED if target_trust else TrustLevel.DISTRUSTED + + await self.set_trust( + device.bare_jid, + device.identity_key, + trust_level.name + ) + + trust_updates.add(TrustUpdate( + target_jid=jid.JID(device.bare_jid).userhostJID(), + target_key=device.identity_key, + target_trust=target_trust + )) + + # Check whether ATM is enabled and handle everything in case it is + trust_system = cast(str, sat.memory.param_get_a( + PARAM_NAME, + PARAM_CATEGORY, + profile_key=profile + )) + + if trust_system == "atm": + await manage_trust_message_cache(client, self, frozenset(trust_updates)) + await send_trust_messages(client, self, frozenset(trust_updates)) + + return SessionManagerImpl + + +async def prepare_for_profile( + sat: SAT, + profile: str, + initial_own_label: Optional[str], + signed_pre_key_rotation_period: int = 7 * 24 * 60 * 60, + pre_key_refill_threshold: int = 99, + max_num_per_session_skipped_keys: int = 1000, + max_num_per_message_skipped_keys: Optional[int] = None +) -> omemo.SessionManager: + """Prepare the OMEMO library (storage, backends, core) for a specific profile. + + @param sat: The SAT instance. + @param profile: The profile. + @param initial_own_label: The initial (optional) label to assign to this device if + supported by any of the backends. + @param signed_pre_key_rotation_period: The rotation period for the signed pre key, in + seconds. The rotation period is recommended to be between one week (the default) + and one month. + @param pre_key_refill_threshold: The number of pre keys that triggers a refill to 100. + Defaults to 99, which means that each pre key gets replaced with a new one right + away. The threshold can not be configured to lower than 25. + @param max_num_per_session_skipped_keys: The maximum number of skipped message keys to + keep around per session. Once the maximum is reached, old message keys are deleted + to make space for newer ones. Accessible via + :attr:`max_num_per_session_skipped_keys`. + @param max_num_per_message_skipped_keys: The maximum number of skipped message keys to + accept in a single message. When set to ``None`` (the default), this parameter + defaults to the per-session maximum (i.e. the value of the + ``max_num_per_session_skipped_keys`` parameter). This parameter may only be 0 if + the per-session maximum is 0, otherwise it must be a number between 1 and the + per-session maximum. Accessible via :attr:`max_num_per_message_skipped_keys`. + @return: A session manager with ``urn:xmpp:omemo:2`` and + ``eu.siacs.conversations.axolotl`` capabilities, specifically for the given + profile. + @raise BundleUploadFailed: if a bundle upload failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + @raise BundleDownloadFailed: if a bundle download failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + @raise BundleDeletionFailed: if a bundle deletion failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + @raise DeviceListUploadFailed: if a device list upload failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + @raise DeviceListDownloadFailed: if a device list download failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + """ + + client = sat.get_client(profile) + xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) + + storage = StorageImpl(profile) + + # TODO: Untested + await oldmemo.migrations.migrate( + LegacyStorageImpl(profile, client.jid.userhost()), + storage, + # TODO: Do we want BLINDLY_TRUSTED or TRUSTED here? + TrustLevel.BLINDLY_TRUSTED.name, + TrustLevel.UNDECIDED.name, + TrustLevel.DISTRUSTED.name, + lambda bare_jid, device_id: download_oldmemo_bundle( + client, + xep_0060, + bare_jid, + device_id + ) + ) + + session_manager = await make_session_manager(sat, profile).create( + [ + twomemo.Twomemo( + storage, + max_num_per_session_skipped_keys, + max_num_per_message_skipped_keys + ), + oldmemo.Oldmemo( + storage, + max_num_per_session_skipped_keys, + max_num_per_message_skipped_keys + ) + ], + storage, + client.jid.userhost(), + initial_own_label, + TrustLevel.UNDECIDED.value, + signed_pre_key_rotation_period, + pre_key_refill_threshold, + omemo.AsyncFramework.TWISTED + ) + + # This shouldn't hurt here since we're not running on overly constrainted devices. + # TODO: Consider ensuring data consistency regularly/in response to certain events + await session_manager.ensure_data_consistency() + + # TODO: Correct entering/leaving of the history synchronization mode isn't terribly + # important for now, since it only prevents an extremely unlikely race condition of + # multiple devices choosing the same pre key for new sessions while the device was + # offline. I don't believe other clients seriously defend against that race condition + # either. In the long run, it might still be cool to have triggers for when history + # sync starts and ends (MAM, MUC catch-up, etc.) and to react to those triggers. + await session_manager.after_history_sync() + + return session_manager + + +DEFAULT_TRUST_MODEL_PARAM = f""" +<params> +<individual> +<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}> + <param name="{PARAM_NAME}" + label={quoteattr(D_('OMEMO default trust policy'))} + type="list" security="3"> + <option value="atm" + label={quoteattr(D_('Automatic Trust Management (more secure)'))} /> + <option value="btbv" + label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))} + selected="true" /> + </param> +</category> +</individual> +</params> +""" + + +class OMEMO: + """ + Plugin equipping Libervia with OMEMO capabilities under the (modern) + ``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl`` + namespace. Both versions of the protocol are handled by this plugin and compatibility + between the two is maintained. MUC messages are supported next to one to one messages. + For trust management, the two trust models "ATM" and "BTBV" are supported. + """ + NS_TWOMEMO = twomemo.twomemo.NAMESPACE + NS_OLDMEMO = oldmemo.oldmemo.NAMESPACE + + # For MUC/MIX message stanzas, the <to/> affix is a MUST + SCE_PROFILE_GROUPCHAT = SCEProfile( + rpad_policy=SCEAffixPolicy.REQUIRED, + time_policy=SCEAffixPolicy.OPTIONAL, + to_policy=SCEAffixPolicy.REQUIRED, + from_policy=SCEAffixPolicy.OPTIONAL, + custom_policies={} + ) + + # For everything but MUC/MIX message stanzas, the <to/> affix is a MAY + SCE_PROFILE = SCEProfile( + rpad_policy=SCEAffixPolicy.REQUIRED, + time_policy=SCEAffixPolicy.OPTIONAL, + to_policy=SCEAffixPolicy.OPTIONAL, + from_policy=SCEAffixPolicy.OPTIONAL, + custom_policies={} + ) + + def __init__(self, sat: SAT) -> None: + """ + @param sat: The SAT instance. + """ + + self.__sat = sat + + # Add configuration option to choose between manual trust and BTBV as the trust + # model + sat.memory.update_params(DEFAULT_TRUST_MODEL_PARAM) + + # Plugins + self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) + self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"]) + self.__xep_0359 = cast(Optional[XEP_0359], sat.plugins.get("XEP-0359")) + self.__xep_0420 = cast(XEP_0420, sat.plugins["XEP-0420"]) + + # In contrast to one to one messages, MUC messages are reflected to the sender. + # Thus, the sender does not add messages to their local message log when sending + # them, but when the reflection is received. This approach does not pair well with + # OMEMO, since for security reasons it is forbidden to encrypt messages for the + # own device. Thus, when the reflection of an OMEMO message is received, it can't + # be decrypted and added to the local message log as usual. To counteract this, + # the plaintext of encrypted messages sent to MUCs are cached in this field, such + # that when the reflection is received, the plaintext can be looked up from the + # cache and added to the local message log. + # TODO: The old plugin expired this cache after some time. I'm not sure that's + # really necessary. + self.__muc_plaintext_cache: Dict[MUCPlaintextCacheKey, bytes] = {} + + # Mapping from profile name to corresponding session manager + self.__session_managers: Dict[str, omemo.SessionManager] = {} + + # Calls waiting for a specific session manager to be built + self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {} + + # These triggers are used by oldmemo, which doesn't do SCE and only applies to + # messages. Temporarily, until a more fitting trigger for SCE-based encryption is + # added, the message_received trigger is also used for twomemo. + sat.trigger.add( + "message_received", + self._message_received_trigger, + priority=100050 + ) + sat.trigger.add( + "send_message_data", + self.__send_message_data_trigger, + priority=100050 + ) + + # These triggers are used by twomemo, which does do SCE + sat.trigger.add("send", self.__send_trigger, priority=0) + # TODO: Add new triggers here for freshly received and about-to-be-sent stanzas, + # including IQs. + + # Give twomemo a (slightly) higher priority than oldmemo + sat.register_encryption_plugin(self, "TWOMEMO", twomemo.twomemo.NAMESPACE, 101) + sat.register_encryption_plugin(self, "OLDMEMO", oldmemo.oldmemo.NAMESPACE, 100) + + xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"]) + xep_0163.add_pep_event( + "TWOMEMO_DEVICES", + TWOMEMO_DEVICE_LIST_NODE, + lambda items_event, profile: defer.ensureDeferred( + self.__on_device_list_update(items_event, profile) + ) + ) + xep_0163.add_pep_event( + "OLDMEMO_DEVICES", + OLDMEMO_DEVICE_LIST_NODE, + lambda items_event, profile: defer.ensureDeferred( + self.__on_device_list_update(items_event, profile) + ) + ) + + try: + self.__text_commands = cast(TextCommands, sat.plugins[C.TEXT_CMDS]) + except KeyError: + log.info(_("Text commands not available")) + else: + self.__text_commands.register_text_commands(self) + + def profile_connected( # pylint: disable=invalid-name + self, + client: SatXMPPClient + ) -> None: + """ + @param client: The client. + """ + + defer.ensureDeferred(self.get_session_manager( + cast(str, client.profile) + )) + + async def cmd_omemo_reset( + self, + client: SatXMPPClient, + mess_data: MessageData + ) -> Literal[False]: + """Reset all sessions of devices that belong to the recipient of ``mess_data``. + + This must only be callable manually by the user. Use this when a session is + apparently broken, i.e. sending and receiving encrypted messages doesn't work and + something being wrong has been confirmed manually with the recipient. + + @param client: The client. + @param mess_data: The message data, whose ``to`` attribute will be the bare JID to + reset all sessions with. + @return: The constant value ``False``, indicating to the text commands plugin that + the message is not supposed to be sent. + """ + + twomemo_requested = \ + client.encryption.is_encryption_requested(mess_data, twomemo.twomemo.NAMESPACE) + oldmemo_requested = \ + client.encryption.is_encryption_requested(mess_data, oldmemo.oldmemo.NAMESPACE) + + if not (twomemo_requested or oldmemo_requested): + self.__text_commands.feed_back( + client, + _("You need to have OMEMO encryption activated to reset the session"), + mess_data + ) + return False + + bare_jid = mess_data["to"].userhost() + + session_manager = await self.get_session_manager(client.profile) + devices = await session_manager.get_device_information(bare_jid) + + for device in devices: + log.debug(f"Replacing sessions with device {device}") + await session_manager.replace_sessions(device) + + self.__text_commands.feed_back( + client, + _("OMEMO session has been reset"), + mess_data + ) + + return False + + async def get_trust_ui( # pylint: disable=invalid-name + self, + client: SatXMPPClient, + entity: jid.JID + ) -> xml_tools.XMLUI: + """ + @param client: The client. + @param entity: The entity whose device trust levels to manage. + @return: An XMLUI instance which opens a form to manage the trust level of all + devices belonging to the entity. + """ + + if entity.resource: + raise ValueError("A bare JID is expected.") + + bare_jids: Set[str] + if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): + bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) + else: + bare_jids = { entity.userhost() } + + session_manager = await self.get_session_manager(client.profile) + + # At least sort the devices by bare JID such that they aren't listed completely + # random + devices = sorted(cast(Set[omemo.DeviceInformation], set()).union(*[ + await session_manager.get_device_information(bare_jid) + for bare_jid + in bare_jids + ]), key=lambda device: device.bare_jid) + + async def callback( + data: Any, + profile: str + ) -> Dict[Never, Never]: + """ + @param data: The XMLUI result produces by the trust UI form. + @param profile: The profile. + @return: An empty dictionary. The type of the return value was chosen + conservatively since the exact options are neither known not needed here. + """ + + if C.bool(data.get("cancelled", "false")): + return {} + + data_form_result = cast( + Dict[str, str], + xml_tools.xmlui_result_2_data_form_result(data) + ) + + trust_updates: Set[TrustUpdate] = set() + + for key, value in data_form_result.items(): + if not key.startswith("trust_"): + continue + + device = devices[int(key[len("trust_"):])] + trust_level_name = value + + if device.trust_level_name != trust_level_name: + await session_manager.set_trust( + device.bare_jid, + device.identity_key, + trust_level_name + ) + + target_trust: Optional[bool] = None + + if TrustLevel(trust_level_name) is TrustLevel.TRUSTED: + target_trust = True + if TrustLevel(trust_level_name) is TrustLevel.DISTRUSTED: + target_trust = False + + if target_trust is not None: + trust_updates.add(TrustUpdate( + target_jid=jid.JID(device.bare_jid).userhostJID(), + target_key=device.identity_key, + target_trust=target_trust + )) + + # Check whether ATM is enabled and handle everything in case it is + trust_system = cast(str, self.__sat.memory.param_get_a( + PARAM_NAME, + PARAM_CATEGORY, + profile_key=profile + )) + + if trust_system == "atm": + if len(trust_updates) > 0: + await manage_trust_message_cache( + client, + session_manager, + frozenset(trust_updates) + ) + + await send_trust_messages( + client, + session_manager, + frozenset(trust_updates) + ) + + return {} + + submit_id = self.__sat.register_callback(callback, with_data=True, one_shot=True) + + result = xml_tools.XMLUI( + panel_type=C.XMLUI_FORM, + title=D_("OMEMO trust management"), + submit_id=submit_id + ) + # Casting this to Any, otherwise all calls on the variable cause type errors + # pylint: disable=no-member + trust_ui = cast(Any, result) + trust_ui.addText(D_( + "This is OMEMO trusting system. You'll see below the devices of your" + " contacts, and a list selection to trust them or not. A trusted device" + " can read your messages in plain text, so be sure to only validate" + " devices that you are sure are belonging to your contact. It's better" + " to do this when you are next to your contact and their device, so" + " you can check the \"fingerprint\" (the number next to the device)" + " yourself. Do *not* validate a device if the fingerprint is wrong!" + " Note that manually validating a fingerprint disables any form of automatic" + " trust." + )) + + own_device, __ = await session_manager.get_own_device_information() + + trust_ui.change_container("label") + trust_ui.addLabel(D_("This device ID")) + trust_ui.addText(str(own_device.device_id)) + trust_ui.addLabel(D_("This device's fingerprint")) + trust_ui.addText(" ".join(session_manager.format_identity_key( + own_device.identity_key + ))) + trust_ui.addEmpty() + trust_ui.addEmpty() + + for index, device in enumerate(devices): + trust_ui.addLabel(D_("Contact")) + trust_ui.addJid(jid.JID(device.bare_jid)) + trust_ui.addLabel(D_("Device ID")) + trust_ui.addText(str(device.device_id)) + trust_ui.addLabel(D_("Fingerprint")) + trust_ui.addText(" ".join(session_manager.format_identity_key( + device.identity_key + ))) + trust_ui.addLabel(D_("Trust this device?")) + + current_trust_level = TrustLevel(device.trust_level_name) + avaiable_trust_levels = \ + { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } + + trust_ui.addList( + f"trust_{index}", + options=[ trust_level.name for trust_level in avaiable_trust_levels ], + selected=current_trust_level.name, + styles=[ "inline" ] + ) + + twomemo_active = dict(device.active).get(twomemo.twomemo.NAMESPACE) + if twomemo_active is None: + trust_ui.addEmpty() + trust_ui.addLabel(D_("(not available for Twomemo)")) + if twomemo_active is False: + trust_ui.addEmpty() + trust_ui.addLabel(D_("(inactive for Twomemo)")) + + oldmemo_active = dict(device.active).get(oldmemo.oldmemo.NAMESPACE) + if oldmemo_active is None: + trust_ui.addEmpty() + trust_ui.addLabel(D_("(not available for Oldmemo)")) + if oldmemo_active is False: + trust_ui.addEmpty() + trust_ui.addLabel(D_("(inactive for Oldmemo)")) + + trust_ui.addEmpty() + trust_ui.addEmpty() + + return result + + @staticmethod + def __get_joined_muc_users( + client: SatXMPPClient, + xep_0045: XEP_0045, + room_jid: jid.JID + ) -> Set[str]: + """ + @param client: The client. + @param xep_0045: A MUC plugin instance. + @param room_jid: The room JID. + @return: A set containing the bare JIDs of the MUC participants. + @raise InternalError: if the MUC is not joined or the entity information of a + participant isn't available. + """ + + bare_jids: Set[str] = set() + + try: + room = cast(muc.Room, xep_0045.get_room(client, room_jid)) + except exceptions.NotFound as e: + raise exceptions.InternalError( + "Participant list of unjoined MUC requested." + ) from e + + for user in cast(Dict[str, muc.User], room.roster).values(): + entity = cast(Optional[SatXMPPEntity], user.entity) + if entity is None: + raise exceptions.InternalError( + f"Participant list of MUC requested, but the entity information of" + f" the participant {user} is not available." + ) + + bare_jids.add(entity.jid.userhost()) + + return bare_jids + + async def get_session_manager(self, profile: str) -> omemo.SessionManager: + """ + @param profile: The profile to prepare for. + @return: A session manager instance for this profile. Creates a new instance if + none was prepared before. + """ + + try: + # Try to return the session manager + return self.__session_managers[profile] + except KeyError: + # If a session manager for that profile doesn't exist yet, check whether it is + # currently being built. A session manager being built is signified by the + # profile key existing on __session_manager_waiters. + if profile in self.__session_manager_waiters: + # If the session manager is being built, add ourselves to the waiting + # queue + deferred = defer.Deferred() + self.__session_manager_waiters[profile].append(deferred) + return cast(omemo.SessionManager, await deferred) + + # If the session manager is not being built, do so here. + self.__session_manager_waiters[profile] = [] + + # Build and store the session manager + try: + session_manager = await prepare_for_profile( + self.__sat, + profile, + initial_own_label="Libervia" + ) + except Exception as e: + # In case of an error during initalization, notify the waiters accordingly + # and delete them + for waiter in self.__session_manager_waiters[profile]: + waiter.errback(e) + del self.__session_manager_waiters[profile] + + # Re-raise the exception + raise + + self.__session_managers[profile] = session_manager + + # Notify the waiters and delete them + for waiter in self.__session_manager_waiters[profile]: + waiter.callback(session_manager) + del self.__session_manager_waiters[profile] + + return session_manager + + async def __message_received_trigger_atm( + self, + client: SatXMPPClient, + message_elt: domish.Element, + session_manager: omemo.SessionManager, + sender_device_information: omemo.DeviceInformation, + timestamp: datetime + ) -> None: + """Check a newly decrypted message stanza for ATM content and perform ATM in case. + + @param client: The client which received the message. + @param message_elt: The message element. Can be modified. + @param session_manager: The session manager. + @param sender_device_information: Information about the device that sent/encrypted + the message. + @param timestamp: Timestamp extracted from the SCE time affix. + """ + + trust_message_cache = persistent.LazyPersistentBinaryDict( + "XEP-0384/TM", + client.profile + ) + + new_cache_entries: Set[TrustMessageCacheEntry] = set() + + for trust_message_elt in message_elt.elements(NS_TM, "trust-message"): + assert isinstance(trust_message_elt, domish.Element) + + try: + TRUST_MESSAGE_SCHEMA.validate(trust_message_elt.toXml()) + except xmlschema.XMLSchemaValidationError as e: + raise exceptions.ParsingError( + "<trust-message/> element doesn't pass schema validation." + ) from e + + if trust_message_elt["usage"] != NS_ATM: + # Skip non-ATM trust message + continue + + if trust_message_elt["encryption"] != OMEMO.NS_TWOMEMO: + # Skip non-twomemo trust message + continue + + for key_owner_elt in trust_message_elt.elements(NS_TM, "key-owner"): + assert isinstance(key_owner_elt, domish.Element) + + key_owner_jid = jid.JID(key_owner_elt["jid"]).userhostJID() + + for trust_elt in key_owner_elt.elements(NS_TM, "trust"): + assert isinstance(trust_elt, domish.Element) + + new_cache_entries.add(TrustMessageCacheEntry( + sender_jid=jid.JID(sender_device_information.bare_jid), + sender_key=sender_device_information.identity_key, + timestamp=timestamp, + trust_update=TrustUpdate( + target_jid=key_owner_jid, + target_key=base64.b64decode(str(trust_elt)), + target_trust=True + ) + )) + + for distrust_elt in key_owner_elt.elements(NS_TM, "distrust"): + assert isinstance(distrust_elt, domish.Element) + + new_cache_entries.add(TrustMessageCacheEntry( + sender_jid=jid.JID(sender_device_information.bare_jid), + sender_key=sender_device_information.identity_key, + timestamp=timestamp, + trust_update=TrustUpdate( + target_jid=key_owner_jid, + target_key=base64.b64decode(str(distrust_elt)), + target_trust=False + ) + )) + + # Load existing cache entries + existing_cache_entries = cast( + Set[TrustMessageCacheEntry], + await trust_message_cache.get("cache", set()) + ) + + # Discard cache entries by timestamp comparison + existing_by_target = { + ( + cache_entry.trust_update.target_jid.userhostJID(), + cache_entry.trust_update.target_key + ): cache_entry + for cache_entry + in existing_cache_entries + } + + # Iterate over a copy here, such that new_cache_entries can be modified + for new_cache_entry in set(new_cache_entries): + existing_cache_entry = existing_by_target.get( + ( + new_cache_entry.trust_update.target_jid.userhostJID(), + new_cache_entry.trust_update.target_key + ), + None + ) + + if existing_cache_entry is not None: + if existing_cache_entry.timestamp > new_cache_entry.timestamp: + # If the existing cache entry is newer than the new cache entry, + # discard the new one in favor of the existing one + new_cache_entries.remove(new_cache_entry) + else: + # Otherwise, discard the existing cache entry. This includes the case + # when both cache entries have matching timestamps. + existing_cache_entries.remove(existing_cache_entry) + + # If the sending device is trusted, apply the new cache entries + applied_trust_updates: Set[TrustUpdate] = set() + + if TrustLevel(sender_device_information.trust_level_name) is TrustLevel.TRUSTED: + # Iterate over a copy such that new_cache_entries can be modified + for cache_entry in set(new_cache_entries): + trust_update = cache_entry.trust_update + + trust_level = ( + TrustLevel.TRUSTED + if trust_update.target_trust + else TrustLevel.DISTRUSTED + ) + + await session_manager.set_trust( + trust_update.target_jid.userhost(), + trust_update.target_key, + trust_level.name + ) + + applied_trust_updates.add(trust_update) + + new_cache_entries.remove(cache_entry) + + # Store the remaining existing and new cache entries + await trust_message_cache.force( + "cache", + existing_cache_entries | new_cache_entries + ) + + # If the trust of at least one device was modified, run the ATM cache update logic + if len(applied_trust_updates) > 0: + await manage_trust_message_cache( + client, + session_manager, + frozenset(applied_trust_updates) + ) + + async def _message_received_trigger( + self, + client: SatXMPPClient, + message_elt: domish.Element, + post_treat: defer.Deferred + ) -> bool: + """ + @param client: The client which received the message. + @param message_elt: The message element. Can be modified. + @param post_treat: A deferred which evaluates to a :class:`MessageData` once the + message has fully progressed through the message receiving flow. Can be used + to apply treatments to the fully processed message, like marking it as + encrypted. + @return: Whether to continue the message received flow. + """ + if client.is_component: + return True + muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None + + sender_jid = jid.JID(message_elt["from"]) + feedback_jid: jid.JID + + message_type = message_elt.getAttribute("type", C.MESS_TYPE_NORMAL) + is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT + if is_muc_message: + if self.__xep_0045 is None: + log.warning( + "Ignoring MUC message since plugin XEP-0045 is not available." + ) + # Can't handle a MUC message without XEP-0045, let the flow continue + # normally + return True + + room_jid = feedback_jid = sender_jid.userhostJID() + + try: + room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid)) + except exceptions.NotFound: + log.warning( + f"Ignoring MUC message from a room that has not been joined:" + f" {room_jid}" + ) + # Whatever, let the flow continue + return True + + sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource)) + if sender_user is None: + log.warning( + f"Ignoring MUC message from room {room_jid} since the sender's user" + f" wasn't found {sender_jid.resource}" + ) + # Whatever, let the flow continue + return True + + sender_user_jid = cast(Optional[jid.JID], sender_user.entity) + if sender_user_jid is None: + log.warning( + f"Ignoring MUC message from room {room_jid} since the sender's bare" + f" JID couldn't be found from its user information: {sender_user}" + ) + # Whatever, let the flow continue + return True + + sender_jid = sender_user_jid + + message_uid: Optional[str] = None + if self.__xep_0359 is not None: + message_uid = self.__xep_0359.get_origin_id(message_elt) + if message_uid is None: + message_uid = message_elt.getAttribute("id") + if message_uid is not None: + muc_plaintext_cache_key = MUCPlaintextCacheKey( + client, + room_jid, + message_uid + ) + else: + # I'm not sure why this check is required, this code is copied from the old + # plugin. + if sender_jid.userhostJID() == client.jid.userhostJID(): + try: + feedback_jid = jid.JID(message_elt["to"]) + except KeyError: + feedback_jid = client.server_jid + else: + feedback_jid = sender_jid + + sender_bare_jid = sender_jid.userhost() + + message: Optional[omemo.Message] = None + encrypted_elt: Optional[domish.Element] = None + + twomemo_encrypted_elt = cast(Optional[domish.Element], next( + message_elt.elements(twomemo.twomemo.NAMESPACE, "encrypted"), + None + )) + + oldmemo_encrypted_elt = cast(Optional[domish.Element], next( + message_elt.elements(oldmemo.oldmemo.NAMESPACE, "encrypted"), + None + )) + + try: + session_manager = await self.get_session_manager(cast(str, client.profile)) + except Exception as e: + log.error(f"error while preparing profile for {client.profile}: {e}") + # we don't want to block the workflow + return True + + if twomemo_encrypted_elt is not None: + try: + message = twomemo.etree.parse_message( + xml_tools.domish_elt_2_et_elt(twomemo_encrypted_elt), + sender_bare_jid + ) + except (ValueError, XMLSchemaValidationError): + log.warning( + f"Ingoring malformed encrypted message for namespace" + f" {twomemo.twomemo.NAMESPACE}: {twomemo_encrypted_elt.toXml()}" + ) + else: + encrypted_elt = twomemo_encrypted_elt + + if oldmemo_encrypted_elt is not None: + try: + message = await oldmemo.etree.parse_message( + xml_tools.domish_elt_2_et_elt(oldmemo_encrypted_elt), + sender_bare_jid, + client.jid.userhost(), + session_manager + ) + except (ValueError, XMLSchemaValidationError): + log.warning( + f"Ingoring malformed encrypted message for namespace" + f" {oldmemo.oldmemo.NAMESPACE}: {oldmemo_encrypted_elt.toXml()}" + ) + except omemo.SenderNotFound: + log.warning( + f"Ingoring encrypted message for namespace" + f" {oldmemo.oldmemo.NAMESPACE} by unknown sender:" + f" {oldmemo_encrypted_elt.toXml()}" + ) + else: + encrypted_elt = oldmemo_encrypted_elt + + if message is None or encrypted_elt is None: + # None of our business, let the flow continue + return True + + message_elt.children.remove(encrypted_elt) + + log.debug( + f"{message.namespace} message of type {message_type} received from" + f" {sender_bare_jid}" + ) + + plaintext: Optional[bytes] + device_information: omemo.DeviceInformation + + if ( + muc_plaintext_cache_key is not None + and muc_plaintext_cache_key in self.__muc_plaintext_cache + ): + # Use the cached plaintext + plaintext = self.__muc_plaintext_cache.pop(muc_plaintext_cache_key) + + # Since this message was sent by us, use the own device information here + device_information, __ = await session_manager.get_own_device_information() + else: + try: + plaintext, device_information, __ = await session_manager.decrypt(message) + except omemo.MessageNotForUs: + # The difference between this being a debug or a warning is whether there + # is a body included in the message. Without a body, we can assume that + # it's an empty OMEMO message used for protocol stability reasons, which + # is not expected to be sent to all devices of all recipients. If a body + # is included, we can assume that the message carries content and we + # missed out on something. + if len(list(message_elt.elements(C.NS_CLIENT, "body"))) > 0: + client.feedback( + feedback_jid, + D_( + f"An OMEMO message from {sender_jid.full()} has not been" + f" encrypted for our device, we can't decrypt it." + ), + { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + ) + log.warning("Message not encrypted for us.") + else: + log.debug("Message not encrypted for us.") + + # No point in further processing this message. + return False + except Exception as e: + log.warning(_("Can't decrypt message: {reason}\n{xml}").format( + reason=e, + xml=message_elt.toXml() + )) + client.feedback( + feedback_jid, + D_( + f"An OMEMO message from {sender_jid.full()} can't be decrypted:" + f" {e}" + ), + { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + ) + # No point in further processing this message + return False + + affix_values: Optional[SCEAffixValues] = None + + if message.namespace == twomemo.twomemo.NAMESPACE: + if plaintext is not None: + # XEP_0420.unpack_stanza handles the whole unpacking, including the + # relevant modifications to the element + sce_profile = \ + OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE + try: + affix_values = self.__xep_0420.unpack_stanza( + sce_profile, + message_elt, + plaintext + ) + except Exception as e: + log.warning(D_( + f"Error unpacking SCE-encrypted message: {e}\n{plaintext}" + )) + client.feedback( + feedback_jid, + D_( + f"An OMEMO message from {sender_jid.full()} was rejected:" + f" {e}" + ), + { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + ) + # No point in further processing this message + return False + else: + if affix_values.timestamp is not None: + # TODO: affix_values.timestamp contains the timestamp included in + # the encrypted element here. The XEP says it SHOULD be displayed + # with the plaintext by clients. + pass + + if message.namespace == oldmemo.oldmemo.NAMESPACE: + # Remove all body elements from the original element, since those act as + # fallbacks in case the encryption protocol is not supported + for child in message_elt.elements(): + if child.name == "body": + message_elt.children.remove(child) + + if plaintext is not None: + # Add the decrypted body + message_elt.addElement("body", content=plaintext.decode("utf-8")) + + # Mark the message as trusted or untrusted. Undecided counts as untrusted here. + trust_level = \ + await session_manager._evaluate_custom_trust_level(device_information) + + if trust_level is omemo.TrustLevel.TRUSTED: + post_treat.addCallback(client.encryption.mark_as_trusted) + else: + post_treat.addCallback(client.encryption.mark_as_untrusted) + + # Mark the message as originally encrypted + post_treat.addCallback( + client.encryption.mark_as_encrypted, + namespace=message.namespace + ) + + # Handle potential ATM trust updates + if affix_values is not None and affix_values.timestamp is not None: + await self.__message_received_trigger_atm( + client, + message_elt, + session_manager, + device_information, + affix_values.timestamp + ) + + # Message processed successfully, continue with the flow + return True + + async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool: + """ + @param client: The client sending this message. + @param stanza: The stanza that is about to be sent. Can be modified. + @return: Whether the send message flow should continue or not. + """ + # SCE is only applicable to message and IQ stanzas + # FIXME: temporary disabling IQ stanza encryption + if stanza.name not in { "message" }: # , "iq" }: + return True + + # Get the intended recipient + recipient = stanza.getAttribute("to", None) + if recipient is None: + if stanza.name == "message": + # Message stanzas must have a recipient + raise exceptions.InternalError( + f"Message without recipient encountered. Blocking further processing" + f" to avoid leaking plaintext data: {stanza.toXml()}" + ) + + # IQs without a recipient are a thing, I believe those simply target the + # server and are thus not eligible for e2ee anyway. + return True + + # Parse the JID + recipient_bare_jid = jid.JID(recipient).userhostJID() + + # Check whether encryption with twomemo is requested + encryption = client.encryption.getSession(recipient_bare_jid) + + if encryption is None: + # Encryption is not requested for this recipient + return True + + if encryption["plugin"].namespace != twomemo.twomemo.NAMESPACE: + # Encryption is requested for this recipient, but not with twomemo + return True + + # All pre-checks done, we can start encrypting! + await self.encrypt( + client, + twomemo.twomemo.NAMESPACE, + stanza, + recipient_bare_jid, + stanza.getAttribute("type", C.MESS_TYPE_NORMAL) == C.MESS_TYPE_GROUPCHAT, + stanza.getAttribute("id", None) + ) + + # Add a store hint if this is a message stanza + if stanza.name == "message": + self.__xep_0334.add_hint_elements(stanza, [ "store" ]) + + # Let the flow continue. + return True + + async def __send_message_data_trigger( + self, + client: SatXMPPClient, + mess_data: MessageData + ) -> None: + """ + @param client: The client sending this message. + @param mess_data: The message data that is about to be sent. Can be modified. + """ + + # Check whether encryption is requested for this message + try: + namespace = mess_data[C.MESS_KEY_ENCRYPTION]["plugin"].namespace + except KeyError: + return + + # If encryption is requested, check whether it's oldmemo + if namespace != oldmemo.oldmemo.NAMESPACE: + return + + # All pre-checks done, we can start encrypting! + stanza = mess_data["xml"] + recipient_jid = mess_data["to"] + is_muc_message = mess_data["type"] == C.MESS_TYPE_GROUPCHAT + stanza_id = mess_data["uid"] + + await self.encrypt( + client, + oldmemo.oldmemo.NAMESPACE, + stanza, + recipient_jid, + is_muc_message, + stanza_id + ) + + # Add a store hint + self.__xep_0334.add_hint_elements(stanza, [ "store" ]) + + async def encrypt( + self, + client: SatXMPPClient, + namespace: Literal["urn:xmpp:omemo:2", "eu.siacs.conversations.axolotl"], + stanza: domish.Element, + recipient_jids: Union[jid.JID, Set[jid.JID]], + is_muc_message: bool, + stanza_id: Optional[str] + ) -> None: + """ + @param client: The client. + @param namespace: The namespace of the OMEMO version to use. + @param stanza: The stanza. Twomemo will encrypt the whole stanza using SCE, + oldmemo will encrypt only the body. The stanza is modified by this call. + @param recipient_jid: The JID of the recipients. + Can be a bare (aka "userhost") JIDs but doesn't have to. + A single JID can be used. + @param is_muc_message: Whether the stanza is a message stanza to a MUC room. + @param stanza_id: The id of this stanza. Especially relevant for message stanzas + to MUC rooms such that the outgoing plaintext can be cached for MUC message + reflection handling. + + @warning: The calling code MUST take care of adding the store message processing + hint to the stanza if applicable! This can be done before or after this call, + the order doesn't matter. + """ + if isinstance(recipient_jids, jid.JID): + recipient_jids = {recipient_jids} + if not recipient_jids: + raise exceptions.InternalError("At least one JID must be specified") + recipient_jid = next(iter(recipient_jids)) + + muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None + + recipient_bare_jids: Set[str] + feedback_jid: jid.JID + + if is_muc_message: + if len(recipient_jids) != 1: + raise exceptions.InternalError( + 'Only one JID can be set when "is_muc_message" is set' + ) + if self.__xep_0045 is None: + raise exceptions.InternalError( + "Encryption of MUC message requested, but plugin XEP-0045 is not" + " available." + ) + + if stanza_id is None: + raise exceptions.InternalError( + "Encryption of MUC message requested, but stanza id not available." + ) + + room_jid = feedback_jid = recipient_jid.userhostJID() + + recipient_bare_jids = self.__get_joined_muc_users( + client, + self.__xep_0045, + room_jid + ) + + muc_plaintext_cache_key = MUCPlaintextCacheKey( + client=client, + room_jid=room_jid, + message_uid=stanza_id + ) + else: + recipient_bare_jids = {r.userhost() for r in recipient_jids} + feedback_jid = recipient_jid.userhostJID() + + log.debug( + f"Intercepting message that is to be encrypted by {namespace} for" + f" {recipient_bare_jids}" + ) + + def prepare_stanza() -> Optional[bytes]: + """Prepares the stanza for encryption. + + Does so by removing all parts that are not supposed to be sent in plain. Also + extracts/prepares the plaintext to encrypt. + + @return: The plaintext to encrypt. Returns ``None`` in case body-only + encryption is requested and no body was found. The function should + gracefully return in that case, i.e. it's not a critical error that should + abort the message sending flow. + """ + + if namespace == twomemo.twomemo.NAMESPACE: + return self.__xep_0420.pack_stanza( + OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE, + stanza + ) + + if namespace == oldmemo.oldmemo.NAMESPACE: + plaintext: Optional[bytes] = None + + for child in stanza.elements(): + if child.name == "body" and plaintext is None: + plaintext = str(child).encode("utf-8") + + # Any other sensitive elements to remove here? + if child.name in { "body", "html" }: + stanza.children.remove(child) + + if plaintext is None: + log.warning( + "No body found in intercepted message to be encrypted with" + " oldmemo." + ) + + return plaintext + + return assert_never(namespace) + + # The stanza/plaintext preparation was moved into its own little function for type + # safety reasons. + plaintext = prepare_stanza() + if plaintext is None: + return + + log.debug(f"Plaintext to encrypt: {plaintext}") + + session_manager = await self.get_session_manager(client.profile) + + try: + messages, encryption_errors = await session_manager.encrypt( + frozenset(recipient_bare_jids), + { namespace: plaintext }, + backend_priority_order=[ namespace ], + identifier=feedback_jid.userhost() + ) + except Exception as e: + msg = _( + # pylint: disable=consider-using-f-string + "Can't encrypt message for {entities}: {reason}".format( + entities=', '.join(recipient_bare_jids), + reason=e + ) + ) + log.warning(msg) + client.feedback(feedback_jid, msg, { + C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR + }) + raise e + + if len(encryption_errors) > 0: + log.warning( + f"Ignored the following non-critical encryption errors:" + f" {encryption_errors}" + ) + + encrypted_errors_stringified = ", ".join([ + f"device {err.device_id} of {err.bare_jid} under namespace" + f" {err.namespace}" + for err + in encryption_errors + ]) + + client.feedback( + feedback_jid, + D_( + "There were non-critical errors during encryption resulting in some" + " of your destinees' devices potentially not receiving the message." + " This happens when the encryption data/key material of a device is" + " incomplete or broken, which shouldn't happen for actively used" + " devices, and can usually be ignored. The following devices are" + f" affected: {encrypted_errors_stringified}." + ) + ) + + message = next(message for message in messages if message.namespace == namespace) + + if namespace == twomemo.twomemo.NAMESPACE: + # Add the encrypted element + stanza.addChild(xml_tools.et_elt_2_domish_elt( + twomemo.etree.serialize_message(message) + )) + + if namespace == oldmemo.oldmemo.NAMESPACE: + # Add the encrypted element + stanza.addChild(xml_tools.et_elt_2_domish_elt( + oldmemo.etree.serialize_message(message) + )) + + if muc_plaintext_cache_key is not None: + self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext + + async def __on_device_list_update( + self, + items_event: pubsub.ItemsEvent, + profile: str + ) -> None: + """Handle device list updates fired by PEP. + + @param items_event: The event. + @param profile: The profile this event belongs to. + """ + + sender = cast(jid.JID, items_event.sender) + items = cast(List[domish.Element], items_event.items) + + if len(items) > 1: + log.warning("Ignoring device list update with more than one element.") + return + + item = next(iter(items), None) + if item is None: + log.debug("Ignoring empty device list update.") + return + + item_elt = xml_tools.domish_elt_2_et_elt(item) + + device_list: Dict[int, Optional[str]] = {} + namespace: Optional[str] = None + + list_elt = item_elt.find(f"{{{twomemo.twomemo.NAMESPACE}}}devices") + if list_elt is not None: + try: + device_list = twomemo.etree.parse_device_list(list_elt) + except XMLSchemaValidationError: + pass + else: + namespace = twomemo.twomemo.NAMESPACE + + list_elt = item_elt.find(f"{{{oldmemo.oldmemo.NAMESPACE}}}list") + if list_elt is not None: + try: + device_list = oldmemo.etree.parse_device_list(list_elt) + except XMLSchemaValidationError: + pass + else: + namespace = oldmemo.oldmemo.NAMESPACE + + if namespace is None: + log.warning( + f"Malformed device list update item:" + f" {ET.tostring(item_elt, encoding='unicode')}" + ) + return + + session_manager = await self.get_session_manager(profile) + + await session_manager.update_device_list( + namespace, + sender.userhost(), + device_list + )