comparison libervia/cli/cmd_info.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_info.py@26b7ed2817da
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4074:26b7ed2817da 4075:47401850dec6
1 #!/usr/bin/env python3
2
3
4 # Libervia CLI
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from pprint import pformat
21
22 from libervia.backend.core.i18n import _
23 from libervia.backend.tools.common import data_format, date_utils
24 from libervia.backend.tools.common.ansi import ANSI as A
25 from libervia.cli import common
26 from libervia.cli.constants import Const as C
27
28 from . import base
29
30 __commands__ = ["Info"]
31
32
33 class Disco(base.CommandBase):
34 def __init__(self, host):
35 extra_outputs = {"default": self.default_output}
36 super(Disco, self).__init__(
37 host,
38 "disco",
39 use_output="complex",
40 extra_outputs=extra_outputs,
41 help=_("service discovery"),
42 )
43
44 def add_parser_options(self):
45 self.parser.add_argument("jid", help=_("entity to discover"))
46 self.parser.add_argument(
47 "-t",
48 "--type",
49 type=str,
50 choices=("infos", "items", "both", "external", "all"),
51 default="all",
52 help=_("type of data to discover"),
53 )
54 self.parser.add_argument("-n", "--node", default="", help=_("node to use"))
55 self.parser.add_argument(
56 "-C",
57 "--no-cache",
58 dest="use_cache",
59 action="store_false",
60 help=_("ignore cache"),
61 )
62
63 def default_output(self, data):
64 features = data.get("features", [])
65 identities = data.get("identities", [])
66 extensions = data.get("extensions", {})
67 items = data.get("items", [])
68 external = data.get("external", [])
69
70 identities_table = common.Table(
71 self.host,
72 identities,
73 headers=(_("category"), _("type"), _("name")),
74 use_buffer=True,
75 )
76
77 extensions_tpl = []
78 extensions_types = list(extensions.keys())
79 extensions_types.sort()
80 for type_ in extensions_types:
81 fields = []
82 for field in extensions[type_]:
83 field_lines = []
84 data, values = field
85 data_keys = list(data.keys())
86 data_keys.sort()
87 for key in data_keys:
88 field_lines.append(
89 A.color("\t", C.A_SUBHEADER, key, A.RESET, ": ", data[key])
90 )
91 if len(values) == 1:
92 field_lines.append(
93 A.color(
94 "\t",
95 C.A_SUBHEADER,
96 "value",
97 A.RESET,
98 ": ",
99 values[0] or (A.BOLD + "UNSET"),
100 )
101 )
102 elif len(values) > 1:
103 field_lines.append(
104 A.color("\t", C.A_SUBHEADER, "values", A.RESET, ": ")
105 )
106
107 for value in values:
108 field_lines.append(A.color("\t - ", A.BOLD, value))
109 fields.append("\n".join(field_lines))
110 extensions_tpl.append(
111 "{type_}\n{fields}".format(type_=type_, fields="\n\n".join(fields))
112 )
113
114 items_table = common.Table(
115 self.host, items, headers=(_("entity"), _("node"), _("name")), use_buffer=True
116 )
117
118 template = []
119 fmt_kwargs = {}
120 if features:
121 template.append(A.color(C.A_HEADER, _("Features")) + "\n\n{features}")
122 if identities:
123 template.append(A.color(C.A_HEADER, _("Identities")) + "\n\n{identities}")
124 if extensions:
125 template.append(A.color(C.A_HEADER, _("Extensions")) + "\n\n{extensions}")
126 if items:
127 template.append(A.color(C.A_HEADER, _("Items")) + "\n\n{items}")
128 if external:
129 fmt_lines = []
130 for e in external:
131 data = {k: e[k] for k in sorted(e)}
132 host = data.pop("host")
133 type_ = data.pop("type")
134 fmt_lines.append(A.color(
135 "\t",
136 C.A_SUBHEADER,
137 host,
138 " ",
139 A.RESET,
140 "[",
141 C.A_LEVEL_COLORS[1],
142 type_,
143 A.RESET,
144 "]",
145 ))
146 extended = data.pop("extended", None)
147 for key, value in data.items():
148 fmt_lines.append(A.color(
149 "\t\t",
150 C.A_LEVEL_COLORS[2],
151 f"{key}: ",
152 C.A_LEVEL_COLORS[3],
153 str(value)
154 ))
155 if extended:
156 fmt_lines.append(A.color(
157 "\t\t",
158 C.A_HEADER,
159 "extended",
160 ))
161 nb_extended = len(extended)
162 for idx, form_data in enumerate(extended):
163 namespace = form_data.get("namespace")
164 if namespace:
165 fmt_lines.append(A.color(
166 "\t\t",
167 C.A_LEVEL_COLORS[2],
168 "namespace: ",
169 C.A_LEVEL_COLORS[3],
170 A.BOLD,
171 namespace
172 ))
173 for field_data in form_data["fields"]:
174 name = field_data.get("name")
175 if not name:
176 continue
177 field_type = field_data.get("type")
178 if "multi" in field_type:
179 value = ", ".join(field_data.get("values") or [])
180 else:
181 value = field_data.get("value")
182 if value is None:
183 continue
184 if field_type == "boolean":
185 value = C.bool(value)
186 fmt_lines.append(A.color(
187 "\t\t",
188 C.A_LEVEL_COLORS[2],
189 f"{name}: ",
190 C.A_LEVEL_COLORS[3],
191 A.BOLD,
192 str(value)
193 ))
194 if nb_extended>1 and idx < nb_extended-1:
195 fmt_lines.append("\n")
196
197 fmt_lines.append("\n")
198
199 template.append(
200 A.color(C.A_HEADER, _("External")) + "\n\n{external_formatted}"
201 )
202 fmt_kwargs["external_formatted"] = "\n".join(fmt_lines)
203
204 print(
205 "\n\n".join(template).format(
206 features="\n".join(features),
207 identities=identities_table.display().string,
208 extensions="\n".join(extensions_tpl),
209 items=items_table.display().string,
210 **fmt_kwargs,
211 )
212 )
213
214 async def start(self):
215 infos_requested = self.args.type in ("infos", "both", "all")
216 items_requested = self.args.type in ("items", "both", "all")
217 exter_requested = self.args.type in ("external", "all")
218 if self.args.node:
219 if self.args.type == "external":
220 self.parser.error(
221 '--node can\'t be used with discovery of external services '
222 '(--type="external")'
223 )
224 else:
225 exter_requested = False
226 jids = await self.host.check_jids([self.args.jid])
227 jid = jids[0]
228 data = {}
229
230 # infos
231 if infos_requested:
232 try:
233 infos = await self.host.bridge.disco_infos(
234 jid,
235 node=self.args.node,
236 use_cache=self.args.use_cache,
237 profile_key=self.host.profile,
238 )
239 except Exception as e:
240 self.disp(_("error while doing discovery: {e}").format(e=e), error=True)
241 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
242
243 else:
244 features, identities, extensions = infos
245 features.sort()
246 identities.sort(key=lambda identity: identity[2])
247 data.update(
248 {"features": features, "identities": identities, "extensions": extensions}
249 )
250
251 # items
252 if items_requested:
253 try:
254 items = await self.host.bridge.disco_items(
255 jid,
256 node=self.args.node,
257 use_cache=self.args.use_cache,
258 profile_key=self.host.profile,
259 )
260 except Exception as e:
261 self.disp(_("error while doing discovery: {e}").format(e=e), error=True)
262 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
263 else:
264 items.sort(key=lambda item: item[2])
265 data["items"] = items
266
267 # external
268 if exter_requested:
269 try:
270 ext_services_s = await self.host.bridge.external_disco_get(
271 jid,
272 self.host.profile,
273 )
274 except Exception as e:
275 self.disp(
276 _("error while doing external service discovery: {e}").format(e=e),
277 error=True
278 )
279 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
280 else:
281 data["external"] = data_format.deserialise(
282 ext_services_s, type_check=list
283 )
284
285 # output
286 await self.output(data)
287 self.host.quit()
288
289
290 class Version(base.CommandBase):
291 def __init__(self, host):
292 super(Version, self).__init__(host, "version", help=_("software version"))
293
294 def add_parser_options(self):
295 self.parser.add_argument("jid", type=str, help=_("Entity to request"))
296
297 async def start(self):
298 jids = await self.host.check_jids([self.args.jid])
299 jid = jids[0]
300 try:
301 data = await self.host.bridge.software_version_get(jid, self.host.profile)
302 except Exception as e:
303 self.disp(_("error while trying to get version: {e}").format(e=e), error=True)
304 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
305 else:
306 infos = []
307 name, version, os = data
308 if name:
309 infos.append(_("Software name: {name}").format(name=name))
310 if version:
311 infos.append(_("Software version: {version}").format(version=version))
312 if os:
313 infos.append(_("Operating System: {os}").format(os=os))
314
315 print("\n".join(infos))
316 self.host.quit()
317
318
319 class Session(base.CommandBase):
320 def __init__(self, host):
321 extra_outputs = {"default": self.default_output}
322 super(Session, self).__init__(
323 host,
324 "session",
325 use_output="dict",
326 extra_outputs=extra_outputs,
327 help=_("running session"),
328 )
329
330 def add_parser_options(self):
331 pass
332
333 async def default_output(self, data):
334 started = data["started"]
335 data["started"] = "{short} (UTC, {relative})".format(
336 short=date_utils.date_fmt(started),
337 relative=date_utils.date_fmt(started, "relative"),
338 )
339 await self.host.output(C.OUTPUT_DICT, "simple", {}, data)
340
341 async def start(self):
342 try:
343 data = await self.host.bridge.session_infos_get(self.host.profile)
344 except Exception as e:
345 self.disp(_("Error getting session infos: {e}").format(e=e), error=True)
346 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
347 else:
348 await self.output(data)
349 self.host.quit()
350
351
352 class Devices(base.CommandBase):
353 def __init__(self, host):
354 super(Devices, self).__init__(
355 host, "devices", use_output=C.OUTPUT_LIST_DICT, help=_("devices of an entity")
356 )
357
358 def add_parser_options(self):
359 self.parser.add_argument(
360 "jid", type=str, nargs="?", default="", help=_("Entity to request")
361 )
362
363 async def start(self):
364 try:
365 data = await self.host.bridge.devices_infos_get(
366 self.args.jid, self.host.profile
367 )
368 except Exception as e:
369 self.disp(_("Error getting devices infos: {e}").format(e=e), error=True)
370 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
371 else:
372 data = data_format.deserialise(data, type_check=list)
373 await self.output(data)
374 self.host.quit()
375
376
377 class Info(base.CommandBase):
378 subcommands = (Disco, Version, Session, Devices)
379
380 def __init__(self, host):
381 super(Info, self).__init__(
382 host,
383 "info",
384 use_profile=False,
385 help=_("Get various pieces of information on entities"),
386 )