comparison sat_frontends/jp/common.py @ 3568:04283582966f

core, frontends: fix invalid translatable strings. Some f-strings where used in translatable text, this has been fixed by using explicit `format()` call (using a script based on `tokenize`). As tokenize messes with spaces, a reformating tool (`black`) has been applied to some files afterwards.
author Goffi <goffi@goffi.org>
date Mon, 14 Jun 2021 18:35:12 +0200
parents be6d91572633
children 82e616b70a2a
comparison
equal deleted inserted replaced
3567:a240748ed686 3568:04283582966f
141 path = Path(path).resolve() 141 path = Path(path).resolve()
142 if not path.is_file: 142 if not path.is_file:
143 raise OSError("path must link to a regular file") 143 raise OSError("path must link to a regular file")
144 if path.parent != getTmpDir(self.sat_conf, self.cat_dir): 144 if path.parent != getTmpDir(self.sat_conf, self.cat_dir):
145 self.disp( 145 self.disp(
146 f"File {path} is not in SàT temporary hierarchy, we do not remove " 146 f"File {path} is not in SàT temporary hierarchy, we do not remove " f"it",
147 f"it",
148 2, 147 2,
149 ) 148 )
150 return 149 return
151 # we have 2 files per draft with use_metadata, so we double max 150 # we have 2 files per draft with use_metadata, so we double max
152 unlink_max = SECURE_UNLINK_MAX * 2 if self.use_metadata else SECURE_UNLINK_MAX 151 unlink_max = SECURE_UNLINK_MAX * 2 if self.use_metadata else SECURE_UNLINK_MAX
153 backup_dir = getTmpDir(self.sat_conf, self.cat_dir, SECURE_UNLINK_DIR) 152 backup_dir = getTmpDir(self.sat_conf, self.cat_dir, SECURE_UNLINK_DIR)
154 if not os.path.exists(backup_dir): 153 if not os.path.exists(backup_dir):
155 os.makedirs(backup_dir) 154 os.makedirs(backup_dir)
156 filename = os.path.basename(path) 155 filename = os.path.basename(path)
157 backup_path = os.path.join(backup_dir, filename) 156 backup_path = os.path.join(backup_dir, filename)
158 # we move file to backup dir 157 # we move file to backup dir
159 self.host.disp( 158 self.host.disp(
160 "Backuping file {src} to {dst}".format( 159 "Backuping file {src} to {dst}".format(src=path, dst=backup_path),
161 src=path, dst=backup_path
162 ),
163 1, 160 1,
164 ) 161 )
165 os.rename(path, backup_path) 162 os.rename(path, backup_path)
166 # and if we exceeded the limit, we remove older file 163 # and if we exceeded the limit, we remove older file
167 backup_files = [os.path.join(backup_dir, f) for f in os.listdir(backup_dir)] 164 backup_files = [os.path.join(backup_dir, f) for f in os.listdir(backup_dir)]
169 backup_files.sort(key=lambda path: os.stat(path).st_mtime) 166 backup_files.sort(key=lambda path: os.stat(path).st_mtime)
170 for path in backup_files[: len(backup_files) - unlink_max]: 167 for path in backup_files[: len(backup_files) - unlink_max]:
171 self.host.disp("Purging backup file {}".format(path), 2) 168 self.host.disp("Purging backup file {}".format(path), 2)
172 os.unlink(path) 169 os.unlink(path)
173 170
174 async def runEditor(self, editor_args_opt, content_file_path, content_file_obj, 171 async def runEditor(
175 meta_file_path=None, meta_ori=None): 172 self,
173 editor_args_opt,
174 content_file_path,
175 content_file_obj,
176 meta_file_path=None,
177 meta_ori=None,
178 ):
176 """Run editor to edit content and metadata 179 """Run editor to edit content and metadata
177 180
178 @param editor_args_opt(unicode): option in [jp] section in configuration for 181 @param editor_args_opt(unicode): option in [jp] section in configuration for
179 specific args 182 specific args
180 @param content_file_path(str): path to the content file 183 @param content_file_path(str): path to the content file
186 """ 189 """
187 if not self.use_metadata: 190 if not self.use_metadata:
188 assert meta_file_path is None 191 assert meta_file_path is None
189 assert meta_ori is None 192 assert meta_ori is None
190 193
191 # we calculate hashes to check for modifications 194 # we calculate hashes to check for modifications
192 import hashlib 195 import hashlib
193 196
194 content_file_obj.seek(0) 197 content_file_obj.seek(0)
195 tmp_ori_hash = hashlib.sha1(content_file_obj.read()).digest() 198 tmp_ori_hash = hashlib.sha1(content_file_obj.read()).digest()
196 content_file_obj.close() 199 content_file_obj.close()
215 parse_kwargs["metadata_file"] = meta_file_path 218 parse_kwargs["metadata_file"] = meta_file_path
216 args = parse_args(self.host, editor_args, **parse_kwargs) 219 args = parse_args(self.host, editor_args, **parse_kwargs)
217 if not args: 220 if not args:
218 args = [content_file_path] 221 args = [content_file_path]
219 222
220 # actual editing 223 # actual editing
221 editor_process = await asyncio.create_subprocess_exec( 224 editor_process = await asyncio.create_subprocess_exec(
222 editor, *[str(a) for a in args]) 225 editor, *[str(a) for a in args]
226 )
223 editor_exit = await editor_process.wait() 227 editor_exit = await editor_process.wait()
224 228
225 # edition will now be checked, and data will be sent if it was a success 229 # edition will now be checked, and data will be sent if it was a success
226 if editor_exit != 0: 230 if editor_exit != 0:
227 self.disp( 231 self.disp(
241 f"Cancelling edition", 245 f"Cancelling edition",
242 error=True, 246 error=True,
243 ) 247 )
244 self.host.quit(C.EXIT_NOT_FOUND) 248 self.host.quit(C.EXIT_NOT_FOUND)
245 249
246 # metadata 250 # metadata
247 if self.use_metadata: 251 if self.use_metadata:
248 try: 252 try:
249 with meta_file_path.open("rb") as f: 253 with meta_file_path.open("rb") as f:
250 metadata = json.load(f) 254 metadata = json.load(f)
251 except (OSError, IOError): 255 except (OSError, IOError):
266 self.host.quit(C.EXIT_DATA_ERROR) 270 self.host.quit(C.EXIT_DATA_ERROR)
267 271
268 if self.use_metadata and not metadata.get("publish", True): 272 if self.use_metadata and not metadata.get("publish", True):
269 self.disp( 273 self.disp(
270 f'Publication blocked by "publish" key in metadata, cancelling ' 274 f'Publication blocked by "publish" key in metadata, cancelling '
271 f'edition.\n\ntemporary file path:\t{content_file_path}\nmetadata ' 275 f"edition.\n\ntemporary file path:\t{content_file_path}\nmetadata "
272 f'file path:\t{meta_file_path}', 276 f"file path:\t{meta_file_path}",
273 error=True, 277 error=True,
274 ) 278 )
275 self.host.quit() 279 self.host.quit()
276 280
277 if len(content) == 0: 281 if len(content) == 0:
287 if self.use_metadata: 291 if self.use_metadata:
288 self.disp(f"Deletion of {meta_file_path}".format(meta_file_path), 2) 292 self.disp(f"Deletion of {meta_file_path}".format(meta_file_path), 2)
289 os.unlink(meta_file_path) 293 os.unlink(meta_file_path)
290 self.host.quit() 294 self.host.quit()
291 295
292 # time to re-check the hash 296 # time to re-check the hash
293 elif tmp_ori_hash == hashlib.sha1(content).digest() and ( 297 elif tmp_ori_hash == hashlib.sha1(content).digest() and (
294 not self.use_metadata or meta_ori == metadata 298 not self.use_metadata or meta_ori == metadata
295 ): 299 ):
296 self.disp("The content has not been modified, cancelling the edition") 300 self.disp("The content has not been modified, cancelling the edition")
297 self.host.quit() 301 self.host.quit()
352 dir=tmp_dir, 356 dir=tmp_dir,
353 text=True, 357 text=True,
354 ) 358 )
355 return os.fdopen(fd, "w+b"), Path(path) 359 return os.fdopen(fd, "w+b"), Path(path)
356 except OSError as e: 360 except OSError as e:
357 self.disp( 361 self.disp(f"Can't create temporary file: {e}", error=True)
358 f"Can't create temporary file: {e}", error=True
359 )
360 self.host.quit(1) 362 self.host.quit(1)
361 363
362 def getCurrentFile(self, profile): 364 def getCurrentFile(self, profile):
363 """Get most recently edited file 365 """Get most recently edited file
364 366
369 # the most recent file corresponding to temp file pattern 371 # the most recent file corresponding to temp file pattern
370 # in tmp_dir, excluding metadata files 372 # in tmp_dir, excluding metadata files
371 tmp_dir = getTmpDir(self.sat_conf, self.cat_dir, profile) 373 tmp_dir = getTmpDir(self.sat_conf, self.cat_dir, profile)
372 available = [ 374 available = [
373 p 375 p
374 for p in tmp_dir.glob(f'{self.cat_dir}_*') 376 for p in tmp_dir.glob(f"{self.cat_dir}_*")
375 if not p.match(f"*{METADATA_SUFF}") 377 if not p.match(f"*{METADATA_SUFF}")
376 ] 378 ]
377 if not available: 379 if not available:
378 self.disp( 380 self.disp(
379 f"Could not find any content draft in {tmp_dir}", 381 f"Could not find any content draft in {tmp_dir}",
434 if last_item: 436 if last_item:
435 self.disp(_("no item found at all, we create a new one"), 2) 437 self.disp(_("no item found at all, we create a new one"), 2)
436 else: 438 else:
437 self.disp( 439 self.disp(
438 _( 440 _(
439 f'item "{item}" not found, we create a new item with' 441 'item "{item}" not found, we create a new item with'
440 f'this id' 442 "this id"
441 ), 443 ).format(item=item),
442 2, 444 2,
443 ) 445 )
444 content_file_obj.seek(0) 446 content_file_obj.seek(0)
445 else: 447 else:
446 self.disp(f"Error while retrieving item: {e}") 448 self.disp(f"Error while retrieving item: {e}")
451 # we already have a draft, 453 # we already have a draft,
452 # we copy item content after it and add an indicator 454 # we copy item content after it and add an indicator
453 content_file_obj.write("\n*****\n") 455 content_file_obj.write("\n*****\n")
454 content_file_obj.write(content.encode("utf-8")) 456 content_file_obj.write(content.encode("utf-8"))
455 content_file_obj.seek(0) 457 content_file_obj.seek(0)
456 self.disp( 458 self.disp(_('item "{item}" found, we edit it').format(item=item), 2)
457 _(f'item "{item}" found, we edit it'), 2
458 )
459 else: 459 else:
460 self.disp("Editing a new item", 2) 460 self.disp("Editing a new item", 2)
461 if self.use_metadata: 461 if self.use_metadata:
462 metadata = None 462 metadata = None
463 463
496 self.rows = [] 496 self.rows = []
497 497
498 size = None 498 size = None
499 if headers: 499 if headers:
500 # we use a namedtuple to make the value easily accessible from filters 500 # we use a namedtuple to make the value easily accessible from filters
501 headers_safe = [re.sub(r'[^a-zA-Z_]', '_', h) for h in headers] 501 headers_safe = [re.sub(r"[^a-zA-Z_]", "_", h) for h in headers]
502 row_cls = namedtuple("RowData", headers_safe) 502 row_cls = namedtuple("RowData", headers_safe)
503 else: 503 else:
504 row_cls = tuple 504 row_cls = tuple
505 505
506 for row_data in data: 506 for row_data in data:
514 else: 514 else:
515 try: 515 try:
516 col_value = filter_(value, row_cls(*row_data_list)) 516 col_value = filter_(value, row_cls(*row_data_list))
517 except TypeError: 517 except TypeError:
518 col_value = filter_(value) 518 col_value = filter_(value)
519 # we count size without ANSI code as they will change length of the 519 # we count size without ANSI code as they will change length of the
520 # string when it's mostly style/color changes. 520 # string when it's mostly style/color changes.
521 col_size = len(regex.ansiRemove(col_value)) 521 col_size = len(regex.ansiRemove(col_value))
522 else: 522 else:
523 col_value = str(value) 523 col_value = str(value)
524 col_size = len(col_value) 524 col_size = len(col_value)
525 new_row.append(col_value) 525 new_row.append(col_value)
560 else: 560 else:
561 raise e 561 raise e
562 562
563 @classmethod 563 @classmethod
564 def fromListDict( 564 def fromListDict(
565 cls, host, data, keys=None, headers=None, filters=None, defaults=None): 565 cls, host, data, keys=None, headers=None, filters=None, defaults=None
566 ):
566 """Create a table from a list of dictionaries 567 """Create a table from a list of dictionaries
567 568
568 each dictionary is a row of the table, keys being columns names. 569 each dictionary is a row of the table, keys being columns names.
569 the whole data will be read and kept into memory, to be printed 570 the whole data will be read and kept into memory, to be printed
570 @param data(list[dict[unicode, unicode]]): data to create the table from 571 @param data(list[dict[unicode, unicode]]): data to create the table from
692 bottom = top 693 bottom = top
693 if bottom_sep is None: 694 if bottom_sep is None:
694 bottom_sep = col_sep_size * bottom 695 bottom_sep = col_sep_size * bottom
695 if not show_borders: 696 if not show_borders:
696 left = right = head_line_left = head_line_right = "" 697 left = right = head_line_left = head_line_right = ""
697 # top border 698 # top border
698 if show_borders: 699 if show_borders:
699 self._disp( 700 self._disp(
700 top_left + top_sep.join([top * size for size in sizes]) + top_right 701 top_left + top_sep.join([top * size for size in sizes]) + top_right
701 ) 702 )
702 703
703 # headers 704 # headers
704 if show_header and self.headers is not None: 705 if show_header and self.headers is not None:
705 self._disp( 706 self._disp(
706 left 707 left
707 + self._headers(head_sep, headers, sizes, head_alignment, head_style) 708 + self._headers(head_sep, headers, sizes, head_alignment, head_style)
708 + right 709 + right
712 head_line_left 713 head_line_left
713 + head_line_sep.join([head_line * size for size in sizes]) 714 + head_line_sep.join([head_line * size for size in sizes])
714 + head_line_right 715 + head_line_right
715 ) 716 )
716 717
717 # content 718 # content
718 if columns_alignment == "left": 719 if columns_alignment == "left":
719 alignment = lambda idx, s: ansi_ljust(s, sizes[idx]) 720 alignment = lambda idx, s: ansi_ljust(s, sizes[idx])
720 elif columns_alignment == "center": 721 elif columns_alignment == "center":
721 alignment = lambda idx, s: ansi_center(s, sizes[idx]) 722 alignment = lambda idx, s: ansi_center(s, sizes[idx])
722 elif columns_alignment == "right": 723 elif columns_alignment == "right":
738 self._disp( 739 self._disp(
739 bottom_left 740 bottom_left
740 + bottom_sep.join([bottom * size for size in sizes]) 741 + bottom_sep.join([bottom * size for size in sizes])
741 + bottom_right 742 + bottom_right
742 ) 743 )
743 #  we return self so string can be used after display (table.display().string) 744 #  we return self so string can be used after display (table.display().string)
744 return self 745 return self
745 746
746 def display_blank(self, **kwargs): 747 def display_blank(self, **kwargs):
747 """Display table without visible borders""" 748 """Display table without visible borders"""
748 kwargs_ = {"col_sep": " ", "head_line_sep": " ", "show_borders": False} 749 kwargs_ = {"col_sep": " ", "head_line_sep": " ", "show_borders": False}
779 780
780 try: 781 try:
781 uri_data = uris_data[key] 782 uri_data = uris_data[key]
782 except KeyError: 783 except KeyError:
783 host.disp( 784 host.disp(
784 _(f"No {key} URI specified for this project, please specify service and " 785 _(
785 f"node"), 786 "No {key} URI specified for this project, please specify service and "
787 "node"
788 ).format(key=key),
786 error=True, 789 error=True,
787 ) 790 )
788 host.quit(C.EXIT_NOT_FOUND) 791 host.quit(C.EXIT_NOT_FOUND)
789 792
790 uri = uri_data["uri"] 793 uri = uri_data["uri"]
791 794
792 # set extra metadata if they are specified 795 # set extra metadata if they are specified
793 for data_key in ['labels']: 796 for data_key in ["labels"]:
794 new_values_json = uri_data.get(data_key) 797 new_values_json = uri_data.get(data_key)
795 if uri_data is not None: 798 if uri_data is not None:
796 if meta_map is None: 799 if meta_map is None:
797 dest = data_key 800 dest = data_key
798 else: 801 else:
801 continue 804 continue
802 805
803 try: 806 try:
804 values = getattr(args, data_key) 807 values = getattr(args, data_key)
805 except AttributeError: 808 except AttributeError:
806 raise exceptions.InternalError(f'there is no {data_key!r} arguments') 809 raise exceptions.InternalError(f"there is no {data_key!r} arguments")
807 else: 810 else:
808 if values is None: 811 if values is None:
809 values = [] 812 values = []
810 values.extend(json.loads(new_values_json)) 813 values.extend(json.loads(new_values_json))
811 setattr(args, dest, values) 814 setattr(args, dest, values)
813 parsed_uri = xmpp_uri.parseXMPPUri(uri) 816 parsed_uri = xmpp_uri.parseXMPPUri(uri)
814 try: 817 try:
815 args.service = parsed_uri["path"] 818 args.service = parsed_uri["path"]
816 args.node = parsed_uri["node"] 819 args.node = parsed_uri["node"]
817 except KeyError: 820 except KeyError:
818 host.disp(_(f"Invalid URI found: {uri}"), error=True) 821 host.disp(_("Invalid URI found: {uri}").format(uri=uri), error=True)
819 host.quit(C.EXIT_DATA_ERROR) 822 host.quit(C.EXIT_DATA_ERROR)