view sat_frontends/jp/cmd_input.py @ 2755:12d1ca646af1

plugin manhole: manhole debug plugin, first draft: this plugin open a telnet server which open a Python interpreter and allows to inspect the backend while it is running. To enable it, the settings "manhole_debug_dangerous_port_int" must be set to the desired TCP port in [DEFAULT] section of sat.conf. A warning will be logged if the server is enabled. Currently only a telnet server is available, and no login is required. This may change in the futur to enable login/password and ssh access, or even an XMPP access. This is a debugging feature and must only be used during development. Once in the Python interpreter, the "host" variable is accessible.
author Goffi <goffi@goffi.org>
date Fri, 04 Jan 2019 18:59:24 +0100
parents 56f94936df1e
children 003b8b4b56a7
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# jp: a SàT command line tool
# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import base
from sat.core.i18n import _
from sat.core import exceptions
from sat_frontends.jp.constants import Const as C
from sat.tools.common.ansi import ANSI as A
import subprocess
import argparse
import sys

__commands__ = ["Input"]
OPT_STDIN = "stdin"
OPT_SHORT = "short"
OPT_LONG = "long"
OPT_POS = "positional"
OPT_IGNORE = "ignore"
OPT_TYPES = (OPT_STDIN, OPT_SHORT, OPT_LONG, OPT_POS, OPT_IGNORE)
OPT_EMPTY_SKIP = "skip"
OPT_EMPTY_IGNORE = "ignore"
OPT_EMPTY_CHOICES = (OPT_EMPTY_SKIP, OPT_EMPTY_IGNORE)


class InputCommon(base.CommandBase):
    def __init__(self, host, name, help):
        base.CommandBase.__init__(
            self, host, name, use_verbose=True, use_profile=False, help=help
        )
        self.idx = 0
        self.reset()

    def reset(self):
        self.args_idx = 0
        self._stdin = []
        self._opts = []
        self._pos = []
        self._values_ori = []

    def add_parser_options(self):
        self.parser.add_argument(
            "--encoding", default="utf-8", help=_(u"encoding of the input data")
        )
        self.parser.add_argument(
            "-i",
            "--stdin",
            action="append_const",
            const=(OPT_STDIN, None),
            dest="arguments",
            help=_(u"standard input"),
        )
        self.parser.add_argument(
            "-s",
            "--short",
            type=self.opt(OPT_SHORT),
            action="append",
            dest="arguments",
            help=_(u"short option"),
        )
        self.parser.add_argument(
            "-l",
            "--long",
            type=self.opt(OPT_LONG),
            action="append",
            dest="arguments",
            help=_(u"long option"),
        )
        self.parser.add_argument(
            "-p",
            "--positional",
            type=self.opt(OPT_POS),
            action="append",
            dest="arguments",
            help=_(u"positional argument"),
        )
        self.parser.add_argument(
            "-x",
            "--ignore",
            action="append_const",
            const=(OPT_IGNORE, None),
            dest="arguments",
            help=_(u"ignore value"),
        )
        self.parser.add_argument(
            "-D",
            "--debug",
            action="store_true",
            help=_(u"don't actually run commands but echo what would be launched"),
        )
        self.parser.add_argument(
            "--log", type=argparse.FileType("wb"), help=_(u"log stdout to FILE")
        )
        self.parser.add_argument(
            "--log-err", type=argparse.FileType("wb"), help=_(u"log stderr to FILE")
        )
        self.parser.add_argument("command", nargs=argparse.REMAINDER)

    def opt(self, type_):
        return lambda s: (type_, s)

    def addValue(self, value):
        """add a parsed value according to arguments sequence"""
        self._values_ori.append(value)
        arguments = self.args.arguments
        try:
            arg_type, arg_name = arguments[self.args_idx]
        except IndexError:
            self.disp(
                _(u"arguments in input data and in arguments sequence don't match"),
                error=True,
            )
            self.host.quit(C.EXIT_DATA_ERROR)
        self.args_idx += 1
        while self.args_idx < len(arguments):
            next_arg = arguments[self.args_idx]
            if next_arg[0] not in OPT_TYPES:
                # value will not be used if False or None, so we skip filter
                if value not in (False, None):
                    # we have a filter
                    filter_type, filter_arg = arguments[self.args_idx]
                    value = self.filter(filter_type, filter_arg, value)
            else:
                break
            self.args_idx += 1

        if value is None:
            # we ignore this argument
            return

        if value is False:
            # we skip the whole row
            if self.args.debug:
                self.disp(
                    A.color(
                        C.A_SUBHEADER,
                        _(u"values: "),
                        A.RESET,
                        u", ".join(self._values_ori),
                    ),
                    2,
                )
                self.disp(A.color(A.BOLD, _(u"**SKIPPING**\n")))
            self.reset()
            self.idx += 1
            raise exceptions.CancelError

        if not isinstance(value, list):
            value = [value]

        for v in value:
            if arg_type == OPT_STDIN:
                self._stdin.append(v.encode("utf-8"))
            elif arg_type == OPT_SHORT:
                self._opts.append("-{}".format(arg_name))
                self._opts.append(v.encode("utf-8"))
            elif arg_type == OPT_LONG:
                self._opts.append("--{}".format(arg_name))
                self._opts.append(v.encode("utf-8"))
            elif arg_type == OPT_POS:
                self._pos.append(v.encode("utf-8"))
            elif arg_type == OPT_IGNORE:
                pass
            else:
                self.parser.error(
                    _(
                        u"Invalid argument, an option type is expected, got {type_}:{name}"
                    ).format(type_=arg_type, name=arg_name)
                )

    def runCommand(self):
        """run requested command with parsed arguments"""
        if self.args_idx != len(self.args.arguments):
            self.disp(
                _(u"arguments in input data and in arguments sequence don't match"),
                error=True,
            )
            self.host.quit(C.EXIT_DATA_ERROR)
        self.disp(
            A.color(C.A_HEADER, _(u"command {idx}").format(idx=self.idx)),
            no_lf=not self.args.debug,
        )
        stdin = "".join(self._stdin)
        if self.args.debug:
            self.disp(
                A.color(
                    C.A_SUBHEADER, _(u"values: "), A.RESET, u", ".join(self._values_ori)
                ),
                2,
            )

            if stdin:
                self.disp(A.color(C.A_SUBHEADER, u"--- STDIN ---"))
                self.disp(stdin.decode("utf-8"))
                self.disp(A.color(C.A_SUBHEADER, u"-------------"))
            self.disp(
                u"{indent}{prog} {static} {options} {positionals}".format(
                    indent=4 * u" ",
                    prog=sys.argv[0],
                    static=" ".join(self.args.command).decode("utf-8"),
                    options=u" ".join([o.decode("utf-8") for o in self._opts]),
                    positionals=u" ".join([p.decode("utf-8") for p in self._pos]),
                )
            )
            self.disp(u"\n")
        else:
            self.disp(u" (" + u", ".join(self._values_ori) + u")", 2, no_lf=True)
            args = [sys.argv[0]] + self.args.command + self._opts + self._pos
            p = subprocess.Popen(
                args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            (stdout, stderr) = p.communicate(stdin)
            log = self.args.log
            log_err = self.args.log_err
            log_tpl = "{command}\n{buff}\n\n"
            if log:
                log.write(log_tpl.format(command=" ".join(args), buff=stdout))
            if log_err:
                log_err.write(log_tpl.format(command=" ".join(args), buff=stderr))
            ret = p.wait()
            if ret == 0:
                self.disp(A.color(C.A_SUCCESS, _(u"OK")))
            else:
                self.disp(A.color(C.A_FAILURE, _(u"FAILED")))

        self.reset()
        self.idx += 1

    def filter(self, filter_type, filter_arg, value):
        """change input value

        @param filter_type(unicode): name of the filter
        @param filter_arg(unicode, None): argument of the filter
        @param value(unicode): value to filter
        @return (unicode, False, None): modified value
            False to skip the whole row
            None to ignore this argument (but continue row with other ones)
        """
        raise NotImplementedError


class Csv(InputCommon):
    def __init__(self, host):
        super(Csv, self).__init__(host, "csv", _(u"comma-separated values"))

    def add_parser_options(self):
        InputCommon.add_parser_options(self)
        self.parser.add_argument(
            "-r",
            "--row",
            type=int,
            default=0,
            help=_(u"starting row (previous ones will be ignored)"),
        )
        self.parser.add_argument(
            "-S",
            "--split",
            action="append_const",
            const=("split", None),
            dest="arguments",
            help=_(u"split value in several options"),
        )
        self.parser.add_argument(
            "-E",
            "--empty",
            action="append",
            type=self.opt("empty"),
            dest="arguments",
            help=_(u"action to do on empty value ({choices})").format(
                choices=u", ".join(OPT_EMPTY_CHOICES)
            ),
        )

    def filter(self, filter_type, filter_arg, value):
        if filter_type == "split":
            return value.split()
        elif filter_type == "empty":
            if filter_arg == OPT_EMPTY_IGNORE:
                return value if value else None
            elif filter_arg == OPT_EMPTY_SKIP:
                return value if value else False
            else:
                self.parser.error(
                    _(u"--empty value must be one of {choices}").format(
                        choices=u", ".join(OPT_EMPTY_CHOICES)
                    )
                )

        super(Csv, self).filter(filter_type, filter_arg, value)

    def start(self):
        import csv

        reader = csv.reader(sys.stdin)
        for idx, row in enumerate(reader):
            try:
                if idx < self.args.row:
                    continue
                for value in row:
                    self.addValue(value.decode(self.args.encoding))
                self.runCommand()
            except exceptions.CancelError:
                #  this row has been cancelled, we skip it
                continue


class Input(base.CommandBase):
    subcommands = (Csv,)

    def __init__(self, host):
        super(Input, self).__init__(
            host,
            "input",
            use_profile=False,
            help=_(u"launch command with external input"),
        )