Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0391.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | e11b13418ba6 |
children |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0391.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0391.py Wed Jun 19 18:44:57 2024 +0200 @@ -77,21 +77,11 @@ self.host = host self._o = host.plugins["XEP-0384"] self._j = host.plugins["XEP-0166"] - host.trigger.add( - "XEP-0166_initiate_elt_built", - self._on_initiate_elt_build - ) + host.trigger.add("XEP-0166_initiate_elt_built", self._on_initiate_elt_build) + host.trigger.add("XEP-0166_on_session_initiate", self._on_session_initiate) + host.trigger.add("XEP-0234_jingle_handler", self._add_encryption_filter) host.trigger.add( - "XEP-0166_on_session_initiate", - self._on_session_initiate - ) - host.trigger.add( - "XEP-0234_jingle_handler", - self._add_encryption_filter - ) - host.trigger.add( - "XEP-0234_file_receiving_request_conf", - self._add_encryption_filter + "XEP-0234_file_receiving_request_conf", self._add_encryption_filter ) def get_handler(self, client): @@ -102,11 +92,12 @@ client: SatXMPPEntity, session: Dict[str, Any], iq_elt: domish.Element, - jingle_elt: domish.Element + jingle_elt: domish.Element, ) -> bool: - if client.encryption.get_namespace( - session["peer_jid"].userhostJID() - ) != self._o.NS_OLDMEMO: + if ( + client.encryption.get_namespace(session["peer_jid"].userhostJID()) + != self._o.NS_OLDMEMO + ): return True for content_elt in jingle_elt.elements(self._j.namespace, "content"): content_data = session["contents"][content_elt["name"]] @@ -126,7 +117,7 @@ security_elt["type"] = enc_type encryption_data = content_data["encryption"] = { "cipher": cipher, - "type": enc_type + "type": enc_type, } session_manager = await self._o.get_session_manager(client.profile) await self._o.download_missing_device_lists( @@ -136,19 +127,16 @@ messages, encryption_errors = await session_manager.encrypt( frozenset({session["peer_jid"].userhost()}), # the value seems to be the commonly used value - { self._o.NS_OLDMEMO: b" " }, - backend_priority_order=[ self._o.NS_OLDMEMO ], - identifier = client.jid.userhost() + {self._o.NS_OLDMEMO: b" "}, + backend_priority_order=[self._o.NS_OLDMEMO], + identifier=client.jid.userhost(), ) except Exception as e: log.exception("Can't generate IV and keys") raise e message, plain_key_material = next(iter(messages.items())) iv, key = message.content.initialization_vector, plain_key_material.key - content_data["encryption"].update({ - "iv": iv, - "key": key - }) + content_data["encryption"].update({"iv": iv, "key": key}) encrypted_elt = xml_tools.et_elt_2_domish_elt( oldmemo.etree.serialize_message(message) ) @@ -160,7 +148,7 @@ client: SatXMPPEntity, session: Dict[str, Any], iq_elt: domish.Element, - jingle_elt: domish.Element + jingle_elt: domish.Element, ) -> bool: for content_elt in jingle_elt.elements(self._j.namespace, "content"): content_data = session["contents"][content_elt["name"]] @@ -181,7 +169,7 @@ xml_tools.domish_elt_2_et_elt(encrypted_elt, False), session["peer_jid"].userhost(), client.jid.userhost(), - session_manager + session_manager, ) __, __, plain_key_material = await session_manager.decrypt(message) except Exception as e: @@ -192,7 +180,7 @@ "cipher": security_elt["cipher"], "type": security_elt["type"], "iv": message.content.initialization_vector, - "key": plain_key_material.key + "key": plain_key_material.key, } except KeyError as e: log.warning(f"missing data, can't decrypt: {e}") @@ -201,10 +189,7 @@ return True def __encrypt( - self, - data: bytes, - encryptor: CipherContext, - data_cb: Callable + self, data: bytes, encryptor: CipherContext, data_cb: Callable ) -> bytes: data_cb(data) if data: @@ -213,23 +198,23 @@ try: return encryptor.finalize() + encryptor.tag except AlreadyFinalized: - return b'' + return b"" def __decrypt( self, data: bytes, buffer: list[bytes], decryptor: CipherContext, - data_cb: Callable + data_cb: Callable, ) -> bytes: buffer.append(data) - data = b''.join(buffer) + data = b"".join(buffer) buffer.clear() if len(data) > 16: decrypted = decryptor.update(data[:-16]) data_cb(decrypted) else: - decrypted = b'' + decrypted = b"" buffer.append(data[-16:]) return decrypted @@ -239,7 +224,7 @@ buffer: list[bytes], decryptor: CipherContext, ) -> None: - tag = b''.join(buffer) + tag = b"".join(buffer) file_obj.write(decryptor.finalize_with_tag(tag)) async def _add_encryption_filter( @@ -247,7 +232,7 @@ client: SatXMPPEntity, session: Dict[str, Any], content_data: Dict[str, Any], - elt: domish.Element + elt: domish.Element, ) -> bool: try: file_obj = content_data["stream_object"].file_obj @@ -258,7 +243,7 @@ log.debug("JET skipped due to webrtc transport.") return True try: - encryption_data=content_data["encryption"] + encryption_data = content_data["encryption"] except KeyError: return True cipher = ciphers.Cipher( @@ -274,20 +259,18 @@ self.__decrypt_finalize, file_obj=file_obj, buffer=buffer, - decryptor=decryptor + decryptor=decryptor, ) file_obj.data_cb = partial( self.__decrypt, buffer=buffer, decryptor=decryptor, - data_cb=file_obj.data_cb + data_cb=file_obj.data_cb, ) else: # we are sending a file file_obj.data_cb = partial( - self.__encrypt, - encryptor=cipher.encryptor(), - data_cb=file_obj.data_cb + self.__encrypt, encryptor=cipher.encryptor(), data_cb=file_obj.data_cb ) return True