diff sat/memory/persistent.py @ 3541:888109774673

core: various changes and fixes to work with new storage and D-Bus bridge: - fixes coroutines handling in various places - fixes types which are not serialised by Tx DBus - XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise use thread which prevent the use of AsyncIO loop. To work around that, callLater is used to launch storage method in main thread. This is a temporary workaround, as Python OMEMO should get rid of Promise implementation and threads soon.
author Goffi <goffi@goffi.org>
date Thu, 03 Jun 2021 15:21:43 +0200
parents be6d91572633
children cbb988a6f507
line wrap: on
line diff
--- a/sat/memory/persistent.py	Thu Jun 03 15:21:43 2021 +0200
+++ b/sat/memory/persistent.py	Thu Jun 03 15:21:43 2021 +0200
@@ -17,10 +17,13 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from twisted.internet import defer
+from twisted.python import failure
 from sat.core.i18n import _
 from sat.core.log import getLogger
+
+
 log = getLogger(__name__)
-from twisted.python import failure
 
 
 class MemoryNotInitializedError(Exception):
@@ -57,7 +60,9 @@
         need to be called before any other operation
         @return: defers the PersistentDict instance itself
         """
-        d = self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile)
+        d = defer.ensureDeferred(self.storage.getPrivates(
+            self.namespace, binary=self.binary, profile=self.profile
+        ))
         d.addCallback(self._setCache)
         d.addCallback(lambda __: self)
         return d
@@ -111,8 +116,11 @@
         return self._cache.__getitem__(key)
 
     def __setitem__(self, key, value):
-        self.storage.setPrivateValue(self.namespace, key, value, self.binary,
-                                     self.profile)
+        defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, key, value, self.binary, self.profile
+            )
+        )
         return self._cache.__setitem__(key, value)
 
     def __delitem__(self, key):
@@ -130,8 +138,11 @@
     def aset(self, key, value):
         """Async set, return a Deferred fired when value is actually stored"""
         self._cache.__setitem__(key, value)
-        return self.storage.setPrivateValue(self.namespace, key, value,
-                                            self.binary, self.profile)
+        return defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, key, value, self.binary, self.profile
+            )
+        )
 
     def adel(self, key):
         """Async del, return a Deferred fired when value is actually deleted"""
@@ -151,8 +162,11 @@
 
         @return: deferred fired when data is actually saved
         """
-        return self.storage.setPrivateValue(self.namespace, name, self._cache[name],
-                                            self.binary, self.profile)
+        return defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, name, self._cache[name], self.binary, self.profile
+            )
+        )
 
 
 class PersistentBinaryDict(PersistentDict):
@@ -178,12 +192,16 @@
         raise NotImplementedError
 
     def items(self):
-        d = self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile)
+        d = defer.ensureDeferred(self.storage.getPrivates(
+            self.namespace, binary=self.binary, profile=self.profile
+        ))
         d.addCallback(lambda data_dict: data_dict.items())
         return d
 
     def all(self):
-        return self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile)
+        return defer.ensureDeferred(self.storage.getPrivates(
+            self.namespace, binary=self.binary, profile=self.profile
+        ))
 
     def __repr__(self):
         raise NotImplementedError
@@ -234,14 +252,18 @@
 
     def __getitem__(self, key):
         """get the value as a Deferred"""
-        d = self.storage.getPrivates(self.namespace, keys=[key], binary=self.binary,
-                                     profile=self.profile)
+        d = defer.ensureDeferred(self.storage.getPrivates(
+            self.namespace, keys=[key], binary=self.binary, profile=self.profile
+        ))
         d.addCallback(self._data2value, key)
         return d
 
     def __setitem__(self, key, value):
-        self.storage.setPrivateValue(self.namespace, key, value, self.binary,
-                                     self.profile)
+        defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, key, value, self.binary, self.profile
+            )
+        )
 
     def __delitem__(self, key):
         self.storage.delPrivateValue(self.namespace, key, self.binary, self.profile)
@@ -259,8 +281,11 @@
         """Async set, return a Deferred fired when value is actually stored"""
         # FIXME: redundant with force, force must be removed
         # XXX: similar as PersistentDict.aset, but doesn't use cache
-        return self.storage.setPrivateValue(self.namespace, key, value,
-                                            self.binary, self.profile)
+        return defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, key, value, self.binary, self.profile
+            )
+        )
 
     def adel(self, key):
         """Async del, return a Deferred fired when value is actually deleted"""
@@ -277,7 +302,11 @@
         @param value(object): value is needed for LazyPersistentBinaryDict
         @return: deferred fired when data is actually saved
         """
-        return self.storage.setPrivateValue(self.namespace, name, value, self.binary, self.profile)
+        return defer.ensureDeferred(
+            self.storage.setPrivateValue(
+                self.namespace, name, value, self.binary, self.profile
+            )
+        )
 
     def remove(self, key):
         """Delete a key from sotrage, and return a deferred called when it's done