diff libervia/cli/cmd_input.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_input.py@26b7ed2817da
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/cli/cmd_input.py	Fri Jun 02 14:54:26 2023 +0200
@@ -0,0 +1,350 @@
+#!/usr/bin/env python3
+
+
+# Libervia CLI
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+import subprocess
+import argparse
+import sys
+import shlex
+import asyncio
+from . import base
+from libervia.backend.core.i18n import _
+from libervia.backend.core import exceptions
+from libervia.cli.constants import Const as C
+from libervia.backend.tools.common.ansi import ANSI as A
+
+__commands__ = ["Input"]
+OPT_STDIN = "stdin"
+OPT_SHORT = "short"
+OPT_LONG = "long"
+OPT_POS = "positional"
+OPT_IGNORE = "ignore"
+OPT_TYPES = (OPT_STDIN, OPT_SHORT, OPT_LONG, OPT_POS, OPT_IGNORE)
+OPT_EMPTY_SKIP = "skip"
+OPT_EMPTY_IGNORE = "ignore"
+OPT_EMPTY_CHOICES = (OPT_EMPTY_SKIP, OPT_EMPTY_IGNORE)
+
+
+class InputCommon(base.CommandBase):
+    def __init__(self, host, name, help):
+        base.CommandBase.__init__(
+            self, host, name, use_verbose=True, use_profile=False, help=help
+        )
+        self.idx = 0
+        self.reset()
+
+    def reset(self):
+        self.args_idx = 0
+        self._stdin = []
+        self._opts = []
+        self._pos = []
+        self._values_ori = []
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--encoding", default="utf-8", help=_("encoding of the input data")
+        )
+        self.parser.add_argument(
+            "-i",
+            "--stdin",
+            action="append_const",
+            const=(OPT_STDIN, None),
+            dest="arguments",
+            help=_("standard input"),
+        )
+        self.parser.add_argument(
+            "-s",
+            "--short",
+            type=self.opt(OPT_SHORT),
+            action="append",
+            dest="arguments",
+            help=_("short option"),
+        )
+        self.parser.add_argument(
+            "-l",
+            "--long",
+            type=self.opt(OPT_LONG),
+            action="append",
+            dest="arguments",
+            help=_("long option"),
+        )
+        self.parser.add_argument(
+            "-p",
+            "--positional",
+            type=self.opt(OPT_POS),
+            action="append",
+            dest="arguments",
+            help=_("positional argument"),
+        )
+        self.parser.add_argument(
+            "-x",
+            "--ignore",
+            action="append_const",
+            const=(OPT_IGNORE, None),
+            dest="arguments",
+            help=_("ignore value"),
+        )
+        self.parser.add_argument(
+            "-D",
+            "--debug",
+            action="store_true",
+            help=_("don't actually run commands but echo what would be launched"),
+        )
+        self.parser.add_argument(
+            "--log", type=argparse.FileType("w"), help=_("log stdout to FILE")
+        )
+        self.parser.add_argument(
+            "--log-err", type=argparse.FileType("w"), help=_("log stderr to FILE")
+        )
+        self.parser.add_argument("command", nargs=argparse.REMAINDER)
+
+    def opt(self, type_):
+        return lambda s: (type_, s)
+
+    def add_value(self, value):
+        """add a parsed value according to arguments sequence"""
+        self._values_ori.append(value)
+        arguments = self.args.arguments
+        try:
+            arg_type, arg_name = arguments[self.args_idx]
+        except IndexError:
+            self.disp(
+                _("arguments in input data and in arguments sequence don't match"),
+                error=True,
+            )
+            self.host.quit(C.EXIT_DATA_ERROR)
+        self.args_idx += 1
+        while self.args_idx < len(arguments):
+            next_arg = arguments[self.args_idx]
+            if next_arg[0] not in OPT_TYPES:
+                # value will not be used if False or None, so we skip filter
+                if value not in (False, None):
+                    # we have a filter
+                    filter_type, filter_arg = arguments[self.args_idx]
+                    value = self.filter(filter_type, filter_arg, value)
+            else:
+                break
+            self.args_idx += 1
+
+        if value is None:
+            # we ignore this argument
+            return
+
+        if value is False:
+            # we skip the whole row
+            if self.args.debug:
+                self.disp(
+                    A.color(
+                        C.A_SUBHEADER,
+                        _("values: "),
+                        A.RESET,
+                        ", ".join(self._values_ori),
+                    ),
+                    2,
+                )
+                self.disp(A.color(A.BOLD, _("**SKIPPING**\n")))
+            self.reset()
+            self.idx += 1
+            raise exceptions.CancelError
+
+        if not isinstance(value, list):
+            value = [value]
+
+        for v in value:
+            if arg_type == OPT_STDIN:
+                self._stdin.append(v)
+            elif arg_type == OPT_SHORT:
+                self._opts.append("-{}".format(arg_name))
+                self._opts.append(v)
+            elif arg_type == OPT_LONG:
+                self._opts.append("--{}".format(arg_name))
+                self._opts.append(v)
+            elif arg_type == OPT_POS:
+                self._pos.append(v)
+            elif arg_type == OPT_IGNORE:
+                pass
+            else:
+                self.parser.error(
+                    _(
+                        "Invalid argument, an option type is expected, got {type_}:{name}"
+                    ).format(type_=arg_type, name=arg_name)
+                )
+
+    async def runCommand(self):
+        """run requested command with parsed arguments"""
+        if self.args_idx != len(self.args.arguments):
+            self.disp(
+                _("arguments in input data and in arguments sequence don't match"),
+                error=True,
+            )
+            self.host.quit(C.EXIT_DATA_ERROR)
+        end = '\n' if self.args.debug else ' '
+        self.disp(
+            A.color(C.A_HEADER, _("command {idx}").format(idx=self.idx)),
+            end = end,
+        )
+        stdin = "".join(self._stdin)
+        if self.args.debug:
+            self.disp(
+                A.color(
+                    C.A_SUBHEADER,
+                    _("values: "),
+                    A.RESET,
+                    ", ".join([shlex.quote(a) for a in self._values_ori])
+                ),
+                2,
+            )
+
+            if stdin:
+                self.disp(A.color(C.A_SUBHEADER, "--- STDIN ---"))
+                self.disp(stdin)
+                self.disp(A.color(C.A_SUBHEADER, "-------------"))
+
+            self.disp(
+                "{indent}{prog} {static} {options} {positionals}".format(
+                    indent=4 * " ",
+                    prog=sys.argv[0],
+                    static=" ".join(self.args.command),
+                    options=" ".join(shlex.quote(o) for o in self._opts),
+                    positionals=" ".join(shlex.quote(p) for p in self._pos),
+                )
+            )
+            self.disp("\n")
+        else:
+            self.disp(" (" + ", ".join(self._values_ori) + ")", 2, end=' ')
+            args = [sys.argv[0]] + self.args.command + self._opts + self._pos
+            p = await asyncio.create_subprocess_exec(
+                *args,
+                stdin=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+            )
+            stdout, stderr = await p.communicate(stdin.encode('utf-8'))
+            log = self.args.log
+            log_err = self.args.log_err
+            log_tpl = "{command}\n{buff}\n\n"
+            if log:
+                log.write(log_tpl.format(
+                    command=" ".join(shlex.quote(a) for a in args),
+                    buff=stdout.decode('utf-8', 'replace')))
+            if log_err:
+                log_err.write(log_tpl.format(
+                    command=" ".join(shlex.quote(a) for a in args),
+                    buff=stderr.decode('utf-8', 'replace')))
+            ret = p.returncode
+            if ret == 0:
+                self.disp(A.color(C.A_SUCCESS, _("OK")))
+            else:
+                self.disp(A.color(C.A_FAILURE, _("FAILED")))
+
+        self.reset()
+        self.idx += 1
+
+    def filter(self, filter_type, filter_arg, value):
+        """change input value
+
+        @param filter_type(unicode): name of the filter
+        @param filter_arg(unicode, None): argument of the filter
+        @param value(unicode): value to filter
+        @return (unicode, False, None): modified value
+            False to skip the whole row
+            None to ignore this argument (but continue row with other ones)
+        """
+        raise NotImplementedError
+
+
+class Csv(InputCommon):
+    def __init__(self, host):
+        super(Csv, self).__init__(host, "csv", _("comma-separated values"))
+
+    def add_parser_options(self):
+        InputCommon.add_parser_options(self)
+        self.parser.add_argument(
+            "-r",
+            "--row",
+            type=int,
+            default=0,
+            help=_("starting row (previous ones will be ignored)"),
+        )
+        self.parser.add_argument(
+            "-S",
+            "--split",
+            action="append_const",
+            const=("split", None),
+            dest="arguments",
+            help=_("split value in several options"),
+        )
+        self.parser.add_argument(
+            "-E",
+            "--empty",
+            action="append",
+            type=self.opt("empty"),
+            dest="arguments",
+            help=_("action to do on empty value ({choices})").format(
+                choices=", ".join(OPT_EMPTY_CHOICES)
+            ),
+        )
+
+    def filter(self, filter_type, filter_arg, value):
+        if filter_type == "split":
+            return value.split()
+        elif filter_type == "empty":
+            if filter_arg == OPT_EMPTY_IGNORE:
+                return value if value else None
+            elif filter_arg == OPT_EMPTY_SKIP:
+                return value if value else False
+            else:
+                self.parser.error(
+                    _("--empty value must be one of {choices}").format(
+                        choices=", ".join(OPT_EMPTY_CHOICES)
+                    )
+                )
+
+        super(Csv, self).filter(filter_type, filter_arg, value)
+
+    async def start(self):
+        import csv
+
+        if self.args.encoding:
+            sys.stdin.reconfigure(encoding=self.args.encoding, errors="replace")
+        reader = csv.reader(sys.stdin)
+        for idx, row in enumerate(reader):
+            try:
+                if idx < self.args.row:
+                    continue
+                for value in row:
+                    self.add_value(value)
+                await self.runCommand()
+            except exceptions.CancelError:
+                #  this row has been cancelled, we skip it
+                continue
+
+        self.host.quit()
+
+
+class Input(base.CommandBase):
+    subcommands = (Csv,)
+
+    def __init__(self, host):
+        super(Input, self).__init__(
+            host,
+            "input",
+            use_profile=False,
+            help=_("launch command with external input"),
+        )