Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0373.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | b53b6dc1f929 |
children | 71c939e34ca6 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0373.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0373.py Wed Jun 19 18:44:57 2024 +0200 @@ -75,7 +75,7 @@ "GPGProvider", "PublicKeyMetadata", "gpg_provider", - "TrustLevel" + "TrustLevel", ] @@ -86,8 +86,8 @@ C.PI_NAME: "XEP-0373", C.PI_IMPORT_NAME: "XEP-0373", C.PI_TYPE: "SEC", - C.PI_PROTOCOLS: [ "XEP-0373" ], - C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ], + C.PI_PROTOCOLS: ["XEP-0373"], + C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0163"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "XEP_0373", C.PI_HANDLER: "no", @@ -306,10 +306,7 @@ @abstractmethod def verify_detached( - self, - data: bytes, - signature: bytes, - public_keys: Set[GPGPublicKey] + self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey] ) -> None: """Verify signed data, where the signature was created detached from the data. @@ -329,7 +326,7 @@ self, plaintext: bytes, public_keys: Set[GPGPublicKey], - signing_keys: Optional[Set[GPGSecretKey]] = None + signing_keys: Optional[Set[GPGSecretKey]] = None, ) -> bytes: """Encrypt and optionally sign some data. @@ -346,7 +343,7 @@ self, ciphertext: bytes, secret_keys: Set[GPGSecretKey], - public_keys: Optional[Set[GPGPublicKey]] = None + public_keys: Optional[Set[GPGPublicKey]] = None, ) -> bytes: """Decrypt and optionally verify some data. @@ -569,9 +566,7 @@ with gpg.Context(home_dir=self.__home_dir) as c: try: plaintext, __, __ = c.decrypt( - ciphertext, - passphrase=password, - verify=False + ciphertext, passphrase=password, verify=False ) except gpg.errors.GPGMEError as e: # TODO: Find out what kind of error is raised if the password is wrong and @@ -646,10 +641,7 @@ return data def verify_detached( - self, - data: bytes, - signature: bytes, - public_keys: Set[GPGPublicKey] + self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey] ) -> None: with gpg.Context(home_dir=self.__home_dir) as c: try: @@ -677,7 +669,7 @@ self, plaintext: bytes, public_keys: Set[GPGPublicKey], - signing_keys: Optional[Set[GPGSecretKey]] = None + signing_keys: Optional[Set[GPGSecretKey]] = None, ) -> bytes: recipients = [] for public_key in public_keys: @@ -701,7 +693,7 @@ recipients=recipients, sign=sign, always_trust=True, - add_encrypt_to=True + add_encrypt_to=True, ) except gpg.errors.GPGMEError as e: raise GPGProviderError("Internal GPGME error") from e @@ -720,16 +712,13 @@ self, ciphertext: bytes, secret_keys: Set[GPGSecretKey], - public_keys: Optional[Set[GPGPublicKey]] = None + public_keys: Optional[Set[GPGPublicKey]] = None, ) -> bytes: verify = public_keys is not None with gpg.Context(home_dir=self.__home_dir) as c: try: - plaintext, result, verify_result = c.decrypt( - ciphertext, - verify=verify - ) + plaintext, result, verify_result = c.decrypt(ciphertext, verify=verify) except gpg.errors.GPGMEError as e: raise GPGProviderError("Internal GPGME error") from e except gpg.UnsupportedAlgorithm as e: @@ -760,8 +749,7 @@ try: return { GPGME_GPGPublicKey(key) - for key - in c.keylist(pattern=user_id, secret=False) + for key in c.keylist(pattern=user_id, secret=False) } except gpg.errors.GPGMEError as e: raise GPGProviderError("Internal GPGME error") from e @@ -771,8 +759,7 @@ try: return { GPGME_GPGSecretKey(GPGME_GPGPublicKey(key)) - for key - in c.keylist(pattern=user_id, secret=True) + for key in c.keylist(pattern=user_id, secret=True) } except gpg.errors.GPGMEError as e: raise GPGProviderError("Internal GPGME error") from e @@ -797,7 +784,7 @@ encrypt=True, certify=False, authenticate=False, - force=True + force=True, ) key_obj = c.get_key(result.fpr, secret=True) @@ -813,19 +800,20 @@ """ Metadata about a published public key. """ + fingerprint: str timestamp: datetime def to_dict(self) -> dict: # Convert the instance to a dictionary and handle datetime serialization data = self._asdict() - data['timestamp'] = self.timestamp.isoformat() + data["timestamp"] = self.timestamp.isoformat() return data @staticmethod - def from_dict(data: dict) -> 'PublicKeyMetadata': + def from_dict(data: dict) -> "PublicKeyMetadata": # Load a serialised dictionary - data['timestamp'] = datetime.fromisoformat(data['timestamp']) + data["timestamp"] = datetime.fromisoformat(data["timestamp"]) return PublicKeyMetadata(**data) @@ -841,20 +829,23 @@ DISTRUSTED: str = "DISTRUSTED" -OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +OPENPGP_SCHEMA = xmlschema.XMLSchema( + """<?xml version="1.0" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> <xs:element name="openpgp" type="xs:base64Binary"/> </xs:schema> -""") +""" +) # The following schema needs verion 1.1 of XML Schema, which is not supported by lxml. # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform # implementation of XML Schema, including version 1.1. -CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?> +CONTENT_SCHEMA = xmlschema.XMLSchema11( + """<?xml version="1.1" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> @@ -914,11 +905,13 @@ </xs:complexType> </xs:element> </xs:schema> -""") +""" +) PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" -PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema( + """<?xml version="1.0" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> @@ -938,10 +931,12 @@ </xs:complexType> </xs:element> </xs:schema> -""") +""" +) -PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +PUBKEY_SCHEMA = xmlschema.XMLSchema( + """<?xml version="1.0" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> @@ -957,17 +952,20 @@ <xs:element name="data" type="xs:base64Binary"/> </xs:schema> -""") +""" +) -SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> +SECRETKEY_SCHEMA = xmlschema.XMLSchema( + """<?xml version="1.0" encoding="utf8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="urn:xmpp:openpgp:0" xmlns="urn:xmpp:openpgp:0"> <xs:element name="secretkey" type="xs:base64Binary"/> </xs:schema> -""") +""" +) DEFAULT_TRUST_MODEL_PARAM = f""" @@ -1005,9 +1003,10 @@ @return: The passphrase. """ - return "-".join("".join( - secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4) - ) for __ in range(6)) + return "-".join( + "".join(secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4)) + for __ in range(6) + ) # TODO: Handle the user id mess @@ -1038,15 +1037,14 @@ PUBLIC_KEYS_LIST_NODE, lambda items_event, profile: defer.ensureDeferred( self.__on_public_keys_list_update(items_event, profile) - ) + ), ) async def profile_connecting(self, client): client.gpg_provider = get_gpg_provider(self.host, client) async def profile_connected( # pylint: disable=invalid-name - self, - client: SatXMPPClient + self, client: SatXMPPClient ) -> None: """ @param client: The client. @@ -1055,17 +1053,16 @@ profile = cast(str, client.profile) if not profile in self.__storage: - self.__storage[profile] = \ - persistent.LazyPersistentBinaryDict("XEP-0373", client.profile) + self.__storage[profile] = persistent.LazyPersistentBinaryDict( + "XEP-0373", client.profile + ) if len(self.list_secret_keys(client)) == 0: log.debug(f"Generating first GPG key for {client.jid.userhost()}.") await self.create_key(client) async def __on_public_keys_list_update( - self, - items_event: pubsub.ItemsEvent, - profile: str + self, items_event: pubsub.ItemsEvent, profile: str ) -> None: """Handle public keys list updates fired by PEP. @@ -1089,7 +1086,7 @@ public_keys_list_elt = cast( Optional[domish.Element], - next(item_elt.elements(NS_OX, "public-keys-list"), None) + next(item_elt.elements(NS_OX, "public-keys-list"), None), ) pubkey_metadata_elts: Optional[List[domish.Element]] = None @@ -1100,17 +1097,21 @@ except xmlschema.XMLSchemaValidationError: pass else: - pubkey_metadata_elts = \ - list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata")) + pubkey_metadata_elts = list( + public_keys_list_elt.elements(NS_OX, "pubkey-metadata") + ) if pubkey_metadata_elts is None: log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") return - new_public_keys_metadata = { PublicKeyMetadata( - fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), - timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])) - ) for pubkey_metadata_elt in pubkey_metadata_elts } + new_public_keys_metadata = { + PublicKeyMetadata( + fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), + timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])), + ) + for pubkey_metadata_elt in pubkey_metadata_elts + } storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost()) @@ -1140,11 +1141,15 @@ # included in the update if sender.userhost() == client.jid.userhost(): secret_keys = self.list_secret_keys(client) - missing_keys = set(filter(lambda secret_key: all( - key_metadata.fingerprint != secret_key.public_key.fingerprint - for key_metadata - in new_public_keys_metadata - ), secret_keys)) + missing_keys = set( + filter( + lambda secret_key: all( + key_metadata.fingerprint != secret_key.public_key.fingerprint + for key_metadata in new_public_keys_metadata + ), + secret_keys, + ) + ) if len(missing_keys) > 0: log.warning( @@ -1154,16 +1159,17 @@ for missing_key in missing_keys: log.warning(missing_key.public_key.fingerprint) - new_public_keys_metadata.add(PublicKeyMetadata( - fingerprint=missing_key.public_key.fingerprint, - timestamp=datetime.now(timezone.utc) - )) + new_public_keys_metadata.add( + PublicKeyMetadata( + fingerprint=missing_key.public_key.fingerprint, + timestamp=datetime.now(timezone.utc), + ) + ) await self.publish_public_keys_list(client, new_public_keys_metadata) await self.__storage[profile].force( - storage_key, - [pkm.to_dict() for pkm in new_public_keys_metadata] + storage_key, [pkm.to_dict() for pkm in new_public_keys_metadata] ) def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: @@ -1211,16 +1217,17 @@ for pkm in await self.__storage[client.profile].get(storage_key, []) } - public_keys_list.add(PublicKeyMetadata( - fingerprint=secret_key.public_key.fingerprint, - timestamp=datetime.now(timezone.utc) - )) + public_keys_list.add( + PublicKeyMetadata( + fingerprint=secret_key.public_key.fingerprint, + timestamp=datetime.now(timezone.utc), + ) + ) await self.publish_public_keys_list(client, public_keys_list) await self.__storage[client.profile].force( - storage_key, - [pkm.to_dict() for pkm in public_keys_list] + storage_key, [pkm.to_dict() for pkm in public_keys_list] ) return secret_key @@ -1229,7 +1236,7 @@ def __build_content_element( element_name: Literal["signcrypt", "sign", "crypt"], recipient_jids: Iterable[jid.JID], - include_rpad: bool + include_rpad: bool, ) -> Tuple[domish.Element, domish.Element]: """Build a content element. @@ -1254,8 +1261,7 @@ rpad_length = secrets.randbelow(201) rpad_content = "".join( secrets.choice(string.digits + string.ascii_letters + string.punctuation) - for __ - in range(rpad_length) + for __ in range(rpad_length) ) content_elt.addElement("rpad", content=rpad_content) @@ -1265,7 +1271,7 @@ @staticmethod def build_signcrypt_element( - recipient_jids: Iterable[jid.JID] + recipient_jids: Iterable[jid.JID], ) -> Tuple[domish.Element, domish.Element]: """Build a ``<signcrypt/>`` content element. @@ -1282,8 +1288,7 @@ @staticmethod def build_sign_element( - recipient_jids: Iterable[jid.JID], - include_rpad: bool + recipient_jids: Iterable[jid.JID], include_rpad: bool ) -> Tuple[domish.Element, domish.Element]: """Build a ``<sign/>`` content element. @@ -1302,7 +1307,7 @@ @staticmethod def build_crypt_element( - recipient_jids: Iterable[jid.JID] + recipient_jids: Iterable[jid.JID], ) -> Tuple[domish.Element, domish.Element]: """Build a ``<crypt/>`` content element. @@ -1319,7 +1324,7 @@ self, client: SatXMPPClient, content_elt: domish.Element, - recipient_jids: Set[jid.JID] + recipient_jids: Set[jid.JID], ) -> domish.Element: """Build an ``<openpgp/>`` element. @@ -1333,10 +1338,12 @@ # TODO: I'm not sure whether we want to sign with all keys by default or choose # just one key/a subset of keys to sign with. - signing_keys = set(filter( - lambda secret_key: gpg_provider.can_sign(secret_key.public_key), - self.list_secret_keys(client) - )) + signing_keys = set( + filter( + lambda secret_key: gpg_provider.can_sign(secret_key.public_key), + self.list_secret_keys(client), + ) + ) encryption_keys: Set[GPGPublicKey] = set() @@ -1370,7 +1377,7 @@ client: SatXMPPClient, openpgp_elt: domish.Element, element_name: Literal["signcrypt", "sign", "crypt"], - sender_jid: jid.JID + sender_jid: jid.JID, ) -> Tuple[domish.Element, datetime]: """Verify, decrypt and unpack an ``<openpgp/>`` element. @@ -1392,10 +1399,12 @@ gpg_provider = get_gpg_provider(self.host, client) - decryption_keys = set(filter( - lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), - self.list_secret_keys(client) - )) + decryption_keys = set( + filter( + lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), + self.list_secret_keys(client), + ) + ) # import all keys of the sender all_public_keys = await self.import_all_public_keys(client, sender_jid) @@ -1417,9 +1426,7 @@ if element_name == "signcrypt": content = gpg_provider.decrypt( - openpgp_message, - decryption_keys, - public_keys=verification_keys + openpgp_message, decryption_keys, public_keys=verification_keys ) elif element_name == "sign": content = gpg_provider.verify(openpgp_message, verification_keys) @@ -1430,8 +1437,7 @@ try: content_elt = cast( - domish.Element, - xml_tools.ElementParser()(content.decode("utf-8")) + domish.Element, xml_tools.ElementParser()(content.decode("utf-8")) ) except UnicodeDecodeError as e: raise exceptions.ParsingError("UTF-8 decoding error") from e @@ -1446,11 +1452,12 @@ if content_elt.name != element_name: raise exceptions.ParsingError(f"Not a <{element_name}/> element.") - recipient_jids = \ - { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") } + recipient_jids = { + jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") + } if ( - client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids } + client.jid.userhostJID() not in {jid.userhostJID() for jid in recipient_jids} and element_name != "crypt" ): raise VerificationError( @@ -1467,9 +1474,7 @@ return payload_elt, timestamp async def publish_public_key( - self, - client: SatXMPPClient, - public_key: GPGPublicKey + self, client: SatXMPPClient, public_key: GPGPublicKey ) -> None: """Publish a public key. @@ -1499,19 +1504,17 @@ XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_PERSIST_ITEMS: "true", XEP_0060.OPT_ACCESS_MODEL: "open", - XEP_0060.OPT_MAX_ITEMS: 1 + XEP_0060.OPT_MAX_ITEMS: 1, }, # TODO: Do we really want publish_without_options here? - XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" - } + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", + }, ) except Exception as e: raise XMPPInteractionFailed("Publishing the public key failed.") from e async def import_all_public_keys( - self, - client: SatXMPPClient, - entity_jid: jid.JID + self, client: SatXMPPClient, entity_jid: jid.JID ) -> Set[GPGPublicKey]: """import all public keys of a JID that have not been imported before. @@ -1535,26 +1538,28 @@ client, entity_jid ) if not public_keys_metadata: - raise exceptions.NotFound( - f"Can't find public keys for {entity_jid}" - ) + raise exceptions.NotFound(f"Can't find public keys for {entity_jid}") else: await self.__storage[client.profile].aset( - storage_key, - [pkm.to_dict() for pkm in public_keys_metadata] + storage_key, [pkm.to_dict() for pkm in public_keys_metadata] ) - - missing_keys = set(filter(lambda public_key_metadata: all( - public_key_metadata.fingerprint != public_key.fingerprint - for public_key - in available_public_keys - ), public_keys_metadata)) + missing_keys = set( + filter( + lambda public_key_metadata: all( + public_key_metadata.fingerprint != public_key.fingerprint + for public_key in available_public_keys + ), + public_keys_metadata, + ) + ) for missing_key in missing_keys: try: available_public_keys.add( - await self.import_public_key(client, entity_jid, missing_key.fingerprint) + await self.import_public_key( + client, entity_jid, missing_key.fingerprint + ) ) except Exception as e: log.warning( @@ -1565,10 +1570,7 @@ return available_public_keys async def import_public_key( - self, - client: SatXMPPClient, - jid: jid.JID, - fingerprint: str + self, client: SatXMPPClient, jid: jid.JID, fingerprint: str ) -> GPGPublicKey: """import a public key. @@ -1589,10 +1591,7 @@ try: items, __ = await self.__xep_0060.get_items( - client, - jid.userhostJID(), - node, - max_items=1 + client, jid.userhostJID(), node, max_items=1 ) except exceptions.NotFound as e: raise exceptions.NotFound( @@ -1611,8 +1610,7 @@ ) from e pubkey_elt = cast( - Optional[domish.Element], - next(item_elt.elements(NS_OX, "pubkey"), None) + Optional[domish.Element], next(item_elt.elements(NS_OX, "pubkey"), None) ) if pubkey_elt is None: @@ -1629,16 +1627,14 @@ f" schema validation." ) from e - public_key = gpg_provider.import_public_key(base64.b64decode(str( - next(pubkey_elt.elements(NS_OX, "data")) - ))) + public_key = gpg_provider.import_public_key( + base64.b64decode(str(next(pubkey_elt.elements(NS_OX, "data")))) + ) return public_key async def publish_public_keys_list( - self, - client: SatXMPPClient, - public_keys_list: Iterable[PublicKeyMetadata] + self, client: SatXMPPClient, public_keys_list: Iterable[PublicKeyMetadata] ) -> None: """Publish/update the own public keys list. @@ -1650,7 +1646,7 @@ beforehand. """ - if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list): + if len({pkm.fingerprint for pkm in public_keys_list}) != len(public_keys_list): raise ValueError("Public keys list contains duplicate fingerprints.") node = "urn:xmpp:openpgp:0:public-keys" @@ -1673,19 +1669,17 @@ XEP_0060.EXTRA_PUBLISH_OPTIONS: { XEP_0060.OPT_PERSIST_ITEMS: "true", XEP_0060.OPT_ACCESS_MODEL: "open", - XEP_0060.OPT_MAX_ITEMS: 1 + XEP_0060.OPT_MAX_ITEMS: 1, }, # TODO: Do we really want publish_without_options here? - XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" - } + XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", + }, ) except Exception as e: raise XMPPInteractionFailed("Publishing the public keys list failed.") from e async def download_public_keys_list( - self, - client: SatXMPPClient, - jid: jid.JID + self, client: SatXMPPClient, jid: jid.JID ) -> Optional[Set[PublicKeyMetadata]]: """Download the public keys list of a JID. @@ -1701,10 +1695,7 @@ try: items, __ = await self.__xep_0060.get_items( - client, - jid.userhostJID(), - node, - max_items=1 + client, jid.userhostJID(), node, max_items=1 ) except exceptions.NotFound: return None @@ -1718,7 +1709,7 @@ public_keys_list_elt = cast( Optional[domish.Element], - next(item_elt.elements(NS_OX, "public-keys-list"), None) + next(item_elt.elements(NS_OX, "public-keys-list"), None), ) if public_keys_list_elt is None: @@ -1735,15 +1726,15 @@ return { PublicKeyMetadata( fingerprint=pubkey_metadata_elt["v4-fingerprint"], - timestamp=parse_datetime(pubkey_metadata_elt["date"]) + timestamp=parse_datetime(pubkey_metadata_elt["date"]), ) - for pubkey_metadata_elt - in public_keys_list_elt.elements(NS_OX, "pubkey-metadata") + for pubkey_metadata_elt in public_keys_list_elt.elements( + NS_OX, "pubkey-metadata" + ) } async def __prepare_secret_key_synchronization( - self, - client: SatXMPPClient + self, client: SatXMPPClient ) -> Optional[domish.Element]: """Prepare for secret key synchronization. @@ -1760,10 +1751,10 @@ """ try: - infos = cast(DiscoInfo, await self.host.memory.disco.get_infos( - client, - client.jid.userhostJID() - )) + infos = cast( + DiscoInfo, + await self.host.memory.disco.get_infos(client, client.jid.userhostJID()), + ) except Exception as e: raise XMPPInteractionFailed( "Error performing service discovery on the own bare JID." @@ -1780,8 +1771,9 @@ "Server doesn't support the whitelist access model." ) - persistent_items_supported = \ + persistent_items_supported = ( "http://jabber.org/protocol/pubsub#persistent-items" in features + ) # TODO: persistent-items is a SHOULD, how do we handle the feature missing? @@ -1789,10 +1781,7 @@ try: items, __ = await self.__xep_0060.get_items( - client, - client.jid.userhostJID(), - node, - max_items=1 + client, client.jid.userhostJID(), node, max_items=1 ) except exceptions.NotFound: try: @@ -1803,8 +1792,8 @@ { XEP_0060.OPT_PERSIST_ITEMS: "true", XEP_0060.OPT_ACCESS_MODEL: "whitelist", - XEP_0060.OPT_MAX_ITEMS: "1" - } + XEP_0060.OPT_MAX_ITEMS: "1", + }, ) except Exception as e: raise XMPPInteractionFailed( @@ -1821,9 +1810,7 @@ return None async def export_secret_keys( - self, - client: SatXMPPClient, - secret_keys: Iterable[GPGSecretKey] + self, client: SatXMPPClient, secret_keys: Iterable[GPGSecretKey] ) -> str: """Export secret keys to synchronize them with other devices. @@ -1854,10 +1841,7 @@ try: await self.__xep_0060.send_item( - client, - client.jid.userhostJID(), - node, - secretkey_elt + client, client.jid.userhostJID(), node, secretkey_elt ) except Exception as e: raise XMPPInteractionFailed("Publishing the secret keys failed.") from e @@ -1885,8 +1869,7 @@ return None secretkey_elt = cast( - Optional[domish.Element], - next(item_elt.elements(NS_OX, "secretkey"), None) + Optional[domish.Element], next(item_elt.elements(NS_OX, "secretkey"), None) ) if secretkey_elt is None: @@ -1902,10 +1885,7 @@ return base64.b64decode(str(secretkey_elt)) def import_secret_keys( - self, - client: SatXMPPClient, - ciphertext: bytes, - backup_code: str + self, client: SatXMPPClient, ciphertext: bytes, backup_code: str ) -> Set[GPGSecretKey]: """import previously downloaded secret keys. @@ -1925,16 +1905,13 @@ gpg_provider = get_gpg_provider(self.host, client) - return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( - ciphertext, - backup_code - )) + return gpg_provider.restore_secret_keys( + gpg_provider.decrypt_symmetrically(ciphertext, backup_code) + ) @staticmethod def __get_joined_muc_users( - client: SatXMPPClient, - xep_0045: XEP_0045, - room_jid: jid.JID + client: SatXMPPClient, xep_0045: XEP_0045, room_jid: jid.JID ) -> Set[jid.JID]: """ @param client: The client. @@ -1968,10 +1945,7 @@ return bare_jids async def get_trust( - self, - client: SatXMPPClient, - public_key: GPGPublicKey, - owner: jid.JID + self, client: SatXMPPClient, public_key: GPGPublicKey, owner: jid.JID ) -> TrustLevel: """Query the trust level of a public key. @@ -1993,7 +1967,7 @@ client: SatXMPPClient, public_key: GPGPublicKey, owner: jid.JID, - trust_level: TrustLevel + trust_level: TrustLevel, ) -> None: """Set the trust level of a public key. @@ -2008,9 +1982,7 @@ await self.__storage[client.profile].force(key, trust_level.name) async def get_trust_ui( # pylint: disable=invalid-name - self, - client: SatXMPPClient, - entity: jid.JID + self, client: SatXMPPClient, entity: jid.JID ) -> xml_tools.XMLUI: """ @param client: The client. @@ -2026,17 +1998,17 @@ if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) else: - bare_jids = { entity.userhostJID() } + bare_jids = {entity.userhostJID()} - all_public_keys = list({ - bare_jid: list(self.list_public_keys(client, bare_jid)) - for bare_jid - in bare_jids - }.items()) + all_public_keys = list( + { + bare_jid: list(self.list_public_keys(client, bare_jid)) + for bare_jid in bare_jids + }.items() + ) async def callback( - data: Any, - profile: str # pylint: disable=unused-argument + data: Any, profile: str # pylint: disable=unused-argument ) -> Dict[Never, Never]: """ @param data: The XMLUI result produces by the trust UI form. @@ -2049,8 +2021,7 @@ return {} data_form_result = cast( - Dict[str, str], - xml_tools.xmlui_result_2_data_form_result(data) + Dict[str, str], xml_tools.xmlui_result_2_data_form_result(data) ) for key, value in data_form_result.items(): if not key.startswith("trust_"): @@ -2070,22 +2041,22 @@ submit_id = self.host.register_callback(callback, with_data=True, one_shot=True) result = xml_tools.XMLUI( - panel_type=C.XMLUI_FORM, - title=D_("OX trust management"), - submit_id=submit_id + panel_type=C.XMLUI_FORM, title=D_("OX trust management"), submit_id=submit_id ) # Casting this to Any, otherwise all calls on the variable cause type errors # pylint: disable=no-member trust_ui = cast(Any, result) - trust_ui.addText(D_( - "This is OX trusting system. You'll see below the GPG keys of your " - "contacts, and a list selection to trust them or not. A trusted key " - "can read your messages in plain text, so be sure to only validate " - "keys that you are sure are belonging to your contact. It's better " - "to do this when you are next to your contact, so " - "you can check the \"fingerprint\" of the key " - "yourself. Do *not* validate a key if the fingerprint is wrong!" - )) + trust_ui.addText( + D_( + "This is OX trusting system. You'll see below the GPG keys of your " + "contacts, and a list selection to trust them or not. A trusted key " + "can read your messages in plain text, so be sure to only validate " + "keys that you are sure are belonging to your contact. It's better " + "to do this when you are next to your contact, so " + 'you can check the "fingerprint" of the key ' + "yourself. Do *not* validate a key if the fingerprint is wrong!" + ) + ) own_secret_keys = self.list_secret_keys(client) @@ -2096,7 +2067,7 @@ trust_ui.addEmpty() trust_ui.addEmpty() - for outer_index, [ owner, public_keys ] in enumerate(all_public_keys): + for outer_index, [owner, public_keys] in enumerate(all_public_keys): for inner_index, public_key in enumerate(public_keys): trust_ui.addLabel(D_("Contact")) trust_ui.addJid(jid.JID(owner)) @@ -2105,14 +2076,17 @@ trust_ui.addLabel(D_("Trust this device?")) current_trust_level = await self.get_trust(client, public_key, owner) - avaiable_trust_levels = \ - { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } + avaiable_trust_levels = { + TrustLevel.DISTRUSTED, + TrustLevel.TRUSTED, + current_trust_level, + } trust_ui.addList( f"trust_{outer_index}_{inner_index}", - options=[ trust_level.name for trust_level in avaiable_trust_levels ], + options=[trust_level.name for trust_level in avaiable_trust_levels], selected=current_trust_level.name, - styles=[ "inline" ] + styles=["inline"], ) trust_ui.addEmpty()