diff sat/plugins/plugin_comp_file_sharing.py @ 3541:888109774673

core: various changes and fixes to work with new storage and D-Bus bridge: - fixes coroutines handling in various places - fixes types which are not serialised by Tx DBus - XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise use thread which prevent the use of AsyncIO loop. To work around that, callLater is used to launch storage method in main thread. This is a temporary workaround, as Python OMEMO should get rid of Promise implementation and threads soon.
author Goffi <goffi@goffi.org>
date Thu, 03 Jun 2021 15:21:43 +0200
parents ab72b8ac3bd2
children 742e466fa000
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_file_sharing.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/plugins/plugin_comp_file_sharing.py	Thu Jun 03 15:21:43 2021 +0200
@@ -31,6 +31,7 @@
 from sat.core.log import getLogger
 from sat.tools import stream
 from sat.tools import video
+from sat.tools.utils import ensure_deferred
 from sat.tools.common import regex
 from sat.tools.common import uri
 from sat.tools.common import files_utils
@@ -487,7 +488,7 @@
                 else:
                     await self.generate_thumbnails(extra, thumb_path)
 
-        self.host.memory.setFile(
+        await self.host.memory.setFile(
             client,
             name=name,
             version="",
@@ -546,8 +547,7 @@
         )
         return False, defer.succeed(True)
 
-    @defer.inlineCallbacks
-    def _retrieveFiles(
+    async def _retrieveFiles(
         self, client, session, content_data, content_name, file_data, file_elt
     ):
         """This method retrieve a file on request, and send if after checking permissions"""
@@ -557,7 +557,7 @@
         else:
             owner = peer_jid
         try:
-            found_files = yield self.host.memory.getFiles(
+            found_files = await self.host.memory.getFiles(
                 client,
                 peer_jid=peer_jid,
                 name=file_data.get("name"),
@@ -575,13 +575,13 @@
                     peer_jid=peer_jid, name=file_data.get("name")
                 )
             )
-            defer.returnValue(False)
+            return False
 
         if not found_files:
             log.warning(
                 _("no matching file found ({file_data})").format(file_data=file_data)
             )
-            defer.returnValue(False)
+            return False
 
         # we only use the first found file
         found_file = found_files[0]
@@ -607,7 +607,7 @@
             size=size,
             data_cb=lambda data: hasher.update(data),
         )
-        defer.returnValue(True)
+        return True
 
     def _fileSendingRequestTrigger(
         self, client, session, content_data, content_name, file_data, file_elt
@@ -617,9 +617,9 @@
         else:
             return (
                 False,
-                self._retrieveFiles(
+                defer.ensureDeferred(self._retrieveFiles(
                     client, session, content_data, content_name, file_data, file_elt
-                ),
+                )),
             )
 
     ## HTTP Upload ##
@@ -757,11 +757,10 @@
             raise error.StanzaError("item-not-found")
         return file_id
 
-    @defer.inlineCallbacks
-    def getFileData(self, requestor, nodeIdentifier):
+    async def getFileData(self, requestor, nodeIdentifier):
         file_id = self._getFileId(nodeIdentifier)
         try:
-            files = yield self.host.memory.getFiles(self.parent, requestor, file_id)
+            files = await self.host.memory.getFiles(self.parent, requestor, file_id)
         except (exceptions.NotFound, exceptions.PermissionError):
             # we don't differenciate between NotFound and PermissionError
             # to avoid leaking information on existing files
@@ -770,7 +769,7 @@
             raise error.StanzaError("item-not-found")
         if len(files) > 1:
             raise error.InternalError("there should be only one file")
-        defer.returnValue(files[0])
+        return files[0]
 
     def commentsUpdate(self, extra, new_comments, peer_jid):
         """update comments (replace or insert new_comments)
@@ -825,10 +824,10 @@
             iq_elt = iq_elt.parent
         return iq_elt["from"]
 
-    @defer.inlineCallbacks
-    def publish(self, requestor, service, nodeIdentifier, items):
+    @ensure_deferred
+    async def publish(self, requestor, service, nodeIdentifier, items):
         #  we retrieve file a first time to check authorisations
-        file_data = yield self.getFileData(requestor, nodeIdentifier)
+        file_data = await self.getFileData(requestor, nodeIdentifier)
         file_id = file_data["id"]
         comments = [(item["id"], self._getFrom(item), item.toXml()) for item in items]
         if requestor.userhostJID() == file_data["owner"]:
@@ -837,24 +836,24 @@
             peer_jid = requestor.userhost()
         update_cb = partial(self.commentsUpdate, new_comments=comments, peer_jid=peer_jid)
         try:
-            yield self.host.memory.fileUpdate(file_id, "extra", update_cb)
+            await self.host.memory.fileUpdate(file_id, "extra", update_cb)
         except exceptions.PermissionError:
             raise error.StanzaError("not-authorized")
 
-    @defer.inlineCallbacks
-    def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers):
-        file_data = yield self.getFileData(requestor, nodeIdentifier)
+    @ensure_deferred
+    async def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers):
+        file_data = await self.getFileData(requestor, nodeIdentifier)
         comments = file_data["extra"].get("comments", [])
         if itemIdentifiers:
             defer.returnValue(
                 [generic.parseXml(c[2]) for c in comments if c[0] in itemIdentifiers]
             )
         else:
-            defer.returnValue([generic.parseXml(c[2]) for c in comments])
+            return [generic.parseXml(c[2]) for c in comments]
 
-    @defer.inlineCallbacks
-    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
-        file_data = yield self.getFileData(requestor, nodeIdentifier)
+    @ensure_deferred
+    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
+        file_data = await self.getFileData(requestor, nodeIdentifier)
         file_id = file_data["id"]
         try:
             comments = file_data["extra"]["comments"]
@@ -879,4 +878,4 @@
                 raise error.StanzaError("not-authorized")
 
         remove_cb = partial(self.commentsDelete, comments=to_remove)
-        yield self.host.memory.fileUpdate(file_id, "extra", remove_cb)
+        await self.host.memory.fileUpdate(file_id, "extra", remove_cb)