Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0384.py @ 3911:8289ac1b34f4
plugin XEP-0384: Fully reworked to adjust to the reworked python-omemo:
- support for both (modern) OMEMO under the `urn:xmpp:omemo:2` namespace and (legacy) OMEMO under the `eu.siacs.conversations.axolotl` namespace
- maintains one identity across both versions of OMEMO
- migrates data from the old plugin
- includes more features for protocol stability
- uses SCE for modern OMEMO
- fully type-checked, linted and format-checked
- added type hints to various pieces of backend code used by the plugin
- added stubs for some Twisted APIs used by the plugin under stubs/ (use `export MYPYPATH=stubs/` before running mypy)
- core (xmpp): enabled `send` trigger and made it an asyncPoint
fix 375
author | Syndace <me@syndace.dev> |
---|---|
date | Tue, 23 Aug 2022 21:06:24 +0200 |
parents | cc653b2685f0 |
children | 4cb38c8312a1 |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0384.py Thu Sep 22 12:03:12 2022 +0200 +++ b/sat/plugins/plugin_xep_0384.py Tue Aug 23 21:06:24 2022 +0200 @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -# SAT plugin for OMEMO encryption -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) +# Libervia plugin for OMEMO encryption +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -16,1431 +16,2055 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import enum import logging -import random -import base64 -from functools import partial +import time +from typing import ( + Any, Callable, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, cast +) +import uuid +import xml.etree.ElementTree as ET from xml.sax.saxutils import quoteattr -from sat.core.i18n import _, D_ + +from typing_extensions import Never, assert_never +from wokkel import muc, pubsub # type: ignore[import] + +from sat.core import exceptions from sat.core.constants import Const as C -from sat.core.log import getLogger -from sat.core import exceptions -from twisted.internet import defer, reactor +from sat.core.core_types import MessageData, SatXMPPEntity +from sat.core.i18n import _, D_ +from sat.core.log import getLogger, Logger +from sat.core.sat_main import SAT +from sat.core.xmpp import SatXMPPClient +from sat.memory import persistent +from sat.plugins.plugin_misc_text_commands import TextCommands +from sat.plugins.plugin_xep_0045 import XEP_0045 +from sat.plugins.plugin_xep_0060 import XEP_0060 +from sat.plugins.plugin_xep_0163 import XEP_0163 +from sat.plugins.plugin_xep_0334 import XEP_0334 +from sat.plugins.plugin_xep_0359 import XEP_0359 +from sat.plugins.plugin_xep_0420 import XEP_0420, SCEAffixPolicy, SCEProfile +from sat.tools import xml_tools +from twisted.internet import defer +from twisted.words.protocols.jabber import error, jid from twisted.words.xish import domish -from twisted.words.protocols.jabber import jid -from twisted.words.protocols.jabber import error as jabber_error -from sat.memory import persistent -from sat.tools import xml_tools + try: import omemo - from omemo import exceptions as omemo_excpt - from omemo.extendedpublicbundle import ExtendedPublicBundle -except ImportError: + import omemo.identity_key_pair + import twomemo + import twomemo.etree + import oldmemo + import oldmemo.etree + import oldmemo.migrations + from xmlschema import XMLSchemaValidationError + + # An explicit version check of the OMEMO libraries should not be required here, since + # the stored data is fully versioned and the library will complain if a downgrade is + # attempted. +except ImportError as import_error: raise exceptions.MissingModule( - 'Missing module omemo, please download/install it. You can use ' - '"pip install omemo"' - ) -try: - from omemo_backend_signal import BACKEND as omemo_backend -except ImportError: - raise exceptions.MissingModule( - 'Missing module omemo-backend-signal, please download/install it. You can use ' - '"pip install omemo-backend-signal"' - ) + "You are missing one or more package required by the OMEMO plugin. Please" + " download/install the pip packages 'omemo', 'twomemo', 'oldmemo' and" + " 'xmlschema'." + ) from import_error + -log = getLogger(__name__) +__all__ = [ # pylint: disable=unused-variable + "PLUGIN_INFO", + "OMEMO" +] + + +log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] + + +string_to_domish = cast(Callable[[str], domish.Element], xml_tools.ElementParser()) + PLUGIN_INFO = { C.PI_NAME: "OMEMO", C.PI_IMPORT_NAME: "XEP-0384", C.PI_TYPE: "SEC", - C.PI_PROTOCOLS: ["XEP-0384"], - C.PI_DEPENDENCIES: ["XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060"], - C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0359", C.TEXT_CMDS], + C.PI_PROTOCOLS: [ "XEP-0384" ], + C.PI_DEPENDENCIES: [ "XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060", "XEP-0420" ], + C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0359", C.TEXT_CMDS ], C.PI_MAIN: "OMEMO", C.PI_HANDLER: "no", C.PI_DESCRIPTION: _("""Implementation of OMEMO"""), } -OMEMO_MIN_VER = (0, 11, 0) -NS_OMEMO = "eu.siacs.conversations.axolotl" -NS_OMEMO_DEVICES = NS_OMEMO + ".devicelist" -NS_OMEMO_BUNDLE = NS_OMEMO + ".bundles:{device_id}" -KEY_STATE = "STATE" -KEY_DEVICE_ID = "DEVICE_ID" -KEY_SESSION = "SESSION" -KEY_TRUST = "TRUST" -# devices which have been automatically trusted by policy like BTBV -KEY_AUTO_TRUST = "AUTO_TRUST" -# list of peer bare jids where trust UI has been used at least once -# this is useful to activate manual trust with BTBV policy -KEY_MANUAL_TRUST = "MANUAL_TRUST" -KEY_ACTIVE_DEVICES = "DEVICES" -KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES" -KEY_ALL_JIDS = "ALL_JIDS" -# time before plaintext cache for MUC is expired -# expressed in seconds, reset on each new MUC message -MUC_CACHE_TTL = 60 * 5 PARAM_CATEGORY = "Security" PARAM_NAME = "omemo_policy" -# we want to manage log emitted by omemo module ourselves +class LogHandler(logging.Handler): + """ + Redirect python-omemo's log output to Libervia's log system. + """ -class SatHandler(logging.Handler): - - def emit(self, record): + def emit(self, record: logging.LogRecord) -> None: log.log(record.levelname, record.getMessage()) - @staticmethod - def install(): - omemo_sm_logger = logging.getLogger("omemo.SessionManager") - omemo_sm_logger.propagate = False - omemo_sm_logger.addHandler(SatHandler()) + +sm_logger = logging.getLogger(omemo.SessionManager.LOG_TAG) +sm_logger.setLevel(logging.DEBUG) +sm_logger.propagate = False +sm_logger.addHandler(LogHandler()) + + +ikp_logger = logging.getLogger(omemo.identity_key_pair.IdentityKeyPair.LOG_TAG) +ikp_logger.setLevel(logging.DEBUG) +ikp_logger.propagate = False +ikp_logger.addHandler(LogHandler()) + + +# TODO: Add handling for device labels, i.e. show device labels in the trust UI and give +# the user a way to change their own device label. -SatHandler.install() +class MUCPlaintextCacheKey(NamedTuple): + # pylint: disable=invalid-name + """ + Structure identifying an encrypted message sent to a MUC. + """ + + client: SatXMPPClient + room_jid: jid.JID + message_uid: str + + +# TODO: Convert without serialization/parsing +# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took +# around 0.6 seconds on my setup. +def etree_to_domish(element: ET.Element) -> domish.Element: + """ + @param element: An ElementTree element. + @return: The ElementTree element converted to a domish element. + """ + + return string_to_domish(ET.tostring(element, encoding="unicode")) + + +# TODO: Convert without serialization/parsing +# On a medium-to-large-sized oldmemo message stanza, 10000 runs of this function took less +# than one second on my setup. +def domish_to_etree(element: domish.Element) -> ET.Element: + """ + @param element: A domish element. + @return: The domish element converted to an ElementTree element. + """ + + return ET.fromstring(element.toXml()) -def b64enc(data): - return base64.b64encode(bytes(bytearray(data))).decode("US-ASCII") +def domish_to_etree2(element: domish.Element) -> ET.Element: + """ + WIP + """ + + element_name = element.name + if element.uri is not None: + element_name = "{" + element.uri + "}" + element_name + + attrib: Dict[str, str] = {} + for qname, value in element.attributes.items(): + attribute_name = qname[1] if isinstance(qname, tuple) else qname + attribute_namespace = qname[0] if isinstance(qname, tuple) else None + if attribute_namespace is not None: + attribute_name = "{" + attribute_namespace + "}" + attribute_name + + attrib[attribute_name] = value + + result = ET.Element(element_name, attrib) + + last_child: Optional[ET.Element] = None + for child in element.children: + if isinstance(child, str): + if last_child is None: + result.text = child + else: + last_child.tail = child + else: + last_child = domish_to_etree2(child) + result.append(last_child) + + return result -def promise2Deferred(promise_): - """Create a Deferred and fire it when promise is resolved +@enum.unique +class TrustLevel(enum.Enum): + """ + The trust levels required for BTBV and manual trust. + """ + + TRUSTED: str = "TRUSTED" + BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" + UNDECIDED: str = "UNDECIDED" + DISTRUSTED: str = "DISTRUSTED" + + def to_omemo_trust_level(self) -> omemo.TrustLevel: + """ + @return: This custom trust level evaluated to one of the OMEMO trust levels. + """ - @param promise_(promise.Promise): promise to convert - @return (defer.Deferred): deferred instance linked to the promise + if self is TrustLevel.TRUSTED or self is TrustLevel.BLINDLY_TRUSTED: + return omemo.TrustLevel.TRUSTED + if self is TrustLevel.UNDECIDED: + return omemo.TrustLevel.UNDECIDED + if self is TrustLevel.DISTRUSTED: + return omemo.TrustLevel.DISTRUSTED + + return assert_never(self) + + +TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices" +OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist" + + +class StorageImpl(omemo.Storage): + """ + Storage implementation for OMEMO based on :class:`persistent.LazyPersistentBinaryDict` """ - d = defer.Deferred() - promise_.then( - lambda result: reactor.callFromThread(d.callback, result), - lambda exc: reactor.callFromThread(d.errback, exc) - ) - return d + + def __init__(self, profile: str) -> None: + """ + @param profile: The profile this OMEMO data belongs to. + """ + + # persistent.LazyPersistentBinaryDict does not cache at all, so keep the caching + # option of omemo.Storage enabled. + super().__init__() + + self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile) + + async def _load(self, key: str) -> omemo.Maybe[omemo.JSONType]: + try: + return omemo.Just(await self.__storage[key]) + except KeyError: + return omemo.Nothing() + except Exception as e: + raise omemo.StorageException(f"Error while loading key {key}") from e + + async def _store(self, key: str, value: omemo.JSONType) -> None: + try: + await self.__storage.force(key, value) + except Exception as e: + raise omemo.StorageException(f"Error while storing key {key}: {value}") from e + + async def _delete(self, key: str) -> None: + try: + await self.__storage.remove(key) + except KeyError: + pass + except Exception as e: + raise omemo.StorageException(f"Error while deleting key {key}") from e -class OmemoStorage(omemo.Storage): - - def __init__(self, client, device_id, all_jids): - self.own_bare_jid_s = client.jid.userhost() - self.device_id = device_id - self.all_jids = all_jids - self.data = client._xep_0384_data +class LegacyStorageImpl(oldmemo.migrations.LegacyStorage): + """ + Legacy storage implementation to migrate data from the old XEP-0384 plugin. + """ - @property - def is_async(self): - return True + KEY_DEVICE_ID = "DEVICE_ID" + KEY_STATE = "STATE" + KEY_SESSION = "SESSION" + KEY_ACTIVE_DEVICES = "DEVICES" + KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES" + KEY_TRUST = "TRUST" + KEY_ALL_JIDS = "ALL_JIDS" - def setCb(self, deferred, callback): - """Associate Deferred and callback - - callback of omemo.Storage expect a boolean with success state then result - Deferred on the other hand use 2 methods for callback and errback - This method use partial to call callback with boolean then result when - Deferred is called + def __init__(self, profile: str, own_bare_jid: str) -> None: + """ + @param profile: The profile this OMEMO data belongs to. + @param own_bare_jid: The own bare JID, to return by the :meth:`loadOwnData` call. """ - deferred.addCallback(partial(callback, True)) - deferred.addErrback(partial(callback, False)) + + self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile) + self.__own_bare_jid = own_bare_jid + + async def loadOwnData(self) -> Optional[oldmemo.migrations.OwnData]: + own_device_id = await self.__storage.get(LegacyStorageImpl.KEY_DEVICE_ID, None) + if own_device_id is None: + return None + + return oldmemo.migrations.OwnData( + own_bare_jid=self.__own_bare_jid, + own_device_id=own_device_id + ) + + async def deleteOwnData(self) -> None: + try: + await self.__storage.remove(LegacyStorageImpl.KEY_DEVICE_ID) + except KeyError: + pass + + async def loadState(self) -> Optional[oldmemo.migrations.State]: + return cast( + Optional[oldmemo.migrations.State], + await self.__storage.get(LegacyStorageImpl.KEY_STATE, None) + ) + + async def deleteState(self) -> None: + try: + await self.__storage.remove(LegacyStorageImpl.KEY_STATE) + except KeyError: + pass + + async def loadSession( + self, + bare_jid: str, + device_id: int + ) -> Optional[oldmemo.migrations.Session]: + key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ]) - def _callMainThread(self, callback, method, *args, check_jid=None): - if check_jid is None: - d = method(*args) - else: - check_jid_d = self._checkJid(check_jid) - check_jid_d.addCallback(lambda __: method(*args)) - d = check_jid_d + return cast( + Optional[oldmemo.migrations.Session], + await self.__storage.get(key, None) + ) + + async def deleteSession(self, bare_jid: str, device_id: int) -> None: + key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ]) + + try: + await self.__storage.remove(key) + except KeyError: + pass + + async def loadActiveDevices(self, bare_jid: str) -> Optional[List[int]]: + key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ]) + + return cast( + Optional[List[int]], + await self.__storage.get(key, None) + ) + + async def loadInactiveDevices(self, bare_jid: str) -> Optional[Dict[int, int]]: + key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ]) + + return cast( + Optional[Dict[int, int]], + await self.__storage.get(key, None) + ) + + async def deleteActiveDevices(self, bare_jid: str) -> None: + key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ]) + + try: + await self.__storage.remove(key) + except KeyError: + pass + + async def deleteInactiveDevices(self, bare_jid: str) -> None: + key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ]) - if callback is not None: - d.addCallback(partial(callback, True)) - d.addErrback(partial(callback, False)) + try: + await self.__storage.remove(key) + except KeyError: + pass + + async def loadTrust( + self, + bare_jid: str, + device_id: int + ) -> Optional[oldmemo.migrations.Trust]: + key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ]) + + return cast( + Optional[oldmemo.migrations.Trust], + await self.__storage.get(key, None) + ) + + async def deleteTrust(self, bare_jid: str, device_id: int) -> None: + key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ]) + + try: + await self.__storage.remove(key) + except KeyError: + pass + + async def listJIDs(self) -> Optional[List[str]]: + bare_jids = await self.__storage.get(LegacyStorageImpl.KEY_ALL_JIDS, None) + + return None if bare_jids is None else list(bare_jids) + + async def deleteJIDList(self) -> None: + try: + await self.__storage.remove(LegacyStorageImpl.KEY_ALL_JIDS) + except KeyError: + pass + - def _call(self, callback, method, *args, check_jid=None): - """Create Deferred and add Promise callback to it +async def download_oldmemo_bundle( + client: SatXMPPClient, + xep_0060: XEP_0060, + bare_jid: str, + device_id: int +) -> oldmemo.oldmemo.BundleImpl: + """Download the oldmemo bundle corresponding to a specific device. + + @param client: The client. + @param xep_0060: The XEP-0060 plugin instance to use for pubsub interactions. + @param bare_jid: The bare JID the device belongs to. + @param device_id: The id of the device. + @return: The bundle. + @raise BundleDownloadFailed: if the download failed. Feel free to raise a subclass + instead. + """ + # Bundle downloads are needed by the session manager and for migrations from legacy, + # thus it is made a separate function. - This method use reactor.callLater to launch Deferred in main thread - @param check_jid: run self._checkJid before method - """ - reactor.callFromThread( - self._callMainThread, callback, method, *args, check_jid=check_jid + namespace = oldmemo.oldmemo.NAMESPACE + node = f"eu.siacs.conversations.axolotl.bundles:{device_id}" + + try: + items, __ = await xep_0060.getItems(client, jid.JID(bare_jid), node) + except Exception as e: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under namespace" + f" {namespace}" + ) from e + + if len(items) != 1: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under namespace" + f" {namespace}: Unexpected number of items retrieved: {len(items)}." + ) + + element = next(iter(domish_to_etree(cast(domish.Element, items[0]))), None) + if element is None: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under namespace" + f" {namespace}: Item download succeeded but parsing failed: {element}." ) - def _checkJid(self, bare_jid): - """Check if jid is known, and store it if not - - @param bare_jid(unicode): bare jid to check - @return (D): Deferred fired when jid is stored - """ - if bare_jid in self.all_jids: - return defer.succeed(None) - else: - self.all_jids.add(bare_jid) - d = self.data.force(KEY_ALL_JIDS, self.all_jids) - return d - - def loadOwnData(self, callback): - callback(True, {'own_bare_jid': self.own_bare_jid_s, - 'own_device_id': self.device_id}) - - def storeOwnData(self, callback, own_bare_jid, own_device_id): - if own_bare_jid != self.own_bare_jid_s or own_device_id != self.device_id: - raise exceptions.InternalError('bare jid or device id inconsistency!') - callback(True, None) - - def loadState(self, callback): - self._call(callback, self.data.get, KEY_STATE) - - def storeState(self, callback, state): - self._call(callback, self.data.force, KEY_STATE, state) - - def loadSession(self, callback, bare_jid, device_id): - key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) - self._call(callback, self.data.get, key) - - def storeSession(self, callback, bare_jid, device_id, session): - key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) - self._call(callback, self.data.force, key, session) - - def deleteSession(self, callback, bare_jid, device_id): - key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) - self._call(callback, self.data.remove, key) - - def loadActiveDevices(self, callback, bare_jid): - key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) - self._call(callback, self.data.get, key, {}) - - def loadInactiveDevices(self, callback, bare_jid): - key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) - self._call(callback, self.data.get, key, {}) - - def storeActiveDevices(self, callback, bare_jid, devices): - key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) - self._call(callback, self.data.force, key, devices, check_jid=bare_jid) - - def storeInactiveDevices(self, callback, bare_jid, devices): - key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) - self._call(callback, self.data.force, key, devices, check_jid=bare_jid) - - def storeTrust(self, callback, bare_jid, device_id, trust): - key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) - self._call(callback, self.data.force, key, trust) - - def loadTrust(self, callback, bare_jid, device_id): - key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) - self._call(callback, self.data.get, key) - - def listJIDs(self, callback): - if callback is not None: - callback(True, self.all_jids) - - def _deleteJID_logResults(self, results): - failed = [success for success, __ in results if not success] - if failed: - log.warning( - "delete JID failed for {failed_count} on {total_count} operations" - .format(failed_count=len(failed), total_count=len(results))) - else: - log.info( - "Delete JID operation succeed ({total_count} operations)." - .format(total_count=len(results))) - - def _deleteJID_gotDevices(self, results, bare_jid): - assert len(results) == 2 - active_success, active_devices = results[0] - inactive_success, inactive_devices = results[0] - d_list = [] - for success, devices in results: - if not success: - log.warning("Can't retrieve devices for {bare_jid}: {reason}" - .format(bare_jid=bare_jid, reason=active_devices)) - else: - for device_id in devices: - for key in (KEY_SESSION, KEY_TRUST): - k = '\n'.join([key, bare_jid, str(device_id)]) - d_list.append(self.data.remove(k)) - - d_list.append(self.data.remove(KEY_ACTIVE_DEVICES, bare_jid)) - d_list.append(self.data.remove(KEY_INACTIVE_DEVICES, bare_jid)) - d_list.append(lambda __: self.all_jids.discard(bare_jid)) - # FIXME: there is a risk of race condition here, - # if self.all_jids is modified between discard and force) - d_list.append(lambda __: self.data.force(KEY_ALL_JIDS, self.all_jids)) - d = defer.DeferredList(d_list) - d.addCallback(self._deleteJID_logResults) - return d - - def _deleteJID(self, callback, bare_jid): - d_list = [] - - key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) - d_list.append(self.data.get(key, [])) - - key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) - d_inactive = self.data.get(key, {}) - # inactive devices are returned as a dict mapping from devices_id to timestamp - # but we only need devices ids - d_inactive.addCallback(lambda devices: [k for k, __ in devices]) - - d_list.append(d_inactive) - d = defer.DeferredList(d_list) - d.addCallback(self._deleteJID_gotDevices, bare_jid) - if callback is not None: - self.setCb(d, callback) - - def deleteJID(self, callback, bare_jid): - """Retrieve all (in)actives devices of bare_jid, and delete all related keys""" - reactor.callFromThread(self._deleteJID, callback, bare_jid) + try: + return oldmemo.etree.parse_bundle(element, bare_jid, device_id) + except Exception as e: + raise omemo.BundleDownloadFailed( + f"Bundle parsing failed for {bare_jid}: {device_id} under namespace" + f" {namespace}" + ) from e -class SatOTPKPolicy(omemo.DefaultOTPKPolicy): - pass +def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]: + """ + @param sat: The SAT instance. + @param profile: The profile. + @return: A non-abstract subclass of :class:`~omemo.session_manager.SessionManager` + with XMPP interactions and trust handled via the SAT instance. + """ + + client = sat.getClient(profile) + xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) + + class SessionManagerImpl(omemo.SessionManager): + """ + Session manager implementation handling XMPP interactions and trust via an + instance of :class:`~sat.core.sat_main.SAT`. + """ + + @staticmethod + async def _upload_bundle(bundle: omemo.Bundle) -> None: + if isinstance(bundle, twomemo.twomemo.BundleImpl): + element = twomemo.etree.serialize_bundle(bundle) + + node = "urn:xmpp:omemo:2:bundles" + try: + await xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + etree_to_domish(element), + item_id=str(bundle.device_id), + extra={ + xep_0060.EXTRA_PUBLISH_OPTIONS: { + xep_0060.OPT_MAX_ITEMS: "max" + }, + xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" + } + ) + except (error.StanzaError, Exception) as e: + if ( + isinstance(e, error.StanzaError) + and e.condition == "conflict" + and e.appCondition is not None + # pylint: disable=no-member + and e.appCondition.name == "precondition-not-met" + ): + # publish options couldn't be set on the fly, manually reconfigure + # the node and publish again + raise omemo.BundleUploadFailed( + f"precondition-not-met: {bundle}" + ) from e + # TODO: What can I do here? The correct node configuration is a + # MUST in the XEP. + + raise omemo.BundleUploadFailed( + f"Bundle upload failed: {bundle}" + ) from e + + return + + if isinstance(bundle, oldmemo.oldmemo.BundleImpl): + element = oldmemo.etree.serialize_bundle(bundle) + + node = f"eu.siacs.conversations.axolotl.bundles:{bundle.device_id}" + try: + await xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + etree_to_domish(element), + item_id=xep_0060.ID_SINGLETON, + extra={ + xep_0060.EXTRA_PUBLISH_OPTIONS: { xep_0060.OPT_MAX_ITEMS: 1 }, + xep_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" + } + ) + except Exception as e: + raise omemo.BundleUploadFailed( + f"Bundle upload failed: {bundle}" + ) from e + + return + + raise omemo.UnknownNamespace(f"Unknown namespace: {bundle.namespace}") + + @staticmethod + async def _download_bundle( + namespace: str, + bare_jid: str, + device_id: int + ) -> omemo.Bundle: + if namespace == twomemo.twomemo.NAMESPACE: + node = "urn:xmpp:omemo:2:bundles" + + try: + items, __ = await xep_0060.getItems( + client, + jid.JID(bare_jid), + node, + max_items=None, + item_ids=[ str(device_id) ] + ) + except Exception as e: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under" + f" namespace {namespace}" + ) from e + + if len(items) != 1: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under" + f" namespace {namespace}: Unexpected number of items retrieved:" + f" {len(items)}." + ) + + element = next( + iter(domish_to_etree(cast(domish.Element, items[0]))), + None + ) + if element is None: + raise omemo.BundleDownloadFailed( + f"Bundle download failed for {bare_jid}: {device_id} under" + f" namespace {namespace}: Item download succeeded but parsing" + f" failed: {element}." + ) + + try: + return twomemo.etree.parse_bundle(element, bare_jid, device_id) + except Exception as e: + raise omemo.BundleDownloadFailed( + f"Bundle parsing failed for {bare_jid}: {device_id} under" + f" namespace {namespace}" + ) from e + + if namespace == oldmemo.oldmemo.NAMESPACE: + return await download_oldmemo_bundle( + client, + xep_0060, + bare_jid, + device_id + ) + + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + @staticmethod + async def _delete_bundle(namespace: str, device_id: int) -> None: + if namespace == twomemo.twomemo.NAMESPACE: + node = "urn:xmpp:omemo:2:bundles" + + try: + await xep_0060.retractItems( + client, + client.jid.userhostJID(), + node, + [ str(device_id) ], + notify=False + ) + except Exception as e: + raise omemo.BundleDeletionFailed( + f"Bundle deletion failed for {device_id} under namespace" + f" {namespace}" + ) from e + + return + + if namespace == oldmemo.oldmemo.NAMESPACE: + node = f"eu.siacs.conversations.axolotl.bundles:{device_id}" + + try: + await xep_0060.deleteNode(client, client.jid.userhostJID(), node) + except Exception as e: + raise omemo.BundleDeletionFailed( + f"Bundle deletion failed for {device_id} under namespace" + f" {namespace}" + ) from e + + return + + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + @staticmethod + async def _upload_device_list( + namespace: str, + device_list: Dict[int, Optional[str]] + ) -> None: + element: Optional[ET.Element] = None + node: Optional[str] = None + + if namespace == twomemo.twomemo.NAMESPACE: + element = twomemo.etree.serialize_device_list(device_list) + node = TWOMEMO_DEVICE_LIST_NODE + if namespace == oldmemo.oldmemo.NAMESPACE: + element = oldmemo.etree.serialize_device_list(device_list) + node = OLDMEMO_DEVICE_LIST_NODE + + if element is None or node is None: + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + try: + await xep_0060.sendItem( + client, + client.jid.userhostJID(), + node, + etree_to_domish(element), + item_id=xep_0060.ID_SINGLETON, + extra={ + xep_0060.EXTRA_PUBLISH_OPTIONS: { + xep_0060.OPT_MAX_ITEMS: 1, + xep_0060.OPT_ACCESS_MODEL: "open" + }, + xep_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" + } + ) + except (error.StanzaError, Exception) as e: + if ( + isinstance(e, error.StanzaError) + and e.condition == "conflict" + and e.appCondition is not None + # pylint: disable=no-member + and e.appCondition.name == "precondition-not-met" + ): + # publish options couldn't be set on the fly, manually reconfigure the + # node and publish again + raise omemo.DeviceListUploadFailed( + f"precondition-not-met for namespace {namespace}" + ) from e + # TODO: What can I do here? The correct node configuration is a MUST + # in the XEP. + + raise omemo.DeviceListUploadFailed( + f"Device list upload failed for namespace {namespace}" + ) from e + + @staticmethod + async def _download_device_list( + namespace: str, + bare_jid: str + ) -> Dict[int, Optional[str]]: + node: Optional[str] = None + + if namespace == twomemo.twomemo.NAMESPACE: + node = TWOMEMO_DEVICE_LIST_NODE + if namespace == oldmemo.oldmemo.NAMESPACE: + node = OLDMEMO_DEVICE_LIST_NODE + + if node is None: + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + try: + items, __ = await xep_0060.getItems(client, jid.JID(bare_jid), node) + except exceptions.NotFound: + return {} + except Exception as e: + raise omemo.DeviceListDownloadFailed( + f"Device list download failed for {bare_jid} under namespace" + f" {namespace}" + ) from e + + if len(items) != 1: + raise omemo.DeviceListDownloadFailed( + f"Device list download failed for {bare_jid} under namespace" + f" {namespace}: Unexpected number of items retrieved: {len(items)}." + ) + + element = next(iter(domish_to_etree(cast(domish.Element, items[0]))), None) + if element is None: + raise omemo.DeviceListDownloadFailed( + f"Device list download failed for {bare_jid} under namespace" + f" {namespace}: Item download succeeded but parsing failed:" + f" {element}." + ) + + try: + if namespace == twomemo.twomemo.NAMESPACE: + return twomemo.etree.parse_device_list(element) + if namespace == oldmemo.oldmemo.NAMESPACE: + return oldmemo.etree.parse_device_list(element) + except Exception as e: + raise omemo.DeviceListDownloadFailed( + f"Device list download failed for {bare_jid} under namespace" + f" {namespace}" + ) from e + + raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") + + @staticmethod + def _evaluate_custom_trust_level(trust_level_name: str) -> omemo.TrustLevel: + try: + return TrustLevel(trust_level_name).to_omemo_trust_level() + except ValueError as e: + raise omemo.UnknownTrustLevel( + f"Unknown trust level name {trust_level_name}" + ) from e + + async def _make_trust_decision( + self, + undecided: FrozenSet[omemo.DeviceInformation], + identifier: Optional[str] + ) -> None: + if identifier is None: + raise omemo.TrustDecisionFailed( + "The identifier must contain the feedback JID." + ) + + # The feedback JID is transferred via the identifier + feedback_jid = jid.JID(identifier).userhostJID() + + # Get the name of the trust model to use + trust_model = cast(str, sat.memory.getParamA( + PARAM_NAME, + PARAM_CATEGORY, + profile_key=cast(str, client.profile) + )) + + # Under the BTBV trust model, if at least one device of a bare JID is manually + # trusted or distrusted, the trust model is "downgraded" to manual trust. + # Thus, we can separate bare JIDs into two pools here, one pool of bare JIDs + # for which BTBV is active, and one pool of bare JIDs for which manual trust + # is used. + bare_jids = { device.bare_jid for device in undecided } + + btbv_bare_jids: Set[str] = set() + manual_trust_bare_jids: Set[str] = set() + + if trust_model == "btbv": + # For each bare JID, decide whether BTBV or manual trust applies + for bare_jid in bare_jids: + # Get all known devices belonging to the bare JID + devices = await self.get_device_information(bare_jid) + + # If the trust levels of all devices correspond to those used by BTBV, + # BTBV applies. Otherwise, fall back to manual trust. + if all(TrustLevel(device.trust_level_name) in { + TrustLevel.UNDECIDED, + TrustLevel.BLINDLY_TRUSTED + } for device in devices): + btbv_bare_jids.add(bare_jid) + else: + manual_trust_bare_jids.add(bare_jid) + + if trust_model == "manual": + manual_trust_bare_jids = bare_jids + + # With the JIDs sorted into their respective pools, the undecided devices can + # be categorized too + blindly_trusted_devices = \ + { dev for dev in undecided if dev.bare_jid in btbv_bare_jids } + manually_trusted_devices = \ + { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids } + + # Blindly trust devices handled by BTBV + if len(blindly_trusted_devices) > 0: + for device in blindly_trusted_devices: + await self.set_trust( + device.bare_jid, + device.identity_key, + TrustLevel.BLINDLY_TRUSTED.name + ) + + blindly_trusted_devices_stringified = ", ".join([ + f"device {device.device_id} of {device.bare_jid} under namespace" + f" {device.namespaces}" + for device + in blindly_trusted_devices + ]) + + client.feedback( + feedback_jid, + D_( + "Not all destination devices are trusted, unknown devices will be" + " blindly trusted due to the Blind Trust Before Verification" + " policy. If you want a more secure workflow, please activate the" + " \"manual\" policy in the settings' \"Security\" tab.\nFollowing" + " devices have been automatically trusted:" + f" {blindly_trusted_devices_stringified}." + ) + ) + + # Prompt the user for manual trust decisions on the devices handled by manual + # trust + if len(manually_trusted_devices) > 0: + client.feedback( + feedback_jid, + D_( + "Not all destination devices are trusted, we can't encrypt" + " message in such a situation. Please indicate if you trust" + " those devices or not in the trust manager before we can" + " send this message." + ) + ) + await self.__prompt_manual_trust( + frozenset(manually_trusted_devices), + feedback_jid + ) + + @staticmethod + async def _send_message(message: omemo.Message, bare_jid: str) -> None: + element: Optional[ET.Element] = None + + if message.namespace == twomemo.twomemo.NAMESPACE: + element = twomemo.etree.serialize_message(message) + if message.namespace == oldmemo.oldmemo.NAMESPACE: + element = oldmemo.etree.serialize_message(message) + + if element is None: + raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}") + + # TODO: Untested + message_data = client.generateMessageXML(MessageData({ + "from": client.jid, + "to": jid.JID(bare_jid), + "uid": str(uuid.uuid4()), + "message": {}, + "subject": {}, + "type": C.MESS_TYPE_CHAT, + "extra": {}, + "timestamp": time.time() + })) + + message_data["xml"].addChild(etree_to_domish(element)) + + try: + await client.send(message_data["xml"]) + except Exception as e: + raise omemo.MessageSendingFailed() from e + + async def __prompt_manual_trust( + self, + undecided: FrozenSet[omemo.DeviceInformation], + feedback_jid: jid.JID + ) -> None: + """Asks the user to decide on the manual trust level of a set of devices. + + Blocks until the user has made a decision and updates the trust levels of all + devices using :meth:`set_trust`. + + @param undecided: The set of devices to prompt manual trust for. + @param feedback_jid: The bare JID to redirect feedback to. In case of a one to + one message, the recipient JID. In case of a MUC message, the room JID. + @raise TrustDecisionFailed: if the user cancels the prompt. + """ + + # This session manager handles encryption with both twomemo and oldmemo, but + # both are currently registered as different plugins and the `deferXMLUI` + # below requires a single namespace identifying the encryption plugin. Thus, + # get the namespace of the requested encryption method from the encryption + # session using the feedback JID. + encryption = client.encryption.getSession(feedback_jid) + if encryption is None: + raise omemo.TrustDecisionFailed( + f"Encryption not requested for {feedback_jid.userhost()}." + ) + + namespace = encryption["plugin"].namespace + + # Casting this to Any, otherwise all calls on the variable cause type errors + # pylint: disable=no-member + trust_ui = cast(Any, xml_tools.XMLUI( + panel_type=C.XMLUI_FORM, + title=D_("OMEMO trust management"), + submit_id="" + )) + trust_ui.addText(D_( + "This is OMEMO trusting system. You'll see below the devices of your " + "contacts, and a checkbox to trust them or not. A trusted device " + "can read your messages in plain text, so be sure to only validate " + "devices that you are sure are belonging to your contact. It's better " + "to do this when you are next to your contact and their device, so " + "you can check the \"fingerprint\" (the number next to the device) " + "yourself. Do *not* validate a device if the fingerprint is wrong!" + )) + + own_device, __ = await self.get_own_device_information() + + trust_ui.changeContainer("label") + trust_ui.addLabel(D_("This device ID")) + trust_ui.addText(str(own_device.device_id)) + trust_ui.addLabel(D_("This device's fingerprint")) + trust_ui.addText(" ".join(self.format_identity_key(own_device.identity_key))) + trust_ui.addEmpty() + trust_ui.addEmpty() + + # At least sort the devices by bare JID such that they aren't listed + # completely random + undecided_ordered = sorted(undecided, key=lambda device: device.bare_jid) + + for index, device in enumerate(undecided_ordered): + trust_ui.addLabel(D_("Contact")) + trust_ui.addJid(jid.JID(device.bare_jid)) + trust_ui.addLabel(D_("Device ID")) + trust_ui.addText(str(device.device_id)) + trust_ui.addLabel(D_("Fingerprint")) + trust_ui.addText(" ".join(self.format_identity_key(device.identity_key))) + trust_ui.addLabel(D_("Trust this device?")) + trust_ui.addBool(f"trust_{index}", value=C.boolConst(False)) + trust_ui.addEmpty() + trust_ui.addEmpty() + + trust_ui_result = await xml_tools.deferXMLUI( + sat, + trust_ui, + action_extra={ "meta_encryption_trust": namespace }, + profile=profile + ) + + if C.bool(trust_ui_result.get("cancelled", "false")): + raise omemo.TrustDecisionFailed("Trust UI cancelled.") + + data_form_result = cast(Dict[str, str], xml_tools.XMLUIResult2DataFormResult( + trust_ui_result + )) + + for key, value in data_form_result.items(): + if not key.startswith("trust_"): + continue + + device = undecided_ordered[int(key[len("trust_"):])] + trust = C.bool(value) + + await self.set_trust( + device.bare_jid, + device.identity_key, + TrustLevel.TRUSTED.name if trust else TrustLevel.DISTRUSTED.name + ) + + return SessionManagerImpl -class OmemoSession: - """Wrapper to use omemo.OmemoSession with Deferred""" - - def __init__(self, session): - self._session = session - - @property - def republish_bundle(self): - return self._session.republish_bundle - - @property - def public_bundle(self): - return self._session.public_bundle +async def prepare_for_profile( + sat: SAT, + profile: str, + initial_own_label: Optional[str], + signed_pre_key_rotation_period: int = 7 * 24 * 60 * 60, + pre_key_refill_threshold: int = 99, + max_num_per_session_skipped_keys: int = 1000, + max_num_per_message_skipped_keys: Optional[int] = None +) -> omemo.SessionManager: + """Prepare the OMEMO library (storage, backends, core) for a specific profile. - @classmethod - def create(cls, client, storage, my_device_id = None): - omemo_session_p = omemo.SessionManager.create( - storage, - SatOTPKPolicy, - omemo_backend, - client.jid.userhost(), - my_device_id) - d = promise2Deferred(omemo_session_p) - d.addCallback(lambda session: cls(session)) - return d - - def newDeviceList(self, jid, devices): - jid = jid.userhost() - new_device_p = self._session.newDeviceList(jid, devices) - return promise2Deferred(new_device_p) + @param sat: The SAT instance. + @param profile: The profile. + @param initial_own_label: The initial (optional) label to assign to this device if + supported by any of the backends. + @param signed_pre_key_rotation_period: The rotation period for the signed pre key, in + seconds. The rotation period is recommended to be between one week (the default) + and one month. + @param pre_key_refill_threshold: The number of pre keys that triggers a refill to 100. + Defaults to 99, which means that each pre key gets replaced with a new one right + away. The threshold can not be configured to lower than 25. + @param max_num_per_session_skipped_keys: The maximum number of skipped message keys to + keep around per session. Once the maximum is reached, old message keys are deleted + to make space for newer ones. Accessible via + :attr:`max_num_per_session_skipped_keys`. + @param max_num_per_message_skipped_keys: The maximum number of skipped message keys to + accept in a single message. When set to ``None`` (the default), this parameter + defaults to the per-session maximum (i.e. the value of the + ``max_num_per_session_skipped_keys`` parameter). This parameter may only be 0 if + the per-session maximum is 0, otherwise it must be a number between 1 and the + per-session maximum. Accessible via :attr:`max_num_per_message_skipped_keys`. + @return: A session manager with ``urn:xmpp:omemo:2`` and + ``eu.siacs.conversations.axolotl`` capabilities, specifically for the given + profile. + @raise BundleUploadFailed: if a bundle upload failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + @raise BundleDownloadFailed: if a bundle download failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + @raise BundleDeletionFailed: if a bundle deletion failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + @raise DeviceListUploadFailed: if a device list upload failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + @raise DeviceListDownloadFailed: if a device list download failed. Forwarded from + :meth:`~omemo.session_manager.SessionManager.create`. + """ - def getDevices(self, bare_jid=None): - bare_jid = bare_jid.userhost() - get_devices_p = self._session.getDevices(bare_jid=bare_jid) - return promise2Deferred(get_devices_p) + client = sat.getClient(profile) + xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) - def buildSession(self, bare_jid, device, bundle): - bare_jid = bare_jid.userhost() - build_session_p = self._session.buildSession(bare_jid, int(device), bundle) - return promise2Deferred(build_session_p) - - def deleteSession(self, bare_jid, device): - bare_jid = bare_jid.userhost() - delete_session_p = self._session.deleteSession( - bare_jid=bare_jid, device=int(device)) - return promise2Deferred(delete_session_p) - - def encryptMessage(self, bare_jids, message, bundles=None, expect_problems=None): - """Encrypt a message + storage = StorageImpl(profile) - @param bare_jids(iterable[jid.JID]): destinees of the message - @param message(unicode): message to encode - @param bundles(dict[jid.JID, dict[int, ExtendedPublicBundle]): - entities => devices => bundles map - @return D(dict): encryption data - """ - bare_jids = [e.userhost() for e in bare_jids] - if bundles is not None: - bundles = {e.userhost(): v for e, v in bundles.items()} - encrypt_mess_p = self._session.encryptMessage( - bare_jids=bare_jids, - plaintext=message.encode(), - bundles=bundles, - expect_problems=expect_problems) - return promise2Deferred(encrypt_mess_p) + # TODO: Untested + await oldmemo.migrations.migrate( + LegacyStorageImpl(profile, client.jid.userhost()), + storage, + # TODO: Do we want BLINDLY_TRUSTED or TRUSTED here? + TrustLevel.BLINDLY_TRUSTED.name, + TrustLevel.UNDECIDED.name, + TrustLevel.DISTRUSTED.name, + lambda bare_jid, device_id: download_oldmemo_bundle( + client, + xep_0060, + bare_jid, + device_id + ) + ) - def encryptRatchetForwardingMessage( - self, bare_jids, bundles=None, expect_problems=None): - bare_jids = [e.userhost() for e in bare_jids] - if bundles is not None: - bundles = {e.userhost(): v for e, v in bundles.items()} - encrypt_ratchet_fwd_p = self._session.encryptRatchetForwardingMessage( - bare_jids=bare_jids, - bundles=bundles, - expect_problems=expect_problems) - return promise2Deferred(encrypt_ratchet_fwd_p) - - def decryptMessage(self, bare_jid, device, iv, message, is_pre_key_message, - ciphertext, additional_information=None, allow_untrusted=False): - bare_jid = bare_jid.userhost() - decrypt_mess_p = self._session.decryptMessage( - bare_jid=bare_jid, - device=int(device), - iv=iv, - message=message, - is_pre_key_message=is_pre_key_message, - ciphertext=ciphertext, - additional_information=additional_information, - allow_untrusted=allow_untrusted + session_manager = await make_session_manager(sat, profile).create( + [ + twomemo.Twomemo( + storage, + max_num_per_session_skipped_keys, + max_num_per_message_skipped_keys + ), + oldmemo.Oldmemo( + storage, + max_num_per_session_skipped_keys, + max_num_per_message_skipped_keys ) - return promise2Deferred(decrypt_mess_p) + ], + storage, + client.jid.userhost(), + initial_own_label, + TrustLevel.UNDECIDED.value, + signed_pre_key_rotation_period, + pre_key_refill_threshold, + omemo.AsyncFramework.TWISTED + ) - def decryptRatchetForwardingMessage( - self, bare_jid, device, iv, message, is_pre_key_message, - additional_information=None, allow_untrusted=False): - bare_jid = bare_jid.userhost() - decrypt_ratchet_fwd_p = self._session.decryptRatchetForwardingMessage( - bare_jid=bare_jid, - device=int(device), - iv=iv, - message=message, - is_pre_key_message=is_pre_key_message, - additional_information=additional_information, - allow_untrusted=allow_untrusted - ) - return promise2Deferred(decrypt_ratchet_fwd_p) + # This shouldn't hurt here since we're not running on overly constrainted devices. + # TODO: Consider ensuring data consistency regularly/in response to certain events + await session_manager.ensure_data_consistency() - def setTrust(self, bare_jid, device, key, trusted): - bare_jid = bare_jid.userhost() - setTrust_p = self._session.setTrust( - bare_jid=bare_jid, - device=int(device), - key=key, - trusted=trusted, - ) - return promise2Deferred(setTrust_p) + # TODO: Correct entering/leaving of the history synchronization mode isn't terribly + # important for now, since it only prevents an extremely unlikely race condition of + # multiple devices choosing the same pre key for new sessions while the device was + # offline. I don't believe other clients seriously defend against that race condition + # either. In the long run, it might still be cool to have triggers for when history + # sync starts and ends (MAM, MUC catch-up, etc.) and to react to those triggers. + await session_manager.after_history_sync() + + return session_manager + - def resetTrust(self, bare_jid, device): - bare_jid = bare_jid.userhost() - resetTrust_p = self._session.resetTrust( - bare_jid=bare_jid, - device=int(device), - ) - return promise2Deferred(resetTrust_p) - - def getTrustForJID(self, bare_jid): - bare_jid = bare_jid.userhost() - get_trust_p = self._session.getTrustForJID(bare_jid=bare_jid) - return promise2Deferred(get_trust_p) +DEFAULT_TRUST_MODEL_PARAM = f""" +<params> +<individual> +<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}> + <param name="{PARAM_NAME}" + label={quoteattr(D_('OMEMO default trust policy'))} + type="list" security="3"> + <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} /> + <option value="btbv" + label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))} + selected="true" /> + </param> +</category> +</individual> +</params> +""" class OMEMO: + """ + Plugin equipping Libervia with OMEMO capabilities under the (modern) + ``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl`` + namespace. Both versions of the protocol are handled by this plugin and compatibility + between the two is maintained. MUC messages are supported next to one to one messages. + For trust management, the two trust models "BTBV" and "manual" are supported. + """ - params = """ - <params> - <individual> - <category name="{category_name}" label="{category_label}"> - <param name="{param_name}" label={param_label} type="list" security="3"> - <option value="manual" label={opt_manual_lbl} /> - <option value="btbv" label={opt_btbv_lbl} selected="true" /> - </param> - </category> - </individual> - </params> - """.format( - category_name=PARAM_CATEGORY, - category_label=D_("Security"), - param_name=PARAM_NAME, - param_label=quoteattr(D_("OMEMO default trust policy")), - opt_manual_lbl=quoteattr(D_("Manual trust (more secure)")), - opt_btbv_lbl=quoteattr( - D_("Blind Trust Before Verification (more user friendly)")), + # For MUC/MIX message stanzas, the <to/> affix is a MUST + SCE_PROFILE_GROUPCHAT = SCEProfile( + rpad_policy=SCEAffixPolicy.REQUIRED, + time_policy=SCEAffixPolicy.OPTIONAL, + to_policy=SCEAffixPolicy.REQUIRED, + from_policy=SCEAffixPolicy.OPTIONAL, + custom_policies={} + ) + + # For everything but MUC/MIX message stanzas, the <to/> affix is a MAY + SCE_PROFILE = SCEProfile( + rpad_policy=SCEAffixPolicy.REQUIRED, + time_policy=SCEAffixPolicy.OPTIONAL, + to_policy=SCEAffixPolicy.OPTIONAL, + from_policy=SCEAffixPolicy.OPTIONAL, + custom_policies={} ) - def __init__(self, host): - log.info(_("OMEMO plugin initialization (omemo module v{version})").format( - version=omemo.__version__)) - version = tuple(map(int, omemo.__version__.split('.')[:3])) - if version < OMEMO_MIN_VER: - log.warning(_( - "Your version of omemo module is too old: {v[0]}.{v[1]}.{v[2]} is " - "minimum required, please update.").format(v=OMEMO_MIN_VER)) - raise exceptions.CancelError("module is too old") - self.host = host - host.memory.updateParams(self.params) - self._p_hints = host.plugins["XEP-0334"] - self._p_carbons = host.plugins["XEP-0280"] - self._p = host.plugins["XEP-0060"] - self._m = host.plugins.get("XEP-0045") - self._sid = host.plugins.get("XEP-0359") - host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=100050) - host.trigger.add("sendMessageData", self._sendMessageDataTrigger) - self.host.registerEncryptionPlugin(self, "OMEMO", NS_OMEMO, 100) - pep = host.plugins['XEP-0163'] - pep.addPEPEvent( - "OMEMO_DEVICES", NS_OMEMO_DEVICES, - lambda itemsEvent, profile: defer.ensureDeferred( - self.onNewDevices(itemsEvent, profile)) + def __init__(self, sat: SAT) -> None: + """ + @param sat: The SAT instance. + """ + + self.__sat = sat + + # Add configuration option to choose between manual trust and BTBV as the trust + # model + sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) + + # Plugins + self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) + self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"]) + self.__xep_0359 = cast(Optional[XEP_0359], sat.plugins.get("XEP-0359")) + self.__xep_0420 = cast(XEP_0420, sat.plugins["XEP-0420"]) + + # In contrast to one to one messages, MUC messages are reflected to the sender. + # Thus, the sender does not add messages to their local message log when sending + # them, but when the reflection is received. This approach does not pair well with + # OMEMO, since for security reasons it is forbidden to encrypt messages for the + # own device. Thus, when the reflection of an OMEMO message is received, it can't + # be decrypted and added to the local message log as usual. To counteract this, + # the plaintext of encrypted messages sent to MUCs are cached in this field, such + # that when the reflection is received, the plaintext can be looked up from the + # cache and added to the local message log. + # TODO: The old plugin expired this cache after some time. I'm not sure that's + # really necessary. + self.__muc_plaintext_cache: Dict[MUCPlaintextCacheKey, bytes] = {} + + # Mapping from profile name to corresponding session manager + self.__session_managers: Dict[str, omemo.SessionManager] = {} + + # Calls waiting for a specific session manager to be built + self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {} + + # These triggers are used by oldmemo, which doesn't do SCE and only applies to + # messages + sat.trigger.add( + "messageReceived", + self.__message_received_trigger, + priority=100050 ) + sat.trigger.add( + "sendMessageData", + self.__send_message_data_trigger, + priority=100050 + ) + + # These triggers are used by twomemo, which does do SCE + sat.trigger.add("send", self.__send_trigger, priority=0) + # TODO: Add new triggers here for freshly received and about-to-be-sent stanzas, + # including IQs. + + # Give twomemo a (slightly) higher priority than oldmemo + sat.registerEncryptionPlugin(self, "TWOMEMO", twomemo.twomemo.NAMESPACE, 101) + sat.registerEncryptionPlugin(self, "OLDMEMO", oldmemo.oldmemo.NAMESPACE, 100) + + xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"]) + xep_0163.addPEPEvent( + "TWOMEMO_DEVICES", + TWOMEMO_DEVICE_LIST_NODE, + lambda items_event, profile: defer.ensureDeferred( + self.__on_device_list_update(items_event, profile) + ) + ) + xep_0163.addPEPEvent( + "OLDMEMO_DEVICES", + OLDMEMO_DEVICE_LIST_NODE, + lambda items_event, profile: defer.ensureDeferred( + self.__on_device_list_update(items_event, profile) + ) + ) + try: - self.text_cmds = self.host.plugins[C.TEXT_CMDS] + self.__text_commands = cast(TextCommands, sat.plugins[C.TEXT_CMDS]) except KeyError: log.info(_("Text commands not available")) else: - self.text_cmds.registerTextCommands(self) + self.__text_commands.registerTextCommands(self) - # Text commands # + async def profileConnected( # pylint: disable=invalid-name + self, + client: SatXMPPClient + ) -> None: + """ + @param client: The client. + """ - async def cmd_omemo_reset(self, client, mess_data): - """reset OMEMO session (use only if encryption is broken) + await self.__prepare_for_profile(cast(str, client.profile)) + + async def cmd_omemo_reset( + self, + client: SatXMPPClient, + mess_data: MessageData + ) -> Literal[False]: + """Reset all sessions of devices that belong to the recipient of ``mess_data``. - @command(one2one): + This must only be callable manually by the user. Use this when a session is + apparently broken, i.e. sending and receiving encrypted messages doesn't work and + something being wrong has been confirmed manually with the recipient. + + @param client: The client. + @param mess_data: The message data, whose ``to`` attribute will be the bare JID to + reset all sessions with. + @return: The constant value ``False``, indicating to the text commands plugin that + the message is not supposed to be sent. """ - if not client.encryption.isEncryptionRequested(mess_data, NS_OMEMO): - feedback = _( - "You need to have OMEMO encryption activated to reset the session") - self.text_cmds.feedBack(client, feedback, mess_data) + + twomemo_requested = \ + client.encryption.isEncryptionRequested(mess_data, twomemo.twomemo.NAMESPACE) + oldmemo_requested = \ + client.encryption.isEncryptionRequested(mess_data, oldmemo.oldmemo.NAMESPACE) + + if not (twomemo_requested or oldmemo_requested): + self.__text_commands.feedBack( + client, + _("You need to have OMEMO encryption activated to reset the session"), + mess_data + ) return False - to_jid = mess_data["to"].userhostJID() - session = client._xep_0384_session - devices = await session.getDevices(to_jid) + + bare_jid = mess_data["to"].userhost() + + session_manager = await self.__prepare_for_profile(client.profile) + devices = await session_manager.get_device_information(bare_jid) - for device in devices['active']: - log.debug(f"deleting session for device {device}") - await session.deleteSession(to_jid, device=device) + for device in devices: + log.debug(f"Replacing sessions with device {device}") + await session_manager.replace_sessions(device) - log.debug("Sending an empty message to trigger key exchange") - await client.sendMessage(to_jid, {'': ''}) + self.__text_commands.feedBack( + client, + _("OMEMO session has been reset"), + mess_data + ) - feedback = _("OMEMO session has been reset") - self.text_cmds.feedBack(client, feedback, mess_data) return False - async def trustUICb( - self, xmlui_data, trust_data, expect_problems=None, profile=C.PROF_KEY_NONE): - if C.bool(xmlui_data.get('cancelled', 'false')): - return {} - client = self.host.getClient(profile) - session = client._xep_0384_session - stored_data = client._xep_0384_data - manual_trust = await stored_data.get(KEY_MANUAL_TRUST, set()) - auto_trusted_cache = {} - answer = xml_tools.XMLUIResult2DataFormResult(xmlui_data) - blind_trust = C.bool(answer.get('blind_trust', C.BOOL_FALSE)) - for key, value in answer.items(): - if key.startswith('trust_'): - trust_id = key[6:] - else: - continue - data = trust_data[trust_id] - if blind_trust: - # user request to restore blind trust for this entity - # so if the entity is present in manual trust, we remove it - if data["jid"].full() in manual_trust: - manual_trust.remove(data["jid"].full()) - await stored_data.aset(KEY_MANUAL_TRUST, manual_trust) - elif data["jid"].full() not in manual_trust: - # validating this trust UI implies that we activate manual mode for - # this entity (used for BTBV policy) - manual_trust.add(data["jid"].full()) - await stored_data.aset(KEY_MANUAL_TRUST, manual_trust) - trust = C.bool(value) + async def getTrustUI( # pylint: disable=invalid-name + self, + client: SatXMPPClient, + entity: jid.JID + ) -> xml_tools.XMLUI: + """ + @param client: The client. + @param entity: The entity whose device trust levels to manage. + @return: An XMLUI instance which opens a form to manage the trust level of all + devices belonging to the entity. + """ + + if entity.resource: + raise ValueError("A bare JID is expected.") - if not trust: - # if device is not trusted, we check if it must be removed from auto - # trusted devices list - bare_jid_s = data['jid'].userhost() - key = f"{KEY_AUTO_TRUST}\n{bare_jid_s}" - if bare_jid_s not in auto_trusted_cache: - auto_trusted_cache[bare_jid_s] = await stored_data.get( - key, default=set()) - auto_trusted = auto_trusted_cache[bare_jid_s] - if data['device'] in auto_trusted: - # as we don't trust this device anymore, we can remove it from the - # list of automatically trusted devices - auto_trusted.remove(data['device']) - await stored_data.aset(key, auto_trusted) - log.info(D_( - "device {device} from {peer_jid} is not an auto-trusted device " - "anymore").format(device=data['device'], peer_jid=bare_jid_s)) + bare_jids: Set[str] + if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity): + bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) + else: + bare_jids = { entity.userhost() } + + session_manager = await self.__prepare_for_profile(client.profile) - await session.setTrust( - data["jid"], - data["device"], - data["ik"], - trusted=trust, - ) - if not trust and expect_problems is not None: - expect_problems.setdefault(data['jid'].userhost(), set()).add( - data['device'] - ) - return {} - - async def getTrustUI(self, client, entity_jid=None, trust_data=None, submit_id=None): - """Generate a XMLUI to manage trust + # At least sort the devices by bare JID such that they aren't listed completely + # random + devices = sorted(cast(Set[omemo.DeviceInformation], set()).union(*[ + await session_manager.get_device_information(bare_jid) + for bare_jid + in bare_jids + ]), key=lambda device: device.bare_jid) - @param entity_jid(None, jid.JID): jid of entity to manage - None to use trust_data - @param trust_data(None, dict): devices data: - None to use entity_jid - else a dict mapping from trust ids (unicode) to devices data, - where a device data must have the following keys: - - jid(jid.JID): bare jid of the device owner - - device(int): device id - - ik(bytes): identity key - and may have the following key: - - trusted(bool): True if device is trusted - @param submit_id(None, unicode): submit_id to use - if None set UI callback to trustUICb - @return D(xmlui): trust management form - """ - # we need entity_jid xor trust_data - assert entity_jid and not trust_data or not entity_jid and trust_data - if entity_jid and entity_jid.resource: - raise ValueError("A bare jid is expected") + async def callback( + data: Any, + profile: str # pylint: disable=unused-argument + ) -> Dict[Never, Never]: + """ + @param data: The XMLUI result produces by the trust UI form. + @param profile: The profile. + @return: An empty dictionary. The type of the return value was chosen + conservatively since the exact options are neither known not needed here. + """ - session = client._xep_0384_session - stored_data = client._xep_0384_data + if C.bool(data.get("cancelled", "false")): + return {} + + data_form_result = cast( + Dict[str, str], + xml_tools.XMLUIResult2DataFormResult(data) + ) + for key, value in data_form_result.items(): + if not key.startswith("trust_"): + continue - if trust_data is None: - cache = client._xep_0384_cache.setdefault(entity_jid, {}) - trust_data = {} - if self._m is not None and self._m.isJoinedRoom(client, entity_jid): - trust_jids = self.getJIDsForRoom(client, entity_jid) - else: - trust_jids = [entity_jid] - for trust_jid in trust_jids: - trust_session_data = await session.getTrustForJID(trust_jid) - bare_jid_s = trust_jid.userhost() - for device_id, trust_info in trust_session_data['active'].items(): - if trust_info is None: - # device has never been (un)trusted, we have to retrieve its - # fingerprint (i.e. identity key or "ik") through public bundle - if device_id not in cache: - bundles, missing = await self.getBundles(client, - trust_jid, - [device_id]) - if device_id not in bundles: - log.warning(_( - "Can't find bundle for device {device_id} of user " - "{bare_jid}, ignoring").format(device_id=device_id, - bare_jid=bare_jid_s)) - continue - cache[device_id] = bundles[device_id] - # TODO: replace False below by None when undecided - # trusts are handled - trust_info = { - "key": cache[device_id].ik, - "trusted": False - } + device = devices[int(key[len("trust_"):])] + trust = TrustLevel(value) + + if TrustLevel(device.trust_level_name) is not trust: + await session_manager.set_trust( + device.bare_jid, + device.identity_key, + value + ) + + return {} - ik = trust_info["key"] - trust_id = str(hash((bare_jid_s, device_id, ik))) - trust_data[trust_id] = { - "jid": trust_jid, - "device": device_id, - "ik": ik, - "trusted": trust_info["trusted"], - } + submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True) - if submit_id is None: - submit_id = self.host.registerCallback( - lambda data, profile: defer.ensureDeferred( - self.trustUICb(data, trust_data=trust_data, profile=profile)), - with_data=True, - one_shot=True) - xmlui = xml_tools.XMLUI( - panel_type = C.XMLUI_FORM, - title = D_("OMEMO trust management"), - submit_id = submit_id + result = xml_tools.XMLUI( + panel_type=C.XMLUI_FORM, + title=D_("OMEMO trust management"), + submit_id=submit_id ) - xmlui.addText(D_( + # Casting this to Any, otherwise all calls on the variable cause type errors + # pylint: disable=no-member + trust_ui = cast(Any, result) + trust_ui.addText(D_( "This is OMEMO trusting system. You'll see below the devices of your " - "contacts, and a checkbox to trust them or not. A trusted device " + "contacts, and a list selection to trust them or not. A trusted device " "can read your messages in plain text, so be sure to only validate " "devices that you are sure are belonging to your contact. It's better " - "to do this when you are next to your contact and her/his device, so " + "to do this when you are next to your contact and their device, so " "you can check the \"fingerprint\" (the number next to the device) " - "yourself. Do *not* validate a device if the fingerprint is wrong!")) + "yourself. Do *not* validate a device if the fingerprint is wrong!" + )) + + own_device, __ = await session_manager.get_own_device_information() - xmlui.changeContainer("label") - xmlui.addLabel(D_("This device ID")) - xmlui.addText(str(client._xep_0384_device_id)) - xmlui.addLabel(D_("This device fingerprint")) - ik_hex = session.public_bundle.ik.hex().upper() - fp_human = ' '.join([ik_hex[i:i+8] for i in range(0, len(ik_hex), 8)]) - xmlui.addText(fp_human) - xmlui.addEmpty() - xmlui.addEmpty() - - if entity_jid is not None: - omemo_policy = self.host.memory.getParamA( - PARAM_NAME, PARAM_CATEGORY, profile_key=client.profile - ) - if omemo_policy == 'btbv': - xmlui.addLabel(D_("Automatically trust new devices?")) - # blind trust is always disabled when UI is requested - # as submitting UI is a verification which should disable it. - xmlui.addBool("blind_trust", value=C.BOOL_FALSE) - xmlui.addEmpty() - xmlui.addEmpty() + trust_ui.changeContainer("label") + trust_ui.addLabel(D_("This device ID")) + trust_ui.addText(str(own_device.device_id)) + trust_ui.addLabel(D_("This device's fingerprint")) + trust_ui.addText(" ".join(session_manager.format_identity_key( + own_device.identity_key + ))) + trust_ui.addEmpty() + trust_ui.addEmpty() - auto_trust_cache = {} + for index, device in enumerate(devices): + trust_ui.addLabel(D_("Contact")) + trust_ui.addJid(jid.JID(device.bare_jid)) + trust_ui.addLabel(D_("Device ID")) + trust_ui.addText(str(device.device_id)) + trust_ui.addLabel(D_("Fingerprint")) + trust_ui.addText(" ".join(session_manager.format_identity_key( + device.identity_key + ))) + trust_ui.addLabel(D_("Trust this device?")) - for trust_id, data in trust_data.items(): - bare_jid_s = data['jid'].userhost() - if bare_jid_s not in auto_trust_cache: - key = f"{KEY_AUTO_TRUST}\n{bare_jid_s}" - auto_trust_cache[bare_jid_s] = await stored_data.get(key, set()) - xmlui.addLabel(D_("Contact")) - xmlui.addJid(data['jid']) - xmlui.addLabel(D_("Device ID")) - xmlui.addText(str(data['device'])) - xmlui.addLabel(D_("Fingerprint")) - ik_hex = data['ik'].hex().upper() - fp_human = ' '.join([ik_hex[i:i+8] for i in range(0, len(ik_hex), 8)]) - xmlui.addText(fp_human) - xmlui.addLabel(D_("Trust this device?")) - xmlui.addBool("trust_{}".format(trust_id), - value=C.boolConst(data.get('trusted', False))) - if data['device'] in auto_trust_cache[bare_jid_s]: - xmlui.addEmpty() - xmlui.addLabel(D_("(automatically trusted)")) + current_trust_level = TrustLevel(device.trust_level_name) + avaiable_trust_levels = \ + { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } - - xmlui.addEmpty() - xmlui.addEmpty() - - return xmlui + trust_ui.addList( + f"trust_{index}", + options=[ trust_level.name for trust_level in avaiable_trust_levels ], + selected=current_trust_level.name, + styles=[ "inline" ] + ) - async def profileConnected(self, client): - if self._m is not None: - # we keep plain text message for MUC messages we send - # as we can't encrypt for our own device - client._xep_0384_muc_cache = {} - # and we keep them only for some time, in case something goes wrong - # with the MUC - client._xep_0384_muc_cache_timer = None + twomemo_active = dict(device.active).get(twomemo.twomemo.NAMESPACE) + if twomemo_active is None: + trust_ui.addEmpty() + trust_ui.addLabel(D_("(not available for Twomemo)")) + if twomemo_active is False: + trust_ui.addEmpty() + trust_ui.addLabel(D_("(inactive for Twomemo)")) - # FIXME: is _xep_0384_ready needed? can we use profileConnecting? - # Workflow should be checked - client._xep_0384_ready = defer.Deferred() - # we first need to get devices ids (including our own) - persistent_dict = persistent.LazyPersistentBinaryDict("XEP-0384", client.profile) - client._xep_0384_data = persistent_dict - # all known devices of profile - devices = await self.getDevices(client) - # and our own device id - device_id = await persistent_dict.get(KEY_DEVICE_ID) - if device_id is None: - log.info(_("We have no identity for this device yet, let's generate one")) - # we have a new device, we create device_id - device_id = random.randint(1, 2**31-1) - # we check that it's really unique - while device_id in devices: - device_id = random.randint(1, 2**31-1) - # and we save it - await persistent_dict.aset(KEY_DEVICE_ID, device_id) + oldmemo_active = dict(device.active).get(oldmemo.oldmemo.NAMESPACE) + if oldmemo_active is None: + trust_ui.addEmpty() + trust_ui.addLabel(D_("(not available for Oldmemo)")) + if oldmemo_active is False: + trust_ui.addEmpty() + trust_ui.addLabel(D_("(inactive for Oldmemo)")) + + trust_ui.addEmpty() + trust_ui.addEmpty() + + return result - log.debug(f"our OMEMO device id is {device_id}") - - if device_id not in devices: - log.debug(f"our device id ({device_id}) is not in the list, adding it") - devices.add(device_id) - await defer.ensureDeferred(self.setDevices(client, devices)) - - all_jids = await persistent_dict.get(KEY_ALL_JIDS, set()) + @staticmethod + def __get_joined_muc_users( + client: SatXMPPClient, + xep_0045: XEP_0045, + room_jid: jid.JID + ) -> Set[str]: + """ + @param client: The client. + @param xep_0045: A MUC plugin instance. + @param room_jid: The room JID. + @return: A set containing the bare JIDs of the MUC participants. + @raise InternalError: if the MUC is not joined or the entity information of a + participant isn't available. + """ - omemo_storage = OmemoStorage(client, device_id, all_jids) - omemo_session = await OmemoSession.create(client, omemo_storage, device_id) - client._xep_0384_cache = {} - client._xep_0384_session = omemo_session - client._xep_0384_device_id = device_id - await omemo_session.newDeviceList(client.jid, devices) - if omemo_session.republish_bundle: - log.info(_("Saving public bundle for this device ({device_id})").format( - device_id=device_id)) - await defer.ensureDeferred( - self.setBundle(client, omemo_session.public_bundle, device_id) - ) - client._xep_0384_ready.callback(None) - del client._xep_0384_ready + bare_jids: Set[str] = set() - - ## XMPP PEP nodes manipulation - - # devices - - def parseDevices(self, items): - """Parse devices found in items + try: + room = cast(muc.Room, xep_0045.getRoom(client, room_jid)) + except exceptions.NotFound as e: + raise exceptions.InternalError( + "Participant list of unjoined MUC requested." + ) from e - @param items(iterable[domish.Element]): items as retrieved by getItems - @return set[int]: parsed devices + for user in cast(Dict[str, muc.User], room.roster).values(): + entity = cast(Optional[SatXMPPEntity], user.entity) + if entity is None: + raise exceptions.InternalError( + f"Participant list of MUC requested, but the entity information of" + f" the participant {user} is not available." + ) + + bare_jids.add(entity.jid.userhost()) + + return bare_jids + + async def __prepare_for_profile(self, profile: str) -> omemo.SessionManager: """ - devices = set() - if len(items) > 1: - log.warning(_("OMEMO devices list is stored in more that one items, " - "this is not expected")) - if items: - try: - list_elt = next(items[0].elements(NS_OMEMO, 'list')) - except StopIteration: - log.warning(_("no list element found in OMEMO devices list")) - return devices - for device_elt in list_elt.elements(NS_OMEMO, 'device'): - try: - device_id = int(device_elt['id']) - except KeyError: - log.warning(_('device element is missing "id" attribute: {elt}') - .format(elt=device_elt.toXml())) - except ValueError: - log.warning(_('invalid device id: {device_id}').format( - device_id=device_elt['id'])) - else: - devices.add(device_id) - return devices + @param profile: The profile to prepare for. + @return: A session manager instance for this profile. Creates a new instance if + none was prepared before. + """ - async def getDevices(self, client, entity_jid=None): - """Retrieve list of registered OMEMO devices - - @param entity_jid(jid.JID, None): get devices from this entity - None to get our own devices - @return (set(int)): list of devices - """ - if entity_jid is not None: - assert not entity_jid.resource - try: - items, metadata = await self._p.getItems(client, entity_jid, NS_OMEMO_DEVICES) - except exceptions.NotFound: - log.info(_("there is no node to handle OMEMO devices")) - return set() - - devices = self.parseDevices(items) - return devices - - async def setDevices(self, client, devices): - log.debug(f"setting devices with {', '.join(str(d) for d in devices)}") - list_elt = domish.Element((NS_OMEMO, 'list')) - for device in devices: - device_elt = list_elt.addElement('device') - device_elt['id'] = str(device) try: - await self._p.sendItem( - client, None, NS_OMEMO_DEVICES, list_elt, - item_id=self._p.ID_SINGLETON, - extra={ - self._p.EXTRA_PUBLISH_OPTIONS: {self._p.OPT_MAX_ITEMS: 1}, - self._p.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", - } - ) - except Exception as e: - log.warning(_("Can't set devices: {reason}").format(reason=e)) + # Try to return the session manager + return self.__session_managers[profile] + except KeyError: + # If a session manager for that profile doesn't exist yet, check whether it is + # currently being built. A session manager being built is signified by the + # profile key existing on __session_manager_waiters. + if profile in self.__session_manager_waiters: + # If the session manager is being built, add ourselves to the waiting + # queue + deferred = defer.Deferred() + self.__session_manager_waiters[profile].append(deferred) + return cast(omemo.SessionManager, await deferred) + + # If the session manager is not being built, do so here. + self.__session_manager_waiters[profile] = [] - # bundles - - async def getBundles(self, client, entity_jid, devices_ids): - """Retrieve public bundles of an entity devices + # Build and store the session manager + session_manager = await prepare_for_profile( + self.__sat, + profile, + initial_own_label="Libervia" + ) + self.__session_managers[profile] = session_manager - @param entity_jid(jid.JID): bare jid of entity - @param devices_id(iterable[int]): ids of the devices bundles to retrieve - @return (tuple(dict[int, ExtendedPublicBundle], list(int))): - - bundles collection: - * key is device_id - * value is parsed bundle - - set of bundles not found + # Notify the waiters and delete them + for waiter in self.__session_manager_waiters[profile]: + waiter.callback(session_manager) + del self.__session_manager_waiters[profile] + + return session_manager + + async def __message_received_trigger( + self, + client: SatXMPPClient, + message_elt: domish.Element, + post_treat: defer.Deferred + ) -> bool: """ - assert not entity_jid.resource - bundles = {} - missing = set() - for device_id in devices_ids: - node = NS_OMEMO_BUNDLE.format(device_id=device_id) - try: - items, metadata = await self._p.getItems(client, entity_jid, node) - except exceptions.NotFound: - log.warning(_("Bundle missing for device {device_id}") - .format(device_id=device_id)) - missing.add(device_id) - continue - except jabber_error.StanzaError as e: - log.warning(_("Can't get bundle for device {device_id}: {reason}") - .format(device_id=device_id, reason=e)) - continue - if not items: - log.warning(_("no item found in node {node}, can't get public bundle " - "for device {device_id}").format(node=node, - device_id=device_id)) - continue - if len(items) > 1: - log.warning(_("more than one item found in {node}, " - "this is not expected").format(node=node)) - item = items[0] - try: - bundle_elt = next(item.elements(NS_OMEMO, 'bundle')) - signedPreKeyPublic_elt = next(bundle_elt.elements( - NS_OMEMO, 'signedPreKeyPublic')) - signedPreKeySignature_elt = next(bundle_elt.elements( - NS_OMEMO, 'signedPreKeySignature')) - identityKey_elt = next(bundle_elt.elements( - NS_OMEMO, 'identityKey')) - prekeys_elt = next(bundle_elt.elements( - NS_OMEMO, 'prekeys')) - except StopIteration: - log.warning(_("invalid bundle for device {device_id}, ignoring").format( - device_id=device_id)) - continue + @param client: The client which received the message. + @param message_elt: The message element. Can be modified. + @param post_treat: A deferred which evaluates to a :class:`MessageData` once the + message has fully progressed through the message receiving flow. Can be used + to apply treatments to the fully processed message, like marking it as + encrypted. + @return: Whether to continue the message received flow. + """ + + muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None + + sender_jid = jid.JID(message_elt["from"]) + feedback_jid: jid.JID + + message_type = message_elt.getAttribute("type", "unknown") + is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT + if is_muc_message: + if self.__xep_0045 is None: + log.warning( + "Ignoring MUC message since plugin XEP-0045 is not available." + ) + # Can't handle a MUC message without XEP-0045, let the flow continue + # normally + return True + + room_jid = feedback_jid = sender_jid.userhostJID() try: - spkPublic = base64.b64decode(str(signedPreKeyPublic_elt)) - spkSignature = base64.b64decode( - str(signedPreKeySignature_elt)) + room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid)) + except exceptions.NotFound: + log.warning( + f"Ignoring MUC message from a room that has not been joined:" + f" {room_jid}" + ) + # Whatever, let the flow continue + return True + + sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource)) + if sender_user is None: + log.warning( + f"Ignoring MUC message from room {room_jid} since the sender's user" + f" wasn't found {sender_jid.resource}" + ) + # Whatever, let the flow continue + return True + + sender_user_jid = cast(Optional[jid.JID], sender_user.entity) + if sender_user_jid is None: + log.warning( + f"Ignoring MUC message from room {room_jid} since the sender's bare" + f" JID couldn't be found from its user information: {sender_user}" + ) + # Whatever, let the flow continue + return True + + sender_jid = sender_user_jid + + message_uid: Optional[str] = None + if self.__xep_0359 is not None: + message_uid = self.__xep_0359.getOriginId(message_elt) + if message_uid is None: + message_uid = message_elt.getAttribute("id") + if message_uid is not None: + muc_plaintext_cache_key = MUCPlaintextCacheKey( + client, + room_jid, + message_uid + ) + else: + # I'm not sure why this check is required, this code is copied from the old + # plugin. + if sender_jid.userhostJID() == client.jid.userhostJID(): + # TODO: I've seen this cause an exception "builtins.KeyError: 'to'", seems + # like "to" isn't always set. + feedback_jid = jid.JID(message_elt["to"]) + else: + feedback_jid = sender_jid + + sender_bare_jid = sender_jid.userhost() + + message: Optional[omemo.Message] = None + encrypted_elt: Optional[domish.Element] = None + + twomemo_encrypted_elt = cast(Optional[domish.Element], next( + message_elt.elements(twomemo.twomemo.NAMESPACE, "encrypted"), + None + )) + + oldmemo_encrypted_elt = cast(Optional[domish.Element], next( + message_elt.elements(oldmemo.oldmemo.NAMESPACE, "encrypted"), + None + )) + + session_manager = await self.__prepare_for_profile(cast(str, client.profile)) + + if twomemo_encrypted_elt is not None: + try: + message = twomemo.etree.parse_message( + domish_to_etree(twomemo_encrypted_elt), + sender_bare_jid + ) + except (ValueError, XMLSchemaValidationError): + log.warning( + f"Ingoring malformed encrypted message for namespace" + f" {twomemo.twomemo.NAMESPACE}: {twomemo_encrypted_elt.toXml()}" + ) + else: + encrypted_elt = twomemo_encrypted_elt + + if oldmemo_encrypted_elt is not None: + try: + message = await oldmemo.etree.parse_message( + domish_to_etree(oldmemo_encrypted_elt), + sender_bare_jid, + client.jid.userhost(), + session_manager + ) + except (ValueError, XMLSchemaValidationError): + log.warning( + f"Ingoring malformed encrypted message for namespace" + f" {oldmemo.oldmemo.NAMESPACE}: {oldmemo_encrypted_elt.toXml()}" + ) + except omemo.SenderNotFound: + log.warning( + f"Ingoring encrypted message for namespace" + f" {oldmemo.oldmemo.NAMESPACE} by unknown sender:" + f" {oldmemo_encrypted_elt.toXml()}" + ) + else: + encrypted_elt = oldmemo_encrypted_elt + + if message is None or encrypted_elt is None: + # None of our business, let the flow continue + return True + + message_elt.children.remove(encrypted_elt) - ik = base64.b64decode(str(identityKey_elt)) - spk = { - "key": spkPublic, - "id": int(signedPreKeyPublic_elt['signedPreKeyId']) - } - otpks = [] - for preKeyPublic_elt in prekeys_elt.elements(NS_OMEMO, 'preKeyPublic'): - preKeyPublic = base64.b64decode(str(preKeyPublic_elt)) - otpk = { - "key": preKeyPublic, - "id": int(preKeyPublic_elt['preKeyId']) - } - otpks.append(otpk) + log.debug( + f"{message.namespace} message of type {message_type} received from" + f" {sender_bare_jid}" + ) + + plaintext: Optional[bytes] + device_information: omemo.DeviceInformation + + if ( + muc_plaintext_cache_key is not None + and muc_plaintext_cache_key in self.__muc_plaintext_cache + ): + # Use the cached plaintext + plaintext = self.__muc_plaintext_cache.pop(muc_plaintext_cache_key) + + # Since this message was sent by us, use the own device information here + device_information, __ = await session_manager.get_own_device_information() + else: + try: + plaintext, device_information = await session_manager.decrypt(message) + except omemo.MessageNotForUs: + # The difference between this being a debug or a warning is whether there + # is a body included in the message. Without a body, we can assume that + # it's an empty OMEMO message used for protocol stability reasons, which + # is not expected to be sent to all devices of all recipients. If a body + # is included, we can assume that the message carries content and we + # missed out on something. + if len(list(message_elt.elements(C.NS_CLIENT, "body"))) > 0: + client.feedback( + feedback_jid, + D_( + f"An OMEMO message from {sender_jid.full()} has not been" + f" encrypted for our device, we can't decrypt it." + ), + { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + ) + log.warning("Message not encrypted for us.") + else: + log.debug("Message not encrypted for us.") + # No point in further processing this message. + return False except Exception as e: - log.warning(_("error while decoding key for device {device_id}: {msg}") - .format(device_id=device_id, msg=e)) - continue + log.warning(_("Can't decrypt message: {reason}\n{xml}").format( + reason=e, + xml=message_elt.toXml() + )) + client.feedback( + feedback_jid, + D_( + f"An OMEMO message from {sender_jid.full()} can't be decrypted:" + f" {e}" + ), + { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + ) + # No point in further processing this message + return False - bundles[device_id] = ExtendedPublicBundle.parse(omemo_backend, ik, spk, - spkSignature, otpks) + if message.namespace == twomemo.twomemo.NAMESPACE: + if plaintext is not None: + # XEP_0420.unpack_stanza handles the whole unpacking, including the + # relevant modifications to the element + sce_profile = \ + OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE + try: + affix_values = self.__xep_0420.unpack_stanza( + sce_profile, + message_elt, + plaintext + ) + except Exception as e: + log.warning(D_( + f"Error unpacking SCE-encrypted message: {e}\n{plaintext}" + )) + client.feedback( + feedback_jid, + D_( + f"An OMEMO message from {sender_jid.full()} was rejected:" + f" {e}" + ), + { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + ) + # No point in further processing this message + return False - return (bundles, missing) + if affix_values.timestamp is not None: + # TODO: affix_values.timestamp contains the timestamp included in the + # encrypted element here. The XEP says it SHOULD be displayed with the + # plaintext by clients. + pass - async def setBundle(self, client, bundle, device_id): - """Set public bundle for this device. + if message.namespace == oldmemo.oldmemo.NAMESPACE: + # Remove all body elements from the original element, since those act as + # fallbacks in case the encryption protocol is not supported + for child in message_elt.elements(): + if child.name == "body": + message_elt.children.remove(child) + + if plaintext is not None: + # Add the decrypted body + message_elt.addElement("body", content=plaintext.decode("utf-8")) + + # Mark the message as trusted or untrusted. Undecided counts as untrusted here. + trust_level = \ + TrustLevel(device_information.trust_level_name).to_omemo_trust_level() + if trust_level is omemo.TrustLevel.TRUSTED: + post_treat.addCallback(client.encryption.markAsTrusted) + else: + post_treat.addCallback(client.encryption.markAsUntrusted) - @param bundle(ExtendedPublicBundle): bundle to publish + # Mark the message as originally encrypted + post_treat.addCallback( + client.encryption.markAsEncrypted, + namespace=message.namespace + ) + + # Message processed successfully, continue with the flow + return True + + async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool: + """ + @param client: The client sending this message. + @param stanza: The stanza that is about to be sent. Can be modified. + @return: Whether the send message flow should continue or not. """ - log.debug(_("updating bundle for {device_id}").format(device_id=device_id)) - bundle = bundle.serialize(omemo_backend) - bundle_elt = domish.Element((NS_OMEMO, 'bundle')) - signedPreKeyPublic_elt = bundle_elt.addElement( - "signedPreKeyPublic", - content=b64enc(bundle["spk"]['key'])) - signedPreKeyPublic_elt['signedPreKeyId'] = str(bundle["spk"]['id']) + + # SCE is only applicable to message and IQ stanzas + if stanza.name not in { "message", "iq" }: + return True + + # Get the intended recipient + recipient = stanza.getAttribute("to", None) + if recipient is None: + if stanza.name == "message": + # Message stanzas must have a recipient + raise exceptions.InternalError( + f"Message without recipient encountered. Blocking further processing" + f" to avoid leaking plaintext data: {stanza.toXml()}" + ) + + # IQs without a recipient are a thing, I believe those simply target the + # server and are thus not eligible for e2ee anyway. + return True + + # Parse the JID + recipient_bare_jid = jid.JID(recipient).userhostJID() + + # Check whether encryption with twomemo is requested + encryption = client.encryption.getSession(recipient_bare_jid) + + if encryption is None: + # Encryption is not requested for this recipient + return True + + if encryption["plugin"].namespace != twomemo.twomemo.NAMESPACE: + # Encryption is requested for this recipient, but not with twomemo + return True + + # All pre-checks done, we can start encrypting! + await self.__encrypt( + client, + twomemo.twomemo.NAMESPACE, + stanza, + recipient_bare_jid, + stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT, + stanza.getAttribute("id", None) + ) - bundle_elt.addElement( - "signedPreKeySignature", - content=b64enc(bundle["spk_signature"])) + # Add a store hint if this is a message stanza + if stanza.name == "message": + self.__xep_0334.addHintElements(stanza, [ "store" ]) + + # Let the flow continue. + return True + + async def __send_message_data_trigger( + self, + client: SatXMPPClient, + mess_data: MessageData + ) -> None: + """ + @param client: The client sending this message. + @param mess_data: The message data that is about to be sent. Can be modified. + """ + + # Check whether encryption is requested for this message + try: + namespace = mess_data[C.MESS_KEY_ENCRYPTION]["plugin"].namespace + except KeyError: + return + + # If encryption is requested, check whether it's oldmemo + if namespace != oldmemo.oldmemo.NAMESPACE: + return + + # All pre-checks done, we can start encrypting! + stanza = mess_data["xml"] + recipient_jid = mess_data["to"] + is_muc_message = mess_data["type"] == C.MESS_TYPE_GROUPCHAT + stanza_id = mess_data["uid"] + + await self.__encrypt( + client, + oldmemo.oldmemo.NAMESPACE, + stanza, + recipient_jid, + is_muc_message, + stanza_id + ) + + # Add a store hint + self.__xep_0334.addHintElements(stanza, [ "store" ]) - bundle_elt.addElement( - "identityKey", - content=b64enc(bundle["ik"])) + async def __encrypt( + self, + client: SatXMPPClient, + namespace: Literal["urn:xmpp:omemo:2", "eu.siacs.conversations.axolotl"], + stanza: domish.Element, + recipient_jid: jid.JID, + is_muc_message: bool, + stanza_id: Optional[str] + ) -> None: + """ + @param client: The client. + @param namespace: The namespace of the OMEMO version to use. + @param stanza: The stanza. Twomemo will encrypt the whole stanza using SCE, + oldmemo will encrypt only the body. The stanza is modified by this call. + @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID + but doesn't have to. + @param is_muc_message: Whether the stanza is a message stanza to a MUC room. + @param stanza_id: The id of this stanza. Especially relevant for message stanzas + to MUC rooms such that the outgoing plaintext can be cached for MUC message + reflection handling. + + @warning: The calling code MUST take care of adding the store message processing + hint to the stanza if applicable! This can be done before or after this call, + the order doesn't matter. + """ + + muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None + + recipient_bare_jids: Set[str] + feedback_jid: jid.JID + + if is_muc_message: + if self.__xep_0045 is None: + raise exceptions.InternalError( + "Encryption of MUC message requested, but plugin XEP-0045 is not" + " available." + ) + + if stanza_id is None: + raise exceptions.InternalError( + "Encryption of MUC message requested, but stanza id not available." + ) + + room_jid = feedback_jid = recipient_jid.userhostJID() + + recipient_bare_jids = self.__get_joined_muc_users( + client, + self.__xep_0045, + room_jid + ) + + muc_plaintext_cache_key = MUCPlaintextCacheKey( + client=client, + room_jid=room_jid, + message_uid=stanza_id + ) + else: + recipient_bare_jids = { recipient_jid.userhost() } + feedback_jid = recipient_jid.userhostJID() - prekeys_elt = bundle_elt.addElement('prekeys') - for otpk in bundle["otpks"]: - preKeyPublic_elt = prekeys_elt.addElement( - 'preKeyPublic', - content=b64enc(otpk["key"])) - preKeyPublic_elt['preKeyId'] = str(otpk['id']) + log.debug( + f"Intercepting message that is to be encrypted by {namespace} for" + f" {recipient_bare_jids}" + ) + + def prepare_stanza() -> Optional[bytes]: + """Prepares the stanza for encryption. + + Does so by removing all parts that are not supposed to be sent in plain. Also + extracts/prepares the plaintext to encrypt. + + @return: The plaintext to encrypt. Returns ``None`` in case body-only + encryption is requested and no body was found. The function should + gracefully return in that case, i.e. it's not a critical error that should + abort the message sending flow. + """ + + if namespace == twomemo.twomemo.NAMESPACE: + return self.__xep_0420.pack_stanza( + OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE, + stanza + ) + + if namespace == oldmemo.oldmemo.NAMESPACE: + plaintext: Optional[bytes] = None + + for child in stanza.elements(): + if child.name == "body" and plaintext is None: + plaintext = str(child).encode("utf-8") - node = NS_OMEMO_BUNDLE.format(device_id=device_id) + # Any other sensitive elements to remove here? + if child.name in { "body", "html" }: + stanza.children.remove(child) + + if plaintext is None: + log.warning( + "No body found in intercepted message to be encrypted with" + " oldmemo." + ) + + return plaintext + + return assert_never(namespace) + + # The stanza/plaintext preparation was moved into its own little function for type + # safety reasons. + plaintext = prepare_stanza() + if plaintext is None: + return + + log.debug(f"Plaintext to encrypt: {plaintext}") + + session_manager = await self.__prepare_for_profile(client.profile) + try: - await self._p.sendItem( - client, None, node, bundle_elt, item_id=self._p.ID_SINGLETON, - extra={ - self._p.EXTRA_PUBLISH_OPTIONS: {self._p.OPT_MAX_ITEMS: 1}, - self._p.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", - } + messages, encryption_errors = await session_manager.encrypt( + frozenset(recipient_bare_jids), + { namespace: plaintext }, + backend_priority_order=[ namespace ], + identifier=feedback_jid.userhost() ) except Exception as e: - log.warning(_("Can't set bundle: {reason}").format(reason=e)) - - ## PEP node events callbacks - - async def onNewDevices(self, itemsEvent, profile): - log.debug("devices list has been updated") - client = self.host.getClient(profile) - try: - omemo_session = client._xep_0384_session - except AttributeError: - await client._xep_0384_ready - omemo_session = client._xep_0384_session - entity = itemsEvent.sender - - devices = self.parseDevices(itemsEvent.items) - await omemo_session.newDeviceList(entity, devices) - - if entity == client.jid.userhostJID(): - own_device = client._xep_0384_device_id - if own_device not in devices: - log.warning(_("Our own device is missing from devices list, fixing it")) - devices.add(own_device) - await self.setDevices(client, devices) - - ## triggers - - async def policyBTBV(self, client, feedback_jid, expect_problems, undecided): - session = client._xep_0384_session - stored_data = client._xep_0384_data - for pb in undecided.values(): - peer_jid = jid.JID(pb.bare_jid) - device = pb.device - ik = pb.ik - key = f"{KEY_AUTO_TRUST}\n{pb.bare_jid}" - auto_trusted = await stored_data.get(key, default=set()) - auto_trusted.add(device) - await stored_data.aset(key, auto_trusted) - await session.setTrust(peer_jid, device, ik, True) - - user_msg = D_( - "Not all destination devices are trusted, unknown devices will be blind " - "trusted due to the OMEMO Blind Trust Before Verification policy. If you " - "want a more secure workflow, please activate \"manual\" OMEMO policy in " - "settings' \"Security\" tab.\nFollowing fingerprint have been automatically " - "trusted:\n{devices}" - ).format( - devices = ', '.join( - f"- {pb.device} ({pb.bare_jid}): {pb.ik.hex().upper()}" - for pb in undecided.values() + msg = _( + # pylint: disable=consider-using-f-string + "Can't encrypt message for {entities}: {reason}".format( + entities=', '.join(recipient_bare_jids), + reason=e + ) ) - ) - client.feedback(feedback_jid, user_msg) - - async def policyManual(self, client, feedback_jid, expect_problems, undecided): - trust_data = {} - for trust_id, data in undecided.items(): - trust_data[trust_id] = { - 'jid': jid.JID(data.bare_jid), - 'device': data.device, - 'ik': data.ik} - - user_msg = D_("Not all destination devices are trusted, we can't encrypt " - "message in such a situation. Please indicate if you trust " - "those devices or not in the trust manager before we can " - "send this message") - client.feedback(feedback_jid, user_msg) - xmlui = await self.getTrustUI(client, trust_data=trust_data, submit_id="") - - answer = await xml_tools.deferXMLUI( - self.host, - xmlui, - action_extra={ - "meta_encryption_trust": NS_OMEMO, - }, - profile=client.profile) - await self.trustUICb(answer, trust_data, expect_problems, client.profile) - - async def handleProblems( - self, client, feedback_jid, bundles, expect_problems, problems): - """Try to solve problems found by EncryptMessage - - @param feedback_jid(jid.JID): bare jid where the feedback message must be sent - @param bundles(dict): bundles data as used in EncryptMessage - already filled with known bundles, missing bundles - need to be added to it - This dict is updated - @param problems(list): exceptions raised by EncryptMessage - @param expect_problems(dict): known problems to expect, used in encryptMessage - This dict will list devices where problems can be ignored - (those devices won't receive the encrypted data) - This dict is updated - """ - # FIXME: not all problems are handled yet - undecided = {} - missing_bundles = {} - found_bundles = None - cache = client._xep_0384_cache - for problem in problems: - if isinstance(problem, omemo_excpt.TrustException): - if problem.problem == 'undecided': - undecided[str(hash(problem))] = problem - elif problem.problem == 'untrusted': - expect_problems.setdefault(problem.bare_jid, set()).add( - problem.device) - log.info(_( - "discarding untrusted device {device_id} with key {device_key} " - "for {entity}").format( - device_id=problem.device, - device_key=problem.ik.hex().upper(), - entity=problem.bare_jid, - ) - ) - else: - log.error( - f"Unexpected trust problem: {problem.problem!r} for device " - f"{problem.device} for {problem.bare_jid}, ignoring device") - expect_problems.setdefault(problem.bare_jid, set()).add( - problem.device) - elif isinstance(problem, omemo_excpt.MissingBundleException): - pb_entity = jid.JID(problem.bare_jid) - entity_cache = cache.setdefault(pb_entity, {}) - entity_bundles = bundles.setdefault(pb_entity, {}) - if problem.device in entity_cache: - entity_bundles[problem.device] = entity_cache[problem.device] - else: - found_bundles, missing = await self.getBundles( - client, pb_entity, [problem.device]) - entity_cache.update(bundles) - entity_bundles.update(found_bundles) - if problem.device in missing: - missing_bundles.setdefault(pb_entity, set()).add( - problem.device) - expect_problems.setdefault(problem.bare_jid, set()).add( - problem.device) - elif isinstance(problem, omemo_excpt.NoEligibleDevicesException): - if undecided or found_bundles: - # we may have new devices after this run, so let's continue for now - continue - else: - raise problem - else: - raise problem - - for peer_jid, devices in missing_bundles.items(): - devices_s = [str(d) for d in devices] - log.warning( - _("Can't retrieve bundle for device(s) {devices} of entity {peer}, " - "the message will not be readable on this/those device(s)").format( - devices=", ".join(devices_s), peer=peer_jid.full())) - client.feedback( - feedback_jid, - D_("You're destinee {peer} has missing encryption data on some of " - "his/her device(s) (bundle on device {devices}), the message won't " - "be readable on this/those device.").format( - peer=peer_jid.full(), devices=", ".join(devices_s))) - - if undecided: - omemo_policy = self.host.memory.getParamA( - PARAM_NAME, PARAM_CATEGORY, profile_key=client.profile - ) - if omemo_policy == 'btbv': - # we first separate entities which have been trusted manually - manual_trust = await client._xep_0384_data.get(KEY_MANUAL_TRUST) - if manual_trust: - manual_undecided = {} - for hash_, pb in undecided.items(): - if pb.bare_jid in manual_trust: - manual_undecided[hash_] = pb - for hash_ in manual_undecided: - del undecided[hash_] - else: - manual_undecided = None - - if undecided: - # we do the automatic trust here - await self.policyBTBV( - client, feedback_jid, expect_problems, undecided) - if manual_undecided: - # here user has to manually trust new devices from entities already - # verified - await self.policyManual( - client, feedback_jid, expect_problems, manual_undecided) - elif omemo_policy == 'manual': - await self.policyManual( - client, feedback_jid, expect_problems, undecided) - else: - raise exceptions.InternalError(f"Unexpected OMEMO policy: {omemo_policy}") - - async def encryptMessage(self, client, entity_bare_jids, message, feedback_jid=None): - if feedback_jid is None: - if len(entity_bare_jids) != 1: - log.error( - "feedback_jid must be provided when message is encrypted for more " - "than one entities") - feedback_jid = entity_bare_jids[0] - omemo_session = client._xep_0384_session - expect_problems = {} - bundles = {} - loop_idx = 0 - try: - while True: - if loop_idx > 10: - msg = _("Too many iterations in encryption loop") - log.error(msg) - raise exceptions.InternalError(msg) - # encryptMessage may fail, in case of e.g. trust issue or missing bundle - try: - if not message: - encrypted = await omemo_session.encryptRatchetForwardingMessage( - entity_bare_jids, - bundles, - expect_problems = expect_problems) - else: - encrypted = await omemo_session.encryptMessage( - entity_bare_jids, - message, - bundles, - expect_problems = expect_problems) - except omemo_excpt.EncryptionProblemsException as e: - # we know the problem to solve, we can try to fix them - await self.handleProblems( - client, - feedback_jid=feedback_jid, - bundles=bundles, - expect_problems=expect_problems, - problems=e.problems) - loop_idx += 1 - else: - break - except Exception as e: - msg = _("Can't encrypt message for {entities}: {reason}".format( - entities=', '.join(e.full() for e in entity_bare_jids), reason=e)) log.warning(msg) - extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR} - client.feedback(feedback_jid, msg, extra) + client.feedback(feedback_jid, msg, { + C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR + }) raise e - defer.returnValue(encrypted) - - @defer.inlineCallbacks - def _messageReceivedTrigger(self, client, message_elt, post_treat): - try: - encrypted_elt = next(message_elt.elements(NS_OMEMO, "encrypted")) - except StopIteration: - # no OMEMO message here - defer.returnValue(True) - - # we have an encrypted message let's decrypt it - - from_jid = jid.JID(message_elt['from']) + if len(encryption_errors) > 0: + log.warning( + f"Ignored the following non-critical encryption errors:" + f" {encryption_errors}" + ) - if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT: - # with group chat, we must get the real jid for decryption - # and use the room as feedback_jid - - if self._m is None: - # plugin XEP-0045 (MUC) is not available - defer.returnValue(True) - - room_jid = from_jid.userhostJID() - feedback_jid = room_jid - if self._sid is not None: - mess_id = self._sid.getOriginId(message_elt) - else: - mess_id = None - - if mess_id is None: - mess_id = message_elt.getAttribute('id') - cache_key = (room_jid, mess_id) + encrypted_errors_stringified = ", ".join([ + f"device {err.device_id} of {err.bare_jid} under namespace" + f" {err.namespace}" + for err + in encryption_errors + ]) - try: - room = self._m.getRoom(client, room_jid) - except exceptions.NotFound: - log.warning( - f"Received an OMEMO encrypted msg from a room {room_jid} which has " - f"not been joined, ignoring") - defer.returnValue(True) - - user = room.getUser(from_jid.resource) - if user is None: - log.warning(f"Can't find user {user} in room {room_jid}, ignoring") - defer.returnValue(True) - if not user.entity: - log.warning( - f"Real entity of user {user} in room {room_jid} can't be established," - f" OMEMO encrypted message can't be decrypted") - defer.returnValue(True) - - # now we have real jid of the entity, we use it instead of from_jid - from_jid = user.entity.userhostJID() + client.feedback( + feedback_jid, + D_( + "There were non-critical errors during encryption resulting in some" + " of your destinees' devices potentially not receiving the message." + " This happens when the encryption data/key material of a device is" + " incomplete or broken, which shouldn't happen for actively used" + " devices, and can usually be ignored. The following devices are" + f" affected: {encrypted_errors_stringified}." + ) + ) - else: - # we have a one2one message, we can user "from" and "to" normally - - if from_jid.userhostJID() == client.jid.userhostJID(): - feedback_jid = jid.JID(message_elt['to']) - else: - feedback_jid = from_jid - - - if (message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT - and mess_id is not None - and cache_key in client._xep_0384_muc_cache): - plaintext = client._xep_0384_muc_cache.pop(cache_key) - if not client._xep_0384_muc_cache: - client._xep_0384_muc_cache_timer.cancel() - client._xep_0384_muc_cache_timer = None - else: - try: - omemo_session = client._xep_0384_session - except AttributeError: - # on startup, message can ve received before session actually exists - # so we need to synchronise here - yield client._xep_0384_ready - omemo_session = client._xep_0384_session + message = next(message for message in messages if message.namespace == namespace) - device_id = client._xep_0384_device_id - try: - header_elt = next(encrypted_elt.elements(NS_OMEMO, 'header')) - iv_elt = next(header_elt.elements(NS_OMEMO, 'iv')) - except StopIteration: - log.warning(_("Invalid OMEMO encrypted stanza, ignoring: {xml}") - .format(xml=message_elt.toXml())) - defer.returnValue(False) - try: - s_device_id = header_elt['sid'] - except KeyError: - log.warning(_("Invalid OMEMO encrypted stanza, missing sender device ID, " - "ignoring: {xml}") - .format(xml=message_elt.toXml())) - defer.returnValue(False) - try: - key_elt = next((e for e in header_elt.elements(NS_OMEMO, 'key') - if int(e['rid']) == device_id)) - except StopIteration: - log.warning(_("This OMEMO encrypted stanza has not been encrypted " - "for our device (device_id: {device_id}, fingerprint: " - "{fingerprint}): {xml}").format( - device_id=device_id, - fingerprint=omemo_session.public_bundle.ik.hex().upper(), - xml=encrypted_elt.toXml())) - user_msg = (D_("An OMEMO message from {sender} has not been encrypted for " - "our device, we can't decrypt it").format( - sender=from_jid.full())) - extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR} - client.feedback(feedback_jid, user_msg, extra) - defer.returnValue(False) - except ValueError as e: - log.warning(_("Invalid recipient ID: {msg}".format(msg=e))) - defer.returnValue(False) - is_pre_key = C.bool(key_elt.getAttribute('prekey', 'false')) - payload_elt = next(encrypted_elt.elements(NS_OMEMO, 'payload'), None) - additional_information = { - "from_storage": bool(message_elt.delay) - } + if namespace == twomemo.twomemo.NAMESPACE: + # Add the encrypted element + stanza.addChild(etree_to_domish(twomemo.etree.serialize_message(message))) - kwargs = { - "bare_jid": from_jid.userhostJID(), - "device": s_device_id, - "iv": base64.b64decode(bytes(iv_elt)), - "message": base64.b64decode(bytes(key_elt)), - "is_pre_key_message": is_pre_key, - "additional_information": additional_information, - } + if namespace == oldmemo.oldmemo.NAMESPACE: + # Add the encrypted element + stanza.addChild(etree_to_domish(oldmemo.etree.serialize_message(message))) + + if muc_plaintext_cache_key is not None: + self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext - try: - if payload_elt is None: - omemo_session.decryptRatchetForwardingMessage(**kwargs) - plaintext = None - else: - kwargs["ciphertext"] = base64.b64decode(bytes(payload_elt)) - try: - plaintext = yield omemo_session.decryptMessage(**kwargs) - except omemo_excpt.TrustException: - post_treat.addCallback(client.encryption.markAsUntrusted) - kwargs['allow_untrusted'] = True - plaintext = yield omemo_session.decryptMessage(**kwargs) - else: - post_treat.addCallback(client.encryption.markAsTrusted) - plaintext = plaintext.decode() - except Exception as e: - log.warning(_("Can't decrypt message: {reason}\n{xml}").format( - reason=e, xml=message_elt.toXml())) - user_msg = (D_( - "An OMEMO message from {sender} can't be decrypted: {reason}") - .format(sender=from_jid.full(), reason=e)) - extra = {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR} - client.feedback(feedback_jid, user_msg, extra) - defer.returnValue(False) - finally: - if omemo_session.republish_bundle: - # we don't wait for the Deferred (i.e. no yield) on purpose - # there is no need to block the whole message workflow while - # updating the bundle - defer.ensureDeferred( - self.setBundle(client, omemo_session.public_bundle, device_id) - ) - - message_elt.children.remove(encrypted_elt) - if plaintext: - message_elt.addElement("body", content=plaintext) - post_treat.addCallback(client.encryption.markAsEncrypted, namespace=NS_OMEMO) - defer.returnValue(True) + async def __on_device_list_update( + self, + items_event: pubsub.ItemsEvent, + profile: str + ) -> None: + """Handle device list updates fired by PEP. - def getJIDsForRoom(self, client, room_jid): - if self._m is None: - exceptions.InternalError("XEP-0045 plugin missing, can't encrypt for group chat") - room = self._m.getRoom(client, room_jid) - return [u.entity.userhostJID() for u in room.roster.values()] - - def _expireMUCCache(self, client): - client._xep_0384_muc_cache_timer = None - for (room_jid, uid), msg in client._xep_0384_muc_cache.items(): - client.feedback( - room_jid, - D_("Our message with UID {uid} has not been received in time, it has " - "probably been lost. The message was: {msg!r}").format( - uid=uid, msg=str(msg))) - - client._xep_0384_muc_cache.clear() - log.warning("Cache for OMEMO MUC has expired") + @param items_event: The event. + @param profile: The profile this event belongs to. + """ - @defer.inlineCallbacks - def _sendMessageDataTrigger(self, client, mess_data): - encryption = mess_data.get(C.MESS_KEY_ENCRYPTION) - if encryption is None or encryption['plugin'].namespace != NS_OMEMO: - return - message_elt = mess_data["xml"] - if mess_data['type'] == C.MESS_TYPE_GROUPCHAT: - feedback_jid = room_jid = mess_data['to'] - to_jids = self.getJIDsForRoom(client, room_jid) - else: - feedback_jid = to_jid = mess_data["to"].userhostJID() - to_jids = [to_jid] - log.debug("encrypting message") - body = None - for child in list(message_elt.children): - if child.name == "body": - # we remove all unencrypted body, - # and will only encrypt the first one - if body is None: - body = child - message_elt.children.remove(child) - elif child.name == "html": - # we don't want any XHTML-IM element - message_elt.children.remove(child) + sender = cast(jid.JID, items_event.sender) + items = cast(List[domish.Element], items_event.items) - if body is None: - log.warning("No message found") + if len(items) > 1: + log.warning("Ignoring device list update with more than one element.") return - body = str(body) + item = next(iter(items), None) + if item is None: + log.debug("Ignoring empty device list update.") + return + + item_elt = domish_to_etree(item) - if mess_data['type'] == C.MESS_TYPE_GROUPCHAT: - key = (room_jid, mess_data['uid']) - # XXX: we can't encrypt message for our own device for security reason - # so we keep the plain text version in cache until we receive the - # message. We don't send it directly to bridge to keep a workflow - # similar to plain text MUC, so when we see it in frontend we know - # that it has been sent correctly. - client._xep_0384_muc_cache[key] = body - timer = client._xep_0384_muc_cache_timer - if timer is None: - client._xep_0384_muc_cache_timer = reactor.callLater( - MUC_CACHE_TTL, self._expireMUCCache, client) + device_list: Dict[int, Optional[str]] = {} + namespace: Optional[str] = None + + list_elt = item_elt.find(f"{{{twomemo.twomemo.NAMESPACE}}}devices") + if list_elt is not None: + try: + device_list = twomemo.etree.parse_device_list(list_elt) + except XMLSchemaValidationError: + pass else: - timer.reset(MUC_CACHE_TTL) - - encryption_data = yield defer.ensureDeferred(self.encryptMessage( - client, to_jids, body, feedback_jid=feedback_jid)) + namespace = twomemo.twomemo.NAMESPACE - encrypted_elt = message_elt.addElement((NS_OMEMO, 'encrypted')) - header_elt = encrypted_elt.addElement('header') - header_elt['sid'] = str(encryption_data['sid']) + list_elt = item_elt.find(f"{{{oldmemo.oldmemo.NAMESPACE}}}list") + if list_elt is not None: + try: + device_list = oldmemo.etree.parse_device_list(list_elt) + except XMLSchemaValidationError: + pass + else: + namespace = oldmemo.oldmemo.NAMESPACE - for key_data in encryption_data['keys'].values(): - for rid, data in key_data.items(): - key_elt = header_elt.addElement( - 'key', - content=b64enc(data['data']) - ) - key_elt['rid'] = str(rid) - if data['pre_key']: - key_elt['prekey'] = 'true' + if namespace is None: + log.warning( + f"Malformed device list update item:" + f" {ET.tostring(item_elt, encoding='unicode')}" + ) + return - header_elt.addElement( - 'iv', - content=b64enc(encryption_data['iv'])) - try: - encrypted_elt.addElement( - 'payload', - content=b64enc(encryption_data['payload'])) - except KeyError: - pass + session_manager = await self.__prepare_for_profile(profile) + + await session_manager.update_device_list( + namespace, + sender.userhost(), + device_list + )