# HG changeset patch # User Goffi # Date 1622726503 -7200 # Node ID 8881097746735a8a90b44b04e4761c59d3f0bdfb # Parent aa58451b77ba973b495f22d2dfef57625eff62a5 core: various changes and fixes to work with new storage and D-Bus bridge: - fixes coroutines handling in various places - fixes types which are not serialised by Tx DBus - XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise use thread which prevent the use of AsyncIO loop. To work around that, callLater is used to launch storage method in main thread. This is a temporary workaround, as Python OMEMO should get rid of Promise implementation and threads soon. diff -r aa58451b77ba -r 888109774673 sat/core/sat_main.py --- a/sat/core/sat_main.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/core/sat_main.py Thu Jun 03 15:21:43 2021 +0200 @@ -456,7 +456,7 @@ except AttributeError: continue else: - defers_list.append(defer.maybeDeferred(unload)) + defers_list.append(utils.asDeferred(unload)) return defers_list def _connect(self, profile_key, password="", options=None): @@ -470,7 +470,7 @@ Retrieve the individual parameters, authenticate the profile and initiate the connection to the associated XMPP server. @param profile: %(doc_profile)s - @param password (string): the SàT profile password + @param password (string): the Libervia profile password @param options (dict): connection options. Key can be: - @param max_retries (int): max number of connection retries @@ -533,7 +533,7 @@ features = [] for import_name, plugin in self.plugins.items(): try: - features_d = defer.maybeDeferred(plugin.getFeatures, profile_key) + features_d = utils.asDeferred(plugin.getFeatures, profile_key) except AttributeError: features_d = defer.succeed({}) features.append(features_d) @@ -581,7 +581,7 @@ attr = client.roster.getAttributes(item) # we use full() and not userhost() because jid with resources are allowed # in roster, even if it's not common. - ret.append([item.entity.full(), attr, item.groups]) + ret.append([item.entity.full(), attr, list(item.groups)]) return ret return client.roster.got_roster.addCallback(got_roster) @@ -1108,6 +1108,7 @@ def _findByFeatures(self, namespaces, identities, bare_jids, service, roster, own_jid, local_device, profile_key): client = self.getClient(profile_key) + identities = [tuple(i) for i in identities] if identities else None return defer.ensureDeferred(self.findByFeatures( client, namespaces, identities, bare_jids, service, roster, own_jid, local_device)) diff -r aa58451b77ba -r 888109774673 sat/core/xmpp.py --- a/sat/core/xmpp.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/core/xmpp.py Thu Jun 03 15:21:43 2021 +0200 @@ -714,7 +714,7 @@ or mess_data["type"] == C.MESS_TYPE_INFO ) - def messageAddToHistory(self, data): + async def messageAddToHistory(self, data): """Store message into database (for local history) @param data: message data dictionnary @@ -726,7 +726,7 @@ # we need a message to store if self.isMessagePrintable(data): - self.host_app.memory.addToHistory(self, data) + await self.host_app.memory.addToHistory(self, data) else: log.warning( "No message found" @@ -876,7 +876,9 @@ def addPostXmlCallbacks(self, post_xml_treatments): post_xml_treatments.addCallback(self.messageProt.completeAttachments) - post_xml_treatments.addCallback(self.messageAddToHistory) + post_xml_treatments.addCallback( + lambda ret: defer.ensureDeferred(self.messageAddToHistory(ret)) + ) post_xml_treatments.addCallback(self.messageSendToBridge) def send(self, obj): @@ -1061,7 +1063,9 @@ def addPostXmlCallbacks(self, post_xml_treatments): if self.sendHistory: - post_xml_treatments.addCallback(self.messageAddToHistory) + post_xml_treatments.addCallback( + lambda ret: defer.ensureDeferred(self.messageAddToHistory(ret)) + ) def getOwnerFromJid(self, to_jid: jid.JID) -> jid.JID: """Retrieve "owner" of a component resource from the destination jid of the request @@ -1212,7 +1216,9 @@ data = self.parseMessage(message_elt) post_treat.addCallback(self.completeAttachments) post_treat.addCallback(self.skipEmptyMessage) - post_treat.addCallback(self.addToHistory) + post_treat.addCallback( + lambda ret: defer.ensureDeferred(self.addToHistory(ret)) + ) post_treat.addCallback(self.bridgeSignal, data) post_treat.addErrback(self.cancelErrorTrap) post_treat.callback(data) @@ -1253,14 +1259,14 @@ raise failure.Failure(exceptions.CancelError("Cancelled empty message")) return data - def addToHistory(self, data): + async def addToHistory(self, data): if data.pop("history", None) == C.HISTORY_SKIP: log.debug("history is skipped as requested") data["extra"]["history"] = C.HISTORY_SKIP else: # we need a message to store if self.parent.isMessagePrintable(data): - return self.host.memory.addToHistory(self.parent, data) + return await self.host.memory.addToHistory(self.parent, data) else: log.debug("not storing empty message to history: {data}" .format(data=data)) @@ -1478,7 +1484,8 @@ self._jids[entity] = item self._registerItem(item) self.host.bridge.newContact( - entity.full(), self.getAttributes(item), item.groups, self.parent.profile + entity.full(), self.getAttributes(item), list(item.groups), + self.parent.profile ) def removeReceived(self, request): diff -r aa58451b77ba -r 888109774673 sat/memory/memory.py --- a/sat/memory/memory.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/memory/memory.py Thu Jun 03 15:21:43 2021 +0200 @@ -1131,12 +1131,12 @@ ) def asyncGetStringParamA( - self, name, category, attr="value", security_limit=C.NO_SECURITY_LIMIT, + self, name, category, attribute="value", security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE): profile = self.getProfileName(profile_key) return defer.ensureDeferred(self.params.asyncGetStringParamA( - name, category, attr, security_limit, profile + name, category, attribute, security_limit, profile )) def _getParamsUI(self, security_limit, app, extra_s, profile_key): @@ -1170,20 +1170,22 @@ client = self.host.getClient(profile_key) # we accept any type data = data_format.deserialise(data_s, type_check=None) - return self.storage.setPrivateValue( - namespace, key, data, binary=True, profile=client.profile) + return defer.ensureDeferred(self.storage.setPrivateValue( + namespace, key, data, binary=True, profile=client.profile)) def _privateDataGet(self, namespace, key, profile_key): client = self.host.getClient(profile_key) - d = self.storage.getPrivates( - namespace, [key], binary=True, profile=client.profile) + d = defer.ensureDeferred( + self.storage.getPrivates( + namespace, [key], binary=True, profile=client.profile) + ) d.addCallback(lambda data_dict: data_format.serialise(data_dict.get(key))) return d def _privateDataDelete(self, namespace, key, profile_key): client = self.host.getClient(profile_key) - return self.storage.delPrivateValue( - namespace, key, binary=True, profile=client.profile) + return defer.ensureDeferred(self.storage.delPrivateValue( + namespace, key, binary=True, profile=client.profile)) ## Files ## @@ -1251,8 +1253,7 @@ _("unknown access type: {type}").format(type=perm_type) ) - @defer.inlineCallbacks - def checkPermissionToRoot(self, client, file_data, peer_jid, perms_to_check): + async def checkPermissionToRoot(self, client, file_data, peer_jid, perms_to_check): """do checkFilePermission on file_data and all its parents until root""" current = file_data while True: @@ -1260,7 +1261,7 @@ parent = current["parent"] if not parent: break - files_data = yield self.getFiles( + files_data = await self.getFiles( client, peer_jid=None, file_id=parent, perms_to_check=None ) try: @@ -1268,8 +1269,7 @@ except IndexError: raise exceptions.DataError("Missing parent") - @defer.inlineCallbacks - def _getParentDir( + async def _getParentDir( self, client, path, parent, namespace, owner, peer_jid, perms_to_check ): """Retrieve parent node from a path, or last existing directory @@ -1293,7 +1293,7 @@ # non existing directories will be created parent = "" for idx, path_elt in enumerate(path_elts): - directories = yield self.storage.getFiles( + directories = await self.storage.getFiles( client, parent=parent, type_=C.FILE_TYPE_DIRECTORY, @@ -1302,7 +1302,7 @@ owner=owner, ) if not directories: - defer.returnValue((parent, path_elts[idx:])) + return (parent, path_elts[idx:]) # from this point, directories don't exist anymore, we have to create them elif len(directories) > 1: raise exceptions.InternalError( @@ -1312,7 +1312,7 @@ directory = directories[0] self.checkFilePermission(directory, peer_jid, perms_to_check) parent = directory["id"] - defer.returnValue((parent, [])) + return (parent, []) def getFileAffiliations(self, file_data: dict) -> Dict[jid.JID, str]: """Convert file access to pubsub like affiliations""" @@ -1484,8 +1484,7 @@ ) return peer_jid.userhostJID() - @defer.inlineCallbacks - def getFiles( + async def getFiles( self, client, peer_jid, file_id=None, version=None, parent=None, path=None, type_=None, file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None, public_id=None, owner=None, access=None, projection=None, @@ -1536,7 +1535,7 @@ if path is not None: path = str(path) # permission are checked by _getParentDir - parent, remaining_path_elts = yield self._getParentDir( + parent, remaining_path_elts = await self._getParentDir( client, path, parent, namespace, owner, peer_jid, perms_to_check ) if remaining_path_elts: @@ -1546,16 +1545,16 @@ if parent and peer_jid: # if parent is given directly and permission check is requested, # we need to check all the parents - parent_data = yield self.storage.getFiles(client, file_id=parent) + parent_data = await self.storage.getFiles(client, file_id=parent) try: parent_data = parent_data[0] except IndexError: raise exceptions.DataError("mising parent") - yield self.checkPermissionToRoot( + await self.checkPermissionToRoot( client, parent_data, peer_jid, perms_to_check ) - files = yield self.storage.getFiles( + files = await self.storage.getFiles( client, file_id=file_id, version=version, @@ -1578,15 +1577,16 @@ to_remove = [] for file_data in files: try: - self.checkFilePermission(file_data, peer_jid, perms_to_check, set_affiliation=True) + self.checkFilePermission( + file_data, peer_jid, perms_to_check, set_affiliation=True + ) except exceptions.PermissionError: to_remove.append(file_data) for file_data in to_remove: files.remove(file_data) - defer.returnValue(files) + return files - @defer.inlineCallbacks - def setFile( + async def setFile( self, client, name, file_id=None, version="", parent=None, path=None, type_=C.FILE_TYPE_FILE, file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None, public_id=None, created=None, modified=None, @@ -1668,13 +1668,13 @@ if path is not None: path = str(path) # _getParentDir will check permissions if peer_jid is set, so we use owner - parent, remaining_path_elts = yield self._getParentDir( + parent, remaining_path_elts = await self._getParentDir( client, path, parent, namespace, owner, owner, perms_to_check ) # if remaining directories don't exist, we have to create them for new_dir in remaining_path_elts: new_dir_id = shortuuid.uuid() - yield self.storage.setFile( + await self.storage.setFile( client, name=new_dir, file_id=new_dir_id, @@ -1691,7 +1691,7 @@ elif parent is None: parent = "" - yield self.storage.setFile( + await self.storage.setFile( client, file_id=file_id, version=version, diff -r aa58451b77ba -r 888109774673 sat/memory/persistent.py --- a/sat/memory/persistent.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/memory/persistent.py Thu Jun 03 15:21:43 2021 +0200 @@ -17,10 +17,13 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from twisted.internet import defer +from twisted.python import failure from sat.core.i18n import _ from sat.core.log import getLogger + + log = getLogger(__name__) -from twisted.python import failure class MemoryNotInitializedError(Exception): @@ -57,7 +60,9 @@ need to be called before any other operation @return: defers the PersistentDict instance itself """ - d = self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile) + d = defer.ensureDeferred(self.storage.getPrivates( + self.namespace, binary=self.binary, profile=self.profile + )) d.addCallback(self._setCache) d.addCallback(lambda __: self) return d @@ -111,8 +116,11 @@ return self._cache.__getitem__(key) def __setitem__(self, key, value): - self.storage.setPrivateValue(self.namespace, key, value, self.binary, - self.profile) + defer.ensureDeferred( + self.storage.setPrivateValue( + self.namespace, key, value, self.binary, self.profile + ) + ) return self._cache.__setitem__(key, value) def __delitem__(self, key): @@ -130,8 +138,11 @@ def aset(self, key, value): """Async set, return a Deferred fired when value is actually stored""" self._cache.__setitem__(key, value) - return self.storage.setPrivateValue(self.namespace, key, value, - self.binary, self.profile) + return defer.ensureDeferred( + self.storage.setPrivateValue( + self.namespace, key, value, self.binary, self.profile + ) + ) def adel(self, key): """Async del, return a Deferred fired when value is actually deleted""" @@ -151,8 +162,11 @@ @return: deferred fired when data is actually saved """ - return self.storage.setPrivateValue(self.namespace, name, self._cache[name], - self.binary, self.profile) + return defer.ensureDeferred( + self.storage.setPrivateValue( + self.namespace, name, self._cache[name], self.binary, self.profile + ) + ) class PersistentBinaryDict(PersistentDict): @@ -178,12 +192,16 @@ raise NotImplementedError def items(self): - d = self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile) + d = defer.ensureDeferred(self.storage.getPrivates( + self.namespace, binary=self.binary, profile=self.profile + )) d.addCallback(lambda data_dict: data_dict.items()) return d def all(self): - return self.storage.getPrivates(self.namespace, binary=self.binary, profile=self.profile) + return defer.ensureDeferred(self.storage.getPrivates( + self.namespace, binary=self.binary, profile=self.profile + )) def __repr__(self): raise NotImplementedError @@ -234,14 +252,18 @@ def __getitem__(self, key): """get the value as a Deferred""" - d = self.storage.getPrivates(self.namespace, keys=[key], binary=self.binary, - profile=self.profile) + d = defer.ensureDeferred(self.storage.getPrivates( + self.namespace, keys=[key], binary=self.binary, profile=self.profile + )) d.addCallback(self._data2value, key) return d def __setitem__(self, key, value): - self.storage.setPrivateValue(self.namespace, key, value, self.binary, - self.profile) + defer.ensureDeferred( + self.storage.setPrivateValue( + self.namespace, key, value, self.binary, self.profile + ) + ) def __delitem__(self, key): self.storage.delPrivateValue(self.namespace, key, self.binary, self.profile) @@ -259,8 +281,11 @@ """Async set, return a Deferred fired when value is actually stored""" # FIXME: redundant with force, force must be removed # XXX: similar as PersistentDict.aset, but doesn't use cache - return self.storage.setPrivateValue(self.namespace, key, value, - self.binary, self.profile) + return defer.ensureDeferred( + self.storage.setPrivateValue( + self.namespace, key, value, self.binary, self.profile + ) + ) def adel(self, key): """Async del, return a Deferred fired when value is actually deleted""" @@ -277,7 +302,11 @@ @param value(object): value is needed for LazyPersistentBinaryDict @return: deferred fired when data is actually saved """ - return self.storage.setPrivateValue(self.namespace, name, value, self.binary, self.profile) + return defer.ensureDeferred( + self.storage.setPrivateValue( + self.namespace, name, value, self.binary, self.profile + ) + ) def remove(self, key): """Delete a key from sotrage, and return a deferred called when it's done diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_comp_file_sharing.py --- a/sat/plugins/plugin_comp_file_sharing.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_comp_file_sharing.py Thu Jun 03 15:21:43 2021 +0200 @@ -31,6 +31,7 @@ from sat.core.log import getLogger from sat.tools import stream from sat.tools import video +from sat.tools.utils import ensure_deferred from sat.tools.common import regex from sat.tools.common import uri from sat.tools.common import files_utils @@ -487,7 +488,7 @@ else: await self.generate_thumbnails(extra, thumb_path) - self.host.memory.setFile( + await self.host.memory.setFile( client, name=name, version="", @@ -546,8 +547,7 @@ ) return False, defer.succeed(True) - @defer.inlineCallbacks - def _retrieveFiles( + async def _retrieveFiles( self, client, session, content_data, content_name, file_data, file_elt ): """This method retrieve a file on request, and send if after checking permissions""" @@ -557,7 +557,7 @@ else: owner = peer_jid try: - found_files = yield self.host.memory.getFiles( + found_files = await self.host.memory.getFiles( client, peer_jid=peer_jid, name=file_data.get("name"), @@ -575,13 +575,13 @@ peer_jid=peer_jid, name=file_data.get("name") ) ) - defer.returnValue(False) + return False if not found_files: log.warning( _("no matching file found ({file_data})").format(file_data=file_data) ) - defer.returnValue(False) + return False # we only use the first found file found_file = found_files[0] @@ -607,7 +607,7 @@ size=size, data_cb=lambda data: hasher.update(data), ) - defer.returnValue(True) + return True def _fileSendingRequestTrigger( self, client, session, content_data, content_name, file_data, file_elt @@ -617,9 +617,9 @@ else: return ( False, - self._retrieveFiles( + defer.ensureDeferred(self._retrieveFiles( client, session, content_data, content_name, file_data, file_elt - ), + )), ) ## HTTP Upload ## @@ -757,11 +757,10 @@ raise error.StanzaError("item-not-found") return file_id - @defer.inlineCallbacks - def getFileData(self, requestor, nodeIdentifier): + async def getFileData(self, requestor, nodeIdentifier): file_id = self._getFileId(nodeIdentifier) try: - files = yield self.host.memory.getFiles(self.parent, requestor, file_id) + files = await self.host.memory.getFiles(self.parent, requestor, file_id) except (exceptions.NotFound, exceptions.PermissionError): # we don't differenciate between NotFound and PermissionError # to avoid leaking information on existing files @@ -770,7 +769,7 @@ raise error.StanzaError("item-not-found") if len(files) > 1: raise error.InternalError("there should be only one file") - defer.returnValue(files[0]) + return files[0] def commentsUpdate(self, extra, new_comments, peer_jid): """update comments (replace or insert new_comments) @@ -825,10 +824,10 @@ iq_elt = iq_elt.parent return iq_elt["from"] - @defer.inlineCallbacks - def publish(self, requestor, service, nodeIdentifier, items): + @ensure_deferred + async def publish(self, requestor, service, nodeIdentifier, items): #  we retrieve file a first time to check authorisations - file_data = yield self.getFileData(requestor, nodeIdentifier) + file_data = await self.getFileData(requestor, nodeIdentifier) file_id = file_data["id"] comments = [(item["id"], self._getFrom(item), item.toXml()) for item in items] if requestor.userhostJID() == file_data["owner"]: @@ -837,24 +836,24 @@ peer_jid = requestor.userhost() update_cb = partial(self.commentsUpdate, new_comments=comments, peer_jid=peer_jid) try: - yield self.host.memory.fileUpdate(file_id, "extra", update_cb) + await self.host.memory.fileUpdate(file_id, "extra", update_cb) except exceptions.PermissionError: raise error.StanzaError("not-authorized") - @defer.inlineCallbacks - def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): - file_data = yield self.getFileData(requestor, nodeIdentifier) + @ensure_deferred + async def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): + file_data = await self.getFileData(requestor, nodeIdentifier) comments = file_data["extra"].get("comments", []) if itemIdentifiers: defer.returnValue( [generic.parseXml(c[2]) for c in comments if c[0] in itemIdentifiers] ) else: - defer.returnValue([generic.parseXml(c[2]) for c in comments]) + return [generic.parseXml(c[2]) for c in comments] - @defer.inlineCallbacks - def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): - file_data = yield self.getFileData(requestor, nodeIdentifier) + @ensure_deferred + async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): + file_data = await self.getFileData(requestor, nodeIdentifier) file_id = file_data["id"] try: comments = file_data["extra"]["comments"] @@ -879,4 +878,4 @@ raise error.StanzaError("not-authorized") remove_cb = partial(self.commentsDelete, comments=to_remove) - yield self.host.memory.fileUpdate(file_id, "extra", remove_cb) + await self.host.memory.fileUpdate(file_id, "extra", remove_cb) diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_comp_file_sharing_management.py --- a/sat/plugins/plugin_comp_file_sharing_management.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_comp_file_sharing_management.py Thu Jun 03 15:21:43 2021 +0200 @@ -149,8 +149,7 @@ payload = form.toElement() return payload, status, None, None - @defer.inlineCallbacks - def _getFileData(self, client, session_data, command_form): + async def _getFileData(self, client, session_data, command_form): """Retrieve field requested in root form "found_file" will also be set in session_data @@ -177,7 +176,7 @@ # this must be managed try: - found_files = yield self.host.memory.getFiles( + found_files = await self.host.memory.getFiles( client, requestor_bare, path=parent_path, name=basename, namespace=namespace) found_file = found_files[0] @@ -193,7 +192,7 @@ session_data['found_file'] = found_file session_data['namespace'] = namespace - defer.returnValue(found_file) + return found_file def _updateReadPermission(self, access, allowed_jids): if not allowed_jids: @@ -209,29 +208,27 @@ "jids": [j.full() for j in allowed_jids] } - @defer.inlineCallbacks - def _updateDir(self, client, requestor, namespace, file_data, allowed_jids): + async def _updateDir(self, client, requestor, namespace, file_data, allowed_jids): """Recursively update permission of a directory and all subdirectories @param file_data(dict): metadata of the file @param allowed_jids(list[jid.JID]): list of entities allowed to read the file """ assert file_data['type'] == C.FILE_TYPE_DIRECTORY - files_data = yield self.host.memory.getFiles( + files_data = await self.host.memory.getFiles( client, requestor, parent=file_data['id'], namespace=namespace) for file_data in files_data: if not file_data['access'].get(C.ACCESS_PERM_READ, {}): log.debug("setting {perm} read permission for {name}".format( perm=allowed_jids, name=file_data['name'])) - yield self.host.memory.fileUpdate( + await self.host.memory.fileUpdate( file_data['id'], 'access', partial(self._updateReadPermission, allowed_jids=allowed_jids)) if file_data['type'] == C.FILE_TYPE_DIRECTORY: - yield self._updateDir(client, requestor, namespace, file_data, 'PUBLIC') + await self._updateDir(client, requestor, namespace, file_data, 'PUBLIC') - @defer.inlineCallbacks - def _onChangeFile(self, client, command_elt, session_data, action, node): + async def _onChangeFile(self, client, command_elt, session_data, action, node): try: x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) command_form = data_form.Form.fromElement(x_elt) @@ -244,14 +241,14 @@ if command_form is None or len(command_form.fields) == 0: # root request - defer.returnValue(self._getRootArgs()) + return self._getRootArgs() elif found_file is None: # file selected, we retrieve it and ask for permissions try: - found_file = yield self._getFileData(client, session_data, command_form) + found_file = await self._getFileData(client, session_data, command_form) except WorkflowError as e: - defer.returnValue(e.err_args) + return e.err_args # management request if found_file['type'] == C.FILE_TYPE_DIRECTORY: @@ -284,7 +281,7 @@ status = self._c.STATUS.EXECUTING payload = form.toElement() - defer.returnValue((payload, status, None, None)) + return (payload, status, None, None) else: # final phase, we'll do permission change here @@ -307,7 +304,7 @@ self._c.adHocError(self._c.ERROR.BAD_PAYLOAD) if found_file['type'] == C.FILE_TYPE_FILE: - yield self.host.memory.fileUpdate( + await self.host.memory.fileUpdate( found_file['id'], 'access', partial(self._updateReadPermission, allowed_jids=allowed_jids)) else: @@ -315,7 +312,7 @@ recursive = command_form.fields['recursive'] except KeyError: self._c.adHocError(self._c.ERROR.BAD_PAYLOAD) - yield self.host.memory.fileUpdate( + await self.host.memory.fileUpdate( found_file['id'], 'access', partial(self._updateReadPermission, allowed_jids=allowed_jids)) if recursive: @@ -323,17 +320,16 @@ # already a permission set), so allowed entities of root directory # can read them. namespace = session_data['namespace'] - yield self._updateDir( + await self._updateDir( client, requestor_bare, namespace, found_file, 'PUBLIC') # job done, we can end the session status = self._c.STATUS.COMPLETED payload = None note = (self._c.NOTE.INFO, _("management session done")) - defer.returnValue((payload, status, None, note)) + return (payload, status, None, note) - @defer.inlineCallbacks - def _onDeleteFile(self, client, command_elt, session_data, action, node): + async def _onDeleteFile(self, client, command_elt, session_data, action, node): try: x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) command_form = data_form.Form.fromElement(x_elt) @@ -346,14 +342,14 @@ if command_form is None or len(command_form.fields) == 0: # root request - defer.returnValue(self._getRootArgs()) + return self._getRootArgs() elif found_file is None: # file selected, we need confirmation before actually deleting try: - found_file = yield self._getFileData(client, session_data, command_form) + found_file = await self._getFileData(client, session_data, command_form) except WorkflowError as e: - defer.returnValue(e.err_args) + return e.err_args if found_file['type'] == C.FILE_TYPE_DIRECTORY: msg = D_("Are you sure to delete directory {name} and all files and " "directories under it?").format(name=found_file['name']) @@ -370,7 +366,7 @@ form.addField(field) status = self._c.STATUS.EXECUTING payload = form.toElement() - defer.returnValue((payload, status, None, None)) + return (payload, status, None, None) else: # final phase, we'll do deletion here @@ -382,27 +378,26 @@ note = None else: recursive = found_file['type'] == C.FILE_TYPE_DIRECTORY - yield self.host.memory.fileDelete( + await self.host.memory.fileDelete( client, requestor_bare, found_file['id'], recursive) note = (self._c.NOTE.INFO, _("file deleted")) status = self._c.STATUS.COMPLETED payload = None - defer.returnValue((payload, status, None, note)) + return (payload, status, None, note) def _updateThumbs(self, extra, thumbnails): extra[C.KEY_THUMBNAILS] = thumbnails - @defer.inlineCallbacks - def _genThumbs(self, client, requestor, namespace, file_data): + async def _genThumbs(self, client, requestor, namespace, file_data): """Recursively generate thumbnails @param file_data(dict): metadata of the file """ if file_data['type'] == C.FILE_TYPE_DIRECTORY: - sub_files_data = yield self.host.memory.getFiles( + sub_files_data = await self.host.memory.getFiles( client, requestor, parent=file_data['id'], namespace=namespace) for sub_file_data in sub_files_data: - yield self._genThumbs(client, requestor, namespace, sub_file_data) + await self._genThumbs(client, requestor, namespace, sub_file_data) elif file_data['type'] == C.FILE_TYPE_FILE: media_type = file_data['media_type'] @@ -412,7 +407,7 @@ for max_thumb_size in self._t.SIZES: try: - thumb_size, thumb_id = yield self._t.generateThumbnail( + thumb_size, thumb_id = await self._t.generateThumbnail( file_path, max_thumb_size, #  we keep thumbnails for 6 months @@ -424,7 +419,7 @@ break thumbnails.append({"id": thumb_id, "size": thumb_size}) - yield self.host.memory.fileUpdate( + await self.host.memory.fileUpdate( file_data['id'], 'extra', partial(self._updateThumbs, thumbnails=thumbnails)) @@ -434,8 +429,7 @@ else: log.warning("unmanaged file type: {type_}".format(type_=file_data['type'])) - @defer.inlineCallbacks - def _onGenThumbnails(self, client, command_elt, session_data, action, node): + async def _onGenThumbnails(self, client, command_elt, session_data, action, node): try: x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) command_form = data_form.Form.fromElement(x_elt) @@ -447,23 +441,23 @@ if command_form is None or len(command_form.fields) == 0: # root request - defer.returnValue(self._getRootArgs()) + return self._getRootArgs() elif found_file is None: # file selected, we retrieve it and ask for permissions try: - found_file = yield self._getFileData(client, session_data, command_form) + found_file = await self._getFileData(client, session_data, command_form) except WorkflowError as e: - defer.returnValue(e.err_args) + return e.err_args log.info("Generating thumbnails as requested") - yield self._genThumbs(client, requestor, found_file['namespace'], found_file) + await self._genThumbs(client, requestor, found_file['namespace'], found_file) # job done, we can end the session status = self._c.STATUS.COMPLETED payload = None note = (self._c.NOTE.INFO, _("thumbnails generated")) - defer.returnValue((payload, status, None, note)) + return (payload, status, None, note) async def _onQuota(self, client, command_elt, session_data, action, node): requestor = session_data['requestor'] diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_misc_identity.py --- a/sat/plugins/plugin_misc_identity.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_misc_identity.py Thu Jun 03 15:21:43 2021 +0200 @@ -142,9 +142,6 @@ stored_data = await client._identity_storage.all() - self.host.memory.storage.getPrivates( - namespace="identity", binary=True, profile=client.profile) - to_delete = [] for key, value in stored_data.items(): diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_xep_0033.py --- a/sat/plugins/plugin_xep_0033.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_xep_0033.py Thu Jun 03 15:21:43 2021 +0200 @@ -156,7 +156,9 @@ d = defer.Deferred() if not skip_send: d.addCallback(client.sendMessageData) - d.addCallback(client.messageAddToHistory) + d.addCallback( + lambda ret: defer.ensureDeferred(client.messageAddToHistory(ret)) + ) d.addCallback(client.messageSendToBridge) d.addErrback(lambda failure: failure.trap(exceptions.CancelError)) return d.callback(mess_data) diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_xep_0045.py --- a/sat/plugins/plugin_xep_0045.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_xep_0045.py Thu Jun 03 15:21:43 2021 +0200 @@ -837,7 +837,8 @@ elif client.muc_service is not None: service = client.muc_service else: - msg = D_("No known default MUC service".format(unparsed)) + msg = D_("No known default MUC service {unparsed}").format( + unparsed=unparsed) self.text_cmds.feedBack(client, msg, mess_data) return False except jid.InvalidFormat: @@ -1298,7 +1299,9 @@ except AttributeError: mess_data = self.client.messageProt.parseMessage(message.element) if mess_data['message'] or mess_data['subject']: - return self.host.memory.addToHistory(self.client, mess_data) + return defer.ensureDeferred( + self.host.memory.addToHistory(self.client, mess_data) + ) else: return defer.succeed(None) diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_xep_0198.py --- a/sat/plugins/plugin_xep_0198.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_xep_0198.py Thu Jun 03 15:21:43 2021 +0200 @@ -448,7 +448,7 @@ d.addCallback(lambda __: client.roster.got_roster) if plg_0313 is not None: # we retrieve one2one MAM archives - d.addCallback(lambda __: plg_0313.resume(client)) + d.addCallback(lambda __: defer.ensureDeferred(plg_0313.resume(client))) # initial presence must be sent manually d.addCallback(lambda __: client.presence.available()) if plg_0045 is not None: diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_xep_0313.py --- a/sat/plugins/plugin_xep_0313.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_xep_0313.py Thu Jun 03 15:21:43 2021 +0200 @@ -75,15 +75,14 @@ out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._getArchives, async_=True) - @defer.inlineCallbacks - def resume(self, client): + async def resume(self, client): """Retrieve one2one messages received since the last we have in local storage""" - stanza_id_data = yield self.host.memory.storage.getPrivates( + stanza_id_data = await self.host.memory.storage.getPrivates( mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) if stanza_id is None: log.info("can't retrieve last stanza ID, checking history") - last_mess = yield self.host.memory.historyGet( + last_mess = await self.host.memory.historyGet( None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT, 'last_stanza_id': True}, profile=client.profile) @@ -98,7 +97,7 @@ complete = False count = 0 while not complete: - mam_data = yield self.getArchives(client, mam_req, + mam_data = await self.getArchives(client, mam_req, service=client.jid.userhostJID()) elt_list, rsm_response, mam_response = mam_data complete = mam_response["complete"] @@ -141,7 +140,7 @@ # adding message to history mess_data = client.messageProt.parseMessage(fwd_message_elt) try: - yield client.messageProt.addToHistory(mess_data) + await client.messageProt.addToHistory(mess_data) except exceptions.CancelError as e: log.warning( "message has not been added to history: {e}".format(e=e)) @@ -156,8 +155,8 @@ log.info(_("We have received {num_mess} message(s) while offline.") .format(num_mess=count)) - def profileConnected(self, client): - return self.resume(client) + async def profileConnected(self, client): + await self.resume(client) def getHandler(self, client): mam_client = client._mam = SatMAMClient(self) diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_xep_0329.py --- a/sat/plugins/plugin_xep_0329.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_xep_0329.py Thu Jun 03 15:21:43 2021 +0200 @@ -19,6 +19,7 @@ import mimetypes import json import os +import traceback from pathlib import Path from typing import Optional, Dict from zope.interface import implementer @@ -33,6 +34,7 @@ from sat.core.constants import Const as C from sat.core.log import getLogger from sat.tools import stream +from sat.tools import utils from sat.tools.common import regex @@ -453,9 +455,9 @@ iq_elt.handled = True node = iq_elt.query.getAttribute("node") if not node: - d = defer.maybeDeferred(root_nodes_cb, client, iq_elt) + d = utils.asDeferred(root_nodes_cb, client, iq_elt) else: - d = defer.maybeDeferred(files_from_node_cb, client, iq_elt, node) + d = utils.asDeferred(files_from_node_cb, client, iq_elt, node) d.addErrback( lambda failure_: log.error( _("error while retrieving files: {msg}").format(msg=failure_) @@ -589,10 +591,9 @@ @return (tuple[jid.JID, jid.JID]): peer_jid and owner """ - @defer.inlineCallbacks - def _compGetRootNodesCb(self, client, iq_elt): + async def _compGetRootNodesCb(self, client, iq_elt): peer_jid, owner = client.getOwnerAndPeer(iq_elt) - files_data = yield self.host.memory.getFiles( + files_data = await self.host.memory.getFiles( client, peer_jid=peer_jid, parent="", @@ -607,8 +608,7 @@ directory_elt["name"] = name client.send(iq_result_elt) - @defer.inlineCallbacks - def _compGetFilesFromNodeCb(self, client, iq_elt, node_path): + async def _compGetFilesFromNodeCb(self, client, iq_elt, node_path): """Retrieve files from local files repository according to permissions result stanza is then built and sent to requestor @@ -618,7 +618,7 @@ """ peer_jid, owner = client.getOwnerAndPeer(iq_elt) try: - files_data = yield self.host.memory.getFiles( + files_data = await self.host.memory.getFiles( client, peer_jid=peer_jid, path=node_path, owner=owner ) except exceptions.NotFound: @@ -628,7 +628,8 @@ self._iqError(client, iq_elt, condition='not-allowed') return except Exception as e: - log.error("internal server error: {e}".format(e=e)) + tb = traceback.format_tb(e.__traceback__) + log.error(f"internal server error: {e}\n{''.join(tb)}") self._iqError(client, iq_elt, condition='internal-server-error') return iq_result_elt = xmlstream.toResponse(iq_elt, "result") diff -r aa58451b77ba -r 888109774673 sat/plugins/plugin_xep_0384.py --- a/sat/plugins/plugin_xep_0384.py Thu Jun 03 15:21:43 2021 +0200 +++ b/sat/plugins/plugin_xep_0384.py Thu Jun 03 15:21:43 2021 +0200 @@ -114,7 +114,10 @@ @return (defer.Deferred): deferred instance linked to the promise """ d = defer.Deferred() - promise_.then(d.callback, d.errback) + promise_.then( + lambda result: reactor.callLater(0, d.callback, result), + lambda exc: reactor.callLater(0, d.errback, exc) + ) return d @@ -141,6 +144,26 @@ deferred.addCallback(partial(callback, True)) deferred.addErrback(partial(callback, False)) + def _callMainThread(self, callback, method, *args, check_jid=None): + d = method(*args) + if check_jid is not None: + check_jid_d = self._checkJid(check_jid) + check_jid_d.addCallback(lambda __: d) + d = check_jid_d + if callback is not None: + d.addCallback(partial(callback, True)) + d.addErrback(partial(callback, False)) + + def _call(self, callback, method, *args, check_jid=None): + """Create Deferred and add Promise callback to it + + This method use reactor.callLater to launch Deferred in main thread + @param check_jid: run self._checkJid before method + """ + reactor.callLater( + 0, self._callMainThread, callback, method, *args, check_jid=check_jid + ) + def _checkJid(self, bare_jid): """Check if jid is known, and store it if not @@ -164,71 +187,50 @@ callback(True, None) def loadState(self, callback): - d = self.data.get(KEY_STATE) - self.setCb(d, callback) + self._call(callback, self.data.get, KEY_STATE) def storeState(self, callback, state): - d = self.data.force(KEY_STATE, state) - self.setCb(d, callback) + self._call(callback, self.data.force, KEY_STATE, state) def loadSession(self, callback, bare_jid, device_id): key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) - d = self.data.get(key) - self.setCb(d, callback) + self._call(callback, self.data.get, key) def storeSession(self, callback, bare_jid, device_id, session): key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) - d = self.data.force(key, session) - self.setCb(d, callback) + self._call(callback, self._data.force, key, session) def deleteSession(self, callback, bare_jid, device_id): key = '\n'.join([KEY_SESSION, bare_jid, str(device_id)]) - d = self.data.remove(key) - self.setCb(d, callback) + self._call(callback, self.data.remove, key) def loadActiveDevices(self, callback, bare_jid): key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) - d = self.data.get(key, {}) - if callback is not None: - self.setCb(d, callback) - return d + self._call(callback, self.data.get, key, {}) def loadInactiveDevices(self, callback, bare_jid): key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) - d = self.data.get(key, {}) - if callback is not None: - self.setCb(d, callback) - return d + self._call(callback, self.data.get, key, {}) def storeActiveDevices(self, callback, bare_jid, devices): key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) - d = self._checkJid(bare_jid) - d.addCallback(lambda _: self.data.force(key, devices)) - self.setCb(d, callback) + self._call(callback, self.data.force, key, devices, check_jid=bare_jid) def storeInactiveDevices(self, callback, bare_jid, devices): key = '\n'.join([KEY_INACTIVE_DEVICES, bare_jid]) - d = self._checkJid(bare_jid) - d.addCallback(lambda _: self.data.force(key, devices)) - self.setCb(d, callback) + self._call(callback, self.data.force, key, devices, check_jid=bare_jid) def storeTrust(self, callback, bare_jid, device_id, trust): key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) - d = self.data.force(key, trust) - self.setCb(d, callback) + self._call(callback, self.data.force, key, trust) def loadTrust(self, callback, bare_jid, device_id): key = '\n'.join([KEY_TRUST, bare_jid, str(device_id)]) - d = self.data.get(key) - if callback is not None: - self.setCb(d, callback) - return d + self._call(callback, self.data.get, key) def listJIDs(self, callback): - d = defer.succeed(self.all_jids) if callback is not None: - self.setCb(d, callback) - return d + callback(True, self.all_jids) def _deleteJID_logResults(self, results): failed = [success for success, __ in results if not success] @@ -266,8 +268,7 @@ d.addCallback(self._deleteJID_logResults) return d - def deleteJID(self, callback, bare_jid): - """Retrieve all (in)actives devices of bare_jid, and delete all related keys""" + def _deleteJID(self, callback, bare_jid): d_list = [] key = '\n'.join([KEY_ACTIVE_DEVICES, bare_jid]) @@ -284,7 +285,10 @@ d.addCallback(self._deleteJID_gotDevices, bare_jid) if callback is not None: self.setCb(d, callback) - return d + + def deleteJID(self, callback, bare_jid): + """Retrieve all (in)actives devices of bare_jid, and delete all related keys""" + reactor.callLater(0, self._deleteJID, callback, bare_jid) class SatOTPKPolicy(omemo.DefaultOTPKPolicy): @@ -728,7 +732,7 @@ while device_id in devices: device_id = random.randint(1, 2**31-1) # and we save it - persistent_dict[KEY_DEVICE_ID] = device_id + await persistent_dict.aset(KEY_DEVICE_ID, device_id) log.debug(f"our OMEMO device id is {device_id}")