diff libervia/backend/memory/encryption.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 4b842c1fb686
children
line wrap: on
line diff
--- a/libervia/backend/memory/encryption.py	Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/memory/encryption.py	Wed Jun 19 18:44:57 2024 +0200
@@ -23,7 +23,11 @@
 from twisted.words.protocols.jabber import jid
 from twisted.internet import defer
 from twisted.python import failure
-from libervia.backend.core.core_types import EncryptionPlugin, EncryptionSession, MessageData
+from libervia.backend.core.core_types import (
+    EncryptionPlugin,
+    EncryptionSession,
+    MessageData,
+)
 from libervia.backend.core.i18n import D_, _
 from libervia.backend.core.constants import Const as C
 from libervia.backend.core import exceptions
@@ -38,13 +42,15 @@
 
 class EncryptionHandler:
     """Class to handle encryption sessions for a client"""
+
     plugins = []  # plugin able to encrypt messages
 
     def __init__(self, client):
         self.client = client
         self._sessions = {}  # bare_jid ==> encryption_data
         self._stored_session = persistent.PersistentDict(
-            "core:encryption", profile=client.profile)
+            "core:encryption", profile=client.profile
+        )
 
     @property
     def host(self):
@@ -63,9 +69,11 @@
             for idx, (success, err) in enumerate(result):
                 if not success:
                     entity_jid_s, namespace = list(self._stored_session.items())[idx]
-                    log.warning(_(
-                        "Could not restart {namespace!r} encryption with {entity}: {err}"
-                        ).format(namespace=namespace, entity=entity_jid_s, err=err))
+                    log.warning(
+                        _(
+                            "Could not restart {namespace!r} encryption with {entity}: {err}"
+                        ).format(namespace=namespace, entity=entity_jid_s, err=err)
+                    )
             log.info(_("encryption sessions restored"))
 
     @classmethod
@@ -105,7 +113,8 @@
             name=name,
             namespace=namespace,
             priority=priority,
-            directed=directed)
+            directed=directed,
+        )
         cls.plugins.append(plugin)
         cls.plugins.sort(key=lambda p: p.priority)
         log.info(_("Encryption plugin registered: {name}").format(name=name))
@@ -119,9 +128,11 @@
         try:
             return next(p for p in cls.plugins if p.namespace == namespace)
         except StopIteration:
-            raise exceptions.NotFound(_(
-                "Can't find requested encryption plugin: {namespace}").format(
-                    namespace=namespace))
+            raise exceptions.NotFound(
+                _("Can't find requested encryption plugin: {namespace}").format(
+                    namespace=namespace
+                )
+            )
 
     @classmethod
     def get_namespaces(cls):
@@ -139,9 +150,9 @@
         for p in cls.plugins:
             if p.name.lower() == name.lower():
                 return p.namespace
-        raise exceptions.NotFound(_(
-            "Can't find a plugin with the name \"{name}\".".format(
-                name=name)))
+        raise exceptions.NotFound(
+            _('Can\'t find a plugin with the name "{name}".'.format(name=name))
+        )
 
     def get_bridge_data(self, session):
         """Retrieve session data serialized for bridge.
@@ -150,12 +161,11 @@
         @return (unicode): serialized data for bridge
         """
         if session is None:
-            return ''
-        plugin = session['plugin']
-        bridge_data = {'name': plugin.name,
-                       'namespace': plugin.namespace}
-        if 'directed_devices' in session:
-            bridge_data['directed_devices'] = session['directed_devices']
+            return ""
+        plugin = session["plugin"]
+        bridge_data = {"name": plugin.name, "namespace": plugin.namespace}
+        if "directed_devices" in session:
+            bridge_data["directed_devices"] = session["directed_devices"]
 
         return data_format.serialise(bridge_data)
 
@@ -205,8 +215,12 @@
             it will be replaced by the new one
         """
         if not self.plugins:
-            raise exceptions.NotFound(_("No encryption plugin is registered, "
-                                        "an encryption session can't be started"))
+            raise exceptions.NotFound(
+                _(
+                    "No encryption plugin is registered, "
+                    "an encryption session can't be started"
+                )
+            )
 
         if namespace is None:
             plugin = self.plugins[0]
@@ -218,9 +232,12 @@
             # we have already an encryption session with this contact
             former_plugin = self._sessions[bare_jid]["plugin"]
             if former_plugin.namespace == namespace:
-                log.info(_("Session with {bare_jid} is already encrypted with {name}. "
-                           "Nothing to do.").format(
-                               bare_jid=bare_jid, name=former_plugin.name))
+                log.info(
+                    _(
+                        "Session with {bare_jid} is already encrypted with {name}. "
+                        "Nothing to do."
+                    ).format(bare_jid=bare_jid, name=former_plugin.name)
+                )
                 return
 
             if replace:
@@ -229,9 +246,10 @@
                 del self._sessions[bare_jid]
                 await self._stop_encryption(former_plugin, entity)
             else:
-                msg = (_("Session with {bare_jid} is already encrypted with {name}. "
-                         "Please stop encryption session before changing algorithm.")
-                       .format(bare_jid=bare_jid, name=plugin.name))
+                msg = _(
+                    "Session with {bare_jid} is already encrypted with {name}. "
+                    "Please stop encryption session before changing algorithm."
+                ).format(bare_jid=bare_jid, name=plugin.name)
                 log.warning(msg)
                 raise exceptions.ConflictError(msg)
 
@@ -241,34 +259,44 @@
                 entity.resource = self.host.memory.main_resource_get(self.client, entity)
                 if not entity.resource:
                     raise exceptions.NotFound(
-                        _("No resource found for {destinee}, can't encrypt with {name}")
-                        .format(destinee=entity.full(), name=plugin.name))
-                log.info(_("No resource specified to encrypt with {name}, using "
-                           "{destinee}.").format(destinee=entity.full(),
-                                                  name=plugin.name))
+                        _(
+                            "No resource found for {destinee}, can't encrypt with {name}"
+                        ).format(destinee=entity.full(), name=plugin.name)
+                    )
+                log.info(
+                    _(
+                        "No resource specified to encrypt with {name}, using "
+                        "{destinee}."
+                    ).format(destinee=entity.full(), name=plugin.name)
+                )
             # indicate that we encrypt only for some devices
-            directed_devices = data['directed_devices'] = [entity.resource]
+            directed_devices = data["directed_devices"] = [entity.resource]
         elif entity.resource:
             raise ValueError(_("{name} encryption must be used with bare jids."))
 
         await self._start_encryption(plugin, entity)
         self._sessions[entity.userhostJID()] = data
-        log.info(_("Encryption session has been set for {entity_jid} with "
-                   "{encryption_name}").format(
-                   entity_jid=entity.full(), encryption_name=plugin.name))
+        log.info(
+            _(
+                "Encryption session has been set for {entity_jid} with "
+                "{encryption_name}"
+            ).format(entity_jid=entity.full(), encryption_name=plugin.name)
+        )
         self.host.bridge.message_encryption_started(
-            entity.full(),
-            self.get_bridge_data(data),
-            self.client.profile)
-        msg = D_("Encryption session started: your messages with {destinee} are "
-                 "now end to end encrypted using {name} algorithm.").format(
-                 destinee=entity.full(), name=plugin.name)
-        directed_devices = data.get('directed_devices')
+            entity.full(), self.get_bridge_data(data), self.client.profile
+        )
+        msg = D_(
+            "Encryption session started: your messages with {destinee} are "
+            "now end to end encrypted using {name} algorithm."
+        ).format(destinee=entity.full(), name=plugin.name)
+        directed_devices = data.get("directed_devices")
         if directed_devices:
-            msg += "\n" + D_("Message are encrypted only for {nb_devices} device(s): "
-                              "{devices_list}.").format(
-                              nb_devices=len(directed_devices),
-                              devices_list = ', '.join(directed_devices))
+            msg += "\n" + D_(
+                "Message are encrypted only for {nb_devices} device(s): "
+                "{devices_list}."
+            ).format(
+                nb_devices=len(directed_devices), devices_list=", ".join(directed_devices)
+            )
 
         self.client.feedback(bare_jid, msg)
 
@@ -283,29 +311,38 @@
         session = self.getSession(entity.userhostJID())
         if not session:
             raise failure.Failure(
-                exceptions.NotFound(_("There is no encryption session with this "
-                                      "entity.")))
-        plugin = session['plugin']
+                exceptions.NotFound(
+                    _("There is no encryption session with this " "entity.")
+                )
+            )
+        plugin = session["plugin"]
         if namespace is not None and plugin.namespace != namespace:
-            raise exceptions.InternalError(_(
-                "The encryption session is not run with the expected plugin: encrypted "
-                "with {current_name} and was expecting {expected_name}").format(
-                current_name=session['plugin'].namespace,
-                expected_name=namespace))
+            raise exceptions.InternalError(
+                _(
+                    "The encryption session is not run with the expected plugin: encrypted "
+                    "with {current_name} and was expecting {expected_name}"
+                ).format(
+                    current_name=session["plugin"].namespace, expected_name=namespace
+                )
+            )
         if entity.resource:
             try:
-                directed_devices = session['directed_devices']
+                directed_devices = session["directed_devices"]
             except KeyError:
-                raise exceptions.NotFound(_(
-                    "There is a session for the whole entity (i.e. all devices of the "
-                    "entity), not a directed one. Please use bare jid if you want to "
-                    "stop the whole encryption with this entity."))
+                raise exceptions.NotFound(
+                    _(
+                        "There is a session for the whole entity (i.e. all devices of the "
+                        "entity), not a directed one. Please use bare jid if you want to "
+                        "stop the whole encryption with this entity."
+                    )
+                )
 
             try:
                 directed_devices.remove(entity.resource)
             except ValueError:
-                raise exceptions.NotFound(_("There is no directed session with this "
-                                            "entity."))
+                raise exceptions.NotFound(
+                    _("There is no directed session with this " "entity.")
+                )
             else:
                 if not directed_devices:
                     # if we have no more directed device sessions,
@@ -319,18 +356,24 @@
             del self._sessions[entity.userhostJID()]
             await self._stop_encryption(plugin, entity)
 
-        log.info(_("encryption session stopped with entity {entity}").format(
-            entity=entity.full()))
+        log.info(
+            _("encryption session stopped with entity {entity}").format(
+                entity=entity.full()
+            )
+        )
         self.host.bridge.message_encryption_stopped(
             entity.full(),
-            {'name': plugin.name,
-             'namespace': plugin.namespace,
+            {
+                "name": plugin.name,
+                "namespace": plugin.namespace,
             },
-            self.client.profile)
-        msg = D_("Encryption session finished: your messages with {destinee} are "
-                 "NOT end to end encrypted anymore.\nYour server administrators or "
-                 "{destinee} server administrators will be able to read them.").format(
-                 destinee=entity.full())
+            self.client.profile,
+        )
+        msg = D_(
+            "Encryption session finished: your messages with {destinee} are "
+            "NOT end to end encrypted anymore.\nYour server administrators or "
+            "{destinee} server administrators will be able to read them."
+        ).format(destinee=entity.full())
 
         self.client.feedback(entity, msg)
 
@@ -375,16 +418,19 @@
             session = self.getSession(entity_jid)
             if not session:
                 raise exceptions.NotFound(
-                    "No encryption session currently active for {entity_jid}"
-                    .format(entity_jid=entity_jid.full()))
-            plugin = session['plugin']
+                    "No encryption session currently active for {entity_jid}".format(
+                        entity_jid=entity_jid.full()
+                    )
+                )
+            plugin = session["plugin"]
         else:
             plugin = self.get_plugin(namespace)
         try:
             get_trust_ui = plugin.instance.get_trust_ui
         except AttributeError:
             raise NotImplementedError(
-                "Encryption plugin doesn't handle trust management UI")
+                "Encryption plugin doesn't handle trust management UI"
+            )
         else:
             return utils.as_deferred(get_trust_ui, self.client, entity_jid)
 
@@ -393,32 +439,32 @@
     @classmethod
     def _import_menus(cls, host):
         host.import_menu(
-             (D_("Encryption"), D_("unencrypted (plain text)")),
-             partial(cls._on_menu_unencrypted, host=host),
-             security_limit=0,
-             help_string=D_("End encrypted session"),
-             type_=C.MENU_SINGLE,
+            (D_("Encryption"), D_("unencrypted (plain text)")),
+            partial(cls._on_menu_unencrypted, host=host),
+            security_limit=0,
+            help_string=D_("End encrypted session"),
+            type_=C.MENU_SINGLE,
         )
         for plg in cls.getPlugins():
             host.import_menu(
-                 (D_("Encryption"), plg.name),
-                 partial(cls._on_menu_name, host=host, plg=plg),
-                 security_limit=0,
-                 help_string=D_("Start {name} session").format(name=plg.name),
-                 type_=C.MENU_SINGLE,
+                (D_("Encryption"), plg.name),
+                partial(cls._on_menu_name, host=host, plg=plg),
+                security_limit=0,
+                help_string=D_("Start {name} session").format(name=plg.name),
+                type_=C.MENU_SINGLE,
             )
             host.import_menu(
-                 (D_("Encryption"), D_("⛨ {name} trust").format(name=plg.name)),
-                 partial(cls._on_menu_trust, host=host, plg=plg),
-                 security_limit=0,
-                 help_string=D_("Manage {name} trust").format(name=plg.name),
-                 type_=C.MENU_SINGLE,
+                (D_("Encryption"), D_("⛨ {name} trust").format(name=plg.name)),
+                partial(cls._on_menu_trust, host=host, plg=plg),
+                security_limit=0,
+                help_string=D_("Manage {name} trust").format(name=plg.name),
+                type_=C.MENU_SINGLE,
             )
 
     @classmethod
     def _on_menu_unencrypted(cls, data, host, profile):
         client = host.get_client(profile)
-        peer_jid = jid.JID(data['jid']).userhostJID()
+        peer_jid = jid.JID(data["jid"]).userhostJID()
         d = defer.ensureDeferred(client.encryption.stop(peer_jid))
         d.addCallback(lambda __: {})
         return d
@@ -426,11 +472,12 @@
     @classmethod
     def _on_menu_name(cls, data, host, plg, profile):
         client = host.get_client(profile)
-        peer_jid = jid.JID(data['jid'])
+        peer_jid = jid.JID(data["jid"])
         if not plg.directed:
             peer_jid = peer_jid.userhostJID()
         d = defer.ensureDeferred(
-            client.encryption.start(peer_jid, plg.namespace, replace=True))
+            client.encryption.start(peer_jid, plg.namespace, replace=True)
+        )
         d.addCallback(lambda __: {})
         return d
 
@@ -438,22 +485,23 @@
     @defer.inlineCallbacks
     def _on_menu_trust(cls, data, host, plg, profile):
         client = host.get_client(profile)
-        peer_jid = jid.JID(data['jid']).userhostJID()
+        peer_jid = jid.JID(data["jid"]).userhostJID()
         ui = yield client.encryption.get_trust_ui(peer_jid, plg.namespace)
-        defer.returnValue({'xmlui': ui.toXml()})
+        defer.returnValue({"xmlui": ui.toXml()})
 
     ## Triggers ##
 
     def set_encryption_flag(self, mess_data):
         """Set "encryption" key in mess_data if session with destinee is encrypted"""
-        to_jid = mess_data['to']
+        to_jid = mess_data["to"]
         encryption = self._sessions.get(to_jid.userhostJID())
         if encryption is not None:
-            plugin = encryption['plugin']
+            plugin = encryption["plugin"]
             if mess_data["type"] == "groupchat" and plugin.directed:
                 raise exceptions.InternalError(
-                f"encryption flag must not be set for groupchat if encryption algorithm "
-                f"({encryption['plugin'].name}) is directed!")
+                    f"encryption flag must not be set for groupchat if encryption algorithm "
+                    f"({encryption['plugin'].name}) is directed!"
+                )
             mess_data[C.MESS_KEY_ENCRYPTION] = encryption
             self.mark_as_encrypted(mess_data, plugin.namespace)
 
@@ -467,26 +515,25 @@
         @param mess_data(dict): message data as used in post treat workflow
         @param namespace(str): namespace of the algorithm used for encrypting the message
         """
-        mess_data['extra'][C.MESS_KEY_ENCRYPTED] = True
-        from_bare_jid = mess_data['from'].userhostJID()
+        mess_data["extra"][C.MESS_KEY_ENCRYPTED] = True
+        from_bare_jid = mess_data["from"].userhostJID()
         if from_bare_jid != self.client.jid.userhostJID():
             session = self.getSession(from_bare_jid)
             if session is None:
                 # if we are currently unencrypted, we start a session automatically
                 # to avoid sending unencrypted messages in an encrypted context
-                log.info(_(
-                    "Starting e2e session with {peer_jid} as we receive encrypted "
-                    "messages")
-                    .format(peer_jid=from_bare_jid)
+                log.info(
+                    _(
+                        "Starting e2e session with {peer_jid} as we receive encrypted "
+                        "messages"
+                    ).format(peer_jid=from_bare_jid)
                 )
                 defer.ensureDeferred(self.start(from_bare_jid, namespace))
 
         return mess_data
 
     def is_encryption_requested(
-        self,
-        mess_data: MessageData,
-        namespace: Optional[str] = None
+        self, mess_data: MessageData, namespace: Optional[str] = None
     ) -> bool:
         """Helper method to check if encryption is requested in an outgoind message
 
@@ -499,7 +546,7 @@
         if encryption is None:
             return False
         # we get plugin even if namespace is None to be sure that the key exists
-        plugin = encryption['plugin']
+        plugin = encryption["plugin"]
         if namespace is None:
             return True
         return plugin.namespace == namespace
@@ -510,8 +557,7 @@
         @param mess_data(dict): message data
         @return (bool): True if the encrypted flag is present
         """
-        return mess_data['extra'].get(C.MESS_KEY_ENCRYPTED, False)
-
+        return mess_data["extra"].get(C.MESS_KEY_ENCRYPTED, False)
 
     def mark_as_trusted(self, mess_data):
         """Helper methor to mark a message as sent from a trusted entity.
@@ -530,5 +576,5 @@
         the plugin
         @param mess_data(dict): message data as used in post treat workflow
         """
-        mess_data['trusted'] = False
+        mess_data["trusted"] = False
         return mess_data