comparison libervia/cli/cmd_input.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_input.py@26b7ed2817da
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4074:26b7ed2817da 4075:47401850dec6
1 #!/usr/bin/env python3
2
3
4 # Libervia CLI
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
21 import subprocess
22 import argparse
23 import sys
24 import shlex
25 import asyncio
26 from . import base
27 from libervia.backend.core.i18n import _
28 from libervia.backend.core import exceptions
29 from libervia.cli.constants import Const as C
30 from libervia.backend.tools.common.ansi import ANSI as A
31
# Command classes exported by this module.
__commands__ = ["Input"]

# Kinds of entries accepted in the arguments sequence: each input value is
# routed according to the kind found at its position in the sequence.
OPT_STDIN = "stdin"
OPT_SHORT = "short"
OPT_LONG = "long"
OPT_POS = "positional"
OPT_IGNORE = "ignore"
OPT_TYPES = (OPT_STDIN, OPT_SHORT, OPT_LONG, OPT_POS, OPT_IGNORE)

# Accepted values of the ``--empty`` filter (what to do with an empty value).
OPT_EMPTY_SKIP = "skip"
OPT_EMPTY_IGNORE = "ignore"
OPT_EMPTY_CHOICES = (OPT_EMPTY_SKIP, OPT_EMPTY_IGNORE)
42
43
class InputCommon(base.CommandBase):
    """Common logic for commands which launch another command per input row.

    Each value parsed from the input is matched, in order, against the
    sequence of ``-i/-s/-l/-p/-x`` options given on the command line (entries
    of other kinds are treated as filters applied to the preceding value).
    The collected values are then used as stdin, options or positional
    arguments of the command to launch.
    """

    def __init__(self, host, name, help):
        base.CommandBase.__init__(
            self, host, name, use_verbose=True, use_profile=False, help=help
        )
        # number of rows handled so far (launched or skipped), used for display
        self.idx = 0
        self.reset()

    def reset(self):
        """Forget everything collected for the current row."""
        # position of the next entry to consume in the arguments sequence
        self.args_idx = 0
        self._stdin = []
        self._opts = []
        self._pos = []
        # raw values of the row, as received, kept for display
        self._values_ori = []

    def add_parser_options(self):
        """Declare the options shared by every input subcommand.

        All options describing how a value must be used share the same
        ``dest`` ("arguments") so that their relative order is preserved.
        """
        self.parser.add_argument(
            "--encoding", default="utf-8", help=_("encoding of the input data")
        )
        self.parser.add_argument(
            "-i",
            "--stdin",
            action="append_const",
            const=(OPT_STDIN, None),
            dest="arguments",
            help=_("standard input"),
        )
        self.parser.add_argument(
            "-s",
            "--short",
            type=self.opt(OPT_SHORT),
            action="append",
            dest="arguments",
            help=_("short option"),
        )
        self.parser.add_argument(
            "-l",
            "--long",
            type=self.opt(OPT_LONG),
            action="append",
            dest="arguments",
            help=_("long option"),
        )
        self.parser.add_argument(
            "-p",
            "--positional",
            type=self.opt(OPT_POS),
            action="append",
            dest="arguments",
            help=_("positional argument"),
        )
        self.parser.add_argument(
            "-x",
            "--ignore",
            action="append_const",
            const=(OPT_IGNORE, None),
            dest="arguments",
            help=_("ignore value"),
        )
        self.parser.add_argument(
            "-D",
            "--debug",
            action="store_true",
            help=_("don't actually run commands but echo what would be launched"),
        )
        self.parser.add_argument(
            "--log", type=argparse.FileType("w"), help=_("log stdout to FILE")
        )
        self.parser.add_argument(
            "--log-err", type=argparse.FileType("w"), help=_("log stderr to FILE")
        )
        self.parser.add_argument("command", nargs=argparse.REMAINDER)

    def opt(self, type_):
        """Return an argparse ``type`` callable tagging the value with type_.

        @param type_(str): kind of entry (one of OPT_TYPES or a filter name)
        @return (callable): function mapping a string ``s`` to ``(type_, s)``
        """
        return lambda s: (type_, s)

    def _arguments_sequence(self):
        """Return the arguments sequence as a list, even when none was given.

        argparse leaves an "append"/"append_const" destination at None when
        the corresponding options are never used; normalising it here lets
        callers index and measure the sequence without a TypeError.
        """
        return self.args.arguments or []

    def add_value(self, value):
        """add a parsed value according to arguments sequence

        @param value(str): raw value read from the input
        @raise exceptions.CancelError: a filter asked to skip the whole row
        """
        self._values_ori.append(value)
        arguments = self._arguments_sequence()
        try:
            arg_type, arg_name = arguments[self.args_idx]
        except IndexError:
            # more values in the input than entries in the sequence
            self.disp(
                _("arguments in input data and in arguments sequence don't match"),
                error=True,
            )
            # NOTE(review): host.quit is presumably not expected to return
            self.host.quit(C.EXIT_DATA_ERROR)
        self.args_idx += 1
        # consume every filter following the entry we have just read
        while self.args_idx < len(arguments):
            next_arg = arguments[self.args_idx]
            if next_arg[0] not in OPT_TYPES:
                # value will not be used if False or None, so we skip filter
                if value not in (False, None):
                    # we have a filter
                    filter_type, filter_arg = next_arg
                    value = self.filter(filter_type, filter_arg, value)
            else:
                break
            self.args_idx += 1

        if value is None:
            # we ignore this argument
            return

        if value is False:
            # we skip the whole row
            if self.args.debug:
                self.disp(
                    A.color(
                        C.A_SUBHEADER,
                        _("values: "),
                        A.RESET,
                        ", ".join(self._values_ori),
                    ),
                    2,
                )
                self.disp(A.color(A.BOLD, _("**SKIPPING**\n")))
            self.reset()
            self.idx += 1
            raise exceptions.CancelError

        # a filter may have turned the value into several ones
        if not isinstance(value, list):
            value = [value]

        for v in value:
            if arg_type == OPT_STDIN:
                self._stdin.append(v)
            elif arg_type == OPT_SHORT:
                self._opts.append("-{}".format(arg_name))
                self._opts.append(v)
            elif arg_type == OPT_LONG:
                self._opts.append("--{}".format(arg_name))
                self._opts.append(v)
            elif arg_type == OPT_POS:
                self._pos.append(v)
            elif arg_type == OPT_IGNORE:
                pass
            else:
                # the entry read for this value is a filter, not an option type
                self.parser.error(
                    _(
                        "Invalid argument, an option type is expected, got {type_}:{name}"
                    ).format(type_=arg_type, name=arg_name)
                )

    async def runCommand(self):
        """run requested command with parsed arguments

        In debug mode the command line is only displayed. Otherwise the
        program (``sys.argv[0]``) is launched with the static command, the
        collected options and positional arguments, the collected stdin values
        being sent to its standard input. Row state is reset afterwards.
        """
        if self.args_idx != len(self._arguments_sequence()):
            # fewer values in the input than entries in the sequence
            self.disp(
                _("arguments in input data and in arguments sequence don't match"),
                error=True,
            )
            self.host.quit(C.EXIT_DATA_ERROR)
        end = "\n" if self.args.debug else " "
        self.disp(
            A.color(C.A_HEADER, _("command {idx}").format(idx=self.idx)),
            end=end,
        )
        stdin = "".join(self._stdin)
        if self.args.debug:
            self.disp(
                A.color(
                    C.A_SUBHEADER,
                    _("values: "),
                    A.RESET,
                    ", ".join([shlex.quote(a) for a in self._values_ori]),
                ),
                2,
            )

            if stdin:
                self.disp(A.color(C.A_SUBHEADER, "--- STDIN ---"))
                self.disp(stdin)
                self.disp(A.color(C.A_SUBHEADER, "-------------"))

            self.disp(
                "{indent}{prog} {static} {options} {positionals}".format(
                    indent=4 * " ",
                    prog=sys.argv[0],
                    static=" ".join(self.args.command),
                    options=" ".join(shlex.quote(o) for o in self._opts),
                    positionals=" ".join(shlex.quote(p) for p in self._pos),
                )
            )
            self.disp("\n")
        else:
            self.disp(" (" + ", ".join(self._values_ori) + ")", 2, end=" ")
            args = [sys.argv[0]] + self.args.command + self._opts + self._pos
            p = await asyncio.create_subprocess_exec(
                *args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            stdout, stderr = await p.communicate(stdin.encode("utf-8"))
            log = self.args.log
            log_err = self.args.log_err
            log_tpl = "{command}\n{buff}\n\n"
            if log:
                log.write(
                    log_tpl.format(
                        command=" ".join(shlex.quote(a) for a in args),
                        buff=stdout.decode("utf-8", "replace"),
                    )
                )
            if log_err:
                log_err.write(
                    log_tpl.format(
                        command=" ".join(shlex.quote(a) for a in args),
                        buff=stderr.decode("utf-8", "replace"),
                    )
                )
            ret = p.returncode
            if ret == 0:
                self.disp(A.color(C.A_SUCCESS, _("OK")))
            else:
                self.disp(A.color(C.A_FAILURE, _("FAILED")))

        # get ready for the next row
        self.reset()
        self.idx += 1

    def filter(self, filter_type, filter_arg, value):
        """change input value

        @param filter_type(unicode): name of the filter
        @param filter_arg(unicode, None): argument of the filter
        @param value(unicode): value to filter
        @return (unicode, False, None): modified value
            False to skip the whole row
            None to ignore this argument (but continue row with other ones)
        """
        raise NotImplementedError
270
271
class Csv(InputCommon):
    """``csv`` subcommand: launch the command once per CSV row read on stdin."""

    def __init__(self, host):
        super().__init__(host, "csv", _("comma-separated values"))

    def add_parser_options(self):
        """Declare the common input options, then the CSV specific ones.

        ``--split`` and ``--empty`` are stored in the same ``arguments``
        destination as the option types, so they apply to the value of the
        entry which precedes them on the command line.
        """
        super().add_parser_options()
        declare = self.parser.add_argument
        declare(
            "-r",
            "--row",
            type=int,
            default=0,
            help=_("starting row (previous ones will be ignored)"),
        )
        declare(
            "-S",
            "--split",
            action="append_const",
            const=("split", None),
            dest="arguments",
            help=_("split value in several options"),
        )
        empty_help = _("action to do on empty value ({choices})").format(
            choices=", ".join(OPT_EMPTY_CHOICES)
        )
        declare(
            "-E",
            "--empty",
            action="append",
            type=self.opt("empty"),
            dest="arguments",
            help=empty_help,
        )

    def filter(self, filter_type, filter_arg, value):
        """Apply the ``split`` or ``empty`` filter to a value.

        @param filter_type(str): "split" or "empty"
        @param filter_arg(str, None): for "empty", one of OPT_EMPTY_CHOICES
        @param value(str): value to filter
        @return (str, list, False, None): whitespace-split list for "split";
            for "empty", the value itself when non-empty, else None ("ignore")
            or False ("skip")
        """
        if filter_type == "split":
            return value.split()

        if filter_type == "empty":
            if filter_arg == OPT_EMPTY_IGNORE:
                return value or None
            if filter_arg == OPT_EMPTY_SKIP:
                return value or False
            self.parser.error(
                _("--empty value must be one of {choices}").format(
                    choices=", ".join(OPT_EMPTY_CHOICES)
                )
            )

        # anything else is not handled here: defer to the parent implementation
        super().filter(filter_type, filter_arg, value)

    async def start(self):
        """Read CSV rows from stdin and run the command for each of them."""
        import csv

        encoding = self.args.encoding
        if encoding:
            sys.stdin.reconfigure(encoding=encoding, errors="replace")

        for row_number, fields in enumerate(csv.reader(sys.stdin)):
            if row_number < self.args.row:
                # rows located before the requested starting row are ignored
                continue
            try:
                for field in fields:
                    self.add_value(field)
                await self.runCommand()
            except exceptions.CancelError:
                # the row has been cancelled, move on to the next one
                continue

        self.host.quit()
339
340
class Input(base.CommandBase):
    """``input`` command, grouping the subcommands reading external input."""

    subcommands = (Csv,)

    def __init__(self, host):
        description = _("launch command with external input")
        super().__init__(host, "input", use_profile=False, help=description)