Mercurial > libervia-backend
diff libervia/cli/base.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 5115976e1e3d |
children |
line wrap: on
line diff
--- a/libervia/cli/base.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/cli/base.py Wed Jun 19 18:44:57 2024 +0200 @@ -21,8 +21,8 @@ ### logging ### import logging as log -log.basicConfig(level=log.WARNING, - format='[%(name)s] %(message)s') + +log.basicConfig(level=log.WARNING, format="[%(name)s] %(message)s") ### import sys @@ -55,31 +55,36 @@ from rich import console ## bridge handling -# we get bridge name from conf and initialise the right class accordingly +# we get bridge name from conf and initialise the right class accordingly main_config = config.parse_main_conf() -bridge_name = config.config_get(main_config, '', 'bridge', 'dbus') +bridge_name = config.config_get(main_config, "", "bridge", "dbus") LiberviaCLILoop = get_libervia_cli_loop(bridge_name) try: import progressbar except ImportError: - msg = (_('ProgressBar not available, please download it at ' - 'http://pypi.python.org/pypi/progressbar\n' - 'Progress bar deactivated\n--\n')) + msg = _( + "ProgressBar not available, please download it at " + "http://pypi.python.org/pypi/progressbar\n" + "Progress bar deactivated\n--\n" + ) print(msg, file=sys.stderr) - progressbar=None + progressbar = None -#consts -DESCRIPTION = """This software is a command line tool for XMPP. -Get the latest version at """ + C.APP_URL +# consts +DESCRIPTION = ( + """This software is a command line tool for XMPP. +Get the latest version at """ + + C.APP_URL +) COPYLEFT = """Copyright (C) 2009-2024 Jérôme Poisson, Adrien Cossa This program comes with ABSOLUTELY NO WARRANTY; This is free software, and you are welcome to redistribute it under certain conditions. """ -PROGRESS_DELAY = 0.1 # the progression will be checked every PROGRESS_DELAY s +PROGRESS_DELAY = 0.1 # the progression will be checked every PROGRESS_DELAY s def date_decoder(arg): @@ -95,6 +100,7 @@ specify what kind of operation you want to perform. """ + def __init__(self): """ @@ -110,7 +116,7 @@ self.console = console.Console(theme=C.THEME_DEFAULT) self.sat_conf = main_config self.set_color_theme() - bridge_module = dynamic_import.bridge(bridge_name, 'libervia.frontends.bridge') + bridge_module = dynamic_import.bridge(bridge_name, "libervia.frontends.bridge") if bridge_module is None: log.error("Can't import {} bridge".format(bridge_name)) sys.exit(1) @@ -132,15 +138,18 @@ color_fg_bg = os.getenv("COLORFGBG") - if ((sys.stdin.isatty() and sys.stdout.isatty() - and ( - # XTerm - os.getenv("XTERM_VERSION") - # Konsole - or os.getenv("KONSOLE_VERSION") - # All VTE based terminals - or vte_version >= 3502 - ))): + if ( + sys.stdin.isatty() + and sys.stdout.isatty() + and ( + # XTerm + os.getenv("XTERM_VERSION") + # Konsole + or os.getenv("KONSOLE_VERSION") + # All VTE based terminals + or vte_version >= 3502 + ) + ): # ANSI escape sequence stdin_fd = sys.stdin.fileno() old_settings = termios.tcgetattr(stdin_fd) @@ -155,9 +164,9 @@ if ch != c: # background id is not supported, we default to "dark" # TODO: log something? - return 'dark' + return "dark" red, green, blue = [ - int(c, 16)/65535 for c in sys.stdin.read(14).split('/') + int(c, 16) / 65535 for c in sys.stdin.read(14).split("/") ] # '\a' is the last character sys.stdin.read(1) @@ -166,9 +175,9 @@ lum = utils.per_luminance(red, green, blue) if lum <= 0.5: - return 'dark' + return "dark" else: - return 'light' + return "light" elif color_fg_bg: # no luck with ANSI escape sequence, we try COLORFGBG environment variable try: @@ -184,17 +193,18 @@ return "dark" def set_color_theme(self): - background = self.get_config('background', default='auto') - if background == 'auto': + background = self.get_config("background", default="auto") + if background == "auto": background = self.guess_background() - if background not in ('dark', 'light'): - raise exceptions.ConfigError(_( - 'Invalid value set for "background" ({background}), please check ' - 'your settings in libervia.conf').format( - background=repr(background) - )) + if background not in ("dark", "light"): + raise exceptions.ConfigError( + _( + 'Invalid value set for "background" ({background}), please check ' + "your settings in libervia.conf" + ).format(background=repr(background)) + ) self.background = background - if background == 'light': + if background == "light": C.A_HEADER = A.FG_MAGENTA C.A_SUBHEADER = A.BOLD + A.FG_RED C.A_LEVEL_COLORS = (C.A_HEADER, A.BOLD + A.FG_BLUE, A.FG_MAGENTA, A.FG_CYAN) @@ -208,14 +218,16 @@ def _bridge_connected(self): self.parser = argparse.ArgumentParser( - formatter_class=argparse.RawDescriptionHelpFormatter, description=DESCRIPTION) + formatter_class=argparse.RawDescriptionHelpFormatter, description=DESCRIPTION + ) self._make_parents() self.add_parser_options() self.subparsers = self.parser.add_subparsers( - title=_('Available commands'), dest='command', required=True) + title=_("Available commands"), dest="command", required=True + ) # progress attributes - self._progress_id = None # TODO: manage several progress ids + self._progress_id = None # TODO: manage several progress ids self.quit_on_progress_end = True # outputs @@ -233,7 +245,7 @@ async def set_progress_id(self, progress_id): # because we use async, we need an explicit setter self._progress_id = progress_id - await self.replay_cache('progress_ids_cache') + await self.replay_cache("progress_ids_cache") @property def watch_progress(self): @@ -272,7 +284,7 @@ for cache_data in cache: await cache_data[0](*cache_data[1:]) - def disp(self, msg, verbosity=0, error=False, end='\n'): + def disp(self, msg, verbosity=0, error=False, end="\n"): """Print a message to user @param msg(unicode): message to print @@ -290,7 +302,7 @@ if name in extra_outputs: method = extra_outputs[name] else: - method = self._outputs[type_][name]['callback'] + method = self._outputs[type_][name]["callback"] ret = method(data) if inspect.isawaitable(ret): @@ -317,55 +329,83 @@ # we have a special case here as the start-session option is present only if # connection is not needed, so we create two similar parents, one with the # option, the other one without it - for parent_name in ('profile', 'profile_session'): + for parent_name in ("profile", "profile_session"): parent = self.parents[parent_name] = argparse.ArgumentParser(add_help=False) parent.add_argument( - "-p", "--profile", action="store", type=str, default='@DEFAULT@', - help=_("Use PROFILE profile key (default: %(default)s)")) + "-p", + "--profile", + action="store", + type=str, + default="@DEFAULT@", + help=_("Use PROFILE profile key (default: %(default)s)"), + ) parent.add_argument( - "--pwd", action="store", metavar='PASSWORD', - help=_("Password used to connect profile, if necessary")) + "--pwd", + action="store", + metavar="PASSWORD", + help=_("Password used to connect profile, if necessary"), + ) - profile_parent, profile_session_parent = (self.parents['profile'], - self.parents['profile_session']) + profile_parent, profile_session_parent = ( + self.parents["profile"], + self.parents["profile_session"], + ) connect_short, connect_long, connect_action, connect_help = ( - "-c", "--connect", "store_true", - _("Connect the profile before doing anything else") + "-c", + "--connect", + "store_true", + _("Connect the profile before doing anything else"), ) profile_parent.add_argument( - connect_short, connect_long, action=connect_action, help=connect_help) + connect_short, connect_long, action=connect_action, help=connect_help + ) - profile_session_connect_group = profile_session_parent.add_mutually_exclusive_group() - profile_session_connect_group.add_argument( - connect_short, connect_long, action=connect_action, help=connect_help) + profile_session_connect_group = ( + profile_session_parent.add_mutually_exclusive_group() + ) profile_session_connect_group.add_argument( - "--start-session", action="store_true", - help=_("Start a profile session without connecting")) + connect_short, connect_long, action=connect_action, help=connect_help + ) + profile_session_connect_group.add_argument( + "--start-session", + action="store_true", + help=_("Start a profile session without connecting"), + ) - progress_parent = self.parents['progress'] = argparse.ArgumentParser( - add_help=False) + progress_parent = self.parents["progress"] = argparse.ArgumentParser( + add_help=False + ) if progressbar: progress_parent.add_argument( - "-P", "--progress", action="store_true", help=_("Show progress bar")) + "-P", "--progress", action="store_true", help=_("Show progress bar") + ) - verbose_parent = self.parents['verbose'] = argparse.ArgumentParser(add_help=False) + verbose_parent = self.parents["verbose"] = argparse.ArgumentParser(add_help=False) verbose_parent.add_argument( - '--verbose', '-v', action='count', default=0, - help=_("Add a verbosity level (can be used multiple times)")) + "--verbose", + "-v", + action="count", + default=0, + help=_("Add a verbosity level (can be used multiple times)"), + ) - quiet_parent = self.parents['quiet'] = argparse.ArgumentParser(add_help=False) + quiet_parent = self.parents["quiet"] = argparse.ArgumentParser(add_help=False) quiet_parent.add_argument( - '--quiet', '-q', action='store_true', - help=_("be quiet (only output machine readable data)")) + "--quiet", + "-q", + action="store_true", + help=_("be quiet (only output machine readable data)"), + ) - draft_parent = self.parents['draft'] = argparse.ArgumentParser(add_help=False) - draft_group = draft_parent.add_argument_group(_('draft handling')) + draft_parent = self.parents["draft"] = argparse.ArgumentParser(add_help=False) + draft_group = draft_parent.add_argument_group(_("draft handling")) draft_group.add_argument( - "-D", "--current", action="store_true", help=_("load current draft")) + "-D", "--current", action="store_true", help=_("load current draft") + ) draft_group.add_argument( - "-F", "--draft-path", type=Path, help=_("path to a draft file to retrieve")) - + "-F", "--draft-path", type=Path, help=_("path to a draft file to retrieve") + ) def make_pubsub_group(self, flags, defaults): """Generate pubsub options according to flags @@ -378,75 +418,106 @@ """ flags = misc.FlagsHandler(flags) parent = argparse.ArgumentParser(add_help=False) - pubsub_group = parent.add_argument_group('pubsub') - pubsub_group.add_argument("-u", "--pubsub-url", - help=_("Pubsub URL (xmpp or http)")) + pubsub_group = parent.add_argument_group("pubsub") + pubsub_group.add_argument( + "-u", "--pubsub-url", help=_("Pubsub URL (xmpp or http)") + ) service_help = _("JID of the PubSub service") if not flags.service: - default = defaults.pop('service', _('PEP service')) + default = defaults.pop("service", _("PEP service")) if default is not None: service_help += _(" (DEFAULT: {default})".format(default=default)) - pubsub_group.add_argument("-s", "--service", default='', - help=service_help) + pubsub_group.add_argument("-s", "--service", default="", help=service_help) node_help = _("node to request") if not flags.node: - default = defaults.pop('node', _('standard node')) + default = defaults.pop("node", _("standard node")) if default is not None: node_help += _(" (DEFAULT: {default})".format(default=default)) - pubsub_group.add_argument("-n", "--node", default='', help=node_help) + pubsub_group.add_argument("-n", "--node", default="", help=node_help) if flags.single_item: - item_help = ("item to retrieve") + item_help = "item to retrieve" if not flags.item: - default = defaults.pop('item', _('last item')) + default = defaults.pop("item", _("last item")) if default is not None: item_help += _(" (DEFAULT: {default})".format(default=default)) - pubsub_group.add_argument("-i", "--item", default='', - help=item_help) + pubsub_group.add_argument("-i", "--item", default="", help=item_help) pubsub_group.add_argument( - "-L", "--last-item", action='store_true', help=_('retrieve last item')) + "-L", "--last-item", action="store_true", help=_("retrieve last item") + ) elif flags.multi_items: # mutiple items, this activate several features: max-items, RSM, MAM # and Orbder-by pubsub_group.add_argument( - "-i", "--item", action='append', dest='items', default=[], - help=_("items to retrieve (DEFAULT: all)")) + "-i", + "--item", + action="append", + dest="items", + default=[], + help=_("items to retrieve (DEFAULT: all)"), + ) if not flags.no_max: max_group = pubsub_group.add_mutually_exclusive_group() # XXX: defaut value for --max-items or --max is set in parse_pubsub_args max_group.add_argument( - "-M", "--max-items", dest="max", type=int, - help=_("maximum number of items to get ({no_limit} to get all items)" - .format(no_limit=C.NO_LIMIT))) + "-M", + "--max-items", + dest="max", + type=int, + help=_( + "maximum number of items to get ({no_limit} to get all items)".format( + no_limit=C.NO_LIMIT + ) + ), + ) # FIXME: it could be possible to no duplicate max (between pubsub # max-items and RSM max)should not be duplicated, RSM could be # used when available and pubsub max otherwise max_group.add_argument( - "-m", "--max", dest="rsm_max", type=int, - help=_("maximum number of items to get per page (DEFAULT: 10)")) + "-m", + "--max", + dest="rsm_max", + type=int, + help=_("maximum number of items to get per page (DEFAULT: 10)"), + ) # RSM rsm_page_group = pubsub_group.add_mutually_exclusive_group() rsm_page_group.add_argument( - "-a", "--after", dest="rsm_after", - help=_("find page after this item"), metavar='ITEM_ID') + "-a", + "--after", + dest="rsm_after", + help=_("find page after this item"), + metavar="ITEM_ID", + ) rsm_page_group.add_argument( - "-b", "--before", dest="rsm_before", - help=_("find page before this item"), metavar='ITEM_ID') + "-b", + "--before", + dest="rsm_before", + help=_("find page before this item"), + metavar="ITEM_ID", + ) rsm_page_group.add_argument( - "--index", dest="rsm_index", type=int, - help=_("index of the first item to retrieve")) - + "--index", + dest="rsm_index", + type=int, + help=_("index of the first item to retrieve"), + ) # MAM pubsub_group.add_argument( - "-f", "--filter", dest='mam_filters', nargs=2, - action='append', default=[], help=_("MAM filters to use"), - metavar=("FILTER_NAME", "VALUE") + "-f", + "--filter", + dest="mam_filters", + nargs=2, + action="append", + default=[], + help=_("MAM filters to use"), + metavar=("FILTER_NAME", "VALUE"), ) # Order-By @@ -456,57 +527,62 @@ # current specifications, as only "creation" and "modification" are # available) pubsub_group.add_argument( - "-o", "--order-by", choices=[C.ORDER_BY_CREATION, - C.ORDER_BY_MODIFICATION], - help=_("how items should be ordered")) + "-o", + "--order-by", + choices=[C.ORDER_BY_CREATION, C.ORDER_BY_MODIFICATION], + help=_("how items should be ordered"), + ) if flags[C.CACHE]: pubsub_group.add_argument( - "-C", "--no-cache", dest="use_cache", action='store_false', - help=_("don't use Pubsub cache") + "-C", + "--no-cache", + dest="use_cache", + action="store_false", + help=_("don't use Pubsub cache"), ) if not flags.all_used: - raise exceptions.InternalError('unknown flags: {flags}'.format( - flags=', '.join(flags.unused))) + raise exceptions.InternalError( + "unknown flags: {flags}".format(flags=", ".join(flags.unused)) + ) if defaults: - raise exceptions.InternalError(f'unused defaults: {defaults}') + raise exceptions.InternalError(f"unused defaults: {defaults}") return parent def add_parser_options(self): self.parser.add_argument( - '--version', - action='version', - version=("{name} {version} {copyleft}".format( - name = C.APP_NAME, - version = self.version, - copyleft = COPYLEFT)) + "--version", + action="version", + version=( + "{name} {version} {copyleft}".format( + name=C.APP_NAME, version=self.version, copyleft=COPYLEFT + ) + ), ) def register_output(self, type_, name, callback, description="", default=False): if type_ not in C.OUTPUT_TYPES: log.error("Invalid output type {}".format(type_)) return - self._outputs[type_][name] = {'callback': callback, - 'description': description - } + self._outputs[type_][name] = {"callback": callback, "description": description} if default: if type_ in self.default_output: self.disp( - _('there is already a default output for {type}, ignoring new one') - .format(type=type_) + _( + "there is already a default output for {type}, ignoring new one" + ).format(type=type_) ) else: self.default_output[type_] = name - def parse_output_options(self): options = self.command.args.output_opts options_dict = {} for option in options: try: - key, value = option.split('=', 1) + key, value = option.split("=", 1) except ValueError: key, value = option, None options_dict[key.strip()] = value.strip() if value is not None else None @@ -516,8 +592,10 @@ if not accepted_set.issuperset(options): self.disp( _("The following output options are invalid: {invalid_options}").format( - invalid_options = ', '.join(set(options).difference(accepted_set))), - error=True) + invalid_options=", ".join(set(options).difference(accepted_set)) + ), + error=True, + ) self.quit(C.EXIT_BAD_ARG) def import_plugins(self): @@ -527,11 +605,14 @@ """ path = os.path.dirname(libervia.cli.__file__) # XXX: outputs must be imported before commands as they are used for arguments - for type_, pattern in ((C.PLUGIN_OUTPUT, 'output_*.py'), - (C.PLUGIN_CMD, 'cmd_*.py')): + for type_, pattern in ( + (C.PLUGIN_OUTPUT, "output_*.py"), + (C.PLUGIN_CMD, "cmd_*.py"), + ): modules = ( os.path.splitext(module)[0] - for module in map(os.path.basename, iglob(os.path.join(path, pattern)))) + for module in map(os.path.basename, iglob(os.path.join(path, pattern))) + ) for module_name in modules: module_path = "libervia.cli." + module_name try: @@ -540,15 +621,21 @@ except exceptions.CancelError: continue except exceptions.MissingModule as e: - self.disp(_("Missing module for plugin {name}: {missing}".format( - name = module_path, - missing = e)), error=True) + self.disp( + _( + "Missing module for plugin {name}: {missing}".format( + name=module_path, missing=e + ) + ), + error=True, + ) except Exception as e: self.disp( - _("Can't import {module_path} plugin, ignoring it: {e}") - .format(module_path=module_path, e=e), - error=True) - + _("Can't import {module_path} plugin, ignoring it: {e}").format( + module_path=module_path, e=e + ), + error=True, + ) def import_plugin_module(self, module, type_): """add commands or outpus from a module to CLI frontend @@ -557,12 +644,14 @@ @param type_(str): one of C_PLUGIN_* """ try: - class_names = getattr(module, '__{}__'.format(type_)) + class_names = getattr(module, "__{}__".format(type_)) except AttributeError: log.disp( - _("Invalid plugin module [{type}] {module}") - .format(type=type_, module=module), - error=True) + _("Invalid plugin module [{type}] {module}").format( + type=type_, module=module + ), + error=True, + ) raise ImportError else: for class_name in class_names: @@ -571,12 +660,12 @@ def get_xmpp_uri_from_http(self, http_url): """parse HTML page at http(s) URL, and looks for xmpp: uri""" - if http_url.startswith('https'): - scheme = 'https' - elif http_url.startswith('http'): - scheme = 'http' + if http_url.startswith("https"): + scheme = "https" + elif http_url.startswith("http"): + scheme = "http" else: - raise exceptions.InternalError('An HTTP scheme is expected in this method') + raise exceptions.InternalError("An HTTP scheme is expected in this method") self.disp(f"{scheme.upper()} URL found, trying to find associated xmpp: URI", 1) # HTTP URL, we try to find xmpp: links try: @@ -584,10 +673,12 @@ except ImportError: self.disp( "lxml module must be installed to use http(s) scheme, please install it " - "with \"pip install lxml\"", - error=True) + 'with "pip install lxml"', + error=True, + ) self.quit(1) import urllib.request, urllib.error, urllib.parse + parser = etree.HTMLParser() try: root = etree.parse(urllib.request.urlopen(http_url), parser) @@ -598,33 +689,36 @@ links = root.xpath("//link[@rel='alternate' and starts-with(@href, 'xmpp:')]") if not links: self.disp( - _('Could not find alternate "xmpp:" URI, can\'t find associated XMPP ' - 'PubSub node/item'), - error=True) + _( + 'Could not find alternate "xmpp:" URI, can\'t find associated XMPP ' + "PubSub node/item" + ), + error=True, + ) self.quit(1) - xmpp_uri = links[0].get('href') + xmpp_uri = links[0].get("href") return xmpp_uri def parse_pubsub_args(self): if self.args.pubsub_url is not None: url = self.args.pubsub_url - if url.startswith('http'): - # http(s) URL, we try to retrieve xmpp one from there + if url.startswith("http"): + # http(s) URL, we try to retrieve xmpp one from there url = self.get_xmpp_uri_from_http(url) try: uri_data = uri.parse_xmpp_uri(url) except ValueError: - self.parser.error(_('invalid XMPP URL: {url}').format(url=url)) + self.parser.error(_("invalid XMPP URL: {url}").format(url=url)) else: - if uri_data['type'] == 'pubsub': + if uri_data["type"] == "pubsub": # URL is alright, we only set data not already set by other options if not self.args.service: - self.args.service = uri_data['path'] + self.args.service = uri_data["path"] if not self.args.node: - self.args.node = uri_data['node'] - uri_item = uri_data.get('item') + self.args.node = uri_data["node"] + uri_item = uri_data.get("item") if uri_item: # there is an item in URI # we use it only if item is not already set @@ -636,9 +730,12 @@ items = self.args.items except AttributeError: self.disp( - _("item specified in URL but not needed in command, " - "ignoring it"), - error=True) + _( + "item specified in URL but not needed in command, " + "ignoring it" + ), + error=True, + ) else: if not items: self.args.items = [uri_item] @@ -652,7 +749,7 @@ self.args.item = uri_item else: self.parser.error( - _('XMPP URL is not a pubsub one: {url}').format(url=url) + _("XMPP URL is not a pubsub one: {url}").format(url=url) ) flags = self.args._cmd._pubsub_flags # we check required arguments here instead of using add_arguments' required option @@ -669,7 +766,8 @@ try: if self.args.item and self.args.item_last: self.parser.error( - _("--item and --item-last can't be used at the same time")) + _("--item and --item-last can't be used at the same time") + ) except AttributeError: pass @@ -683,8 +781,13 @@ # to use pubsub's max or RSM's max. The later is used if any RSM or MAM # argument is set if max_items is None and rsm_max is None: - to_check = ('mam_filters', 'rsm_max', 'rsm_after', 'rsm_before', - 'rsm_index') + to_check = ( + "mam_filters", + "rsm_max", + "rsm_after", + "rsm_before", + "rsm_index", + ) if any((getattr(self.args, name) for name in to_check)): # we use RSM self.args.rsm_max = 10 @@ -700,17 +803,17 @@ except Exception as e: if isinstance(e, exceptions.BridgeExceptionNoService): print( - _("Can't connect to Libervia backend, are you sure that it's " - "launched ?") + _( + "Can't connect to Libervia backend, are you sure that it's " + "launched ?" + ) ) self.quit(C.EXIT_BACKEND_NOT_FOUND, raise_exc=False) elif isinstance(e, exceptions.BridgeInitError): print(_("Can't init bridge")) self.quit(C.EXIT_BRIDGE_ERROR, raise_exc=False) else: - print( - _("Error while initialising bridge: {e}").format(e=e) - ) + print(_("Error while initialising bridge: {e}").format(e=e)) self.quit(C.EXIT_BRIDGE_ERROR, raise_exc=False) return # we wait on init_pre_script instead of ready_get, so the CLI frontend can be used @@ -743,7 +846,6 @@ def run(cls): cls()._run() - def _read_stdin(self, stdin_fut): """Callback called by ainput to read stdin""" line = sys.stdin.readline() @@ -752,9 +854,9 @@ else: stdin_fut.set_exception(EOFError()) - async def ainput(self, msg=''): + async def ainput(self, msg=""): """Asynchronous version of buildin "input" function""" - self.disp(msg, end=' ') + self.disp(msg, end=" ") sys.stdout.flush() loop = asyncio.get_running_loop() stdin_fut = loop.create_future() @@ -766,7 +868,9 @@ res = await self.ainput(f"{message} (y/N)? ") return res in ("y", "Y") - async def confirm_or_quit(self, message, cancel_message=_("action cancelled by user")): + async def confirm_or_quit( + self, message, cancel_message=_("action cancelled by user") + ): """Request user to confirm action, and quit if he doesn't""" confirmed = await self.confirm(message) if not confirmed: @@ -804,7 +908,7 @@ if raise_exc: raise QuitException - async def a_quit(self, exit_code: int=0, raise_exc=True): + async def a_quit(self, exit_code: int = 0, raise_exc=True): """Execute async quit callback before actually quitting This method should be prefered to ``quit``, as it executes async quit callbacks @@ -864,10 +968,10 @@ def check(jid): if not jid.is_valid: - log.error (_("%s is not a valid JID !"), jid) + log.error(_("%s is not a valid JID !"), jid) self.quit(1) - dest_jids=[] + dest_jids = [] try: for i in range(len(jids)): dest_jids.append(expand_jid(jids[i])) @@ -877,7 +981,7 @@ return dest_jids - async def a_pwd_input(self, msg=''): + async def a_pwd_input(self, msg=""): """Like ainput but with echo disabled (useful for passwords)""" # we disable echo, code adapted from getpass standard module which has been # written by Piers Lauder (original), Guido van Rossum (Windows support and @@ -888,7 +992,7 @@ new = old[:] new[3] &= ~termios.ECHO tcsetattr_flags = termios.TCSAFLUSH - if hasattr(termios, 'TCSASOFT'): + if hasattr(termios, "TCSASOFT"): tcsetattr_flags |= termios.TCSASOFT try: termios.tcsetattr(stdin_fd, tcsetattr_flags, new) @@ -896,7 +1000,7 @@ finally: termios.tcsetattr(stdin_fd, tcsetattr_flags, old) sys.stderr.flush() - self.disp('') + self.disp("") return pwd async def connect_or_prompt(self, method, err_msg=None): @@ -910,15 +1014,16 @@ password = self.args.pwd while True: try: - await method(password or '') + await method(password or "") except Exception as e: - if ((isinstance(e, BridgeException) - and e.classname == 'PasswordError' - and self.args.pwd is None)): + if ( + isinstance(e, BridgeException) + and e.classname == "PasswordError" + and self.args.pwd is None + ): if password is not None: self.disp(A.color(C.A_WARNING, _("invalid password"))) - password = await self.a_pwd_input( - _("please enter profile password:")) + password = await self.a_pwd_input(_("please enter profile password:")) else: self.disp(err_msg.format(profile=self.profile, e=e), error=True) self.quit(C.EXIT_ERROR) @@ -938,8 +1043,9 @@ if not self.profile: log.error( - _("The profile [{profile}] doesn't exist") - .format(profile=self.args.profile) + _("The profile [{profile}] doesn't exist").format( + profile=self.args.profile + ) ) self.quit(C.EXIT_ERROR) @@ -951,38 +1057,40 @@ if start_session: await self.connect_or_prompt( lambda pwd: self.bridge.profile_start_session(pwd, self.profile), - err_msg="Can't start {profile}'s session: {e}" + err_msg="Can't start {profile}'s session: {e}", ) return elif not await self.bridge.profile_is_session_started(self.profile): if not self.args.connect: - self.disp(_( - "Session for [{profile}] is not started, please start it " - "before using libervia-cli, or use either --start-session or " - "--connect option" - .format(profile=self.profile) - ), error=True) + self.disp( + _( + "Session for [{profile}] is not started, please start it " + "before using libervia-cli, or use either --start-session or " + "--connect option".format(profile=self.profile) + ), + error=True, + ) self.quit(1) elif not getattr(self.args, "connect", False): return - - if not hasattr(self.args, 'connect'): + if not hasattr(self.args, "connect"): # a profile can be present without connect option (e.g. on profile # creation/deletion) return elif self.args.connect is True: # if connection is asked, we connect the profile await self.connect_or_prompt( lambda pwd: self.bridge.connect(self.profile, pwd, {}), - err_msg = 'Can\'t connect profile "{profile!s}": {e}' + err_msg='Can\'t connect profile "{profile!s}": {e}', ) return else: if not await self.bridge.is_connected(self.profile): log.error( - _("Profile [{profile}] is not connected, please connect it " - "before using libervia-cli, or use --connect option") - .format(profile=self.profile) + _( + "Profile [{profile}] is not connected, please connect it " + "before using libervia-cli, or use --connect option" + ).format(profile=self.profile) ) self.quit(1) @@ -992,7 +1100,7 @@ # as backend now handle jingles message initiation _jid = JID(param_jid) if not _jid.resource: - #if the resource is not given, we try to add the main resource + # if the resource is not given, we try to add the main resource main_resource = await self.bridge.main_resource_get(param_jid, self.profile) if main_resource: return f"{_jid.bare}/{main_resource}" @@ -1017,7 +1125,7 @@ extra_outputs: Optional[dict] = None, need_connect: Optional[bool] = None, help: Optional[str] = None, - **kwargs + **kwargs, ): """Initialise CommandBase @@ -1051,20 +1159,21 @@ C.ITEM: item is required C.SINGLE_ITEM: only one item is allowed """ - try: # If we have subcommands, host is a CommandBase and we need to use host.host + try: # If we have subcommands, host is a CommandBase and we need to use host.host self.host = host.host except AttributeError: self.host = host # --profile option - parents = kwargs.setdefault('parents', set()) + parents = kwargs.setdefault("parents", set()) if use_profile: # self.host.parents['profile'] is an ArgumentParser with profile connection # arguments if need_connect is None: need_connect = True parents.add( - self.host.parents['profile' if need_connect else 'profile_session']) + self.host.parents["profile" if need_connect else "profile_session"] + ) else: assert need_connect is None self.need_connect = need_connect @@ -1085,37 +1194,47 @@ choices.update(extra_outputs) if not choices: raise exceptions.InternalError( - "No choice found for {} output type".format(use_output)) + "No choice found for {} output type".format(use_output) + ) try: default = self.host.default_output[use_output] except KeyError: - if 'default' in choices: - default = 'default' - elif 'simple' in choices: - default = 'simple' + if "default" in choices: + default = "default" + elif "simple" in choices: + default = "simple" else: default = list(choices)[0] output_parent.add_argument( - '--output', '-O', choices=sorted(choices), default=default, - help=_("select output format (default: {})".format(default))) + "--output", + "-O", + choices=sorted(choices), + default=default, + help=_("select output format (default: {})".format(default)), + ) output_parent.add_argument( - '--output-option', '--oo', action="append", dest='output_opts', - default=[], help=_("output specific option")) + "--output-option", + "--oo", + action="append", + dest="output_opts", + default=[], + help=_("output specific option"), + ) parents.add(output_parent) else: assert extra_outputs is None - self._use_pubsub = kwargs.pop('use_pubsub', False) + self._use_pubsub = kwargs.pop("use_pubsub", False) if self._use_pubsub: - flags = kwargs.pop('pubsub_flags', []) - defaults = kwargs.pop('pubsub_defaults', {}) + flags = kwargs.pop("pubsub_flags", []) + defaults = kwargs.pop("pubsub_defaults", {}) parents.add(self.host.make_pubsub_group(flags, defaults)) self._pubsub_flags = flags # other common options - use_opts = {k:v for k,v in kwargs.items() if k.startswith('use_')} + use_opts = {k: v for k, v in kwargs.items() if k.startswith("use_")} for param, do_use in use_opts.items(): - opt=param[4:] # if param is use_verbose, opt is verbose + opt = param[4:] # if param is use_verbose, opt is verbose if opt not in self.host.parents: raise exceptions.InternalError("Unknown parent option {}".format(opt)) del kwargs[param] @@ -1124,7 +1243,7 @@ self.parser = host.subparsers.add_parser(name, help=help, **kwargs) if hasattr(self, "subcommands"): - self.subparsers = self.parser.add_subparsers(dest='subcommand', required=True) + self.subparsers = self.parser.add_subparsers(dest="subcommand", required=True) else: self.parser.set_defaults(_cmd=self) self.add_parser_options() @@ -1198,7 +1317,7 @@ return if uid == self.progress_id: if self.args.progress: - self.disp('') # progress is not finished, so we skip a line + self.disp("") # progress is not finished, so we skip a line if self.host.quit_on_progress_end: await self.on_progress_error(message) self.host.quit_from_signal(C.EXIT_ERROR) @@ -1211,13 +1330,16 @@ data = await self.host.bridge.progress_get(self.progress_id, self.profile) if data: try: - size = data['size'] + size = data["size"] except KeyError: - self.disp(_("file size is not known, we can't show a progress bar"), 1, - error=True) + self.disp( + _("file size is not known, we can't show a progress bar"), + 1, + error=True, + ) return False if self.host.pbar is None: - #first answer, we must construct the bar + # first answer, we must construct the bar # if the instance has a pbar_template attribute, it is used has model, # else default one is used @@ -1228,8 +1350,14 @@ template = self.pbar_template except AttributeError: template = [ - _("Progress: "), ["Percentage"], " ", ["Bar"], " ", - ["FileTransferSpeed"], " ", ["ETA"] + _("Progress: "), + ["Percentage"], + " ", + ["Bar"], + " ", + ["FileTransferSpeed"], + " ", + ["ETA"], ] widgets = [] @@ -1240,10 +1368,12 @@ widget = getattr(progressbar, part.pop(0)) widgets.append(widget(*part)) - self.host.pbar = progressbar.ProgressBar(max_value=int(size), widgets=widgets) + self.host.pbar = progressbar.ProgressBar( + max_value=int(size), widgets=widgets + ) self.host.pbar.start() - self.host.pbar.update(int(data['position'])) + self.host.pbar.update(int(data["position"])) elif self.host.pbar is not None: return False @@ -1283,7 +1413,7 @@ """ self.disp(_("Error while doing operation: {e}").format(e=e), error=True) - def disp(self, msg, verbosity=0, error=False, end='\n'): + def disp(self, msg, verbosity=0, error=False, end="\n"): return self.host.disp(msg, verbosity, error, end) def output(self, data): @@ -1291,7 +1421,8 @@ output_type = self._output_type except AttributeError: raise exceptions.InternalError( - _('trying to use output when use_output has not been set')) + _("trying to use output when use_output has not been set") + ) return self.host.output(output_type, self.args.output, self.extra_outputs, data) def get_pubsub_extra(self, extra: Optional[dict] = None) -> str: @@ -1307,27 +1438,30 @@ if intersection: raise exceptions.ConflictError( "given extra dict has conflicting keys with pubsub keys " - "{intersection}".format(intersection=intersection)) + "{intersection}".format(intersection=intersection) + ) # RSM - for attribute in ('max', 'after', 'before', 'index'): - key = 'rsm_' + attribute + for attribute in ("max", "after", "before", "index"): + key = "rsm_" + attribute if key in extra: raise exceptions.ConflictError( - "This key already exists in extra: u{key}".format(key=key)) + "This key already exists in extra: u{key}".format(key=key) + ) value = getattr(self.args, key, None) if value is not None: extra[key] = str(value) # MAM - if hasattr(self.args, 'mam_filters'): + if hasattr(self.args, "mam_filters"): for key, value in self.args.mam_filters: - key = 'filter_' + key + key = "filter_" + key if key in extra: raise exceptions.ConflictError( - "This key already exists in extra: u{key}".format(key=key)) + "This key already exists in extra: u{key}".format(key=key) + ) extra[key] = value # Order-By @@ -1390,11 +1524,14 @@ # we need to register the following signal even if we don't display the # progress bar self.host.bridge.register_signal( - "progress_started", self.progress_started_handler) + "progress_started", self.progress_started_handler + ) self.host.bridge.register_signal( - "progress_finished", self.progress_finished_handler) + "progress_finished", self.progress_finished_handler + ) self.host.bridge.register_signal( - "progress_error", self.progress_error_handler) + "progress_error", self.progress_error_handler + ) if self.need_connect is not None: await self.host.connect_profile() @@ -1413,26 +1550,23 @@ to manage action_types answer, """ - action_callbacks = {} # XXX: set managed action types in a dict here: - # key is the action_type, value is the callable - # which will manage the answer. profile filtering is - # already managed when callback is called + + action_callbacks = {} # XXX: set managed action types in a dict here: + # key is the action_type, value is the callable + # which will manage the answer. profile filtering is + # already managed when callback is called def __init__(self, *args, **kwargs): super(CommandAnswering, self).__init__(*args, **kwargs) async def on_action_new( - self, - action_data_s: str, - action_id: str, - security_limit: int, - profile: str + self, action_data_s: str, action_id: str, security_limit: int, profile: str ) -> None: if profile != self.profile: return action_data = data_format.deserialise(action_data_s) try: - action_type = action_data['type'] + action_type = action_data["type"] except KeyError: try: xml_ui = action_data["xmlui"] @@ -1456,7 +1590,7 @@ # FIXME: we temporarily use ElementTree, but a real XMLUI managing module # should be available in the future # TODO: XMLUI module - ui = ET.fromstring(xml_ui.encode('utf-8')) + ui = ET.fromstring(xml_ui.encode("utf-8")) dialog = ui.find("dialog") if dialog is not None: self.disp(dialog.findtext("message"), error=dialog.get("level") == "error") @@ -1466,4 +1600,6 @@ self.host.bridge.register_signal("action_new", self.on_action_new) actions = await self.host.bridge.actions_get(self.profile) for action_data_s, action_id, security_limit in actions: - await self.on_action_new(action_data_s, action_id, security_limit, self.profile) + await self.on_action_new( + action_data_s, action_id, security_limit, self.profile + )