comparison sat/memory/persistent.py @ 3541:888109774673

core: various changes and fixes to work with new storage and D-Bus bridge: - fixes coroutines handling in various places - fixes types which are not serialised by Tx DBus - XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise use thread which prevent the use of AsyncIO loop. To work around that, callLater is used to launch storage method in main thread. This is a temporary workaround, as Python OMEMO should get rid of Promise implementation and threads soon.
author Goffi <goffi@goffi.org>
date Thu, 03 Jun 2021 15:21:43 +0200
parents be6d91572633
children cbb988a6f507
comparison
equal deleted inserted replaced
3540:aa58451b77ba 3541:888109774673
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from twisted.internet import defer
21 from twisted.python import failure
20 from sat.core.i18n import _ 22 from sat.core.i18n import _
21 from sat.core.log import getLogger 23 from sat.core.log import getLogger
24
25
22 log = getLogger(__name__) 26 log = getLogger(__name__)
23 from twisted.python import failure
24 27
25 28
26 class MemoryNotInitializedError(Exception): 29 class MemoryNotInitializedError(Exception):
27 pass 30 pass
28 31
55 """Load persistent data from storage. 58 """Load persistent data from storage.
56 59
57 need to be called before any other operation 60 need to be called before any other operation
58 @return: defers the PersistentDict instance itself 61 @return: defers the PersistentDict instance itself
59 """ 62 """
60 d = self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile) 63 d = defer.ensureDeferred(self.storage.getPrivates(
64 self.namespace, binary=self.binary, profile=self.profile
65 ))
61 d.addCallback(self._setCache) 66 d.addCallback(self._setCache)
62 d.addCallback(lambda __: self) 67 d.addCallback(lambda __: self)
63 return d 68 return d
64 69
65 def iteritems(self): 70 def iteritems(self):
109 114
110 def __getitem__(self, key): 115 def __getitem__(self, key):
111 return self._cache.__getitem__(key) 116 return self._cache.__getitem__(key)
112 117
113 def __setitem__(self, key, value): 118 def __setitem__(self, key, value):
114 self.storage.setPrivateValue(self.namespace, key, value, self.binary, 119 defer.ensureDeferred(
115 self.profile) 120 self.storage.setPrivateValue(
121 self.namespace, key, value, self.binary, self.profile
122 )
123 )
116 return self._cache.__setitem__(key, value) 124 return self._cache.__setitem__(key, value)
117 125
118 def __delitem__(self, key): 126 def __delitem__(self, key):
119 self.storage.delPrivateValue(self.namespace, key, self.binary, self.profile) 127 self.storage.delPrivateValue(self.namespace, key, self.binary, self.profile)
120 return self._cache.__delitem__(key) 128 return self._cache.__delitem__(key)
128 return self._cache.get(key, default) 136 return self._cache.get(key, default)
129 137
130 def aset(self, key, value): 138 def aset(self, key, value):
131 """Async set, return a Deferred fired when value is actually stored""" 139 """Async set, return a Deferred fired when value is actually stored"""
132 self._cache.__setitem__(key, value) 140 self._cache.__setitem__(key, value)
133 return self.storage.setPrivateValue(self.namespace, key, value, 141 return defer.ensureDeferred(
134 self.binary, self.profile) 142 self.storage.setPrivateValue(
143 self.namespace, key, value, self.binary, self.profile
144 )
145 )
135 146
136 def adel(self, key): 147 def adel(self, key):
137 """Async del, return a Deferred fired when value is actually deleted""" 148 """Async del, return a Deferred fired when value is actually deleted"""
138 self._cache.__delitem__(key) 149 self._cache.__delitem__(key)
139 return self.storage.delPrivateValue( 150 return self.storage.delPrivateValue(
149 def force(self, name): 160 def force(self, name):
150 """Force saving of an attribute to storage 161 """Force saving of an attribute to storage
151 162
152 @return: deferred fired when data is actually saved 163 @return: deferred fired when data is actually saved
153 """ 164 """
154 return self.storage.setPrivateValue(self.namespace, name, self._cache[name], 165 return defer.ensureDeferred(
155 self.binary, self.profile) 166 self.storage.setPrivateValue(
167 self.namespace, name, self._cache[name], self.binary, self.profile
168 )
169 )
156 170
157 171
158 class PersistentBinaryDict(PersistentDict): 172 class PersistentBinaryDict(PersistentDict):
159 """Persistent dict where value can be any python data (instead of string only)""" 173 """Persistent dict where value can be any python data (instead of string only)"""
160 binary = True 174 binary = True
176 190
177 def iteritems(self): 191 def iteritems(self):
178 raise NotImplementedError 192 raise NotImplementedError
179 193
180 def items(self): 194 def items(self):
181 d = self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile) 195 d = defer.ensureDeferred(self.storage.getPrivates(
196 self.namespace, binary=self.binary, profile=self.profile
197 ))
182 d.addCallback(lambda data_dict: data_dict.items()) 198 d.addCallback(lambda data_dict: data_dict.items())
183 return d 199 return d
184 200
185 def all(self): 201 def all(self):
186 return self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile) 202 return defer.ensureDeferred(self.storage.getPrivates(
203 self.namespace, binary=self.binary, profile=self.profile
204 ))
187 205
188 def __repr__(self): 206 def __repr__(self):
189 raise NotImplementedError 207 raise NotImplementedError
190 208
191 def __str__(self): 209 def __str__(self):
232 # into debugger in debug mode. 250 # into debugger in debug mode.
233 raise failure.Failure(e) 251 raise failure.Failure(e)
234 252
235 def __getitem__(self, key): 253 def __getitem__(self, key):
236 """get the value as a Deferred""" 254 """get the value as a Deferred"""
237 d = self.storage.getPrivates(self.namespace, keys=[key], binary=self.binary, 255 d = defer.ensureDeferred(self.storage.getPrivates(
238 profile=self.profile) 256 self.namespace, keys=[key], binary=self.binary, profile=self.profile
257 ))
239 d.addCallback(self._data2value, key) 258 d.addCallback(self._data2value, key)
240 return d 259 return d
241 260
242 def __setitem__(self, key, value): 261 def __setitem__(self, key, value):
243 self.storage.setPrivateValue(self.namespace, key, value, self.binary, 262 defer.ensureDeferred(
244 self.profile) 263 self.storage.setPrivateValue(
264 self.namespace, key, value, self.binary, self.profile
265 )
266 )
245 267
246 def __delitem__(self, key): 268 def __delitem__(self, key):
247 self.storage.delPrivateValue(self.namespace, key, self.binary, self.profile) 269 self.storage.delPrivateValue(self.namespace, key, self.binary, self.profile)
248 270
249 def _defaultOrException(self, failure_, default): 271 def _defaultOrException(self, failure_, default):
257 279
258 def aset(self, key, value): 280 def aset(self, key, value):
259 """Async set, return a Deferred fired when value is actually stored""" 281 """Async set, return a Deferred fired when value is actually stored"""
260 # FIXME: redundant with force, force must be removed 282 # FIXME: redundant with force, force must be removed
261 # XXX: similar as PersistentDict.aset, but doesn't use cache 283 # XXX: similar as PersistentDict.aset, but doesn't use cache
262 return self.storage.setPrivateValue(self.namespace, key, value, 284 return defer.ensureDeferred(
263 self.binary, self.profile) 285 self.storage.setPrivateValue(
286 self.namespace, key, value, self.binary, self.profile
287 )
288 )
264 289
265 def adel(self, key): 290 def adel(self, key):
266 """Async del, return a Deferred fired when value is actually deleted""" 291 """Async del, return a Deferred fired when value is actually deleted"""
267 # XXX: similar as PersistentDict.adel, but doesn't use cache 292 # XXX: similar as PersistentDict.adel, but doesn't use cache
268 return self.storage.delPrivateValue( 293 return self.storage.delPrivateValue(
275 """Force saving of an attribute to storage 300 """Force saving of an attribute to storage
276 301
277 @param value(object): value is needed for LazyPersistentBinaryDict 302 @param value(object): value is needed for LazyPersistentBinaryDict
278 @return: deferred fired when data is actually saved 303 @return: deferred fired when data is actually saved
279 """ 304 """
280 return self.storage.setPrivateValue(self.namespace, name, value, self.binary, self.profile) 305 return defer.ensureDeferred(
306 self.storage.setPrivateValue(
307 self.namespace, name, value, self.binary, self.profile
308 )
309 )
281 310
282 def remove(self, key): 311 def remove(self, key):
283 """Delete a key from sotrage, and return a deferred called when it's done 312 """Delete a key from sotrage, and return a deferred called when it's done
284 313
285 @param key(unicode): key to delete 314 @param key(unicode): key to delete