comparison libervia/backend/plugins/plugin_xep_0384.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0384.py@c23cad65ae99
children 040095a5dc7f
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia plugin for OMEMO encryption
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import base64
20 from datetime import datetime
21 import enum
22 import logging
23 import time
24 from typing import \
25 Any, Dict, FrozenSet, List, Literal, NamedTuple, Optional, Set, Type, Union, cast
26 import uuid
27 import xml.etree.ElementTree as ET
28 from xml.sax.saxutils import quoteattr
29
30 from typing_extensions import Final, Never, assert_never
31 from wokkel import muc, pubsub # type: ignore[import]
32 import xmlschema
33
34 from libervia.backend.core import exceptions
35 from libervia.backend.core.constants import Const as C
36 from libervia.backend.core.core_types import MessageData, SatXMPPEntity
37 from libervia.backend.core.i18n import _, D_
38 from libervia.backend.core.log import getLogger, Logger
39 from libervia.backend.core.sat_main import SAT
40 from libervia.backend.core.xmpp import SatXMPPClient
41 from libervia.backend.memory import persistent
42 from libervia.backend.plugins.plugin_misc_text_commands import TextCommands
43 from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
44 from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
45 from libervia.backend.plugins.plugin_xep_0163 import XEP_0163
46 from libervia.backend.plugins.plugin_xep_0334 import XEP_0334
47 from libervia.backend.plugins.plugin_xep_0359 import XEP_0359
48 from libervia.backend.plugins.plugin_xep_0420 import \
49 XEP_0420, SCEAffixPolicy, SCEAffixValues, SCEProfile
50 from libervia.backend.tools import xml_tools
51 from twisted.internet import defer
52 from twisted.words.protocols.jabber import error, jid
53 from twisted.words.xish import domish
54
55 try:
56 import omemo
57 import omemo.identity_key_pair
58 import twomemo
59 import twomemo.etree
60 import oldmemo
61 import oldmemo.etree
62 import oldmemo.migrations
63 from xmlschema import XMLSchemaValidationError
64
65 # An explicit version check of the OMEMO libraries should not be required here, since
66 # the stored data is fully versioned and the library will complain if a downgrade is
67 # attempted.
68 except ImportError as import_error:
69 raise exceptions.MissingModule(
70 "You are missing one or more package required by the OMEMO plugin. Please"
71 " download/install the pip packages 'omemo', 'twomemo', 'oldmemo' and"
72 f" 'xmlschema'.\nexception: {import_error}"
73 ) from import_error
74
75
# Names exported by a star-import of this module.
__all__ = [ # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "OMEMO"
]

# Module-level logger; the cast lets type checkers treat it as Libervia's ``Logger``.
log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]


# Plugin metadata. The plugins listed under C.PI_DEPENDENCIES are declared as
# dependencies, the ones under C.PI_RECOMMENDATIONS as recommendations.
# NOTE(review): presumably read by Libervia's plugin loader - confirm against the loader.
PLUGIN_INFO = {
    C.PI_NAME: "OMEMO",
    C.PI_IMPORT_NAME: "XEP-0384",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0384" ],
    C.PI_DEPENDENCIES: [ "XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060", "XEP-0420" ],
    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0359", C.TEXT_CMDS ],
    C.PI_MAIN: "OMEMO",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Implementation of OMEMO"""),
}


# NOTE(review): presumably the category and name of the parameter holding the OMEMO trust
# policy; their use is not visible in this part of the file - confirm.
PARAM_CATEGORY = "Security"
PARAM_NAME = "omemo_policy"
99
100
class LogHandler(logging.Handler):
    """
    Logging handler forwarding the records of python-omemo's loggers to the module-level
    Libervia logger.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """
        Forward a single record.

        @param record: The record to forward. Its level name and its fully formatted
            message are handed to the module-level logger.
        """

        level_name = record.levelname
        text = record.getMessage()

        log.log(level_name, text)
108
109
# Loggers used by python-omemo's session manager and identity key pair.
sm_logger = logging.getLogger(omemo.SessionManager.LOG_TAG)
ikp_logger = logging.getLogger(omemo.identity_key_pair.IdentityKeyPair.LOG_TAG)

# Both loggers get the same treatment: let everything through, stop propagation to the
# parent loggers and hand the records to a dedicated LogHandler instead.
for _omemo_logger in (sm_logger, ikp_logger):
    _omemo_logger.setLevel(logging.DEBUG)
    _omemo_logger.propagate = False
    _omemo_logger.addHandler(LogHandler())
120
121
122 # TODO: Add handling for device labels, i.e. show device labels in the trust UI and give
123 # the user a way to change their own device label.
124
125
class MUCPlaintextCacheKey(NamedTuple):
    # pylint: disable=invalid-name
    """
    Structure identifying an encrypted message sent to a MUC.

    NOTE(review): judging by the name, instances presumably serve as keys of a cache of
    plaintexts; the cache itself is not visible in this part of the file - confirm.
    """

    # The client the message belongs to
    client: SatXMPPClient
    # JID of the MUC room
    room_jid: jid.JID
    # Unique id of the message
    message_uid: str
135
136
@enum.unique
class TrustLevel(enum.Enum):
    """
    The trust levels required for ATM and BTBV.

    The value of each member equals its name, such that a member can be looked up from a
    stored trust level name via ``TrustLevel(name)``.
    """

    # Members are intentionally left unannotated: an explicit annotation such as ``str``
    # describes the value rather than the member and is rejected by type checkers.
    TRUSTED = "TRUSTED"
    BLINDLY_TRUSTED = "BLINDLY_TRUSTED"
    UNDECIDED = "UNDECIDED"
    DISTRUSTED = "DISTRUSTED"
147
148
149 TWOMEMO_DEVICE_LIST_NODE = "urn:xmpp:omemo:2:devices"
150 OLDMEMO_DEVICE_LIST_NODE = "eu.siacs.conversations.axolotl.devicelist"
151
152
class StorageImpl(omemo.Storage):
    """
    Storage implementation for OMEMO based on :class:`persistent.LazyPersistentBinaryDict`

    Every failure of the underlying storage other than a missing key is reported as
    :class:`omemo.StorageException`.
    """

    def __init__(self, profile: str) -> None:
        """
        @param profile: The profile this OMEMO data belongs to.
        """

        # persistent.LazyPersistentBinaryDict does not cache at all, so keep the caching
        # option of omemo.Storage enabled.
        super().__init__()

        self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile)

    async def _load(self, key: str) -> omemo.Maybe[omemo.JSONType]:
        """
        @param key: The key to load.
        @return: The stored value wrapped in ``Just``, or ``Nothing`` if the key is not
            present.
        @raise omemo.StorageException: on any other failure.
        """

        try:
            return omemo.Just(await self.__storage[key])
        except KeyError:
            return omemo.Nothing()
        except Exception as e:
            raise omemo.StorageException(f"Error while loading key {key}") from e

    async def _store(self, key: str, value: omemo.JSONType) -> None:
        """
        @param key: The key to store the value under.
        @param value: The value to store.
        @raise omemo.StorageException: if storing failed.
        """

        try:
            await self.__storage.force(key, value)
        except Exception as e:
            # The value is deliberately kept out of the message: this storage holds the
            # OMEMO state, which may include secret key material that must not end up in
            # logs or tracebacks.
            raise omemo.StorageException(f"Error while storing key {key}") from e

    async def _delete(self, key: str) -> None:
        """
        @param key: The key to delete. Deleting a key that is not present is not an error.
        @raise omemo.StorageException: on any failure other than a missing key.
        """

        try:
            await self.__storage.remove(key)
        except KeyError:
            pass
        except Exception as e:
            raise omemo.StorageException(f"Error while deleting key {key}") from e
190
191
class LegacyStorageImpl(oldmemo.migrations.LegacyStorage):
    """
    Access to the data persisted by the old XEP-0384 plugin, used to migrate that data.

    Entries are stored either directly under one of the ``KEY_*`` constants or under a
    ``KEY_*`` constant joined by newlines with a bare JID and, where applicable, a device
    id. Deleting an entry that does not exist is silently accepted.
    """

    KEY_DEVICE_ID = "DEVICE_ID"
    KEY_STATE = "STATE"
    KEY_SESSION = "SESSION"
    KEY_ACTIVE_DEVICES = "DEVICES"
    KEY_INACTIVE_DEVICES = "INACTIVE_DEVICES"
    KEY_TRUST = "TRUST"
    KEY_ALL_JIDS = "ALL_JIDS"

    def __init__(self, profile: str, own_bare_jid: str) -> None:
        """
        @param profile: The profile this OMEMO data belongs to.
        @param own_bare_jid: The own bare JID, to return by the :meth:`loadOwnData` call.
        """

        self.__own_bare_jid = own_bare_jid
        self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile)

    @staticmethod
    def __compose_key(*parts: str) -> str:
        """
        @param parts: The parts of a composite storage key, in order.
        @return: The parts joined by newlines.
        """

        return "\n".join(parts)

    async def __fetch(self, key: str) -> Any:
        """
        @param key: The storage key to read.
        @return: The stored value, or ``None`` if there is none.
        """

        return await self.__storage.get(key, None)

    async def __discard(self, key: str) -> None:
        """
        @param key: The storage key to remove. A missing key is ignored.
        """

        try:
            await self.__storage.remove(key)
        except KeyError:
            pass

    async def loadOwnData(self) -> Optional[oldmemo.migrations.OwnData]:
        device_id = await self.__fetch(LegacyStorageImpl.KEY_DEVICE_ID)
        if device_id is not None:
            return oldmemo.migrations.OwnData(
                own_bare_jid=self.__own_bare_jid,
                own_device_id=device_id
            )

        return None

    async def deleteOwnData(self) -> None:
        await self.__discard(LegacyStorageImpl.KEY_DEVICE_ID)

    async def loadState(self) -> Optional[oldmemo.migrations.State]:
        state = await self.__fetch(LegacyStorageImpl.KEY_STATE)

        return cast(Optional[oldmemo.migrations.State], state)

    async def deleteState(self) -> None:
        await self.__discard(LegacyStorageImpl.KEY_STATE)

    async def loadSession(
        self,
        bare_jid: str,
        device_id: int
    ) -> Optional[oldmemo.migrations.Session]:
        session = await self.__fetch(
            self.__compose_key(LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id))
        )

        return cast(Optional[oldmemo.migrations.Session], session)

    async def deleteSession(self, bare_jid: str, device_id: int) -> None:
        await self.__discard(
            self.__compose_key(LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id))
        )

    async def loadActiveDevices(self, bare_jid: str) -> Optional[List[int]]:
        active_devices = await self.__fetch(
            self.__compose_key(LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid)
        )

        return cast(Optional[List[int]], active_devices)

    async def loadInactiveDevices(self, bare_jid: str) -> Optional[Dict[int, int]]:
        inactive_devices = await self.__fetch(
            self.__compose_key(LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid)
        )

        return cast(Optional[Dict[int, int]], inactive_devices)

    async def deleteActiveDevices(self, bare_jid: str) -> None:
        await self.__discard(
            self.__compose_key(LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid)
        )

    async def deleteInactiveDevices(self, bare_jid: str) -> None:
        await self.__discard(
            self.__compose_key(LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid)
        )

    async def loadTrust(
        self,
        bare_jid: str,
        device_id: int
    ) -> Optional[oldmemo.migrations.Trust]:
        trust = await self.__fetch(
            self.__compose_key(LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id))
        )

        return cast(Optional[oldmemo.migrations.Trust], trust)

    async def deleteTrust(self, bare_jid: str, device_id: int) -> None:
        await self.__discard(
            self.__compose_key(LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id))
        )

    async def listJIDs(self) -> Optional[List[str]]:
        bare_jids = await self.__fetch(LegacyStorageImpl.KEY_ALL_JIDS)
        if bare_jids is None:
            return None

        return list(bare_jids)

    async def deleteJIDList(self) -> None:
        await self.__discard(LegacyStorageImpl.KEY_ALL_JIDS)
324
325
async def download_oldmemo_bundle(
    client: SatXMPPClient,
    xep_0060: XEP_0060,
    bare_jid: str,
    device_id: int
) -> oldmemo.oldmemo.BundleImpl:
    """Fetch and parse the oldmemo bundle published for one device.

    This lives outside of the session manager because both the session manager and the
    migration from the legacy storage format need to download bundles.

    @param client: The client.
    @param xep_0060: The XEP-0060 plugin instance to use for pubsub interactions.
    @param bare_jid: The bare JID the device belongs to.
    @param device_id: The id of the device.
    @return: The bundle.
    @raise BundleDownloadFailed: if the pubsub request failed, if it did not yield exactly
        one item, if the item has no child element or if the bundle could not be parsed.
    """

    namespace = oldmemo.oldmemo.NAMESPACE
    # oldmemo publishes each bundle to its own node, named after the device id
    node = f"eu.siacs.conversations.axolotl.bundles:{device_id}"
    owner = jid.JID(bare_jid)

    try:
        items, _metadata = await xep_0060.get_items(client, owner, node, max_items=1)
    except Exception as e:
        raise omemo.BundleDownloadFailed(
            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
            f" {namespace}"
        ) from e

    if len(items) != 1:
        raise omemo.BundleDownloadFailed(
            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
            f" {namespace}: Unexpected number of items retrieved: {len(items)}."
        )

    # The bundle is the first child of the pubsub item
    item_elt = xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))
    element = next(iter(item_elt), None)
    if element is None:
        raise omemo.BundleDownloadFailed(
            f"Bundle download failed for {bare_jid}: {device_id} under namespace"
            f" {namespace}: Item download succeeded but parsing failed: {element}."
        )

    try:
        bundle = oldmemo.etree.parse_bundle(element, bare_jid, device_id)
    except Exception as e:
        raise omemo.BundleDownloadFailed(
            f"Bundle parsing failed for {bare_jid}: {device_id} under namespace"
            f" {namespace}"
        ) from e

    return bundle
377
378
# ATM only supports protocols based on SCE, which is currently only omemo:2, and relies on
# so many implementation details of the encryption protocol that it makes more sense to
# add ATM to the OMEMO plugin directly instead of having it a separate Libervia plugin.
# Namespace of trust messages
NS_TM: Final = "urn:xmpp:tm:1"
# Namespace of Automatic Trust Management, set as the "usage" of the trust messages built
# by send_trust_messages
NS_ATM: Final = "urn:xmpp:atm:1"


# XML schema of the <trust-message/> element in the "urn:xmpp:tm:1" namespace: a trust
# message requires the attributes "usage" and "encryption" and contains one or more
# <key-owner/> elements. Each <key-owner/> requires a "jid" attribute and contains any
# number of base64 encoded <trust/> elements followed by any number of base64 encoded
# <distrust/> elements.
TRUST_MESSAGE_SCHEMA = xmlschema.XMLSchema("""<?xml version='1.0' encoding='UTF-8'?>
<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'
targetNamespace='urn:xmpp:tm:1'
xmlns='urn:xmpp:tm:1'
elementFormDefault='qualified'>

<xs:element name='trust-message'>
<xs:complexType>
<xs:sequence>
<xs:element ref='key-owner' minOccurs='1' maxOccurs='unbounded'/>
</xs:sequence>
<xs:attribute name='usage' type='xs:string' use='required'/>
<xs:attribute name='encryption' type='xs:string' use='required'/>
</xs:complexType>
</xs:element>

<xs:element name='key-owner'>
<xs:complexType>
<xs:sequence>
<xs:element
name='trust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/>
<xs:element
name='distrust' type='xs:base64Binary' minOccurs='0' maxOccurs='unbounded'/>
</xs:sequence>
<xs:attribute name='jid' type='xs:string' use='required'/>
</xs:complexType>
</xs:element>
</xs:schema>
""")


# This is compatible with omemo:2's SCE profile
# The rpad and time affixes are required, the to and from affixes are optional and there
# are no custom affixes.
TM_SCE_PROFILE = SCEProfile(
    rpad_policy=SCEAffixPolicy.REQUIRED,
    time_policy=SCEAffixPolicy.REQUIRED,
    to_policy=SCEAffixPolicy.OPTIONAL,
    from_policy=SCEAffixPolicy.OPTIONAL,
    custom_policies={}
)
425
426
class TrustUpdate(NamedTuple):
    # pylint: disable=invalid-name
    """
    An update to the trust status of an identity key, used by Automatic Trust Management.
    """

    # JID of the owner of the identity key
    target_jid: jid.JID
    # The identity key as raw bytes; it is base64 encoded when serialized into a trust
    # message
    target_key: bytes
    # True if the key is (to be) trusted, False if it is (to be) distrusted
    target_trust: bool
436
437
class TrustMessageCacheEntry(NamedTuple):
    # pylint: disable=invalid-name
    """
    An entry in the trust message cache used by ATM.

    The cached trust update is applied by :func:`manage_trust_message_cache` once the
    sending key itself becomes trusted.
    """

    # JID of the sender of the trust message
    sender_jid: jid.JID
    # Identity key of the device that sent the trust message
    sender_key: bytes
    # NOTE(review): presumably the time the trust message was sent or received; not read
    # in the visible part of the file - confirm.
    timestamp: datetime
    # The trust update carried by the trust message
    trust_update: TrustUpdate
448
449
class PartialTrustMessage(NamedTuple):
    # pylint: disable=invalid-name
    """
    A structure representing a partial trust message, used by :func:`send_trust_messages`
    to build trust messages.

    All partial trust messages with the same recipient are merged into a single trust
    message before sending.
    """

    # JID the trust message is sent to
    recipient_jid: jid.JID
    # JID owning the keys the trust updates refer to
    updated_jid: jid.JID
    # The trust updates to report for the keys of updated_jid
    trust_updates: FrozenSet[TrustUpdate]
460
461
async def manage_trust_message_cache(
    client: SatXMPPClient,
    session_manager: omemo.SessionManager,
    applied_trust_updates: FrozenSet[TrustUpdate]
) -> None:
    """Manage the ATM trust message cache after trust updates have been applied.

    First, all cached trust messages about keys whose trust has just been updated are
    dropped. Then, cached trust messages that were sent by newly trusted keys are applied
    and removed from the cache. If that changed any trust, another run is performed for
    the newly applied updates.

    @param client: The client this operation runs under.
    @param session_manager: The session manager to use.
    @param applied_trust_updates: The trust updates that have already been applied,
        triggering this cache management run.
    """

    trust_message_cache = persistent.LazyPersistentBinaryDict(
        "XEP-0384/TM",
        client.profile
    )

    # Load cache entries
    cache_entries = cast(
        Set[TrustMessageCacheEntry],
        await trust_message_cache.get("cache", set())
    )

    # Expire cache entries that were overwritten by the applied trust updates. Several
    # senders may have sent trust messages about the same key, thus _every_ entry
    # targeting an updated key has to go, not just one of them.
    overwritten_targets = {
        (trust_update.target_jid.userhostJID(), trust_update.target_key)
        for trust_update
        in applied_trust_updates
    }

    cache_entries = {
        cache_entry
        for cache_entry
        in cache_entries
        if (
            cache_entry.trust_update.target_jid.userhostJID(),
            cache_entry.trust_update.target_key
        ) not in overwritten_targets
    }

    # Apply cached Trust Messages by newly trusted devices
    new_trust_updates: Set[TrustUpdate] = set()

    for trust_update in applied_trust_updates:
        if not trust_update.target_trust:
            continue

        # Iterate over a copy such that cache_entries can be modified
        for cache_entry in set(cache_entries):
            sent_by_updated_key = (
                cache_entry.sender_jid.userhostJID()
                == trust_update.target_jid.userhostJID()
                and cache_entry.sender_key == trust_update.target_key
            )
            if not sent_by_updated_key:
                continue

            trust_level = (
                TrustLevel.TRUSTED
                if cache_entry.trust_update.target_trust
                else TrustLevel.DISTRUSTED
            )

            # Apply the trust update
            await session_manager.set_trust(
                cache_entry.trust_update.target_jid.userhost(),
                cache_entry.trust_update.target_key,
                trust_level.name
            )

            # Track the fact that this trust update has been applied
            new_trust_updates.add(cache_entry.trust_update)

            # Remove the corresponding cache entry
            cache_entries.remove(cache_entry)

    # Store the updated cache entries
    await trust_message_cache.force("cache", cache_entries)

    # TODO: Notify the user ("feedback") about automatically updated trust?

    if new_trust_updates:
        # If any trust has been updated, recursively perform another run of cache
        # management
        await manage_trust_message_cache(
            client,
            session_manager,
            frozenset(new_trust_updates)
        )
549
550
async def get_trust_as_trust_updates(
    session_manager: omemo.SessionManager,
    target_jid: jid.JID
) -> FrozenSet[TrustUpdate]:
    """Express the trust in the known keys of a JID as trust updates for use with ATM.

    @param session_manager: The session manager to load the trust from.
    @param target_jid: The JID to load the trust for.
    @return: One trust update per known key of the JID that is explicitly trusted or
        explicitly distrusted. Keys with any other trust level are left out.
    """

    # Only explicit decisions are reported; every other trust level maps to nothing
    explicit_decisions = {
        TrustLevel.TRUSTED: True,
        TrustLevel.DISTRUSTED: False
    }

    collected: Set[TrustUpdate] = set()

    for device in await session_manager.get_device_information(target_jid.userhost()):
        decision = explicit_decisions.get(TrustLevel(device.trust_level_name))
        if decision is None:
            continue

        collected.add(TrustUpdate(
            target_jid=target_jid.userhostJID(),
            target_key=device.identity_key,
            target_trust=decision
        ))

    return frozenset(collected)
587
588
async def send_trust_messages(
    client: SatXMPPClient,
    session_manager: omemo.SessionManager,
    applied_trust_updates: FrozenSet[TrustUpdate]
) -> None:
    """Send information about updated trust to peers via ATM (XEP-0450).

    The trust updates are first turned into partial trust messages, then all partial
    trust messages for the same recipient are merged into one trust message, which is
    encrypted with twomemo and sent.

    @param client: The client.
    @param session_manager: The session manager.
    @param applied_trust_updates: The trust updates that have already been applied, to
        notify other peers about.
    @raise Exception: any exception raised while encrypting a trust message is passed on
        after the user has been notified.
    """
    # NOTE: This currently sends information about oldmemo trust too. This is not
    # specified and experimental, but since twomemo and oldmemo share the same identity
    # keys and trust systems, this could be a cool side effect.

    # Send Trust Messages for newly trusted and distrusted devices
    own_jid = client.jid.userhostJID()
    own_trust_updates = await get_trust_as_trust_updates(session_manager, own_jid)

    # JIDs of which at least one device's trust has been updated
    updated_jids = frozenset({
        trust_update.target_jid.userhostJID()
        for trust_update
        in applied_trust_updates
    })

    trust_messages: Set[PartialTrustMessage] = set()

    for updated_jid in updated_jids:
        # Get the trust updates for that JID
        trust_updates = frozenset({
            trust_update for trust_update in applied_trust_updates
            if trust_update.target_jid.userhostJID() == updated_jid
        })

        if updated_jid == own_jid:
            # If the own JID is updated, _all_ peers have to be notified
            # TODO: Using my author's privilege here to shamelessly access private fields
            # and storage keys until I've added public API to get a list of peers to
            # python-omemo.
            storage: omemo.Storage = getattr(session_manager, "_SessionManager__storage")
            peer_jids = frozenset({
                jid.JID(bare_jid).userhostJID() for bare_jid in (await storage.load_list(
                    f"/{OMEMO.NS_TWOMEMO}/bare_jids",
                    str
                )).maybe([])
            })

            if not peer_jids:
                # If there are no peers to notify, notify our other devices about the
                # changes directly
                trust_messages.add(PartialTrustMessage(
                    recipient_jid=own_jid,
                    updated_jid=own_jid,
                    trust_updates=trust_updates
                ))
            else:
                # Otherwise, notify all peers about the changes in trust and let carbons
                # handle the copy to our own JID
                for peer_jid in peer_jids:
                    trust_messages.add(PartialTrustMessage(
                        recipient_jid=peer_jid,
                        updated_jid=own_jid,
                        trust_updates=trust_updates
                    ))

                    # Also send full trust information about _every_ peer to our newly
                    # trusted devices
                    peer_trust_updates = \
                        await get_trust_as_trust_updates(session_manager, peer_jid)

                    trust_messages.add(PartialTrustMessage(
                        recipient_jid=own_jid,
                        updated_jid=peer_jid,
                        trust_updates=peer_trust_updates
                    ))

            # Send information about our own devices to our newly trusted devices
            trust_messages.add(PartialTrustMessage(
                recipient_jid=own_jid,
                updated_jid=own_jid,
                trust_updates=own_trust_updates
            ))
        else:
            # Notify our other devices about the changes in trust
            trust_messages.add(PartialTrustMessage(
                recipient_jid=own_jid,
                updated_jid=updated_jid,
                trust_updates=trust_updates
            ))

            # Send a summary of our own trust to newly trusted devices
            trust_messages.add(PartialTrustMessage(
                recipient_jid=updated_jid,
                updated_jid=own_jid,
                trust_updates=own_trust_updates
            ))

    # All trust messages prepared. Merge all trust messages directed at the same
    # recipient.
    recipient_jids = { trust_message.recipient_jid for trust_message in trust_messages }

    for recipient_jid in recipient_jids:
        updated: Dict[jid.JID, Set[TrustUpdate]] = {}

        for trust_message in trust_messages:
            # Merge trust messages directed at that recipient
            if trust_message.recipient_jid == recipient_jid:
                # Merge the trust updates
                updated.setdefault(trust_message.updated_jid, set()).update(
                    trust_message.trust_updates
                )

        # Build the trust message
        trust_message_elt = domish.Element((NS_TM, "trust-message"))
        trust_message_elt["usage"] = NS_ATM
        trust_message_elt["encryption"] = twomemo.twomemo.NAMESPACE

        for updated_jid, trust_updates in updated.items():
            key_owner_elt = trust_message_elt.addElement((NS_TM, "key-owner"))
            key_owner_elt["jid"] = updated_jid.userhost()

            for trust_update in trust_updates:
                serialized_identity_key = \
                    base64.b64encode(trust_update.target_key).decode("ASCII")

                key_owner_elt.addElement(
                    (NS_TM, "trust" if trust_update.target_trust else "distrust"),
                    content=serialized_identity_key
                )

        # Finally, encrypt and send the trust message!
        message_data = client.generate_message_xml(MessageData({
            "from": own_jid,
            "to": recipient_jid,
            "uid": str(uuid.uuid4()),
            "message": {},
            "subject": {},
            "type": C.MESS_TYPE_CHAT,
            "extra": {},
            "timestamp": time.time()
        }))

        message_data["xml"].addChild(trust_message_elt)

        plaintext = XEP_0420.pack_stanza(TM_SCE_PROFILE, message_data["xml"])

        feedback_jid = recipient_jid

        # TODO: The following is mostly duplicate code
        try:
            messages, encryption_errors = await session_manager.encrypt(
                frozenset({ own_jid.userhost(), recipient_jid.userhost() }),
                { OMEMO.NS_TWOMEMO: plaintext },
                backend_priority_order=[ OMEMO.NS_TWOMEMO ],
                identifier=feedback_jid.userhost()
            )
        except Exception as e:
            # Translate the template first and fill it in afterwards: looking up an
            # already formatted string would never match an entry of the catalogue.
            msg = _("Can't encrypt message for {entities}: {reason}").format(
                # Sorted such that the order of the entities is stable between runs
                entities=', '.join(sorted({ own_jid.userhost(), recipient_jid.userhost() })),
                reason=e
            )
            log.warning(msg)
            client.feedback(feedback_jid, msg, {
                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
            })
            raise

        if len(encryption_errors) > 0:
            log.warning(
                f"Ignored the following non-critical encryption errors:"
                f" {encryption_errors}"
            )

            encrypted_errors_stringified = ", ".join([
                f"device {err.device_id} of {err.bare_jid} under namespace"
                f" {err.namespace}"
                for err
                in encryption_errors
            ])

            client.feedback(
                feedback_jid,
                D_(
                    "There were non-critical errors during encryption resulting in some"
                    " of your destinees' devices potentially not receiving the message."
                    " This happens when the encryption data/key material of a device is"
                    " incomplete or broken, which shouldn't happen for actively used"
                    " devices, and can usually be ignored. The following devices are"
                    f" affected: {encrypted_errors_stringified}."
                )
            )

        # Only twomemo was requested above, pick the corresponding message
        message = next(
            message for message in messages
            if message.namespace == OMEMO.NS_TWOMEMO
        )

        # Add the encrypted element
        message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt(
            twomemo.etree.serialize_message(message)
        ))

        await client.a_send(message_data["xml"])
794 message for message in messages
795 if message.namespace == OMEMO.NS_TWOMEMO
796 )
797
798 # Add the encrypted element
799 message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt(
800 twomemo.etree.serialize_message(message)
801 ))
802
803 await client.a_send(message_data["xml"])
804
805
806 def make_session_manager(sat: SAT, profile: str) -> Type[omemo.SessionManager]:
807 """
808 @param sat: The SAT instance.
809 @param profile: The profile.
810 @return: A non-abstract subclass of :class:`~omemo.session_manager.SessionManager`
811 with XMPP interactions and trust handled via the SAT instance.
812 """
813
814 client = sat.get_client(profile)
815 xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])
816
817 class SessionManagerImpl(omemo.SessionManager):
818 """
819 Session manager implementation handling XMPP interactions and trust via an
820 instance of :class:`~sat.core.sat_main.SAT`.
821 """
822
@staticmethod
async def _upload_bundle(bundle: omemo.Bundle) -> None:
    """
    Publish a bundle on the own bare JID, using the client and the XEP-0060 plugin of the
    enclosing function.

    @param bundle: The bundle to publish, either a twomemo or an oldmemo bundle.
    @raise omemo.BundleUploadFailed: if publishing failed.
    @raise omemo.UnknownNamespace: if the bundle is neither a twomemo nor an oldmemo
        bundle.
    """

    if isinstance(bundle, twomemo.twomemo.BundleImpl):
        twomemo_elt = twomemo.etree.serialize_bundle(bundle)

        try:
            # All twomemo bundles share one node, with the device id as the item id
            await xep_0060.send_item(
                client,
                client.jid.userhostJID(),
                "urn:xmpp:omemo:2:bundles",
                xml_tools.et_elt_2_domish_elt(twomemo_elt),
                item_id=str(bundle.device_id),
                extra={
                    XEP_0060.EXTRA_PUBLISH_OPTIONS: {
                        XEP_0060.OPT_MAX_ITEMS: "max"
                    },
                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
                }
            )
        except Exception as e:
            precondition_not_met = (
                isinstance(e, error.StanzaError)
                and e.condition == "conflict"
                and e.appCondition is not None
                # pylint: disable=no-member
                and e.appCondition.name == "precondition-not-met"
            )

            if precondition_not_met:
                # publish options couldn't be set on the fly, manually reconfigure
                # the node and publish again
                # TODO: What can I do here? The correct node configuration is a
                # MUST in the XEP.
                raise omemo.BundleUploadFailed(
                    f"precondition-not-met: {bundle}"
                ) from e

            raise omemo.BundleUploadFailed(
                f"Bundle upload failed: {bundle}"
            ) from e

        return

    if isinstance(bundle, oldmemo.oldmemo.BundleImpl):
        oldmemo_elt = oldmemo.etree.serialize_bundle(bundle)

        try:
            # Each oldmemo bundle has its own node holding a single item
            await xep_0060.send_item(
                client,
                client.jid.userhostJID(),
                f"eu.siacs.conversations.axolotl.bundles:{bundle.device_id}",
                xml_tools.et_elt_2_domish_elt(oldmemo_elt),
                item_id=xep_0060.ID_SINGLETON,
                extra={
                    XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_MAX_ITEMS: 1 },
                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
                }
            )
        except Exception as e:
            raise omemo.BundleUploadFailed(
                f"Bundle upload failed: {bundle}"
            ) from e

        return

    raise omemo.UnknownNamespace(f"Unknown namespace: {bundle.namespace}")
889
@staticmethod
async def _download_bundle(
    namespace: str,
    bare_jid: str,
    device_id: int
) -> omemo.Bundle:
    """
    Download the bundle of a device, using the client and the XEP-0060 plugin of the
    enclosing function.

    @param namespace: The namespace of the backend to download the bundle for.
    @param bare_jid: The bare JID the device belongs to.
    @param device_id: The id of the device.
    @return: The bundle.
    @raise omemo.BundleDownloadFailed: if the download or the parsing failed.
    @raise omemo.UnknownNamespace: if the namespace is neither twomemo's nor oldmemo's.
    """

    if namespace != twomemo.twomemo.NAMESPACE:
        if namespace == oldmemo.oldmemo.NAMESPACE:
            return await download_oldmemo_bundle(
                client,
                xep_0060,
                bare_jid,
                device_id
            )

        raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")

    # twomemo: all bundles live in one node, the item id is the device id
    try:
        items, _metadata = await xep_0060.get_items(
            client,
            jid.JID(bare_jid),
            "urn:xmpp:omemo:2:bundles",
            item_ids=[ str(device_id) ]
        )
    except Exception as e:
        raise omemo.BundleDownloadFailed(
            f"Bundle download failed for {bare_jid}: {device_id} under"
            f" namespace {namespace}"
        ) from e

    if len(items) != 1:
        raise omemo.BundleDownloadFailed(
            f"Bundle download failed for {bare_jid}: {device_id} under"
            f" namespace {namespace}: Unexpected number of items retrieved:"
            f" {len(items)}."
        )

    # The bundle is the first child of the pubsub item
    item_elt = xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))
    element = next(iter(item_elt), None)
    if element is None:
        raise omemo.BundleDownloadFailed(
            f"Bundle download failed for {bare_jid}: {device_id} under"
            f" namespace {namespace}: Item download succeeded but parsing"
            f" failed: {element}."
        )

    try:
        bundle = twomemo.etree.parse_bundle(element, bare_jid, device_id)
    except Exception as e:
        raise omemo.BundleDownloadFailed(
            f"Bundle parsing failed for {bare_jid}: {device_id} under"
            f" namespace {namespace}"
        ) from e

    return bundle
947
@staticmethod
async def _delete_bundle(namespace: str, device_id: int) -> None:
    """
    Delete the bundle of one of the own devices, using the client and the XEP-0060 plugin
    of the enclosing function.

    @param namespace: The namespace of the backend to delete the bundle for.
    @param device_id: The id of the device whose bundle is deleted.
    @raise omemo.BundleDeletionFailed: if the deletion failed.
    @raise omemo.UnknownNamespace: if the namespace is neither twomemo's nor oldmemo's.
    """

    if namespace != twomemo.twomemo.NAMESPACE:
        if namespace != oldmemo.oldmemo.NAMESPACE:
            raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")

        # oldmemo: the bundle has a node of its own, which is deleted as a whole
        oldmemo_node = f"eu.siacs.conversations.axolotl.bundles:{device_id}"

        try:
            await xep_0060.deleteNode(client, client.jid.userhostJID(), oldmemo_node)
        except Exception as e:
            raise omemo.BundleDeletionFailed(
                f"Bundle deletion failed for {device_id} under namespace"
                f" {namespace}"
            ) from e

        return

    # twomemo: only the item of this device is retracted from the shared node
    try:
        await xep_0060.retract_items(
            client,
            client.jid.userhostJID(),
            "urn:xmpp:omemo:2:bundles",
            [ str(device_id) ],
            notify=False
        )
    except Exception as e:
        raise omemo.BundleDeletionFailed(
            f"Bundle deletion failed for {device_id} under namespace"
            f" {namespace}"
        ) from e
983
@staticmethod
async def _upload_device_list(
    namespace: str,
    device_list: Dict[int, Optional[str]]
) -> None:
    """
    Publish the own device list, using the client and the XEP-0060 plugin of the
    enclosing function.

    @param namespace: The namespace of the backend to publish the device list for.
    @param device_list: The device list to publish.
    @raise omemo.UnknownNamespace: if the namespace is neither twomemo's nor oldmemo's.
    @raise omemo.DeviceListUploadFailed: if publishing failed.
    """

    list_elt: ET.Element
    list_node: str

    if namespace == oldmemo.oldmemo.NAMESPACE:
        list_elt = oldmemo.etree.serialize_device_list(device_list)
        list_node = OLDMEMO_DEVICE_LIST_NODE
    elif namespace == twomemo.twomemo.NAMESPACE:
        list_elt = twomemo.etree.serialize_device_list(device_list)
        list_node = TWOMEMO_DEVICE_LIST_NODE
    else:
        raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")

    try:
        # The device list is a single item in an openly accessible node
        await xep_0060.send_item(
            client,
            client.jid.userhostJID(),
            list_node,
            xml_tools.et_elt_2_domish_elt(list_elt),
            item_id=xep_0060.ID_SINGLETON,
            extra={
                XEP_0060.EXTRA_PUBLISH_OPTIONS: {
                    XEP_0060.OPT_MAX_ITEMS: 1,
                    XEP_0060.OPT_ACCESS_MODEL: "open"
                },
                XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise"
            }
        )
    except Exception as e:
        precondition_not_met = (
            isinstance(e, error.StanzaError)
            and e.condition == "conflict"
            and e.appCondition is not None
            # pylint: disable=no-member
            and e.appCondition.name == "precondition-not-met"
        )

        if precondition_not_met:
            # publish options couldn't be set on the fly, manually reconfigure the
            # node and publish again
            # TODO: What can I do here? The correct node configuration is a MUST
            # in the XEP.
            raise omemo.DeviceListUploadFailed(
                f"precondition-not-met for namespace {namespace}"
            ) from e

        raise omemo.DeviceListUploadFailed(
            f"Device list upload failed for namespace {namespace}"
        ) from e
1036
@staticmethod
async def _download_device_list(
    namespace: str,
    bare_jid: str
) -> Dict[int, Optional[str]]:
    """Download and parse the device list of a bare JID for one of the backends.

    @param namespace: The namespace of the backend to download the device list for.
    @param bare_jid: The bare JID whose device list to download.
    @return: The device list as parsed by the backend. An empty dictionary if the
        node does not exist or does not contain any item.
    @raise UnknownNamespace: if the namespace is neither the twomemo nor the
        oldmemo namespace.
    @raise DeviceListDownloadFailed: if fetching the items raises (other than
        "not found"), if more than one item is returned, if the item has no child
        element or if the backend fails to parse the element.
    """

    device_list_node: Optional[str] = None

    if namespace == twomemo.twomemo.NAMESPACE:
        device_list_node = TWOMEMO_DEVICE_LIST_NODE
    if namespace == oldmemo.oldmemo.NAMESPACE:
        device_list_node = OLDMEMO_DEVICE_LIST_NODE

    if device_list_node is None:
        raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")

    try:
        fetched_items, __ = await xep_0060.get_items(
            client,
            jid.JID(bare_jid),
            device_list_node
        )
    except exceptions.NotFound:
        # A missing node is equivalent to an empty device list
        return {}
    except Exception as cause:
        raise omemo.DeviceListDownloadFailed(
            f"Device list download failed for {bare_jid} under namespace"
            f" {namespace}"
        ) from cause

    item_count = len(fetched_items)

    if item_count == 0:
        return {}

    # The device list is expected to be stored in exactly one item
    if item_count != 1:
        raise omemo.DeviceListDownloadFailed(
            f"Device list download failed for {bare_jid} under namespace"
            f" {namespace}: Unexpected number of items retrieved: {item_count}."
        )

    # The payload is the first child of the <item/> element, if there is any
    item_children = iter(xml_tools.domish_elt_2_et_elt(
        cast(domish.Element, fetched_items[0])
    ))
    payload = next(item_children, None)

    if payload is None:
        raise omemo.DeviceListDownloadFailed(
            f"Device list download failed for {bare_jid} under namespace"
            f" {namespace}: Item download succeeded but parsing failed:"
            f" {payload}."
        )

    try:
        if namespace == twomemo.twomemo.NAMESPACE:
            return twomemo.etree.parse_device_list(payload)
        if namespace == oldmemo.oldmemo.NAMESPACE:
            return oldmemo.etree.parse_device_list(payload)
    except Exception as cause:
        raise omemo.DeviceListDownloadFailed(
            f"Device list download failed for {bare_jid} under namespace"
            f" {namespace}"
        ) from cause

    raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}")
1095
async def _evaluate_custom_trust_level(
    self,
    device: omemo.DeviceInformation
) -> omemo.TrustLevel:
    """Translate the custom trust level of a device to a core library trust level.

    @param device: Information about the device whose trust level to evaluate.
    @return: The core trust level corresponding to the custom trust level stored
        for the device. Blind trust evaluates to trusted under BTBV. Under ATM, it
        evaluates to trusted only as long as all devices of the bare JID are either
        undecided or blindly trusted, and to undecided otherwise.
    @raise UnknownTrustLevel: if the trust level name stored for the device is not
        the value of any custom trust level.
    @raise InternalError: if the configured trust system is neither "btbv" nor
        "atm".
    """

    try:
        custom_level = TrustLevel(device.trust_level_name)
    except ValueError as cause:
        raise omemo.UnknownTrustLevel(
            f"Unknown trust level name {device.trust_level_name}"
        ) from cause

    # Three of the custom trust levels have a direct core equivalent
    if custom_level is TrustLevel.TRUSTED:
        return omemo.TrustLevel.TRUSTED
    if custom_level is TrustLevel.UNDECIDED:
        return omemo.TrustLevel.UNDECIDED
    if custom_level is TrustLevel.DISTRUSTED:
        return omemo.TrustLevel.DISTRUSTED

    # Only blind trust is left to handle from here on
    if custom_level is not TrustLevel.BLINDLY_TRUSTED:
        assert_never(custom_level)

    # The evaluation of blind trust depends on the active trust system
    active_trust_system = cast(str, sat.memory.param_get_a(
        PARAM_NAME,
        PARAM_CATEGORY,
        profile_key=profile
    ))

    # With BTBV, blind trust always counts as trust
    if active_trust_system == "btbv":
        return omemo.TrustLevel.TRUSTED

    if active_trust_system != "atm":
        raise exceptions.InternalError(
            f"Unknown trust system active: {active_trust_system}"
        )

    # With ATM, blind trust only counts during the first phase, i.e. while no
    # device of the bare JID has been manually trusted or distrusted yet
    known_devices = await self.get_device_information(device.bare_jid)

    for known_device in known_devices:
        known_level = TrustLevel(known_device.trust_level_name)
        if known_level not in { TrustLevel.UNDECIDED, TrustLevel.BLINDLY_TRUSTED }:
            # Second phase: blind trust is disabled and counts as undecided
            return omemo.TrustLevel.UNDECIDED

    return omemo.TrustLevel.TRUSTED
1151
async def _make_trust_decision(
    self,
    undecided: FrozenSet[omemo.DeviceInformation],
    identifier: Optional[str]
) -> None:
    """Decide on the trust of undecided devices, blindly or by asking the user.

    Devices of bare JIDs whose known devices are all undecided or blindly trusted
    are set to blindly trusted and the user is informed about it. For the devices
    of all other bare JIDs, the user is prompted for a manual decision.

    @param undecided: The devices whose trust is undecided.
    @param identifier: The JID to send feedback to, as a string. Reduced to its
        bare form before use.
    @raise TrustDecisionFailed: if no identifier is given. Also forwarded from the
        manual trust prompt.
    """

    if identifier is None:
        raise omemo.TrustDecisionFailed(
            "The identifier must contain the feedback JID."
        )

    # The identifier is used as the transport for the feedback JID
    feedback_jid = jid.JID(identifier).userhostJID()

    # Both ATM and BTBV rely on blind trust until the first manual verification
    # took place. The bare JIDs are thus split into those for which blind trust is
    # still active and those for which manual trust has to be used.
    blind_trust_bare_jids: Set[str] = set()
    manual_trust_bare_jids: Set[str] = set()

    for bare_jid in { device.bare_jid for device in undecided }:
        # Look at all known devices of the bare JID, not just the undecided ones
        known_devices = await self.get_device_information(bare_jid)

        # Blind trust applies only if no device carries a manual decision
        blind_trust_applies = all(
            TrustLevel(known_device.trust_level_name) in {
                TrustLevel.UNDECIDED,
                TrustLevel.BLINDLY_TRUSTED
            }
            for known_device
            in known_devices
        )

        pool = blind_trust_bare_jids if blind_trust_applies else manual_trust_bare_jids
        pool.add(bare_jid)

    # Sort the undecided devices into the pool of their bare JID
    blindly_trusted_devices = {
        candidate for candidate in undecided
        if candidate.bare_jid in blind_trust_bare_jids
    }
    manually_trusted_devices = {
        candidate for candidate in undecided
        if candidate.bare_jid in manual_trust_bare_jids
    }

    # Apply blind trust and tell the user which devices were affected
    if blindly_trusted_devices:
        for device in blindly_trusted_devices:
            await self.set_trust(
                device.bare_jid,
                device.identity_key,
                TrustLevel.BLINDLY_TRUSTED.name
            )

        blindly_trusted_summary = ", ".join(
            f"device {device.device_id} of {device.bare_jid} under namespace"
            f" {device.namespaces}"
            for device
            in blindly_trusted_devices
        )

        client.feedback(
            feedback_jid,
            D_(
                "Not all destination devices are trusted, unknown devices will be"
                " blindly trusted.\nFollowing devices have been automatically"
                f" trusted: {blindly_trusted_summary}."
            )
        )

    # Ask the user to decide on the remaining devices
    if manually_trusted_devices:
        client.feedback(
            feedback_jid,
            D_(
                "Not all destination devices are trusted, we can't encrypt"
                " message in such a situation. Please indicate if you trust"
                " those devices or not in the trust manager before we can"
                " send this message."
            )
        )
        await self.__prompt_manual_trust(
            frozenset(manually_trusted_devices),
            feedback_jid
        )
1237
@staticmethod
async def _send_message(message: omemo.Message, bare_jid: str) -> None:
    """Send an OMEMO message element to a bare JID in an otherwise empty stanza.

    The message is serialized by the backend matching its namespace and attached
    to a freshly generated chat message stanza without body or subject.

    @param message: The OMEMO message to send.
    @param bare_jid: The bare JID to send the message to.
    @raise UnknownNamespace: if the namespace of the message is neither the
        twomemo nor the oldmemo namespace.
    @raise MessageSendingFailed: if sending the stanza raises for any reason.
    """

    serialized: Optional[ET.Element] = None

    if message.namespace == twomemo.twomemo.NAMESPACE:
        serialized = twomemo.etree.serialize_message(message)
    if message.namespace == oldmemo.oldmemo.NAMESPACE:
        serialized = oldmemo.etree.serialize_message(message)

    if serialized is None:
        raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}")

    # Build the carrier stanza, which has neither a body nor a subject
    carrier = client.generate_message_xml(MessageData({
        "from": client.jid,
        "to": jid.JID(bare_jid),
        "uid": str(uuid.uuid4()),
        "message": {},
        "subject": {},
        "type": C.MESS_TYPE_CHAT,
        "extra": {},
        "timestamp": time.time()
    }))

    stanza = carrier["xml"]
    stanza.addChild(xml_tools.et_elt_2_domish_elt(serialized))

    try:
        await client.a_send(stanza)
    except Exception as cause:
        raise omemo.MessageSendingFailed() from cause
1267
async def __prompt_manual_trust(
    self,
    undecided: FrozenSet[omemo.DeviceInformation],
    feedback_jid: jid.JID
) -> None:
    """Let the user decide manually whether to trust each device of a set.

    A form listing the devices is shown to the user. Once the form is submitted,
    the decision for each device is stored using :meth:`set_trust`, and, if ATM is
    the active trust system, the trust message cache is processed and trust
    messages are sent for the decisions.

    @param undecided: The set of devices to prompt manual trust for.
    @param feedback_jid: The bare JID to redirect feedback to. In case of a one to
        one message, the recipient JID. In case of a MUC message, the room JID.
    @raise TrustDecisionFailed: if there is no encryption session for the feedback
        JID or if the user cancels the prompt.
    """

    # twomemo and oldmemo are registered as two separate encryption plugins, but
    # are both handled by this session manager. `defer_xmlui` needs the namespace
    # of a single encryption plugin, which is taken from the encryption session
    # that exists for the feedback JID.
    encryption_session = client.encryption.getSession(feedback_jid)
    if encryption_session is None:
        raise omemo.TrustDecisionFailed(
            f"Encryption not requested for {feedback_jid.userhost()}."
        )

    plugin_namespace = encryption_session["plugin"].namespace

    # The form is cast to Any to avoid type errors on every call made on it
    # pylint: disable=no-member
    form = cast(Any, xml_tools.XMLUI(
        panel_type=C.XMLUI_FORM,
        title=D_("OMEMO trust management"),
        submit_id=""
    ))
    form.addText(D_(
        "This is OMEMO trusting system. You'll see below the devices of your "
        "contacts, and a checkbox to trust them or not. A trusted device "
        "can read your messages in plain text, so be sure to only validate "
        "devices that you are sure are belonging to your contact. It's better "
        "to do this when you are next to your contact and their device, so "
        "you can check the \"fingerprint\" (the number next to the device) "
        "yourself. Do *not* validate a device if the fingerprint is wrong!"
    ))

    own_device, __ = await self.get_own_device_information()

    # Show the own device first, such that fingerprints can be compared both ways
    form.change_container("label")
    form.addLabel(D_("This device ID"))
    form.addText(str(own_device.device_id))
    form.addLabel(D_("This device's fingerprint"))
    form.addText(" ".join(self.format_identity_key(own_device.identity_key)))
    form.addEmpty()
    form.addEmpty()

    # Group the devices by bare JID. The position in this list is also what links
    # the form fields back to the devices when the result is processed.
    prompted_devices = sorted(undecided, key=lambda info: info.bare_jid)

    for position, prompted_device in enumerate(prompted_devices):
        form.addLabel(D_("Contact"))
        form.addJid(jid.JID(prompted_device.bare_jid))
        form.addLabel(D_("Device ID"))
        form.addText(str(prompted_device.device_id))
        form.addLabel(D_("Fingerprint"))
        form.addText(" ".join(
            self.format_identity_key(prompted_device.identity_key)
        ))
        form.addLabel(D_("Trust this device?"))
        form.addBool(f"trust_{position}", value=C.bool_const(False))
        form.addEmpty()
        form.addEmpty()

    # Wait for the user to submit or cancel the form
    form_result = await xml_tools.defer_xmlui(
        sat,
        form,
        action_extra={ "meta_encryption_trust": plugin_namespace },
        profile=profile
    )

    if C.bool(form_result.get("cancelled", "false")):
        raise omemo.TrustDecisionFailed("Trust UI cancelled.")

    answers = cast(
        Dict[str, str],
        xml_tools.xmlui_result_2_data_form_result(form_result)
    )

    trust_updates: Set[TrustUpdate] = set()

    for field_name, field_value in answers.items():
        # Only the checkboxes added above are of interest
        if field_name.startswith("trust_"):
            decided_device = prompted_devices[int(field_name[len("trust_"):])]
            target_trust = C.bool(field_value)

            if target_trust:
                decided_level = TrustLevel.TRUSTED
            else:
                decided_level = TrustLevel.DISTRUSTED

            await self.set_trust(
                decided_device.bare_jid,
                decided_device.identity_key,
                decided_level.name
            )

            trust_updates.add(TrustUpdate(
                target_jid=jid.JID(decided_device.bare_jid).userhostJID(),
                target_key=decided_device.identity_key,
                target_trust=target_trust
            ))

    # The remaining work is only required if ATM is the active trust system
    active_trust_system = cast(str, sat.memory.param_get_a(
        PARAM_NAME,
        PARAM_CATEGORY,
        profile_key=profile
    ))

    if active_trust_system == "atm":
        frozen_trust_updates = frozenset(trust_updates)
        await manage_trust_message_cache(client, self, frozen_trust_updates)
        await send_trust_messages(client, self, frozen_trust_updates)
1387
1388 return SessionManagerImpl
1389
1390
async def prepare_for_profile(
    sat: SAT,
    profile: str,
    initial_own_label: Optional[str],
    signed_pre_key_rotation_period: int = 7 * 24 * 60 * 60,
    pre_key_refill_threshold: int = 99,
    max_num_per_session_skipped_keys: int = 1000,
    max_num_per_message_skipped_keys: Optional[int] = None
) -> omemo.SessionManager:
    """Set up storage, backends and the session manager of a profile.

    Runs the migration from the legacy storage first, then creates a session
    manager with a twomemo and an oldmemo backend sharing one storage, ensures data
    consistency and finally signals the end of history synchronization.

    @param sat: The SAT instance.
    @param profile: The profile.
    @param initial_own_label: The initial (optional) label to assign to this device
        if supported by any of the backends.
    @param signed_pre_key_rotation_period: The rotation period for the signed pre
        key, in seconds. The rotation period is recommended to be between one week
        (the default) and one month.
    @param pre_key_refill_threshold: The number of pre keys that triggers a refill
        to 100. Defaults to 99, which means that each pre key gets replaced with a
        new one right away. The threshold can not be configured to lower than 25.
    @param max_num_per_session_skipped_keys: The maximum number of skipped message
        keys to keep around per session. Once the maximum is reached, old message
        keys are deleted to make space for newer ones.
    @param max_num_per_message_skipped_keys: The maximum number of skipped message
        keys to accept in a single message. When set to ``None`` (the default),
        this parameter defaults to the per-session maximum (i.e. the value of the
        ``max_num_per_session_skipped_keys`` parameter). This parameter may only be
        0 if the per-session maximum is 0, otherwise it must be a number between 1
        and the per-session maximum.
    @return: A session manager with ``urn:xmpp:omemo:2`` and
        ``eu.siacs.conversations.axolotl`` capabilities, specifically for the given
        profile.
    @raise BundleUploadFailed: if a bundle upload failed. Forwarded from
        :meth:`~omemo.session_manager.SessionManager.create`.
    @raise BundleDownloadFailed: if a bundle download failed. Forwarded from
        :meth:`~omemo.session_manager.SessionManager.create`.
    @raise BundleDeletionFailed: if a bundle deletion failed. Forwarded from
        :meth:`~omemo.session_manager.SessionManager.create`.
    @raise DeviceListUploadFailed: if a device list upload failed. Forwarded from
        :meth:`~omemo.session_manager.SessionManager.create`.
    @raise DeviceListDownloadFailed: if a device list download failed. Forwarded
        from :meth:`~omemo.session_manager.SessionManager.create`.
    """

    client = sat.get_client(profile)
    xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])

    storage = StorageImpl(profile)

    def download_legacy_bundle(bare_jid: Any, device_id: Any) -> Any:
        # Hands the coroutine back to the migration code without awaiting it here
        return download_oldmemo_bundle(client, xep_0060, bare_jid, device_id)

    # Move data of the legacy plugin, if any, over to the new storage
    # TODO: Untested
    await oldmemo.migrations.migrate(
        LegacyStorageImpl(profile, client.jid.userhost()),
        storage,
        # TODO: Do we want BLINDLY_TRUSTED or TRUSTED here?
        TrustLevel.BLINDLY_TRUSTED.name,
        TrustLevel.UNDECIDED.name,
        TrustLevel.DISTRUSTED.name,
        download_legacy_bundle
    )

    session_manager_class = make_session_manager(sat, profile)

    # Both protocol versions operate on the same storage and skipped key limits
    backends = [
        twomemo.Twomemo(
            storage,
            max_num_per_session_skipped_keys,
            max_num_per_message_skipped_keys
        ),
        oldmemo.Oldmemo(
            storage,
            max_num_per_session_skipped_keys,
            max_num_per_message_skipped_keys
        )
    ]

    session_manager = await session_manager_class.create(
        backends,
        storage,
        client.jid.userhost(),
        initial_own_label,
        TrustLevel.UNDECIDED.value,
        signed_pre_key_rotation_period,
        pre_key_refill_threshold,
        omemo.AsyncFramework.TWISTED
    )

    # Running the consistency check on every start is acceptable, since we are not
    # running on overly constrained devices.
    # TODO: Consider ensuring data consistency regularly/in response to certain events
    await session_manager.ensure_data_consistency()

    # TODO: Entering/leaving the history synchronization mode correctly isn't
    # terribly important for now, since it only prevents an extremely unlikely race
    # condition of multiple devices choosing the same pre key for new sessions while
    # the device was offline. Other clients probably don't seriously defend against
    # that race condition either. In the long run, it might still be cool to have
    # triggers for when history sync starts and ends (MAM, MUC catch-up, etc.) and
    # to react to those triggers.
    await session_manager.after_history_sync()

    return session_manager
1493
1494
# XML description of the parameter that selects the trust model. It declares an
# "individual" list parameter named PARAM_NAME in the category PARAM_CATEGORY with
# the two options "atm" and "btbv", the latter being selected by default. The labels
# are passed through quoteattr because they are inserted as XML attribute values.
DEFAULT_TRUST_MODEL_PARAM = f"""
<params>
<individual>
<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
<param name="{PARAM_NAME}"
label={quoteattr(D_('OMEMO default trust policy'))}
type="list" security="3">
<option value="atm"
label={quoteattr(D_('Automatic Trust Management (more secure)'))} />
<option value="btbv"
label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
selected="true" />
</param>
</category>
</individual>
</params>
"""
1512
1513
1514 class OMEMO:
1515 """
1516 Plugin equipping Libervia with OMEMO capabilities under the (modern)
1517 ``urn:xmpp:omemo:2`` namespace and the (legacy) ``eu.siacs.conversations.axolotl``
1518 namespace. Both versions of the protocol are handled by this plugin and compatibility
1519 between the two is maintained. MUC messages are supported next to one to one messages.
1520 For trust management, the two trust models "ATM" and "BTBV" are supported.
1521 """
# Namespaces of the two supported protocol versions, taken from the backends
NS_TWOMEMO = twomemo.twomemo.NAMESPACE
NS_OLDMEMO = oldmemo.oldmemo.NAMESPACE

# SCE profile for MUC/MIX message stanzas, for which the <to/> affix is a MUST.
# Only the <to/> policy differs from SCE_PROFILE below.
SCE_PROFILE_GROUPCHAT = SCEProfile(
    rpad_policy=SCEAffixPolicy.REQUIRED,
    time_policy=SCEAffixPolicy.OPTIONAL,
    to_policy=SCEAffixPolicy.REQUIRED,
    from_policy=SCEAffixPolicy.OPTIONAL,
    custom_policies={}
)

# SCE profile for everything but MUC/MIX message stanzas, for which the <to/>
# affix is a MAY
SCE_PROFILE = SCEProfile(
    rpad_policy=SCEAffixPolicy.REQUIRED,
    time_policy=SCEAffixPolicy.OPTIONAL,
    to_policy=SCEAffixPolicy.OPTIONAL,
    from_policy=SCEAffixPolicy.OPTIONAL,
    custom_policies={}
)
1542
def __init__(self, sat: SAT) -> None:
    """
    Register the trust model parameter, the message triggers, both encryption
    plugins (twomemo and oldmemo), the PEP handlers for device list updates and,
    if the text commands plugin is available, the text commands of this plugin.

    @param sat: The SAT instance.
    """

    self.__sat = sat

    # Add configuration option to choose between ATM and BTBV as the trust model
    sat.memory.update_params(DEFAULT_TRUST_MODEL_PARAM)

    # Plugins. XEP-0045 and XEP-0359 are looked up with `get` and may thus be
    # None, while XEP-0334 and XEP-0420 raise a KeyError if they are missing.
    self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
    self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
    self.__xep_0359 = cast(Optional[XEP_0359], sat.plugins.get("XEP-0359"))
    self.__xep_0420 = cast(XEP_0420, sat.plugins["XEP-0420"])

    # In contrast to one to one messages, MUC messages are reflected to the sender.
    # Thus, the sender does not add messages to their local message log when sending
    # them, but when the reflection is received. This approach does not pair well with
    # OMEMO, since for security reasons it is forbidden to encrypt messages for the
    # own device. Thus, when the reflection of an OMEMO message is received, it can't
    # be decrypted and added to the local message log as usual. To counteract this,
    # the plaintext of encrypted messages sent to MUCs are cached in this field, such
    # that when the reflection is received, the plaintext can be looked up from the
    # cache and added to the local message log.
    # TODO: The old plugin expired this cache after some time. I'm not sure that's
    # really necessary.
    self.__muc_plaintext_cache: Dict[MUCPlaintextCacheKey, bytes] = {}

    # Mapping from profile name to corresponding session manager
    self.__session_managers: Dict[str, omemo.SessionManager] = {}

    # Calls waiting for a specific session manager to be built
    self.__session_manager_waiters: Dict[str, List[defer.Deferred]] = {}

    # These triggers are used by oldmemo, which doesn't do SCE and only applies to
    # messages. Temporarily, until a more fitting trigger for SCE-based encryption is
    # added, the message_received trigger is also used for twomemo.
    sat.trigger.add(
        "message_received",
        self._message_received_trigger,
        priority=100050
    )
    sat.trigger.add(
        "send_message_data",
        self.__send_message_data_trigger,
        priority=100050
    )

    # These triggers are used by twomemo, which does do SCE
    sat.trigger.add("send", self.__send_trigger, priority=0)
    # TODO: Add new triggers here for freshly received and about-to-be-sent stanzas,
    # including IQs.

    # Give twomemo a (slightly) higher priority than oldmemo
    sat.register_encryption_plugin(self, "TWOMEMO", twomemo.twomemo.NAMESPACE, 101)
    sat.register_encryption_plugin(self, "OLDMEMO", oldmemo.oldmemo.NAMESPACE, 100)

    # Get notified about device list updates of both protocol versions. The
    # handler is a coroutine function, thus it is wrapped using ensureDeferred.
    xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"])
    xep_0163.add_pep_event(
        "TWOMEMO_DEVICES",
        TWOMEMO_DEVICE_LIST_NODE,
        lambda items_event, profile: defer.ensureDeferred(
            self.__on_device_list_update(items_event, profile)
        )
    )
    xep_0163.add_pep_event(
        "OLDMEMO_DEVICES",
        OLDMEMO_DEVICE_LIST_NODE,
        lambda items_event, profile: defer.ensureDeferred(
            self.__on_device_list_update(items_event, profile)
        )
    )

    # The text commands plugin is optional. If it is missing, the attribute
    # __text_commands is never assigned.
    try:
        self.__text_commands = cast(TextCommands, sat.plugins[C.TEXT_CMDS])
    except KeyError:
        log.info(_("Text commands not available"))
    else:
        self.__text_commands.register_text_commands(self)
1624
def profile_connected(  # pylint: disable=invalid-name
    self,
    client: SatXMPPClient
) -> None:
    """
    Start loading the session manager of the profile without waiting for it.

    @param client: The client.
    """

    profile_name = cast(str, client.profile)

    # The coroutine is scheduled, its result is not used here
    pending_session_manager = self.get_session_manager(profile_name)
    defer.ensureDeferred(pending_session_manager)
1636
async def cmd_omemo_reset(
    self,
    client: SatXMPPClient,
    mess_data: MessageData
) -> Literal[False]:
    """Replace the sessions with all devices of the recipient of ``mess_data``.

    Meant to be triggered manually by the user only, when a session appears to be
    broken, i.e. encrypted messages can't be exchanged anymore and the problem has
    been confirmed with the recipient through other means.

    @param client: The client.
    @param mess_data: The message data, whose ``to`` attribute will be the bare JID
        to reset all sessions with.
    @return: The constant value ``False``, indicating to the text commands plugin
        that the message is not supposed to be sent.
    """

    # Query both namespaces, the command works if either of them is requested
    requested = [
        client.encryption.is_encryption_requested(mess_data, plugin_namespace)
        for plugin_namespace
        in ( twomemo.twomemo.NAMESPACE, oldmemo.oldmemo.NAMESPACE )
    ]

    if not any(requested):
        self.__text_commands.feed_back(
            client,
            _("You need to have OMEMO encryption activated to reset the session"),
            mess_data
        )
        return False

    recipient_bare_jid = mess_data["to"].userhost()

    session_manager = await self.get_session_manager(client.profile)

    for device in await session_manager.get_device_information(recipient_bare_jid):
        log.debug(f"Replacing sessions with device {device}")
        await session_manager.replace_sessions(device)

    self.__text_commands.feed_back(
        client,
        _("OMEMO session has been reset"),
        mess_data
    )

    return False
1684
async def get_trust_ui(  # pylint: disable=invalid-name
    self,
    client: SatXMPPClient,
    entity: jid.JID
) -> xml_tools.XMLUI:
    """
    Build the form to manage the trust of the devices of an entity.

    If the entity is a joined MUC, the devices of all of its participants are
    listed, otherwise the devices of the entity itself.

    @param client: The client.
    @param entity: The entity whose device trust levels to manage.
    @return: An XMLUI instance which opens a form to manage the trust level of all
        devices belonging to the entity.
    @raise ValueError: if the entity is not a bare JID.
    """

    if entity.resource:
        raise ValueError("A bare JID is expected.")

    bare_jids: Set[str]
    if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity):
        bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
    else:
        bare_jids = { entity.userhost() }

    session_manager = await self.get_session_manager(client.profile)

    # At least sort the devices by bare JID such that they aren't listed completely
    # random. The position of a device in this list is used in the name of its form
    # field, which is how the callback below finds the device again.
    devices = sorted(cast(Set[omemo.DeviceInformation], set()).union(*[
        await session_manager.get_device_information(bare_jid)
        for bare_jid
        in bare_jids
    ]), key=lambda device: device.bare_jid)

    async def callback(
        data: Any,
        profile: str
    ) -> Dict[Never, Never]:
        """
        @param data: The XMLUI result produced by the trust UI form.
        @param profile: The profile.
        @return: An empty dictionary. The type of the return value was chosen
            conservatively since the exact options are neither known nor needed
            here.
        """

        if C.bool(data.get("cancelled", "false")):
            return {}

        data_form_result = cast(
            Dict[str, str],
            xml_tools.xmlui_result_2_data_form_result(data)
        )

        trust_updates: Set[TrustUpdate] = set()

        for key, value in data_form_result.items():
            if not key.startswith("trust_"):
                continue

            device = devices[int(key[len("trust_"):])]
            trust_level_name = value

            # Only devices whose trust level was changed in the form are touched
            if device.trust_level_name != trust_level_name:
                await session_manager.set_trust(
                    device.bare_jid,
                    device.identity_key,
                    trust_level_name
                )

                # Only manual trust and distrust result in a trust update, any
                # other trust level leaves target_trust at None
                target_trust: Optional[bool] = None

                if TrustLevel(trust_level_name) is TrustLevel.TRUSTED:
                    target_trust = True
                if TrustLevel(trust_level_name) is TrustLevel.DISTRUSTED:
                    target_trust = False

                if target_trust is not None:
                    trust_updates.add(TrustUpdate(
                        target_jid=jid.JID(device.bare_jid).userhostJID(),
                        target_key=device.identity_key,
                        target_trust=target_trust
                    ))

        # Check whether ATM is enabled and handle everything in case it is
        trust_system = cast(str, self.__sat.memory.param_get_a(
            PARAM_NAME,
            PARAM_CATEGORY,
            profile_key=profile
        ))

        if trust_system == "atm":
            if len(trust_updates) > 0:
                await manage_trust_message_cache(
                    client,
                    session_manager,
                    frozenset(trust_updates)
                )

                await send_trust_messages(
                    client,
                    session_manager,
                    frozenset(trust_updates)
                )

        return {}

    submit_id = self.__sat.register_callback(callback, with_data=True, one_shot=True)

    result = xml_tools.XMLUI(
        panel_type=C.XMLUI_FORM,
        title=D_("OMEMO trust management"),
        submit_id=submit_id
    )
    # Casting this to Any, otherwise all calls on the variable cause type errors
    # pylint: disable=no-member
    trust_ui = cast(Any, result)
    trust_ui.addText(D_(
        "This is OMEMO trusting system. You'll see below the devices of your"
        " contacts, and a list selection to trust them or not. A trusted device"
        " can read your messages in plain text, so be sure to only validate"
        " devices that you are sure are belonging to your contact. It's better"
        " to do this when you are next to your contact and their device, so"
        " you can check the \"fingerprint\" (the number next to the device)"
        " yourself. Do *not* validate a device if the fingerprint is wrong!"
        " Note that manually validating a fingerprint disables any form of automatic"
        " trust."
    ))

    own_device, __ = await session_manager.get_own_device_information()

    trust_ui.change_container("label")
    trust_ui.addLabel(D_("This device ID"))
    trust_ui.addText(str(own_device.device_id))
    trust_ui.addLabel(D_("This device's fingerprint"))
    trust_ui.addText(" ".join(session_manager.format_identity_key(
        own_device.identity_key
    )))
    trust_ui.addEmpty()
    trust_ui.addEmpty()

    for index, device in enumerate(devices):
        trust_ui.addLabel(D_("Contact"))
        trust_ui.addJid(jid.JID(device.bare_jid))
        trust_ui.addLabel(D_("Device ID"))
        trust_ui.addText(str(device.device_id))
        trust_ui.addLabel(D_("Fingerprint"))
        trust_ui.addText(" ".join(session_manager.format_identity_key(
            device.identity_key
        )))
        trust_ui.addLabel(D_("Trust this device?"))

        # Offer manual distrust and trust plus the current trust level of the
        # device. A list is used instead of a set such that the options are always
        # presented in the same order; the iteration order of a set of enum
        # members is not stable between runs.
        current_trust_level = TrustLevel(device.trust_level_name)
        available_trust_levels = [ TrustLevel.DISTRUSTED, TrustLevel.TRUSTED ]
        if current_trust_level not in available_trust_levels:
            available_trust_levels.append(current_trust_level)

        trust_ui.addList(
            f"trust_{index}",
            options=[ trust_level.name for trust_level in available_trust_levels ],
            selected=current_trust_level.name,
            styles=[ "inline" ]
        )

        # Tell the user if the device is unknown to or inactive for a backend
        twomemo_active = dict(device.active).get(twomemo.twomemo.NAMESPACE)
        if twomemo_active is None:
            trust_ui.addEmpty()
            trust_ui.addLabel(D_("(not available for Twomemo)"))
        if twomemo_active is False:
            trust_ui.addEmpty()
            trust_ui.addLabel(D_("(inactive for Twomemo)"))

        oldmemo_active = dict(device.active).get(oldmemo.oldmemo.NAMESPACE)
        if oldmemo_active is None:
            trust_ui.addEmpty()
            trust_ui.addLabel(D_("(not available for Oldmemo)"))
        if oldmemo_active is False:
            trust_ui.addEmpty()
            trust_ui.addLabel(D_("(inactive for Oldmemo)"))

        trust_ui.addEmpty()
        trust_ui.addEmpty()

    return result
1864
@staticmethod
def __get_joined_muc_users(
    client: SatXMPPClient,
    xep_0045: XEP_0045,
    room_jid: jid.JID
) -> Set[str]:
    """
    Collect the bare JIDs of everybody listed in the roster of a joined MUC.

    @param client: The client.
    @param xep_0045: A MUC plugin instance.
    @param room_jid: The room JID.
    @return: A set containing the bare JIDs of the MUC participants.
    @raise InternalError: if the MUC is not joined or the entity information of a
        participant isn't available.
    """

    try:
        room = cast(muc.Room, xep_0045.get_room(client, room_jid))
    except exceptions.NotFound as cause:
        raise exceptions.InternalError(
            "Participant list of unjoined MUC requested."
        ) from cause

    participants: Set[str] = set()

    for occupant in cast(Dict[str, muc.User], room.roster).values():
        occupant_entity = cast(Optional[SatXMPPEntity], occupant.entity)

        # Without the entity information, the real JID of the occupant is unknown
        if occupant_entity is None:
            raise exceptions.InternalError(
                "Participant list of MUC requested, but the entity information of"
                f" the participant {occupant} is not available."
            )

        participants.add(occupant_entity.jid.userhost())

    return participants
1900
async def get_session_manager(self, profile: str) -> omemo.SessionManager:
    """Return the session manager of a profile, building it on first use.

    Concurrent callers arriving while the session manager is still being built are
    parked on a deferred and woken up once the build succeeded or failed.

    @param profile: The profile to prepare for.
    @return: A session manager instance for this profile. Creates a new instance if
        none was prepared before.
    """

    try:
        # Fast path: the session manager was built before
        return self.__session_managers[profile]
    except KeyError:
        # The presence of the profile key on __session_manager_waiters signals that
        # a build is currently in progress.
        if profile not in self.__session_manager_waiters:
            # Nobody is building yet, so this call takes over the job. Registering
            # the (empty) waiting queue marks the build as in progress.
            self.__session_manager_waiters[profile] = []

            try:
                built_manager = await prepare_for_profile(
                    self.__sat,
                    profile,
                    initial_own_label="Libervia"
                )
            except Exception as e:
                # Initialization failed: propagate the failure to everybody who
                # queued up in the meantime, then drop the queue
                for pending in self.__session_manager_waiters[profile]:
                    pending.errback(e)
                del self.__session_manager_waiters[profile]

                # Re-raise for the caller that triggered the build
                raise

            # Store the result before waking up the waiters
            self.__session_managers[profile] = built_manager

            for pending in self.__session_manager_waiters[profile]:
                pending.callback(built_manager)
            del self.__session_manager_waiters[profile]

            return built_manager

        # Somebody else is building the session manager: join the waiting queue and
        # wait for the builder to fire the deferred
        pending_result = defer.Deferred()
        self.__session_manager_waiters[profile].append(pending_result)
        return cast(omemo.SessionManager, await pending_result)
1950
async def __message_received_trigger_atm(
    self,
    client: SatXMPPClient,
    message_elt: domish.Element,
    session_manager: omemo.SessionManager,
    sender_device_information: omemo.DeviceInformation,
    timestamp: datetime
) -> None:
    """Check a newly decrypted message stanza for ATM content and perform ATM in case.

    Trust updates sent by a trusted device are applied right away; updates sent by
    any other device are kept in the persistent trust message cache.

    @param client: The client which received the message.
    @param message_elt: The message element. Can be modified.
    @param session_manager: The session manager.
    @param sender_device_information: Information about the device that sent/encrypted
        the message.
    @param timestamp: Timestamp extracted from the SCE time affix.
    @raise ParsingError: if a <trust-message/> element doesn't pass schema
        validation.
    """

    trust_message_cache = persistent.LazyPersistentBinaryDict(
        "XEP-0384/TM",
        client.profile
    )

    new_cache_entries: Set[TrustMessageCacheEntry] = set()

    for trust_message_elt in message_elt.elements(NS_TM, "trust-message"):
        assert isinstance(trust_message_elt, domish.Element)

        try:
            TRUST_MESSAGE_SCHEMA.validate(trust_message_elt.toXml())
        except xmlschema.XMLSchemaValidationError as e:
            raise exceptions.ParsingError(
                "<trust-message/> element doesn't pass schema validation."
            ) from e

        if trust_message_elt["usage"] != NS_ATM:
            # Skip non-ATM trust message
            continue

        if trust_message_elt["encryption"] != OMEMO.NS_TWOMEMO:
            # Skip non-twomemo trust message
            continue

        for key_owner_elt in trust_message_elt.elements(NS_TM, "key-owner"):
            assert isinstance(key_owner_elt, domish.Element)

            key_owner_jid = jid.JID(key_owner_elt["jid"]).userhostJID()

            # <trust/> and <distrust/> children only differ in the trust decision
            # they carry, the key is the base64 text content in both cases.
            for tag, target_trust in (("trust", True), ("distrust", False)):
                for decision_elt in key_owner_elt.elements(NS_TM, tag):
                    assert isinstance(decision_elt, domish.Element)

                    new_cache_entries.add(TrustMessageCacheEntry(
                        sender_jid=jid.JID(sender_device_information.bare_jid),
                        sender_key=sender_device_information.identity_key,
                        timestamp=timestamp,
                        trust_update=TrustUpdate(
                            target_jid=key_owner_jid,
                            target_key=base64.b64decode(str(decision_elt)),
                            target_trust=target_trust
                        )
                    ))

    # Load existing cache entries
    existing_cache_entries = cast(
        Set[TrustMessageCacheEntry],
        await trust_message_cache.get("cache", set())
    )

    # Discard cache entries by timestamp comparison
    existing_by_target = {
        (
            cache_entry.trust_update.target_jid.userhostJID(),
            cache_entry.trust_update.target_key
        ): cache_entry
        for cache_entry
        in existing_cache_entries
    }

    # Iterate over a copy here, such that new_cache_entries can be modified
    for new_cache_entry in set(new_cache_entries):
        existing_cache_entry = existing_by_target.get(
            (
                new_cache_entry.trust_update.target_jid.userhostJID(),
                new_cache_entry.trust_update.target_key
            ),
            None
        )

        if existing_cache_entry is not None:
            if existing_cache_entry.timestamp > new_cache_entry.timestamp:
                # If the existing cache entry is newer than the new cache entry,
                # discard the new one in favor of the existing one
                new_cache_entries.remove(new_cache_entry)
            else:
                # Otherwise, discard the existing cache entry. This includes the case
                # when both cache entries have matching timestamps.
                # Several new entries can target the same JID/key pair (e.g. a
                # <trust/> and a <distrust/> for one key in a single message), in
                # which case the existing entry has already been dropped by an
                # earlier iteration. discard() tolerates that, remove() would raise
                # KeyError.
                existing_cache_entries.discard(existing_cache_entry)

    # If the sending device is trusted, apply the new cache entries
    applied_trust_updates: Set[TrustUpdate] = set()

    if TrustLevel(sender_device_information.trust_level_name) is TrustLevel.TRUSTED:
        # Iterate over a copy such that new_cache_entries can be modified
        for cache_entry in set(new_cache_entries):
            trust_update = cache_entry.trust_update

            trust_level = (
                TrustLevel.TRUSTED
                if trust_update.target_trust
                else TrustLevel.DISTRUSTED
            )

            await session_manager.set_trust(
                trust_update.target_jid.userhost(),
                trust_update.target_key,
                trust_level.name
            )

            applied_trust_updates.add(trust_update)

            # Applied updates don't need to be cached anymore
            new_cache_entries.remove(cache_entry)

    # Store the remaining existing and new cache entries
    await trust_message_cache.force(
        "cache",
        existing_cache_entries | new_cache_entries
    )

    # If the trust of at least one device was modified, run the ATM cache update logic
    if len(applied_trust_updates) > 0:
        await manage_trust_message_cache(
            client,
            session_manager,
            frozenset(applied_trust_updates)
        )
2100
async def _message_received_trigger(
    self,
    client: SatXMPPClient,
    message_elt: domish.Element,
    post_treat: defer.Deferred
) -> bool:
    """Decrypt an incoming twomemo/oldmemo message in place, if it carries one.

    @param client: The client which received the message.
    @param message_elt: The message element. Can be modified.
    @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
        message has fully progressed through the message receiving flow. Can be used
        to apply treatments to the fully processed message, like marking it as
        encrypted.
    @return: Whether to continue the message received flow. ``False`` is returned
        when the message was OMEMO-encrypted but could not be decrypted or unpacked.
    """
    if client.is_component:
        return True
    muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None

    sender_jid = jid.JID(message_elt["from"])
    feedback_jid: jid.JID

    message_type = message_elt.getAttribute("type", C.MESS_TYPE_NORMAL)
    is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
    if is_muc_message:
        if self.__xep_0045 is None:
            log.warning(
                "Ignoring MUC message since plugin XEP-0045 is not available."
            )
            # Can't handle a MUC message without XEP-0045, let the flow continue
            # normally
            return True

        room_jid = feedback_jid = sender_jid.userhostJID()

        try:
            room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid))
        except exceptions.NotFound:
            log.warning(
                f"Ignoring MUC message from a room that has not been joined:"
                f" {room_jid}"
            )
            # Whatever, let the flow continue
            return True

        sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
        if sender_user is None:
            log.warning(
                f"Ignoring MUC message from room {room_jid} since the sender's user"
                f" wasn't found {sender_jid.resource}"
            )
            # Whatever, let the flow continue
            return True

        sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
        if sender_user_jid is None:
            log.warning(
                f"Ignoring MUC message from room {room_jid} since the sender's bare"
                f" JID couldn't be found from its user information: {sender_user}"
            )
            # Whatever, let the flow continue
            return True

        # From here on, work with the real JID of the sender instead of the
        # room/nick JID
        sender_jid = sender_user_jid

        # Build the key under which the plaintext of our own reflected messages is
        # cached. Prefer the origin id, fall back to the stanza id.
        message_uid: Optional[str] = None
        if self.__xep_0359 is not None:
            message_uid = self.__xep_0359.get_origin_id(message_elt)
        if message_uid is None:
            message_uid = message_elt.getAttribute("id")
        if message_uid is not None:
            muc_plaintext_cache_key = MUCPlaintextCacheKey(
                client,
                room_jid,
                message_uid
            )
    else:
        # I'm not sure why this check is required, this code is copied from the old
        # plugin.
        if sender_jid.userhostJID() == client.jid.userhostJID():
            try:
                feedback_jid = jid.JID(message_elt["to"])
            except KeyError:
                feedback_jid = client.server_jid
        else:
            feedback_jid = sender_jid

    sender_bare_jid = sender_jid.userhost()

    message: Optional[omemo.Message] = None
    encrypted_elt: Optional[domish.Element] = None

    twomemo_encrypted_elt = cast(Optional[domish.Element], next(
        message_elt.elements(twomemo.twomemo.NAMESPACE, "encrypted"),
        None
    ))

    oldmemo_encrypted_elt = cast(Optional[domish.Element], next(
        message_elt.elements(oldmemo.oldmemo.NAMESPACE, "encrypted"),
        None
    ))

    try:
        session_manager = await self.get_session_manager(cast(str, client.profile))
    except Exception as e:
        log.error(f"error while preparing profile for {client.profile}: {e}")
        # we don't want to block the workflow
        return True

    if twomemo_encrypted_elt is not None:
        try:
            message = twomemo.etree.parse_message(
                xml_tools.domish_elt_2_et_elt(twomemo_encrypted_elt),
                sender_bare_jid
            )
        except (ValueError, XMLSchemaValidationError):
            log.warning(
                f"Ingoring malformed encrypted message for namespace"
                f" {twomemo.twomemo.NAMESPACE}: {twomemo_encrypted_elt.toXml()}"
            )
        else:
            encrypted_elt = twomemo_encrypted_elt

    if oldmemo_encrypted_elt is not None:
        try:
            message = await oldmemo.etree.parse_message(
                xml_tools.domish_elt_2_et_elt(oldmemo_encrypted_elt),
                sender_bare_jid,
                client.jid.userhost(),
                session_manager
            )
        except (ValueError, XMLSchemaValidationError):
            log.warning(
                f"Ingoring malformed encrypted message for namespace"
                f" {oldmemo.oldmemo.NAMESPACE}: {oldmemo_encrypted_elt.toXml()}"
            )
        except omemo.SenderNotFound:
            log.warning(
                f"Ingoring encrypted message for namespace"
                f" {oldmemo.oldmemo.NAMESPACE} by unknown sender:"
                f" {oldmemo_encrypted_elt.toXml()}"
            )
        else:
            encrypted_elt = oldmemo_encrypted_elt

    if message is None or encrypted_elt is None:
        # None of our business, let the flow continue
        return True

    message_elt.children.remove(encrypted_elt)

    log.debug(
        f"{message.namespace} message of type {message_type} received from"
        f" {sender_bare_jid}"
    )

    plaintext: Optional[bytes]
    device_information: omemo.DeviceInformation

    if (
        muc_plaintext_cache_key is not None
        and muc_plaintext_cache_key in self.__muc_plaintext_cache
    ):
        # Use the cached plaintext
        plaintext = self.__muc_plaintext_cache.pop(muc_plaintext_cache_key)

        # Since this message was sent by us, use the own device information here
        device_information, __ = await session_manager.get_own_device_information()
    else:
        try:
            plaintext, device_information, __ = await session_manager.decrypt(message)
        except omemo.MessageNotForUs:
            # The difference between this being a debug or a warning is whether there
            # is a body included in the message. Without a body, we can assume that
            # it's an empty OMEMO message used for protocol stability reasons, which
            # is not expected to be sent to all devices of all recipients. If a body
            # is included, we can assume that the message carries content and we
            # missed out on something.
            if len(list(message_elt.elements(C.NS_CLIENT, "body"))) > 0:
                client.feedback(
                    feedback_jid,
                    D_(
                        f"An OMEMO message from {sender_jid.full()} has not been"
                        f" encrypted for our device, we can't decrypt it."
                    ),
                    { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
                )
                log.warning("Message not encrypted for us.")
            else:
                log.debug("Message not encrypted for us.")

            # No point in further processing this message.
            return False
        except Exception as e:
            log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
                reason=e,
                xml=message_elt.toXml()
            ))
            client.feedback(
                feedback_jid,
                D_(
                    f"An OMEMO message from {sender_jid.full()} can't be decrypted:"
                    f" {e}"
                ),
                { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
            )
            # No point in further processing this message
            return False

    affix_values: Optional[SCEAffixValues] = None

    if message.namespace == twomemo.twomemo.NAMESPACE:
        if plaintext is not None:
            # XEP_0420.unpack_stanza handles the whole unpacking, including the
            # relevant modifications to the element
            sce_profile = \
                OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE
            try:
                affix_values = self.__xep_0420.unpack_stanza(
                    sce_profile,
                    message_elt,
                    plaintext
                )
            except Exception as e:
                log.warning(D_(
                    f"Error unpacking SCE-encrypted message: {e}\n{plaintext}"
                ))
                client.feedback(
                    feedback_jid,
                    D_(
                        f"An OMEMO message from {sender_jid.full()} was rejected:"
                        f" {e}"
                    ),
                    { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
                )
                # No point in further processing this message
                return False
            else:
                if affix_values.timestamp is not None:
                    # TODO: affix_values.timestamp contains the timestamp included in
                    # the encrypted element here. The XEP says it SHOULD be displayed
                    # with the plaintext by clients.
                    pass

    if message.namespace == oldmemo.oldmemo.NAMESPACE:
        # Remove all body elements from the original element, since those act as
        # fallbacks in case the encryption protocol is not supported.
        # Iterate over a snapshot: elements() walks the children list lazily, so
        # removing children during the iteration would skip the element following
        # each removed one and leave adjacent fallback bodies behind.
        for child in list(message_elt.elements()):
            if child.name == "body":
                message_elt.children.remove(child)

        if plaintext is not None:
            # Add the decrypted body
            message_elt.addElement("body", content=plaintext.decode("utf-8"))

    # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
    trust_level = \
        await session_manager._evaluate_custom_trust_level(device_information)

    if trust_level is omemo.TrustLevel.TRUSTED:
        post_treat.addCallback(client.encryption.mark_as_trusted)
    else:
        post_treat.addCallback(client.encryption.mark_as_untrusted)

    # Mark the message as originally encrypted
    post_treat.addCallback(
        client.encryption.mark_as_encrypted,
        namespace=message.namespace
    )

    # Handle potential ATM trust updates
    if affix_values is not None and affix_values.timestamp is not None:
        await self.__message_received_trigger_atm(
            client,
            message_elt,
            session_manager,
            device_information,
            affix_values.timestamp
        )

    # Message processed successfully, continue with the flow
    return True
2383
async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
    """Encrypt an outgoing stanza with twomemo if a twomemo session is active.

    @param client: The client sending this message.
    @param stanza: The stanza that is about to be sent. Can be modified.
    @return: Whether the send message flow should continue or not.
    @raise InternalError: if a message stanza without recipient is encountered.
    """
    # SCE is only applicable to message and IQ stanzas
    # FIXME: temporary disabling IQ stanza encryption
    encryptable_stanzas = { "message" }  # , "iq" }
    if stanza.name not in encryptable_stanzas:
        return True

    # Get the intended recipient
    recipient = stanza.getAttribute("to", None)
    if recipient is None:
        if stanza.name == "message":
            # Message stanzas must have a recipient
            raise exceptions.InternalError(
                f"Message without recipient encountered. Blocking further processing"
                f" to avoid leaking plaintext data: {stanza.toXml()}"
            )

        # IQs without a recipient are a thing, I believe those simply target the
        # server and are thus not eligible for e2ee anyway.
        return True

    # Encryption sessions are registered per bare JID
    recipient_bare_jid = jid.JID(recipient).userhostJID()
    encryption = client.encryption.getSession(recipient_bare_jid)

    # Nothing to do if no encryption is requested for this recipient at all, or if
    # it is requested with something other than twomemo.
    if (
        encryption is None
        or encryption["plugin"].namespace != twomemo.twomemo.NAMESPACE
    ):
        return True

    # All pre-checks done, we can start encrypting!
    is_muc_message = \
        stanza.getAttribute("type", C.MESS_TYPE_NORMAL) == C.MESS_TYPE_GROUPCHAT
    stanza_id = stanza.getAttribute("id", None)

    await self.encrypt(
        client,
        twomemo.twomemo.NAMESPACE,
        stanza,
        recipient_bare_jid,
        is_muc_message,
        stanza_id
    )

    # Add a store hint if this is a message stanza
    if stanza.name == "message":
        self.__xep_0334.add_hint_elements(stanza, [ "store" ])

    # Let the flow continue.
    return True
2439
async def __send_message_data_trigger(
    self,
    client: SatXMPPClient,
    mess_data: MessageData
) -> None:
    """Encrypt an outgoing message with oldmemo if that is the requested encryption.

    @param client: The client sending this message.
    @param mess_data: The message data that is about to be sent. Can be modified.
    """

    # A missing key means that no encryption is requested for this message
    try:
        requested_namespace = mess_data[C.MESS_KEY_ENCRYPTION]["plugin"].namespace
    except KeyError:
        return

    # Only oldmemo is handled here
    if requested_namespace != oldmemo.oldmemo.NAMESPACE:
        return

    # All pre-checks done, we can start encrypting!
    stanza = mess_data["xml"]

    await self.encrypt(
        client,
        oldmemo.oldmemo.NAMESPACE,
        stanza,
        mess_data["to"],
        mess_data["type"] == C.MESS_TYPE_GROUPCHAT,
        mess_data["uid"]
    )

    # Add a store hint
    self.__xep_0334.add_hint_elements(stanza, [ "store" ])
2477
async def encrypt(
    self,
    client: SatXMPPClient,
    namespace: Literal["urn:xmpp:omemo:2", "eu.siacs.conversations.axolotl"],
    stanza: domish.Element,
    recipient_jids: Union[jid.JID, Set[jid.JID]],
    is_muc_message: bool,
    stanza_id: Optional[str]
) -> None:
    """Encrypt a stanza in place with the requested OMEMO version.

    @param client: The client.
    @param namespace: The namespace of the OMEMO version to use.
    @param stanza: The stanza. Twomemo will encrypt the whole stanza using SCE,
        oldmemo will encrypt only the body. The stanza is modified by this call.
    @param recipient_jids: The JID of the recipients.
        Can be a bare (aka "userhost") JIDs but doesn't have to.
        A single JID can be used.
    @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
    @param stanza_id: The id of this stanza. Especially relevant for message stanzas
        to MUC rooms such that the outgoing plaintext can be cached for MUC message
        reflection handling.
    @raise InternalError: if no recipient is given, or if a MUC message is requested
        with more than one recipient, without plugin XEP-0045 or without stanza id.

    @warning: The calling code MUST take care of adding the store message processing
        hint to the stanza if applicable! This can be done before or after this call,
        the order doesn't matter.
    """
    if isinstance(recipient_jids, jid.JID):
        recipient_jids = {recipient_jids}
    if not recipient_jids:
        raise exceptions.InternalError("At least one JID must be specified")
    recipient_jid = next(iter(recipient_jids))

    muc_plaintext_cache_key: Optional[MUCPlaintextCacheKey] = None

    recipient_bare_jids: Set[str]
    feedback_jid: jid.JID

    if is_muc_message:
        if len(recipient_jids) != 1:
            raise exceptions.InternalError(
                'Only one JID can be set when "is_muc_message" is set'
            )
        if self.__xep_0045 is None:
            raise exceptions.InternalError(
                "Encryption of MUC message requested, but plugin XEP-0045 is not"
                " available."
            )

        if stanza_id is None:
            raise exceptions.InternalError(
                "Encryption of MUC message requested, but stanza id not available."
            )

        room_jid = feedback_jid = recipient_jid.userhostJID()

        # A MUC message is encrypted for all participants of the room
        recipient_bare_jids = self.__get_joined_muc_users(
            client,
            self.__xep_0045,
            room_jid
        )

        muc_plaintext_cache_key = MUCPlaintextCacheKey(
            client=client,
            room_jid=room_jid,
            message_uid=stanza_id
        )
    else:
        recipient_bare_jids = {r.userhost() for r in recipient_jids}
        feedback_jid = recipient_jid.userhostJID()

    log.debug(
        f"Intercepting message that is to be encrypted by {namespace} for"
        f" {recipient_bare_jids}"
    )

    def prepare_stanza() -> Optional[bytes]:
        """Prepares the stanza for encryption.

        Does so by removing all parts that are not supposed to be sent in plain. Also
        extracts/prepares the plaintext to encrypt.

        @return: The plaintext to encrypt. Returns ``None`` in case body-only
            encryption is requested and no body was found. The function should
            gracefully return in that case, i.e. it's not a critical error that should
            abort the message sending flow.
        """

        if namespace == twomemo.twomemo.NAMESPACE:
            return self.__xep_0420.pack_stanza(
                OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE,
                stanza
            )

        if namespace == oldmemo.oldmemo.NAMESPACE:
            plaintext: Optional[bytes] = None

            # Iterate over a snapshot: elements() walks the children list lazily, so
            # removing children during the iteration would skip the element
            # following each removed one. An <html/> right after the <body/> would
            # then stay in the stanza and be sent in plain.
            for child in list(stanza.elements()):
                if child.name == "body" and plaintext is None:
                    plaintext = str(child).encode("utf-8")

                # Any other sensitive elements to remove here?
                if child.name in { "body", "html" }:
                    stanza.children.remove(child)

            if plaintext is None:
                log.warning(
                    "No body found in intercepted message to be encrypted with"
                    " oldmemo."
                )

            return plaintext

        return assert_never(namespace)

    # The stanza/plaintext preparation was moved into its own little function for type
    # safety reasons.
    plaintext = prepare_stanza()
    if plaintext is None:
        return

    log.debug(f"Plaintext to encrypt: {plaintext}")

    session_manager = await self.get_session_manager(client.profile)

    try:
        messages, encryption_errors = await session_manager.encrypt(
            frozenset(recipient_bare_jids),
            { namespace: plaintext },
            backend_priority_order=[ namespace ],
            identifier=feedback_jid.userhost()
        )
    except Exception as e:
        # Translate the template first and fill in the values afterwards, otherwise
        # the already formatted string is looked up in the translation catalogue
        # and never matches.
        msg = _("Can't encrypt message for {entities}: {reason}").format(
            entities=', '.join(recipient_bare_jids),
            reason=e
        )
        log.warning(msg)
        client.feedback(feedback_jid, msg, {
            C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
        })
        # Abort the sending flow, the stanza must not leave unencrypted
        raise

    if len(encryption_errors) > 0:
        log.warning(
            f"Ignored the following non-critical encryption errors:"
            f" {encryption_errors}"
        )

        encrypted_errors_stringified = ", ".join([
            f"device {err.device_id} of {err.bare_jid} under namespace"
            f" {err.namespace}"
            for err
            in encryption_errors
        ])

        client.feedback(
            feedback_jid,
            D_(
                "There were non-critical errors during encryption resulting in some"
                " of your destinees' devices potentially not receiving the message."
                " This happens when the encryption data/key material of a device is"
                " incomplete or broken, which shouldn't happen for actively used"
                " devices, and can usually be ignored. The following devices are"
                f" affected: {encrypted_errors_stringified}."
            )
        )

    # Pick the message that was produced for the requested OMEMO version
    message = next(message for message in messages if message.namespace == namespace)

    if namespace == twomemo.twomemo.NAMESPACE:
        # Add the encrypted element
        stanza.addChild(xml_tools.et_elt_2_domish_elt(
            twomemo.etree.serialize_message(message)
        ))

    if namespace == oldmemo.oldmemo.NAMESPACE:
        # Add the encrypted element
        stanza.addChild(xml_tools.et_elt_2_domish_elt(
            oldmemo.etree.serialize_message(message)
        ))

    # Remember the plaintext of MUC messages: the room reflects the message back to
    # us and the cached plaintext is used when that reflection is received.
    if muc_plaintext_cache_key is not None:
        self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext
2664
async def __on_device_list_update(
    self,
    items_event: pubsub.ItemsEvent,
    profile: str
) -> None:
    """Handle device list updates fired by PEP.

    The single item of the event is parsed as a twomemo and/or oldmemo device list
    and handed over to the session manager of the profile.

    @param items_event: The event.
    @param profile: The profile this event belongs to.
    """

    sender = cast(jid.JID, items_event.sender)
    items = cast(List[domish.Element], items_event.items)

    # Exactly one item is expected per update
    if len(items) > 1:
        log.warning("Ignoring device list update with more than one element.")
        return

    if len(items) == 0:
        log.debug("Ignoring empty device list update.")
        return

    item_elt = xml_tools.domish_elt_2_et_elt(items[0])

    device_list: Dict[int, Optional[str]] = {}
    namespace: Optional[str] = None

    # Qualified tag of the list element, matching parser and resulting namespace for
    # each supported OMEMO version. Tried in order; the last successful parse wins.
    candidates = (
        (
            f"{{{twomemo.twomemo.NAMESPACE}}}devices",
            twomemo.etree.parse_device_list,
            twomemo.twomemo.NAMESPACE
        ),
        (
            f"{{{oldmemo.oldmemo.NAMESPACE}}}list",
            oldmemo.etree.parse_device_list,
            oldmemo.oldmemo.NAMESPACE
        )
    )

    for list_tag, parse_device_list, candidate_namespace in candidates:
        list_elt = item_elt.find(list_tag)
        if list_elt is None:
            continue

        try:
            parsed_device_list = parse_device_list(list_elt)
        except XMLSchemaValidationError:
            # A malformed list for this version is treated like a missing one
            continue

        device_list = parsed_device_list
        namespace = candidate_namespace

    if namespace is None:
        log.warning(
            f"Malformed device list update item:"
            f" {ET.tostring(item_elt, encoding='unicode')}"
        )
        return

    session_manager = await self.get_session_manager(profile)

    await session_manager.update_device_list(
        namespace,
        sender.userhost(),
        device_list
    )