Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0384.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (formerly SàT) used camelCase, as allowed by PEP 8 for
pre-PEP 8 code, in order to match the coding style used in Twisted.
However, snake_case is more readable and it's better to follow PEP 8 best practices, so it
has been decided to move to full snake_case. Because Libervia has a huge codebase, this
ended up as an ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including the bridge) that does not come from Twisted or Wokkel, to fully use snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | 1d5a81e3c9e8 |
children | c23cad65ae99 |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
203 KEY_ALL_JIDS = "ALL_JIDS" | 203 KEY_ALL_JIDS = "ALL_JIDS" |
204 | 204 |
205 def __init__(self, profile: str, own_bare_jid: str) -> None: | 205 def __init__(self, profile: str, own_bare_jid: str) -> None: |
206 """ | 206 """ |
207 @param profile: The profile this OMEMO data belongs to. | 207 @param profile: The profile this OMEMO data belongs to. |
208 @param own_bare_jid: The own bare JID, to return by the :meth:`loadOwnData` call. | 208 @param own_bare_jid: The own bare JID, to return by the :meth:`load_own_data` call. |
209 """ | 209 """ |
210 | 210 |
211 self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile) | 211 self.__storage = persistent.LazyPersistentBinaryDict("XEP-0384", profile) |
212 self.__own_bare_jid = own_bare_jid | 212 self.__own_bare_jid = own_bare_jid |
213 | 213 |
344 | 344 |
345 namespace = oldmemo.oldmemo.NAMESPACE | 345 namespace = oldmemo.oldmemo.NAMESPACE |
346 node = f"eu.siacs.conversations.axolotl.bundles:{device_id}" | 346 node = f"eu.siacs.conversations.axolotl.bundles:{device_id}" |
347 | 347 |
348 try: | 348 try: |
349 items, __ = await xep_0060.getItems(client, jid.JID(bare_jid), node, max_items=1) | 349 items, __ = await xep_0060.get_items(client, jid.JID(bare_jid), node, max_items=1) |
350 except Exception as e: | 350 except Exception as e: |
351 raise omemo.BundleDownloadFailed( | 351 raise omemo.BundleDownloadFailed( |
352 f"Bundle download failed for {bare_jid}: {device_id} under namespace" | 352 f"Bundle download failed for {bare_jid}: {device_id} under namespace" |
353 f" {namespace}" | 353 f" {namespace}" |
354 ) from e | 354 ) from e |
724 (NS_TM, "distrust"), | 724 (NS_TM, "distrust"), |
725 content=serialized_identity_key | 725 content=serialized_identity_key |
726 ) | 726 ) |
727 | 727 |
728 # Finally, encrypt and send the trust message! | 728 # Finally, encrypt and send the trust message! |
729 message_data = client.generateMessageXML(MessageData({ | 729 message_data = client.generate_message_xml(MessageData({ |
730 "from": own_jid, | 730 "from": own_jid, |
731 "to": recipient_jid, | 731 "to": recipient_jid, |
732 "uid": str(uuid.uuid4()), | 732 "uid": str(uuid.uuid4()), |
733 "message": {}, | 733 "message": {}, |
734 "subject": {}, | 734 "subject": {}, |
809 @param profile: The profile. | 809 @param profile: The profile. |
810 @return: A non-abstract subclass of :class:`~omemo.session_manager.SessionManager` | 810 @return: A non-abstract subclass of :class:`~omemo.session_manager.SessionManager` |
811 with XMPP interactions and trust handled via the SAT instance. | 811 with XMPP interactions and trust handled via the SAT instance. |
812 """ | 812 """ |
813 | 813 |
814 client = sat.getClient(profile) | 814 client = sat.get_client(profile) |
815 xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) | 815 xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) |
816 | 816 |
817 class SessionManagerImpl(omemo.SessionManager): | 817 class SessionManagerImpl(omemo.SessionManager): |
818 """ | 818 """ |
819 Session manager implementation handling XMPP interactions and trust via an | 819 Session manager implementation handling XMPP interactions and trust via an |
825 if isinstance(bundle, twomemo.twomemo.BundleImpl): | 825 if isinstance(bundle, twomemo.twomemo.BundleImpl): |
826 element = twomemo.etree.serialize_bundle(bundle) | 826 element = twomemo.etree.serialize_bundle(bundle) |
827 | 827 |
828 node = "urn:xmpp:omemo:2:bundles" | 828 node = "urn:xmpp:omemo:2:bundles" |
829 try: | 829 try: |
830 await xep_0060.sendItem( | 830 await xep_0060.send_item( |
831 client, | 831 client, |
832 client.jid.userhostJID(), | 832 client.jid.userhostJID(), |
833 node, | 833 node, |
834 xml_tools.et_elt_2_domish_elt(element), | 834 xml_tools.et_elt_2_domish_elt(element), |
835 item_id=str(bundle.device_id), | 835 item_id=str(bundle.device_id), |
865 if isinstance(bundle, oldmemo.oldmemo.BundleImpl): | 865 if isinstance(bundle, oldmemo.oldmemo.BundleImpl): |
866 element = oldmemo.etree.serialize_bundle(bundle) | 866 element = oldmemo.etree.serialize_bundle(bundle) |
867 | 867 |
868 node = f"eu.siacs.conversations.axolotl.bundles:{bundle.device_id}" | 868 node = f"eu.siacs.conversations.axolotl.bundles:{bundle.device_id}" |
869 try: | 869 try: |
870 await xep_0060.sendItem( | 870 await xep_0060.send_item( |
871 client, | 871 client, |
872 client.jid.userhostJID(), | 872 client.jid.userhostJID(), |
873 node, | 873 node, |
874 xml_tools.et_elt_2_domish_elt(element), | 874 xml_tools.et_elt_2_domish_elt(element), |
875 item_id=xep_0060.ID_SINGLETON, | 875 item_id=xep_0060.ID_SINGLETON, |
895 ) -> omemo.Bundle: | 895 ) -> omemo.Bundle: |
896 if namespace == twomemo.twomemo.NAMESPACE: | 896 if namespace == twomemo.twomemo.NAMESPACE: |
897 node = "urn:xmpp:omemo:2:bundles" | 897 node = "urn:xmpp:omemo:2:bundles" |
898 | 898 |
899 try: | 899 try: |
900 items, __ = await xep_0060.getItems( | 900 items, __ = await xep_0060.get_items( |
901 client, | 901 client, |
902 jid.JID(bare_jid), | 902 jid.JID(bare_jid), |
903 node, | 903 node, |
904 item_ids=[ str(device_id) ] | 904 item_ids=[ str(device_id) ] |
905 ) | 905 ) |
949 async def _delete_bundle(namespace: str, device_id: int) -> None: | 949 async def _delete_bundle(namespace: str, device_id: int) -> None: |
950 if namespace == twomemo.twomemo.NAMESPACE: | 950 if namespace == twomemo.twomemo.NAMESPACE: |
951 node = "urn:xmpp:omemo:2:bundles" | 951 node = "urn:xmpp:omemo:2:bundles" |
952 | 952 |
953 try: | 953 try: |
954 await xep_0060.retractItems( | 954 await xep_0060.retract_items( |
955 client, | 955 client, |
956 client.jid.userhostJID(), | 956 client.jid.userhostJID(), |
957 node, | 957 node, |
958 [ str(device_id) ], | 958 [ str(device_id) ], |
959 notify=False | 959 notify=False |
998 | 998 |
999 if element is None or node is None: | 999 if element is None or node is None: |
1000 raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") | 1000 raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") |
1001 | 1001 |
1002 try: | 1002 try: |
1003 await xep_0060.sendItem( | 1003 await xep_0060.send_item( |
1004 client, | 1004 client, |
1005 client.jid.userhostJID(), | 1005 client.jid.userhostJID(), |
1006 node, | 1006 node, |
1007 xml_tools.et_elt_2_domish_elt(element), | 1007 xml_tools.et_elt_2_domish_elt(element), |
1008 item_id=xep_0060.ID_SINGLETON, | 1008 item_id=xep_0060.ID_SINGLETON, |
1048 | 1048 |
1049 if node is None: | 1049 if node is None: |
1050 raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") | 1050 raise omemo.UnknownNamespace(f"Unknown namespace: {namespace}") |
1051 | 1051 |
1052 try: | 1052 try: |
1053 items, __ = await xep_0060.getItems(client, jid.JID(bare_jid), node) | 1053 items, __ = await xep_0060.get_items(client, jid.JID(bare_jid), node) |
1054 except exceptions.NotFound: | 1054 except exceptions.NotFound: |
1055 return {} | 1055 return {} |
1056 except Exception as e: | 1056 except Exception as e: |
1057 raise omemo.DeviceListDownloadFailed( | 1057 raise omemo.DeviceListDownloadFailed( |
1058 f"Device list download failed for {bare_jid} under namespace" | 1058 f"Device list download failed for {bare_jid} under namespace" |
1115 | 1115 |
1116 # The blindly trusted case is more complicated, since its evaluation depends | 1116 # The blindly trusted case is more complicated, since its evaluation depends |
1117 # on the trust system and phase | 1117 # on the trust system and phase |
1118 if trust_level is TrustLevel.BLINDLY_TRUSTED: | 1118 if trust_level is TrustLevel.BLINDLY_TRUSTED: |
1119 # Get the name of the active trust system | 1119 # Get the name of the active trust system |
1120 trust_system = cast(str, sat.memory.getParamA( | 1120 trust_system = cast(str, sat.memory.param_get_a( |
1121 PARAM_NAME, | 1121 PARAM_NAME, |
1122 PARAM_CATEGORY, | 1122 PARAM_CATEGORY, |
1123 profile_key=profile | 1123 profile_key=profile |
1124 )) | 1124 )) |
1125 | 1125 |
1245 element = oldmemo.etree.serialize_message(message) | 1245 element = oldmemo.etree.serialize_message(message) |
1246 | 1246 |
1247 if element is None: | 1247 if element is None: |
1248 raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}") | 1248 raise omemo.UnknownNamespace(f"Unknown namespace: {message.namespace}") |
1249 | 1249 |
1250 message_data = client.generateMessageXML(MessageData({ | 1250 message_data = client.generate_message_xml(MessageData({ |
1251 "from": client.jid, | 1251 "from": client.jid, |
1252 "to": jid.JID(bare_jid), | 1252 "to": jid.JID(bare_jid), |
1253 "uid": str(uuid.uuid4()), | 1253 "uid": str(uuid.uuid4()), |
1254 "message": {}, | 1254 "message": {}, |
1255 "subject": {}, | 1255 "subject": {}, |
1280 one message, the recipient JID. In case of a MUC message, the room JID. | 1280 one message, the recipient JID. In case of a MUC message, the room JID. |
1281 @raise TrustDecisionFailed: if the user cancels the prompt. | 1281 @raise TrustDecisionFailed: if the user cancels the prompt. |
1282 """ | 1282 """ |
1283 | 1283 |
1284 # This session manager handles encryption with both twomemo and oldmemo, but | 1284 # This session manager handles encryption with both twomemo and oldmemo, but |
1285 # both are currently registered as different plugins and the `deferXMLUI` | 1285 # both are currently registered as different plugins and the `defer_xmlui` |
1286 # below requires a single namespace identifying the encryption plugin. Thus, | 1286 # below requires a single namespace identifying the encryption plugin. Thus, |
1287 # get the namespace of the requested encryption method from the encryption | 1287 # get the namespace of the requested encryption method from the encryption |
1288 # session using the feedback JID. | 1288 # session using the feedback JID. |
1289 encryption = client.encryption.getSession(feedback_jid) | 1289 encryption = client.encryption.getSession(feedback_jid) |
1290 if encryption is None: | 1290 if encryption is None: |
1311 "yourself. Do *not* validate a device if the fingerprint is wrong!" | 1311 "yourself. Do *not* validate a device if the fingerprint is wrong!" |
1312 )) | 1312 )) |
1313 | 1313 |
1314 own_device, __ = await self.get_own_device_information() | 1314 own_device, __ = await self.get_own_device_information() |
1315 | 1315 |
1316 trust_ui.changeContainer("label") | 1316 trust_ui.change_container("label") |
1317 trust_ui.addLabel(D_("This device ID")) | 1317 trust_ui.addLabel(D_("This device ID")) |
1318 trust_ui.addText(str(own_device.device_id)) | 1318 trust_ui.addText(str(own_device.device_id)) |
1319 trust_ui.addLabel(D_("This device's fingerprint")) | 1319 trust_ui.addLabel(D_("This device's fingerprint")) |
1320 trust_ui.addText(" ".join(self.format_identity_key(own_device.identity_key))) | 1320 trust_ui.addText(" ".join(self.format_identity_key(own_device.identity_key))) |
1321 trust_ui.addEmpty() | 1321 trust_ui.addEmpty() |
1331 trust_ui.addLabel(D_("Device ID")) | 1331 trust_ui.addLabel(D_("Device ID")) |
1332 trust_ui.addText(str(device.device_id)) | 1332 trust_ui.addText(str(device.device_id)) |
1333 trust_ui.addLabel(D_("Fingerprint")) | 1333 trust_ui.addLabel(D_("Fingerprint")) |
1334 trust_ui.addText(" ".join(self.format_identity_key(device.identity_key))) | 1334 trust_ui.addText(" ".join(self.format_identity_key(device.identity_key))) |
1335 trust_ui.addLabel(D_("Trust this device?")) | 1335 trust_ui.addLabel(D_("Trust this device?")) |
1336 trust_ui.addBool(f"trust_{index}", value=C.boolConst(False)) | 1336 trust_ui.addBool(f"trust_{index}", value=C.bool_const(False)) |
1337 trust_ui.addEmpty() | 1337 trust_ui.addEmpty() |
1338 trust_ui.addEmpty() | 1338 trust_ui.addEmpty() |
1339 | 1339 |
1340 trust_ui_result = await xml_tools.deferXMLUI( | 1340 trust_ui_result = await xml_tools.defer_xmlui( |
1341 sat, | 1341 sat, |
1342 trust_ui, | 1342 trust_ui, |
1343 action_extra={ "meta_encryption_trust": namespace }, | 1343 action_extra={ "meta_encryption_trust": namespace }, |
1344 profile=profile | 1344 profile=profile |
1345 ) | 1345 ) |
1346 | 1346 |
1347 if C.bool(trust_ui_result.get("cancelled", "false")): | 1347 if C.bool(trust_ui_result.get("cancelled", "false")): |
1348 raise omemo.TrustDecisionFailed("Trust UI cancelled.") | 1348 raise omemo.TrustDecisionFailed("Trust UI cancelled.") |
1349 | 1349 |
1350 data_form_result = cast(Dict[str, str], xml_tools.XMLUIResult2DataFormResult( | 1350 data_form_result = cast(Dict[str, str], xml_tools.xmlui_result_2_data_form_result( |
1351 trust_ui_result | 1351 trust_ui_result |
1352 )) | 1352 )) |
1353 | 1353 |
1354 trust_updates: Set[TrustUpdate] = set() | 1354 trust_updates: Set[TrustUpdate] = set() |
1355 | 1355 |
1373 target_key=device.identity_key, | 1373 target_key=device.identity_key, |
1374 target_trust=target_trust | 1374 target_trust=target_trust |
1375 )) | 1375 )) |
1376 | 1376 |
1377 # Check whether ATM is enabled and handle everything in case it is | 1377 # Check whether ATM is enabled and handle everything in case it is |
1378 trust_system = cast(str, sat.memory.getParamA( | 1378 trust_system = cast(str, sat.memory.param_get_a( |
1379 PARAM_NAME, | 1379 PARAM_NAME, |
1380 PARAM_CATEGORY, | 1380 PARAM_CATEGORY, |
1381 profile_key=profile | 1381 profile_key=profile |
1382 )) | 1382 )) |
1383 | 1383 |
1432 :meth:`~omemo.session_manager.SessionManager.create`. | 1432 :meth:`~omemo.session_manager.SessionManager.create`. |
1433 @raise DeviceListDownloadFailed: if a device list download failed. Forwarded from | 1433 @raise DeviceListDownloadFailed: if a device list download failed. Forwarded from |
1434 :meth:`~omemo.session_manager.SessionManager.create`. | 1434 :meth:`~omemo.session_manager.SessionManager.create`. |
1435 """ | 1435 """ |
1436 | 1436 |
1437 client = sat.getClient(profile) | 1437 client = sat.get_client(profile) |
1438 xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) | 1438 xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) |
1439 | 1439 |
1440 storage = StorageImpl(profile) | 1440 storage = StorageImpl(profile) |
1441 | 1441 |
1442 # TODO: Untested | 1442 # TODO: Untested |
1547 | 1547 |
1548 self.__sat = sat | 1548 self.__sat = sat |
1549 | 1549 |
1550 # Add configuration option to choose between manual trust and BTBV as the trust | 1550 # Add configuration option to choose between manual trust and BTBV as the trust |
1551 # model | 1551 # model |
1552 sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) | 1552 sat.memory.update_params(DEFAULT_TRUST_MODEL_PARAM) |
1553 | 1553 |
1554 # Plugins | 1554 # Plugins |
1555 self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) | 1555 self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) |
1556 self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"]) | 1556 self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"]) |
1557 self.__xep_0359 = cast(Optional[XEP_0359], sat.plugins.get("XEP-0359")) | 1557 self.__xep_0359 = cast(Optional[XEP_0359], sat.plugins.get("XEP-0359")) |
1583 "messageReceived", | 1583 "messageReceived", |
1584 self._message_received_trigger, | 1584 self._message_received_trigger, |
1585 priority=100050 | 1585 priority=100050 |
1586 ) | 1586 ) |
1587 sat.trigger.add( | 1587 sat.trigger.add( |
1588 "sendMessageData", | 1588 "send_message_data", |
1589 self.__send_message_data_trigger, | 1589 self.__send_message_data_trigger, |
1590 priority=100050 | 1590 priority=100050 |
1591 ) | 1591 ) |
1592 | 1592 |
1593 # These triggers are used by twomemo, which does do SCE | 1593 # These triggers are used by twomemo, which does do SCE |
1594 sat.trigger.add("send", self.__send_trigger, priority=0) | 1594 sat.trigger.add("send", self.__send_trigger, priority=0) |
1595 # TODO: Add new triggers here for freshly received and about-to-be-sent stanzas, | 1595 # TODO: Add new triggers here for freshly received and about-to-be-sent stanzas, |
1596 # including IQs. | 1596 # including IQs. |
1597 | 1597 |
1598 # Give twomemo a (slightly) higher priority than oldmemo | 1598 # Give twomemo a (slightly) higher priority than oldmemo |
1599 sat.registerEncryptionPlugin(self, "TWOMEMO", twomemo.twomemo.NAMESPACE, 101) | 1599 sat.register_encryption_plugin(self, "TWOMEMO", twomemo.twomemo.NAMESPACE, 101) |
1600 sat.registerEncryptionPlugin(self, "OLDMEMO", oldmemo.oldmemo.NAMESPACE, 100) | 1600 sat.register_encryption_plugin(self, "OLDMEMO", oldmemo.oldmemo.NAMESPACE, 100) |
1601 | 1601 |
1602 xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"]) | 1602 xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"]) |
1603 xep_0163.addPEPEvent( | 1603 xep_0163.add_pep_event( |
1604 "TWOMEMO_DEVICES", | 1604 "TWOMEMO_DEVICES", |
1605 TWOMEMO_DEVICE_LIST_NODE, | 1605 TWOMEMO_DEVICE_LIST_NODE, |
1606 lambda items_event, profile: defer.ensureDeferred( | 1606 lambda items_event, profile: defer.ensureDeferred( |
1607 self.__on_device_list_update(items_event, profile) | 1607 self.__on_device_list_update(items_event, profile) |
1608 ) | 1608 ) |
1609 ) | 1609 ) |
1610 xep_0163.addPEPEvent( | 1610 xep_0163.add_pep_event( |
1611 "OLDMEMO_DEVICES", | 1611 "OLDMEMO_DEVICES", |
1612 OLDMEMO_DEVICE_LIST_NODE, | 1612 OLDMEMO_DEVICE_LIST_NODE, |
1613 lambda items_event, profile: defer.ensureDeferred( | 1613 lambda items_event, profile: defer.ensureDeferred( |
1614 self.__on_device_list_update(items_event, profile) | 1614 self.__on_device_list_update(items_event, profile) |
1615 ) | 1615 ) |
1618 try: | 1618 try: |
1619 self.__text_commands = cast(TextCommands, sat.plugins[C.TEXT_CMDS]) | 1619 self.__text_commands = cast(TextCommands, sat.plugins[C.TEXT_CMDS]) |
1620 except KeyError: | 1620 except KeyError: |
1621 log.info(_("Text commands not available")) | 1621 log.info(_("Text commands not available")) |
1622 else: | 1622 else: |
1623 self.__text_commands.registerTextCommands(self) | 1623 self.__text_commands.register_text_commands(self) |
1624 | 1624 |
1625 def profileConnected( # pylint: disable=invalid-name | 1625 def profile_connected( # pylint: disable=invalid-name |
1626 self, | 1626 self, |
1627 client: SatXMPPClient | 1627 client: SatXMPPClient |
1628 ) -> None: | 1628 ) -> None: |
1629 """ | 1629 """ |
1630 @param client: The client. | 1630 @param client: The client. |
1651 @return: The constant value ``False``, indicating to the text commands plugin that | 1651 @return: The constant value ``False``, indicating to the text commands plugin that |
1652 the message is not supposed to be sent. | 1652 the message is not supposed to be sent. |
1653 """ | 1653 """ |
1654 | 1654 |
1655 twomemo_requested = \ | 1655 twomemo_requested = \ |
1656 client.encryption.isEncryptionRequested(mess_data, twomemo.twomemo.NAMESPACE) | 1656 client.encryption.is_encryption_requested(mess_data, twomemo.twomemo.NAMESPACE) |
1657 oldmemo_requested = \ | 1657 oldmemo_requested = \ |
1658 client.encryption.isEncryptionRequested(mess_data, oldmemo.oldmemo.NAMESPACE) | 1658 client.encryption.is_encryption_requested(mess_data, oldmemo.oldmemo.NAMESPACE) |
1659 | 1659 |
1660 if not (twomemo_requested or oldmemo_requested): | 1660 if not (twomemo_requested or oldmemo_requested): |
1661 self.__text_commands.feedBack( | 1661 self.__text_commands.feed_back( |
1662 client, | 1662 client, |
1663 _("You need to have OMEMO encryption activated to reset the session"), | 1663 _("You need to have OMEMO encryption activated to reset the session"), |
1664 mess_data | 1664 mess_data |
1665 ) | 1665 ) |
1666 return False | 1666 return False |
1672 | 1672 |
1673 for device in devices: | 1673 for device in devices: |
1674 log.debug(f"Replacing sessions with device {device}") | 1674 log.debug(f"Replacing sessions with device {device}") |
1675 await session_manager.replace_sessions(device) | 1675 await session_manager.replace_sessions(device) |
1676 | 1676 |
1677 self.__text_commands.feedBack( | 1677 self.__text_commands.feed_back( |
1678 client, | 1678 client, |
1679 _("OMEMO session has been reset"), | 1679 _("OMEMO session has been reset"), |
1680 mess_data | 1680 mess_data |
1681 ) | 1681 ) |
1682 | 1682 |
1683 return False | 1683 return False |
1684 | 1684 |
1685 async def getTrustUI( # pylint: disable=invalid-name | 1685 async def get_trust_ui( # pylint: disable=invalid-name |
1686 self, | 1686 self, |
1687 client: SatXMPPClient, | 1687 client: SatXMPPClient, |
1688 entity: jid.JID | 1688 entity: jid.JID |
1689 ) -> xml_tools.XMLUI: | 1689 ) -> xml_tools.XMLUI: |
1690 """ | 1690 """ |
1696 | 1696 |
1697 if entity.resource: | 1697 if entity.resource: |
1698 raise ValueError("A bare JID is expected.") | 1698 raise ValueError("A bare JID is expected.") |
1699 | 1699 |
1700 bare_jids: Set[str] | 1700 bare_jids: Set[str] |
1701 if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity): | 1701 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): |
1702 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) | 1702 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) |
1703 else: | 1703 else: |
1704 bare_jids = { entity.userhost() } | 1704 bare_jids = { entity.userhost() } |
1705 | 1705 |
1706 session_manager = await self.get_session_manager(client.profile) | 1706 session_manager = await self.get_session_manager(client.profile) |
1727 if C.bool(data.get("cancelled", "false")): | 1727 if C.bool(data.get("cancelled", "false")): |
1728 return {} | 1728 return {} |
1729 | 1729 |
1730 data_form_result = cast( | 1730 data_form_result = cast( |
1731 Dict[str, str], | 1731 Dict[str, str], |
1732 xml_tools.XMLUIResult2DataFormResult(data) | 1732 xml_tools.xmlui_result_2_data_form_result(data) |
1733 ) | 1733 ) |
1734 | 1734 |
1735 trust_updates: Set[TrustUpdate] = set() | 1735 trust_updates: Set[TrustUpdate] = set() |
1736 | 1736 |
1737 for key, value in data_form_result.items(): | 1737 for key, value in data_form_result.items(): |
1761 target_key=device.identity_key, | 1761 target_key=device.identity_key, |
1762 target_trust=target_trust | 1762 target_trust=target_trust |
1763 )) | 1763 )) |
1764 | 1764 |
1765 # Check whether ATM is enabled and handle everything in case it is | 1765 # Check whether ATM is enabled and handle everything in case it is |
1766 trust_system = cast(str, self.__sat.memory.getParamA( | 1766 trust_system = cast(str, self.__sat.memory.param_get_a( |
1767 PARAM_NAME, | 1767 PARAM_NAME, |
1768 PARAM_CATEGORY, | 1768 PARAM_CATEGORY, |
1769 profile_key=profile | 1769 profile_key=profile |
1770 )) | 1770 )) |
1771 | 1771 |
1783 frozenset(trust_updates) | 1783 frozenset(trust_updates) |
1784 ) | 1784 ) |
1785 | 1785 |
1786 return {} | 1786 return {} |
1787 | 1787 |
1788 submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True) | 1788 submit_id = self.__sat.register_callback(callback, with_data=True, one_shot=True) |
1789 | 1789 |
1790 result = xml_tools.XMLUI( | 1790 result = xml_tools.XMLUI( |
1791 panel_type=C.XMLUI_FORM, | 1791 panel_type=C.XMLUI_FORM, |
1792 title=D_("OMEMO trust management"), | 1792 title=D_("OMEMO trust management"), |
1793 submit_id=submit_id | 1793 submit_id=submit_id |
1807 " trust." | 1807 " trust." |
1808 )) | 1808 )) |
1809 | 1809 |
1810 own_device, __ = await session_manager.get_own_device_information() | 1810 own_device, __ = await session_manager.get_own_device_information() |
1811 | 1811 |
1812 trust_ui.changeContainer("label") | 1812 trust_ui.change_container("label") |
1813 trust_ui.addLabel(D_("This device ID")) | 1813 trust_ui.addLabel(D_("This device ID")) |
1814 trust_ui.addText(str(own_device.device_id)) | 1814 trust_ui.addText(str(own_device.device_id)) |
1815 trust_ui.addLabel(D_("This device's fingerprint")) | 1815 trust_ui.addLabel(D_("This device's fingerprint")) |
1816 trust_ui.addText(" ".join(session_manager.format_identity_key( | 1816 trust_ui.addText(" ".join(session_manager.format_identity_key( |
1817 own_device.identity_key | 1817 own_device.identity_key |
1878 """ | 1878 """ |
1879 | 1879 |
1880 bare_jids: Set[str] = set() | 1880 bare_jids: Set[str] = set() |
1881 | 1881 |
1882 try: | 1882 try: |
1883 room = cast(muc.Room, xep_0045.getRoom(client, room_jid)) | 1883 room = cast(muc.Room, xep_0045.get_room(client, room_jid)) |
1884 except exceptions.NotFound as e: | 1884 except exceptions.NotFound as e: |
1885 raise exceptions.InternalError( | 1885 raise exceptions.InternalError( |
1886 "Participant list of unjoined MUC requested." | 1886 "Participant list of unjoined MUC requested." |
1887 ) from e | 1887 ) from e |
1888 | 1888 |
2132 return True | 2132 return True |
2133 | 2133 |
2134 room_jid = feedback_jid = sender_jid.userhostJID() | 2134 room_jid = feedback_jid = sender_jid.userhostJID() |
2135 | 2135 |
2136 try: | 2136 try: |
2137 room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid)) | 2137 room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid)) |
2138 except exceptions.NotFound: | 2138 except exceptions.NotFound: |
2139 log.warning( | 2139 log.warning( |
2140 f"Ignoring MUC message from a room that has not been joined:" | 2140 f"Ignoring MUC message from a room that has not been joined:" |
2141 f" {room_jid}" | 2141 f" {room_jid}" |
2142 ) | 2142 ) |
2163 | 2163 |
2164 sender_jid = sender_user_jid | 2164 sender_jid = sender_user_jid |
2165 | 2165 |
2166 message_uid: Optional[str] = None | 2166 message_uid: Optional[str] = None |
2167 if self.__xep_0359 is not None: | 2167 if self.__xep_0359 is not None: |
2168 message_uid = self.__xep_0359.getOriginId(message_elt) | 2168 message_uid = self.__xep_0359.get_origin_id(message_elt) |
2169 if message_uid is None: | 2169 if message_uid is None: |
2170 message_uid = message_elt.getAttribute("id") | 2170 message_uid = message_elt.getAttribute("id") |
2171 if message_uid is not None: | 2171 if message_uid is not None: |
2172 muc_plaintext_cache_key = MUCPlaintextCacheKey( | 2172 muc_plaintext_cache_key = MUCPlaintextCacheKey( |
2173 client, | 2173 client, |
2356 # Mark the message as trusted or untrusted. Undecided counts as untrusted here. | 2356 # Mark the message as trusted or untrusted. Undecided counts as untrusted here. |
2357 trust_level = \ | 2357 trust_level = \ |
2358 await session_manager._evaluate_custom_trust_level(device_information) | 2358 await session_manager._evaluate_custom_trust_level(device_information) |
2359 | 2359 |
2360 if trust_level is omemo.TrustLevel.TRUSTED: | 2360 if trust_level is omemo.TrustLevel.TRUSTED: |
2361 post_treat.addCallback(client.encryption.markAsTrusted) | 2361 post_treat.addCallback(client.encryption.mark_as_trusted) |
2362 else: | 2362 else: |
2363 post_treat.addCallback(client.encryption.markAsUntrusted) | 2363 post_treat.addCallback(client.encryption.mark_as_untrusted) |
2364 | 2364 |
2365 # Mark the message as originally encrypted | 2365 # Mark the message as originally encrypted |
2366 post_treat.addCallback( | 2366 post_treat.addCallback( |
2367 client.encryption.markAsEncrypted, | 2367 client.encryption.mark_as_encrypted, |
2368 namespace=message.namespace | 2368 namespace=message.namespace |
2369 ) | 2369 ) |
2370 | 2370 |
2371 # Handle potential ATM trust updates | 2371 # Handle potential ATM trust updates |
2372 if affix_values is not None and affix_values.timestamp is not None: | 2372 if affix_values is not None and affix_values.timestamp is not None: |
2430 stanza.getAttribute("id", None) | 2430 stanza.getAttribute("id", None) |
2431 ) | 2431 ) |
2432 | 2432 |
2433 # Add a store hint if this is a message stanza | 2433 # Add a store hint if this is a message stanza |
2434 if stanza.name == "message": | 2434 if stanza.name == "message": |
2435 self.__xep_0334.addHintElements(stanza, [ "store" ]) | 2435 self.__xep_0334.add_hint_elements(stanza, [ "store" ]) |
2436 | 2436 |
2437 # Let the flow continue. | 2437 # Let the flow continue. |
2438 return True | 2438 return True |
2439 | 2439 |
2440 async def __send_message_data_trigger( | 2440 async def __send_message_data_trigger( |
2471 is_muc_message, | 2471 is_muc_message, |
2472 stanza_id | 2472 stanza_id |
2473 ) | 2473 ) |
2474 | 2474 |
2475 # Add a store hint | 2475 # Add a store hint |
2476 self.__xep_0334.addHintElements(stanza, [ "store" ]) | 2476 self.__xep_0334.add_hint_elements(stanza, [ "store" ]) |
2477 | 2477 |
2478 async def encrypt( | 2478 async def encrypt( |
2479 self, | 2479 self, |
2480 client: SatXMPPClient, | 2480 client: SatXMPPClient, |
2481 namespace: Literal["urn:xmpp:omemo:2", "eu.siacs.conversations.axolotl"], | 2481 namespace: Literal["urn:xmpp:omemo:2", "eu.siacs.conversations.axolotl"], |