Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0384.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | e11b13418ba6 |
children | 23842a63ea00 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0384.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0384.py Wed Jun 19 18:44:57 2024 +0200 @@ -21,8 +21,20 @@ import enum import logging import time -from typing import \ - Any, Dict, FrozenSet, Iterable, List, Literal, NamedTuple, Optional, Set, Type, Union, cast +from typing import ( + Any, + Dict, + FrozenSet, + Iterable, + List, + Literal, + NamedTuple, + Optional, + Set, + Type, + Union, + cast, +) import uuid import xml.etree.ElementTree as ET from xml.sax.saxutils import quoteattr @@ -45,8 +57,12 @@ from libervia.backend.plugins.plugin_xep_0163 import XEP_0163 from libervia.backend.plugins.plugin_xep_0334 import XEP_0334 from libervia.backend.plugins.plugin_xep_0359 import XEP_0359 -from libervia.backend.plugins.plugin_xep_0420 import \ - XEP_0420, SCEAffixPolicy, SCEAffixValues, SCEProfile +from libervia.backend.plugins.plugin_xep_0420 import ( + XEP_0420, + SCEAffixPolicy, + SCEAffixValues, + SCEProfile, +) from libervia.backend.tools import xml_tools from twisted.internet import defer from twisted.words.protocols.jabber import error, jid @@ -73,10 +89,7 @@ ) from import_error -__all__ = [ # pylint: disable=unused-variable - "PLUGIN_INFO", - "OMEMO" -] +__all__ = ["PLUGIN_INFO", "OMEMO"] # pylint: disable=unused-variable log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] @@ -85,9 +98,9 @@ C.PI_NAME: "OMEMO", C.PI_IMPORT_NAME: "XEP-0384", C.PI_TYPE: "SEC", - C.PI_PROTOCOLS: [ "XEP-0384" ], - C.PI_DEPENDENCIES: [ "XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060", "XEP-0420" ], - C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0359", C.TEXT_CMDS ], + C.PI_PROTOCOLS: ["XEP-0384"], + C.PI_DEPENDENCIES: ["XEP-0163", "XEP-0280", "XEP-0334", "XEP-0060", "XEP-0420"], + C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0359", C.TEXT_CMDS], C.PI_MAIN: "OMEMO", C.PI_HANDLER: "no", C.PI_DESCRIPTION: _("""Implementation of OMEMO"""), @@ -219,8 +232,7 @@ return None return oldmemo.migrations.OwnData( - own_bare_jid=self.__own_bare_jid, - own_device_id=own_device_id + own_bare_jid=self.__own_bare_jid, own_device_id=own_device_id ) async def deleteOwnData(self) -> None: @@ -232,7 +244,7 @@ async def loadState(self) -> Optional[oldmemo.migrations.State]: return cast( Optional[oldmemo.migrations.State], - await self.__storage.get(LegacyStorageImpl.KEY_STATE, None) + await self.__storage.get(LegacyStorageImpl.KEY_STATE, None), ) async def deleteState(self) -> None: @@ -242,19 +254,16 @@ pass async def loadSession( - self, - bare_jid: str, - device_id: int + self, bare_jid: str, device_id: int ) -> Optional[oldmemo.migrations.Session]: - key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ]) + key = "\n".join([LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id)]) return cast( - Optional[oldmemo.migrations.Session], - await self.__storage.get(key, None) + Optional[oldmemo.migrations.Session], await self.__storage.get(key, None) ) async def deleteSession(self, bare_jid: str, device_id: int) -> None: - key = "\n".join([ LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id) ]) + key = "\n".join([LegacyStorageImpl.KEY_SESSION, bare_jid, str(device_id)]) try: await self.__storage.remove(key) @@ -262,23 +271,17 @@ pass async def loadActiveDevices(self, bare_jid: str) -> Optional[List[int]]: - key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ]) - - return cast( - Optional[List[int]], - await self.__storage.get(key, None) - ) + key = "\n".join([LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid]) + + return cast(Optional[List[int]], await self.__storage.get(key, None)) async def loadInactiveDevices(self, bare_jid: str) -> Optional[Dict[int, int]]: - key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ]) - - return cast( - Optional[Dict[int, int]], - await self.__storage.get(key, None) - ) + key = "\n".join([LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid]) + + return cast(Optional[Dict[int, int]], await self.__storage.get(key, None)) async def deleteActiveDevices(self, bare_jid: str) -> None: - key = "\n".join([ LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid ]) + key = "\n".join([LegacyStorageImpl.KEY_ACTIVE_DEVICES, bare_jid]) try: await self.__storage.remove(key) @@ -286,7 +289,7 @@ pass async def deleteInactiveDevices(self, bare_jid: str) -> None: - key = "\n".join([ LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid ]) + key = "\n".join([LegacyStorageImpl.KEY_INACTIVE_DEVICES, bare_jid]) try: await self.__storage.remove(key) @@ -294,19 +297,16 @@ pass async def loadTrust( - self, - bare_jid: str, - device_id: int + self, bare_jid: str, device_id: int ) -> Optional[oldmemo.migrations.Trust]: - key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ]) + key = "\n".join([LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id)]) return cast( - Optional[oldmemo.migrations.Trust], - await self.__storage.get(key, None) + Optional[oldmemo.migrations.Trust], await self.__storage.get(key, None) ) async def deleteTrust(self, bare_jid: str, device_id: int) -> None: - key = "\n".join([ LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id) ]) + key = "\n".join([LegacyStorageImpl.KEY_TRUST, bare_jid, str(device_id)]) try: await self.__storage.remove(key) @@ -326,10 +326,7 @@ async def download_oldmemo_bundle( - client: SatXMPPClient, - xep_0060: XEP_0060, - bare_jid: str, - device_id: int + client: SatXMPPClient, xep_0060: XEP_0060, bare_jid: str, device_id: int ) -> oldmemo.oldmemo.BundleImpl: """Download the oldmemo bundle corresponding to a specific device. @@ -361,8 +358,9 @@ f" {namespace}: Unexpected number of items retrieved: {len(items)}." ) - element = \ - next(iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None) + element = next( + iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None + ) if element is None: raise omemo.BundleDownloadFailed( f"Bundle download failed for {bare_jid}: {device_id} under namespace" @@ -385,7 +383,8 @@ NS_ATM: Final = "urn:xmpp:atm:1" -TRUST_MESSAGE_SCHEMA = xmlschema.XMLSchema("""<?xml version='1.0' encoding='UTF-8'?> +TRUST_MESSAGE_SCHEMA = xmlschema.XMLSchema( + """<?xml version='1.0' encoding='UTF-8'?> <xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema' targetNamespace='urn:xmpp:tm:1' xmlns='urn:xmpp:tm:1' @@ -413,7 +412,8 @@ </xs:complexType> </xs:element> </xs:schema> -""") +""" +) # This is compatible with omemo:2's SCE profile @@ -422,7 +422,7 @@ time_policy=SCEAffixPolicy.REQUIRED, to_policy=SCEAffixPolicy.OPTIONAL, from_policy=SCEAffixPolicy.OPTIONAL, - custom_policies={} + custom_policies={}, ) @@ -470,7 +470,7 @@ "sender_jid": self.sender_jid.full(), "sender_key": self.sender_key.hex(), "timestamp": self.timestamp.isoformat(), - "trust_update": self.trust_update.to_dict() + "trust_update": self.trust_update.to_dict(), } return data @@ -499,7 +499,7 @@ async def manage_trust_message_cache( client: SatXMPPClient, session_manager: omemo.SessionManager, - applied_trust_updates: FrozenSet[TrustUpdate] + applied_trust_updates: FrozenSet[TrustUpdate], ) -> None: """Manage the ATM trust message cache after trust updates have been applied. @@ -510,8 +510,7 @@ """ trust_message_cache = persistent.LazyPersistentBinaryDict( - "XEP-0384/TM", - client.profile + "XEP-0384/TM", client.profile ) # Load cache entries @@ -524,16 +523,14 @@ cache_entries_by_target = { ( cache_entry.trust_update.target_jid.userhostJID(), - cache_entry.trust_update.target_key + cache_entry.trust_update.target_key, ): cache_entry - for cache_entry - in cache_entries + for cache_entry in cache_entries } for trust_update in applied_trust_updates: cache_entry = cache_entries_by_target.get( - (trust_update.target_jid.userhostJID(), trust_update.target_key), - None + (trust_update.target_jid.userhostJID(), trust_update.target_key), None ) if cache_entry is not None: @@ -561,7 +558,7 @@ await session_manager.set_trust( cache_entry.trust_update.target_jid.userhost(), cache_entry.trust_update.target_key, - trust_level.name + trust_level.name, ) # Track the fact that this trust update has been applied @@ -571,10 +568,7 @@ cache_entries.remove(cache_entry) # Store the updated cache entries - await trust_message_cache.force( - "cache", - [tm.to_dict() for tm in cache_entries] - ) + await trust_message_cache.force("cache", [tm.to_dict() for tm in cache_entries]) # TODO: Notify the user ("feedback") about automatically updated trust? @@ -582,15 +576,12 @@ # If any trust has been updated, recursively perform another run of cache # management await manage_trust_message_cache( - client, - session_manager, - frozenset(new_trust_updates) + client, session_manager, frozenset(new_trust_updates) ) async def get_trust_as_trust_updates( - session_manager: omemo.SessionManager, - target_jid: jid.JID + session_manager: omemo.SessionManager, target_jid: jid.JID ) -> FrozenSet[TrustUpdate]: """Get the trust status of all known keys of a JID as trust updates for use with ATM. @@ -617,11 +608,13 @@ # Skip devices that are not explicitly trusted or distrusted continue - trust_updates.add(TrustUpdate( - target_jid=target_jid.userhostJID(), - target_key=device.identity_key, - target_trust=target_trust - )) + trust_updates.add( + TrustUpdate( + target_jid=target_jid.userhostJID(), + target_key=device.identity_key, + target_trust=target_trust, + ) + ) return frozenset(trust_updates) @@ -629,7 +622,7 @@ async def send_trust_messages( client: SatXMPPClient, session_manager: omemo.SessionManager, - applied_trust_updates: FrozenSet[TrustUpdate] + applied_trust_updates: FrozenSet[TrustUpdate], ) -> None: """Send information about updated trust to peers via ATM (XEP-0450). @@ -647,20 +640,21 @@ own_trust_updates = await get_trust_as_trust_updates(session_manager, own_jid) # JIDs of which at least one device's trust has been updated - updated_jids = frozenset({ - trust_update.target_jid.userhostJID() - for trust_update - in applied_trust_updates - }) + updated_jids = frozenset( + {trust_update.target_jid.userhostJID() for trust_update in applied_trust_updates} + ) trust_messages: Set[PartialTrustMessage] = set() for updated_jid in updated_jids: # Get the trust updates for that JID - trust_updates = frozenset({ - trust_update for trust_update in applied_trust_updates - if trust_update.target_jid.userhostJID() == updated_jid - }) + trust_updates = frozenset( + { + trust_update + for trust_update in applied_trust_updates + if trust_update.target_jid.userhostJID() == updated_jid + } + ) if updated_jid == own_jid: # If the own JID is updated, _all_ peers have to be notified @@ -668,66 +662,81 @@ # and storage keys until I've added public API to get a list of peers to # python-omemo. storage: omemo.Storage = getattr(session_manager, "_SessionManager__storage") - peer_jids = frozenset({ - jid.JID(bare_jid).userhostJID() for bare_jid in (await storage.load_list( - f"/{OMEMO.NS_TWOMEMO}/bare_jids", - str - )).maybe([]) - }) + peer_jids = frozenset( + { + jid.JID(bare_jid).userhostJID() + for bare_jid in ( + await storage.load_list(f"/{OMEMO.NS_TWOMEMO}/bare_jids", str) + ).maybe([]) + } + ) if len(peer_jids) == 0: # If there are no peers to notify, notify our other devices about the # changes directly - trust_messages.add(PartialTrustMessage( - recipient_jid=own_jid, - updated_jid=own_jid, - trust_updates=trust_updates - )) + trust_messages.add( + PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=own_jid, + trust_updates=trust_updates, + ) + ) else: # Otherwise, notify all peers about the changes in trust and let carbons # handle the copy to our own JID for peer_jid in peer_jids: - trust_messages.add(PartialTrustMessage( - recipient_jid=peer_jid, - updated_jid=own_jid, - trust_updates=trust_updates - )) + trust_messages.add( + PartialTrustMessage( + recipient_jid=peer_jid, + updated_jid=own_jid, + trust_updates=trust_updates, + ) + ) # Also send full trust information about _every_ peer to our newly # trusted devices - peer_trust_updates = \ - await get_trust_as_trust_updates(session_manager, peer_jid) - - trust_messages.add(PartialTrustMessage( - recipient_jid=own_jid, - updated_jid=peer_jid, - trust_updates=peer_trust_updates - )) + peer_trust_updates = await get_trust_as_trust_updates( + session_manager, peer_jid + ) + + trust_messages.add( + PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=peer_jid, + trust_updates=peer_trust_updates, + ) + ) # Send information about our own devices to our newly trusted devices - trust_messages.add(PartialTrustMessage( - recipient_jid=own_jid, - updated_jid=own_jid, - trust_updates=own_trust_updates - )) + trust_messages.add( + PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=own_jid, + trust_updates=own_trust_updates, + ) + ) else: # Notify our other devices about the changes in trust - trust_messages.add(PartialTrustMessage( - recipient_jid=own_jid, - updated_jid=updated_jid, - trust_updates=trust_updates - )) + trust_messages.add( + PartialTrustMessage( + recipient_jid=own_jid, + updated_jid=updated_jid, + trust_updates=trust_updates, + ) + ) # Send a summary of our own trust to newly trusted devices - trust_messages.add(PartialTrustMessage( - recipient_jid=updated_jid, - updated_jid=own_jid, - trust_updates=own_trust_updates - )) + trust_messages.add( + PartialTrustMessage( + recipient_jid=updated_jid, + updated_jid=own_jid, + trust_updates=own_trust_updates, + ) + ) # All trust messages prepared. Merge all trust messages directed at the same # recipient. - recipient_jids = { trust_message.recipient_jid for trust_message in trust_messages } + recipient_jids = {trust_message.recipient_jid for trust_message in trust_messages} for recipient_jid in recipient_jids: updated: Dict[jid.JID, Set[TrustUpdate]] = {} @@ -736,8 +745,9 @@ # Merge trust messages directed at that recipient if trust_message.recipient_jid == recipient_jid: # Merge the trust updates - updated[trust_message.updated_jid] = \ - updated.get(trust_message.updated_jid, set()) + updated[trust_message.updated_jid] = updated.get( + trust_message.updated_jid, set() + ) updated[trust_message.updated_jid] |= trust_message.trust_updates @@ -751,31 +761,34 @@ key_owner_elt["jid"] = updated_jid.userhost() for trust_update in trust_updates: - serialized_identity_key = \ - base64.b64encode(trust_update.target_key).decode("ASCII") + serialized_identity_key = base64.b64encode( + trust_update.target_key + ).decode("ASCII") if trust_update.target_trust: key_owner_elt.addElement( - (NS_TM, "trust"), - content=serialized_identity_key + (NS_TM, "trust"), content=serialized_identity_key ) else: key_owner_elt.addElement( - (NS_TM, "distrust"), - content=serialized_identity_key + (NS_TM, "distrust"), content=serialized_identity_key ) # Finally, encrypt and send the trust message! - message_data = client.generate_message_xml(MessageData({ - "from": own_jid, - "to": recipient_jid, - "uid": str(uuid.uuid4()), - "message": {}, - "subject": {}, - "type": C.MESS_TYPE_CHAT, - "extra": {}, - "timestamp": time.time() - })) + message_data = client.generate_message_xml( + MessageData( + { + "from": own_jid, + "to": recipient_jid, + "uid": str(uuid.uuid4()), + "message": {}, + "subject": {}, + "type": C.MESS_TYPE_CHAT, + "extra": {}, + "timestamp": time.time(), + } + ) + ) message_data["xml"].addChild(trust_message_elt) @@ -786,23 +799,21 @@ # TODO: The following is mostly duplicate code try: messages, encryption_errors = await session_manager.encrypt( - frozenset({ own_jid.userhost(), recipient_jid.userhost() }), - { OMEMO.NS_TWOMEMO: plaintext }, - backend_priority_order=[ OMEMO.NS_TWOMEMO ], - identifier=feedback_jid.userhost() + frozenset({own_jid.userhost(), recipient_jid.userhost()}), + {OMEMO.NS_TWOMEMO: plaintext}, + backend_priority_order=[OMEMO.NS_TWOMEMO], + identifier=feedback_jid.userhost(), ) except Exception as e: msg = _( # pylint: disable=consider-using-f-string "Can't encrypt message for {entities}: {reason}".format( - entities=', '.join({ own_jid.userhost(), recipient_jid.userhost() }), - reason=e + entities=", ".join({own_jid.userhost(), recipient_jid.userhost()}), + reason=e, ) ) log.warning(msg) - client.feedback(feedback_jid, msg, { - C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR - }) + client.feedback(feedback_jid, msg, {C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR}) raise e if len(encryption_errors) > 0: @@ -811,12 +822,13 @@ f" {encryption_errors}" ) - encrypted_errors_stringified = ", ".join([ - f"device {err.device_id} of {err.bare_jid} under namespace" - f" {err.namespace}" - for err - in encryption_errors - ]) + encrypted_errors_stringified = ", ".join( + [ + f"device {err.device_id} of {err.bare_jid} under namespace" + f" {err.namespace}" + for err in encryption_errors + ] + ) client.feedback( feedback_jid, @@ -827,23 +839,24 @@ " incomplete or broken, which shouldn't happen for actively used" " devices, and can usually be ignored. The following devices are" f" affected: {encrypted_errors_stringified}." - ) + ), ) message = next( - message for message in messages - if message.namespace == OMEMO.NS_TWOMEMO + message for message in messages if message.namespace == OMEMO.NS_TWOMEMO ) # Add the encrypted element - message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt( - twomemo.etree.serialize_message(message) - )) + message_data["xml"].addChild( + xml_tools.et_elt_2_domish_elt(twomemo.etree.serialize_message(message)) + ) await client.a_send(message_data["xml"]) -def make_session_manager(sat: LiberviaBackend, profile: str) -> Type[omemo.SessionManager]: +def make_session_manager( + sat: LiberviaBackend, profile: str +) -> Type[omemo.SessionManager]: """ @param sat: The SAT instance. @param profile: The profile. @@ -876,10 +889,10 @@ extra={ XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_ACCESS_MODEL: "open", - XEP_0060.OPT_MAX_ITEMS: "max" + XEP_0060.OPT_MAX_ITEMS: "max", }, - XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" - } + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise", + }, ) except (error.StanzaError, Exception) as e: if ( @@ -917,10 +930,10 @@ extra={ XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_ACCESS_MODEL: "open", - XEP_0060.OPT_MAX_ITEMS: 1 + XEP_0060.OPT_MAX_ITEMS: 1, }, - XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" - } + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", + }, ) except Exception as e: raise omemo.BundleUploadFailed( @@ -933,19 +946,14 @@ @staticmethod async def _download_bundle( - namespace: str, - bare_jid: str, - device_id: int + namespace: str, bare_jid: str, device_id: int ) -> omemo.Bundle: if namespace == twomemo.twomemo.NAMESPACE: node = "urn:xmpp:omemo:2:bundles" try: items, __ = await xep_0060.get_items( - client, - jid.JID(bare_jid), - node, - item_ids=[ str(device_id) ] + client, jid.JID(bare_jid), node, item_ids=[str(device_id)] ) except Exception as e: raise omemo.BundleDownloadFailed( @@ -962,7 +970,7 @@ element = next( iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), - None + None, ) if element is None: raise omemo.BundleDownloadFailed( @@ -981,10 +989,7 @@ if namespace == oldmemo.oldmemo.NAMESPACE: return await download_oldmemo_bundle( - client, - xep_0060, - bare_jid, - device_id + client, xep_0060, bare_jid, device_id ) raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") @@ -999,8 +1004,8 @@ client, client.jid.userhostJID(), node, - [ str(device_id) ], - notify=False + [str(device_id)], + notify=False, ) except Exception as e: raise omemo.BundleDeletionFailed( @@ -1027,8 +1032,7 @@ @staticmethod async def _upload_device_list( - namespace: str, - device_list: Dict[int, Optional[str]] + namespace: str, device_list: Dict[int, Optional[str]] ) -> None: element: Optional[ET.Element] = None node: Optional[str] = None @@ -1053,10 +1057,10 @@ extra={ XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_MAX_ITEMS: 1, - XEP_0060.OPT_ACCESS_MODEL: "open" + XEP_0060.OPT_ACCESS_MODEL: "open", }, - XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise" - } + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "raise", + }, ) except (error.StanzaError, Exception) as e: if ( @@ -1080,8 +1084,7 @@ @staticmethod async def _download_device_list( - namespace: str, - bare_jid: str + namespace: str, bare_jid: str ) -> Dict[int, Optional[str]]: node: Optional[str] = None @@ -1113,8 +1116,7 @@ ) element = next( - iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), - None + iter(xml_tools.domish_elt_2_et_elt(cast(domish.Element, items[0]))), None ) if element is None: @@ -1138,8 +1140,7 @@ raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") async def _evaluate_custom_trust_level( - self, - device: omemo.DeviceInformation + self, device: omemo.DeviceInformation ) -> omemo.TrustLevel: # Get the custom trust level try: @@ -1161,11 +1162,12 @@ # on the trust system and phase if trust_level is TrustLevel.BLINDLY_TRUSTED: # Get the name of the active trust system - trust_system = cast(str, sat.memory.param_get_a( - PARAM_NAME, - PARAM_CATEGORY, - profile_key=profile - )) + trust_system = cast( + str, + sat.memory.param_get_a( + PARAM_NAME, PARAM_CATEGORY, profile_key=profile + ), + ) # If the trust model is BTBV, blind trust is always enabled if trust_system == "btbv": @@ -1177,10 +1179,11 @@ # Find out whether we are in phase one or two devices = await self.get_device_information(device.bare_jid) - phase_one = all(TrustLevel(device.trust_level_name) in { - TrustLevel.UNDECIDED, - TrustLevel.BLINDLY_TRUSTED - } for device in devices) + phase_one = all( + TrustLevel(device.trust_level_name) + in {TrustLevel.UNDECIDED, TrustLevel.BLINDLY_TRUSTED} + for device in devices + ) if phase_one: return omemo.TrustLevel.TRUSTED @@ -1194,9 +1197,7 @@ assert_never(trust_level) async def _make_trust_decision( - self, - undecided: FrozenSet[omemo.DeviceInformation], - identifier: Optional[str] + self, undecided: FrozenSet[omemo.DeviceInformation], identifier: Optional[str] ) -> None: if identifier is None: raise omemo.TrustDecisionFailed( @@ -1210,7 +1211,7 @@ # first manual verification is performed. Thus, we can separate bare JIDs into # two pools here, one pool of bare JIDs for which blind trust is active, and # one pool of bare JIDs for which manual trust is used instead. - bare_jids = { device.bare_jid for device in undecided } + bare_jids = {device.bare_jid for device in undecided} blind_trust_bare_jids: Set[str] = set() manual_trust_bare_jids: Set[str] = set() @@ -1222,20 +1223,23 @@ # If the trust levels of all devices correspond to those used by blind # trust, blind trust applies. Otherwise, fall back to manual trust. - if all(TrustLevel(device.trust_level_name) in { - TrustLevel.UNDECIDED, - TrustLevel.BLINDLY_TRUSTED - } for device in devices): + if all( + TrustLevel(device.trust_level_name) + in {TrustLevel.UNDECIDED, TrustLevel.BLINDLY_TRUSTED} + for device in devices + ): blind_trust_bare_jids.add(bare_jid) else: manual_trust_bare_jids.add(bare_jid) # With the JIDs sorted into their respective pools, the undecided devices can # be categorized too - blindly_trusted_devices = \ - { dev for dev in undecided if dev.bare_jid in blind_trust_bare_jids } - manually_trusted_devices = \ - { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids } + blindly_trusted_devices = { + dev for dev in undecided if dev.bare_jid in blind_trust_bare_jids + } + manually_trusted_devices = { + dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids + } # Blindly trust devices handled by blind trust if len(blindly_trusted_devices) > 0: @@ -1243,15 +1247,16 @@ await self.set_trust( device.bare_jid, device.identity_key, - TrustLevel.BLINDLY_TRUSTED.name + TrustLevel.BLINDLY_TRUSTED.name, ) - blindly_trusted_devices_stringified = ", ".join([ - f"device {device.device_id} of {device.bare_jid} under namespace" - f" {device.namespaces}" - for device - in blindly_trusted_devices - ]) + blindly_trusted_devices_stringified = ", ".join( + [ + f"device {device.device_id} of {device.bare_jid} under namespace" + f" {device.namespaces}" + for device in blindly_trusted_devices + ] + ) client.feedback( feedback_jid, @@ -1259,7 +1264,7 @@ "Not all destination devices are trusted, unknown devices will be" " blindly trusted.\nFollowing devices have been automatically" f" trusted: {blindly_trusted_devices_stringified}." - ) + ), ) # Prompt the user for manual trust decisions on the devices handled by manual @@ -1272,11 +1277,10 @@ " message in such a situation. Please indicate if you trust" " those devices or not in the trust manager before we can" " send this message." - ) + ), ) await self.__prompt_manual_trust( - frozenset(manually_trusted_devices), - feedback_jid + frozenset(manually_trusted_devices), feedback_jid ) @staticmethod @@ -1291,16 +1295,20 @@ if element is None: raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}") - message_data = client.generate_message_xml(MessageData({ - "from": client.jid, - "to": jid.JID(bare_jid), - "uid": str(uuid.uuid4()), - "message": {}, - "subject": {}, - "type": C.MESS_TYPE_CHAT, - "extra": {}, - "timestamp": time.time() - })) + message_data = client.generate_message_xml( + MessageData( + { + "from": client.jid, + "to": jid.JID(bare_jid), + "uid": str(uuid.uuid4()), + "message": {}, + "subject": {}, + "type": C.MESS_TYPE_CHAT, + "extra": {}, + "timestamp": time.time(), + } + ) + ) message_data["xml"].addChild(xml_tools.et_elt_2_domish_elt(element)) @@ -1310,9 +1318,7 @@ raise omemo.MessageSendingFailed() from e async def __prompt_manual_trust( - self, - undecided: FrozenSet[omemo.DeviceInformation], - feedback_jid: jid.JID + self, undecided: FrozenSet[omemo.DeviceInformation], feedback_jid: jid.JID ) -> None: """Asks the user to decide on the manual trust level of a set of devices. @@ -1340,20 +1346,25 @@ # Casting this to Any, otherwise all calls on the variable cause type errors # pylint: disable=no-member - trust_ui = cast(Any, xml_tools.XMLUI( - panel_type=C.XMLUI_FORM, - title=D_("OMEMO trust management"), - submit_id="" - )) - trust_ui.addText(D_( - "This is OMEMO trusting system. You'll see below the devices of your " - "contacts, and a checkbox to trust them or not. A trusted device " - "can read your messages in plain text, so be sure to only validate " - "devices that you are sure are belonging to your contact. It's better " - "to do this when you are next to your contact and their device, so " - "you can check the \"fingerprint\" (the number next to the device) " - "yourself. Do *not* validate a device if the fingerprint is wrong!" - )) + trust_ui = cast( + Any, + xml_tools.XMLUI( + panel_type=C.XMLUI_FORM, + title=D_("OMEMO trust management"), + submit_id="", + ), + ) + trust_ui.addText( + D_( + "This is OMEMO trusting system. You'll see below the devices of your " + "contacts, and a checkbox to trust them or not. A trusted device " + "can read your messages in plain text, so be sure to only validate " + "devices that you are sure are belonging to your contact. It's better " + "to do this when you are next to your contact and their device, so " + 'you can check the "fingerprint" (the number next to the device) ' + "yourself. Do *not* validate a device if the fingerprint is wrong!" + ) + ) own_device, __ = await self.get_own_device_information() @@ -1384,16 +1395,16 @@ trust_ui_result = await xml_tools.defer_xmlui( sat, trust_ui, - action_extra={ "meta_encryption_trust": namespace }, - profile=profile + action_extra={"meta_encryption_trust": namespace}, + profile=profile, ) if C.bool(trust_ui_result.get("cancelled", "false")): raise omemo.TrustDecisionFailed("Trust UI cancelled.") - data_form_result = cast(Dict[str, str], xml_tools.xmlui_result_2_data_form_result( - trust_ui_result - )) + data_form_result = cast( + Dict[str, str], xml_tools.xmlui_result_2_data_form_result(trust_ui_result) + ) trust_updates: Set[TrustUpdate] = set() @@ -1401,29 +1412,29 @@ if not key.startswith("trust_"): continue - device = undecided_ordered[int(key[len("trust_"):])] + device = undecided_ordered[int(key[len("trust_") :])] target_trust = C.bool(value) - trust_level = \ + trust_level = ( TrustLevel.TRUSTED if target_trust else TrustLevel.DISTRUSTED + ) await self.set_trust( - device.bare_jid, - device.identity_key, - trust_level.name + device.bare_jid, device.identity_key, trust_level.name ) - trust_updates.add(TrustUpdate( - target_jid=jid.JID(device.bare_jid).userhostJID(), - target_key=device.identity_key, - target_trust=target_trust - )) + trust_updates.add( + TrustUpdate( + target_jid=jid.JID(device.bare_jid).userhostJID(), + target_key=device.identity_key, + target_trust=target_trust, + ) + ) # Check whether ATM is enabled and handle everything in case it is - trust_system = cast(str, sat.memory.param_get_a( - PARAM_NAME, - PARAM_CATEGORY, - profile_key=profile - )) + trust_system = cast( + str, + sat.memory.param_get_a(PARAM_NAME, PARAM_CATEGORY, profile_key=profile), + ) if trust_system == "atm": await manage_trust_message_cache(client, self, frozenset(trust_updates)) @@ -1439,7 +1450,7 @@ signed_pre_key_rotation_period: int = 7 * 24 * 60 * 60, pre_key_refill_threshold: int = 99, max_num_per_session_skipped_keys: int = 1000, - max_num_per_message_skipped_keys: Optional[int] = None + max_num_per_message_skipped_keys: Optional[int] = None, ) -> omemo.SessionManager: """Prepare the OMEMO library (storage, backends, core) for a specific profile. @@ -1492,11 +1503,8 @@ TrustLevel.UNDECIDED.name, TrustLevel.DISTRUSTED.name, lambda bare_jid, device_id: download_oldmemo_bundle( - client, - xep_0060, - bare_jid, - device_id - ) + client, xep_0060, bare_jid, device_id + ), ) session_manager = await make_session_manager(sat, profile).create( @@ -1504,13 +1512,13 @@ twomemo.Twomemo( storage, max_num_per_session_skipped_keys, - max_num_per_message_skipped_keys + max_num_per_message_skipped_keys, ), oldmemo.Oldmemo( storage, max_num_per_session_skipped_keys, - max_num_per_message_skipped_keys - ) + max_num_per_message_skipped_keys, + ), ], storage, client.jid.userhost(), @@ -1518,7 +1526,7 @@ TrustLevel.UNDECIDED.value, signed_pre_key_rotation_period, pre_key_refill_threshold, - omemo.AsyncFramework.TWISTED + omemo.AsyncFramework.TWISTED, ) # This shouldn't hurt here since we're not running on overly constrainted devices. @@ -1563,6 +1571,7 @@ between the two is maintained. MUC messages are supported next to one to one messages. For trust management, the two trust models "ATM" and "BTBV" are supported. """ + NS_TWOMEMO = twomemo.twomemo.NAMESPACE NS_OLDMEMO = oldmemo.oldmemo.NAMESPACE @@ -1572,7 +1581,7 @@ time_policy=SCEAffixPolicy.OPTIONAL, to_policy=SCEAffixPolicy.REQUIRED, from_policy=SCEAffixPolicy.OPTIONAL, - custom_policies={} + custom_policies={}, ) # For everything but MUC/MIX message stanzas, the <to/> affix is a MAY @@ -1581,7 +1590,7 @@ time_policy=SCEAffixPolicy.OPTIONAL, to_policy=SCEAffixPolicy.OPTIONAL, from_policy=SCEAffixPolicy.OPTIONAL, - custom_policies={} + custom_policies={}, ) def __init__(self, host: LiberviaBackend) -> None: @@ -1625,9 +1634,7 @@ # messages. Temporarily, until a more fitting trigger for SCE-based encryption is # added, the message_received trigger is also used for twomemo. host.trigger.add( - "message_received", - self._message_received_trigger, - priority=100050 + "message_received", self._message_received_trigger, priority=100050 ) host.trigger.add("send", self.__send_trigger, priority=0) @@ -1646,14 +1653,14 @@ TWOMEMO_DEVICE_LIST_NODE, lambda items_event, profile: defer.ensureDeferred( self.__on_device_list_update(items_event, profile) - ) + ), ) xep_0163.add_pep_event( "OLDMEMO_DEVICES", OLDMEMO_DEVICE_LIST_NODE, lambda items_event, profile: defer.ensureDeferred( self.__on_device_list_update(items_event, profile) - ) + ), ) try: @@ -1664,21 +1671,16 @@ self.__text_commands.register_text_commands(self) def profile_connected( # pylint: disable=invalid-name - self, - client: SatXMPPClient + self, client: SatXMPPClient ) -> None: """ @param client: The client. """ - defer.ensureDeferred(self.get_session_manager( - cast(str, client.profile) - )) + defer.ensureDeferred(self.get_session_manager(cast(str, client.profile))) async def cmd_omemo_reset( - self, - client: SatXMPPClient, - mess_data: MessageData + self, client: SatXMPPClient, mess_data: MessageData ) -> Literal[False]: """Reset all sessions of devices that belong to the recipient of ``mess_data``. @@ -1693,16 +1695,18 @@ the message is not supposed to be sent. """ - twomemo_requested = \ - client.encryption.is_encryption_requested(mess_data, twomemo.twomemo.NAMESPACE) - oldmemo_requested = \ - client.encryption.is_encryption_requested(mess_data, oldmemo.oldmemo.NAMESPACE) + twomemo_requested = client.encryption.is_encryption_requested( + mess_data, twomemo.twomemo.NAMESPACE + ) + oldmemo_requested = client.encryption.is_encryption_requested( + mess_data, oldmemo.oldmemo.NAMESPACE + ) if not (twomemo_requested or oldmemo_requested): self.__text_commands.feed_back( client, _("You need to have OMEMO encryption activated to reset the session"), - mess_data + mess_data, ) return False @@ -1716,17 +1720,13 @@ await session_manager.replace_sessions(device) self.__text_commands.feed_back( - client, - _("OMEMO session has been reset"), - mess_data + client, _("OMEMO session has been reset"), mess_data ) return False async def get_trust_ui( # pylint: disable=invalid-name - self, - client: SatXMPPClient, - entity: jid.JID + self, client: SatXMPPClient, entity: jid.JID ) -> xml_tools.XMLUI: """ @param client: The client. @@ -1742,22 +1742,23 @@ if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) else: - bare_jids = { entity.userhost() } + bare_jids = {entity.userhost()} session_manager = await self.get_session_manager(client.profile) # At least sort the devices by bare JID such that they aren't listed completely # random - devices = sorted(cast(Set[omemo.DeviceInformation], set()).union(*[ - await session_manager.get_device_information(bare_jid) - for bare_jid - in bare_jids - ]), key=lambda device: device.bare_jid) - - async def callback( - data: Any, - profile: str - ) -> Dict[Never, Never]: + devices = sorted( + cast(Set[omemo.DeviceInformation], set()).union( + *[ + await session_manager.get_device_information(bare_jid) + for bare_jid in bare_jids + ] + ), + key=lambda device: device.bare_jid, + ) + + async def callback(data: Any, profile: str) -> Dict[Never, Never]: """ @param data: The XMLUI result produces by the trust UI form. @param profile: The profile. @@ -1769,8 +1770,7 @@ return {} data_form_result = cast( - Dict[str, str], - xml_tools.xmlui_result_2_data_form_result(data) + Dict[str, str], xml_tools.xmlui_result_2_data_form_result(data) ) trust_updates: Set[TrustUpdate] = set() @@ -1779,14 +1779,12 @@ if not key.startswith("trust_"): continue - device = devices[int(key[len("trust_"):])] + device = devices[int(key[len("trust_") :])] trust_level_name = value if device.trust_level_name != trust_level_name: await session_manager.set_trust( - device.bare_jid, - device.identity_key, - trust_level_name + device.bare_jid, device.identity_key, trust_level_name ) target_trust: Optional[bool] = None @@ -1797,31 +1795,30 @@ target_trust = False if target_trust is not None: - trust_updates.add(TrustUpdate( - target_jid=jid.JID(device.bare_jid).userhostJID(), - target_key=device.identity_key, - target_trust=target_trust - )) + trust_updates.add( + TrustUpdate( + target_jid=jid.JID(device.bare_jid).userhostJID(), + target_key=device.identity_key, + target_trust=target_trust, + ) + ) # Check whether ATM is enabled and handle everything in case it is - trust_system = cast(str, self.host.memory.param_get_a( - PARAM_NAME, - PARAM_CATEGORY, - profile_key=profile - )) + trust_system = cast( + str, + self.host.memory.param_get_a( + PARAM_NAME, PARAM_CATEGORY, profile_key=profile + ), + ) if trust_system == "atm": if len(trust_updates) > 0: await manage_trust_message_cache( - client, - session_manager, - frozenset(trust_updates) + client, session_manager, frozenset(trust_updates) ) await send_trust_messages( - client, - session_manager, - frozenset(trust_updates) + client, session_manager, frozenset(trust_updates) ) return {} @@ -1831,22 +1828,24 @@ result = xml_tools.XMLUI( panel_type=C.XMLUI_FORM, title=D_("OMEMO trust management"), - submit_id=submit_id + submit_id=submit_id, ) # Casting this to Any, otherwise all calls on the variable cause type errors # pylint: disable=no-member trust_ui = cast(Any, result) - trust_ui.addText(D_( - "This is OMEMO trusting system. You'll see below the devices of your" - " contacts, and a list selection to trust them or not. A trusted device" - " can read your messages in plain text, so be sure to only validate" - " devices that you are sure are belonging to your contact. It's better" - " to do this when you are next to your contact and their device, so" - " you can check the \"fingerprint\" (the number next to the device)" - " yourself. Do *not* validate a device if the fingerprint is wrong!" - " Note that manually validating a fingerprint disables any form of automatic" - " trust." - )) + trust_ui.addText( + D_( + "This is OMEMO trusting system. You'll see below the devices of your" + " contacts, and a list selection to trust them or not. A trusted device" + " can read your messages in plain text, so be sure to only validate" + " devices that you are sure are belonging to your contact. It's better" + " to do this when you are next to your contact and their device, so" + ' you can check the "fingerprint" (the number next to the device)' + " yourself. Do *not* validate a device if the fingerprint is wrong!" + " Note that manually validating a fingerprint disables any form of automatic" + " trust." + ) + ) own_device, __ = await session_manager.get_own_device_information() @@ -1854,9 +1853,9 @@ trust_ui.addLabel(D_("This device ID")) trust_ui.addText(str(own_device.device_id)) trust_ui.addLabel(D_("This device's fingerprint")) - trust_ui.addText(" ".join(session_manager.format_identity_key( - own_device.identity_key - ))) + trust_ui.addText( + " ".join(session_manager.format_identity_key(own_device.identity_key)) + ) trust_ui.addEmpty() trust_ui.addEmpty() @@ -1866,20 +1865,23 @@ trust_ui.addLabel(D_("Device ID")) trust_ui.addText(str(device.device_id)) trust_ui.addLabel(D_("Fingerprint")) - trust_ui.addText(" ".join(session_manager.format_identity_key( - device.identity_key - ))) + trust_ui.addText( + " ".join(session_manager.format_identity_key(device.identity_key)) + ) trust_ui.addLabel(D_("Trust this device?")) current_trust_level = TrustLevel(device.trust_level_name) - avaiable_trust_levels = \ - { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } + avaiable_trust_levels = { + TrustLevel.DISTRUSTED, + TrustLevel.TRUSTED, + current_trust_level, + } trust_ui.addList( f"trust_{index}", - options=[ trust_level.name for trust_level in avaiable_trust_levels ], + options=[trust_level.name for trust_level in avaiable_trust_levels], selected=current_trust_level.name, - styles=[ "inline" ] + styles=["inline"], ) twomemo_active = dict(device.active).get(twomemo.twomemo.NAMESPACE) @@ -1905,9 +1907,7 @@ @staticmethod def __get_joined_muc_users( - client: SatXMPPClient, - xep_0045: XEP_0045, - room_jid: jid.JID + client: SatXMPPClient, xep_0045: XEP_0045, room_jid: jid.JID ) -> Set[str]: """ @param client: The client. @@ -1966,9 +1966,7 @@ # Build and store the session manager try: session_manager = await prepare_for_profile( - self.host, - profile, - initial_own_label="Libervia" + self.host, profile, initial_own_label="Libervia" ) except Exception as e: # In case of an error during initalization, notify the waiters accordingly @@ -1995,7 +1993,7 @@ message_elt: domish.Element, session_manager: omemo.SessionManager, sender_device_information: omemo.DeviceInformation, - timestamp: datetime + timestamp: datetime, ) -> None: """Check a newly decrypted message stanza for ATM content and perform ATM in case. @@ -2008,8 +2006,7 @@ """ trust_message_cache = persistent.LazyPersistentBinaryDict( - "XEP-0384/TM", - client.profile + "XEP-0384/TM", client.profile ) new_cache_entries: Set[TrustMessageCacheEntry] = set() @@ -2040,30 +2037,34 @@ for trust_elt in key_owner_elt.elements(NS_TM, "trust"): assert isinstance(trust_elt, domish.Element) - new_cache_entries.add(TrustMessageCacheEntry( - sender_jid=jid.JID(sender_device_information.bare_jid), - sender_key=sender_device_information.identity_key, - timestamp=timestamp, - trust_update=TrustUpdate( - target_jid=key_owner_jid, - target_key=base64.b64decode(str(trust_elt)), - target_trust=True + new_cache_entries.add( + TrustMessageCacheEntry( + sender_jid=jid.JID(sender_device_information.bare_jid), + sender_key=sender_device_information.identity_key, + timestamp=timestamp, + trust_update=TrustUpdate( + target_jid=key_owner_jid, + target_key=base64.b64decode(str(trust_elt)), + target_trust=True, + ), ) - )) + ) for distrust_elt in key_owner_elt.elements(NS_TM, "distrust"): assert isinstance(distrust_elt, domish.Element) - new_cache_entries.add(TrustMessageCacheEntry( - sender_jid=jid.JID(sender_device_information.bare_jid), - sender_key=sender_device_information.identity_key, - timestamp=timestamp, - trust_update=TrustUpdate( - target_jid=key_owner_jid, - target_key=base64.b64decode(str(distrust_elt)), - target_trust=False + new_cache_entries.add( + TrustMessageCacheEntry( + sender_jid=jid.JID(sender_device_information.bare_jid), + sender_key=sender_device_information.identity_key, + timestamp=timestamp, + trust_update=TrustUpdate( + target_jid=key_owner_jid, + target_key=base64.b64decode(str(distrust_elt)), + target_trust=False, + ), ) - )) + ) # Load existing cache entries existing_cache_entries = { @@ -2075,10 +2076,9 @@ existing_by_target = { ( cache_entry.trust_update.target_jid.userhostJID(), - cache_entry.trust_update.target_key + cache_entry.trust_update.target_key, ): cache_entry - for cache_entry - in existing_cache_entries + for cache_entry in existing_cache_entries } # Iterate over a copy here, such that new_cache_entries can be modified @@ -2086,9 +2086,9 @@ existing_cache_entry = existing_by_target.get( ( new_cache_entry.trust_update.target_jid.userhostJID(), - new_cache_entry.trust_update.target_key + new_cache_entry.trust_update.target_key, ), - None + None, ) if existing_cache_entry is not None: @@ -2118,7 +2118,7 @@ await session_manager.set_trust( trust_update.target_jid.userhost(), trust_update.target_key, - trust_level.name + trust_level.name, ) applied_trust_updates.add(trust_update) @@ -2127,23 +2127,20 @@ # Store the remaining existing and new cache entries await trust_message_cache.force( - "cache", - [tm.to_dict() for tm in existing_cache_entries | new_cache_entries] + "cache", [tm.to_dict() for tm in existing_cache_entries | new_cache_entries] ) # If the trust of at least one device was modified, run the ATM cache update logic if len(applied_trust_updates) > 0: await manage_trust_message_cache( - client, - session_manager, - frozenset(applied_trust_updates) + client, session_manager, frozenset(applied_trust_updates) ) async def _message_received_trigger( self, client: SatXMPPClient, message_elt: domish.Element, - post_treat: defer.Deferred + post_treat: defer.Deferred, ) -> bool: """ @param client: The client which received the message. @@ -2211,9 +2208,7 @@ message_uid = message_elt.getAttribute("id") if message_uid is not None: muc_plaintext_cache_key = MUCPlaintextCacheKey( - client, - room_jid, - message_uid + client, room_jid, message_uid ) else: # I'm not sure why this check is required, this code is copied from the old @@ -2231,15 +2226,15 @@ message: Optional[omemo.Message] = None encrypted_elt: Optional[domish.Element] = None - twomemo_encrypted_elt = cast(Optional[domish.Element], next( - message_elt.elements(twomemo.twomemo.NAMESPACE, "encrypted"), - None - )) - - oldmemo_encrypted_elt = cast(Optional[domish.Element], next( - message_elt.elements(oldmemo.oldmemo.NAMESPACE, "encrypted"), - None - )) + twomemo_encrypted_elt = cast( + Optional[domish.Element], + next(message_elt.elements(twomemo.twomemo.NAMESPACE, "encrypted"), None), + ) + + oldmemo_encrypted_elt = cast( + Optional[domish.Element], + next(message_elt.elements(oldmemo.oldmemo.NAMESPACE, "encrypted"), None), + ) try: session_manager = await self.get_session_manager(cast(str, client.profile)) @@ -2251,8 +2246,7 @@ if twomemo_encrypted_elt is not None: try: message = twomemo.etree.parse_message( - xml_tools.domish_elt_2_et_elt(twomemo_encrypted_elt), - sender_bare_jid + xml_tools.domish_elt_2_et_elt(twomemo_encrypted_elt), sender_bare_jid ) except (ValueError, XMLSchemaValidationError): log.warning( @@ -2268,7 +2262,7 @@ xml_tools.domish_elt_2_et_elt(oldmemo_encrypted_elt), sender_bare_jid, client.jid.userhost(), - session_manager + session_manager, ) except (ValueError, XMLSchemaValidationError): log.warning( @@ -2324,7 +2318,7 @@ f"An OMEMO message from {sender_jid.full()} has not been" f" encrypted for our device, we can't decrypt it." ), - { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR}, ) log.warning("Message not encrypted for us.") else: @@ -2333,17 +2327,18 @@ # No point in further processing this message. return False except Exception as e: - log.warning(_("Can't decrypt message: {reason}\n{xml}").format( - reason=e, - xml=message_elt.toXml() - )) + log.warning( + _("Can't decrypt message: {reason}\n{xml}").format( + reason=e, xml=message_elt.toXml() + ) + ) client.feedback( feedback_jid, D_( f"An OMEMO message from {sender_jid.full()} can't be decrypted:" f" {e}" ), - { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR}, ) # No point in further processing this message return False @@ -2354,25 +2349,24 @@ if plaintext is not None: # XEP_0420.unpack_stanza handles the whole unpacking, including the # relevant modifications to the element - sce_profile = \ + sce_profile = ( OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE + ) try: affix_values = self.__xep_0420.unpack_stanza( - sce_profile, - message_elt, - plaintext + sce_profile, message_elt, plaintext ) except Exception as e: - log.warning(D_( - f"Error unpacking SCE-encrypted message: {e}\n{plaintext}" - )) + log.warning( + D_(f"Error unpacking SCE-encrypted message: {e}\n{plaintext}") + ) client.feedback( feedback_jid, D_( f"An OMEMO message from {sender_jid.full()} was rejected:" f" {e}" ), - { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR}, ) # No point in further processing this message return False @@ -2395,8 +2389,9 @@ message_elt.addElement("body", content=plaintext.decode("utf-8")) # Mark the message as trusted or untrusted. Undecided counts as untrusted here. - trust_level = \ - await session_manager._evaluate_custom_trust_level(device_information) + trust_level = await session_manager._evaluate_custom_trust_level( + device_information + ) if trust_level is omemo.TrustLevel.TRUSTED: post_treat.addCallback(client.encryption.mark_as_trusted) @@ -2405,8 +2400,7 @@ # Mark the message as originally encrypted post_treat.addCallback( - client.encryption.mark_as_encrypted, - namespace=message.namespace + client.encryption.mark_as_encrypted, namespace=message.namespace ) # Handle potential ATM trust updates @@ -2416,7 +2410,7 @@ message_elt, session_manager, device_information, - affix_values.timestamp + affix_values.timestamp, ) # Message processed successfully, continue with the flow @@ -2430,7 +2424,7 @@ """ # SCE is only applicable to message and IQ stanzas # FIXME: temporary disabling IQ stanza encryption - if stanza.name not in { "message" }: # , "iq" }: + if stanza.name not in {"message"}: # , "iq" }: return True # Get the intended recipient @@ -2466,17 +2460,15 @@ stanza, recipient_bare_jid, stanza.getAttribute("type", C.MESS_TYPE_NORMAL) == C.MESS_TYPE_GROUPCHAT, - stanza.getAttribute("id", None) + stanza.getAttribute("id", None), ) else: # Encryption is requested for this recipient, but not with twomemo return True - - # Add a store hint if this is a message stanza if stanza.name == "message": - self.__xep_0334.add_hint_elements(stanza, [ "store" ]) + self.__xep_0334.add_hint_elements(stanza, ["store"]) # Let the flow continue. return True @@ -2501,9 +2493,8 @@ device_information = await session_manager.get_device_information( bare_jid.userhost() ) - if ( - not device_information - or not all(namespace in di.namespaces for di in device_information) + if not device_information or not all( + namespace in di.namespaces for di in device_information ): if namespace == self.NS_TWOMEMO: algo, node = "OMEMO", TWOMEMO_DEVICE_LIST_NODE @@ -2528,7 +2519,7 @@ stanza: domish.Element, recipient_jids: Union[jid.JID, Set[jid.JID]], is_muc_message: bool, - stanza_id: Optional[str] + stanza_id: Optional[str], ) -> None: """ @param client: The client. @@ -2577,15 +2568,11 @@ room_jid = feedback_jid = recipient_jid.userhostJID() recipient_bare_jids = self.__get_joined_muc_users( - client, - self.__xep_0045, - room_jid + client, self.__xep_0045, room_jid ) muc_plaintext_cache_key = MUCPlaintextCacheKey( - client=client, - room_jid=room_jid, - message_uid=stanza_id + client=client, room_jid=room_jid, message_uid=stanza_id ) else: recipient_bare_jids = {r.userhost() for r in recipient_jids} @@ -2611,7 +2598,7 @@ if namespace == twomemo.twomemo.NAMESPACE: return self.__xep_0420.pack_stanza( OMEMO.SCE_PROFILE_GROUPCHAT if is_muc_message else OMEMO.SCE_PROFILE, - stanza + stanza, ) if namespace == oldmemo.oldmemo.NAMESPACE: @@ -2622,7 +2609,7 @@ plaintext = str(child).encode("utf-8") # Any other sensitive elements to remove here? - if child.name in { "body", "html" }: + if child.name in {"body", "html"}: stanza.children.remove(child) if plaintext is None: @@ -2644,27 +2631,26 @@ log.debug(f"Plaintext to encrypt: {plaintext}") session_manager = await self.get_session_manager(client.profile) - await self.download_missing_device_lists(client, namespace, recipient_jids, session_manager) + await self.download_missing_device_lists( + client, namespace, recipient_jids, session_manager + ) try: messages, encryption_errors = await session_manager.encrypt( frozenset(recipient_bare_jids), - { namespace: plaintext }, - backend_priority_order=[ namespace ], - identifier=feedback_jid.userhost() + {namespace: plaintext}, + backend_priority_order=[namespace], + identifier=feedback_jid.userhost(), ) except Exception as e: msg = _( # pylint: disable=consider-using-f-string "Can't encrypt message for {entities}: {reason}".format( - entities=', '.join(recipient_bare_jids), - reason=e + entities=", ".join(recipient_bare_jids), reason=e ) ) log.warning(msg) - client.feedback(feedback_jid, msg, { - C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR - }) + client.feedback(feedback_jid, msg, {C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR}) raise e if len(encryption_errors) > 0: @@ -2673,12 +2659,13 @@ f" {encryption_errors}" ) - encrypted_errors_stringified = ", ".join([ - f"device {err.device_id} of {err.bare_jid} under namespace" - f" {err.namespace}" - for err - in encryption_errors - ]) + encrypted_errors_stringified = ", ".join( + [ + f"device {err.device_id} of {err.bare_jid} under namespace" + f" {err.namespace}" + for err in encryption_errors + ] + ) client.feedback( feedback_jid, @@ -2689,30 +2676,28 @@ " incomplete or broken, which shouldn't happen for actively used" " devices, and can usually be ignored. The following devices are" f" affected: {encrypted_errors_stringified}." - ) + ), ) message = next(message for message in messages if message.namespace == namespace) if namespace == twomemo.twomemo.NAMESPACE: # Add the encrypted element - stanza.addChild(xml_tools.et_elt_2_domish_elt( - twomemo.etree.serialize_message(message) - )) + stanza.addChild( + xml_tools.et_elt_2_domish_elt(twomemo.etree.serialize_message(message)) + ) if namespace == oldmemo.oldmemo.NAMESPACE: # Add the encrypted element - stanza.addChild(xml_tools.et_elt_2_domish_elt( - oldmemo.etree.serialize_message(message) - )) + stanza.addChild( + xml_tools.et_elt_2_domish_elt(oldmemo.etree.serialize_message(message)) + ) if muc_plaintext_cache_key is not None: self.__muc_plaintext_cache[muc_plaintext_cache_key] = plaintext async def __on_device_list_update( - self, - items_event: pubsub.ItemsEvent, - profile: str + self, items_event: pubsub.ItemsEvent, profile: str ) -> None: """Handle device list updates fired by PEP. @@ -2726,10 +2711,7 @@ await self._update_device_list(client, sender, items) async def _update_device_list( - self, - client: SatXMPPEntity, - sender: jid.JID, - items: list[domish.Element] + self, client: SatXMPPEntity, sender: jid.JID, items: list[domish.Element] ) -> None: if len(items) > 1: @@ -2774,7 +2756,5 @@ session_manager = await self.get_session_manager(client.profile) await session_manager.update_device_list( - namespace, - sender.userhost(), - device_list + namespace, sender.userhost(), device_list )