changeset 3541:888109774673

core: various changes and fixes to work with new storage and D-Bus bridge: - fixes coroutines handling in various places - fixes types which are not serialised by Tx DBus - XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise use thread which prevent the use of AsyncIO loop. To work around that, callLater is used to launch storage method in main thread. This is a temporary workaround, as Python OMEMO should get rid of Promise implementation and threads soon.
author Goffi <goffi@goffi.org>
date Thu, 03 Jun 2021 15:21:43 +0200
parents aa58451b77ba
children 813595f88612
files sat/core/sat_main.py sat/core/xmpp.py sat/memory/memory.py sat/memory/persistent.py sat/plugins/plugin_comp_file_sharing.py sat/plugins/plugin_comp_file_sharing_management.py sat/plugins/plugin_misc_identity.py sat/plugins/plugin_xep_0033.py sat/plugins/plugin_xep_0045.py sat/plugins/plugin_xep_0198.py sat/plugins/plugin_xep_0313.py sat/plugins/plugin_xep_0329.py sat/plugins/plugin_xep_0384.py
diffstat 13 files changed, 221 insertions(+), 185 deletions(-) [+]
line wrap: on
line diff
--- a/sat/core/sat_main.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/core/sat_main.py	Thu Jun 03 15:21:43 2021 +0200
@@ -456,7 +456,7 @@
             except AttributeError:
                 continue
             else:
-                defers_list.append(defer.maybeDeferred(unload))
+                defers_list.append(utils.asDeferred(unload))
         return defers_list
 
     def _connect(self, profile_key, password="", options=None):
@@ -470,7 +470,7 @@
         Retrieve the individual parameters, authenticate the profile
         and initiate the connection to the associated XMPP server.
         @param profile: %(doc_profile)s
-        @param password (string): the SàT profile password
+        @param password (string): the Libervia profile password
         @param options (dict): connection options. Key can be:
             -
         @param max_retries (int): max number of connection retries
@@ -533,7 +533,7 @@
         features = []
         for import_name, plugin in self.plugins.items():
             try:
-                features_d = defer.maybeDeferred(plugin.getFeatures, profile_key)
+                features_d = utils.asDeferred(plugin.getFeatures, profile_key)
             except AttributeError:
                 features_d = defer.succeed({})
             features.append(features_d)
@@ -581,7 +581,7 @@
                 attr = client.roster.getAttributes(item)
                 # we use full() and not userhost() because jid with resources are allowed
                 # in roster, even if it's not common.
-                ret.append([item.entity.full(), attr, item.groups])
+                ret.append([item.entity.full(), attr, list(item.groups)])
             return ret
 
         return client.roster.got_roster.addCallback(got_roster)
@@ -1108,6 +1108,7 @@
     def _findByFeatures(self, namespaces, identities, bare_jids, service, roster, own_jid,
                         local_device, profile_key):
         client = self.getClient(profile_key)
+        identities = [tuple(i) for i in identities] if identities else None
         return defer.ensureDeferred(self.findByFeatures(
             client, namespaces, identities, bare_jids, service, roster, own_jid,
             local_device))
--- a/sat/core/xmpp.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/core/xmpp.py	Thu Jun 03 15:21:43 2021 +0200
@@ -714,7 +714,7 @@
             or mess_data["type"] == C.MESS_TYPE_INFO
         )
 
-    def messageAddToHistory(self, data):
+    async def messageAddToHistory(self, data):
         """Store message into database (for local history)
 
         @param data: message data dictionnary
@@ -726,7 +726,7 @@
 
             # we need a message to store
             if self.isMessagePrintable(data):
-                self.host_app.memory.addToHistory(self, data)
+                await self.host_app.memory.addToHistory(self, data)
             else:
                 log.warning(
                     "No message found"
@@ -876,7 +876,9 @@
 
     def addPostXmlCallbacks(self, post_xml_treatments):
         post_xml_treatments.addCallback(self.messageProt.completeAttachments)
-        post_xml_treatments.addCallback(self.messageAddToHistory)
+        post_xml_treatments.addCallback(
+            lambda ret: defer.ensureDeferred(self.messageAddToHistory(ret))
+        )
         post_xml_treatments.addCallback(self.messageSendToBridge)
 
     def send(self, obj):
@@ -1061,7 +1063,9 @@
 
     def addPostXmlCallbacks(self, post_xml_treatments):
         if self.sendHistory:
-            post_xml_treatments.addCallback(self.messageAddToHistory)
+            post_xml_treatments.addCallback(
+                lambda ret: defer.ensureDeferred(self.messageAddToHistory(ret))
+            )
 
     def getOwnerFromJid(self, to_jid: jid.JID) -> jid.JID:
         """Retrieve "owner" of a component resource from the destination jid of the request
@@ -1212,7 +1216,9 @@
         data = self.parseMessage(message_elt)
         post_treat.addCallback(self.completeAttachments)
         post_treat.addCallback(self.skipEmptyMessage)
-        post_treat.addCallback(self.addToHistory)
+        post_treat.addCallback(
+            lambda ret: defer.ensureDeferred(self.addToHistory(ret))
+        )
         post_treat.addCallback(self.bridgeSignal, data)
         post_treat.addErrback(self.cancelErrorTrap)
         post_treat.callback(data)
@@ -1253,14 +1259,14 @@
             raise failure.Failure(exceptions.CancelError("Cancelled empty message"))
         return data
 
-    def addToHistory(self, data):
+    async def addToHistory(self, data):
         if data.pop("history", None) == C.HISTORY_SKIP:
             log.debug("history is skipped as requested")
             data["extra"]["history"] = C.HISTORY_SKIP
         else:
             # we need a message to store
             if self.parent.isMessagePrintable(data):
-                return self.host.memory.addToHistory(self.parent, data)
+                return await self.host.memory.addToHistory(self.parent, data)
             else:
                 log.debug("not storing empty message to history: {data}"
                     .format(data=data))
@@ -1478,7 +1484,8 @@
         self._jids[entity] = item
         self._registerItem(item)
         self.host.bridge.newContact(
-            entity.full(), self.getAttributes(item), item.groups, self.parent.profile
+            entity.full(), self.getAttributes(item), list(item.groups),
+            self.parent.profile
         )
 
     def removeReceived(self, request):
--- a/sat/memory/memory.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/memory/memory.py	Thu Jun 03 15:21:43 2021 +0200
@@ -1131,12 +1131,12 @@
         )
 
     def asyncGetStringParamA(
-        self, name, category, attr="value", security_limit=C.NO_SECURITY_LIMIT,
+        self, name, category, attribute="value", security_limit=C.NO_SECURITY_LIMIT,
         profile_key=C.PROF_KEY_NONE):
 
         profile = self.getProfileName(profile_key)
         return defer.ensureDeferred(self.params.asyncGetStringParamA(
-            name, category, attr, security_limit, profile
+            name, category, attribute, security_limit, profile
         ))
 
     def _getParamsUI(self, security_limit, app, extra_s, profile_key):
@@ -1170,20 +1170,22 @@
         client = self.host.getClient(profile_key)
         # we accept any type
         data = data_format.deserialise(data_s, type_check=None)
-        return self.storage.setPrivateValue(
-            namespace, key, data, binary=True, profile=client.profile)
+        return defer.ensureDeferred(self.storage.setPrivateValue(
+            namespace, key, data, binary=True, profile=client.profile))
 
     def _privateDataGet(self, namespace, key, profile_key):
         client = self.host.getClient(profile_key)
-        d = self.storage.getPrivates(
-            namespace, [key], binary=True, profile=client.profile)
+        d = defer.ensureDeferred(
+            self.storage.getPrivates(
+                namespace, [key], binary=True, profile=client.profile)
+        )
         d.addCallback(lambda data_dict: data_format.serialise(data_dict.get(key)))
         return d
 
     def _privateDataDelete(self, namespace, key, profile_key):
         client = self.host.getClient(profile_key)
-        return self.storage.delPrivateValue(
-            namespace, key, binary=True, profile=client.profile)
+        return defer.ensureDeferred(self.storage.delPrivateValue(
+            namespace, key, binary=True, profile=client.profile))
 
     ## Files ##
 
@@ -1251,8 +1253,7 @@
                     _("unknown access type: {type}").format(type=perm_type)
                 )
 
-    @defer.inlineCallbacks
-    def checkPermissionToRoot(self, client, file_data, peer_jid, perms_to_check):
+    async def checkPermissionToRoot(self, client, file_data, peer_jid, perms_to_check):
         """do checkFilePermission on file_data and all its parents until root"""
         current = file_data
         while True:
@@ -1260,7 +1261,7 @@
             parent = current["parent"]
             if not parent:
                 break
-            files_data = yield self.getFiles(
+            files_data = await self.getFiles(
                 client, peer_jid=None, file_id=parent, perms_to_check=None
             )
             try:
@@ -1268,8 +1269,7 @@
             except IndexError:
                 raise exceptions.DataError("Missing parent")
 
-    @defer.inlineCallbacks
-    def _getParentDir(
+    async def _getParentDir(
         self, client, path, parent, namespace, owner, peer_jid, perms_to_check
     ):
         """Retrieve parent node from a path, or last existing directory
@@ -1293,7 +1293,7 @@
         # non existing directories will be created
         parent = ""
         for idx, path_elt in enumerate(path_elts):
-            directories = yield self.storage.getFiles(
+            directories = await self.storage.getFiles(
                 client,
                 parent=parent,
                 type_=C.FILE_TYPE_DIRECTORY,
@@ -1302,7 +1302,7 @@
                 owner=owner,
             )
             if not directories:
-                defer.returnValue((parent, path_elts[idx:]))
+                return (parent, path_elts[idx:])
                 # from this point, directories don't exist anymore, we have to create them
             elif len(directories) > 1:
                 raise exceptions.InternalError(
@@ -1312,7 +1312,7 @@
                 directory = directories[0]
                 self.checkFilePermission(directory, peer_jid, perms_to_check)
                 parent = directory["id"]
-        defer.returnValue((parent, []))
+        return (parent, [])
 
     def getFileAffiliations(self, file_data: dict) -> Dict[jid.JID, str]:
         """Convert file access to pubsub like affiliations"""
@@ -1484,8 +1484,7 @@
             )
         return peer_jid.userhostJID()
 
-    @defer.inlineCallbacks
-    def getFiles(
+    async def getFiles(
         self, client, peer_jid, file_id=None, version=None, parent=None, path=None,
         type_=None, file_hash=None, hash_algo=None, name=None, namespace=None,
         mime_type=None, public_id=None, owner=None, access=None, projection=None,
@@ -1536,7 +1535,7 @@
         if path is not None:
             path = str(path)
             # permission are checked by _getParentDir
-            parent, remaining_path_elts = yield self._getParentDir(
+            parent, remaining_path_elts = await self._getParentDir(
                 client, path, parent, namespace, owner, peer_jid, perms_to_check
             )
             if remaining_path_elts:
@@ -1546,16 +1545,16 @@
         if parent and peer_jid:
             # if parent is given directly and permission check is requested,
             # we need to check all the parents
-            parent_data = yield self.storage.getFiles(client, file_id=parent)
+            parent_data = await self.storage.getFiles(client, file_id=parent)
             try:
                 parent_data = parent_data[0]
             except IndexError:
                 raise exceptions.DataError("mising parent")
-            yield self.checkPermissionToRoot(
+            await self.checkPermissionToRoot(
                 client, parent_data, peer_jid, perms_to_check
             )
 
-        files = yield self.storage.getFiles(
+        files = await self.storage.getFiles(
             client,
             file_id=file_id,
             version=version,
@@ -1578,15 +1577,16 @@
             to_remove = []
             for file_data in files:
                 try:
-                    self.checkFilePermission(file_data, peer_jid, perms_to_check, set_affiliation=True)
+                    self.checkFilePermission(
+                        file_data, peer_jid, perms_to_check, set_affiliation=True
+                    )
                 except exceptions.PermissionError:
                     to_remove.append(file_data)
             for file_data in to_remove:
                 files.remove(file_data)
-        defer.returnValue(files)
+        return files
 
-    @defer.inlineCallbacks
-    def setFile(
+    async def setFile(
         self, client, name, file_id=None, version="", parent=None, path=None,
         type_=C.FILE_TYPE_FILE, file_hash=None, hash_algo=None, size=None,
         namespace=None, mime_type=None, public_id=None, created=None, modified=None,
@@ -1668,13 +1668,13 @@
         if path is not None:
             path = str(path)
             # _getParentDir will check permissions if peer_jid is set, so we use owner
-            parent, remaining_path_elts = yield self._getParentDir(
+            parent, remaining_path_elts = await self._getParentDir(
                 client, path, parent, namespace, owner, owner, perms_to_check
             )
             # if remaining directories don't exist, we have to create them
             for new_dir in remaining_path_elts:
                 new_dir_id = shortuuid.uuid()
-                yield self.storage.setFile(
+                await self.storage.setFile(
                     client,
                     name=new_dir,
                     file_id=new_dir_id,
@@ -1691,7 +1691,7 @@
         elif parent is None:
             parent = ""
 
-        yield self.storage.setFile(
+        await self.storage.setFile(
             client,
             file_id=file_id,
             version=version,
--- a/sat/memory/persistent.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/memory/persistent.py	Thu Jun 03 15:21:43 2021 +0200
@@ -17,10 +17,13 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from twisted.internet import defer
+from twisted.python import failure
 from sat.core.i18n import _
 from sat.core.log import getLogger
+
+
 log = getLogger(__name__)
-from twisted.python import failure
 
 
 class MemoryNotInitializedError(Exception):
@@ -57,7 +60,9 @@
         need to be called before any other operation
         @return: defers the PersistentDict instance itself
         """
-        d = self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile)
+        d = defer.ensureDeferred(self.storage.getPrivates(
+            self.namespace, binary=self.binary, profile=self.profile
+        ))
         d.addCallback(self._setCache)
         d.addCallback(lambda __: self)
         return d
@@ -111,8 +116,11 @@
         return self._cache.__getitem__(key)
 
     def __setitem__(self, key, value):
-        self.storage.setPrivateValue(self.namespace, key, value, self.binary,
-                                     self.profile)
+        defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, key, value, self.binary, self.profile
+            )
+        )
         return self._cache.__setitem__(key, value)
 
     def __delitem__(self, key):
@@ -130,8 +138,11 @@
     def aset(self, key, value):
         """Async set, return a Deferred fired when value is actually stored"""
         self._cache.__setitem__(key, value)
-        return self.storage.setPrivateValue(self.namespace, key, value,
-                                            self.binary, self.profile)
+        return defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, key, value, self.binary, self.profile
+            )
+        )
 
     def adel(self, key):
         """Async del, return a Deferred fired when value is actually deleted"""
@@ -151,8 +162,11 @@
 
         @return: deferred fired when data is actually saved
         """
-        return self.storage.setPrivateValue(self.namespace, name, self._cache[name],
-                                            self.binary, self.profile)
+        return defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, name, self._cache[name], self.binary, self.profile
+            )
+        )
 
 
 class PersistentBinaryDict(PersistentDict):
@@ -178,12 +192,16 @@
         raise NotImplementedError
 
     def items(self):
-        d = self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile)
+        d = defer.ensureDeferred(self.storage.getPrivates(
+            self.namespace, binary=self.binary, profile=self.profile
+        ))
         d.addCallback(lambda data_dict: data_dict.items())
         return d
 
     def all(self):
-        return self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile)
+        return defer.ensureDeferred(self.storage.getPrivates(
+            self.namespace, binary=self.binary, profile=self.profile
+        ))
 
     def __repr__(self):
         raise NotImplementedError
@@ -234,14 +252,18 @@
 
     def __getitem__(self, key):
         """get the value as a Deferred"""
-        d = self.storage.getPrivates(self.namespace, keys=[key], binary=self.binary,
-                                     profile=self.profile)
+        d = defer.ensureDeferred(self.storage.getPrivates(
+            self.namespace, keys=[key], binary=self.binary, profile=self.profile
+        ))
         d.addCallback(self._data2value, key)
         return d
 
     def __setitem__(self, key, value):
-        self.storage.setPrivateValue(self.namespace, key, value, self.binary,
-                                     self.profile)
+        defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, key, value, self.binary, self.profile
+            )
+        )
 
     def __delitem__(self, key):
         self.storage.delPrivateValue(self.namespace, key, self.binary, self.profile)
@@ -259,8 +281,11 @@
         """Async set, return a Deferred fired when value is actually stored"""
         # FIXME: redundant with force, force must be removed
         # XXX: similar as PersistentDict.aset, but doesn't use cache
-        return self.storage.setPrivateValue(self.namespace, key, value,
-                                            self.binary, self.profile)
+        return defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, key, value, self.binary, self.profile
+            )
+        )
 
     def adel(self, key):
         """Async del, return a Deferred fired when value is actually deleted"""
@@ -277,7 +302,11 @@
         @param value(object): value is needed for LazyPersistentBinaryDict
         @return: deferred fired when data is actually saved
         """
-        return self.storage.setPrivateValue(self.namespace, name, value, self.binary, self.profile)
+        return defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, name, value, self.binary, self.profile
+            )
+        )
 
     def remove(self, key):
         """Delete a key from sotrage, and return a deferred called when it's done
--- a/sat/plugins/plugin_comp_file_sharing.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_comp_file_sharing.py	Thu Jun 03 15:21:43 2021 +0200
@@ -31,6 +31,7 @@
 from sat.core.log import getLogger
 from sat.tools import stream
 from sat.tools import video
+from sat.tools.utils import ensure_deferred
 from sat.tools.common import regex
 from sat.tools.common import uri
 from sat.tools.common import files_utils
@@ -487,7 +488,7 @@
                 else:
                     await self.generate_thumbnails(extra, thumb_path)
 
-        self.host.memory.setFile(
+        await self.host.memory.setFile(
             client,
             name=name,
             version="",
@@ -546,8 +547,7 @@
         )
         return False, defer.succeed(True)
 
-    @defer.inlineCallbacks
-    def _retrieveFiles(
+    async def _retrieveFiles(
         self, client, session, content_data, content_name, file_data, file_elt
     ):
         """This method retrieve a file on request, and send if after checking permissions"""
@@ -557,7 +557,7 @@
         else:
             owner = peer_jid
         try:
-            found_files = yield self.host.memory.getFiles(
+            found_files = await self.host.memory.getFiles(
                 client,
                 peer_jid=peer_jid,
                 name=file_data.get("name"),
@@ -575,13 +575,13 @@
                     peer_jid=peer_jid, name=file_data.get("name")
                 )
             )
-            defer.returnValue(False)
+            return False
 
         if not found_files:
             log.warning(
                 _("no matching file found ({file_data})").format(file_data=file_data)
             )
-            defer.returnValue(False)
+            return False
 
         # we only use the first found file
         found_file = found_files[0]
@@ -607,7 +607,7 @@
             size=size,
             data_cb=lambda data: hasher.update(data),
         )
-        defer.returnValue(True)
+        return True
 
     def _fileSendingRequestTrigger(
         self, client, session, content_data, content_name, file_data, file_elt
@@ -617,9 +617,9 @@
         else:
             return (
                 False,
-                self._retrieveFiles(
+                defer.ensureDeferred(self._retrieveFiles(
                     client, session, content_data, content_name, file_data, file_elt
-                ),
+                )),
             )
 
     ## HTTP Upload ##
@@ -757,11 +757,10 @@
             raise error.StanzaError("item-not-found")
         return file_id
 
-    @defer.inlineCallbacks
-    def getFileData(self, requestor, nodeIdentifier):
+    async def getFileData(self, requestor, nodeIdentifier):
         file_id = self._getFileId(nodeIdentifier)
         try:
-            files = yield self.host.memory.getFiles(self.parent, requestor, file_id)
+            files = await self.host.memory.getFiles(self.parent, requestor, file_id)
         except (exceptions.NotFound, exceptions.PermissionError):
             # we don't differenciate between NotFound and PermissionError
             # to avoid leaking information on existing files
@@ -770,7 +769,7 @@
             raise error.StanzaError("item-not-found")
         if len(files) > 1:
             raise error.InternalError("there should be only one file")
-        defer.returnValue(files[0])
+        return files[0]
 
     def commentsUpdate(self, extra, new_comments, peer_jid):
         """update comments (replace or insert new_comments)
@@ -825,10 +824,10 @@
             iq_elt = iq_elt.parent
         return iq_elt["from"]
 
-    @defer.inlineCallbacks
-    def publish(self, requestor, service, nodeIdentifier, items):
+    @ensure_deferred
+    async def publish(self, requestor, service, nodeIdentifier, items):
         #  we retrieve file a first time to check authorisations
-        file_data = yield self.getFileData(requestor, nodeIdentifier)
+        file_data = await self.getFileData(requestor, nodeIdentifier)
         file_id = file_data["id"]
         comments = [(item["id"], self._getFrom(item), item.toXml()) for item in items]
         if requestor.userhostJID() == file_data["owner"]:
@@ -837,24 +836,24 @@
             peer_jid = requestor.userhost()
         update_cb = partial(self.commentsUpdate, new_comments=comments, peer_jid=peer_jid)
         try:
-            yield self.host.memory.fileUpdate(file_id, "extra", update_cb)
+            await self.host.memory.fileUpdate(file_id, "extra", update_cb)
         except exceptions.PermissionError:
             raise error.StanzaError("not-authorized")
 
-    @defer.inlineCallbacks
-    def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers):
-        file_data = yield self.getFileData(requestor, nodeIdentifier)
+    @ensure_deferred
+    async def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers):
+        file_data = await self.getFileData(requestor, nodeIdentifier)
         comments = file_data["extra"].get("comments", [])
         if itemIdentifiers:
             defer.returnValue(
                 [generic.parseXml(c[2]) for c in comments if c[0] in itemIdentifiers]
             )
         else:
-            defer.returnValue([generic.parseXml(c[2]) for c in comments])
+            return [generic.parseXml(c[2]) for c in comments]
 
-    @defer.inlineCallbacks
-    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
-        file_data = yield self.getFileData(requestor, nodeIdentifier)
+    @ensure_deferred
+    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
+        file_data = await self.getFileData(requestor, nodeIdentifier)
         file_id = file_data["id"]
         try:
             comments = file_data["extra"]["comments"]
@@ -879,4 +878,4 @@
                 raise error.StanzaError("not-authorized")
 
         remove_cb = partial(self.commentsDelete, comments=to_remove)
-        yield self.host.memory.fileUpdate(file_id, "extra", remove_cb)
+        await self.host.memory.fileUpdate(file_id, "extra", remove_cb)
--- a/sat/plugins/plugin_comp_file_sharing_management.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_comp_file_sharing_management.py	Thu Jun 03 15:21:43 2021 +0200
@@ -149,8 +149,7 @@
         payload = form.toElement()
         return payload, status, None, None
 
-    @defer.inlineCallbacks
-    def _getFileData(self, client, session_data, command_form):
+    async def _getFileData(self, client, session_data, command_form):
         """Retrieve field requested in root form
 
         "found_file" will also be set in session_data
@@ -177,7 +176,7 @@
         #       this must be managed
 
         try:
-            found_files = yield self.host.memory.getFiles(
+            found_files = await self.host.memory.getFiles(
                 client, requestor_bare, path=parent_path, name=basename,
                 namespace=namespace)
             found_file = found_files[0]
@@ -193,7 +192,7 @@
 
         session_data['found_file'] = found_file
         session_data['namespace'] = namespace
-        defer.returnValue(found_file)
+        return found_file
 
     def _updateReadPermission(self, access, allowed_jids):
         if not allowed_jids:
@@ -209,29 +208,27 @@
                 "jids": [j.full() for j in allowed_jids]
             }
 
-    @defer.inlineCallbacks
-    def _updateDir(self, client, requestor, namespace, file_data, allowed_jids):
+    async def _updateDir(self, client, requestor, namespace, file_data, allowed_jids):
         """Recursively update permission of a directory and all subdirectories
 
         @param file_data(dict): metadata of the file
         @param allowed_jids(list[jid.JID]): list of entities allowed to read the file
         """
         assert file_data['type'] == C.FILE_TYPE_DIRECTORY
-        files_data = yield self.host.memory.getFiles(
+        files_data = await self.host.memory.getFiles(
             client, requestor, parent=file_data['id'], namespace=namespace)
 
         for file_data in files_data:
             if not file_data['access'].get(C.ACCESS_PERM_READ, {}):
                 log.debug("setting {perm} read permission for {name}".format(
                     perm=allowed_jids, name=file_data['name']))
-                yield self.host.memory.fileUpdate(
+                await self.host.memory.fileUpdate(
                     file_data['id'], 'access',
                     partial(self._updateReadPermission, allowed_jids=allowed_jids))
             if file_data['type'] == C.FILE_TYPE_DIRECTORY:
-                yield self._updateDir(client, requestor, namespace, file_data, 'PUBLIC')
+                await self._updateDir(client, requestor, namespace, file_data, 'PUBLIC')
 
-    @defer.inlineCallbacks
-    def _onChangeFile(self, client, command_elt, session_data, action, node):
+    async def _onChangeFile(self, client, command_elt, session_data, action, node):
         try:
             x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
             command_form = data_form.Form.fromElement(x_elt)
@@ -244,14 +241,14 @@
 
         if command_form is None or len(command_form.fields) == 0:
             # root request
-            defer.returnValue(self._getRootArgs())
+            return self._getRootArgs()
 
         elif found_file is None:
             # file selected, we retrieve it and ask for permissions
             try:
-                found_file = yield self._getFileData(client, session_data, command_form)
+                found_file = await self._getFileData(client, session_data, command_form)
             except WorkflowError as e:
-                defer.returnValue(e.err_args)
+                return e.err_args
 
             # management request
             if found_file['type'] == C.FILE_TYPE_DIRECTORY:
@@ -284,7 +281,7 @@
 
             status = self._c.STATUS.EXECUTING
             payload = form.toElement()
-            defer.returnValue((payload, status, None, None))
+            return (payload, status, None, None)
 
         else:
             # final phase, we'll do permission change here
@@ -307,7 +304,7 @@
                     self._c.adHocError(self._c.ERROR.BAD_PAYLOAD)
 
             if found_file['type'] == C.FILE_TYPE_FILE:
-                yield self.host.memory.fileUpdate(
+                await self.host.memory.fileUpdate(
                     found_file['id'], 'access',
                     partial(self._updateReadPermission, allowed_jids=allowed_jids))
             else:
@@ -315,7 +312,7 @@
                     recursive = command_form.fields['recursive']
                 except KeyError:
                     self._c.adHocError(self._c.ERROR.BAD_PAYLOAD)
-                yield self.host.memory.fileUpdate(
+                await self.host.memory.fileUpdate(
                     found_file['id'], 'access',
                     partial(self._updateReadPermission, allowed_jids=allowed_jids))
                 if recursive:
@@ -323,17 +320,16 @@
                     # already a permission set), so allowed entities of root directory
                     # can read them.
                     namespace = session_data['namespace']
-                    yield self._updateDir(
+                    await self._updateDir(
                         client, requestor_bare, namespace, found_file, 'PUBLIC')
 
             # job done, we can end the session
             status = self._c.STATUS.COMPLETED
             payload = None
             note = (self._c.NOTE.INFO, _("management session done"))
-            defer.returnValue((payload, status, None, note))
+            return (payload, status, None, note)
 
-    @defer.inlineCallbacks
-    def _onDeleteFile(self, client, command_elt, session_data, action, node):
+    async def _onDeleteFile(self, client, command_elt, session_data, action, node):
         try:
             x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
             command_form = data_form.Form.fromElement(x_elt)
@@ -346,14 +342,14 @@
 
         if command_form is None or len(command_form.fields) == 0:
             # root request
-            defer.returnValue(self._getRootArgs())
+            return self._getRootArgs()
 
         elif found_file is None:
             # file selected, we need confirmation before actually deleting
             try:
-                found_file = yield self._getFileData(client, session_data, command_form)
+                found_file = await self._getFileData(client, session_data, command_form)
             except WorkflowError as e:
-                defer.returnValue(e.err_args)
+                return e.err_args
             if found_file['type'] == C.FILE_TYPE_DIRECTORY:
                 msg = D_("Are you sure to delete directory {name} and all files and "
                          "directories under it?").format(name=found_file['name'])
@@ -370,7 +366,7 @@
             form.addField(field)
             status = self._c.STATUS.EXECUTING
             payload = form.toElement()
-            defer.returnValue((payload, status, None, None))
+            return (payload, status, None, None)
 
         else:
             # final phase, we'll do deletion here
@@ -382,27 +378,26 @@
                 note = None
             else:
                 recursive = found_file['type'] == C.FILE_TYPE_DIRECTORY
-                yield self.host.memory.fileDelete(
+                await self.host.memory.fileDelete(
                     client, requestor_bare, found_file['id'], recursive)
                 note = (self._c.NOTE.INFO, _("file deleted"))
             status = self._c.STATUS.COMPLETED
             payload = None
-            defer.returnValue((payload, status, None, note))
+            return (payload, status, None, note)
 
     def _updateThumbs(self, extra, thumbnails):
         extra[C.KEY_THUMBNAILS] = thumbnails
 
-    @defer.inlineCallbacks
-    def _genThumbs(self, client, requestor, namespace, file_data):
+    async def _genThumbs(self, client, requestor, namespace, file_data):
         """Recursively generate thumbnails
 
         @param file_data(dict): metadata of the file
         """
         if file_data['type'] == C.FILE_TYPE_DIRECTORY:
-            sub_files_data = yield self.host.memory.getFiles(
+            sub_files_data = await self.host.memory.getFiles(
                 client, requestor, parent=file_data['id'], namespace=namespace)
             for sub_file_data in sub_files_data:
-                yield self._genThumbs(client, requestor, namespace, sub_file_data)
+                await self._genThumbs(client, requestor, namespace, sub_file_data)
 
         elif file_data['type'] == C.FILE_TYPE_FILE:
             media_type = file_data['media_type']
@@ -412,7 +407,7 @@
 
                 for max_thumb_size in self._t.SIZES:
                     try:
-                        thumb_size, thumb_id = yield self._t.generateThumbnail(
+                        thumb_size, thumb_id = await self._t.generateThumbnail(
                             file_path,
                             max_thumb_size,
                             #  we keep thumbnails for 6 months
@@ -424,7 +419,7 @@
                         break
                     thumbnails.append({"id": thumb_id, "size": thumb_size})
 
-                yield self.host.memory.fileUpdate(
+                await self.host.memory.fileUpdate(
                     file_data['id'], 'extra',
                     partial(self._updateThumbs, thumbnails=thumbnails))
 
@@ -434,8 +429,7 @@
         else:
             log.warning("unmanaged file type: {type_}".format(type_=file_data['type']))
 
-    @defer.inlineCallbacks
-    def _onGenThumbnails(self, client, command_elt, session_data, action, node):
+    async def _onGenThumbnails(self, client, command_elt, session_data, action, node):
         try:
             x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
             command_form = data_form.Form.fromElement(x_elt)
@@ -447,23 +441,23 @@
 
         if command_form is None or len(command_form.fields) == 0:
             # root request
-            defer.returnValue(self._getRootArgs())
+            return self._getRootArgs()
 
         elif found_file is None:
             # file selected, we retrieve it and ask for permissions
             try:
-                found_file = yield self._getFileData(client, session_data, command_form)
+                found_file = await self._getFileData(client, session_data, command_form)
             except WorkflowError as e:
-                defer.returnValue(e.err_args)
+                return e.err_args
 
             log.info("Generating thumbnails as requested")
-            yield self._genThumbs(client, requestor, found_file['namespace'], found_file)
+            await self._genThumbs(client, requestor, found_file['namespace'], found_file)
 
             # job done, we can end the session
             status = self._c.STATUS.COMPLETED
             payload = None
             note = (self._c.NOTE.INFO, _("thumbnails generated"))
-            defer.returnValue((payload, status, None, note))
+            return (payload, status, None, note)
 
     async def _onQuota(self, client, command_elt, session_data, action, node):
         requestor = session_data['requestor']
--- a/sat/plugins/plugin_misc_identity.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_misc_identity.py	Thu Jun 03 15:21:43 2021 +0200
@@ -142,9 +142,6 @@
 
         stored_data = await client._identity_storage.all()
 
-        self.host.memory.storage.getPrivates(
-            namespace="identity", binary=True, profile=client.profile)
-
         to_delete = []
 
         for key, value in stored_data.items():
--- a/sat/plugins/plugin_xep_0033.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_xep_0033.py	Thu Jun 03 15:21:43 2021 +0200
@@ -156,7 +156,9 @@
             d = defer.Deferred()
             if not skip_send:
                 d.addCallback(client.sendMessageData)
-            d.addCallback(client.messageAddToHistory)
+            d.addCallback(
+                lambda ret: defer.ensureDeferred(client.messageAddToHistory(ret))
+            )
             d.addCallback(client.messageSendToBridge)
             d.addErrback(lambda failure: failure.trap(exceptions.CancelError))
             return d.callback(mess_data)
--- a/sat/plugins/plugin_xep_0045.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_xep_0045.py	Thu Jun 03 15:21:43 2021 +0200
@@ -837,7 +837,8 @@
             elif client.muc_service is not None:
                 service = client.muc_service
             else:
-                msg = D_("No known default MUC service".format(unparsed))
+                msg = D_("No known default MUC service {unparsed}").format(
+                    unparsed=unparsed)
                 self.text_cmds.feedBack(client, msg, mess_data)
                 return False
         except jid.InvalidFormat:
@@ -1298,7 +1299,9 @@
         except AttributeError:
             mess_data = self.client.messageProt.parseMessage(message.element)
         if mess_data['message'] or mess_data['subject']:
-            return self.host.memory.addToHistory(self.client, mess_data)
+            return defer.ensureDeferred(
+                self.host.memory.addToHistory(self.client, mess_data)
+            )
         else:
             return defer.succeed(None)
 
--- a/sat/plugins/plugin_xep_0198.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_xep_0198.py	Thu Jun 03 15:21:43 2021 +0200
@@ -448,7 +448,7 @@
             d.addCallback(lambda __: client.roster.got_roster)
             if plg_0313 is not None:
                 # we retrieve one2one MAM archives
-                d.addCallback(lambda __: plg_0313.resume(client))
+                d.addCallback(lambda __: defer.ensureDeferred(plg_0313.resume(client)))
             # initial presence must be sent manually
             d.addCallback(lambda __: client.presence.available())
             if plg_0045 is not None:
--- a/sat/plugins/plugin_xep_0313.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_xep_0313.py	Thu Jun 03 15:21:43 2021 +0200
@@ -75,15 +75,14 @@
             out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._getArchives,
             async_=True)
 
-    @defer.inlineCallbacks
-    def resume(self, client):
+    async def resume(self, client):
         """Retrieve one2one messages received since the last we have in local storage"""
-        stanza_id_data = yield self.host.memory.storage.getPrivates(
+        stanza_id_data = await self.host.memory.storage.getPrivates(
             mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile)
         stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID)
         if stanza_id is None:
             log.info("can't retrieve last stanza ID, checking history")
-            last_mess = yield self.host.memory.historyGet(
+            last_mess = await self.host.memory.historyGet(
                 None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT,
                                               'last_stanza_id': True},
                 profile=client.profile)
@@ -98,7 +97,7 @@
         complete = False
         count = 0
         while not complete:
-            mam_data = yield self.getArchives(client, mam_req,
+            mam_data = await self.getArchives(client, mam_req,
                                               service=client.jid.userhostJID())
             elt_list, rsm_response, mam_response = mam_data
             complete = mam_response["complete"]
@@ -141,7 +140,7 @@
                     # adding message to history
                     mess_data = client.messageProt.parseMessage(fwd_message_elt)
                     try:
-                        yield client.messageProt.addToHistory(mess_data)
+                        await client.messageProt.addToHistory(mess_data)
                     except exceptions.CancelError as e:
                         log.warning(
                             "message has not been added to history: {e}".format(e=e))
@@ -156,8 +155,8 @@
             log.info(_("We have received {num_mess} message(s) while offline.")
                 .format(num_mess=count))
 
-    def profileConnected(self, client):
-        return self.resume(client)
+    async def profileConnected(self, client):
+        await self.resume(client)
 
     def getHandler(self, client):
         mam_client = client._mam = SatMAMClient(self)
--- a/sat/plugins/plugin_xep_0329.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_xep_0329.py	Thu Jun 03 15:21:43 2021 +0200
@@ -19,6 +19,7 @@
 import mimetypes
 import json
 import os
+import traceback
 from pathlib import Path
 from typing import Optional, Dict
 from zope.interface import implementer
@@ -33,6 +34,7 @@
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
 from sat.tools import stream
+from sat.tools import utils
 from sat.tools.common import regex
 
 
@@ -453,9 +455,9 @@
         iq_elt.handled = True
         node = iq_elt.query.getAttribute("node")
         if not node:
-            d = defer.maybeDeferred(root_nodes_cb, client, iq_elt)
+            d = utils.asDeferred(root_nodes_cb, client, iq_elt)
         else:
-            d = defer.maybeDeferred(files_from_node_cb, client, iq_elt, node)
+            d = utils.asDeferred(files_from_node_cb, client, iq_elt, node)
         d.addErrback(
             lambda failure_: log.error(
                 _("error while retrieving files: {msg}").format(msg=failure_)
@@ -589,10 +591,9 @@
         @return (tuple[jid.JID, jid.JID]): peer_jid and owner
         """
 
-    @defer.inlineCallbacks
-    def _compGetRootNodesCb(self, client, iq_elt):
+    async def _compGetRootNodesCb(self, client, iq_elt):
         peer_jid, owner = client.getOwnerAndPeer(iq_elt)
-        files_data = yield self.host.memory.getFiles(
+        files_data = await self.host.memory.getFiles(
             client,
             peer_jid=peer_jid,
             parent="",
@@ -607,8 +608,7 @@
             directory_elt["name"] = name
         client.send(iq_result_elt)
 
-    @defer.inlineCallbacks
-    def _compGetFilesFromNodeCb(self, client, iq_elt, node_path):
+    async def _compGetFilesFromNodeCb(self, client, iq_elt, node_path):
         """Retrieve files from local files repository according to permissions
 
         result stanza is then built and sent to requestor
@@ -618,7 +618,7 @@
         """
         peer_jid, owner = client.getOwnerAndPeer(iq_elt)
         try:
-            files_data = yield self.host.memory.getFiles(
+            files_data = await self.host.memory.getFiles(
                 client, peer_jid=peer_jid, path=node_path, owner=owner
             )
         except exceptions.NotFound:
@@ -628,7 +628,8 @@
             self._iqError(client, iq_elt, condition='not-allowed')
             return
         except Exception as e:
-            log.error("internal server error: {e}".format(e=e))
+            tb = traceback.format_tb(e.__traceback__)
+            log.error(f"internal server error: {e}\n{''.join(tb)}")
             self._iqError(client, iq_elt, condition='internal-server-error')
             return
         iq_result_elt = xmlstream.toResponse(iq_elt, "result")
--- a/sat/plugins/plugin_xep_0384.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_xep_0384.py	Thu Jun 03 15:21:43 2021 +0200
@@ -114,7 +114,10 @@
     @return (defer.Deferred): deferred instance linked to the promise
     """
     d = defer.Deferred()
-    promise_.then(d.callback, d.errback)
+    promise_.then(
+        lambda result: reactor.callLater(0, d.callback, result),
+        lambda exc: reactor.callLater(0, d.errback, exc)
+    )
     return d
 
 
@@ -141,6 +144,26 @@
         deferred.addCallback(partial(callback, True))
         deferred.addErrback(partial(callback, False))
 
+    def _callMainThread(self, callback, method, *args, check_jid=None):
+        d = method(*args)
+        if check_jid is not None:
+            check_jid_d = self._checkJid(check_jid)
+            check_jid_d.addCallback(lambda __: d)
+            d = check_jid_d
+        if callback is not None:
+            d.addCallback(partial(callback, True))
+            d.addErrback(partial(callback, False))
+
+    def _call(self, callback, method, *args, check_jid=None):
+        """Create Deferred and add Promise callback to it
+
+        This method use reactor.callLater to launch Deferred in main thread
+        @param check_jid: run self._checkJid before method
+        """
+        reactor.callLater(
+            0, self._callMainThread, callback, method, *args, check_jid=check_jid
+        )
+
     def _checkJid(self, bare_jid):
         """Check if jid is known, and store it if not
 
@@ -164,71 +187,50 @@
         callback(True, None)
 
     def loadState(self, callback):
-        d = self.data.get(KEY_STATE)
-        self.setCb(d, callback)
+        self._call(callback, self.data.get, KEY_STATE)
 
     def storeState(self, callback, state):
-        d = self.data.force(KEY_STATE, state)
-        self.setCb(d, callback)
+        self._call(callback, self.data.force, KEY_STATE, state)
 
     def loadSession(self, callback, bare_jid, device_id):
         key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        d = self.data.get(key)
-        self.setCb(d, callback)
+        self._call(callback, self.data.get, key)
 
     def storeSession(self, callback, bare_jid, device_id, session):
         key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        d = self.data.force(key, session)
-        self.setCb(d, callback)
+        self._call(callback, self._data.force, key, session)
 
     def deleteSession(self, callback, bare_jid, device_id):
         key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
-        d = self.data.remove(key)
-        self.setCb(d, callback)
+        self._call(callback, self.data.remove, key)
 
     def loadActiveDevices(self, callback, bare_jid):
         key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
-        d = self.data.get(key, {})
-        if callback is not None:
-            self.setCb(d, callback)
-        return d
+        self._call(callback, self.data.get, key, {})
 
     def loadInactiveDevices(self, callback, bare_jid):
         key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
-        d = self.data.get(key, {})
-        if callback is not None:
-            self.setCb(d, callback)
-        return d
+        self._call(callback, self.data.get, key, {})
 
     def storeActiveDevices(self, callback, bare_jid, devices):
         key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
-        d = self._checkJid(bare_jid)
-        d.addCallback(lambda _: self.data.force(key, devices))
-        self.setCb(d, callback)
+        self._call(callback, self.data.force, key, devices, check_jid=bare_jid)
 
     def storeInactiveDevices(self, callback, bare_jid, devices):
         key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
-        d = self._checkJid(bare_jid)
-        d.addCallback(lambda _: self.data.force(key, devices))
-        self.setCb(d, callback)
+        self._call(callback, self.data.force, key, devices, check_jid=bare_jid)
 
     def storeTrust(self, callback, bare_jid, device_id, trust):
         key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)])
-        d = self.data.force(key, trust)
-        self.setCb(d, callback)
+        self._call(callback, self.data.force, key, trust)
 
     def loadTrust(self, callback, bare_jid, device_id):
         key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)])
-        d = self.data.get(key)
-        if callback is not None:
-            self.setCb(d, callback)
-        return d
+        self._call(callback, self.data.get, key)
 
     def listJIDs(self, callback):
-        d = defer.succeed(self.all_jids)
         if callback is not None:
-            self.setCb(d, callback)
-        return d
+            callback(True, self.all_jids)
 
     def _deleteJID_logResults(self, results):
         failed = [success for success, __ in results if not success]
@@ -266,8 +268,7 @@
         d.addCallback(self._deleteJID_logResults)
         return d
 
-    def deleteJID(self, callback, bare_jid):
-        """Retrieve all (in)actives devices of bare_jid, and delete all related keys"""
+    def _deleteJID(self, callback, bare_jid):
         d_list = []
 
         key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
@@ -284,7 +285,10 @@
         d.addCallback(self._deleteJID_gotDevices, bare_jid)
         if callback is not None:
             self.setCb(d, callback)
-        return d
+
+    def deleteJID(self, callback, bare_jid):
+        """Retrieve all (in)actives devices of bare_jid, and delete all related keys"""
+        reactor.callLater(0, self._deleteJID, callback, bare_jid)
 
 
 class SatOTPKPolicy(omemo.DefaultOTPKPolicy):
@@ -728,7 +732,7 @@
             while device_id in devices:
                 device_id = random.randint(1, 2**31-1)
             # and we save it
-            persistent_dict[KEY_DEVICE_ID] = device_id
+            await persistent_dict.aset(KEY_DEVICE_ID, device_id)
 
         log.debug(f"our OMEMO device id is {device_id}")