Mercurial > libervia-backend
diff sat_frontends/jp/common.py @ 3040:fee60f17ebac
jp: jp asyncio port:
/!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\
This patch implements the port of jp to asyncio, so it is now correctly using the bridge
asynchronously, and it can be used with bridges like `pb`. This also simplify the code,
notably for things which were previously implemented with many callbacks (like pagination
with RSM).
During the process, some behaviours have been modified/fixed, in jp and backends, check
diff for details.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:56:41 +0200 |
parents | d314d4181f30 |
children | d9f328374473 |
line wrap: on
line diff
--- a/sat_frontends/jp/common.py Wed Sep 25 08:53:38 2019 +0200 +++ b/sat_frontends/jp/common.py Wed Sep 25 08:56:41 2019 +0200 @@ -17,6 +17,14 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +import os +import os.path +import time +import tempfile +import asyncio +import shlex +from pathlib import Path from sat_frontends.jp.constants import Const as C from sat.core.i18n import _ from sat.core import exceptions @@ -26,15 +34,6 @@ from sat.tools import config from configparser import NoSectionError, NoOptionError from collections import namedtuple -from functools import partial -import json -import os -import os.path -import time -import tempfile -import subprocess -import glob -import shlex # defaut arguments used for some known editors (editing with metadata) VIM_SPLIT_ARGS = "-c 'set nospr|vsplit|wincmd w|next|wincmd w'" @@ -76,18 +75,18 @@ """Return directory used to store temporary files @param sat_conf(ConfigParser.ConfigParser): instance opened on sat configuration - @param cat_dir(unicode): directory of the category (e.g. "blog") + @param cat_dir(str): directory of the category (e.g. "blog") @param sub_dir(str): sub directory where data need to be put profile can be used here, or special directory name sub_dir will be escaped to be usable in path (use regex.pathUnescape to find initial str) - @return (str): path to the dir + @return (Path): path to the dir """ local_dir = config.getConfig(sat_conf, "", "local_dir", Exception) - path = [local_dir.encode("utf-8"), cat_dir.encode("utf-8")] + path_elts = [local_dir, cat_dir] if sub_dir is not None: - path.append(regex.pathEscape(sub_dir)) - return os.path.join(*path) + path_elts.append(regex.pathEscape(sub_dir)) + return Path(*path_elts) def parse_args(host, cmd_line, **format_kw): @@ -121,12 +120,14 @@ @param cat_dir(unicode): directory to use for drafts this will be a sub-directory of SàT's local_dir @param use_metadata(bool): True is edition need a second file for metadata - most of signature change with use_metadata with an additional metadata argument. - This is done to raise error if a command needs metadata but forget the flag, and vice versa + most of signature change with use_metadata with an additional metadata + argument. + This is done to raise error if a command needs metadata but forget the flag, + and vice versa """ self.host = host self.sat_conf = config.parseMainConf() - self.cat_dir_str = cat_dir.encode("utf-8") + self.cat_dir = cat_dir self.use_metadata = use_metadata def secureUnlink(self, path): @@ -135,21 +136,21 @@ This method is used to prevent accidental deletion of a draft If there are more file in SECURE_UNLINK_DIR than SECURE_UNLINK_MAX, older file are deleted - @param path(str): file to unlink + @param path(Path, str): file to unlink """ - if not os.path.isfile(path): + path = Path(path).resolve() + if not path.is_file: raise OSError("path must link to a regular file") - if not path.startswith(getTmpDir(self.sat_conf, self.cat_dir_str)): + if path.parent != getTmpDir(self.sat_conf, self.cat_dir): self.disp( - "File {} is not in SàT temporary hierarchy, we do not remove it".format( - path - ), + f"File {path} is not in SàT temporary hierarchy, we do not remove " + f"it", 2, ) return # we have 2 files per draft with use_metadata, so we double max unlink_max = SECURE_UNLINK_MAX * 2 if self.use_metadata else SECURE_UNLINK_MAX - backup_dir = getTmpDir(self.sat_conf, self.cat_dir_str, SECURE_UNLINK_DIR) + backup_dir = getTmpDir(self.sat_conf, self.cat_dir, SECURE_UNLINK_DIR) if not os.path.exists(backup_dir): os.makedirs(backup_dir) filename = os.path.basename(path) @@ -170,21 +171,15 @@ self.host.disp("Purging backup file {}".format(path), 2) os.unlink(path) - def runEditor( - self, - editor_args_opt, - content_file_path, - content_file_obj, - meta_file_path=None, - meta_ori=None, - ): - """run editor to edit content and metadata + async def runEditor(self, editor_args_opt, content_file_path, content_file_obj, + meta_file_path=None, meta_ori=None): + """Run editor to edit content and metadata @param editor_args_opt(unicode): option in [jp] section in configuration for specific args @param content_file_path(str): path to the content file @param content_file_obj(file): opened file instance - @param meta_file_path(str, None): metadata file path + @param meta_file_path(str, Path, None): metadata file path if None metadata will not be used @param meta_ori(dict, None): original cotent of metadata can't be used if use_metadata is False @@ -223,26 +218,27 @@ args = [content_file_path] # actual editing - editor_exit = subprocess.call([editor] + args) + editor_process = await asyncio.create_subprocess_exec( + editor, *[str(a) for a in args]) + editor_exit = await editor_process.wait() # edition will now be checked, and data will be sent if it was a success if editor_exit != 0: self.disp( - "Editor exited with an error code, so temporary file has not be deleted, and item is not published.\nYou can find temporary file at {path}".format( - path=content_file_path - ), + f"Editor exited with an error code, so temporary file has not be " + f"deleted, and item is not published.\nYou can find temporary file " + f"at {content_file_path}", error=True, ) else: # main content try: - with open(content_file_path, "rb") as f: + with content_file_path.open("rb") as f: content = f.read() except (OSError, IOError): self.disp( - "Can read file at {content_path}, have it been deleted?\nCancelling edition".format( - content_path=content_file_path - ), + f"Can read file at {content_file_path}, have it been deleted?\n" + f"Cancelling edition", error=True, ) self.host.quit(C.EXIT_NOT_FOUND) @@ -250,50 +246,46 @@ # metadata if self.use_metadata: try: - with open(meta_file_path, "rb") as f: + with meta_file_path.open("rb") as f: metadata = json.load(f) except (OSError, IOError): self.disp( - "Can read file at {meta_file_path}, have it been deleted?\nCancelling edition".format( - content_path=content_file_path, meta_path=meta_file_path - ), + f"Can read file at {meta_file_path}, have it been deleted?\n" + f"Cancelling edition", error=True, ) self.host.quit(C.EXIT_NOT_FOUND) except ValueError: self.disp( - "Can't parse metadata, please check it is correct JSON format. Cancelling edition.\n" - + "You can find tmp file at {content_path} and temporary meta file at {meta_path}.".format( - content_path=content_file_path, meta_path=meta_file_path - ), + f"Can't parse metadata, please check it is correct JSON format. " + f"Cancelling edition.\nYou can find tmp file at " + f"{content_file_path} and temporary meta file at " + f"{meta_file_path}.", error=True, ) self.host.quit(C.EXIT_DATA_ERROR) if self.use_metadata and not metadata.get("publish", True): self.disp( - 'Publication blocked by "publish" key in metadata, cancelling edition.\n\n' - + "temporary file path:\t{content_path}\nmetadata file path:\t{meta_path}".format( - content_path=content_file_path, meta_path=meta_file_path - ), + f'Publication blocked by "publish" key in metadata, cancelling ' + f'edition.\n\ntemporary file path:\t{content_file_path}\nmetadata ' + f'file path:\t{meta_file_path}', error=True, ) self.host.quit() if len(content) == 0: self.disp("Content is empty, cancelling the edition") - if not content_file_path.startswith( - getTmpDir(self.sat_conf, self.cat_dir_str) - ): + if content_file_path.parent != getTmpDir(self.sat_conf, self.cat_dir): self.disp( "File are not in SàT temporary hierarchy, we do not remove them", 2, ) self.host.quit() - self.disp("Deletion of {}".format(content_file_path), 2) + self.disp(f"Deletion of {content_file_path}", 2) os.unlink(content_file_path) if self.use_metadata: - self.disp("Deletion of {}".format(meta_file_path), 2) + self.disp(f"Deletion of {meta_file_path}".format(meta_file_path), 2) os.unlink(meta_file_path) self.host.quit() @@ -309,24 +301,21 @@ content = content.decode("utf-8-sig") # we use utf-8-sig to avoid BOM try: if self.use_metadata: - self.publish(content, metadata) + await self.publish(content, metadata) else: - self.publish(content) + await self.publish(content) except Exception as e: if self.use_metadata: self.disp( - "Error while sending your item, the temporary files have been kept at {content_path} and {meta_path}: {reason}".format( - content_path=content_file_path, - meta_path=meta_file_path, - reason=e, - ), + f"Error while sending your item, the temporary files have " + f"been kept at {content_file_path} and {meta_file_path}: " + f"{e}", error=True, ) else: self.disp( - "Error while sending your item, the temporary file has been kept at {content_path}: {reason}".format( - content_path=content_file_path, reason=e - ), + f"Error while sending your item, the temporary file has been " + f"kept at {content_file_path}: {e}", error=True, ) self.host.quit(1) @@ -335,41 +324,38 @@ if self.use_metadata: self.secureUnlink(meta_file_path) - def publish(self, content): + async def publish(self, content): # if metadata is needed, publish will be called with it last argument raise NotImplementedError def getTmpFile(self): """Create a temporary file - @param suff (str): suffix to use for the filename - @return (tuple(file, str)): opened (w+b) file object and file path + @return (tuple(file, Path)): opened (w+b) file object and file path """ suff = "." + self.getTmpSuff() - cat_dir_str = self.cat_dir_str - tmp_dir = getTmpDir(self.sat_conf, self.cat_dir_str, self.profile.encode("utf-8")) - if not os.path.exists(tmp_dir): + cat_dir_str = self.cat_dir + tmp_dir = getTmpDir(self.sat_conf, self.cat_dir, self.profile) + if not tmp_dir.exists(): try: - os.makedirs(tmp_dir) + tmp_dir.mkdir(parents=True) except OSError as e: self.disp( - "Can't create {path} directory: {reason}".format( - path=tmp_dir, reason=e - ), + f"Can't create {tmp_dir} directory: {e}", error=True, ) self.host.quit(1) try: fd, path = tempfile.mkstemp( - suffix=suff.encode("utf-8"), + suffix=suff, prefix=time.strftime(cat_dir_str + "_%Y-%m-%d_%H:%M:%S_"), dir=tmp_dir, text=True, ) - return os.fdopen(fd, "w+b"), path + return os.fdopen(fd, "w+b"), Path(path) except OSError as e: self.disp( - "Can't create temporary file: {reason}".format(reason=e), error=True + f"Can't create temporary file: {e}", error=True ) self.host.quit(1) @@ -377,27 +363,26 @@ """Get most recently edited file @param profile(unicode): profile linked to the draft - @return(str): full path of current file + @return(Path): full path of current file """ # we guess the item currently edited by choosing # the most recent file corresponding to temp file pattern # in tmp_dir, excluding metadata files - cat_dir_str = self.cat_dir_str - tmp_dir = getTmpDir(self.sat_conf, self.cat_dir_str, profile.encode("utf-8")) + tmp_dir = getTmpDir(self.sat_conf, self.cat_dir, profile) available = [ - path - for path in glob.glob(os.path.join(tmp_dir, cat_dir_str + "_*")) - if not path.endswith(METADATA_SUFF) + p + for p in tmp_dir.glob(f'{self.cat_dir}_*') + if not p.match(f"*{METADATA_SUFF}") ] if not available: self.disp( - "Could not find any content draft in {path}".format(path=tmp_dir), + f"Could not find any content draft in {tmp_dir}", error=True, ) self.host.quit(1) - return max(available, key=lambda path: os.stat(path).st_mtime) + return max(available, key=lambda p: p.stat().st_mtime) - def getItemData(self, service, node, item): + async def getItemData(self, service, node, item): """return formatted content, metadata (or not if use_metadata is false), and item id""" raise NotImplementedError @@ -405,8 +390,8 @@ """return suffix used for content file""" return "xml" - def getItemPath(self): - """retrieve item path (i.e. service and node) from item argument + async def getItemPath(self): + """Retrieve item path (i.e. service and node) from item argument This method is obviously only useful for edition of PubSub based features """ @@ -419,15 +404,15 @@ # user wants to continue current draft content_file_path = self.getCurrentFile(self.profile) self.disp("Continuing edition of current draft", 2) - content_file_obj = open(content_file_path, "r+b") + content_file_obj = content_file_path.open("r+b") # we seek at the end of file in case of an item already exist # this will write content of the existing item at the end of the draft. # This way no data should be lost. content_file_obj.seek(0, os.SEEK_END) elif self.args.draft_path: # there is an existing draft that we use - content_file_path = os.path.expanduser(self.args.draft_path) - content_file_obj = open(content_file_path, "r+b") + content_file_path = self.args.draft_path.expanduser() + content_file_obj = content_file_path.open("r+b") # we seek at the end for the same reason as above content_file_obj.seek(0, os.SEEK_END) else: @@ -438,9 +423,9 @@ self.disp("Editing requested published item", 2) try: if self.use_metadata: - content, metadata, item = self.getItemData(service, node, item) + content, metadata, item = await self.getItemData(service, node, item) else: - content, item = self.getItemData(service, node, item) + content, item = await self.getItemData(service, node, item) except Exception as e: # FIXME: ugly but we have not good may to check errors in bridge if "item-not-found" in str(e): @@ -451,13 +436,14 @@ else: self.disp( _( - 'item "{item_id}" not found, we create a new item with this id' - ).format(item_id=item), + f'item "{item}" not found, we create a new item with' + f'this id' + ), 2, ) content_file_obj.seek(0) else: - self.disp("Error while retrieving item: {}".format(e)) + self.disp(f"Error while retrieving item: {e}") self.host.quit(C.EXIT_ERROR) else: # item exists, we write content @@ -468,7 +454,7 @@ content_file_obj.write(content.encode("utf-8")) content_file_obj.seek(0) self.disp( - _('item "{item_id}" found, we edit it').format(item_id=item), 2 + _(f'item "{item}" found, we edit it'), 2 ) else: self.disp("Editing a new item", 2) @@ -757,89 +743,70 @@ return self.display(**kwargs_) -class URIFinder(object): - """Helper class to find URIs in well-known locations""" +async def fill_well_known_uri(command, path, key, meta_map=None): + """Look for URIs in well-known location and fill appropriate args if suitable + + @param command(CommandBase): command instance + args of this instance will be updated with found values + @param path(unicode): absolute path to use as a starting point to look for URIs + @param key(unicode): key to look for + @param meta_map(dict, None): if not None, map metadata to arg name + key is metadata used attribute name + value is name to actually use, or None to ignore + use empty dict to only retrieve URI + possible keys are currently: + - labels + """ + args = command.args + if args.service or args.node: + # we only look for URIs if a service and a node are not already specified + return + + host = command.host - def __init__(self, command, path, key, callback, meta_map=None): - """ - @param command(CommandBase): command instance - args of this instance will be updated with found values - @param path(unicode): absolute path to use as a starting point to look for URIs - @param key(unicode): key to look for - @param callback(callable): method to call once URIs are found (or not) - @param meta_map(dict, None): if not None, map metadata to arg name - key is metadata used attribute name - value is name to actually use, or None to ignore - use empty dict to only retrieve URI - possible keys are currently: - - labels - """ - if not command.args.service and not command.args.node: - self.host = command.host - self.args = command.args - self.key = key - self.callback = callback - self.meta_map = meta_map - self.host.bridge.URIFind( - path, - [key], - callback=self.URIFindCb, - errback=partial( - command.errback, - msg=_("can't find " + key + " URI: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) - else: - callback() + try: + uris_data = await host.bridge.URIFind(path, [key]) + except Exception as e: + host.disp(f"can't find {key} URI: {e}", error=True) + host.quit(C.EXIT_BRIDGE_ERRBACK) - def setMetadataList(self, uri_data, key): - """Helper method to set list of values from metadata + try: + uri_data = uris_data[key] + except KeyError: + host.disp( + _(f"No {key} URI specified for this project, please specify service and " + f"node"), + error=True, + ) + host.quit(C.EXIT_NOT_FOUND) - @param uri_data(dict): data of the found URI - @param key(unicode): key of the value to retrieve - """ - new_values_json = uri_data.get(key) + uri = uri_data["uri"] + + # set extra metadata if they are specified + for data_key in ['labels']: + new_values_json = uri_data.get(data_key) if uri_data is not None: - if self.meta_map is None: - dest = key + if meta_map is None: + dest = data_key else: - dest = self.meta_map.get(key) + dest = meta_map.get(data_key) if dest is None: - return + continue try: - values = getattr(self.args, key) + values = getattr(args, data_key) except AttributeError: - raise exceptions.InternalError( - 'there is no "{key}" arguments'.format(key=key) - ) + raise exceptions.InternalError(f'there is no {data_key!r} arguments') else: if values is None: values = [] values.extend(json.loads(new_values_json)) - setattr(self.args, dest, values) + setattr(args, dest, values) - def URIFindCb(self, uris_data): - try: - uri_data = uris_data[self.key] - except KeyError: - self.host.disp( - _( - "No {key} URI specified for this project, please specify service and node" - ).format(key=self.key), - error=True, - ) - self.host.quit(C.EXIT_NOT_FOUND) - else: - uri = uri_data["uri"] - - self.setMetadataList(uri_data, "labels") - parsed_uri = xmpp_uri.parseXMPPUri(uri) - try: - self.args.service = parsed_uri["path"] - self.args.node = parsed_uri["node"] - except KeyError: - self.host.disp(_("Invalid URI found: {uri}").format(uri=uri), error=True) - self.host.quit(C.EXIT_DATA_ERROR) - self.callback() + parsed_uri = xmpp_uri.parseXMPPUri(uri) + try: + args.service = parsed_uri["path"] + args.node = parsed_uri["node"] + except KeyError: + host.disp(_(f"Invalid URI found: {uri}"), error=True) + host.quit(C.EXIT_DATA_ERROR)