Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0373.py @ 3960:4836b81c5f31
plugin XEP-0373: minor renaming + set `gpg_provider` in client:
- `sat` and `self.__sat` have been renamed to `host` and `self.host` for consistency with
other plugins.
- `gpg_provider` is set on the client while the profile is connecting, so the instance is
easily accessible to plugins depending on XEP-0373.
rel 381
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 30 Oct 2022 01:06:32 +0200 |
parents | 1ab16449577b |
children | 524856bd7b19 |
comparison
equal
deleted
inserted
replaced
3959:da0e772881c3 | 3960:4836b81c5f31 |
---|---|
1002 class XEP_0373: | 1002 class XEP_0373: |
1003 """ | 1003 """ |
1004 Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``. | 1004 Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``. |
1005 """ | 1005 """ |
1006 | 1006 |
1007 def __init__(self, sat: SAT) -> None: | 1007 def __init__(self, host: SAT) -> None: |
1008 """ | 1008 """ |
1009 @param sat: The SAT instance. | 1009 @param sat: The SAT instance. |
1010 """ | 1010 """ |
1011 | 1011 |
1012 self.__sat = sat | 1012 self.host = host |
1013 | 1013 |
1014 # Add configuration option to choose between manual trust and BTBV as the trust | 1014 # Add configuration option to choose between manual trust and BTBV as the trust |
1015 # model | 1015 # model |
1016 sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) | 1016 host.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) |
1017 | 1017 |
1018 self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) | 1018 self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045")) |
1019 self.__xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) | 1019 self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"]) |
1020 | 1020 |
1021 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} | 1021 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} |
1022 | 1022 |
1023 xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"]) | 1023 xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"]) |
1024 xep_0163.addPEPEvent( | 1024 xep_0163.addPEPEvent( |
1025 "OX_PUBLIC_KEYS_LIST", | 1025 "OX_PUBLIC_KEYS_LIST", |
1026 PUBLIC_KEYS_LIST_NODE, | 1026 PUBLIC_KEYS_LIST_NODE, |
1027 lambda items_event, profile: defer.ensureDeferred( | 1027 lambda items_event, profile: defer.ensureDeferred( |
1028 self.__on_public_keys_list_update(items_event, profile) | 1028 self.__on_public_keys_list_update(items_event, profile) |
1029 ) | 1029 ) |
1030 ) | 1030 ) |
1031 | 1031 |
1032 async def profileConnecting(self, client): | |
1033 client.gpg_provider = get_gpg_provider(self.host, client) | |
1034 | |
1032 async def profileConnected( # pylint: disable=invalid-name | 1035 async def profileConnected( # pylint: disable=invalid-name |
1033 self, | 1036 self, |
1034 client: SatXMPPClient | 1037 client: SatXMPPClient |
1035 ) -> None: | 1038 ) -> None: |
1036 """ | 1039 """ |
1056 | 1059 |
1057 @param items_event: The event. | 1060 @param items_event: The event. |
1058 @param profile: The profile this event belongs to. | 1061 @param profile: The profile this event belongs to. |
1059 """ | 1062 """ |
1060 | 1063 |
1061 client = self.__sat.getClient(profile) | 1064 client = self.host.getClient(profile) |
1062 | 1065 |
1063 sender = cast(jid.JID, items_event.sender) | 1066 sender = cast(jid.JID, items_event.sender) |
1064 items = cast(List[domish.Element], items_event.items) | 1067 items = cast(List[domish.Element], items_event.items) |
1065 | 1068 |
1066 if len(items) > 1: | 1069 if len(items) > 1: |
1154 @param client: The client to perform this operation with. | 1157 @param client: The client to perform this operation with. |
1155 @param jid: The JID. Can be a bare JID. | 1158 @param jid: The JID. Can be a bare JID. |
1156 @return: The set of public keys available for this JID. | 1159 @return: The set of public keys available for this JID. |
1157 """ | 1160 """ |
1158 | 1161 |
1159 gpg_provider = get_gpg_provider(self.__sat, client) | 1162 gpg_provider = get_gpg_provider(self.host, client) |
1160 | 1163 |
1161 return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}") | 1164 return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}") |
1162 | 1165 |
1163 def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]: | 1166 def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]: |
1164 """List GPG secret keys available for a JID. | 1167 """List GPG secret keys available for a JID. |
1165 | 1168 |
1166 @param client: The client to perform this operation with. | 1169 @param client: The client to perform this operation with. |
1167 @return: The set of secret keys available for this JID. | 1170 @return: The set of secret keys available for this JID. |
1168 """ | 1171 """ |
1169 | 1172 |
1170 gpg_provider = get_gpg_provider(self.__sat, client) | 1173 gpg_provider = get_gpg_provider(self.host, client) |
1171 | 1174 |
1172 return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}") | 1175 return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}") |
1173 | 1176 |
1174 async def create_key(self, client: SatXMPPClient) -> GPGSecretKey: | 1177 async def create_key(self, client: SatXMPPClient) -> GPGSecretKey: |
1175 """Create a new GPG key, capable of signing and encryption. | 1178 """Create a new GPG key, capable of signing and encryption. |
1178 | 1181 |
1179 @param client: The client to perform this operation with. | 1182 @param client: The client to perform this operation with. |
1180 @return: The new key. | 1183 @return: The new key. |
1181 """ | 1184 """ |
1182 | 1185 |
1183 gpg_provider = get_gpg_provider(self.__sat, client) | 1186 gpg_provider = get_gpg_provider(self.host, client) |
1184 | 1187 |
1185 secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}") | 1188 secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}") |
1186 | 1189 |
1187 await self.publish_public_key(client, secret_key.public_key) | 1190 await self.publish_public_key(client, secret_key.public_key) |
1188 | 1191 |
1306 @param content_elt: The content element to contain in the ``<openpgp/>`` element. | 1309 @param content_elt: The content element to contain in the ``<openpgp/>`` element. |
1307 @param recipient_jids: The recipient's JIDs. Can be bare JIDs. | 1310 @param recipient_jids: The recipient's JIDs. Can be bare JIDs. |
1308 @return: The ``<openpgp/>`` element. | 1311 @return: The ``<openpgp/>`` element. |
1309 """ | 1312 """ |
1310 | 1313 |
1311 gpg_provider = get_gpg_provider(self.__sat, client) | 1314 gpg_provider = get_gpg_provider(self.host, client) |
1312 | 1315 |
1313 # TODO: I'm not sure whether we want to sign with all keys by default or choose | 1316 # TODO: I'm not sure whether we want to sign with all keys by default or choose |
1314 # just one key/a subset of keys to sign with. | 1317 # just one key/a subset of keys to sign with. |
1315 signing_keys = set(filter( | 1318 signing_keys = set(filter( |
1316 lambda secret_key: gpg_provider.can_sign(secret_key.public_key), | 1319 lambda secret_key: gpg_provider.can_sign(secret_key.public_key), |
1367 | 1370 |
1368 @warning: The timestamp is not verified for plausibility; this SHOULD be done by | 1371 @warning: The timestamp is not verified for plausibility; this SHOULD be done by |
1369 the calling code. | 1372 the calling code. |
1370 """ | 1373 """ |
1371 | 1374 |
1372 gpg_provider = get_gpg_provider(self.__sat, client) | 1375 gpg_provider = get_gpg_provider(self.host, client) |
1373 | 1376 |
1374 decryption_keys = set(filter( | 1377 decryption_keys = set(filter( |
1375 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), | 1378 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), |
1376 self.list_secret_keys(client) | 1379 self.list_secret_keys(client) |
1377 )) | 1380 )) |
1455 @param client: The client. | 1458 @param client: The client. |
1456 @param public_key: The public key to publish. | 1459 @param public_key: The public key to publish. |
1457 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | 1460 @raise XMPPInteractionFailed: if any interaction via XMPP failed. |
1458 """ | 1461 """ |
1459 | 1462 |
1460 gpg_provider = get_gpg_provider(self.__sat, client) | 1463 gpg_provider = get_gpg_provider(self.host, client) |
1461 | 1464 |
1462 packet = gpg_provider.export_public_key(public_key) | 1465 packet = gpg_provider.export_public_key(public_key) |
1463 | 1466 |
1464 node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}" | 1467 node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}" |
1465 | 1468 |
1559 @raise InvalidPacket: if the packet is either syntactically or semantically deemed | 1562 @raise InvalidPacket: if the packet is either syntactically or semantically deemed |
1560 invalid. | 1563 invalid. |
1561 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | 1564 @raise XMPPInteractionFailed: if any interaction via XMPP failed. |
1562 """ | 1565 """ |
1563 | 1566 |
1564 gpg_provider = get_gpg_provider(self.__sat, client) | 1567 gpg_provider = get_gpg_provider(self.host, client) |
1565 | 1568 |
1566 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" | 1569 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" |
1567 | 1570 |
1568 try: | 1571 try: |
1569 items, __ = await self.__xep_0060.getItems( | 1572 items, __ = await self.__xep_0060.getItems( |
1736 protocols or protocol extensions. | 1739 protocols or protocol extensions. |
1737 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | 1740 @raise XMPPInteractionFailed: if any interaction via XMPP failed. |
1738 """ | 1741 """ |
1739 | 1742 |
1740 try: | 1743 try: |
1741 infos = cast(DiscoInfo, await self.__sat.memory.disco.getInfos( | 1744 infos = cast(DiscoInfo, await self.host.memory.disco.getInfos( |
1742 client, | 1745 client, |
1743 client.jid.userhostJID() | 1746 client.jid.userhostJID() |
1744 )) | 1747 )) |
1745 except Exception as e: | 1748 except Exception as e: |
1746 raise XMPPInteractionFailed( | 1749 raise XMPPInteractionFailed( |
1811 @raise exceptions.FeatureNotFound: if the server lacks support for the required | 1814 @raise exceptions.FeatureNotFound: if the server lacks support for the required |
1812 protocols or protocol extensions. | 1815 protocols or protocol extensions. |
1813 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | 1816 @raise XMPPInteractionFailed: if any interaction via XMPP failed. |
1814 """ | 1817 """ |
1815 | 1818 |
1816 gpg_provider = get_gpg_provider(self.__sat, client) | 1819 gpg_provider = get_gpg_provider(self.host, client) |
1817 | 1820 |
1818 await self.__prepare_secret_key_synchronization(client) | 1821 await self.__prepare_secret_key_synchronization(client) |
1819 | 1822 |
1820 backup_code = generate_passphrase() | 1823 backup_code = generate_passphrase() |
1821 | 1824 |
1899 @raise InvalidPacket: if one of the GPG packets building the secret key data is | 1902 @raise InvalidPacket: if one of the GPG packets building the secret key data is |
1900 either syntactically or semantically deemed invalid. | 1903 either syntactically or semantically deemed invalid. |
1901 @raise DecryptionFailed: on decryption failure. | 1904 @raise DecryptionFailed: on decryption failure. |
1902 """ | 1905 """ |
1903 | 1906 |
1904 gpg_provider = get_gpg_provider(self.__sat, client) | 1907 gpg_provider = get_gpg_provider(self.host, client) |
1905 | 1908 |
1906 return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( | 1909 return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( |
1907 ciphertext, | 1910 ciphertext, |
1908 backup_code | 1911 backup_code |
1909 )) | 1912 )) |
2043 if (await self.get_trust(client, public_key, owner)) is not trust: | 2046 if (await self.get_trust(client, public_key, owner)) is not trust: |
2044 await self.set_trust(client, public_key, owner, value) | 2047 await self.set_trust(client, public_key, owner, value) |
2045 | 2048 |
2046 return {} | 2049 return {} |
2047 | 2050 |
2048 submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True) | 2051 submit_id = self.host.registerCallback(callback, with_data=True, one_shot=True) |
2049 | 2052 |
2050 result = xml_tools.XMLUI( | 2053 result = xml_tools.XMLUI( |
2051 panel_type=C.XMLUI_FORM, | 2054 panel_type=C.XMLUI_FORM, |
2052 title=D_("OX trust management"), | 2055 title=D_("OX trust management"), |
2053 submit_id=submit_id | 2056 submit_id=submit_id |