Mercurial > libervia-backend
diff libervia/cli/cmd_shell.py @ 4075:47401850dec6
refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 14:54:26 +0200 |
parents | libervia/frontends/jp/cmd_shell.py@26b7ed2817da |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/cli/cmd_shell.py Fri Jun 02 14:54:26 2023 +0200 @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 + + +# Libervia CLI +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import cmd +import sys +import shlex +import subprocess +from . import base +from libervia.backend.core.i18n import _ +from libervia.backend.core import exceptions +from libervia.cli.constants import Const as C +from libervia.cli import arg_tools +from libervia.backend.tools.common.ansi import ANSI as A + +__commands__ = ["Shell"] +INTRO = _( + """Welcome to {app_name} shell, the Salut à Toi shell ! + +This enrironment helps you using several {app_name} commands with similar parameters. + +To quit, just enter "quit" or press C-d. +Enter "help" or "?" to know what to do +""" +).format(app_name=C.APP_NAME) + + +class Shell(base.CommandBase, cmd.Cmd): + def __init__(self, host): + base.CommandBase.__init__( + self, host, "shell", + help=_("launch libervia-cli in shell (REPL) mode") + ) + cmd.Cmd.__init__(self) + + def parse_args(self, args): + """parse line arguments""" + return shlex.split(args, posix=True) + + def update_path(self): + self._cur_parser = self.host.parser + self.help = "" + for idx, path_elt in enumerate(self.path): + try: + self._cur_parser = arg_tools.get_cmd_choices(path_elt, self._cur_parser) + except exceptions.NotFound: + self.disp(_("bad command path"), error=True) + self.path = self.path[:idx] + break + else: + self.help = self._cur_parser + + self.prompt = A.color(C.A_PROMPT_PATH, "/".join(self.path)) + A.color( + C.A_PROMPT_SUF, "> " + ) + try: + self.actions = list(arg_tools.get_cmd_choices(parser=self._cur_parser).keys()) + except exceptions.NotFound: + self.actions = [] + + def add_parser_options(self): + pass + + def format_args(self, args): + """format argument to be printed with quotes if needed""" + for arg in args: + if " " in arg: + yield arg_tools.escape(arg) + else: + yield arg + + def run_cmd(self, args, external=False): + """run command and retur exit code + + @param args[list[string]]: arguments of the command + must not include program name + @param external(bool): True if it's an external command (i.e. not libervia-cli) + @return (int): exit code (0 success, any other int failure) + """ + # FIXME: we have to use subprocess + # and relaunch whole python for now + # because if host.quit() is called in D-Bus callback + # GLib quit the whole app without possibility to stop it + # didn't found a nice way to work around it so far + # Situation should be better when we'll move away from python-dbus + if self.verbose: + self.disp( + _("COMMAND {external}=> {args}").format( + external=_("(external) ") if external else "", + args=" ".join(self.format_args(args)), + ) + ) + if not external: + args = sys.argv[0:1] + args + ret_code = subprocess.call(args) + # XXX: below is a way to launch the command without creating a new process + # may be used when a solution to the aforementioned issue is there + # try: + # self.host._run(args) + # except SystemExit as e: + # ret_code = e.code + # except Exception as e: + # self.disp(A.color(C.A_FAILURE, u'command failed with an exception: {msg}'.format(msg=e)), error=True) + # ret_code = 1 + # else: + # ret_code = 0 + + if ret_code != 0: + self.disp( + A.color( + C.A_FAILURE, + "command failed with an error code of {err_no}".format( + err_no=ret_code + ), + ), + error=True, + ) + return ret_code + + def default(self, args): + """called when no shell command is recognized + + will launch the command with args on the line + (i.e. will launch do [args]) + """ + if args == "EOF": + self.do_quit("") + self.do_do(args) + + def do_help(self, args): + """show help message""" + if not args: + self.disp(A.color(C.A_HEADER, _("Shell commands:")), end=' ') + super(Shell, self).do_help(args) + if not args: + self.disp(A.color(C.A_HEADER, _("Action commands:"))) + help_list = self._cur_parser.format_help().split("\n\n") + print(("\n\n".join(help_list[1 if self.path else 2 :]))) + + # FIXME: debug crashes on exit and is not that useful, + # keeping it until refactoring, may be removed entirely then + # def do_debug(self, args): + # """launch internal debugger""" + # try: + # import ipdb as pdb + # except ImportError: + # import pdb + # pdb.set_trace() + + def do_verbose(self, args): + """show verbose mode, or (de)activate it""" + args = self.parse_args(args) + if args: + self.verbose = C.bool(args[0]) + self.disp( + _("verbose mode is {status}").format( + status=_("ENABLED") if self.verbose else _("DISABLED") + ) + ) + + def do_cmd(self, args): + """change command path""" + if args == "..": + self.path = self.path[:-1] + else: + if not args or args[0] == "/": + self.path = [] + args = "/".join(args.split()) + for path_elt in args.split("/"): + path_elt = path_elt.strip() + if not path_elt: + continue + self.path.append(path_elt) + self.update_path() + + def do_version(self, args): + """show current backend/CLI version""" + self.run_cmd(['--version']) + + def do_shell(self, args): + """launch an external command (you can use ![command] too)""" + args = self.parse_args(args) + self.run_cmd(args, external=True) + + def do_do(self, args): + """lauch a command""" + args = self.parse_args(args) + if ( + self._not_default_profile + and not "-p" in args + and not "--profile" in args + and not "profile" in self.use + ): + # profile is not specified and we are not using the default profile + # so we need to add it in arguments to use current user profile + if self.verbose: + self.disp( + _("arg profile={profile} (logged profile)").format( + profile=self.profile + ) + ) + use = self.use.copy() + use["profile"] = self.profile + else: + use = self.use + + # args may be modified by use_args + # to remove subparsers from it + parser_args, use_args = arg_tools.get_use_args( + self.host, args, use, verbose=self.verbose, parser=self._cur_parser + ) + cmd_args = self.path + parser_args + use_args + self.run_cmd(cmd_args) + + def do_use(self, args): + """fix an argument""" + args = self.parse_args(args) + if not args: + if not self.use: + self.disp(_("no argument in USE")) + else: + self.disp(_("arguments in USE:")) + for arg, value in self.use.items(): + self.disp( + _( + A.color( + C.A_SUBHEADER, + arg, + A.RESET, + " = ", + arg_tools.escape(value), + ) + ) + ) + elif len(args) != 2: + self.disp("bad syntax, please use:\nuse [arg] [value]", error=True) + else: + self.use[args[0]] = " ".join(args[1:]) + if self.verbose: + self.disp( + "set {name} = {value}".format( + name=args[0], value=arg_tools.escape(args[1]) + ) + ) + + def do_use_clear(self, args): + """unset one or many argument(s) in USE, or all of them if no arg is specified""" + args = self.parse_args(args) + if not args: + self.use.clear() + else: + for arg in args: + try: + del self.use[arg] + except KeyError: + self.disp( + A.color( + C.A_FAILURE, _("argument {name} not found").format(name=arg) + ), + error=True, + ) + else: + if self.verbose: + self.disp(_("argument {name} removed").format(name=arg)) + + def do_whoami(self, args): + """print profile currently used""" + self.disp(self.profile) + + def do_quit(self, args): + """quit the shell""" + self.disp(_("good bye!")) + self.host.quit() + + def do_exit(self, args): + """alias for quit""" + self.do_quit(args) + + async def start(self): + # FIXME: "shell" is currently kept synchronous as it works well as it + # and it will be refactored soon. + default_profile = self.host.bridge.profile_name_get(C.PROF_KEY_DEFAULT) + self._not_default_profile = self.profile != default_profile + self.path = [] + self._cur_parser = self.host.parser + self.use = {} + self.verbose = False + self.update_path() + self.cmdloop(INTRO)