libervia-backend: comparison sat_frontends/jp/common.py @ 3568:04283582966f
core, frontends: fix invalid translatable strings.
Some f-strings were used in translatable text; this has been fixed by using explicit
`format()` calls (with a script based on `tokenize`).
As `tokenize` messes with spaces, a reformatting tool (`black`) has been applied to some
files afterwards.
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 14 Jun 2021 18:35:12 +0200 |
parents | be6d91572633 |
children | 82e616b70a2a |
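To illustrate the fix described in the commit message, here is a minimal, hypothetical sketch (the message text below is invented for the example, not taken from `common.py`). gettext-style extraction tools collect the literal string passed to `_()`, and the runtime catalog lookup is keyed on that same literal; an f-string is interpolated before `_()` ever runs, so the lookup receives an already-formatted string that matches no catalog entry. Deferring interpolation with an explicit `format()` call keeps the template literal:

```python
from gettext import gettext as _

item = "draft-1"  # illustrative value

# Broken: the f-string is evaluated first, so _() receives the already
# interpolated text 'item "draft-1" not found', which matches no entry
# in the translation catalog.
msg = _(f'item "{item}" not found')

# Fixed (the pattern applied throughout this changeset): translate the
# literal template, then interpolate with an explicit format() call.
msg = _('item "{item}" not found').format(item=item)
```

The `tokenize`-based script itself is not part of this diff; the following is only a guess at its general approach: walk the token stream and flag f-string tokens that directly follow `_(`.

```python
# Hypothetical sketch of the detection step (an assumption about the
# script's approach, not the actual tool used for this commit).
import io
import re
import tokenize

def find_translatable_fstrings(source: str):
    """Yield (line number, token text) for f-strings passed directly to _().

    Limitations of the sketch: calls split across lines (NL tokens between
    "_(" and the string) are missed, and on Python 3.12+ f-strings are
    tokenized per PEP 701 (FSTRING_START/MIDDLE/END) rather than as a
    single STRING token, so this targets pre-3.12 tokenization.
    """
    toks = list(tokenize.generate_tokens(io.StringIO(source).readline))
    for i, tok in enumerate(toks):
        if tok.type != tokenize.STRING:
            continue
        # string prefix letters before the opening quote, e.g. "f", "rf"
        prefix = re.match(r"[A-Za-z]*", tok.string).group().lower()
        if ("f" in prefix and i >= 2
                and toks[i - 1].string == "("
                and toks[i - 2].string == "_"):
            yield tok.start[0], tok.string

# e.g. list(find_translatable_fstrings('msg = _(f"hello {name}")'))
# -> [(1, 'f"hello {name}"')]
```

Rewriting the flagged strings token by token plausibly explains the whitespace damage the message mentions, presumably because `tokenize.untokenize()` does not preserve the original spacing, hence the `black` pass afterwards.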
3567:a240748ed686 (previous revision) | 3568:04283582966f (this revision) |
---|---|
141 path = Path(path).resolve() | 141 path = Path(path).resolve() |
142 if not path.is_file: | 142 if not path.is_file: |
143 raise OSError("path must link to a regular file") | 143 raise OSError("path must link to a regular file") |
144 if path.parent != getTmpDir(self.sat_conf, self.cat_dir): | 144 if path.parent != getTmpDir(self.sat_conf, self.cat_dir): |
145 self.disp( | 145 self.disp( |
146 f"File {path} is not in SàT temporary hierarchy, we do not remove " | 146 f"File {path} is not in SàT temporary hierarchy, we do not remove " f"it", |
147 f"it", | |
148 2, | 147 2, |
149 ) | 148 ) |
150 return | 149 return |
151 # we have 2 files per draft with use_metadata, so we double max | 150 # we have 2 files per draft with use_metadata, so we double max |
152 unlink_max = SECURE_UNLINK_MAX * 2 if self.use_metadata else SECURE_UNLINK_MAX | 151 unlink_max = SECURE_UNLINK_MAX * 2 if self.use_metadata else SECURE_UNLINK_MAX |
153 backup_dir = getTmpDir(self.sat_conf, self.cat_dir, SECURE_UNLINK_DIR) | 152 backup_dir = getTmpDir(self.sat_conf, self.cat_dir, SECURE_UNLINK_DIR) |
154 if not os.path.exists(backup_dir): | 153 if not os.path.exists(backup_dir): |
155 os.makedirs(backup_dir) | 154 os.makedirs(backup_dir) |
156 filename = os.path.basename(path) | 155 filename = os.path.basename(path) |
157 backup_path = os.path.join(backup_dir, filename) | 156 backup_path = os.path.join(backup_dir, filename) |
158 # we move file to backup dir | 157 # we move file to backup dir |
159 self.host.disp( | 158 self.host.disp( |
160 "Backuping file {src} to {dst}".format( | 159 "Backuping file {src} to {dst}".format(src=path, dst=backup_path), |
161 src=path, dst=backup_path | |
162 ), | |
163 1, | 160 1, |
164 ) | 161 ) |
165 os.rename(path, backup_path) | 162 os.rename(path, backup_path) |
166 # and if we exceeded the limit, we remove older file | 163 # and if we exceeded the limit, we remove older file |
167 backup_files = [os.path.join(backup_dir, f) for f in os.listdir(backup_dir)] | 164 backup_files = [os.path.join(backup_dir, f) for f in os.listdir(backup_dir)] |
169 backup_files.sort(key=lambda path: os.stat(path).st_mtime) | 166 backup_files.sort(key=lambda path: os.stat(path).st_mtime) |
170 for path in backup_files[: len(backup_files) - unlink_max]: | 167 for path in backup_files[: len(backup_files) - unlink_max]: |
171 self.host.disp("Purging backup file {}".format(path), 2) | 168 self.host.disp("Purging backup file {}".format(path), 2) |
172 os.unlink(path) | 169 os.unlink(path) |
173 | 170 |
174 async def runEditor(self, editor_args_opt, content_file_path, content_file_obj, | 171 async def runEditor( |
175 meta_file_path=None, meta_ori=None): | 172 self, |
| 173 editor_args_opt, |
| 174 content_file_path, |
| 175 content_file_obj, |
| 176 meta_file_path=None, |
| 177 meta_ori=None, |
| 178 ): |
176 """Run editor to edit content and metadata | 179 """Run editor to edit content and metadata |
177 | 180 |
178 @param editor_args_opt(unicode): option in [jp] section in configuration for | 181 @param editor_args_opt(unicode): option in [jp] section in configuration for |
179 specific args | 182 specific args |
180 @param content_file_path(str): path to the content file | 183 @param content_file_path(str): path to the content file |
186 """ | 189 """ |
187 if not self.use_metadata: | 190 if not self.use_metadata: |
188 assert meta_file_path is None | 191 assert meta_file_path is None |
189 assert meta_ori is None | 192 assert meta_ori is None |
190 | 193 |
191 # we calculate hashes to check for modifications | 194 # we calculate hashes to check for modifications |
192 import hashlib | 195 import hashlib |
193 | 196 |
194 content_file_obj.seek(0) | 197 content_file_obj.seek(0) |
195 tmp_ori_hash = hashlib.sha1(content_file_obj.read()).digest() | 198 tmp_ori_hash = hashlib.sha1(content_file_obj.read()).digest() |
196 content_file_obj.close() | 199 content_file_obj.close() |
215 parse_kwargs["metadata_file"] = meta_file_path | 218 parse_kwargs["metadata_file"] = meta_file_path |
216 args = parse_args(self.host, editor_args, **parse_kwargs) | 219 args = parse_args(self.host, editor_args, **parse_kwargs) |
217 if not args: | 220 if not args: |
218 args = [content_file_path] | 221 args = [content_file_path] |
219 | 222 |
220 # actual editing | 223 # actual editing |
221 editor_process = await asyncio.create_subprocess_exec( | 224 editor_process = await asyncio.create_subprocess_exec( |
222 editor, *[str(a) for a in args]) | 225 editor, *[str(a) for a in args] |
| 226 ) |
223 editor_exit = await editor_process.wait() | 227 editor_exit = await editor_process.wait() |
224 | 228 |
225 # edition will now be checked, and data will be sent if it was a success | 229 # edition will now be checked, and data will be sent if it was a success |
226 if editor_exit != 0: | 230 if editor_exit != 0: |
227 self.disp( | 231 self.disp( |
241 f"Cancelling edition", | 245 f"Cancelling edition", |
242 error=True, | 246 error=True, |
243 ) | 247 ) |
244 self.host.quit(C.EXIT_NOT_FOUND) | 248 self.host.quit(C.EXIT_NOT_FOUND) |
245 | 249 |
246 # metadata | 250 # metadata |
247 if self.use_metadata: | 251 if self.use_metadata: |
248 try: | 252 try: |
249 with meta_file_path.open("rb") as f: | 253 with meta_file_path.open("rb") as f: |
250 metadata = json.load(f) | 254 metadata = json.load(f) |
251 except (OSError, IOError): | 255 except (OSError, IOError): |
266 self.host.quit(C.EXIT_DATA_ERROR) | 270 self.host.quit(C.EXIT_DATA_ERROR) |
267 | 271 |
268 if self.use_metadata and not metadata.get("publish", True): | 272 if self.use_metadata and not metadata.get("publish", True): |
269 self.disp( | 273 self.disp( |
270 f'Publication blocked by "publish" key in metadata, cancelling ' | 274 f'Publication blocked by "publish" key in metadata, cancelling ' |
271 f'edition.\n\ntemporary file path:\t{content_file_path}\nmetadata ' | 275 f"edition.\n\ntemporary file path:\t{content_file_path}\nmetadata " |
272 f'file path:\t{meta_file_path}', | 276 f"file path:\t{meta_file_path}", |
273 error=True, | 277 error=True, |
274 ) | 278 ) |
275 self.host.quit() | 279 self.host.quit() |
276 | 280 |
277 if len(content) == 0: | 281 if len(content) == 0: |
287 if self.use_metadata: | 291 if self.use_metadata: |
288 self.disp(f"Deletion of {meta_file_path}".format(meta_file_path), 2) | 292 self.disp(f"Deletion of {meta_file_path}".format(meta_file_path), 2) |
289 os.unlink(meta_file_path) | 293 os.unlink(meta_file_path) |
290 self.host.quit() | 294 self.host.quit() |
291 | 295 |
292 # time to re-check the hash | 296 # time to re-check the hash |
293 elif tmp_ori_hash == hashlib.sha1(content).digest() and ( | 297 elif tmp_ori_hash == hashlib.sha1(content).digest() and ( |
294 not self.use_metadata or meta_ori == metadata | 298 not self.use_metadata or meta_ori == metadata |
295 ): | 299 ): |
296 self.disp("The content has not been modified, cancelling the edition") | 300 self.disp("The content has not been modified, cancelling the edition") |
297 self.host.quit() | 301 self.host.quit() |
352 dir=tmp_dir, | 356 dir=tmp_dir, |
353 text=True, | 357 text=True, |
354 ) | 358 ) |
355 return os.fdopen(fd, "w+b"), Path(path) | 359 return os.fdopen(fd, "w+b"), Path(path) |
356 except OSError as e: | 360 except OSError as e: |
357 self.disp( | 361 self.disp(f"Can't create temporary file: {e}", error=True) |
358 f"Can't create temporary file: {e}", error=True | |
359 ) | |
360 self.host.quit(1) | 362 self.host.quit(1) |
361 | 363 |
362 def getCurrentFile(self, profile): | 364 def getCurrentFile(self, profile): |
363 """Get most recently edited file | 365 """Get most recently edited file |
364 | 366 |
369 # the most recent file corresponding to temp file pattern | 371 # the most recent file corresponding to temp file pattern |
370 # in tmp_dir, excluding metadata files | 372 # in tmp_dir, excluding metadata files |
371 tmp_dir = getTmpDir(self.sat_conf, self.cat_dir, profile) | 373 tmp_dir = getTmpDir(self.sat_conf, self.cat_dir, profile) |
372 available = [ | 374 available = [ |
373 p | 375 p |
374 for p in tmp_dir.glob(f'{self.cat_dir}_*') | 376 for p in tmp_dir.glob(f"{self.cat_dir}_*") |
375 if not p.match(f"*{METADATA_SUFF}") | 377 if not p.match(f"*{METADATA_SUFF}") |
376 ] | 378 ] |
377 if not available: | 379 if not available: |
378 self.disp( | 380 self.disp( |
379 f"Could not find any content draft in {tmp_dir}", | 381 f"Could not find any content draft in {tmp_dir}", |
434 if last_item: | 436 if last_item: |
435 self.disp(_("no item found at all, we create a new one"), 2) | 437 self.disp(_("no item found at all, we create a new one"), 2) |
436 else: | 438 else: |
437 self.disp( | 439 self.disp( |
438 _( | 440 _( |
439 f'item "{item}" not found, we create a new item with' | 441 'item "{item}" not found, we create a new item with' |
440 f'this id' | 442 "this id" |
441 ), | 443 ).format(item=item), |
442 2, | 444 2, |
443 ) | 445 ) |
444 content_file_obj.seek(0) | 446 content_file_obj.seek(0) |
445 else: | 447 else: |
446 self.disp(f"Error while retrieving item: {e}") | 448 self.disp(f"Error while retrieving item: {e}") |
451 # we already have a draft, | 453 # we already have a draft, |
452 # we copy item content after it and add an indicator | 454 # we copy item content after it and add an indicator |
453 content_file_obj.write("\n*****\n") | 455 content_file_obj.write("\n*****\n") |
454 content_file_obj.write(content.encode("utf-8")) | 456 content_file_obj.write(content.encode("utf-8")) |
455 content_file_obj.seek(0) | 457 content_file_obj.seek(0) |
456 self.disp( | 458 self.disp(_('item "{item}" found, we edit it').format(item=item), 2) |
457 _(f'item "{item}" found, we edit it'), 2 | |
458 ) | |
459 else: | 459 else: |
460 self.disp("Editing a new item", 2) | 460 self.disp("Editing a new item", 2) |
461 if self.use_metadata: | 461 if self.use_metadata: |
462 metadata = None | 462 metadata = None |
463 | 463 |
496 self.rows = [] | 496 self.rows = [] |
497 | 497 |
498 size = None | 498 size = None |
499 if headers: | 499 if headers: |
500 # we use a namedtuple to make the value easily accessible from filters | 500 # we use a namedtuple to make the value easily accessible from filters |
501 headers_safe = [re.sub(r'[^a-zA-Z_]', '_', h) for h in headers] | 501 headers_safe = [re.sub(r"[^a-zA-Z_]", "_", h) for h in headers] |
502 row_cls = namedtuple("RowData", headers_safe) | 502 row_cls = namedtuple("RowData", headers_safe) |
503 else: | 503 else: |
504 row_cls = tuple | 504 row_cls = tuple |
505 | 505 |
506 for row_data in data: | 506 for row_data in data: |
514 else: | 514 else: |
515 try: | 515 try: |
516 col_value = filter_(value, row_cls(*row_data_list)) | 516 col_value = filter_(value, row_cls(*row_data_list)) |
517 except TypeError: | 517 except TypeError: |
518 col_value = filter_(value) | 518 col_value = filter_(value) |
519 # we count size without ANSI code as they will change length of the | 519 # we count size without ANSI code as they will change length of the |
520 # string when it's mostly style/color changes. | 520 # string when it's mostly style/color changes. |
521 col_size = len(regex.ansiRemove(col_value)) | 521 col_size = len(regex.ansiRemove(col_value)) |
522 else: | 522 else: |
523 col_value = str(value) | 523 col_value = str(value) |
524 col_size = len(col_value) | 524 col_size = len(col_value) |
525 new_row.append(col_value) | 525 new_row.append(col_value) |
560 else: | 560 else: |
561 raise e | 561 raise e |
562 | 562 |
563 @classmethod | 563 @classmethod |
564 def fromListDict( | 564 def fromListDict( |
565 cls, host, data, keys=None, headers=None, filters=None, defaults=None): | 565 cls, host, data, keys=None, headers=None, filters=None, defaults=None |
| 566 ): |
566 """Create a table from a list of dictionaries | 567 """Create a table from a list of dictionaries |
567 | 568 |
568 each dictionary is a row of the table, keys being columns names. | 569 each dictionary is a row of the table, keys being columns names. |
569 the whole data will be read and kept into memory, to be printed | 570 the whole data will be read and kept into memory, to be printed |
570 @param data(list[dict[unicode, unicode]]): data to create the table from | 571 @param data(list[dict[unicode, unicode]]): data to create the table from |
692 bottom = top | 693 bottom = top |
693 if bottom_sep is None: | 694 if bottom_sep is None: |
694 bottom_sep = col_sep_size * bottom | 695 bottom_sep = col_sep_size * bottom |
695 if not show_borders: | 696 if not show_borders: |
696 left = right = head_line_left = head_line_right = "" | 697 left = right = head_line_left = head_line_right = "" |
697 # top border | 698 # top border |
698 if show_borders: | 699 if show_borders: |
699 self._disp( | 700 self._disp( |
700 top_left + top_sep.join([top * size for size in sizes]) + top_right | 701 top_left + top_sep.join([top * size for size in sizes]) + top_right |
701 ) | 702 ) |
702 | 703 |
703 # headers | 704 # headers |
704 if show_header and self.headers is not None: | 705 if show_header and self.headers is not None: |
705 self._disp( | 706 self._disp( |
706 left | 707 left |
707 + self._headers(head_sep, headers, sizes, head_alignment, head_style) | 708 + self._headers(head_sep, headers, sizes, head_alignment, head_style) |
708 + right | 709 + right |
712 head_line_left | 713 head_line_left |
713 + head_line_sep.join([head_line * size for size in sizes]) | 714 + head_line_sep.join([head_line * size for size in sizes]) |
714 + head_line_right | 715 + head_line_right |
715 ) | 716 ) |
716 | 717 |
717 # content | 718 # content |
718 if columns_alignment == "left": | 719 if columns_alignment == "left": |
719 alignment = lambda idx, s: ansi_ljust(s, sizes[idx]) | 720 alignment = lambda idx, s: ansi_ljust(s, sizes[idx]) |
720 elif columns_alignment == "center": | 721 elif columns_alignment == "center": |
721 alignment = lambda idx, s: ansi_center(s, sizes[idx]) | 722 alignment = lambda idx, s: ansi_center(s, sizes[idx]) |
722 elif columns_alignment == "right": | 723 elif columns_alignment == "right": |
738 self._disp( | 739 self._disp( |
739 bottom_left | 740 bottom_left |
740 + bottom_sep.join([bottom * size for size in sizes]) | 741 + bottom_sep.join([bottom * size for size in sizes]) |
741 + bottom_right | 742 + bottom_right |
742 ) | 743 ) |
743 # we return self so string can be used after display (table.display().string) | 744 # we return self so string can be used after display (table.display().string) |
744 return self | 745 return self |
745 | 746 |
746 def display_blank(self, **kwargs): | 747 def display_blank(self, **kwargs): |
747 """Display table without visible borders""" | 748 """Display table without visible borders""" |
748 kwargs_ = {"col_sep": " ", "head_line_sep": " ", "show_borders": False} | 749 kwargs_ = {"col_sep": " ", "head_line_sep": " ", "show_borders": False} |
779 | 780 |
780 try: | 781 try: |
781 uri_data = uris_data[key] | 782 uri_data = uris_data[key] |
782 except KeyError: | 783 except KeyError: |
783 host.disp( | 784 host.disp( |
784 _(f"No {key} URI specified for this project, please specify service and " | 785 _( |
785 f"node"), | 786 "No {key} URI specified for this project, please specify service and " |
| 787 "node" |
| 788 ).format(key=key), |
786 error=True, | 789 error=True, |
787 ) | 790 ) |
788 host.quit(C.EXIT_NOT_FOUND) | 791 host.quit(C.EXIT_NOT_FOUND) |
789 | 792 |
790 uri = uri_data["uri"] | 793 uri = uri_data["uri"] |
791 | 794 |
792 # set extra metadata if they are specified | 795 # set extra metadata if they are specified |
793 for data_key in ['labels']: | 796 for data_key in ["labels"]: |
794 new_values_json = uri_data.get(data_key) | 797 new_values_json = uri_data.get(data_key) |
795 if uri_data is not None: | 798 if uri_data is not None: |
796 if meta_map is None: | 799 if meta_map is None: |
797 dest = data_key | 800 dest = data_key |
798 else: | 801 else: |
801 continue | 804 continue |
802 | 805 |
803 try: | 806 try: |
804 values = getattr(args, data_key) | 807 values = getattr(args, data_key) |
805 except AttributeError: | 808 except AttributeError: |
806 raise exceptions.InternalError(f'there is no {data_key!r} arguments') | 809 raise exceptions.InternalError(f"there is no {data_key!r} arguments") |
807 else: | 810 else: |
808 if values is None: | 811 if values is None: |
809 values = [] | 812 values = [] |
810 values.extend(json.loads(new_values_json)) | 813 values.extend(json.loads(new_values_json)) |
811 setattr(args, dest, values) | 814 setattr(args, dest, values) |
813 parsed_uri = xmpp_uri.parseXMPPUri(uri) | 816 parsed_uri = xmpp_uri.parseXMPPUri(uri) |
814 try: | 817 try: |
815 args.service = parsed_uri["path"] | 818 args.service = parsed_uri["path"] |
816 args.node = parsed_uri["node"] | 819 args.node = parsed_uri["node"] |
817 except KeyError: | 820 except KeyError: |
818 host.disp(_(f"Invalid URI found: {uri}"), error=True) | 821 host.disp(_("Invalid URI found: {uri}").format(uri=uri), error=True) |
819 host.quit(C.EXIT_DATA_ERROR) | 822 host.quit(C.EXIT_DATA_ERROR) |