comparison libervia/frontends/jp/common.py @ 4074:26b7ed2817da

refactoring: rename `sat_frontends` to `libervia.frontends`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:12:38 +0200
parents sat_frontends/jp/common.py@4b842c1fb686
children
comparison
equal deleted inserted replaced
4073:7c5654c54fed 4074:26b7ed2817da
1 #!/usr/bin/env python3
2
3
4 # jp: a SàT command line tool
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 import json
21 import os
22 import os.path
23 import time
24 import tempfile
25 import asyncio
26 import shlex
27 import re
28 from pathlib import Path
29 from libervia.frontends.jp.constants import Const as C
30 from libervia.backend.core.i18n import _
31 from libervia.backend.core import exceptions
32 from libervia.backend.tools.common import regex
33 from libervia.backend.tools.common.ansi import ANSI as A
34 from libervia.backend.tools.common import uri as xmpp_uri
35 from libervia.backend.tools import config
36 from configparser import NoSectionError, NoOptionError
37 from collections import namedtuple
38
# default arguments used for some known editors (editing with metadata)
# Both split-argument strings open the editor with two side-by-side windows so
# that content and metadata files can be edited together.
VIM_SPLIT_ARGS = "-c 'set nospr|vsplit|wincmd w|next|wincmd w'"
EMACS_SPLIT_ARGS = '--eval "(split-window-horizontally)"'
# Maps an editor executable name to the argument template used when no custom
# arguments are configured; "{content_file}" and "{metadata_file}" are
# placeholders filled by parse_args.
EDITOR_ARGS_MAGIC = {
    "vim": VIM_SPLIT_ARGS + " {content_file} {metadata_file}",
    "nvim": VIM_SPLIT_ARGS + " {content_file} {metadata_file}",
    "gvim": VIM_SPLIT_ARGS + " --nofork {content_file} {metadata_file}",
    "emacs": EMACS_SPLIT_ARGS + " {content_file} {metadata_file}",
    "xemacs": EMACS_SPLIT_ARGS + " {content_file} {metadata_file}",
    "nano": " -F {content_file} {metadata_file}",
}

# maximum number of drafts kept in the backup directory by secure_unlink
# (doubled there when metadata files are used)
SECURE_UNLINK_MAX = 10
# name of the sub-directory where "unlinked" drafts are moved
SECURE_UNLINK_DIR = ".backup"
# file name suffix used to recognise metadata files among drafts
METADATA_SUFF = "_metadata.json"
54
55
def format_time(timestamp):
    """Render a unix timestamp as a human readable local date

    @param timestamp(str,int,float): unix timestamp
    @return (unicode): date formatted as day/month/year hour:min:sec timezone
    """
    # the timestamp may come as a string, so it is normalised to float first
    local_moment = time.localtime(float(timestamp))
    return time.strftime("%d/%m/%Y %H:%M:%S %Z", local_moment)
64
65
def ansi_ljust(s, width):
    """Left-justify ``s`` in ``width`` columns, ignoring ANSI escape codes

    The padding is computed from the length of the string once ANSI sequences
    are removed, and appended after the original (still coloured) string.
    """
    visible_len = len(regex.ansi_remove(s))
    padding = " " * (width - visible_len)
    return s + padding
70
71
def ansi_center(s, width):
    """center method handling ANSI escape codes

    The padding is computed from the length of ``s`` once ANSI sequences are
    removed. When the padding can't be split evenly, the extra space goes to
    the right side.
    """
    cleaned = regex.ansi_remove(s)
    diff = width - len(cleaned)
    # floor division is required: "/" returns a float in Python 3 and a str
    # can't be multiplied by a float
    half = diff // 2
    return half * " " + s + (half + diff % 2) * " "
78
79
def ansi_rjust(s, width):
    """Right-justify ``s`` in ``width`` columns, ignoring ANSI escape codes

    The padding is computed from the length of the string once ANSI sequences
    are removed, and inserted before the original (still coloured) string.
    """
    visible_len = len(regex.ansi_remove(s))
    padding = " " * (width - visible_len)
    return padding + s
84
85
def get_tmp_dir(sat_conf, cat_dir, sub_dir=None):
    """Build the path of the directory used to store temporary files

    @param sat_conf(ConfigParser.ConfigParser): instance opened on sat configuration
    @param cat_dir(str): directory of the category (e.g. "blog")
    @param sub_dir(str, None): sub directory where data need to be put
        profile can be used here, or special directory name
        sub_dir is escaped with regex.path_escape to be usable in a path
        (use regex.path_unescape to find initial str)
    @return (Path): path to the dir (the directory itself is not created here)
    """
    base_dir = config.config_get(sat_conf, "", "local_dir", Exception)
    if sub_dir is None:
        return Path(base_dir, cat_dir)
    return Path(base_dir, cat_dir, regex.path_escape(sub_dir))
102
103
def parse_args(host, cmd_line, **format_kw):
    """Parse command arguments

    @param host: object whose ``disp`` method is used to report a parsing error
    @param cmd_line(unicode): command line as found in sat.conf
    @param format_kw: keywords used for formating
    @return (list(unicode)): list of arguments to pass to subprocess function
        an empty list is returned if the command line can't be parsed
    """
    try:
        # we split the arguments and add the known fields
        # we split arguments first to avoid escaping issues in file names
        return [a.format(**format_kw) for a in shlex.split(cmd_line)]
    except (ValueError, KeyError, IndexError) as e:
        # ValueError: unbalanced quotes or malformed "{" / "}" in the template
        # KeyError/IndexError: the template uses a placeholder which is not in
        #   format_kw (e.g. "{unknown}" or a positional "{}")
        host.disp(
            "Couldn't parse editor cmd [{cmd}]: {reason}".format(cmd=cmd_line, reason=e)
        )
        return []
120
121
class BaseEdit(object):
    """base class for editing commands

    This class allows to edit file for PubSub or something else.
    It works with temporary files in SàT local_dir, in a "cat_dir" subdir

    NOTE(review): methods below read ``self.sat_conf``, ``self.disp``,
    ``self.profile`` and ``self.args`` which are not set by this class;
    presumably they are provided by the command class it is combined with
    — to be confirmed.
    """

    def __init__(self, host, cat_dir, use_metadata=False):
        """
        @param host: host instance, used to display messages and to quit
        @param cat_dir(unicode): directory to use for drafts
            this will be a sub-directory of SàT's local_dir
        @param use_metadata(bool): True is edition need a second file for metadata
            most of signature change with use_metadata with an additional metadata
            argument.
            This is done to raise error if a command needs metadata but forget the flag,
            and vice versa
        """
        self.host = host
        self.cat_dir = cat_dir
        self.use_metadata = use_metadata

    def secure_unlink(self, path):
        """Unlink given path after keeping it for a while

        This method is used to prevent accidental deletion of a draft
        If there are more file in SECURE_UNLINK_DIR than SECURE_UNLINK_MAX,
        older file are deleted
        @param path(Path, str): file to unlink
        @raise OSError: path is not an existing regular file
        """
        path = Path(path).resolve()
        # is_file is a method: without the call the test was always truthy and
        # the check never triggered
        if not path.is_file():
            raise OSError("path must link to a regular file")
        if path.parent != get_tmp_dir(self.sat_conf, self.cat_dir):
            self.disp(
                f"File {path} is not in SàT temporary hierarchy, we do not remove it",
                2,
            )
            return
        # we have 2 files per draft with use_metadata, so we double max
        unlink_max = SECURE_UNLINK_MAX * 2 if self.use_metadata else SECURE_UNLINK_MAX
        backup_dir = get_tmp_dir(self.sat_conf, self.cat_dir, SECURE_UNLINK_DIR)
        if not os.path.exists(backup_dir):
            os.makedirs(backup_dir)
        filename = os.path.basename(path)
        backup_path = os.path.join(backup_dir, filename)
        # we move file to backup dir
        self.host.disp(
            "Backuping file {src} to {dst}".format(src=path, dst=backup_path),
            1,
        )
        os.rename(path, backup_path)
        # and if we exceeded the limit, we remove older file
        backup_files = [os.path.join(backup_dir, f) for f in os.listdir(backup_dir)]
        if len(backup_files) > unlink_max:
            # oldest files (lowest modification time) come first
            backup_files.sort(key=lambda backup: os.stat(backup).st_mtime)
            for old_backup in backup_files[: len(backup_files) - unlink_max]:
                self.host.disp("Purging backup file {}".format(old_backup), 2)
                os.unlink(old_backup)

    async def run_editor(
        self,
        editor_args_opt,
        content_file_path,
        content_file_obj,
        meta_file_path=None,
        meta_ori=None,
    ):
        """Run editor to edit content and metadata

        Once the editor exits successfully, the edited content is published
        with self.publish if it is neither empty nor unchanged, then the
        temporary files are handed to secure_unlink.

        @param editor_args_opt(unicode): option in [jp] section in configuration for
            specific args
        @param content_file_path(Path): path to the content file
            (``.open`` and ``.parent`` are used on it)
        @param content_file_obj(file): opened file instance
            it is read from the start then closed before the editor is launched
        @param meta_file_path(Path, None): metadata file path
            if None metadata will not be used
        @param meta_ori(dict, None): original content of metadata
            can't be used if use_metadata is False
        """
        if not self.use_metadata:
            assert meta_file_path is None
            assert meta_ori is None

        # we calculate hashes to check for modifications
        import hashlib

        content_file_obj.seek(0)
        tmp_ori_hash = hashlib.sha1(content_file_obj.read()).digest()
        content_file_obj.close()

        # we prepare arguments
        # the configured editor has priority, then $EDITOR, then "vi"
        editor = config.config_get(self.sat_conf, C.CONFIG_SECTION, "editor") or os.getenv(
            "EDITOR", "vi"
        )
        try:
            # is there custom arguments in sat.conf ?
            editor_args = config.config_get(
                self.sat_conf, C.CONFIG_SECTION, editor_args_opt, Exception
            )
        except (NoOptionError, NoSectionError):
            # no, we check if we know the editor and have special arguments
            if self.use_metadata:
                editor_args = EDITOR_ARGS_MAGIC.get(os.path.basename(editor), "")
            else:
                editor_args = ""
        parse_kwargs = {"content_file": content_file_path}
        if self.use_metadata:
            parse_kwargs["metadata_file"] = meta_file_path
        args = parse_args(self.host, editor_args, **parse_kwargs)
        if not args:
            # no usable arguments: we only give the content file to the editor
            args = [content_file_path]

        # actual editing
        editor_process = await asyncio.create_subprocess_exec(
            editor, *[str(a) for a in args]
        )
        editor_exit = await editor_process.wait()

        # edition will now be checked, and data will be sent if it was a success
        if editor_exit != 0:
            self.disp(
                f"Editor exited with an error code, so temporary file has not be "
                f"deleted, and item is not published.\nYou can find temporary file "
                f"at {content_file_path}",
                error=True,
            )
        else:
            # main content
            try:
                with content_file_path.open("rb") as f:
                    content = f.read()
            except (OSError, IOError):
                self.disp(
                    f"Can't read file at {content_file_path}, have it been deleted?\n"
                    f"Cancelling edition",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)

            # metadata
            if self.use_metadata:
                try:
                    with meta_file_path.open("rb") as f:
                        metadata = json.load(f)
                except (OSError, IOError):
                    self.disp(
                        f"Can't read file at {meta_file_path}, have it been deleted?\n"
                        f"Cancelling edition",
                        error=True,
                    )
                    self.host.quit(C.EXIT_NOT_FOUND)
                except ValueError:
                    self.disp(
                        f"Can't parse metadata, please check it is correct JSON format. "
                        f"Cancelling edition.\nYou can find tmp file at "
                        f"{content_file_path} and temporary meta file at "
                        f"{meta_file_path}.",
                        error=True,
                    )
                    self.host.quit(C.EXIT_DATA_ERROR)

            if self.use_metadata and not metadata.get("publish", True):
                self.disp(
                    f'Publication blocked by "publish" key in metadata, cancelling '
                    f"edition.\n\ntemporary file path:\t{content_file_path}\nmetadata "
                    f"file path:\t{meta_file_path}",
                    error=True,
                )
                self.host.quit()

            if len(content) == 0:
                self.disp("Content is empty, cancelling the edition")
                if content_file_path.parent != get_tmp_dir(self.sat_conf, self.cat_dir):
                    self.disp(
                        "File are not in SàT temporary hierarchy, we do not remove them",
                        2,
                    )
                    self.host.quit()
                self.disp(f"Deletion of {content_file_path}", 2)
                os.unlink(content_file_path)
                if self.use_metadata:
                    # the f-string is already rendered: calling .format() on it
                    # again would fail on a path containing braces
                    self.disp(f"Deletion of {meta_file_path}", 2)
                    os.unlink(meta_file_path)
                self.host.quit()

            # time to re-check the hash
            elif tmp_ori_hash == hashlib.sha1(content).digest() and (
                not self.use_metadata or meta_ori == metadata
            ):
                self.disp("The content has not been modified, cancelling the edition")
                self.host.quit()

            else:
                # we can now send the item
                content = content.decode("utf-8-sig")  # we use utf-8-sig to avoid BOM
                try:
                    if self.use_metadata:
                        await self.publish(content, metadata)
                    else:
                        await self.publish(content)
                except Exception as e:
                    if self.use_metadata:
                        self.disp(
                            f"Error while sending your item, the temporary files have "
                            f"been kept at {content_file_path} and {meta_file_path}: "
                            f"{e}",
                            error=True,
                        )
                    else:
                        self.disp(
                            f"Error while sending your item, the temporary file has been "
                            f"kept at {content_file_path}: {e}",
                            error=True,
                        )
                    self.host.quit(1)

                # publication succeeded, the drafts are moved to the backup dir
                self.secure_unlink(content_file_path)
                if self.use_metadata:
                    self.secure_unlink(meta_file_path)

    async def publish(self, content):
        """Publish the edited content, must be implemented by subclasses

        @param content(unicode): edited content
        """
        # if metadata is needed, publish will be called with it last argument
        raise NotImplementedError

    def get_tmp_file(self):
        """Create a temporary file

        The file is created in the temporary directory of the category and
        current profile; the directory is created if it doesn't exist.
        On failure an error is displayed and self.host.quit(1) is called.

        @return (tuple(file, Path)): opened (w+b) file object and file path
        """
        suff = "." + self.get_tmp_suff()
        tmp_dir = get_tmp_dir(self.sat_conf, self.cat_dir, self.profile)
        if not tmp_dir.exists():
            try:
                tmp_dir.mkdir(parents=True)
            except OSError as e:
                self.disp(
                    f"Can't create {tmp_dir} directory: {e}",
                    error=True,
                )
                self.host.quit(1)
        try:
            fd, path = tempfile.mkstemp(
                suffix=suff,
                # cat_dir is kept out of the strftime format, so a "%" in it
                # can't be interpreted as a time directive
                prefix=self.cat_dir + time.strftime("_%Y-%m-%d_%H:%M:%S_"),
                dir=tmp_dir,
                text=True,
            )
            return os.fdopen(fd, "w+b"), Path(path)
        except OSError as e:
            self.disp(f"Can't create temporary file: {e}", error=True)
            self.host.quit(1)

    def get_current_file(self, profile):
        """Get most recently edited file

        @param profile(unicode): profile linked to the draft
        @return(Path): full path of current file
        """
        # we guess the item currently edited by choosing
        # the most recent file corresponding to temp file pattern
        # in tmp_dir, excluding metadata files
        tmp_dir = get_tmp_dir(self.sat_conf, self.cat_dir, profile)
        available = [
            p
            for p in tmp_dir.glob(f"{self.cat_dir}_*")
            if not p.match(f"*{METADATA_SUFF}")
        ]
        if not available:
            self.disp(
                f"Could not find any content draft in {tmp_dir}",
                error=True,
            )
            self.host.quit(1)
        return max(available, key=lambda p: p.stat().st_mtime)

    async def get_item_data(self, service, node, item):
        """return formatted content, metadata (or not if use_metadata is false), and item id"""
        raise NotImplementedError

    def get_tmp_suff(self):
        """return suffix used for content file"""
        return "xml"

    async def get_item_path(self):
        """Retrieve item path (i.e. service and node) from item argument

        This method is obviously only useful for edition of PubSub based features

        @return (tuple): service, node, item, content_file_path, content_file_obj
            and, only if use_metadata is set, metadata as last element
        """
        service = self.args.service
        node = self.args.node
        item = self.args.item
        last_item = self.args.last_item

        if self.args.current:
            # user wants to continue current draft
            content_file_path = self.get_current_file(self.profile)
            self.disp("Continuing edition of current draft", 2)
            content_file_obj = content_file_path.open("r+b")
            # we seek at the end of file in case of an item already exist
            # this will write content of the existing item at the end of the draft.
            # This way no data should be lost.
            content_file_obj.seek(0, os.SEEK_END)
        elif self.args.draft_path:
            # there is an existing draft that we use
            content_file_path = self.args.draft_path.expanduser()
            content_file_obj = content_file_path.open("r+b")
            # we seek at the end for the same reason as above
            content_file_obj.seek(0, os.SEEK_END)
        else:
            # we need a temporary file
            content_file_obj, content_file_path = self.get_tmp_file()

        if item or last_item:
            self.disp("Editing requested published item", 2)
            try:
                if self.use_metadata:
                    content, metadata, item = await self.get_item_data(service, node, item)
                else:
                    content, item = await self.get_item_data(service, node, item)
            except Exception as e:
                # FIXME: ugly but we have not good may to check errors in bridge
                if "item-not-found" in str(e):
                    # item doesn't exist, we create a new one with requested id
                    metadata = None
                    if last_item:
                        self.disp(_("no item found at all, we create a new one"), 2)
                    else:
                        self.disp(
                            _(
                                'item "{item}" not found, we create a new item with'
                                "this id"
                            ).format(item=item),
                            2,
                        )
                    content_file_obj.seek(0)
                else:
                    self.disp(f"Error while retrieving item: {e}")
                    self.host.quit(C.EXIT_ERROR)
            else:
                # item exists, we write content
                if content_file_obj.tell() != 0:
                    # we already have a draft,
                    # we copy item content after it and add an indicator
                    # (the file is opened in binary mode, so bytes are needed)
                    content_file_obj.write(b"\n*****\n")
                content_file_obj.write(content.encode("utf-8"))
                content_file_obj.seek(0)
                self.disp(_('item "{item}" found, we edit it').format(item=item), 2)
        else:
            self.disp("Editing a new item", 2)
            if self.use_metadata:
                metadata = None

        if self.use_metadata:
            return service, node, item, content_file_path, content_file_obj, metadata
        else:
            return service, node, item, content_file_path, content_file_obj
479
480
class Table(object):
    """Text table which can be printed through host.disp or kept in a buffer"""

    def __init__(self, host, data, headers=None, filters=None, use_buffer=False):
        """
        @param host: host instance, its ``disp`` method is used when the output
            is not bufferised
        @param data(iterable[list]): table data
            all lines must have the same number of columns
        @param headers(iterable[unicode], None): names/titles of the columns
            if not None, must have same number of columns as data
        @param filters(iterable[(callable, unicode)], None): values filters
            the callable will get 2 arguments:
                - current column value
                - RowData with all columns values (a plain tuple if there is no
                  headers)
            if may also only use 1 argument, which will then be current col value.
            the callable must return a string
            if it's unicode, it will be used with .format and must countain u'{}' which
            will be replaced with the string.
            if not None, must have same number of columns as data
        @param use_buffer(bool): if True, bufferise output instead of printing it directly
        @raise exceptions.DataError: rows (or headers) don't all have the same
            number of columns
        """
        self.host = host
        self._buffer = [] if use_buffer else None
        # headers are columns names/titles, can be None
        self.headers = headers
        # sizes of columns without headers,
        # headers may be larger
        self.sizes = []
        # rows contains one list per row with columns values
        self.rows = []

        size = None
        if headers:
            # we use a namedtuple to make the value easily accessible from filters
            headers_safe = [re.sub(r"[^a-zA-Z_]", "_", h) for h in headers]
            # rename=True: sanitized names may be duplicated or invalid (e.g.
            # "col1" and "col2" both become "col_"), such fields are then
            # renamed positionally instead of raising ValueError
            row_cls = namedtuple("RowData", headers_safe, rename=True)
        else:
            row_cls = None

        for row_data in data:
            new_row = []
            row_data_list = list(row_data)
            for idx, value in enumerate(row_data_list):
                if filters is not None and filters[idx] is not None:
                    filter_ = filters[idx]
                    if isinstance(filter_, str):
                        col_value = filter_.format(value)
                    else:
                        try:
                            if row_cls is None:
                                # no headers: a plain tuple is given
                                # (tuple(*row) would raise a TypeError)
                                row_values = tuple(row_data_list)
                            else:
                                row_values = row_cls(*row_data_list)
                            col_value = filter_(value, row_values)
                        except TypeError:
                            # the filter only accepts the column value
                            col_value = filter_(value)
                    # we count size without ANSI code as they will change length of the
                    # string when it's mostly style/color changes.
                    col_size = len(regex.ansi_remove(col_value))
                else:
                    col_value = str(value)
                    col_size = len(col_value)
                new_row.append(col_value)
                if size is None:
                    # first row: sizes is being filled
                    self.sizes.append(col_size)
                else:
                    self.sizes[idx] = max(self.sizes[idx], col_size)
            if size is None:
                size = len(new_row)
                if headers is not None and len(headers) != size:
                    raise exceptions.DataError("headers size is not coherent with rows")
            else:
                if len(new_row) != size:
                    raise exceptions.DataError("rows size is not coherent")
            self.rows.append(new_row)

        # self.rows is checked instead of data: data may be a generator, which
        # is always truthy
        if not self.rows and headers is not None:
            # the table is empty, we print headers at their length
            self.sizes = [len(h) for h in headers]

    @property
    def string(self):
        """Bufferised output joined with new lines

        @raise exceptions.InternalError: the table was not created with use_buffer
        """
        if self._buffer is None:
            raise exceptions.InternalError("buffer must be used to get a string")
        return "\n".join(self._buffer)

    @staticmethod
    def read_dict_values(data, keys, defaults=None):
        """Yield values of data for each key, in keys order

        @param data(dict): row data
        @param keys(iterable): keys to read
        @param defaults(dict, None): value to use when a key is missing from data
        @raise KeyError: a key is missing and has no default (or a None default)
        """
        if defaults is None:
            defaults = {}
        for key in keys:
            try:
                yield data[key]
            except KeyError as e:
                default = defaults.get(key)
                if default is not None:
                    yield default
                else:
                    raise e

    @classmethod
    def from_list_dict(
        cls, host, data, keys=None, headers=None, filters=None, defaults=None
    ):
        """Create a table from a list of dictionaries

        each dictionary is a row of the table, keys being columns names.
        the whole data will be read and kept into memory, to be printed
        @param data(list[dict[unicode, unicode]]): data to create the table from
        @param keys(iterable[unicode], None): keys to get
            if None, all keys of the first row will be used
            (no column at all if data is empty)
        @param headers(iterable[unicode], None): name of the columns
            names must be in same order as keys
        @param filters(dict[unicode, (callable,unicode)), None): filter to use on values
            keys correspond to keys to filter, and value is the same as for Table.__init__
        @param defaults(dict[unicode, unicode]): default value to use
            if None, an exception will be raised if not value is found
        """
        if keys is None and headers is not None:
            # FIXME: keys are not needed with OrderedDict,
            raise exceptions.DataError("You must specify keys order to used headers")
        if keys is None:
            # with empty data there is no row to take the keys from
            keys = list(data[0].keys()) if data else []
        if headers is None:
            headers = keys
        if filters is None:
            filters = {}
        filters = [filters.get(k) for k in keys]
        return cls(
            host, (cls.read_dict_values(d, keys, defaults) for d in data), headers, filters
        )

    def _headers(self, head_sep, headers, sizes, alignment="left", style=None):
        """Render headers

        @param head_sep(unicode): sequence to use as separator
        @param alignment(unicode): how to align, can be left, center or right
        @param style(unicode, iterable[unicode], None): ANSI escape sequences to apply
        @param headers(list[unicode]): headers to show
        @param sizes(list[int]): sizes of columns
            headers longer than the column size are truncated
        @return (unicode): rendered headers line
        @raise exceptions.InternalError: alignment is not a known value
        """
        rendered_headers = []
        if isinstance(style, str):
            style = [style]
        for idx, header in enumerate(headers):
            size = sizes[idx]
            if alignment == "left":
                rendered = header[:size].ljust(size)
            elif alignment == "center":
                rendered = header[:size].center(size)
            elif alignment == "right":
                rendered = header[:size].rjust(size)
            else:
                raise exceptions.InternalError("bad alignment argument")
            if style:
                # unpacking works with any iterable (style + [rendered] failed
                # with a tuple)
                rendered = A.color(*style, rendered)
            rendered_headers.append(rendered)
        return head_sep.join(rendered_headers)

    def _disp(self, data):
        """output data (can be either bufferised or printed)"""
        if self._buffer is not None:
            self._buffer.append(data)
        else:
            self.host.disp(data)

    def display(
        self,
        head_alignment="left",
        columns_alignment="left",
        head_style=None,
        show_header=True,
        show_borders=True,
        hide_cols=None,
        col_sep=" │ ",
        top_left="┌",
        top="─",
        top_sep="─┬─",
        top_right="┐",
        left="│",
        right=None,
        head_sep=None,
        head_line="┄",
        head_line_left="├",
        head_line_sep="┄┼┄",
        head_line_right="┤",
        bottom_left="└",
        bottom=None,
        bottom_sep="─┴─",
        bottom_right="┘",
    ):
        """Print the table

        @param show_header(bool): True if header need to be shown
        @param show_borders(bool): True if borders need to be shown
        @param hide_cols(None, iterable(unicode)): columns which should not be displayed
            columns are designated by their header
        @param head_alignment(unicode): how to align headers, can be left, center or right
        @param columns_alignment(unicode): how to align columns, can be left, center or
            right
        @param head_style(unicode, iterable[unicode], None): ANSI escape sequences
            to apply to headers
        @param col_sep(unicode): separator betweens columns
        @param head_line(unicode): character to use to make line under head
        @param right(unicode, None): right border, same as left if None
        @param head_sep(unicode, None): headers separator, same as col_sep if None
        @param bottom(unicode, None): bottom border character, same as top if None
        other parameters are the sequences used to draw borders
        @return (Table): self, so string can be used after display
            (table.display().string)
        """
        if not self.sizes:
            # the table is empty, nothing is output
            # self is still returned so table.display().string keeps working
            return self
        col_sep_size = len(regex.ansi_remove(col_sep))

        # if we have columns to hide, we remove them from headers and size
        if not hide_cols:
            headers = self.headers
            sizes = self.sizes
            ignore_idx = set()
        else:
            all_headers = list(self.headers)
            # indexes are computed once and used for headers, sizes and rows
            ignore_idx = {all_headers.index(to_hide) for to_hide in hide_cols}
            headers = [h for idx, h in enumerate(all_headers) if idx not in ignore_idx]
            sizes = [s for idx, s in enumerate(self.sizes) if idx not in ignore_idx]

        if right is None:
            right = left
        if top_sep is None:
            top_sep = col_sep_size * top
        if head_sep is None:
            head_sep = col_sep
        if bottom is None:
            bottom = top
        if bottom_sep is None:
            bottom_sep = col_sep_size * bottom
        if not show_borders:
            left = right = head_line_left = head_line_right = ""
        # top border
        if show_borders:
            self._disp(
                top_left + top_sep.join([top * size for size in sizes]) + top_right
            )

        # headers
        if show_header and self.headers is not None:
            self._disp(
                left
                + self._headers(head_sep, headers, sizes, head_alignment, head_style)
                + right
            )
            # header line
            self._disp(
                head_line_left
                + head_line_sep.join([head_line * size for size in sizes])
                + head_line_right
            )

        # content
        if columns_alignment == "left":
            alignment = lambda idx, s: ansi_ljust(s, sizes[idx])
        elif columns_alignment == "center":
            alignment = lambda idx, s: ansi_center(s, sizes[idx])
        elif columns_alignment == "right":
            alignment = lambda idx, s: ansi_rjust(s, sizes[idx])
        else:
            raise exceptions.InternalError("bad columns alignment argument")

        for row in self.rows:
            if hide_cols:
                row = [v for idx, v in enumerate(row) if idx not in ignore_idx]
            self._disp(
                left
                + col_sep.join([alignment(idx, c) for idx, c in enumerate(row)])
                + right
            )

        if show_borders:
            # bottom border
            self._disp(
                bottom_left
                + bottom_sep.join([bottom * size for size in sizes])
                + bottom_right
            )
        # we return self so string can be used after display (table.display().string)
        return self

    def display_blank(self, **kwargs):
        """Display table without visible borders

        @param kwargs: arguments forwarded to display, they override the
            borderless defaults
        """
        kwargs_ = {"col_sep": " ", "head_line_sep": " ", "show_borders": False}
        kwargs_.update(kwargs)
        return self.display(**kwargs_)
763
764
async def fill_well_known_uri(command, path, key, meta_map=None):
    """Look for URIs in well-known location and fill appropriate args if suitable

    Nothing is done if a service or a node is already set in command args.
    Otherwise the URI is looked for with the bridge ``uri_find`` method, and
    ``args.service`` and ``args.node`` are set from it.

    @param command(CommandBase): command instance
        args of this instance will be updated with found values
    @param path(unicode): absolute path to use as a starting point to look for URIs
    @param key(unicode): key to look for
    @param meta_map(dict, None): if not None, map metadata to arg name
        key is metadata used attribute name
        value is name to actually use, or None to ignore
        use empty dict to only retrieve URI
        possible keys are currently:
            - labels
    """
    args = command.args
    if args.service or args.node:
        # we only look for URIs if a service and a node are not already specified
        return

    host = command.host

    try:
        uris_data = await host.bridge.uri_find(path, [key])
    except Exception as e:
        host.disp(f"can't find {key} URI: {e}", error=True)
        host.quit(C.EXIT_BRIDGE_ERRBACK)

    try:
        uri_data = uris_data[key]
    except KeyError:
        host.disp(
            _(
                "No {key} URI specified for this project, please specify service and "
                "node"
            ).format(key=key),
            error=True,
        )
        host.quit(C.EXIT_NOT_FOUND)

    uri = uri_data["uri"]

    # set extra metadata if they are specified
    for data_key in ["labels"]:
        new_values_json = uri_data.get(data_key)
        # the retrieved value must be tested (and not uri_data, which is never
        # None here): without it json.loads(None) raises a TypeError when the
        # metadata is not specified
        if new_values_json is not None:
            if meta_map is None:
                dest = data_key
            else:
                dest = meta_map.get(data_key)
                if dest is None:
                    continue

            try:
                # NOTE(review): existing values are read from the data_key
                #   attribute but written to dest; confirm this is intended
                #   when meta_map renames the argument
                values = getattr(args, data_key)
            except AttributeError:
                raise exceptions.InternalError(f"there is no {data_key!r} arguments")
            else:
                if values is None:
                    values = []
                values.extend(json.loads(new_values_json))
                setattr(args, dest, values)

    parsed_uri = xmpp_uri.parse_xmpp_uri(uri)
    try:
        args.service = parsed_uri["path"]
        args.node = parsed_uri["node"]
    except KeyError:
        host.disp(_("Invalid URI found: {uri}").format(uri=uri), error=True)
        host.quit(C.EXIT_DATA_ERROR)