comparison sat/plugins/plugin_xep_0373.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (formerly SàT) used camelCase, as PEP 8 allows for pre-PEP 8 code, in order to match the coding style of Twisted. However, snake_case is more readable and it is better to follow PEP 8 best practices, so it was decided to move fully to snake_case. Because Libervia has a huge codebase, this resulted in an ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring by renaming every function and method (including bridge ones) that does not come from Twisted or Wokkel, so that they use snake_case throughout. This is a massive change and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents 4836b81c5f31
children
comparison
equal deleted inserted replaced
4036:c4464d7ae97b 4037:524856bd7b19
195 @raise UnknownKey: if the public key is not available. 195 @raise UnknownKey: if the public key is not available.
196 """ 196 """
197 197
198 @abstractmethod 198 @abstractmethod
199 def import_public_key(self, packet: bytes) -> GPGPublicKey: 199 def import_public_key(self, packet: bytes) -> GPGPublicKey:
200 """Import a public key from a key material packet according to RFC 4880 §5.5. 200 """import a public key from a key material packet according to RFC 4880 §5.5.
201 201
202 OpenPGP's ASCII Armor is not used. 202 OpenPGP's ASCII Armor is not used.
203 203
204 @param packet: A packet containing an exported public key. 204 @param packet: A packet containing an exported public key.
205 @return: The public key imported from the packet. 205 @return: The public key imported from the packet.
1011 1011
1012 self.host = host 1012 self.host = host
1013 1013
1014 # Add configuration option to choose between manual trust and BTBV as the trust 1014 # Add configuration option to choose between manual trust and BTBV as the trust
1015 # model 1015 # model
1016 host.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) 1016 host.memory.update_params(DEFAULT_TRUST_MODEL_PARAM)
1017 1017
1018 self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045")) 1018 self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045"))
1019 self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"]) 1019 self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"])
1020 1020
1021 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} 1021 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {}
1022 1022
1023 xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"]) 1023 xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"])
1024 xep_0163.addPEPEvent( 1024 xep_0163.add_pep_event(
1025 "OX_PUBLIC_KEYS_LIST", 1025 "OX_PUBLIC_KEYS_LIST",
1026 PUBLIC_KEYS_LIST_NODE, 1026 PUBLIC_KEYS_LIST_NODE,
1027 lambda items_event, profile: defer.ensureDeferred( 1027 lambda items_event, profile: defer.ensureDeferred(
1028 self.__on_public_keys_list_update(items_event, profile) 1028 self.__on_public_keys_list_update(items_event, profile)
1029 ) 1029 )
1030 ) 1030 )
1031 1031
1032 async def profileConnecting(self, client): 1032 async def profile_connecting(self, client):
1033 client.gpg_provider = get_gpg_provider(self.host, client) 1033 client.gpg_provider = get_gpg_provider(self.host, client)
1034 1034
1035 async def profileConnected( # pylint: disable=invalid-name 1035 async def profile_connected( # pylint: disable=invalid-name
1036 self, 1036 self,
1037 client: SatXMPPClient 1037 client: SatXMPPClient
1038 ) -> None: 1038 ) -> None:
1039 """ 1039 """
1040 @param client: The client. 1040 @param client: The client.
1059 1059
1060 @param items_event: The event. 1060 @param items_event: The event.
1061 @param profile: The profile this event belongs to. 1061 @param profile: The profile this event belongs to.
1062 """ 1062 """
1063 1063
1064 client = self.host.getClient(profile) 1064 client = self.host.get_client(profile)
1065 1065
1066 sender = cast(jid.JID, items_event.sender) 1066 sender = cast(jid.JID, items_event.sender)
1067 items = cast(List[domish.Element], items_event.items) 1067 items = cast(List[domish.Element], items_event.items)
1068 1068
1069 if len(items) > 1: 1069 if len(items) > 1:
1321 )) 1321 ))
1322 1322
1323 encryption_keys: Set[GPGPublicKey] = set() 1323 encryption_keys: Set[GPGPublicKey] = set()
1324 1324
1325 for recipient_jid in recipient_jids: 1325 for recipient_jid in recipient_jids:
1326 # Import all keys of the recipient 1326 # import all keys of the recipient
1327 all_public_keys = await self.import_all_public_keys(client, recipient_jid) 1327 all_public_keys = await self.import_all_public_keys(client, recipient_jid)
1328 1328
1329 # Filter for keys that can encrypt 1329 # Filter for keys that can encrypt
1330 encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys)) 1330 encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys))
1331 1331
1377 decryption_keys = set(filter( 1377 decryption_keys = set(filter(
1378 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), 1378 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
1379 self.list_secret_keys(client) 1379 self.list_secret_keys(client)
1380 )) 1380 ))
1381 1381
1382 # Import all keys of the sender 1382 # import all keys of the sender
1383 all_public_keys = await self.import_all_public_keys(client, sender_jid) 1383 all_public_keys = await self.import_all_public_keys(client, sender_jid)
1384 1384
1385 # Filter for keys that can sign 1385 # Filter for keys that can sign
1386 verification_keys = set(filter(gpg_provider.can_sign, all_public_keys)) 1386 verification_keys = set(filter(gpg_provider.can_sign, all_public_keys))
1387 1387
1469 pubkey_elt = domish.Element((NS_OX, "pubkey")) 1469 pubkey_elt = domish.Element((NS_OX, "pubkey"))
1470 1470
1471 pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII")) 1471 pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII"))
1472 1472
1473 try: 1473 try:
1474 await self.__xep_0060.sendItem( 1474 await self.__xep_0060.send_item(
1475 client, 1475 client,
1476 client.jid.userhostJID(), 1476 client.jid.userhostJID(),
1477 node, 1477 node,
1478 pubkey_elt, 1478 pubkey_elt,
1479 format_datetime(), 1479 format_datetime(),
1493 async def import_all_public_keys( 1493 async def import_all_public_keys(
1494 self, 1494 self,
1495 client: SatXMPPClient, 1495 client: SatXMPPClient,
1496 entity_jid: jid.JID 1496 entity_jid: jid.JID
1497 ) -> Set[GPGPublicKey]: 1497 ) -> Set[GPGPublicKey]:
1498 """Import all public keys of a JID that have not been imported before. 1498 """import all public keys of a JID that have not been imported before.
1499 1499
1500 @param client: The client. 1500 @param client: The client.
1501 @param jid: The JID. Can be a bare JID. 1501 @param jid: The JID. Can be a bare JID.
1502 @return: The public keys. 1502 @return: The public keys.
1503 @note: Failure to import a key simply results in the key not being included in the 1503 @note: Failure to import a key simply results in the key not being included in the
1537 available_public_keys.add( 1537 available_public_keys.add(
1538 await self.import_public_key(client, entity_jid, missing_key.fingerprint) 1538 await self.import_public_key(client, entity_jid, missing_key.fingerprint)
1539 ) 1539 )
1540 except Exception as e: 1540 except Exception as e:
1541 log.warning( 1541 log.warning(
1542 f"Import of public key {missing_key.fingerprint} owned by" 1542 f"import of public key {missing_key.fingerprint} owned by"
1543 f" {entity_jid.userhost()} failed, ignoring: {e}" 1543 f" {entity_jid.userhost()} failed, ignoring: {e}"
1544 ) 1544 )
1545 1545
1546 return available_public_keys 1546 return available_public_keys
1547 1547
1549 self, 1549 self,
1550 client: SatXMPPClient, 1550 client: SatXMPPClient,
1551 jid: jid.JID, 1551 jid: jid.JID,
1552 fingerprint: str 1552 fingerprint: str
1553 ) -> GPGPublicKey: 1553 ) -> GPGPublicKey:
1554 """Import a public key. 1554 """import a public key.
1555 1555
1556 @param client: The client. 1556 @param client: The client.
1557 @param jid: The JID owning the public key. Can be a bare JID. 1557 @param jid: The JID owning the public key. Can be a bare JID.
1558 @param fingerprint: The fingerprint of the public key. 1558 @param fingerprint: The fingerprint of the public key.
1559 @return: The public key. 1559 @return: The public key.
1567 gpg_provider = get_gpg_provider(self.host, client) 1567 gpg_provider = get_gpg_provider(self.host, client)
1568 1568
1569 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" 1569 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"
1570 1570
1571 try: 1571 try:
1572 items, __ = await self.__xep_0060.getItems( 1572 items, __ = await self.__xep_0060.get_items(
1573 client, 1573 client,
1574 jid.userhostJID(), 1574 jid.userhostJID(),
1575 node, 1575 node,
1576 max_items=1 1576 max_items=1
1577 ) 1577 )
1642 pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata") 1642 pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata")
1643 pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint 1643 pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint
1644 pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp) 1644 pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp)
1645 1645
1646 try: 1646 try:
1647 await self.__xep_0060.sendItem( 1647 await self.__xep_0060.send_item(
1648 client, 1648 client,
1649 client.jid.userhostJID(), 1649 client.jid.userhostJID(),
1650 node, 1650 node,
1651 public_keys_list_elt, 1651 public_keys_list_elt,
1652 item_id=XEP_0060.ID_SINGLETON, 1652 item_id=XEP_0060.ID_SINGLETON,
1679 """ 1679 """
1680 1680
1681 node = "urn:xmpp:openpgp:0:public-keys" 1681 node = "urn:xmpp:openpgp:0:public-keys"
1682 1682
1683 try: 1683 try:
1684 items, __ = await self.__xep_0060.getItems( 1684 items, __ = await self.__xep_0060.get_items(
1685 client, 1685 client,
1686 jid.userhostJID(), 1686 jid.userhostJID(),
1687 node, 1687 node,
1688 max_items=1 1688 max_items=1
1689 ) 1689 )
1739 protocols or protocol extensions. 1739 protocols or protocol extensions.
1740 @raise XMPPInteractionFailed: if any interaction via XMPP failed. 1740 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1741 """ 1741 """
1742 1742
1743 try: 1743 try:
1744 infos = cast(DiscoInfo, await self.host.memory.disco.getInfos( 1744 infos = cast(DiscoInfo, await self.host.memory.disco.get_infos(
1745 client, 1745 client,
1746 client.jid.userhostJID() 1746 client.jid.userhostJID()
1747 )) 1747 ))
1748 except Exception as e: 1748 except Exception as e:
1749 raise XMPPInteractionFailed( 1749 raise XMPPInteractionFailed(
1767 # TODO: persistent-items is a SHOULD, how do we handle the feature missing? 1767 # TODO: persistent-items is a SHOULD, how do we handle the feature missing?
1768 1768
1769 node = "urn:xmpp:openpgp:0:secret-key" 1769 node = "urn:xmpp:openpgp:0:secret-key"
1770 1770
1771 try: 1771 try:
1772 items, __ = await self.__xep_0060.getItems( 1772 items, __ = await self.__xep_0060.get_items(
1773 client, 1773 client,
1774 client.jid.userhostJID(), 1774 client.jid.userhostJID(),
1775 node, 1775 node,
1776 max_items=1 1776 max_items=1
1777 ) 1777 )
1832 1832
1833 secretkey_elt = domish.Element((NS_OX, "secretkey")) 1833 secretkey_elt = domish.Element((NS_OX, "secretkey"))
1834 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) 1834 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII"))
1835 1835
1836 try: 1836 try:
1837 await self.__xep_0060.sendItem( 1837 await self.__xep_0060.send_item(
1838 client, 1838 client,
1839 client.jid.userhostJID(), 1839 client.jid.userhostJID(),
1840 node, 1840 node,
1841 secretkey_elt 1841 secretkey_elt
1842 ) 1842 )
1886 self, 1886 self,
1887 client: SatXMPPClient, 1887 client: SatXMPPClient,
1888 ciphertext: bytes, 1888 ciphertext: bytes,
1889 backup_code: str 1889 backup_code: str
1890 ) -> Set[GPGSecretKey]: 1890 ) -> Set[GPGSecretKey]:
1891 """Import previously downloaded secret keys. 1891 """import previously downloaded secret keys.
1892 1892
1893 The downloading and importing steps are separate since a backup code is required 1893 The downloading and importing steps are separate since a backup code is required
1894 for the import and it should be possible to try multiple backup codes without 1894 for the import and it should be possible to try multiple backup codes without
1895 redownloading the data every time. The first half of the import procedure is 1895 redownloading the data every time. The first half of the import procedure is
1896 provided by :meth:`download_secret_keys`. 1896 provided by :meth:`download_secret_keys`.
1928 # TODO: This should probably be a global helper somewhere 1928 # TODO: This should probably be a global helper somewhere
1929 1929
1930 bare_jids: Set[jid.JID] = set() 1930 bare_jids: Set[jid.JID] = set()
1931 1931
1932 try: 1932 try:
1933 room = cast(muc.Room, xep_0045.getRoom(client, room_jid)) 1933 room = cast(muc.Room, xep_0045.get_room(client, room_jid))
1934 except exceptions.NotFound as e: 1934 except exceptions.NotFound as e:
1935 raise exceptions.InternalError( 1935 raise exceptions.InternalError(
1936 "Participant list of unjoined MUC requested." 1936 "Participant list of unjoined MUC requested."
1937 ) from e 1937 ) from e
1938 1938
1986 1986
1987 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" 1987 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
1988 1988
1989 await self.__storage[client.profile].force(key, trust_level.name) 1989 await self.__storage[client.profile].force(key, trust_level.name)
1990 1990
1991 async def getTrustUI( # pylint: disable=invalid-name 1991 async def get_trust_ui( # pylint: disable=invalid-name
1992 self, 1992 self,
1993 client: SatXMPPClient, 1993 client: SatXMPPClient,
1994 entity: jid.JID 1994 entity: jid.JID
1995 ) -> xml_tools.XMLUI: 1995 ) -> xml_tools.XMLUI:
1996 """ 1996 """
2002 2002
2003 if entity.resource: 2003 if entity.resource:
2004 raise ValueError("A bare JID is expected.") 2004 raise ValueError("A bare JID is expected.")
2005 2005
2006 bare_jids: Set[jid.JID] 2006 bare_jids: Set[jid.JID]
2007 if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity): 2007 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity):
2008 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) 2008 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
2009 else: 2009 else:
2010 bare_jids = { entity.userhostJID() } 2010 bare_jids = { entity.userhostJID() }
2011 2011
2012 all_public_keys = list({ 2012 all_public_keys = list({
2029 if C.bool(data.get("cancelled", "false")): 2029 if C.bool(data.get("cancelled", "false")):
2030 return {} 2030 return {}
2031 2031
2032 data_form_result = cast( 2032 data_form_result = cast(
2033 Dict[str, str], 2033 Dict[str, str],
2034 xml_tools.XMLUIResult2DataFormResult(data) 2034 xml_tools.xmlui_result_2_data_form_result(data)
2035 ) 2035 )
2036 for key, value in data_form_result.items(): 2036 for key, value in data_form_result.items():
2037 if not key.startswith("trust_"): 2037 if not key.startswith("trust_"):
2038 continue 2038 continue
2039 2039
2046 if (await self.get_trust(client, public_key, owner)) is not trust: 2046 if (await self.get_trust(client, public_key, owner)) is not trust:
2047 await self.set_trust(client, public_key, owner, value) 2047 await self.set_trust(client, public_key, owner, value)
2048 2048
2049 return {} 2049 return {}
2050 2050
2051 submit_id = self.host.registerCallback(callback, with_data=True, one_shot=True) 2051 submit_id = self.host.register_callback(callback, with_data=True, one_shot=True)
2052 2052
2053 result = xml_tools.XMLUI( 2053 result = xml_tools.XMLUI(
2054 panel_type=C.XMLUI_FORM, 2054 panel_type=C.XMLUI_FORM,
2055 title=D_("OX trust management"), 2055 title=D_("OX trust management"),
2056 submit_id=submit_id 2056 submit_id=submit_id
2068 "yourself. Do *not* validate a key if the fingerprint is wrong!" 2068 "yourself. Do *not* validate a key if the fingerprint is wrong!"
2069 )) 2069 ))
2070 2070
2071 own_secret_keys = self.list_secret_keys(client) 2071 own_secret_keys = self.list_secret_keys(client)
2072 2072
2073 trust_ui.changeContainer("label") 2073 trust_ui.change_container("label")
2074 for index, secret_key in enumerate(own_secret_keys): 2074 for index, secret_key in enumerate(own_secret_keys):
2075 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) 2075 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint"))
2076 trust_ui.addText(secret_key.public_key.fingerprint) 2076 trust_ui.addText(secret_key.public_key.fingerprint)
2077 trust_ui.addEmpty() 2077 trust_ui.addEmpty()
2078 trust_ui.addEmpty() 2078 trust_ui.addEmpty()