diff libervia/cli/cmd_shell.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_shell.py@26b7ed2817da
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/cli/cmd_shell.py	Fri Jun 02 14:54:26 2023 +0200
@@ -0,0 +1,305 @@
+#!/usr/bin/env python3
+# Libervia CLI
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+import cmd
+import sys
+import shlex
+import subprocess
+from . import base
+from libervia.backend.core.i18n import _
+from libervia.backend.core import exceptions
+from libervia.cli.constants import Const as C
+from libervia.cli import arg_tools
+from libervia.backend.tools.common.ansi import ANSI as A
+__commands__ = ["Shell"]
+INTRO = _(
+    """Welcome to {app_name} shell, the Salut à Toi shell !
+This enrironment helps you using several {app_name} commands with similar parameters.
+To quit, just enter "quit" or press C-d.
+Enter "help" or "?" to know what to do
+class Shell(base.CommandBase, cmd.Cmd):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self, host, "shell",
+            help=_("launch libervia-cli in shell (REPL) mode")
+        )
+        cmd.Cmd.__init__(self)
+    def parse_args(self, args):
+        """parse line arguments"""
+        return shlex.split(args, posix=True)
+    def update_path(self):
+        self._cur_parser = self.host.parser
+        self.help = ""
+        for idx, path_elt in enumerate(self.path):
+            try:
+                self._cur_parser = arg_tools.get_cmd_choices(path_elt, self._cur_parser)
+            except exceptions.NotFound:
+                self.disp(_("bad command path"), error=True)
+                self.path = self.path[:idx]
+                break
+            else:
+                self.help = self._cur_parser
+        self.prompt = A.color(C.A_PROMPT_PATH, "/".join(self.path)) + A.color(
+            C.A_PROMPT_SUF, "> "
+        )
+        try:
+            self.actions = list(arg_tools.get_cmd_choices(parser=self._cur_parser).keys())
+        except exceptions.NotFound:
+            self.actions = []
+    def add_parser_options(self):
+        pass
+    def format_args(self, args):
+        """format argument to be printed with quotes if needed"""
+        for arg in args:
+            if " " in arg:
+                yield arg_tools.escape(arg)
+            else:
+                yield arg
+    def run_cmd(self, args, external=False):
+        """run command and retur exit code
+        @param args[list[string]]: arguments of the command
+            must not include program name
+        @param external(bool): True if it's an external command (i.e. not libervia-cli)
+        @return (int): exit code (0 success, any other int failure)
+        """
+        # FIXME: we have to use subprocess
+        # and relaunch whole python for now
+        # because if host.quit() is called in D-Bus callback
+        # GLib quit the whole app without possibility to stop it
+        # didn't found a nice way to work around it so far
+        # Situation should be better when we'll move away from python-dbus
+        if self.verbose:
+            self.disp(
+                _("COMMAND {external}=> {args}").format(
+                    external=_("(external) ") if external else "",
+                    args=" ".join(self.format_args(args)),
+                )
+            )
+        if not external:
+            args = sys.argv[0:1] + args
+        ret_code = subprocess.call(args)
+        # XXX: below is a way to launch the command without creating a new process
+        #      may be used when a solution to the aforementioned issue is there
+        # try:
+        #     self.host._run(args)
+        # except SystemExit as e:
+        #     ret_code = e.code
+        # except Exception as e:
+        #     self.disp(A.color(C.A_FAILURE, u'command failed with an exception: {msg}'.format(msg=e)), error=True)
+        #     ret_code = 1
+        # else:
+        #     ret_code = 0
+        if ret_code != 0:
+            self.disp(
+                A.color(
+                    C.A_FAILURE,
+                    "command failed with an error code of {err_no}".format(
+                        err_no=ret_code
+                    ),
+                ),
+                error=True,
+            )
+        return ret_code
+    def default(self, args):
+        """called when no shell command is recognized
+        will launch the command with args on the line
+        (i.e. will launch do [args])
+        """
+        if args == "EOF":
+            self.do_quit("")
+        self.do_do(args)
+    def do_help(self, args):
+        """show help message"""
+        if not args:
+            self.disp(A.color(C.A_HEADER, _("Shell commands:")), end=' ')
+        super(Shell, self).do_help(args)
+        if not args:
+            self.disp(A.color(C.A_HEADER, _("Action commands:")))
+            help_list = self._cur_parser.format_help().split("\n\n")
+            print(("\n\n".join(help_list[1 if self.path else 2 :])))
+    # FIXME: debug crashes on exit and is not that useful,
+    #        keeping it until refactoring, may be removed entirely then
+    # def do_debug(self, args):
+    #     """launch internal debugger"""
+    #     try:
+    #         import ipdb as pdb
+    #     except ImportError:
+    #         import pdb
+    #     pdb.set_trace()
+    def do_verbose(self, args):
+        """show verbose mode, or (de)activate it"""
+        args = self.parse_args(args)
+        if args:
+            self.verbose = C.bool(args[0])
+        self.disp(
+            _("verbose mode is {status}").format(
+                status=_("ENABLED") if self.verbose else _("DISABLED")
+            )
+        )
+    def do_cmd(self, args):
+        """change command path"""
+        if args == "..":
+            self.path = self.path[:-1]
+        else:
+            if not args or args[0] == "/":
+                self.path = []
+            args = "/".join(args.split())
+            for path_elt in args.split("/"):
+                path_elt = path_elt.strip()
+                if not path_elt:
+                    continue
+                self.path.append(path_elt)
+        self.update_path()
+    def do_version(self, args):
+        """show current backend/CLI version"""
+        self.run_cmd(['--version'])
+    def do_shell(self, args):
+        """launch an external command (you can use ![command] too)"""
+        args = self.parse_args(args)
+        self.run_cmd(args, external=True)
+    def do_do(self, args):
+        """lauch a command"""
+        args = self.parse_args(args)
+        if (
+            self._not_default_profile
+            and not "-p" in args
+            and not "--profile" in args
+            and not "profile" in self.use
+        ):
+            # profile is not specified and we are not using the default profile
+            # so we need to add it in arguments to use current user profile
+            if self.verbose:
+                self.disp(
+                    _("arg profile={profile} (logged profile)").format(
+                        profile=self.profile
+                    )
+                )
+            use = self.use.copy()
+            use["profile"] = self.profile
+        else:
+            use = self.use
+        # args may be modified by use_args
+        # to remove subparsers from it
+        parser_args, use_args = arg_tools.get_use_args(
+            self.host, args, use, verbose=self.verbose, parser=self._cur_parser
+        )
+        cmd_args = self.path + parser_args + use_args
+        self.run_cmd(cmd_args)
+    def do_use(self, args):
+        """fix an argument"""
+        args = self.parse_args(args)
+        if not args:
+            if not self.use:
+                self.disp(_("no argument in USE"))
+            else:
+                self.disp(_("arguments in USE:"))
+                for arg, value in self.use.items():
+                    self.disp(
+                        _(
+                            A.color(
+                                C.A_SUBHEADER,
+                                arg,
+                                A.RESET,
+                                " = ",
+                                arg_tools.escape(value),
+                            )
+                        )
+                    )
+        elif len(args) != 2:
+            self.disp("bad syntax, please use:\nuse [arg] [value]", error=True)
+        else:
+            self.use[args[0]] = " ".join(args[1:])
+            if self.verbose:
+                self.disp(
+                    "set {name} = {value}".format(
+                        name=args[0], value=arg_tools.escape(args[1])
+                    )
+                )
+    def do_use_clear(self, args):
+        """unset one or many argument(s) in USE, or all of them if no arg is specified"""
+        args = self.parse_args(args)
+        if not args:
+            self.use.clear()
+        else:
+            for arg in args:
+                try:
+                    del self.use[arg]
+                except KeyError:
+                    self.disp(
+                        A.color(
+                            C.A_FAILURE, _("argument {name} not found").format(name=arg)
+                        ),
+                        error=True,
+                    )
+                else:
+                    if self.verbose:
+                        self.disp(_("argument {name} removed").format(name=arg))
+    def do_whoami(self, args):
+        """print profile currently used"""
+        self.disp(self.profile)
+    def do_quit(self, args):
+        """quit the shell"""
+        self.disp(_("good bye!"))
+        self.host.quit()
+    def do_exit(self, args):
+        """alias for quit"""
+        self.do_quit(args)
+    async def start(self):
+        # FIXME: "shell" is currently kept synchronous as it works well as it
+        #        and it will be refactored soon.
+        default_profile = self.host.bridge.profile_name_get(C.PROF_KEY_DEFAULT)
+        self._not_default_profile = self.profile != default_profile
+        self.path = []
+        self._cur_parser = self.host.parser
+        self.use = {}
+        self.verbose = False
+        self.update_path()
+        self.cmdloop(INTRO)