comparison sat/plugins/plugin_xep_0384.py @ 3541:888109774673

core: various changes and fixes to work with new storage and D-Bus bridge: - fixes coroutines handling in various places - fixes types which are not serialised by Tx DBus - XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise use thread which prevent the use of AsyncIO loop. To work around that, callLater is used to launch storage method in main thread. This is a temporary workaround, as Python OMEMO should get rid of Promise implementation and threads soon.
author Goffi <goffi@goffi.org>
date Thu, 03 Jun 2021 15:21:43 +0200
parents be6d91572633
children edc79cefe968
comparison
equal deleted inserted replaced
3540:aa58451b77ba 3541:888109774673
112 112
113 @param promise_(promise.Promise): promise to convert 113 @param promise_(promise.Promise): promise to convert
114 @return (defer.Deferred): deferred instance linked to the promise 114 @return (defer.Deferred): deferred instance linked to the promise
115 """ 115 """
116 d = defer.Deferred() 116 d = defer.Deferred()
117 promise_.then(d.callback, d.errback) 117 promise_.then(
118 lambda result: reactor.callLater(0, d.callback, result),
119 lambda exc: reactor.callLater(0, d.errback, exc)
120 )
118 return d 121 return d
119 122
120 123
121 class OmemoStorage(omemo.Storage): 124 class OmemoStorage(omemo.Storage):
122 125
138 This method use partial to call callback with boolean then result when 141 This method use partial to call callback with boolean then result when
139 Deferred is called 142 Deferred is called
140 """ 143 """
141 deferred.addCallback(partial(callback, True)) 144 deferred.addCallback(partial(callback, True))
142 deferred.addErrback(partial(callback, False)) 145 deferred.addErrback(partial(callback, False))
146
147 def _callMainThread(self, callback, method, *args, check_jid=None):
148 d = method(*args)
149 if check_jid is not None:
150 check_jid_d = self._checkJid(check_jid)
151 check_jid_d.addCallback(lambda __: d)
152 d = check_jid_d
153 if callback is not None:
154 d.addCallback(partial(callback, True))
155 d.addErrback(partial(callback, False))
156
157 def _call(self, callback, method, *args, check_jid=None):
158 """Create Deferred and add Promise callback to it
159
160 This method use reactor.callLater to launch Deferred in main thread
161 @param check_jid: run self._checkJid before method
162 """
163 reactor.callLater(
164 0, self._callMainThread, callback, method, *args, check_jid=check_jid
165 )
143 166
144 def _checkJid(self, bare_jid): 167 def _checkJid(self, bare_jid):
145 """Check if jid is known, and store it if not 168 """Check if jid is known, and store it if not
146 169
147 @param bare_jid(unicode): bare jid to check 170 @param bare_jid(unicode): bare jid to check
162 if own_bare_jid != self.own_bare_jid_s or own_device_id != self.device_id: 185 if own_bare_jid != self.own_bare_jid_s or own_device_id != self.device_id:
163 raise exceptions.InternalError('bare jid or device id inconsistency!') 186 raise exceptions.InternalError('bare jid or device id inconsistency!')
164 callback(True, None) 187 callback(True, None)
165 188
166 def loadState(self, callback): 189 def loadState(self, callback):
167 d = self.data.get(KEY_STATE) 190 self._call(callback, self.data.get, KEY_STATE)
168 self.setCb(d, callback)
169 191
170 def storeState(self, callback, state): 192 def storeState(self, callback, state):
171 d = self.data.force(KEY_STATE, state) 193 self._call(callback, self.data.force, KEY_STATE, state)
172 self.setCb(d, callback)
173 194
174 def loadSession(self, callback, bare_jid, device_id): 195 def loadSession(self, callback, bare_jid, device_id):
175 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) 196 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
176 d = self.data.get(key) 197 self._call(callback, self.data.get, key)
177 self.setCb(d, callback)
178 198
179 def storeSession(self, callback, bare_jid, device_id, session): 199 def storeSession(self, callback, bare_jid, device_id, session):
180 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) 200 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
181 d = self.data.force(key, session) 201 self._call(callback, self._data.force, key, session)
182 self.setCb(d, callback)
183 202
184 def deleteSession(self, callback, bare_jid, device_id): 203 def deleteSession(self, callback, bare_jid, device_id):
185 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) 204 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)])
186 d = self.data.remove(key) 205 self._call(callback, self.data.remove, key)
187 self.setCb(d, callback)
188 206
189 def loadActiveDevices(self, callback, bare_jid): 207 def loadActiveDevices(self, callback, bare_jid):
190 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) 208 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
191 d = self.data.get(key, {}) 209 self._call(callback, self.data.get, key, {})
192 if callback is not None:
193 self.setCb(d, callback)
194 return d
195 210
196 def loadInactiveDevices(self, callback, bare_jid): 211 def loadInactiveDevices(self, callback, bare_jid):
197 key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) 212 key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
198 d = self.data.get(key, {}) 213 self._call(callback, self.data.get, key, {})
199 if callback is not None:
200 self.setCb(d, callback)
201 return d
202 214
203 def storeActiveDevices(self, callback, bare_jid, devices): 215 def storeActiveDevices(self, callback, bare_jid, devices):
204 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) 216 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
205 d = self._checkJid(bare_jid) 217 self._call(callback, self.data.force, key, devices, check_jid=bare_jid)
206 d.addCallback(lambda _: self.data.force(key, devices))
207 self.setCb(d, callback)
208 218
209 def storeInactiveDevices(self, callback, bare_jid, devices): 219 def storeInactiveDevices(self, callback, bare_jid, devices):
210 key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) 220 key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid])
211 d = self._checkJid(bare_jid) 221 self._call(callback, self.data.force, key, devices, check_jid=bare_jid)
212 d.addCallback(lambda _: self.data.force(key, devices))
213 self.setCb(d, callback)
214 222
215 def storeTrust(self, callback, bare_jid, device_id, trust): 223 def storeTrust(self, callback, bare_jid, device_id, trust):
216 key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) 224 key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)])
217 d = self.data.force(key, trust) 225 self._call(callback, self.data.force, key, trust)
218 self.setCb(d, callback)
219 226
220 def loadTrust(self, callback, bare_jid, device_id): 227 def loadTrust(self, callback, bare_jid, device_id):
221 key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) 228 key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)])
222 d = self.data.get(key) 229 self._call(callback, self.data.get, key)
230
231 def listJIDs(self, callback):
223 if callback is not None: 232 if callback is not None:
224 self.setCb(d, callback) 233 callback(True, self.all_jids)
225 return d
226
227 def listJIDs(self, callback):
228 d = defer.succeed(self.all_jids)
229 if callback is not None:
230 self.setCb(d, callback)
231 return d
232 234
233 def _deleteJID_logResults(self, results): 235 def _deleteJID_logResults(self, results):
234 failed = [success for success, __ in results if not success] 236 failed = [success for success, __ in results if not success]
235 if failed: 237 if failed:
236 log.warning( 238 log.warning(
264 d_list.append(lambda __: self.data.force(KEY_ALL_JIDS, self.all_jids)) 266 d_list.append(lambda __: self.data.force(KEY_ALL_JIDS, self.all_jids))
265 d = defer.DeferredList(d_list) 267 d = defer.DeferredList(d_list)
266 d.addCallback(self._deleteJID_logResults) 268 d.addCallback(self._deleteJID_logResults)
267 return d 269 return d
268 270
269 def deleteJID(self, callback, bare_jid): 271 def _deleteJID(self, callback, bare_jid):
270 """Retrieve all (in)actives devices of bare_jid, and delete all related keys"""
271 d_list = [] 272 d_list = []
272 273
273 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) 274 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid])
274 d_list.append(self.data.get(key, [])) 275 d_list.append(self.data.get(key, []))
275 276
282 d_list.append(d_inactive) 283 d_list.append(d_inactive)
283 d = defer.DeferredList(d_list) 284 d = defer.DeferredList(d_list)
284 d.addCallback(self._deleteJID_gotDevices, bare_jid) 285 d.addCallback(self._deleteJID_gotDevices, bare_jid)
285 if callback is not None: 286 if callback is not None:
286 self.setCb(d, callback) 287 self.setCb(d, callback)
287 return d 288
289 def deleteJID(self, callback, bare_jid):
290 """Retrieve all (in)actives devices of bare_jid, and delete all related keys"""
291 reactor.callLater(0, self._deleteJID, callback, bare_jid)
288 292
289 293
290 class SatOTPKPolicy(omemo.DefaultOTPKPolicy): 294 class SatOTPKPolicy(omemo.DefaultOTPKPolicy):
291 pass 295 pass
292 296
726 device_id = random.randint(1, 2**31-1) 730 device_id = random.randint(1, 2**31-1)
727 # we check that it's really unique 731 # we check that it's really unique
728 while device_id in devices: 732 while device_id in devices:
729 device_id = random.randint(1, 2**31-1) 733 device_id = random.randint(1, 2**31-1)
730 # and we save it 734 # and we save it
731 persistent_dict[KEY_DEVICE_ID] = device_id 735 await persistent_dict.aset(KEY_DEVICE_ID, device_id)
732 736
733 log.debug(f"our OMEMO device id is {device_id}") 737 log.debug(f"our OMEMO device id is {device_id}")
734 738
735 if device_id not in devices: 739 if device_id not in devices:
736 log.debug(f"our device id ({device_id}) is not in the list, adding it") 740 log.debug(f"our device id ({device_id}) is not in the list, adding it")