diff libervia/backend/plugins/plugin_xep_0373.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents b53b6dc1f929
children
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0373.py	Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0373.py	Wed Jun 19 18:44:57 2024 +0200
@@ -75,7 +75,7 @@
     "GPGProvider",
     "PublicKeyMetadata",
     "gpg_provider",
-    "TrustLevel"
+    "TrustLevel",
 ]
 
 
@@ -86,8 +86,8 @@
     C.PI_NAME: "XEP-0373",
     C.PI_IMPORT_NAME: "XEP-0373",
     C.PI_TYPE: "SEC",
-    C.PI_PROTOCOLS: [ "XEP-0373" ],
-    C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ],
+    C.PI_PROTOCOLS: ["XEP-0373"],
+    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0163"],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "XEP_0373",
     C.PI_HANDLER: "no",
@@ -306,10 +306,7 @@
 
     @abstractmethod
     def verify_detached(
-        self,
-        data: bytes,
-        signature: bytes,
-        public_keys: Set[GPGPublicKey]
+        self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey]
     ) -> None:
         """Verify signed data, where the signature was created detached from the data.
 
@@ -329,7 +326,7 @@
         self,
         plaintext: bytes,
         public_keys: Set[GPGPublicKey],
-        signing_keys: Optional[Set[GPGSecretKey]] = None
+        signing_keys: Optional[Set[GPGSecretKey]] = None,
     ) -> bytes:
         """Encrypt and optionally sign some data.
 
@@ -346,7 +343,7 @@
         self,
         ciphertext: bytes,
         secret_keys: Set[GPGSecretKey],
-        public_keys: Optional[Set[GPGPublicKey]] = None
+        public_keys: Optional[Set[GPGPublicKey]] = None,
     ) -> bytes:
         """Decrypt and optionally verify some data.
 
@@ -569,9 +566,7 @@
         with gpg.Context(home_dir=self.__home_dir) as c:
             try:
                 plaintext, __, __ = c.decrypt(
-                    ciphertext,
-                    passphrase=password,
-                    verify=False
+                    ciphertext, passphrase=password, verify=False
                 )
             except gpg.errors.GPGMEError as e:
                 # TODO: Find out what kind of error is raised if the password is wrong and
@@ -646,10 +641,7 @@
             return data
 
     def verify_detached(
-        self,
-        data: bytes,
-        signature: bytes,
-        public_keys: Set[GPGPublicKey]
+        self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey]
     ) -> None:
         with gpg.Context(home_dir=self.__home_dir) as c:
             try:
@@ -677,7 +669,7 @@
         self,
         plaintext: bytes,
         public_keys: Set[GPGPublicKey],
-        signing_keys: Optional[Set[GPGSecretKey]] = None
+        signing_keys: Optional[Set[GPGSecretKey]] = None,
     ) -> bytes:
         recipients = []
         for public_key in public_keys:
@@ -701,7 +693,7 @@
                     recipients=recipients,
                     sign=sign,
                     always_trust=True,
-                    add_encrypt_to=True
+                    add_encrypt_to=True,
                 )
             except gpg.errors.GPGMEError as e:
                 raise GPGProviderError("Internal GPGME error") from e
@@ -720,16 +712,13 @@
         self,
         ciphertext: bytes,
         secret_keys: Set[GPGSecretKey],
-        public_keys: Optional[Set[GPGPublicKey]] = None
+        public_keys: Optional[Set[GPGPublicKey]] = None,
     ) -> bytes:
         verify = public_keys is not None
 
         with gpg.Context(home_dir=self.__home_dir) as c:
             try:
-                plaintext, result, verify_result = c.decrypt(
-                    ciphertext,
-                    verify=verify
-                )
+                plaintext, result, verify_result = c.decrypt(ciphertext, verify=verify)
             except gpg.errors.GPGMEError as e:
                 raise GPGProviderError("Internal GPGME error") from e
             except gpg.UnsupportedAlgorithm as e:
@@ -760,8 +749,7 @@
             try:
                 return {
                     GPGME_GPGPublicKey(key)
-                    for key
-                    in c.keylist(pattern=user_id, secret=False)
+                    for key in c.keylist(pattern=user_id, secret=False)
                 }
             except gpg.errors.GPGMEError as e:
                 raise GPGProviderError("Internal GPGME error") from e
@@ -771,8 +759,7 @@
             try:
                 return {
                     GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
-                    for key
-                    in c.keylist(pattern=user_id, secret=True)
+                    for key in c.keylist(pattern=user_id, secret=True)
                 }
             except gpg.errors.GPGMEError as e:
                 raise GPGProviderError("Internal GPGME error") from e
@@ -797,7 +784,7 @@
                     encrypt=True,
                     certify=False,
                     authenticate=False,
-                    force=True
+                    force=True,
                 )
 
                 key_obj = c.get_key(result.fpr, secret=True)
@@ -813,19 +800,20 @@
     """
     Metadata about a published public key.
     """
+
     fingerprint: str
     timestamp: datetime
 
     def to_dict(self) -> dict:
         # Convert the instance to a dictionary and handle datetime serialization
         data = self._asdict()
-        data['timestamp'] = self.timestamp.isoformat()
+        data["timestamp"] = self.timestamp.isoformat()
         return data
 
     @staticmethod
-    def from_dict(data: dict) -> 'PublicKeyMetadata':
+    def from_dict(data: dict) -> "PublicKeyMetadata":
         # Load a serialised dictionary
-        data['timestamp'] = datetime.fromisoformat(data['timestamp'])
+        data["timestamp"] = datetime.fromisoformat(data["timestamp"])
         return PublicKeyMetadata(**data)
 
 
@@ -841,20 +829,23 @@
     DISTRUSTED: str = "DISTRUSTED"
 
 
-OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+OPENPGP_SCHEMA = xmlschema.XMLSchema(
+    """<?xml version="1.0" encoding="utf8"?>
 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
     targetNamespace="urn:xmpp:openpgp:0"
     xmlns="urn:xmpp:openpgp:0">
 
     <xs:element name="openpgp" type="xs:base64Binary"/>
 </xs:schema>
-""")
+"""
+)
 
 
 # The following schema needs verion 1.1 of XML Schema, which is not supported by lxml.
 # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
 # implementation of XML Schema, including version 1.1.
-CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?>
+CONTENT_SCHEMA = xmlschema.XMLSchema11(
+    """<?xml version="1.1" encoding="utf8"?>
 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
     targetNamespace="urn:xmpp:openpgp:0"
     xmlns="urn:xmpp:openpgp:0">
@@ -914,11 +905,13 @@
         </xs:complexType>
     </xs:element>
 </xs:schema>
-""")
+"""
+)
 
 
 PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"
-PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema(
+    """<?xml version="1.0" encoding="utf8"?>
 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
     targetNamespace="urn:xmpp:openpgp:0"
     xmlns="urn:xmpp:openpgp:0">
@@ -938,10 +931,12 @@
         </xs:complexType>
     </xs:element>
 </xs:schema>
-""")
+"""
+)
 
 
-PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+PUBKEY_SCHEMA = xmlschema.XMLSchema(
+    """<?xml version="1.0" encoding="utf8"?>
 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
     targetNamespace="urn:xmpp:openpgp:0"
     xmlns="urn:xmpp:openpgp:0">
@@ -957,17 +952,20 @@
 
     <xs:element name="data" type="xs:base64Binary"/>
 </xs:schema>
-""")
+"""
+)
 
 
-SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
+SECRETKEY_SCHEMA = xmlschema.XMLSchema(
+    """<?xml version="1.0" encoding="utf8"?>
 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
     targetNamespace="urn:xmpp:openpgp:0"
     xmlns="urn:xmpp:openpgp:0">
 
     <xs:element name="secretkey" type="xs:base64Binary"/>
 </xs:schema>
-""")
+"""
+)
 
 
 DEFAULT_TRUST_MODEL_PARAM = f"""
@@ -1005,9 +1003,10 @@
     @return: The passphrase.
     """
 
-    return "-".join("".join(
-        secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4)
-    ) for __ in range(6))
+    return "-".join(
+        "".join(secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4))
+        for __ in range(6)
+    )
 
 
 # TODO: Handle the user id mess
@@ -1038,15 +1037,14 @@
             PUBLIC_KEYS_LIST_NODE,
             lambda items_event, profile: defer.ensureDeferred(
                 self.__on_public_keys_list_update(items_event, profile)
-            )
+            ),
         )
 
     async def profile_connecting(self, client):
         client.gpg_provider = get_gpg_provider(self.host, client)
 
     async def profile_connected(  # pylint: disable=invalid-name
-        self,
-        client: SatXMPPClient
+        self, client: SatXMPPClient
     ) -> None:
         """
         @param client: The client.
@@ -1055,17 +1053,16 @@
         profile = cast(str, client.profile)
 
         if not profile in self.__storage:
-            self.__storage[profile] = \
-                persistent.LazyPersistentBinaryDict("XEP-0373", client.profile)
+            self.__storage[profile] = persistent.LazyPersistentBinaryDict(
+                "XEP-0373", client.profile
+            )
 
         if len(self.list_secret_keys(client)) == 0:
             log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
             await self.create_key(client)
 
     async def __on_public_keys_list_update(
-        self,
-        items_event: pubsub.ItemsEvent,
-        profile: str
+        self, items_event: pubsub.ItemsEvent, profile: str
     ) -> None:
         """Handle public keys list updates fired by PEP.
 
@@ -1089,7 +1086,7 @@
 
         public_keys_list_elt = cast(
             Optional[domish.Element],
-            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+            next(item_elt.elements(NS_OX, "public-keys-list"), None),
         )
 
         pubkey_metadata_elts: Optional[List[domish.Element]] = None
@@ -1100,17 +1097,21 @@
             except xmlschema.XMLSchemaValidationError:
                 pass
             else:
-                pubkey_metadata_elts = \
-                    list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata"))
+                pubkey_metadata_elts = list(
+                    public_keys_list_elt.elements(NS_OX, "pubkey-metadata")
+                )
 
         if pubkey_metadata_elts is None:
             log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
             return
 
-        new_public_keys_metadata = { PublicKeyMetadata(
-            fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
-            timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"]))
-        ) for pubkey_metadata_elt in pubkey_metadata_elts }
+        new_public_keys_metadata = {
+            PublicKeyMetadata(
+                fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
+                timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])),
+            )
+            for pubkey_metadata_elt in pubkey_metadata_elts
+        }
 
         storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost())
 
@@ -1140,11 +1141,15 @@
         # included in the update
         if sender.userhost() == client.jid.userhost():
             secret_keys = self.list_secret_keys(client)
-            missing_keys = set(filter(lambda secret_key: all(
-                key_metadata.fingerprint != secret_key.public_key.fingerprint
-                for key_metadata
-                in new_public_keys_metadata
-            ), secret_keys))
+            missing_keys = set(
+                filter(
+                    lambda secret_key: all(
+                        key_metadata.fingerprint != secret_key.public_key.fingerprint
+                        for key_metadata in new_public_keys_metadata
+                    ),
+                    secret_keys,
+                )
+            )
 
             if len(missing_keys) > 0:
                 log.warning(
@@ -1154,16 +1159,17 @@
 
                 for missing_key in missing_keys:
                     log.warning(missing_key.public_key.fingerprint)
-                    new_public_keys_metadata.add(PublicKeyMetadata(
-                        fingerprint=missing_key.public_key.fingerprint,
-                        timestamp=datetime.now(timezone.utc)
-                    ))
+                    new_public_keys_metadata.add(
+                        PublicKeyMetadata(
+                            fingerprint=missing_key.public_key.fingerprint,
+                            timestamp=datetime.now(timezone.utc),
+                        )
+                    )
 
                 await self.publish_public_keys_list(client, new_public_keys_metadata)
 
         await self.__storage[profile].force(
-            storage_key,
-            [pkm.to_dict() for pkm in new_public_keys_metadata]
+            storage_key, [pkm.to_dict() for pkm in new_public_keys_metadata]
         )
 
     def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
@@ -1211,16 +1217,17 @@
             for pkm in await self.__storage[client.profile].get(storage_key, [])
         }
 
-        public_keys_list.add(PublicKeyMetadata(
-            fingerprint=secret_key.public_key.fingerprint,
-            timestamp=datetime.now(timezone.utc)
-        ))
+        public_keys_list.add(
+            PublicKeyMetadata(
+                fingerprint=secret_key.public_key.fingerprint,
+                timestamp=datetime.now(timezone.utc),
+            )
+        )
 
         await self.publish_public_keys_list(client, public_keys_list)
 
         await self.__storage[client.profile].force(
-            storage_key,
-            [pkm.to_dict() for pkm in public_keys_list]
+            storage_key, [pkm.to_dict() for pkm in public_keys_list]
         )
 
         return secret_key
@@ -1229,7 +1236,7 @@
     def __build_content_element(
         element_name: Literal["signcrypt", "sign", "crypt"],
         recipient_jids: Iterable[jid.JID],
-        include_rpad: bool
+        include_rpad: bool,
     ) -> Tuple[domish.Element, domish.Element]:
         """Build a content element.
 
@@ -1254,8 +1261,7 @@
             rpad_length = secrets.randbelow(201)
             rpad_content = "".join(
                 secrets.choice(string.digits + string.ascii_letters + string.punctuation)
-                for __
-                in range(rpad_length)
+                for __ in range(rpad_length)
             )
             content_elt.addElement("rpad", content=rpad_content)
 
@@ -1265,7 +1271,7 @@
 
     @staticmethod
     def build_signcrypt_element(
-        recipient_jids: Iterable[jid.JID]
+        recipient_jids: Iterable[jid.JID],
     ) -> Tuple[domish.Element, domish.Element]:
         """Build a ``<signcrypt/>`` content element.
 
@@ -1282,8 +1288,7 @@
 
     @staticmethod
     def build_sign_element(
-        recipient_jids: Iterable[jid.JID],
-        include_rpad: bool
+        recipient_jids: Iterable[jid.JID], include_rpad: bool
     ) -> Tuple[domish.Element, domish.Element]:
         """Build a ``<sign/>`` content element.
 
@@ -1302,7 +1307,7 @@
 
     @staticmethod
     def build_crypt_element(
-        recipient_jids: Iterable[jid.JID]
+        recipient_jids: Iterable[jid.JID],
     ) -> Tuple[domish.Element, domish.Element]:
         """Build a ``<crypt/>`` content element.
 
@@ -1319,7 +1324,7 @@
         self,
         client: SatXMPPClient,
         content_elt: domish.Element,
-        recipient_jids: Set[jid.JID]
+        recipient_jids: Set[jid.JID],
     ) -> domish.Element:
         """Build an ``<openpgp/>`` element.
 
@@ -1333,10 +1338,12 @@
 
         # TODO: I'm not sure whether we want to sign with all keys by default or choose
         # just one key/a subset of keys to sign with.
-        signing_keys = set(filter(
-            lambda secret_key: gpg_provider.can_sign(secret_key.public_key),
-            self.list_secret_keys(client)
-        ))
+        signing_keys = set(
+            filter(
+                lambda secret_key: gpg_provider.can_sign(secret_key.public_key),
+                self.list_secret_keys(client),
+            )
+        )
 
         encryption_keys: Set[GPGPublicKey] = set()
 
@@ -1370,7 +1377,7 @@
         client: SatXMPPClient,
         openpgp_elt: domish.Element,
         element_name: Literal["signcrypt", "sign", "crypt"],
-        sender_jid: jid.JID
+        sender_jid: jid.JID,
     ) -> Tuple[domish.Element, datetime]:
         """Verify, decrypt and unpack an ``<openpgp/>`` element.
 
@@ -1392,10 +1399,12 @@
 
         gpg_provider = get_gpg_provider(self.host, client)
 
-        decryption_keys = set(filter(
-            lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
-            self.list_secret_keys(client)
-        ))
+        decryption_keys = set(
+            filter(
+                lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
+                self.list_secret_keys(client),
+            )
+        )
 
         # import all keys of the sender
         all_public_keys = await self.import_all_public_keys(client, sender_jid)
@@ -1417,9 +1426,7 @@
 
         if element_name == "signcrypt":
             content = gpg_provider.decrypt(
-                openpgp_message,
-                decryption_keys,
-                public_keys=verification_keys
+                openpgp_message, decryption_keys, public_keys=verification_keys
             )
         elif element_name == "sign":
             content = gpg_provider.verify(openpgp_message, verification_keys)
@@ -1430,8 +1437,7 @@
 
         try:
             content_elt = cast(
-                domish.Element,
-                xml_tools.ElementParser()(content.decode("utf-8"))
+                domish.Element, xml_tools.ElementParser()(content.decode("utf-8"))
             )
         except UnicodeDecodeError as e:
             raise exceptions.ParsingError("UTF-8 decoding error") from e
@@ -1446,11 +1452,12 @@
         if content_elt.name != element_name:
             raise exceptions.ParsingError(f"Not a <{element_name}/> element.")
 
-        recipient_jids = \
-            { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") }
+        recipient_jids = {
+            jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to")
+        }
 
         if (
-            client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids }
+            client.jid.userhostJID() not in {jid.userhostJID() for jid in recipient_jids}
             and element_name != "crypt"
         ):
             raise VerificationError(
@@ -1467,9 +1474,7 @@
         return payload_elt, timestamp
 
     async def publish_public_key(
-        self,
-        client: SatXMPPClient,
-        public_key: GPGPublicKey
+        self, client: SatXMPPClient, public_key: GPGPublicKey
     ) -> None:
         """Publish a public key.
 
@@ -1499,19 +1504,17 @@
                     XEP_0060.EXTRA_PUBLISH_OPTIONS: {
                         XEP_0060.OPT_PERSIST_ITEMS: "true",
                         XEP_0060.OPT_ACCESS_MODEL: "open",
-                        XEP_0060.OPT_MAX_ITEMS: 1
+                        XEP_0060.OPT_MAX_ITEMS: 1,
                     },
                     # TODO: Do we really want publish_without_options here?
-                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
-                }
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options",
+                },
             )
         except Exception as e:
             raise XMPPInteractionFailed("Publishing the public key failed.") from e
 
     async def import_all_public_keys(
-        self,
-        client: SatXMPPClient,
-        entity_jid: jid.JID
+        self, client: SatXMPPClient, entity_jid: jid.JID
     ) -> Set[GPGPublicKey]:
         """import all public keys of a JID that have not been imported before.
 
@@ -1535,26 +1538,28 @@
                 client, entity_jid
             )
             if not public_keys_metadata:
-                raise exceptions.NotFound(
-                    f"Can't find public keys for {entity_jid}"
-                )
+                raise exceptions.NotFound(f"Can't find public keys for {entity_jid}")
             else:
                 await self.__storage[client.profile].aset(
-                    storage_key,
-                    [pkm.to_dict() for pkm in public_keys_metadata]
+                    storage_key, [pkm.to_dict() for pkm in public_keys_metadata]
                 )
 
-
-        missing_keys = set(filter(lambda public_key_metadata: all(
-            public_key_metadata.fingerprint != public_key.fingerprint
-            for public_key
-            in available_public_keys
-        ), public_keys_metadata))
+        missing_keys = set(
+            filter(
+                lambda public_key_metadata: all(
+                    public_key_metadata.fingerprint != public_key.fingerprint
+                    for public_key in available_public_keys
+                ),
+                public_keys_metadata,
+            )
+        )
 
         for missing_key in missing_keys:
             try:
                 available_public_keys.add(
-                    await self.import_public_key(client, entity_jid, missing_key.fingerprint)
+                    await self.import_public_key(
+                        client, entity_jid, missing_key.fingerprint
+                    )
                 )
             except Exception as e:
                 log.warning(
@@ -1565,10 +1570,7 @@
         return available_public_keys
 
     async def import_public_key(
-        self,
-        client: SatXMPPClient,
-        jid: jid.JID,
-        fingerprint: str
+        self, client: SatXMPPClient, jid: jid.JID, fingerprint: str
     ) -> GPGPublicKey:
         """import a public key.
 
@@ -1589,10 +1591,7 @@
 
         try:
             items, __ = await self.__xep_0060.get_items(
-                client,
-                jid.userhostJID(),
-                node,
-                max_items=1
+                client, jid.userhostJID(), node, max_items=1
             )
         except exceptions.NotFound as e:
             raise exceptions.NotFound(
@@ -1611,8 +1610,7 @@
             ) from e
 
         pubkey_elt = cast(
-            Optional[domish.Element],
-            next(item_elt.elements(NS_OX, "pubkey"), None)
+            Optional[domish.Element], next(item_elt.elements(NS_OX, "pubkey"), None)
         )
 
         if pubkey_elt is None:
@@ -1629,16 +1627,14 @@
                 f" schema validation."
             ) from e
 
-        public_key = gpg_provider.import_public_key(base64.b64decode(str(
-            next(pubkey_elt.elements(NS_OX, "data"))
-        )))
+        public_key = gpg_provider.import_public_key(
+            base64.b64decode(str(next(pubkey_elt.elements(NS_OX, "data"))))
+        )
 
         return public_key
 
     async def publish_public_keys_list(
-        self,
-        client: SatXMPPClient,
-        public_keys_list: Iterable[PublicKeyMetadata]
+        self, client: SatXMPPClient, public_keys_list: Iterable[PublicKeyMetadata]
     ) -> None:
         """Publish/update the own public keys list.
 
@@ -1650,7 +1646,7 @@
             beforehand.
         """
 
-        if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list):
+        if len({pkm.fingerprint for pkm in public_keys_list}) != len(public_keys_list):
             raise ValueError("Public keys list contains duplicate fingerprints.")
 
         node = "urn:xmpp:openpgp:0:public-keys"
@@ -1673,19 +1669,17 @@
                     XEP_0060.EXTRA_PUBLISH_OPTIONS: {
                         XEP_0060.OPT_PERSIST_ITEMS: "true",
                         XEP_0060.OPT_ACCESS_MODEL: "open",
-                        XEP_0060.OPT_MAX_ITEMS: 1
+                        XEP_0060.OPT_MAX_ITEMS: 1,
                     },
                     # TODO: Do we really want publish_without_options here?
-                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
-                }
+                    XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options",
+                },
             )
         except Exception as e:
             raise XMPPInteractionFailed("Publishing the public keys list failed.") from e
 
     async def download_public_keys_list(
-        self,
-        client: SatXMPPClient,
-        jid: jid.JID
+        self, client: SatXMPPClient, jid: jid.JID
     ) -> Optional[Set[PublicKeyMetadata]]:
         """Download the public keys list of a JID.
 
@@ -1701,10 +1695,7 @@
 
         try:
             items, __ = await self.__xep_0060.get_items(
-                client,
-                jid.userhostJID(),
-                node,
-                max_items=1
+                client, jid.userhostJID(), node, max_items=1
             )
         except exceptions.NotFound:
             return None
@@ -1718,7 +1709,7 @@
 
         public_keys_list_elt = cast(
             Optional[domish.Element],
-            next(item_elt.elements(NS_OX, "public-keys-list"), None)
+            next(item_elt.elements(NS_OX, "public-keys-list"), None),
         )
 
         if public_keys_list_elt is None:
@@ -1735,15 +1726,15 @@
         return {
             PublicKeyMetadata(
                 fingerprint=pubkey_metadata_elt["v4-fingerprint"],
-                timestamp=parse_datetime(pubkey_metadata_elt["date"])
+                timestamp=parse_datetime(pubkey_metadata_elt["date"]),
             )
-            for pubkey_metadata_elt
-            in public_keys_list_elt.elements(NS_OX, "pubkey-metadata")
+            for pubkey_metadata_elt in public_keys_list_elt.elements(
+                NS_OX, "pubkey-metadata"
+            )
         }
 
     async def __prepare_secret_key_synchronization(
-        self,
-        client: SatXMPPClient
+        self, client: SatXMPPClient
     ) -> Optional[domish.Element]:
         """Prepare for secret key synchronization.
 
@@ -1760,10 +1751,10 @@
         """
 
         try:
-            infos = cast(DiscoInfo, await self.host.memory.disco.get_infos(
-                client,
-                client.jid.userhostJID()
-            ))
+            infos = cast(
+                DiscoInfo,
+                await self.host.memory.disco.get_infos(client, client.jid.userhostJID()),
+            )
         except Exception as e:
             raise XMPPInteractionFailed(
                 "Error performing service discovery on the own bare JID."
@@ -1780,8 +1771,9 @@
                 "Server doesn't support the whitelist access model."
             )
 
-        persistent_items_supported = \
+        persistent_items_supported = (
             "http://jabber.org/protocol/pubsub#persistent-items" in features
+        )
 
         # TODO: persistent-items is a SHOULD, how do we handle the feature missing?
 
@@ -1789,10 +1781,7 @@
 
         try:
             items, __ = await self.__xep_0060.get_items(
-                client,
-                client.jid.userhostJID(),
-                node,
-                max_items=1
+                client, client.jid.userhostJID(), node, max_items=1
             )
         except exceptions.NotFound:
             try:
@@ -1803,8 +1792,8 @@
                     {
                         XEP_0060.OPT_PERSIST_ITEMS: "true",
                         XEP_0060.OPT_ACCESS_MODEL: "whitelist",
-                        XEP_0060.OPT_MAX_ITEMS: "1"
-                    }
+                        XEP_0060.OPT_MAX_ITEMS: "1",
+                    },
                 )
             except Exception as e:
                 raise XMPPInteractionFailed(
@@ -1821,9 +1810,7 @@
             return None
 
     async def export_secret_keys(
-        self,
-        client: SatXMPPClient,
-        secret_keys: Iterable[GPGSecretKey]
+        self, client: SatXMPPClient, secret_keys: Iterable[GPGSecretKey]
     ) -> str:
         """Export secret keys to synchronize them with other devices.
 
@@ -1854,10 +1841,7 @@
 
         try:
             await self.__xep_0060.send_item(
-                client,
-                client.jid.userhostJID(),
-                node,
-                secretkey_elt
+                client, client.jid.userhostJID(), node, secretkey_elt
             )
         except Exception as e:
             raise XMPPInteractionFailed("Publishing the secret keys failed.") from e
@@ -1885,8 +1869,7 @@
             return None
 
         secretkey_elt = cast(
-            Optional[domish.Element],
-            next(item_elt.elements(NS_OX, "secretkey"), None)
+            Optional[domish.Element], next(item_elt.elements(NS_OX, "secretkey"), None)
         )
 
         if secretkey_elt is None:
@@ -1902,10 +1885,7 @@
         return base64.b64decode(str(secretkey_elt))
 
     def import_secret_keys(
-        self,
-        client: SatXMPPClient,
-        ciphertext: bytes,
-        backup_code: str
+        self, client: SatXMPPClient, ciphertext: bytes, backup_code: str
     ) -> Set[GPGSecretKey]:
         """import previously downloaded secret keys.
 
@@ -1925,16 +1905,13 @@
 
         gpg_provider = get_gpg_provider(self.host, client)
 
-        return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically(
-            ciphertext,
-            backup_code
-        ))
+        return gpg_provider.restore_secret_keys(
+            gpg_provider.decrypt_symmetrically(ciphertext, backup_code)
+        )
 
     @staticmethod
     def __get_joined_muc_users(
-        client: SatXMPPClient,
-        xep_0045: XEP_0045,
-        room_jid: jid.JID
+        client: SatXMPPClient, xep_0045: XEP_0045, room_jid: jid.JID
     ) -> Set[jid.JID]:
         """
         @param client: The client.
@@ -1968,10 +1945,7 @@
         return bare_jids
 
     async def get_trust(
-        self,
-        client: SatXMPPClient,
-        public_key: GPGPublicKey,
-        owner: jid.JID
+        self, client: SatXMPPClient, public_key: GPGPublicKey, owner: jid.JID
     ) -> TrustLevel:
         """Query the trust level of a public key.
 
@@ -1993,7 +1967,7 @@
         client: SatXMPPClient,
         public_key: GPGPublicKey,
         owner: jid.JID,
-        trust_level: TrustLevel
+        trust_level: TrustLevel,
     ) -> None:
         """Set the trust level of a public key.
 
@@ -2008,9 +1982,7 @@
         await self.__storage[client.profile].force(key, trust_level.name)
 
     async def get_trust_ui(  # pylint: disable=invalid-name
-        self,
-        client: SatXMPPClient,
-        entity: jid.JID
+        self, client: SatXMPPClient, entity: jid.JID
     ) -> xml_tools.XMLUI:
         """
         @param client: The client.
@@ -2026,17 +1998,17 @@
         if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity):
             bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
         else:
-            bare_jids = { entity.userhostJID() }
+            bare_jids = {entity.userhostJID()}
 
-        all_public_keys = list({
-            bare_jid: list(self.list_public_keys(client, bare_jid))
-            for bare_jid
-            in bare_jids
-        }.items())
+        all_public_keys = list(
+            {
+                bare_jid: list(self.list_public_keys(client, bare_jid))
+                for bare_jid in bare_jids
+            }.items()
+        )
 
         async def callback(
-            data: Any,
-            profile: str  # pylint: disable=unused-argument
+            data: Any, profile: str  # pylint: disable=unused-argument
         ) -> Dict[Never, Never]:
             """
             @param data: The XMLUI result produces by the trust UI form.
@@ -2049,8 +2021,7 @@
                 return {}
 
             data_form_result = cast(
-                Dict[str, str],
-                xml_tools.xmlui_result_2_data_form_result(data)
+                Dict[str, str], xml_tools.xmlui_result_2_data_form_result(data)
             )
             for key, value in data_form_result.items():
                 if not key.startswith("trust_"):
@@ -2070,22 +2041,22 @@
         submit_id = self.host.register_callback(callback, with_data=True, one_shot=True)
 
         result = xml_tools.XMLUI(
-            panel_type=C.XMLUI_FORM,
-            title=D_("OX trust management"),
-            submit_id=submit_id
+            panel_type=C.XMLUI_FORM, title=D_("OX trust management"), submit_id=submit_id
         )
         # Casting this to Any, otherwise all calls on the variable cause type errors
         # pylint: disable=no-member
         trust_ui = cast(Any, result)
-        trust_ui.addText(D_(
-            "This is OX trusting system. You'll see below the GPG keys of your "
-            "contacts, and a list selection to trust them or not. A trusted key "
-            "can read your messages in plain text, so be sure to only validate "
-            "keys that you are sure are belonging to your contact. It's better "
-            "to do this when you are next to your contact, so "
-            "you can check the \"fingerprint\" of the key "
-            "yourself. Do *not* validate a key if the fingerprint is wrong!"
-        ))
+        trust_ui.addText(
+            D_(
+                "This is OX trusting system. You'll see below the GPG keys of your "
+                "contacts, and a list selection to trust them or not. A trusted key "
+                "can read your messages in plain text, so be sure to only validate "
+                "keys that you are sure are belonging to your contact. It's better "
+                "to do this when you are next to your contact, so "
+                'you can check the "fingerprint" of the key '
+                "yourself. Do *not* validate a key if the fingerprint is wrong!"
+            )
+        )
 
         own_secret_keys = self.list_secret_keys(client)
 
@@ -2096,7 +2067,7 @@
             trust_ui.addEmpty()
             trust_ui.addEmpty()
 
-        for outer_index, [ owner, public_keys ] in enumerate(all_public_keys):
+        for outer_index, [owner, public_keys] in enumerate(all_public_keys):
             for inner_index, public_key in enumerate(public_keys):
                 trust_ui.addLabel(D_("Contact"))
                 trust_ui.addJid(jid.JID(owner))
@@ -2105,14 +2076,17 @@
                 trust_ui.addLabel(D_("Trust this device?"))
 
                 current_trust_level = await self.get_trust(client, public_key, owner)
-                avaiable_trust_levels = \
-                    { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
+                avaiable_trust_levels = {
+                    TrustLevel.DISTRUSTED,
+                    TrustLevel.TRUSTED,
+                    current_trust_level,
+                }
 
                 trust_ui.addList(
                     f"trust_{outer_index}_{inner_index}",
-                    options=[ trust_level.name for trust_level in avaiable_trust_levels ],
+                    options=[trust_level.name for trust_level in avaiable_trust_levels],
                     selected=current_trust_level.name,
-                    styles=[ "inline" ]
+                    styles=["inline"],
                 )
 
                 trust_ui.addEmpty()