diff libervia/backend/plugins/plugin_xep_0391.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents e11b13418ba6
children
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0391.py	Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0391.py	Wed Jun 19 18:44:57 2024 +0200
@@ -77,21 +77,11 @@
         self.host = host
         self._o = host.plugins["XEP-0384"]
         self._j = host.plugins["XEP-0166"]
-        host.trigger.add(
-            "XEP-0166_initiate_elt_built",
-            self._on_initiate_elt_build
-        )
+        host.trigger.add("XEP-0166_initiate_elt_built", self._on_initiate_elt_build)
+        host.trigger.add("XEP-0166_on_session_initiate", self._on_session_initiate)
+        host.trigger.add("XEP-0234_jingle_handler", self._add_encryption_filter)
         host.trigger.add(
-            "XEP-0166_on_session_initiate",
-            self._on_session_initiate
-        )
-        host.trigger.add(
-            "XEP-0234_jingle_handler",
-            self._add_encryption_filter
-        )
-        host.trigger.add(
-            "XEP-0234_file_receiving_request_conf",
-            self._add_encryption_filter
+            "XEP-0234_file_receiving_request_conf", self._add_encryption_filter
         )
 
     def get_handler(self, client):
@@ -102,11 +92,12 @@
         client: SatXMPPEntity,
         session: Dict[str, Any],
         iq_elt: domish.Element,
-        jingle_elt: domish.Element
+        jingle_elt: domish.Element,
     ) -> bool:
-        if client.encryption.get_namespace(
-               session["peer_jid"].userhostJID()
-           ) != self._o.NS_OLDMEMO:
+        if (
+            client.encryption.get_namespace(session["peer_jid"].userhostJID())
+            != self._o.NS_OLDMEMO
+        ):
             return True
         for content_elt in jingle_elt.elements(self._j.namespace, "content"):
             content_data = session["contents"][content_elt["name"]]
@@ -126,7 +117,7 @@
             security_elt["type"] = enc_type
             encryption_data = content_data["encryption"] = {
                 "cipher": cipher,
-                "type": enc_type
+                "type": enc_type,
             }
             session_manager = await self._o.get_session_manager(client.profile)
             await self._o.download_missing_device_lists(
@@ -136,19 +127,16 @@
                 messages, encryption_errors = await session_manager.encrypt(
                     frozenset({session["peer_jid"].userhost()}),
                     # the value seems to be the commonly used value
-                    { self._o.NS_OLDMEMO: b" " },
-                    backend_priority_order=[ self._o.NS_OLDMEMO ],
-                    identifier = client.jid.userhost()
+                    {self._o.NS_OLDMEMO: b" "},
+                    backend_priority_order=[self._o.NS_OLDMEMO],
+                    identifier=client.jid.userhost(),
                 )
             except Exception as e:
                 log.exception("Can't generate IV and keys")
                 raise e
             message, plain_key_material = next(iter(messages.items()))
             iv, key = message.content.initialization_vector, plain_key_material.key
-            content_data["encryption"].update({
-                "iv": iv,
-                "key": key
-            })
+            content_data["encryption"].update({"iv": iv, "key": key})
             encrypted_elt = xml_tools.et_elt_2_domish_elt(
                 oldmemo.etree.serialize_message(message)
             )
@@ -160,7 +148,7 @@
         client: SatXMPPEntity,
         session: Dict[str, Any],
         iq_elt: domish.Element,
-        jingle_elt: domish.Element
+        jingle_elt: domish.Element,
     ) -> bool:
         for content_elt in jingle_elt.elements(self._j.namespace, "content"):
             content_data = session["contents"][content_elt["name"]]
@@ -181,7 +169,7 @@
                     xml_tools.domish_elt_2_et_elt(encrypted_elt, False),
                     session["peer_jid"].userhost(),
                     client.jid.userhost(),
-                    session_manager
+                    session_manager,
                 )
                 __, __, plain_key_material = await session_manager.decrypt(message)
             except Exception as e:
@@ -192,7 +180,7 @@
                     "cipher": security_elt["cipher"],
                     "type": security_elt["type"],
                     "iv": message.content.initialization_vector,
-                    "key": plain_key_material.key
+                    "key": plain_key_material.key,
                 }
             except KeyError as e:
                 log.warning(f"missing data, can't decrypt: {e}")
@@ -201,10 +189,7 @@
         return True
 
     def __encrypt(
-        self,
-        data: bytes,
-        encryptor: CipherContext,
-        data_cb: Callable
+        self, data: bytes, encryptor: CipherContext, data_cb: Callable
     ) -> bytes:
         data_cb(data)
         if data:
@@ -213,23 +198,23 @@
             try:
                 return encryptor.finalize() + encryptor.tag
             except AlreadyFinalized:
-                return b''
+                return b""
 
     def __decrypt(
         self,
         data: bytes,
         buffer: list[bytes],
         decryptor: CipherContext,
-        data_cb: Callable
+        data_cb: Callable,
     ) -> bytes:
         buffer.append(data)
-        data = b''.join(buffer)
+        data = b"".join(buffer)
         buffer.clear()
         if len(data) > 16:
             decrypted = decryptor.update(data[:-16])
             data_cb(decrypted)
         else:
-            decrypted = b''
+            decrypted = b""
         buffer.append(data[-16:])
         return decrypted
 
@@ -239,7 +224,7 @@
         buffer: list[bytes],
         decryptor: CipherContext,
     ) -> None:
-        tag = b''.join(buffer)
+        tag = b"".join(buffer)
         file_obj.write(decryptor.finalize_with_tag(tag))
 
     async def _add_encryption_filter(
@@ -247,7 +232,7 @@
         client: SatXMPPEntity,
         session: Dict[str, Any],
         content_data: Dict[str, Any],
-        elt: domish.Element
+        elt: domish.Element,
     ) -> bool:
         try:
             file_obj = content_data["stream_object"].file_obj
@@ -258,7 +243,7 @@
                 log.debug("JET skipped due to webrtc transport.")
                 return True
         try:
-            encryption_data=content_data["encryption"]
+            encryption_data = content_data["encryption"]
         except KeyError:
             return True
         cipher = ciphers.Cipher(
@@ -274,20 +259,18 @@
                 self.__decrypt_finalize,
                 file_obj=file_obj,
                 buffer=buffer,
-                decryptor=decryptor
+                decryptor=decryptor,
             )
             file_obj.data_cb = partial(
                 self.__decrypt,
                 buffer=buffer,
                 decryptor=decryptor,
-                data_cb=file_obj.data_cb
+                data_cb=file_obj.data_cb,
             )
         else:
             # we are sending a file
             file_obj.data_cb = partial(
-                self.__encrypt,
-                encryptor=cipher.encryptor(),
-                data_cb=file_obj.data_cb
+                self.__encrypt, encryptor=cipher.encryptor(), data_cb=file_obj.data_cb
             )
 
         return True