Mercurial > libervia-backend
comparison frontends/src/jp/cmd_input.py @ 2278:489efbda377c
jp (input): input command first draft:
this is an experimental command to use external data as input arguments.
A series of data is used (only CSV is implemented so far), and it is used to fill argument of a command according to a sequence.
The sequence is given using input arguments, with types corresponding to the data found (short option, long option, stdin).
e.g. if a CSV file has row with 3 columns, we can say that column 1 is subject (long option), column 2 is body (stdin), and column 3 is language (short option -l).
A filter can be used after each option type, to transform read value.
Finally a static part is used to have the main command and non dynamic arguments to use.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 28 Jun 2017 01:28:41 +0200 |
parents | |
children | d8e48c850ad2 |
comparison
equal
deleted
inserted
replaced
2277:637886ac35f6 | 2278:489efbda377c |
---|---|
1 #!/usr/bin/env python2 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # jp: a SàT command line tool | |
5 # Copyright (C) 2009-2016 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 | |
21 import base | |
22 from sat.core.i18n import _ | |
23 from sat_frontends.jp.constants import Const as C | |
24 from sat.tools.common.ansi import ANSI as A | |
25 import subprocess | |
26 import argparse | |
27 import os | |
28 import sys | |
29 | |
30 __commands__ = ["Input"] | |
31 OPT_STDIN = 'stdin' | |
32 OPT_SHORT = 'short' | |
33 OPT_LONG = 'long' | |
34 OPT_POS = 'positional' | |
35 OPT_IGNORE = 'ignore' | |
36 OPT_TYPES = (OPT_STDIN, OPT_SHORT, OPT_LONG, OPT_POS, OPT_IGNORE) | |
37 | |
38 | |
39 class InputCommon(base.CommandBase): | |
40 | |
41 def __init__(self, host, name, help): | |
42 base.CommandBase.__init__(self, host, name, use_verbose=True, use_profile=False, help=help) | |
43 self.idx = 0 | |
44 self.reset() | |
45 | |
46 def reset(self): | |
47 self.args_idx = 0 | |
48 self._stdin = [] | |
49 self._opts = [] | |
50 self._pos = [] | |
51 | |
52 def add_parser_options(self): | |
53 self.parser.add_argument("--encoding", default='utf-8', help=_(u"encoding of the input data")) | |
54 self.parser.add_argument("-i", "--stdin", action='append_const', const=(OPT_STDIN, None), dest='arguments', help=_(u"standard input")) | |
55 self.parser.add_argument("-s", "--short", type=self.opt(OPT_SHORT), action='append', dest='arguments', help=_(u"short option")) | |
56 self.parser.add_argument("-l", "--long", type=self.opt(OPT_LONG), action='append', dest='arguments', help=_(u"long option")) | |
57 self.parser.add_argument("-p", "--positional", type=self.opt(OPT_POS), action='append', dest='arguments', help=_(u"positional argument")) | |
58 self.parser.add_argument("-x", "--ignore", action='append_const', const=(OPT_IGNORE, None), dest='arguments', help=_(u"ignore value")) | |
59 self.parser.add_argument("-D", "--debug", action='store_true', help=_(u"don't actually run commands but echo what would be launched")) | |
60 self.parser.add_argument("command", nargs=argparse.REMAINDER) | |
61 | |
62 def opt(self, type_): | |
63 return lambda s: (type_, s) | |
64 | |
65 def addValue(self, value): | |
66 """add a parsed value according to arguments sequence""" | |
67 arguments = self.args.arguments | |
68 try: | |
69 arg_type, arg_name = arguments[self.args_idx] | |
70 except IndexError: | |
71 self.disp(_(u"arguments in input data and in arguments sequence don't match"), error=True) | |
72 self.host.quit(C.EXIT_DATA_ERROR) | |
73 self.args_idx += 1 | |
74 while self.args_idx < len(arguments): | |
75 next_arg = arguments[self.args_idx] | |
76 if next_arg[0] not in OPT_TYPES: | |
77 # we have a filter | |
78 filter_type, filter_arg = arguments[self.args_idx] | |
79 value = self.filter(filter_type, filter_arg, value) | |
80 else: | |
81 break | |
82 self.args_idx += 1 | |
83 | |
84 if not isinstance(value, list): | |
85 value = [value] | |
86 | |
87 for v in value: | |
88 if arg_type == OPT_STDIN: | |
89 self._stdin.append(v.encode('utf-8')) | |
90 elif arg_type == OPT_SHORT: | |
91 self._opts.append('-{}'.format(arg_name)) | |
92 self._opts.append(v.encode('utf-8')) | |
93 elif arg_type == OPT_LONG: | |
94 self._opts.append('--{}'.format(arg_name)) | |
95 self._opts.append(v.encode('utf-8')) | |
96 elif arg_type == OPT_POS: | |
97 self._pos.append(v.encode('utf-8')) | |
98 elif arg_type == OPT_IGNORE: | |
99 pass | |
100 else: | |
101 self.parser.error(_(u"Invalid argument, an option type is expected, got {type_}:{name}").format( | |
102 type_=arg_type, name=arg_name)) | |
103 | |
104 def runCommand(self): | |
105 """run requested command with parsed arguments""" | |
106 if self.args_idx != len(self.args.arguments): | |
107 self.disp(_(u"arguments in input data and in arguments sequence don't match"), error=True) | |
108 self.host.quit(C.EXIT_DATA_ERROR) | |
109 stdin = ''.join(self._stdin) | |
110 if self.args.debug: | |
111 self.disp(_(u'command {idx}').format(idx=self.idx)) | |
112 if stdin: | |
113 self.disp(u'--- STDIN ---') | |
114 self.disp(stdin.decode('utf-8')) | |
115 self.disp(u'-------------') | |
116 self.disp(u'{indent}{prog} {static} {options} {positionals}'.format( | |
117 indent = 4*u' ', | |
118 prog=sys.argv[0], | |
119 static = ' '.join(self.args.command).encode('utf-8'), | |
120 options = u' '.join([o.encode('utf-8') for o in self._opts]), | |
121 positionals = u' '.join([p.encode('utf-8') for p in self._pos]) | |
122 )) | |
123 self.disp(u'') | |
124 else: | |
125 self.disp(_(u'command {idx}').format(idx=self.idx), no_lf=True) | |
126 with open(os.devnull, 'wb') as DEVNULL: | |
127 p = subprocess.Popen([sys.argv[0]] + self.args.command + self._opts + self._pos, | |
128 stdin=subprocess.PIPE, stdout=DEVNULL, stderr=DEVNULL) | |
129 p.communicate(stdin) | |
130 ret = p.wait() | |
131 if ret == 0: | |
132 self.disp(A.color(C.A_SUCCESS, _(u'OK'))) | |
133 else: | |
134 self.disp(A.color(C.A_FAILURE, _(u'FAILED'))) | |
135 | |
136 self.reset() | |
137 self.idx += 1 | |
138 | |
139 def filter(self, filter_type, filter_arg, value): | |
140 raise NotImplementedError | |
141 | |
142 | |
143 class Csv(InputCommon): | |
144 | |
145 def __init__(self, host): | |
146 super(Csv, self).__init__(host, 'csv', _(u'comma-separated values')) | |
147 | |
148 def add_parser_options(self): | |
149 InputCommon.add_parser_options(self) | |
150 self.parser.add_argument("-r", "--row", type=int, default=0, help=_(u"starting row (previous ones will be ignored)")) | |
151 self.parser.add_argument("-S", "--split", action='append_const', const=('split', None), dest='arguments', help=_(u"split value in several options")) | |
152 | |
153 def filter(self, arg_type, filter_type, filter_arg, value): | |
154 if filter_type == 'split': | |
155 return value.split() | |
156 | |
157 super(Csv, self).filter(filter_type, filter_arg, value) | |
158 | |
159 def start(self): | |
160 import csv | |
161 reader = csv.reader(sys.stdin) | |
162 for idx, row in enumerate(reader): | |
163 if idx < self.args.row: | |
164 continue | |
165 for value in row: | |
166 self.addValue(value.decode(self.args.encoding)) | |
167 self.runCommand() | |
168 | |
169 | |
170 class Input(base.CommandBase): | |
171 subcommands = (Csv,) | |
172 | |
173 def __init__(self, host): | |
174 super(Input, self).__init__(host, 'input', use_profile=False, help=_(u'launch command with external input')) |