Mercurial > libervia-backend
diff libervia/cli/cmd_info.py @ 4075:47401850dec6
refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 14:54:26 +0200 |
parents | libervia/frontends/jp/cmd_info.py@26b7ed2817da |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/cli/cmd_info.py Fri Jun 02 14:54:26 2023 +0200 @@ -0,0 +1,386 @@ +#!/usr/bin/env python3 + + +# Libervia CLI +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from pprint import pformat + +from libervia.backend.core.i18n import _ +from libervia.backend.tools.common import data_format, date_utils +from libervia.backend.tools.common.ansi import ANSI as A +from libervia.cli import common +from libervia.cli.constants import Const as C + +from . import base + +__commands__ = ["Info"] + + +class Disco(base.CommandBase): + def __init__(self, host): + extra_outputs = {"default": self.default_output} + super(Disco, self).__init__( + host, + "disco", + use_output="complex", + extra_outputs=extra_outputs, + help=_("service discovery"), + ) + + def add_parser_options(self): + self.parser.add_argument("jid", help=_("entity to discover")) + self.parser.add_argument( + "-t", + "--type", + type=str, + choices=("infos", "items", "both", "external", "all"), + default="all", + help=_("type of data to discover"), + ) + self.parser.add_argument("-n", "--node", default="", help=_("node to use")) + self.parser.add_argument( + "-C", + "--no-cache", + dest="use_cache", + action="store_false", + help=_("ignore cache"), + ) + + def default_output(self, data): + features = data.get("features", []) + identities = data.get("identities", []) + extensions = data.get("extensions", {}) + items = data.get("items", []) + external = data.get("external", []) + + identities_table = common.Table( + self.host, + identities, + headers=(_("category"), _("type"), _("name")), + use_buffer=True, + ) + + extensions_tpl = [] + extensions_types = list(extensions.keys()) + extensions_types.sort() + for type_ in extensions_types: + fields = [] + for field in extensions[type_]: + field_lines = [] + data, values = field + data_keys = list(data.keys()) + data_keys.sort() + for key in data_keys: + field_lines.append( + A.color("\t", C.A_SUBHEADER, key, A.RESET, ": ", data[key]) + ) + if len(values) == 1: + field_lines.append( + A.color( + "\t", + C.A_SUBHEADER, + "value", + A.RESET, + ": ", + values[0] or (A.BOLD + "UNSET"), + ) + ) + elif len(values) > 1: + field_lines.append( + A.color("\t", C.A_SUBHEADER, "values", A.RESET, ": ") + ) + + for value in values: + field_lines.append(A.color("\t - ", A.BOLD, value)) + fields.append("\n".join(field_lines)) + extensions_tpl.append( + "{type_}\n{fields}".format(type_=type_, fields="\n\n".join(fields)) + ) + + items_table = common.Table( + self.host, items, headers=(_("entity"), _("node"), _("name")), use_buffer=True + ) + + template = [] + fmt_kwargs = {} + if features: + template.append(A.color(C.A_HEADER, _("Features")) + "\n\n{features}") + if identities: + template.append(A.color(C.A_HEADER, _("Identities")) + "\n\n{identities}") + if extensions: + template.append(A.color(C.A_HEADER, _("Extensions")) + "\n\n{extensions}") + if items: + template.append(A.color(C.A_HEADER, _("Items")) + "\n\n{items}") + if external: + fmt_lines = [] + for e in external: + data = {k: e[k] for k in sorted(e)} + host = data.pop("host") + type_ = data.pop("type") + fmt_lines.append(A.color( + "\t", + C.A_SUBHEADER, + host, + " ", + A.RESET, + "[", + C.A_LEVEL_COLORS[1], + type_, + A.RESET, + "]", + )) + extended = data.pop("extended", None) + for key, value in data.items(): + fmt_lines.append(A.color( + "\t\t", + C.A_LEVEL_COLORS[2], + f"{key}: ", + C.A_LEVEL_COLORS[3], + str(value) + )) + if extended: + fmt_lines.append(A.color( + "\t\t", + C.A_HEADER, + "extended", + )) + nb_extended = len(extended) + for idx, form_data in enumerate(extended): + namespace = form_data.get("namespace") + if namespace: + fmt_lines.append(A.color( + "\t\t", + C.A_LEVEL_COLORS[2], + "namespace: ", + C.A_LEVEL_COLORS[3], + A.BOLD, + namespace + )) + for field_data in form_data["fields"]: + name = field_data.get("name") + if not name: + continue + field_type = field_data.get("type") + if "multi" in field_type: + value = ", ".join(field_data.get("values") or []) + else: + value = field_data.get("value") + if value is None: + continue + if field_type == "boolean": + value = C.bool(value) + fmt_lines.append(A.color( + "\t\t", + C.A_LEVEL_COLORS[2], + f"{name}: ", + C.A_LEVEL_COLORS[3], + A.BOLD, + str(value) + )) + if nb_extended>1 and idx < nb_extended-1: + fmt_lines.append("\n") + + fmt_lines.append("\n") + + template.append( + A.color(C.A_HEADER, _("External")) + "\n\n{external_formatted}" + ) + fmt_kwargs["external_formatted"] = "\n".join(fmt_lines) + + print( + "\n\n".join(template).format( + features="\n".join(features), + identities=identities_table.display().string, + extensions="\n".join(extensions_tpl), + items=items_table.display().string, + **fmt_kwargs, + ) + ) + + async def start(self): + infos_requested = self.args.type in ("infos", "both", "all") + items_requested = self.args.type in ("items", "both", "all") + exter_requested = self.args.type in ("external", "all") + if self.args.node: + if self.args.type == "external": + self.parser.error( + '--node can\'t be used with discovery of external services ' + '(--type="external")' + ) + else: + exter_requested = False + jids = await self.host.check_jids([self.args.jid]) + jid = jids[0] + data = {} + + # infos + if infos_requested: + try: + infos = await self.host.bridge.disco_infos( + jid, + node=self.args.node, + use_cache=self.args.use_cache, + profile_key=self.host.profile, + ) + except Exception as e: + self.disp(_("error while doing discovery: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + + else: + features, identities, extensions = infos + features.sort() + identities.sort(key=lambda identity: identity[2]) + data.update( + {"features": features, "identities": identities, "extensions": extensions} + ) + + # items + if items_requested: + try: + items = await self.host.bridge.disco_items( + jid, + node=self.args.node, + use_cache=self.args.use_cache, + profile_key=self.host.profile, + ) + except Exception as e: + self.disp(_("error while doing discovery: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + items.sort(key=lambda item: item[2]) + data["items"] = items + + # external + if exter_requested: + try: + ext_services_s = await self.host.bridge.external_disco_get( + jid, + self.host.profile, + ) + except Exception as e: + self.disp( + _("error while doing external service discovery: {e}").format(e=e), + error=True + ) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + data["external"] = data_format.deserialise( + ext_services_s, type_check=list + ) + + # output + await self.output(data) + self.host.quit() + + +class Version(base.CommandBase): + def __init__(self, host): + super(Version, self).__init__(host, "version", help=_("software version")) + + def add_parser_options(self): + self.parser.add_argument("jid", type=str, help=_("Entity to request")) + + async def start(self): + jids = await self.host.check_jids([self.args.jid]) + jid = jids[0] + try: + data = await self.host.bridge.software_version_get(jid, self.host.profile) + except Exception as e: + self.disp(_("error while trying to get version: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + infos = [] + name, version, os = data + if name: + infos.append(_("Software name: {name}").format(name=name)) + if version: + infos.append(_("Software version: {version}").format(version=version)) + if os: + infos.append(_("Operating System: {os}").format(os=os)) + + print("\n".join(infos)) + self.host.quit() + + +class Session(base.CommandBase): + def __init__(self, host): + extra_outputs = {"default": self.default_output} + super(Session, self).__init__( + host, + "session", + use_output="dict", + extra_outputs=extra_outputs, + help=_("running session"), + ) + + def add_parser_options(self): + pass + + async def default_output(self, data): + started = data["started"] + data["started"] = "{short} (UTC, {relative})".format( + short=date_utils.date_fmt(started), + relative=date_utils.date_fmt(started, "relative"), + ) + await self.host.output(C.OUTPUT_DICT, "simple", {}, data) + + async def start(self): + try: + data = await self.host.bridge.session_infos_get(self.host.profile) + except Exception as e: + self.disp(_("Error getting session infos: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(data) + self.host.quit() + + +class Devices(base.CommandBase): + def __init__(self, host): + super(Devices, self).__init__( + host, "devices", use_output=C.OUTPUT_LIST_DICT, help=_("devices of an entity") + ) + + def add_parser_options(self): + self.parser.add_argument( + "jid", type=str, nargs="?", default="", help=_("Entity to request") + ) + + async def start(self): + try: + data = await self.host.bridge.devices_infos_get( + self.args.jid, self.host.profile + ) + except Exception as e: + self.disp(_("Error getting devices infos: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + data = data_format.deserialise(data, type_check=list) + await self.output(data) + self.host.quit() + + +class Info(base.CommandBase): + subcommands = (Disco, Version, Session, Devices) + + def __init__(self, host): + super(Info, self).__init__( + host, + "info", + use_profile=False, + help=_("Get various pieces of information on entities"), + )