Mercurial > libervia-backend
view libervia/cli/cmd_info.py @ 4151:18026ce0819c
core (xmpp): message reception workflow refactoring:
- Call methods from a root async one instead of using Deferred callbacks chain.
- Use a queue to be sure to process messages in order.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:50:35 +0100 |
parents | 47401850dec6 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pprint import pformat

from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format, date_utils
from libervia.backend.tools.common.ansi import ANSI as A
from libervia.cli import common
from libervia.cli.constants import Const as C

from . import base

__commands__ = ["Info"]


class Disco(base.CommandBase):
    """``info disco`` command: service discovery on an entity."""

    def __init__(self, host):
        extra_outputs = {"default": self.default_output}
        super().__init__(
            host,
            "disco",
            use_output="complex",
            extra_outputs=extra_outputs,
            help=_("service discovery"),
        )

    def add_parser_options(self):
        """Declare the positional jid and the --type/--node/--no-cache options."""
        self.parser.add_argument("jid", help=_("entity to discover"))
        self.parser.add_argument(
            "-t",
            "--type",
            type=str,
            choices=("infos", "items", "both", "external", "all"),
            default="all",
            help=_("type of data to discover"),
        )
        self.parser.add_argument("-n", "--node", default="", help=_("node to use"))
        self.parser.add_argument(
            "-C",
            "--no-cache",
            dest="use_cache",
            action="store_false",
            help=_("ignore cache"),
        )

    @staticmethod
    def _format_extensions(extensions):
        """Build one formatted text chunk per extension form type.

        @param extensions: mapping of form type to an iterable of
            ``(field_data, values)`` pairs, where ``field_data`` is a dict of
            field attributes and ``values`` a sequence of field values
        @return: list of strings, one per form type, sorted by form type
        """
        extensions_tpl = []
        for type_ in sorted(extensions):
            fields = []
            for field_attrs, values in extensions[type_]:
                field_lines = [
                    A.color("\t", C.A_SUBHEADER, key, A.RESET, ": ", field_attrs[key])
                    for key in sorted(field_attrs)
                ]
                if len(values) == 1:
                    field_lines.append(
                        A.color(
                            "\t",
                            C.A_SUBHEADER,
                            "value",
                            A.RESET,
                            ": ",
                            # an empty single value is shown as a bold "UNSET"
                            values[0] or (A.BOLD + "UNSET"),
                        )
                    )
                elif len(values) > 1:
                    field_lines.append(
                        A.color("\t", C.A_SUBHEADER, "values", A.RESET, ": ")
                    )
                    for value in values:
                        field_lines.append(A.color("\t - ", A.BOLD, value))
                fields.append("\n".join(field_lines))
            extensions_tpl.append(
                "{type_}\n{fields}".format(type_=type_, fields="\n\n".join(fields))
            )
        return extensions_tpl

    @staticmethod
    def _format_external(external):
        """Build the output lines describing external services.

        @param external: list of dicts, one per external service; each must
            have "host" and "type" keys and may have an "extended" list of
            form data (dicts with a "fields" list and optional "namespace")
        @return: list of lines, to be joined with newlines
        """
        fmt_lines = []
        for service in external:
            # work on a key-sorted copy so that remaining keys are displayed in
            # a stable order, and so that the caller's dict is not modified
            service_data = {k: service[k] for k in sorted(service)}
            host = service_data.pop("host")
            type_ = service_data.pop("type")
            fmt_lines.append(
                A.color(
                    "\t",
                    C.A_SUBHEADER,
                    host,
                    " ",
                    A.RESET,
                    "[",
                    C.A_LEVEL_COLORS[1],
                    type_,
                    A.RESET,
                    "]",
                )
            )
            extended = service_data.pop("extended", None)
            for key, value in service_data.items():
                fmt_lines.append(
                    A.color(
                        "\t\t",
                        C.A_LEVEL_COLORS[2],
                        f"{key}: ",
                        C.A_LEVEL_COLORS[3],
                        str(value),
                    )
                )
            if extended:
                fmt_lines.append(
                    A.color(
                        "\t\t",
                        C.A_HEADER,
                        "extended",
                    )
                )
                nb_extended = len(extended)
                for idx, form_data in enumerate(extended):
                    namespace = form_data.get("namespace")
                    if namespace:
                        fmt_lines.append(
                            A.color(
                                "\t\t",
                                C.A_LEVEL_COLORS[2],
                                "namespace: ",
                                C.A_LEVEL_COLORS[3],
                                A.BOLD,
                                namespace,
                            )
                        )
                    for field_data in form_data["fields"]:
                        name = field_data.get("name")
                        if not name:
                            continue
                        # a field without "type" must not crash the "multi"
                        # membership test below: treat it as single-valued
                        field_type = field_data.get("type") or ""
                        if "multi" in field_type:
                            value = ", ".join(field_data.get("values") or [])
                        else:
                            value = field_data.get("value")
                        if value is None:
                            continue
                        if field_type == "boolean":
                            value = C.bool(value)
                        fmt_lines.append(
                            A.color(
                                "\t\t",
                                C.A_LEVEL_COLORS[2],
                                f"{name}: ",
                                C.A_LEVEL_COLORS[3],
                                A.BOLD,
                                str(value),
                            )
                        )
                    # blank separator between forms, but not after the last one
                    if nb_extended > 1 and idx < nb_extended - 1:
                        fmt_lines.append("\n")

            # NOTE(review): assumed to be a per-service separator; the nesting
            # level of this statement should be confirmed against the original
            fmt_lines.append("\n")
        return fmt_lines

    def default_output(self, data):
        """Print discovery results in a human readable form.

        Only the sections for which ``data`` holds a non-empty value are
        displayed.

        @param data: dict which may contain "features", "identities",
            "extensions", "items" and "external" keys
        """
        features = data.get("features", [])
        identities = data.get("identities", [])
        extensions = data.get("extensions", {})
        items = data.get("items", [])
        external = data.get("external", [])

        identities_table = common.Table(
            self.host,
            identities,
            headers=(_("category"), _("type"), _("name")),
            use_buffer=True,
        )

        extensions_tpl = self._format_extensions(extensions)

        items_table = common.Table(
            self.host, items, headers=(_("entity"), _("node"), _("name")), use_buffer=True
        )

        template = []
        fmt_kwargs = {}
        if features:
            template.append(A.color(C.A_HEADER, _("Features")) + "\n\n{features}")
        if identities:
            template.append(A.color(C.A_HEADER, _("Identities")) + "\n\n{identities}")
        if extensions:
            template.append(A.color(C.A_HEADER, _("Extensions")) + "\n\n{extensions}")
        if items:
            template.append(A.color(C.A_HEADER, _("Items")) + "\n\n{items}")
        if external:
            template.append(
                A.color(C.A_HEADER, _("External")) + "\n\n{external_formatted}"
            )
            fmt_kwargs["external_formatted"] = "\n".join(
                self._format_external(external)
            )

        print(
            "\n\n".join(template).format(
                features="\n".join(features),
                identities=identities_table.display().string,
                extensions="\n".join(extensions_tpl),
                items=items_table.display().string,
                **fmt_kwargs,
            )
        )

    async def start(self):
        """Run the discovery kinds selected by --type, then output the result.

        External services discovery is skipped when --node is used with a type
        other than "external", and is a parser error with --type="external".
        On a bridge failure, the error is displayed and the host is asked to
        quit with C.EXIT_BRIDGE_ERRBACK.
        """
        infos_requested = self.args.type in ("infos", "both", "all")
        items_requested = self.args.type in ("items", "both", "all")
        exter_requested = self.args.type in ("external", "all")
        if self.args.node:
            if self.args.type == "external":
                self.parser.error(
                    '--node can\'t be used with discovery of external services '
                    '(--type="external")'
                )
            else:
                exter_requested = False
        jids = await self.host.check_jids([self.args.jid])
        jid = jids[0]
        data = {}

        # infos
        if infos_requested:
            try:
                infos = await self.host.bridge.disco_infos(
                    jid,
                    node=self.args.node,
                    use_cache=self.args.use_cache,
                    profile_key=self.host.profile,
                )
            except Exception as e:
                self.disp(_("error while doing discovery: {e}").format(e=e), error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                features, identities, extensions = infos
                features.sort()
                # identities are ordered on their third element (shown as "name")
                identities.sort(key=lambda identity: identity[2])
                data.update(
                    {
                        "features": features,
                        "identities": identities,
                        "extensions": extensions,
                    }
                )

        # items
        if items_requested:
            try:
                items = await self.host.bridge.disco_items(
                    jid,
                    node=self.args.node,
                    use_cache=self.args.use_cache,
                    profile_key=self.host.profile,
                )
            except Exception as e:
                self.disp(_("error while doing discovery: {e}").format(e=e), error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                # items are ordered on their third element (shown as "name")
                items.sort(key=lambda item: item[2])
                data["items"] = items

        # external
        if exter_requested:
            try:
                ext_services_s = await self.host.bridge.external_disco_get(
                    jid,
                    self.host.profile,
                )
            except Exception as e:
                self.disp(
                    _("error while doing external service discovery: {e}").format(e=e),
                    error=True,
                )
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                data["external"] = data_format.deserialise(
                    ext_services_s, type_check=list
                )

        # output
        await self.output(data)
        self.host.quit()


class Version(base.CommandBase):
    """``info version`` command: software version of an entity."""

    def __init__(self, host):
        super().__init__(host, "version", help=_("software version"))

    def add_parser_options(self):
        """Declare the positional jid of the entity to request."""
        self.parser.add_argument("jid", type=str, help=_("Entity to request"))

    async def start(self):
        """Request software name, version and OS, and print the non-empty ones."""
        jids = await self.host.check_jids([self.args.jid])
        jid = jids[0]
        try:
            data = await self.host.bridge.software_version_get(jid, self.host.profile)
        except Exception as e:
            self.disp(_("error while trying to get version: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            infos = []
            name, version, os = data
            if name:
                infos.append(_("Software name: {name}").format(name=name))
            if version:
                infos.append(_("Software version: {version}").format(version=version))
            if os:
                infos.append(_("Operating System: {os}").format(os=os))

            print("\n".join(infos))
            self.host.quit()


class Session(base.CommandBase):
    """``info session`` command: data on the running session."""

    def __init__(self, host):
        extra_outputs = {"default": self.default_output}
        super().__init__(
            host,
            "session",
            use_output="dict",
            extra_outputs=extra_outputs,
            help=_("running session"),
        )

    def add_parser_options(self):
        """This command has no option of its own."""
        pass

    async def default_output(self, data):
        """Output session data, with "started" made human readable.

        @param data: session data; its "started" value is replaced in place by
            a string holding the short and the relative formatted dates
        """
        started = data["started"]
        data["started"] = "{short} (UTC, {relative})".format(
            short=date_utils.date_fmt(started),
            relative=date_utils.date_fmt(started, "relative"),
        )
        await self.host.output(C.OUTPUT_DICT, "simple", {}, data)

    async def start(self):
        """Retrieve session data of the current profile and output it."""
        try:
            data = await self.host.bridge.session_infos_get(self.host.profile)
        except Exception as e:
            self.disp(_("Error getting session infos: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(data)
            self.host.quit()


class Devices(base.CommandBase):
    """``info devices`` command: devices of an entity."""

    def __init__(self, host):
        super().__init__(
            host, "devices", use_output=C.OUTPUT_LIST_DICT, help=_("devices of an entity")
        )

    def add_parser_options(self):
        """Declare the optional positional jid (empty string when omitted)."""
        self.parser.add_argument(
            "jid", type=str, nargs="?", default="", help=_("Entity to request")
        )

    async def start(self):
        """Retrieve devices data, deserialise it to a list and output it."""
        try:
            data = await self.host.bridge.devices_infos_get(
                self.args.jid, self.host.profile
            )
        except Exception as e:
            self.disp(_("Error getting devices infos: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            data = data_format.deserialise(data, type_check=list)
            await self.output(data)
            self.host.quit()


class Info(base.CommandBase):
    """``info`` command: parent of the disco, version, session and devices ones."""

    subcommands = (Disco, Version, Session, Devices)

    def __init__(self, host):
        super().__init__(
            host,
            "info",
            use_profile=False,
            help=_("Get various pieces of information on entities"),
        )