Mercurial > libervia-backend
diff libervia/backend/memory/encryption.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 4b842c1fb686 |
children |
line wrap: on
line diff
--- a/libervia/backend/memory/encryption.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/memory/encryption.py Wed Jun 19 18:44:57 2024 +0200 @@ -23,7 +23,11 @@ from twisted.words.protocols.jabber import jid from twisted.internet import defer from twisted.python import failure -from libervia.backend.core.core_types import EncryptionPlugin, EncryptionSession, MessageData +from libervia.backend.core.core_types import ( + EncryptionPlugin, + EncryptionSession, + MessageData, +) from libervia.backend.core.i18n import D_, _ from libervia.backend.core.constants import Const as C from libervia.backend.core import exceptions @@ -38,13 +42,15 @@ class EncryptionHandler: """Class to handle encryption sessions for a client""" + plugins = [] # plugin able to encrypt messages def __init__(self, client): self.client = client self._sessions = {} # bare_jid ==> encryption_data self._stored_session = persistent.PersistentDict( - "core:encryption", profile=client.profile) + "core:encryption", profile=client.profile + ) @property def host(self): @@ -63,9 +69,11 @@ for idx, (success, err) in enumerate(result): if not success: entity_jid_s, namespace = list(self._stored_session.items())[idx] - log.warning(_( - "Could not restart {namespace!r} encryption with {entity}: {err}" - ).format(namespace=namespace, entity=entity_jid_s, err=err)) + log.warning( + _( + "Could not restart {namespace!r} encryption with {entity}: {err}" + ).format(namespace=namespace, entity=entity_jid_s, err=err) + ) log.info(_("encryption sessions restored")) @classmethod @@ -105,7 +113,8 @@ name=name, namespace=namespace, priority=priority, - directed=directed) + directed=directed, + ) cls.plugins.append(plugin) cls.plugins.sort(key=lambda p: p.priority) log.info(_("Encryption plugin registered: {name}").format(name=name)) @@ -119,9 +128,11 @@ try: return next(p for p in cls.plugins if p.namespace == namespace) except StopIteration: - raise exceptions.NotFound(_( - "Can't find requested encryption plugin: {namespace}").format( - namespace=namespace)) + raise exceptions.NotFound( + _("Can't find requested encryption plugin: {namespace}").format( + namespace=namespace + ) + ) @classmethod def get_namespaces(cls): @@ -139,9 +150,9 @@ for p in cls.plugins: if p.name.lower() == name.lower(): return p.namespace - raise exceptions.NotFound(_( - "Can't find a plugin with the name \"{name}\".".format( - name=name))) + raise exceptions.NotFound( + _('Can\'t find a plugin with the name "{name}".'.format(name=name)) + ) def get_bridge_data(self, session): """Retrieve session data serialized for bridge. @@ -150,12 +161,11 @@ @return (unicode): serialized data for bridge """ if session is None: - return '' - plugin = session['plugin'] - bridge_data = {'name': plugin.name, - 'namespace': plugin.namespace} - if 'directed_devices' in session: - bridge_data['directed_devices'] = session['directed_devices'] + return "" + plugin = session["plugin"] + bridge_data = {"name": plugin.name, "namespace": plugin.namespace} + if "directed_devices" in session: + bridge_data["directed_devices"] = session["directed_devices"] return data_format.serialise(bridge_data) @@ -205,8 +215,12 @@ it will be replaced by the new one """ if not self.plugins: - raise exceptions.NotFound(_("No encryption plugin is registered, " - "an encryption session can't be started")) + raise exceptions.NotFound( + _( + "No encryption plugin is registered, " + "an encryption session can't be started" + ) + ) if namespace is None: plugin = self.plugins[0] @@ -218,9 +232,12 @@ # we have already an encryption session with this contact former_plugin = self._sessions[bare_jid]["plugin"] if former_plugin.namespace == namespace: - log.info(_("Session with {bare_jid} is already encrypted with {name}. " - "Nothing to do.").format( - bare_jid=bare_jid, name=former_plugin.name)) + log.info( + _( + "Session with {bare_jid} is already encrypted with {name}. " + "Nothing to do." + ).format(bare_jid=bare_jid, name=former_plugin.name) + ) return if replace: @@ -229,9 +246,10 @@ del self._sessions[bare_jid] await self._stop_encryption(former_plugin, entity) else: - msg = (_("Session with {bare_jid} is already encrypted with {name}. " - "Please stop encryption session before changing algorithm.") - .format(bare_jid=bare_jid, name=plugin.name)) + msg = _( + "Session with {bare_jid} is already encrypted with {name}. " + "Please stop encryption session before changing algorithm." + ).format(bare_jid=bare_jid, name=plugin.name) log.warning(msg) raise exceptions.ConflictError(msg) @@ -241,34 +259,44 @@ entity.resource = self.host.memory.main_resource_get(self.client, entity) if not entity.resource: raise exceptions.NotFound( - _("No resource found for {destinee}, can't encrypt with {name}") - .format(destinee=entity.full(), name=plugin.name)) - log.info(_("No resource specified to encrypt with {name}, using " - "{destinee}.").format(destinee=entity.full(), - name=plugin.name)) + _( + "No resource found for {destinee}, can't encrypt with {name}" + ).format(destinee=entity.full(), name=plugin.name) + ) + log.info( + _( + "No resource specified to encrypt with {name}, using " + "{destinee}." + ).format(destinee=entity.full(), name=plugin.name) + ) # indicate that we encrypt only for some devices - directed_devices = data['directed_devices'] = [entity.resource] + directed_devices = data["directed_devices"] = [entity.resource] elif entity.resource: raise ValueError(_("{name} encryption must be used with bare jids.")) await self._start_encryption(plugin, entity) self._sessions[entity.userhostJID()] = data - log.info(_("Encryption session has been set for {entity_jid} with " - "{encryption_name}").format( - entity_jid=entity.full(), encryption_name=plugin.name)) + log.info( + _( + "Encryption session has been set for {entity_jid} with " + "{encryption_name}" + ).format(entity_jid=entity.full(), encryption_name=plugin.name) + ) self.host.bridge.message_encryption_started( - entity.full(), - self.get_bridge_data(data), - self.client.profile) - msg = D_("Encryption session started: your messages with {destinee} are " - "now end to end encrypted using {name} algorithm.").format( - destinee=entity.full(), name=plugin.name) - directed_devices = data.get('directed_devices') + entity.full(), self.get_bridge_data(data), self.client.profile + ) + msg = D_( + "Encryption session started: your messages with {destinee} are " + "now end to end encrypted using {name} algorithm." + ).format(destinee=entity.full(), name=plugin.name) + directed_devices = data.get("directed_devices") if directed_devices: - msg += "\n" + D_("Message are encrypted only for {nb_devices} device(s): " - "{devices_list}.").format( - nb_devices=len(directed_devices), - devices_list = ', '.join(directed_devices)) + msg += "\n" + D_( + "Message are encrypted only for {nb_devices} device(s): " + "{devices_list}." + ).format( + nb_devices=len(directed_devices), devices_list=", ".join(directed_devices) + ) self.client.feedback(bare_jid, msg) @@ -283,29 +311,38 @@ session = self.getSession(entity.userhostJID()) if not session: raise failure.Failure( - exceptions.NotFound(_("There is no encryption session with this " - "entity."))) - plugin = session['plugin'] + exceptions.NotFound( + _("There is no encryption session with this " "entity.") + ) + ) + plugin = session["plugin"] if namespace is not None and plugin.namespace != namespace: - raise exceptions.InternalError(_( - "The encryption session is not run with the expected plugin: encrypted " - "with {current_name} and was expecting {expected_name}").format( - current_name=session['plugin'].namespace, - expected_name=namespace)) + raise exceptions.InternalError( + _( + "The encryption session is not run with the expected plugin: encrypted " + "with {current_name} and was expecting {expected_name}" + ).format( + current_name=session["plugin"].namespace, expected_name=namespace + ) + ) if entity.resource: try: - directed_devices = session['directed_devices'] + directed_devices = session["directed_devices"] except KeyError: - raise exceptions.NotFound(_( - "There is a session for the whole entity (i.e. all devices of the " - "entity), not a directed one. Please use bare jid if you want to " - "stop the whole encryption with this entity.")) + raise exceptions.NotFound( + _( + "There is a session for the whole entity (i.e. all devices of the " + "entity), not a directed one. Please use bare jid if you want to " + "stop the whole encryption with this entity." + ) + ) try: directed_devices.remove(entity.resource) except ValueError: - raise exceptions.NotFound(_("There is no directed session with this " - "entity.")) + raise exceptions.NotFound( + _("There is no directed session with this " "entity.") + ) else: if not directed_devices: # if we have no more directed device sessions, @@ -319,18 +356,24 @@ del self._sessions[entity.userhostJID()] await self._stop_encryption(plugin, entity) - log.info(_("encryption session stopped with entity {entity}").format( - entity=entity.full())) + log.info( + _("encryption session stopped with entity {entity}").format( + entity=entity.full() + ) + ) self.host.bridge.message_encryption_stopped( entity.full(), - {'name': plugin.name, - 'namespace': plugin.namespace, + { + "name": plugin.name, + "namespace": plugin.namespace, }, - self.client.profile) - msg = D_("Encryption session finished: your messages with {destinee} are " - "NOT end to end encrypted anymore.\nYour server administrators or " - "{destinee} server administrators will be able to read them.").format( - destinee=entity.full()) + self.client.profile, + ) + msg = D_( + "Encryption session finished: your messages with {destinee} are " + "NOT end to end encrypted anymore.\nYour server administrators or " + "{destinee} server administrators will be able to read them." + ).format(destinee=entity.full()) self.client.feedback(entity, msg) @@ -375,16 +418,19 @@ session = self.getSession(entity_jid) if not session: raise exceptions.NotFound( - "No encryption session currently active for {entity_jid}" - .format(entity_jid=entity_jid.full())) - plugin = session['plugin'] + "No encryption session currently active for {entity_jid}".format( + entity_jid=entity_jid.full() + ) + ) + plugin = session["plugin"] else: plugin = self.get_plugin(namespace) try: get_trust_ui = plugin.instance.get_trust_ui except AttributeError: raise NotImplementedError( - "Encryption plugin doesn't handle trust management UI") + "Encryption plugin doesn't handle trust management UI" + ) else: return utils.as_deferred(get_trust_ui, self.client, entity_jid) @@ -393,32 +439,32 @@ @classmethod def _import_menus(cls, host): host.import_menu( - (D_("Encryption"), D_("unencrypted (plain text)")), - partial(cls._on_menu_unencrypted, host=host), - security_limit=0, - help_string=D_("End encrypted session"), - type_=C.MENU_SINGLE, + (D_("Encryption"), D_("unencrypted (plain text)")), + partial(cls._on_menu_unencrypted, host=host), + security_limit=0, + help_string=D_("End encrypted session"), + type_=C.MENU_SINGLE, ) for plg in cls.getPlugins(): host.import_menu( - (D_("Encryption"), plg.name), - partial(cls._on_menu_name, host=host, plg=plg), - security_limit=0, - help_string=D_("Start {name} session").format(name=plg.name), - type_=C.MENU_SINGLE, + (D_("Encryption"), plg.name), + partial(cls._on_menu_name, host=host, plg=plg), + security_limit=0, + help_string=D_("Start {name} session").format(name=plg.name), + type_=C.MENU_SINGLE, ) host.import_menu( - (D_("Encryption"), D_("⛨ {name} trust").format(name=plg.name)), - partial(cls._on_menu_trust, host=host, plg=plg), - security_limit=0, - help_string=D_("Manage {name} trust").format(name=plg.name), - type_=C.MENU_SINGLE, + (D_("Encryption"), D_("⛨ {name} trust").format(name=plg.name)), + partial(cls._on_menu_trust, host=host, plg=plg), + security_limit=0, + help_string=D_("Manage {name} trust").format(name=plg.name), + type_=C.MENU_SINGLE, ) @classmethod def _on_menu_unencrypted(cls, data, host, profile): client = host.get_client(profile) - peer_jid = jid.JID(data['jid']).userhostJID() + peer_jid = jid.JID(data["jid"]).userhostJID() d = defer.ensureDeferred(client.encryption.stop(peer_jid)) d.addCallback(lambda __: {}) return d @@ -426,11 +472,12 @@ @classmethod def _on_menu_name(cls, data, host, plg, profile): client = host.get_client(profile) - peer_jid = jid.JID(data['jid']) + peer_jid = jid.JID(data["jid"]) if not plg.directed: peer_jid = peer_jid.userhostJID() d = defer.ensureDeferred( - client.encryption.start(peer_jid, plg.namespace, replace=True)) + client.encryption.start(peer_jid, plg.namespace, replace=True) + ) d.addCallback(lambda __: {}) return d @@ -438,22 +485,23 @@ @defer.inlineCallbacks def _on_menu_trust(cls, data, host, plg, profile): client = host.get_client(profile) - peer_jid = jid.JID(data['jid']).userhostJID() + peer_jid = jid.JID(data["jid"]).userhostJID() ui = yield client.encryption.get_trust_ui(peer_jid, plg.namespace) - defer.returnValue({'xmlui': ui.toXml()}) + defer.returnValue({"xmlui": ui.toXml()}) ## Triggers ## def set_encryption_flag(self, mess_data): """Set "encryption" key in mess_data if session with destinee is encrypted""" - to_jid = mess_data['to'] + to_jid = mess_data["to"] encryption = self._sessions.get(to_jid.userhostJID()) if encryption is not None: - plugin = encryption['plugin'] + plugin = encryption["plugin"] if mess_data["type"] == "groupchat" and plugin.directed: raise exceptions.InternalError( - f"encryption flag must not be set for groupchat if encryption algorithm " - f"({encryption['plugin'].name}) is directed!") + f"encryption flag must not be set for groupchat if encryption algorithm " + f"({encryption['plugin'].name}) is directed!" + ) mess_data[C.MESS_KEY_ENCRYPTION] = encryption self.mark_as_encrypted(mess_data, plugin.namespace) @@ -467,26 +515,25 @@ @param mess_data(dict): message data as used in post treat workflow @param namespace(str): namespace of the algorithm used for encrypting the message """ - mess_data['extra'][C.MESS_KEY_ENCRYPTED] = True - from_bare_jid = mess_data['from'].userhostJID() + mess_data["extra"][C.MESS_KEY_ENCRYPTED] = True + from_bare_jid = mess_data["from"].userhostJID() if from_bare_jid != self.client.jid.userhostJID(): session = self.getSession(from_bare_jid) if session is None: # if we are currently unencrypted, we start a session automatically # to avoid sending unencrypted messages in an encrypted context - log.info(_( - "Starting e2e session with {peer_jid} as we receive encrypted " - "messages") - .format(peer_jid=from_bare_jid) + log.info( + _( + "Starting e2e session with {peer_jid} as we receive encrypted " + "messages" + ).format(peer_jid=from_bare_jid) ) defer.ensureDeferred(self.start(from_bare_jid, namespace)) return mess_data def is_encryption_requested( - self, - mess_data: MessageData, - namespace: Optional[str] = None + self, mess_data: MessageData, namespace: Optional[str] = None ) -> bool: """Helper method to check if encryption is requested in an outgoind message @@ -499,7 +546,7 @@ if encryption is None: return False # we get plugin even if namespace is None to be sure that the key exists - plugin = encryption['plugin'] + plugin = encryption["plugin"] if namespace is None: return True return plugin.namespace == namespace @@ -510,8 +557,7 @@ @param mess_data(dict): message data @return (bool): True if the encrypted flag is present """ - return mess_data['extra'].get(C.MESS_KEY_ENCRYPTED, False) - + return mess_data["extra"].get(C.MESS_KEY_ENCRYPTED, False) def mark_as_trusted(self, mess_data): """Helper methor to mark a message as sent from a trusted entity. @@ -530,5 +576,5 @@ the plugin @param mess_data(dict): message data as used in post treat workflow """ - mess_data['trusted'] = False + mess_data["trusted"] = False return mess_data