Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0384.py @ 3541:888109774673
core: various changes and fixes to work with new storage and D-Bus bridge:
- fixes coroutine handling in various places
- fixes types which are not serialised by Tx DBus
- XEP-0384: call storage methods in the main thread: Python OMEMO's Promise implementation uses threads,
which prevents the use of the AsyncIO loop. To work around that, callLater is used to launch
storage methods in the main thread. This is a temporary workaround, as Python OMEMO should
get rid of its Promise implementation and threads soon.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 03 Jun 2021 15:21:43 +0200 |
parents | be6d91572633 |
children | edc79cefe968 |
comparison
equal
deleted
inserted
replaced
3540:aa58451b77ba | 3541:888109774673 |
---|---|
112 | 112 |
113 @param promise_(promise.Promise): promise to convert | 113 @param promise_(promise.Promise): promise to convert |
114 @return (defer.Deferred): deferred instance linked to the promise | 114 @return (defer.Deferred): deferred instance linked to the promise |
115 """ | 115 """ |
116 d = defer.Deferred() | 116 d = defer.Deferred() |
117 promise_.then(d.callback, d.errback) | 117 promise_.then( |
118 lambda result: reactor.callLater(0, d.callback, result), | |
119 lambda exc: reactor.callLater(0, d.errback, exc) | |
120 ) | |
118 return d | 121 return d |
119 | 122 |
120 | 123 |
121 class OmemoStorage(omemo.Storage): | 124 class OmemoStorage(omemo.Storage): |
122 | 125 |
138 This method use partial to call callback with boolean then result when | 141 This method use partial to call callback with boolean then result when |
139 Deferred is called | 142 Deferred is called |
140 """ | 143 """ |
141 deferred.addCallback(partial(callback, True)) | 144 deferred.addCallback(partial(callback, True)) |
142 deferred.addErrback(partial(callback, False)) | 145 deferred.addErrback(partial(callback, False)) |
146 | |
147 def _callMainThread(self, callback, method, *args, check_jid=None): | |
148 d = method(*args) | |
149 if check_jid is not None: | |
150 check_jid_d = self._checkJid(check_jid) | |
151 check_jid_d.addCallback(lambda __: d) | |
152 d = check_jid_d | |
153 if callback is not None: | |
154 d.addCallback(partial(callback, True)) | |
155 d.addErrback(partial(callback, False)) | |
156 | |
157 def _call(self, callback, method, *args, check_jid=None): | |
158 """Create Deferred and add Promise callback to it | |
159 | |
160 This method use reactor.callLater to launch Deferred in main thread | |
161 @param check_jid: run self._checkJid before method | |
162 """ | |
163 reactor.callLater( | |
164 0, self._callMainThread, callback, method, *args, check_jid=check_jid | |
165 ) | |
143 | 166 |
144 def _checkJid(self, bare_jid): | 167 def _checkJid(self, bare_jid): |
145 """Check if jid is known, and store it if not | 168 """Check if jid is known, and store it if not |
146 | 169 |
147 @param bare_jid(unicode): bare jid to check | 170 @param bare_jid(unicode): bare jid to check |
162 if own_bare_jid != self.own_bare_jid_s or own_device_id != self.device_id: | 185 if own_bare_jid != self.own_bare_jid_s or own_device_id != self.device_id: |
163 raise exceptions.InternalError('bare jid or device id inconsistency!') | 186 raise exceptions.InternalError('bare jid or device id inconsistency!') |
164 callback(True, None) | 187 callback(True, None) |
165 | 188 |
166 def loadState(self, callback): | 189 def loadState(self, callback): |
167 d = self.data.get(KEY_STATE) | 190 self._call(callback, self.data.get, KEY_STATE) |
168 self.setCb(d, callback) | |
169 | 191 |
170 def storeState(self, callback, state): | 192 def storeState(self, callback, state): |
171 d = self.data.force(KEY_STATE, state) | 193 self._call(callback, self.data.force, KEY_STATE, state) |
172 self.setCb(d, callback) | |
173 | 194 |
174 def loadSession(self, callback, bare_jid, device_id): | 195 def loadSession(self, callback, bare_jid, device_id): |
175 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) | 196 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) |
176 d = self.data.get(key) | 197 self._call(callback, self.data.get, key) |
177 self.setCb(d, callback) | |
178 | 198 |
179 def storeSession(self, callback, bare_jid, device_id, session): | 199 def storeSession(self, callback, bare_jid, device_id, session): |
180 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) | 200 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) |
181 d = self.data.force(key, session) | 201 self._call(callback, self._data.force, key, session) |
182 self.setCb(d, callback) | |
183 | 202 |
184 def deleteSession(self, callback, bare_jid, device_id): | 203 def deleteSession(self, callback, bare_jid, device_id): |
185 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) | 204 key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) |
186 d = self.data.remove(key) | 205 self._call(callback, self.data.remove, key) |
187 self.setCb(d, callback) | |
188 | 206 |
189 def loadActiveDevices(self, callback, bare_jid): | 207 def loadActiveDevices(self, callback, bare_jid): |
190 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) | 208 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) |
191 d = self.data.get(key, {}) | 209 self._call(callback, self.data.get, key, {}) |
192 if callback is not None: | |
193 self.setCb(d, callback) | |
194 return d | |
195 | 210 |
196 def loadInactiveDevices(self, callback, bare_jid): | 211 def loadInactiveDevices(self, callback, bare_jid): |
197 key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) | 212 key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) |
198 d = self.data.get(key, {}) | 213 self._call(callback, self.data.get, key, {}) |
199 if callback is not None: | |
200 self.setCb(d, callback) | |
201 return d | |
202 | 214 |
203 def storeActiveDevices(self, callback, bare_jid, devices): | 215 def storeActiveDevices(self, callback, bare_jid, devices): |
204 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) | 216 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) |
205 d = self._checkJid(bare_jid) | 217 self._call(callback, self.data.force, key, devices, check_jid=bare_jid) |
206 d.addCallback(lambda _: self.data.force(key, devices)) | |
207 self.setCb(d, callback) | |
208 | 218 |
209 def storeInactiveDevices(self, callback, bare_jid, devices): | 219 def storeInactiveDevices(self, callback, bare_jid, devices): |
210 key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) | 220 key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) |
211 d = self._checkJid(bare_jid) | 221 self._call(callback, self.data.force, key, devices, check_jid=bare_jid) |
212 d.addCallback(lambda _: self.data.force(key, devices)) | |
213 self.setCb(d, callback) | |
214 | 222 |
215 def storeTrust(self, callback, bare_jid, device_id, trust): | 223 def storeTrust(self, callback, bare_jid, device_id, trust): |
216 key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) | 224 key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) |
217 d = self.data.force(key, trust) | 225 self._call(callback, self.data.force, key, trust) |
218 self.setCb(d, callback) | |
219 | 226 |
220 def loadTrust(self, callback, bare_jid, device_id): | 227 def loadTrust(self, callback, bare_jid, device_id): |
221 key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) | 228 key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) |
222 d = self.data.get(key) | 229 self._call(callback, self.data.get, key) |
230 | |
231 def listJIDs(self, callback): | |
223 if callback is not None: | 232 if callback is not None: |
224 self.setCb(d, callback) | 233 callback(True, self.all_jids) |
225 return d | |
226 | |
227 def listJIDs(self, callback): | |
228 d = defer.succeed(self.all_jids) | |
229 if callback is not None: | |
230 self.setCb(d, callback) | |
231 return d | |
232 | 234 |
233 def _deleteJID_logResults(self, results): | 235 def _deleteJID_logResults(self, results): |
234 failed = [success for success, __ in results if not success] | 236 failed = [success for success, __ in results if not success] |
235 if failed: | 237 if failed: |
236 log.warning( | 238 log.warning( |
264 d_list.append(lambda __: self.data.force(KEY_ALL_JIDS, self.all_jids)) | 266 d_list.append(lambda __: self.data.force(KEY_ALL_JIDS, self.all_jids)) |
265 d = defer.DeferredList(d_list) | 267 d = defer.DeferredList(d_list) |
266 d.addCallback(self._deleteJID_logResults) | 268 d.addCallback(self._deleteJID_logResults) |
267 return d | 269 return d |
268 | 270 |
269 def deleteJID(self, callback, bare_jid): | 271 def _deleteJID(self, callback, bare_jid): |
270 """Retrieve all (in)actives devices of bare_jid, and delete all related keys""" | |
271 d_list = [] | 272 d_list = [] |
272 | 273 |
273 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) | 274 key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) |
274 d_list.append(self.data.get(key, [])) | 275 d_list.append(self.data.get(key, [])) |
275 | 276 |
282 d_list.append(d_inactive) | 283 d_list.append(d_inactive) |
283 d = defer.DeferredList(d_list) | 284 d = defer.DeferredList(d_list) |
284 d.addCallback(self._deleteJID_gotDevices, bare_jid) | 285 d.addCallback(self._deleteJID_gotDevices, bare_jid) |
285 if callback is not None: | 286 if callback is not None: |
286 self.setCb(d, callback) | 287 self.setCb(d, callback) |
287 return d | 288 |
289 def deleteJID(self, callback, bare_jid): | |
290 """Retrieve all (in)actives devices of bare_jid, and delete all related keys""" | |
291 reactor.callLater(0, self._deleteJID, callback, bare_jid) | |
288 | 292 |
289 | 293 |
290 class SatOTPKPolicy(omemo.DefaultOTPKPolicy): | 294 class SatOTPKPolicy(omemo.DefaultOTPKPolicy): |
291 pass | 295 pass |
292 | 296 |
726 device_id = random.randint(1, 2**31-1) | 730 device_id = random.randint(1, 2**31-1) |
727 # we check that it's really unique | 731 # we check that it's really unique |
728 while device_id in devices: | 732 while device_id in devices: |
729 device_id = random.randint(1, 2**31-1) | 733 device_id = random.randint(1, 2**31-1) |
730 # and we save it | 734 # and we save it |
731 persistent_dict[KEY_DEVICE_ID] = device_id | 735 await persistent_dict.aset(KEY_DEVICE_ID, device_id) |
732 | 736 |
733 log.debug(f"our OMEMO device id is {device_id}") | 737 log.debug(f"our OMEMO device id is {device_id}") |
734 | 738 |
735 if device_id not in devices: | 739 if device_id not in devices: |
736 log.debug(f"our device id ({device_id}) is not in the list, adding it") | 740 log.debug(f"our device id ({device_id}) is not in the list, adding it") |