Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0373.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (formerly SàT) used camelCase, as PEP8 allows when working with
pre-PEP8 code, in order to share the same coding style as Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
was decided to move to full snake_case. Because Libervia has a huge codebase, this
ended up as an ugly mix of camelCase and snake_case.
To fix that, this patch performs a big refactoring, renaming every function and method
(including the bridge) that does not come from Twisted or Wokkel to full snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | 4836b81c5f31 |
children |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
195 @raise UnknownKey: if the public key is not available. | 195 @raise UnknownKey: if the public key is not available. |
196 """ | 196 """ |
197 | 197 |
198 @abstractmethod | 198 @abstractmethod |
199 def import_public_key(self, packet: bytes) -> GPGPublicKey: | 199 def import_public_key(self, packet: bytes) -> GPGPublicKey: |
200 """Import a public key from a key material packet according to RFC 4880 §5.5. | 200 """import a public key from a key material packet according to RFC 4880 §5.5. |
201 | 201 |
202 OpenPGP's ASCII Armor is not used. | 202 OpenPGP's ASCII Armor is not used. |
203 | 203 |
204 @param packet: A packet containing an exported public key. | 204 @param packet: A packet containing an exported public key. |
205 @return: The public key imported from the packet. | 205 @return: The public key imported from the packet. |
1011 | 1011 |
1012 self.host = host | 1012 self.host = host |
1013 | 1013 |
1014 # Add configuration option to choose between manual trust and BTBV as the trust | 1014 # Add configuration option to choose between manual trust and BTBV as the trust |
1015 # model | 1015 # model |
1016 host.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) | 1016 host.memory.update_params(DEFAULT_TRUST_MODEL_PARAM) |
1017 | 1017 |
1018 self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045")) | 1018 self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045")) |
1019 self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"]) | 1019 self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"]) |
1020 | 1020 |
1021 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} | 1021 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} |
1022 | 1022 |
1023 xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"]) | 1023 xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"]) |
1024 xep_0163.addPEPEvent( | 1024 xep_0163.add_pep_event( |
1025 "OX_PUBLIC_KEYS_LIST", | 1025 "OX_PUBLIC_KEYS_LIST", |
1026 PUBLIC_KEYS_LIST_NODE, | 1026 PUBLIC_KEYS_LIST_NODE, |
1027 lambda items_event, profile: defer.ensureDeferred( | 1027 lambda items_event, profile: defer.ensureDeferred( |
1028 self.__on_public_keys_list_update(items_event, profile) | 1028 self.__on_public_keys_list_update(items_event, profile) |
1029 ) | 1029 ) |
1030 ) | 1030 ) |
1031 | 1031 |
1032 async def profileConnecting(self, client): | 1032 async def profile_connecting(self, client): |
1033 client.gpg_provider = get_gpg_provider(self.host, client) | 1033 client.gpg_provider = get_gpg_provider(self.host, client) |
1034 | 1034 |
1035 async def profileConnected( # pylint: disable=invalid-name | 1035 async def profile_connected( # pylint: disable=invalid-name |
1036 self, | 1036 self, |
1037 client: SatXMPPClient | 1037 client: SatXMPPClient |
1038 ) -> None: | 1038 ) -> None: |
1039 """ | 1039 """ |
1040 @param client: The client. | 1040 @param client: The client. |
1059 | 1059 |
1060 @param items_event: The event. | 1060 @param items_event: The event. |
1061 @param profile: The profile this event belongs to. | 1061 @param profile: The profile this event belongs to. |
1062 """ | 1062 """ |
1063 | 1063 |
1064 client = self.host.getClient(profile) | 1064 client = self.host.get_client(profile) |
1065 | 1065 |
1066 sender = cast(jid.JID, items_event.sender) | 1066 sender = cast(jid.JID, items_event.sender) |
1067 items = cast(List[domish.Element], items_event.items) | 1067 items = cast(List[domish.Element], items_event.items) |
1068 | 1068 |
1069 if len(items) > 1: | 1069 if len(items) > 1: |
1321 )) | 1321 )) |
1322 | 1322 |
1323 encryption_keys: Set[GPGPublicKey] = set() | 1323 encryption_keys: Set[GPGPublicKey] = set() |
1324 | 1324 |
1325 for recipient_jid in recipient_jids: | 1325 for recipient_jid in recipient_jids: |
1326 # Import all keys of the recipient | 1326 # import all keys of the recipient |
1327 all_public_keys = await self.import_all_public_keys(client, recipient_jid) | 1327 all_public_keys = await self.import_all_public_keys(client, recipient_jid) |
1328 | 1328 |
1329 # Filter for keys that can encrypt | 1329 # Filter for keys that can encrypt |
1330 encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys)) | 1330 encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys)) |
1331 | 1331 |
1377 decryption_keys = set(filter( | 1377 decryption_keys = set(filter( |
1378 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), | 1378 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), |
1379 self.list_secret_keys(client) | 1379 self.list_secret_keys(client) |
1380 )) | 1380 )) |
1381 | 1381 |
1382 # Import all keys of the sender | 1382 # import all keys of the sender |
1383 all_public_keys = await self.import_all_public_keys(client, sender_jid) | 1383 all_public_keys = await self.import_all_public_keys(client, sender_jid) |
1384 | 1384 |
1385 # Filter for keys that can sign | 1385 # Filter for keys that can sign |
1386 verification_keys = set(filter(gpg_provider.can_sign, all_public_keys)) | 1386 verification_keys = set(filter(gpg_provider.can_sign, all_public_keys)) |
1387 | 1387 |
1469 pubkey_elt = domish.Element((NS_OX, "pubkey")) | 1469 pubkey_elt = domish.Element((NS_OX, "pubkey")) |
1470 | 1470 |
1471 pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII")) | 1471 pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII")) |
1472 | 1472 |
1473 try: | 1473 try: |
1474 await self.__xep_0060.sendItem( | 1474 await self.__xep_0060.send_item( |
1475 client, | 1475 client, |
1476 client.jid.userhostJID(), | 1476 client.jid.userhostJID(), |
1477 node, | 1477 node, |
1478 pubkey_elt, | 1478 pubkey_elt, |
1479 format_datetime(), | 1479 format_datetime(), |
1493 async def import_all_public_keys( | 1493 async def import_all_public_keys( |
1494 self, | 1494 self, |
1495 client: SatXMPPClient, | 1495 client: SatXMPPClient, |
1496 entity_jid: jid.JID | 1496 entity_jid: jid.JID |
1497 ) -> Set[GPGPublicKey]: | 1497 ) -> Set[GPGPublicKey]: |
1498 """Import all public keys of a JID that have not been imported before. | 1498 """import all public keys of a JID that have not been imported before. |
1499 | 1499 |
1500 @param client: The client. | 1500 @param client: The client. |
1501 @param jid: The JID. Can be a bare JID. | 1501 @param jid: The JID. Can be a bare JID. |
1502 @return: The public keys. | 1502 @return: The public keys. |
1503 @note: Failure to import a key simply results in the key not being included in the | 1503 @note: Failure to import a key simply results in the key not being included in the |
1537 available_public_keys.add( | 1537 available_public_keys.add( |
1538 await self.import_public_key(client, entity_jid, missing_key.fingerprint) | 1538 await self.import_public_key(client, entity_jid, missing_key.fingerprint) |
1539 ) | 1539 ) |
1540 except Exception as e: | 1540 except Exception as e: |
1541 log.warning( | 1541 log.warning( |
1542 f"Import of public key {missing_key.fingerprint} owned by" | 1542 f"import of public key {missing_key.fingerprint} owned by" |
1543 f" {entity_jid.userhost()} failed, ignoring: {e}" | 1543 f" {entity_jid.userhost()} failed, ignoring: {e}" |
1544 ) | 1544 ) |
1545 | 1545 |
1546 return available_public_keys | 1546 return available_public_keys |
1547 | 1547 |
1549 self, | 1549 self, |
1550 client: SatXMPPClient, | 1550 client: SatXMPPClient, |
1551 jid: jid.JID, | 1551 jid: jid.JID, |
1552 fingerprint: str | 1552 fingerprint: str |
1553 ) -> GPGPublicKey: | 1553 ) -> GPGPublicKey: |
1554 """Import a public key. | 1554 """import a public key. |
1555 | 1555 |
1556 @param client: The client. | 1556 @param client: The client. |
1557 @param jid: The JID owning the public key. Can be a bare JID. | 1557 @param jid: The JID owning the public key. Can be a bare JID. |
1558 @param fingerprint: The fingerprint of the public key. | 1558 @param fingerprint: The fingerprint of the public key. |
1559 @return: The public key. | 1559 @return: The public key. |
1567 gpg_provider = get_gpg_provider(self.host, client) | 1567 gpg_provider = get_gpg_provider(self.host, client) |
1568 | 1568 |
1569 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" | 1569 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" |
1570 | 1570 |
1571 try: | 1571 try: |
1572 items, __ = await self.__xep_0060.getItems( | 1572 items, __ = await self.__xep_0060.get_items( |
1573 client, | 1573 client, |
1574 jid.userhostJID(), | 1574 jid.userhostJID(), |
1575 node, | 1575 node, |
1576 max_items=1 | 1576 max_items=1 |
1577 ) | 1577 ) |
1642 pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata") | 1642 pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata") |
1643 pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint | 1643 pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint |
1644 pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp) | 1644 pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp) |
1645 | 1645 |
1646 try: | 1646 try: |
1647 await self.__xep_0060.sendItem( | 1647 await self.__xep_0060.send_item( |
1648 client, | 1648 client, |
1649 client.jid.userhostJID(), | 1649 client.jid.userhostJID(), |
1650 node, | 1650 node, |
1651 public_keys_list_elt, | 1651 public_keys_list_elt, |
1652 item_id=XEP_0060.ID_SINGLETON, | 1652 item_id=XEP_0060.ID_SINGLETON, |
1679 """ | 1679 """ |
1680 | 1680 |
1681 node = "urn:xmpp:openpgp:0:public-keys" | 1681 node = "urn:xmpp:openpgp:0:public-keys" |
1682 | 1682 |
1683 try: | 1683 try: |
1684 items, __ = await self.__xep_0060.getItems( | 1684 items, __ = await self.__xep_0060.get_items( |
1685 client, | 1685 client, |
1686 jid.userhostJID(), | 1686 jid.userhostJID(), |
1687 node, | 1687 node, |
1688 max_items=1 | 1688 max_items=1 |
1689 ) | 1689 ) |
1739 protocols or protocol extensions. | 1739 protocols or protocol extensions. |
1740 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | 1740 @raise XMPPInteractionFailed: if any interaction via XMPP failed. |
1741 """ | 1741 """ |
1742 | 1742 |
1743 try: | 1743 try: |
1744 infos = cast(DiscoInfo, await self.host.memory.disco.getInfos( | 1744 infos = cast(DiscoInfo, await self.host.memory.disco.get_infos( |
1745 client, | 1745 client, |
1746 client.jid.userhostJID() | 1746 client.jid.userhostJID() |
1747 )) | 1747 )) |
1748 except Exception as e: | 1748 except Exception as e: |
1749 raise XMPPInteractionFailed( | 1749 raise XMPPInteractionFailed( |
1767 # TODO: persistent-items is a SHOULD, how do we handle the feature missing? | 1767 # TODO: persistent-items is a SHOULD, how do we handle the feature missing? |
1768 | 1768 |
1769 node = "urn:xmpp:openpgp:0:secret-key" | 1769 node = "urn:xmpp:openpgp:0:secret-key" |
1770 | 1770 |
1771 try: | 1771 try: |
1772 items, __ = await self.__xep_0060.getItems( | 1772 items, __ = await self.__xep_0060.get_items( |
1773 client, | 1773 client, |
1774 client.jid.userhostJID(), | 1774 client.jid.userhostJID(), |
1775 node, | 1775 node, |
1776 max_items=1 | 1776 max_items=1 |
1777 ) | 1777 ) |
1832 | 1832 |
1833 secretkey_elt = domish.Element((NS_OX, "secretkey")) | 1833 secretkey_elt = domish.Element((NS_OX, "secretkey")) |
1834 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) | 1834 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) |
1835 | 1835 |
1836 try: | 1836 try: |
1837 await self.__xep_0060.sendItem( | 1837 await self.__xep_0060.send_item( |
1838 client, | 1838 client, |
1839 client.jid.userhostJID(), | 1839 client.jid.userhostJID(), |
1840 node, | 1840 node, |
1841 secretkey_elt | 1841 secretkey_elt |
1842 ) | 1842 ) |
1886 self, | 1886 self, |
1887 client: SatXMPPClient, | 1887 client: SatXMPPClient, |
1888 ciphertext: bytes, | 1888 ciphertext: bytes, |
1889 backup_code: str | 1889 backup_code: str |
1890 ) -> Set[GPGSecretKey]: | 1890 ) -> Set[GPGSecretKey]: |
1891 """Import previously downloaded secret keys. | 1891 """import previously downloaded secret keys. |
1892 | 1892 |
1893 The downloading and importing steps are separate since a backup code is required | 1893 The downloading and importing steps are separate since a backup code is required |
1894 for the import and it should be possible to try multiple backup codes without | 1894 for the import and it should be possible to try multiple backup codes without |
1895 redownloading the data every time. The first half of the import procedure is | 1895 redownloading the data every time. The first half of the import procedure is |
1896 provided by :meth:`download_secret_keys`. | 1896 provided by :meth:`download_secret_keys`. |
1928 # TODO: This should probably be a global helper somewhere | 1928 # TODO: This should probably be a global helper somewhere |
1929 | 1929 |
1930 bare_jids: Set[jid.JID] = set() | 1930 bare_jids: Set[jid.JID] = set() |
1931 | 1931 |
1932 try: | 1932 try: |
1933 room = cast(muc.Room, xep_0045.getRoom(client, room_jid)) | 1933 room = cast(muc.Room, xep_0045.get_room(client, room_jid)) |
1934 except exceptions.NotFound as e: | 1934 except exceptions.NotFound as e: |
1935 raise exceptions.InternalError( | 1935 raise exceptions.InternalError( |
1936 "Participant list of unjoined MUC requested." | 1936 "Participant list of unjoined MUC requested." |
1937 ) from e | 1937 ) from e |
1938 | 1938 |
1986 | 1986 |
1987 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" | 1987 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" |
1988 | 1988 |
1989 await self.__storage[client.profile].force(key, trust_level.name) | 1989 await self.__storage[client.profile].force(key, trust_level.name) |
1990 | 1990 |
1991 async def getTrustUI( # pylint: disable=invalid-name | 1991 async def get_trust_ui( # pylint: disable=invalid-name |
1992 self, | 1992 self, |
1993 client: SatXMPPClient, | 1993 client: SatXMPPClient, |
1994 entity: jid.JID | 1994 entity: jid.JID |
1995 ) -> xml_tools.XMLUI: | 1995 ) -> xml_tools.XMLUI: |
1996 """ | 1996 """ |
2002 | 2002 |
2003 if entity.resource: | 2003 if entity.resource: |
2004 raise ValueError("A bare JID is expected.") | 2004 raise ValueError("A bare JID is expected.") |
2005 | 2005 |
2006 bare_jids: Set[jid.JID] | 2006 bare_jids: Set[jid.JID] |
2007 if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity): | 2007 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): |
2008 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) | 2008 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) |
2009 else: | 2009 else: |
2010 bare_jids = { entity.userhostJID() } | 2010 bare_jids = { entity.userhostJID() } |
2011 | 2011 |
2012 all_public_keys = list({ | 2012 all_public_keys = list({ |
2029 if C.bool(data.get("cancelled", "false")): | 2029 if C.bool(data.get("cancelled", "false")): |
2030 return {} | 2030 return {} |
2031 | 2031 |
2032 data_form_result = cast( | 2032 data_form_result = cast( |
2033 Dict[str, str], | 2033 Dict[str, str], |
2034 xml_tools.XMLUIResult2DataFormResult(data) | 2034 xml_tools.xmlui_result_2_data_form_result(data) |
2035 ) | 2035 ) |
2036 for key, value in data_form_result.items(): | 2036 for key, value in data_form_result.items(): |
2037 if not key.startswith("trust_"): | 2037 if not key.startswith("trust_"): |
2038 continue | 2038 continue |
2039 | 2039 |
2046 if (await self.get_trust(client, public_key, owner)) is not trust: | 2046 if (await self.get_trust(client, public_key, owner)) is not trust: |
2047 await self.set_trust(client, public_key, owner, value) | 2047 await self.set_trust(client, public_key, owner, value) |
2048 | 2048 |
2049 return {} | 2049 return {} |
2050 | 2050 |
2051 submit_id = self.host.registerCallback(callback, with_data=True, one_shot=True) | 2051 submit_id = self.host.register_callback(callback, with_data=True, one_shot=True) |
2052 | 2052 |
2053 result = xml_tools.XMLUI( | 2053 result = xml_tools.XMLUI( |
2054 panel_type=C.XMLUI_FORM, | 2054 panel_type=C.XMLUI_FORM, |
2055 title=D_("OX trust management"), | 2055 title=D_("OX trust management"), |
2056 submit_id=submit_id | 2056 submit_id=submit_id |
2068 "yourself. Do *not* validate a key if the fingerprint is wrong!" | 2068 "yourself. Do *not* validate a key if the fingerprint is wrong!" |
2069 )) | 2069 )) |
2070 | 2070 |
2071 own_secret_keys = self.list_secret_keys(client) | 2071 own_secret_keys = self.list_secret_keys(client) |
2072 | 2072 |
2073 trust_ui.changeContainer("label") | 2073 trust_ui.change_container("label") |
2074 for index, secret_key in enumerate(own_secret_keys): | 2074 for index, secret_key in enumerate(own_secret_keys): |
2075 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) | 2075 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) |
2076 trust_ui.addText(secret_key.public_key.fingerprint) | 2076 trust_ui.addText(secret_key.public_key.fingerprint) |
2077 trust_ui.addEmpty() | 2077 trust_ui.addEmpty() |
2078 trust_ui.addEmpty() | 2078 trust_ui.addEmpty() |