Mercurial > libervia-backend
comparison libervia/cli/common.py @ 4075:47401850dec6
refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 14:54:26 +0200 |
parents | libervia/frontends/jp/common.py@26b7ed2817da |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4074:26b7ed2817da | 4075:47401850dec6 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # Libervia CLI | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 import json | |
21 import os | |
22 import os.path | |
23 import time | |
24 import tempfile | |
25 import asyncio | |
26 import shlex | |
27 import re | |
28 from pathlib import Path | |
29 from libervia.cli.constants import Const as C | |
30 from libervia.backend.core.i18n import _ | |
31 from libervia.backend.core import exceptions | |
32 from libervia.backend.tools.common import regex | |
33 from libervia.backend.tools.common.ansi import ANSI as A | |
34 from libervia.backend.tools.common import uri as xmpp_uri | |
35 from libervia.backend.tools import config | |
36 from configparser import NoSectionError, NoOptionError | |
37 from collections import namedtuple | |
38 | |
# Default arguments for editors we know how to drive when a content file and a
# metadata file must be edited side by side.
# "{content_file}" and "{metadata_file}" are placeholders, filled in later with
# str.format (hence the doubled braces inside the f-strings below).
VIM_SPLIT_ARGS = "-c 'set nospr|vsplit|wincmd w|next|wincmd w'"
EMACS_SPLIT_ARGS = '--eval "(split-window-horizontally)"'
EDITOR_ARGS_MAGIC = {
    "vim": f"{VIM_SPLIT_ARGS} {{content_file}} {{metadata_file}}",
    "nvim": f"{VIM_SPLIT_ARGS} {{content_file}} {{metadata_file}}",
    "gvim": f"{VIM_SPLIT_ARGS} --nofork {{content_file}} {{metadata_file}}",
    "emacs": f"{EMACS_SPLIT_ARGS} {{content_file}} {{metadata_file}}",
    "xemacs": f"{EMACS_SPLIT_ARGS} {{content_file}} {{metadata_file}}",
    "nano": " -F {content_file} {metadata_file}",
}

# maximum number of drafts kept in the backup directory
# (doubled by BaseEdit.secure_unlink when metadata files are used)
SECURE_UNLINK_MAX = 10
# name of the sub-directory where "deleted" drafts are moved
SECURE_UNLINK_DIR = ".backup"
# suffix identifying metadata files next to content drafts
METADATA_SUFF = "_metadata.json"
54 | |
55 | |
def format_time(timestamp):
    """Render a unix timestamp as a local date string

    @param timestamp(str,int,float): unix timestamp
        it is converted with float() before use
    @return (str): date formatted as "day/month/year hours:minutes:seconds timezone",
        expressed in local time
    """
    local_time = time.localtime(float(timestamp))
    return time.strftime("%d/%m/%Y %H:%M:%S %Z", local_time)
64 | |
65 | |
def ansi_ljust(s, width):
    """Left-justify a string, measuring it without its ANSI escape codes

    @param s(str): string to pad, may contain ANSI escape codes
    @param width(int): wanted visible width
    @return (str): s followed by the spaces needed to reach width
    """
    visible_len = len(regex.ansi_remove(s))
    padding = " " * (width - visible_len)
    return s + padding
70 | |
71 | |
def ansi_center(s, width):
    """Center a string, measuring it without its ANSI escape codes

    @param s(str): string to center, may contain ANSI escape codes
    @param width(int): wanted visible width
    @return (str): s surrounded by spaces; when the padding can't be split evenly,
        the extra space goes to the right
    """
    cleaned = regex.ansi_remove(s)
    diff = width - len(cleaned)
    # integer division is mandatory: a float can't be used to repeat a string
    half = diff // 2
    return half * " " + s + (half + diff % 2) * " "
78 | |
79 | |
def ansi_rjust(s, width):
    """Right-justify a string, measuring it without its ANSI escape codes

    @param s(str): string to pad, may contain ANSI escape codes
    @param width(int): wanted visible width
    @return (str): s preceded by the spaces needed to reach width
    """
    visible_len = len(regex.ansi_remove(s))
    padding = " " * (width - visible_len)
    return padding + s
84 | |
85 | |
def get_tmp_dir(sat_conf, cat_dir, sub_dir=None):
    """Build the path of the directory used to store temporary files

    The directory is not created by this function.

    @param sat_conf(ConfigParser.ConfigParser): instance opened on sat configuration
    @param cat_dir(str): directory of the category (e.g. "blog")
    @param sub_dir(str, None): sub directory where data need to be put
        profile can be used here, or special directory name
        sub_dir will be escaped to be usable in path (use regex.path_unescape to find
        initial str)
        None to get the category directory itself
    @return (Path): path to the dir
    """
    base_dir = config.config_get(sat_conf, "", "local_dir", Exception)
    if sub_dir is None:
        return Path(base_dir, cat_dir)
    return Path(base_dir, cat_dir, regex.path_escape(sub_dir))
102 | |
103 | |
def parse_args(host, cmd_line, **format_kw):
    """Parse command arguments

    @param host: object whose ``disp`` method is used to report a parsing failure
    @param cmd_line(str): command line as found in sat.conf
    @param format_kw: keywords used for formatting
    @return (list(str)): list of arguments to pass to subprocess function
        an empty list is returned (after displaying the reason) if the command line
        can't be split or uses a field which is not in format_kw
    """
    try:
        # we split the arguments and add the known fields
        # we split arguments first to avoid escaping issues in file names
        return [a.format(**format_kw) for a in shlex.split(cmd_line)]
    except (ValueError, KeyError, IndexError) as e:
        # ValueError: unbalanced quotes (shlex) or malformed braces (str.format)
        # KeyError/IndexError: placeholder which is not available in format_kw
        host.disp(
            "Couldn't parse editor cmd [{cmd}]: {reason}".format(cmd=cmd_line, reason=e)
        )
        return []
120 | |
121 | |
class BaseEdit(object):
    """base class for editing commands

    This class allows to edit file for PubSub or something else.
    It works with temporary files in SàT local_dir, in a "cat_dir" subdir

    NOTE(review): methods use ``self.disp``, ``self.sat_conf``, ``self.profile`` and
    ``self.args`` which are never set here; presumably they are provided by the
    command class this one is combined with -- confirm against subclasses.
    """

    def __init__(self, host, cat_dir, use_metadata=False):
        """
        @param host: host instance, its ``disp`` and ``quit`` methods are used
        @param cat_dir(str): directory to use for drafts
            this will be a sub-directory of SàT's local_dir
        @param use_metadata(bool): True is edition need a second file for metadata
            most of signature change with use_metadata with an additional metadata
            argument.
            This is done to raise error if a command needs metadata but forget the flag,
            and vice versa
        """
        self.host = host
        self.cat_dir = cat_dir
        self.use_metadata = use_metadata

    def secure_unlink(self, path):
        """Unlink given path after keeping it for a while

        This method is used to prevent accidental deletion of a draft
        If there are more file in SECURE_UNLINK_DIR than SECURE_UNLINK_MAX,
        older file are deleted
        @param path(Path, str): file to unlink
        @raise OSError: path is not an existing regular file
        """
        path = Path(path).resolve()
        if not path.is_file():
            raise OSError("path must link to a regular file")
        # NOTE(review): get_tmp_file creates drafts in a per-profile sub-directory,
        #   while the check below compares with the category directory itself
        #   -- confirm that this is the intended hierarchy test.
        if path.parent != get_tmp_dir(self.sat_conf, self.cat_dir):
            self.disp(
                f"File {path} is not in SàT temporary hierarchy, we do not remove it",
                2,
            )
            return
        # we have 2 files per draft with use_metadata, so we double max
        unlink_max = SECURE_UNLINK_MAX * 2 if self.use_metadata else SECURE_UNLINK_MAX
        backup_dir = get_tmp_dir(self.sat_conf, self.cat_dir, SECURE_UNLINK_DIR)
        if not os.path.exists(backup_dir):
            os.makedirs(backup_dir)
        filename = os.path.basename(path)
        backup_path = os.path.join(backup_dir, filename)
        # we move file to backup dir
        self.host.disp(
            "Backuping file {src} to {dst}".format(src=path, dst=backup_path),
            1,
        )
        os.rename(path, backup_path)
        # and if we exceeded the limit, we remove older file
        backup_files = [os.path.join(backup_dir, f) for f in os.listdir(backup_dir)]
        if len(backup_files) > unlink_max:
            backup_files.sort(key=lambda backup: os.stat(backup).st_mtime)
            for old_backup in backup_files[: len(backup_files) - unlink_max]:
                self.host.disp("Purging backup file {}".format(old_backup), 2)
                os.unlink(old_backup)

    async def run_editor(
        self,
        editor_args_opt,
        content_file_path,
        content_file_obj,
        meta_file_path=None,
        meta_ori=None,
    ):
        """Run editor to edit content and metadata

        Once the editor has exited successfully, the edited content is published
        with self.publish if it is not empty and has been modified, then the
        temporary files are handed to secure_unlink.

        @param editor_args_opt(str): option in [cli] section in configuration for
            specific args
        @param content_file_path(Path): path to the content file
        @param content_file_obj(file): opened file instance
            it is read from the start (as bytes) then closed before the editor is run
        @param meta_file_path(Path, None): metadata file path
            if None metadata will not be used
        @param meta_ori(dict, None): original content of metadata
            can't be used if use_metadata is False
        """
        if not self.use_metadata:
            assert meta_file_path is None
            assert meta_ori is None

        # we calculate hashes to check for modifications
        import hashlib

        content_file_obj.seek(0)
        tmp_ori_hash = hashlib.sha1(content_file_obj.read()).digest()
        content_file_obj.close()

        # we prepare arguments
        editor = config.config_get(self.sat_conf, C.CONFIG_SECTION, "editor") or os.getenv(
            "EDITOR", "vi"
        )
        try:
            # is there custom arguments in sat.conf ?
            editor_args = config.config_get(
                self.sat_conf, C.CONFIG_SECTION, editor_args_opt, Exception
            )
        except (NoOptionError, NoSectionError):
            # no, we check if we know the editor and have special arguments
            if self.use_metadata:
                editor_args = EDITOR_ARGS_MAGIC.get(os.path.basename(editor), "")
            else:
                editor_args = ""
        parse_kwargs = {"content_file": content_file_path}
        if self.use_metadata:
            parse_kwargs["metadata_file"] = meta_file_path
        args = parse_args(self.host, editor_args, **parse_kwargs)
        if not args:
            # no (valid) specific arguments: we only give the content file
            args = [content_file_path]

        # actual editing
        editor_process = await asyncio.create_subprocess_exec(
            editor, *[str(a) for a in args]
        )
        editor_exit = await editor_process.wait()

        # edition will now be checked, and data will be sent if it was a success
        if editor_exit != 0:
            self.disp(
                f"Editor exited with an error code, so temporary file has not be "
                f"deleted, and item is not published.\nYou can find temporary file "
                f"at {content_file_path}",
                error=True,
            )
        else:
            # main content
            try:
                with content_file_path.open("rb") as f:
                    content = f.read()
            except (OSError, IOError):
                self.disp(
                    f"Can read file at {content_file_path}, have it been deleted?\n"
                    f"Cancelling edition",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)

            # metadata
            if self.use_metadata:
                try:
                    with meta_file_path.open("rb") as f:
                        metadata = json.load(f)
                except (OSError, IOError):
                    self.disp(
                        f"Can read file at {meta_file_path}, have it been deleted?\n"
                        f"Cancelling edition",
                        error=True,
                    )
                    self.host.quit(C.EXIT_NOT_FOUND)
                except ValueError:
                    self.disp(
                        f"Can't parse metadata, please check it is correct JSON format. "
                        f"Cancelling edition.\nYou can find tmp file at "
                        f"{content_file_path} and temporary meta file at "
                        f"{meta_file_path}.",
                        error=True,
                    )
                    self.host.quit(C.EXIT_DATA_ERROR)

            if self.use_metadata and not metadata.get("publish", True):
                self.disp(
                    f'Publication blocked by "publish" key in metadata, cancelling '
                    f"edition.\n\ntemporary file path:\t{content_file_path}\nmetadata "
                    f"file path:\t{meta_file_path}",
                    error=True,
                )
                self.host.quit()

            if len(content) == 0:
                self.disp("Content is empty, cancelling the edition")
                if content_file_path.parent != get_tmp_dir(self.sat_conf, self.cat_dir):
                    self.disp(
                        "File are not in SàT temporary hierarchy, we do not remove them",
                        2,
                    )
                    self.host.quit()
                self.disp(f"Deletion of {content_file_path}", 2)
                os.unlink(content_file_path)
                if self.use_metadata:
                    self.disp(f"Deletion of {meta_file_path}", 2)
                    os.unlink(meta_file_path)
                self.host.quit()

            # time to re-check the hash
            elif tmp_ori_hash == hashlib.sha1(content).digest() and (
                not self.use_metadata or meta_ori == metadata
            ):
                self.disp("The content has not been modified, cancelling the edition")
                self.host.quit()

            else:
                # we can now send the item
                content = content.decode("utf-8-sig")  # we use utf-8-sig to avoid BOM
                try:
                    if self.use_metadata:
                        await self.publish(content, metadata)
                    else:
                        await self.publish(content)
                except Exception as e:
                    if self.use_metadata:
                        self.disp(
                            f"Error while sending your item, the temporary files have "
                            f"been kept at {content_file_path} and {meta_file_path}: "
                            f"{e}",
                            error=True,
                        )
                    else:
                        self.disp(
                            f"Error while sending your item, the temporary file has been "
                            f"kept at {content_file_path}: {e}",
                            error=True,
                        )
                    self.host.quit(1)

            # drafts are moved to the backup directory instead of being deleted
            self.secure_unlink(content_file_path)
            if self.use_metadata:
                self.secure_unlink(meta_file_path)

    async def publish(self, content):
        """Publish edited content, must be implemented by subclasses

        @param content(str): edited content
            if metadata is needed, publish will be called with it last argument
        """
        raise NotImplementedError

    def get_tmp_file(self):
        """Create a temporary file

        The file is created in the temporary directory of the category, in a
        sub-directory named after self.profile; the directory is created if needed.

        @return (tuple(file, Path)): opened (w+b) file object and file path
        """
        suff = "." + self.get_tmp_suff()
        cat_dir_str = self.cat_dir
        tmp_dir = get_tmp_dir(self.sat_conf, self.cat_dir, self.profile)
        if not tmp_dir.exists():
            try:
                tmp_dir.mkdir(parents=True)
            except OSError as e:
                self.disp(
                    f"Can't create {tmp_dir} directory: {e}",
                    error=True,
                )
                self.host.quit(1)
        try:
            fd, path = tempfile.mkstemp(
                suffix=suff,
                prefix=time.strftime(cat_dir_str + "_%Y-%m-%d_%H:%M:%S_"),
                dir=tmp_dir,
                text=True,
            )
            return os.fdopen(fd, "w+b"), Path(path)
        except OSError as e:
            self.disp(f"Can't create temporary file: {e}", error=True)
            self.host.quit(1)

    def get_current_file(self, profile):
        """Get most recently edited file

        @param profile(str): profile linked to the draft
        @return(Path): full path of current file
        """
        # we guess the item currently edited by choosing
        # the most recent file corresponding to temp file pattern
        # in tmp_dir, excluding metadata files
        tmp_dir = get_tmp_dir(self.sat_conf, self.cat_dir, profile)
        available = [
            p
            for p in tmp_dir.glob(f"{self.cat_dir}_*")
            if not p.match(f"*{METADATA_SUFF}")
        ]
        if not available:
            self.disp(
                f"Could not find any content draft in {tmp_dir}",
                error=True,
            )
            self.host.quit(1)
        return max(available, key=lambda p: p.stat().st_mtime)

    async def get_item_data(self, service, node, item):
        """return formatted content, metadata (or not if use_metadata is false), and item id"""
        raise NotImplementedError

    def get_tmp_suff(self):
        """return suffix used for content file"""
        return "xml"

    async def get_item_path(self):
        """Retrieve item path (i.e. service and node) from item argument

        This method is obviously only useful for edition of PubSub based features

        @return (tuple): service, node, item, content_file_path, content_file_obj
            followed by metadata if use_metadata is True
            content_file_obj is an opened binary file positioned at its start when a
            published item has been requested
        """
        service = self.args.service
        node = self.args.node
        item = self.args.item
        last_item = self.args.last_item

        if self.args.current:
            # user wants to continue current draft
            content_file_path = self.get_current_file(self.profile)
            self.disp("Continuing edition of current draft", 2)
            content_file_obj = content_file_path.open("r+b")
            # we seek at the end of file in case of an item already exist
            # this will write content of the existing item at the end of the draft.
            # This way no data should be lost.
            content_file_obj.seek(0, os.SEEK_END)
        elif self.args.draft_path:
            # there is an existing draft that we use
            content_file_path = self.args.draft_path.expanduser()
            content_file_obj = content_file_path.open("r+b")
            # we seek at the end for the same reason as above
            content_file_obj.seek(0, os.SEEK_END)
        else:
            # we need a temporary file
            content_file_obj, content_file_path = self.get_tmp_file()

        if item or last_item:
            self.disp("Editing requested published item", 2)
            try:
                if self.use_metadata:
                    content, metadata, item = await self.get_item_data(service, node, item)
                else:
                    content, item = await self.get_item_data(service, node, item)
            except Exception as e:
                # FIXME: ugly but we have not good may to check errors in bridge
                if "item-not-found" in str(e):
                    # item doesn't exist, we create a new one with requested id
                    metadata = None
                    if last_item:
                        self.disp(_("no item found at all, we create a new one"), 2)
                    else:
                        self.disp(
                            _(
                                'item "{item}" not found, we create a new item with'
                                "this id"
                            ).format(item=item),
                            2,
                        )
                    content_file_obj.seek(0)
                else:
                    self.disp(f"Error while retrieving item: {e}")
                    self.host.quit(C.EXIT_ERROR)
            else:
                # item exists, we write content
                if content_file_obj.tell() != 0:
                    # we already have a draft,
                    # we copy item content after it and add an indicator
                    # (the file is opened in binary mode, so bytes are needed)
                    content_file_obj.write(b"\n*****\n")
                content_file_obj.write(content.encode("utf-8"))
                content_file_obj.seek(0)
                self.disp(_('item "{item}" found, we edit it').format(item=item), 2)
        else:
            self.disp("Editing a new item", 2)
            if self.use_metadata:
                metadata = None

        if self.use_metadata:
            return service, node, item, content_file_path, content_file_obj, metadata
        else:
            return service, node, item, content_file_path, content_file_obj
479 | |
480 | |
class Table(object):
    """Text table with optional headers, per-column filters and box borders

    Rows are rendered to strings at construction time; display() then either
    prints lines through host.disp or stores them in a buffer (see ``string``).
    """

    def __init__(self, host, data, headers=None, filters=None, use_buffer=False):
        """
        @param host: host instance, its ``disp`` method is used when no buffer is used
        @param data(iterable[list]): table data
            all lines must have the same number of columns
            any iterable (including generators) is accepted, it is consumed once
        @param headers(iterable[str], None): names/titles of the columns
            if not None, must have same number of columns as data
        @param filters(iterable[(callable, str)], None): values filters
            the callable will get 2 arguments:
                - current column value
                - RowData with all columns values
            if may also only use 1 argument, which will then be current col value.
            the callable must return a string
            if it's a str, it will be used with .format and must contain '{}' which
            will be replaced with the string.
            if not None, must have same number of columns as data
        @param use_buffer(bool): if True, bufferise output instead of printing it directly
        @raise exceptions.DataError: rows or headers don't have a coherent size
        """
        self.host = host
        self._buffer = [] if use_buffer else None
        # headers are columns names/titles, can be None
        self.headers = headers
        # sizes of columns without headers,
        # headers may be larger
        self.sizes = []
        # rows contains one list per row with columns values
        self.rows = []

        size = None
        if headers:
            # we use a namedtuple to make the value easily accessible from filters
            headers_safe = [re.sub(r"[^a-zA-Z_]", "_", h) for h in headers]
            # rename=True: headers which can't be used as field name (Python keyword,
            # duplicate, leading underscore) get a positional name instead of
            # making the whole table creation fail
            row_cls = namedtuple("RowData", headers_safe, rename=True)
        else:
            row_cls = tuple

        for row_data in data:
            new_row = []
            row_data_list = list(row_data)
            for idx, value in enumerate(row_data_list):
                if filters is not None and filters[idx] is not None:
                    filter_ = filters[idx]
                    if isinstance(filter_, str):
                        col_value = filter_.format(value)
                    else:
                        try:
                            col_value = filter_(value, row_cls(*row_data_list))
                        except TypeError:
                            # the filter only accepts the column value
                            col_value = filter_(value)
                    # we count size without ANSI code as they will change length of the
                    # string when it's mostly style/color changes.
                    col_size = len(regex.ansi_remove(col_value))
                else:
                    col_value = str(value)
                    col_size = len(col_value)
                new_row.append(col_value)
                if size is None:
                    # first row: column sizes are initialised
                    self.sizes.append(col_size)
                else:
                    self.sizes[idx] = max(self.sizes[idx], col_size)
            if size is None:
                size = len(new_row)
                if headers is not None and len(headers) != size:
                    raise exceptions.DataError("headers size is not coherent with rows")
            else:
                if len(new_row) != size:
                    raise exceptions.DataError("rows size is not coherent")
            self.rows.append(new_row)

        # self.rows is checked instead of data: data may be a generator, which is
        # always truthy and is exhausted at this point
        if not self.rows and headers is not None:
            # the table is empty, we print headers at their length
            self.sizes = [len(h) for h in headers]

    @property
    def string(self):
        """Buffered output joined with line feeds

        @raise exceptions.InternalError: the table has not been created with
            use_buffer=True
        """
        if self._buffer is None:
            raise exceptions.InternalError("buffer must be used to get a string")
        return "\n".join(self._buffer)

    @staticmethod
    def read_dict_values(data, keys, defaults=None):
        """Yield values of data for each key, in keys order

        @param data(dict): row data
        @param keys(iterable): keys to read
        @param defaults(dict, None): values to use when a key is missing from data
        @raise KeyError: a key is missing and there is no (non None) default for it
        """
        if defaults is None:
            defaults = {}
        for key in keys:
            try:
                yield data[key]
            except KeyError:
                default = defaults.get(key)
                if default is None:
                    raise
                yield default

    @classmethod
    def from_list_dict(
        cls, host, data, keys=None, headers=None, filters=None, defaults=None
    ):
        """Create a table from a list of dictionaries

        each dictionary is a row of the table, keys being columns names.
        the whole data will be read and kept into memory, to be printed
        @param data(list[dict[str, str]]): data to create the table from
        @param keys(iterable[str], None): keys to get
            if None, all keys of the first row will be used
        @param headers(iterable[str], None): name of the columns
            names must be in same order as keys
        @param filters(dict[str, (callable,str)), None): filter to use on values
            keys correspond to keys to filter, and value is the same as for Table.__init__
        @param defaults(dict[str, str]): default value to use
            if None, an exception will be raised if not value is found
        @raise exceptions.DataError: headers are used without keys
        """
        if keys is None and headers is not None:
            # FIXME: keys are not needed with OrderedDict,
            raise exceptions.DataError("You must specify keys order to used headers")
        if keys is None:
            # without any row there is no key to read
            keys = list(data[0].keys()) if data else []
        if headers is None:
            headers = keys
        if filters is None:
            filters = {}
        filters = [filters.get(k) for k in keys]
        return cls(
            host, (cls.read_dict_values(d, keys, defaults) for d in data), headers, filters
        )

    def _headers(self, head_sep, headers, sizes, alignment="left", style=None):
        """Render headers

        @param head_sep(str): sequence to use as separator
        @param alignment(str): how to align, can be left, center or right
        @param style(str, iterable[str], None): ANSI escape sequences to apply
        @param headers(list[str]): headers to show
        @param sizes(list[int]): sizes of columns
            headers longer than their column are truncated
        @return (str): rendered headers line
        @raise exceptions.InternalError: alignment is not a known value
        """
        rendered_headers = []
        if isinstance(style, str):
            style = [style]
        for idx, header in enumerate(headers):
            size = sizes[idx]
            if alignment == "left":
                rendered = header[:size].ljust(size)
            elif alignment == "center":
                rendered = header[:size].center(size)
            elif alignment == "right":
                rendered = header[:size].rjust(size)
            else:
                raise exceptions.InternalError("bad alignment argument")
            if style:
                # unpacking works with any iterable (list, tuple…), not only lists
                rendered = A.color(*style, rendered)
            rendered_headers.append(rendered)
        return head_sep.join(rendered_headers)

    def _disp(self, data):
        """output data (can be either bufferised or printed)"""
        if self._buffer is not None:
            self._buffer.append(data)
        else:
            self.host.disp(data)

    def display(
        self,
        head_alignment="left",
        columns_alignment="left",
        head_style=None,
        show_header=True,
        show_borders=True,
        hide_cols=None,
        col_sep=" │ ",
        top_left="┌",
        top="─",
        top_sep="─┬─",
        top_right="┐",
        left="│",
        right=None,
        head_sep=None,
        head_line="┄",
        head_line_left="├",
        head_line_sep="┄┼┄",
        head_line_right="┤",
        bottom_left="└",
        bottom=None,
        bottom_sep="─┴─",
        bottom_right="┘",
    ):
        """Print the table

        Output goes through self._disp, i.e. it is either printed with host.disp
        or stored in the buffer.

        @param show_header(bool): True if header need to be shown
        @param show_borders(bool): True if borders need to be shown
        @param hide_cols(None, iterable(str)): columns which should not be displayed
            columns are designated by their header
        @param head_alignment(str): how to align headers, can be left, center or right
        @param columns_alignment(str): how to align columns, can be left, center or
            right
        @param head_style(str, iterable[str], None): ANSI escape sequences to apply
            to headers
        @param col_sep(str): separator betweens columns
        @param head_line(str): character to use to make line under head
        @param right(str, None): right border, None to use the same as left
        @param head_sep(str, None): headers separator, None to use col_sep
        @param bottom(str, None): bottom border character, None to use the same as top
        @return (Table): self, so string can be used after display
            (table.display().string)
        @raise exceptions.InternalError: an alignment argument is not a known value
        """
        if not self.sizes:
            # the table is empty
            # self is still returned so that chaining keeps working
            return self
        col_sep_size = len(regex.ansi_remove(col_sep))

        # if we have columns to hide, we remove them from headers and size
        if not hide_cols:
            headers = self.headers
            sizes = self.sizes
        else:
            headers = list(self.headers)
            sizes = self.sizes[:]
            # indexes are computed before any deletion, they are used to filter rows
            ignore_idx = [headers.index(to_hide) for to_hide in hide_cols]
            for to_hide in hide_cols:
                hide_idx = headers.index(to_hide)
                del headers[hide_idx]
                del sizes[hide_idx]

        if right is None:
            right = left
        if top_sep is None:
            top_sep = col_sep_size * top
        if head_sep is None:
            head_sep = col_sep
        if bottom is None:
            bottom = top
        if bottom_sep is None:
            bottom_sep = col_sep_size * bottom
        if not show_borders:
            left = right = head_line_left = head_line_right = ""
        # top border
        if show_borders:
            self._disp(
                top_left + top_sep.join([top * size for size in sizes]) + top_right
            )

        # headers
        if show_header and self.headers is not None:
            self._disp(
                left
                + self._headers(head_sep, headers, sizes, head_alignment, head_style)
                + right
            )
            # header line
            self._disp(
                head_line_left
                + head_line_sep.join([head_line * size for size in sizes])
                + head_line_right
            )

        # content
        if columns_alignment == "left":
            alignment = lambda idx, s: ansi_ljust(s, sizes[idx])
        elif columns_alignment == "center":
            alignment = lambda idx, s: ansi_center(s, sizes[idx])
        elif columns_alignment == "right":
            alignment = lambda idx, s: ansi_rjust(s, sizes[idx])
        else:
            raise exceptions.InternalError("bad columns alignment argument")

        for row in self.rows:
            if hide_cols:
                row = [v for idx, v in enumerate(row) if idx not in ignore_idx]
            self._disp(
                left
                + col_sep.join([alignment(idx, c) for idx, c in enumerate(row)])
                + right
            )

        if show_borders:
            # bottom border
            self._disp(
                bottom_left
                + bottom_sep.join([bottom * size for size in sizes])
                + bottom_right
            )
        # we return self so string can be used after display (table.display().string)
        return self

    def display_blank(self, **kwargs):
        """Display table without visible borders

        @param kwargs: arguments for display, they override the borderless defaults
        @return (Table): self, as returned by display
        """
        kwargs_ = {"col_sep": " ", "head_line_sep": " ", "show_borders": False}
        kwargs_.update(kwargs)
        return self.display(**kwargs_)
763 | |
764 | |
async def fill_well_known_uri(command, path, key, meta_map=None):
    """Look for URIs in well-known location and fill appropriate args if suitable

    Nothing is done if a service or a node is already set in command args.

    @param command(CommandBase): command instance
        args of this instance will be updated with found values
    @param path(str): absolute path to use as a starting point to look for URIs
    @param key(str): key to look for
    @param meta_map(dict, None): if not None, map metadata to arg name
        key is metadata used attribute name
        value is name to actually use, or None to ignore
        use empty dict to only retrieve URI
        possible keys are currently:
            - labels
    @raise exceptions.InternalError: the argument which must receive a metadata
        doesn't exist in command args
    """
    args = command.args
    if args.service or args.node:
        # we only look for URIs if a service and a node are not already specified
        return

    host = command.host

    try:
        uris_data = await host.bridge.uri_find(path, [key])
    except Exception as e:
        host.disp(f"can't find {key} URI: {e}", error=True)
        host.quit(C.EXIT_BRIDGE_ERRBACK)

    try:
        uri_data = uris_data[key]
    except KeyError:
        host.disp(
            _(
                "No {key} URI specified for this project, please specify service and "
                "node"
            ).format(key=key),
            error=True,
        )
        host.quit(C.EXIT_NOT_FOUND)

    uri = uri_data["uri"]

    # set extra metadata if they are specified
    for data_key in ["labels"]:
        new_values_json = uri_data.get(data_key)
        if new_values_json is None:
            # this metadata is not specified with the URI
            continue
        if meta_map is None:
            dest = data_key
        else:
            dest = meta_map.get(data_key)
            if dest is None:
                # this metadata must be ignored
                continue

        # current values are read from the argument which will receive the result,
        # new values are added after them
        try:
            values = getattr(args, dest)
        except AttributeError:
            raise exceptions.InternalError(f"there is no {dest!r} arguments")
        if values is None:
            values = []
        # values are serialised as a JSON array
        values.extend(json.loads(new_values_json))
        setattr(args, dest, values)

    parsed_uri = xmpp_uri.parse_xmpp_uri(uri)
    try:
        args.service = parsed_uri["path"]
        args.node = parsed_uri["node"]
    except KeyError:
        host.disp(_("Invalid URI found: {uri}").format(uri=uri), error=True)
        host.quit(C.EXIT_DATA_ERROR)