diff libervia/cli/cmd_info.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_info.py@26b7ed2817da
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/cli/cmd_info.py	Fri Jun 02 14:54:26 2023 +0200
@@ -0,0 +1,386 @@
+#!/usr/bin/env python3
+
+
+# Libervia CLI
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from pprint import pformat
+
+from libervia.backend.core.i18n import _
+from libervia.backend.tools.common import data_format, date_utils
+from libervia.backend.tools.common.ansi import ANSI as A
+from libervia.cli import common
+from libervia.cli.constants import Const as C
+
+from . import base
+
+__commands__ = ["Info"]
+
+
+class Disco(base.CommandBase):
+    def __init__(self, host):
+        extra_outputs = {"default": self.default_output}
+        super(Disco, self).__init__(
+            host,
+            "disco",
+            use_output="complex",
+            extra_outputs=extra_outputs,
+            help=_("service discovery"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument("jid", help=_("entity to discover"))
+        self.parser.add_argument(
+            "-t",
+            "--type",
+            type=str,
+            choices=("infos", "items", "both", "external", "all"),
+            default="all",
+            help=_("type of data to discover"),
+        )
+        self.parser.add_argument("-n", "--node", default="", help=_("node to use"))
+        self.parser.add_argument(
+            "-C",
+            "--no-cache",
+            dest="use_cache",
+            action="store_false",
+            help=_("ignore cache"),
+        )
+
+    def default_output(self, data):
+        features = data.get("features", [])
+        identities = data.get("identities", [])
+        extensions = data.get("extensions", {})
+        items = data.get("items", [])
+        external = data.get("external", [])
+
+        identities_table = common.Table(
+            self.host,
+            identities,
+            headers=(_("category"), _("type"), _("name")),
+            use_buffer=True,
+        )
+
+        extensions_tpl = []
+        extensions_types = list(extensions.keys())
+        extensions_types.sort()
+        for type_ in extensions_types:
+            fields = []
+            for field in extensions[type_]:
+                field_lines = []
+                data, values = field
+                data_keys = list(data.keys())
+                data_keys.sort()
+                for key in data_keys:
+                    field_lines.append(
+                        A.color("\t", C.A_SUBHEADER, key, A.RESET, ": ", data[key])
+                    )
+                if len(values) == 1:
+                    field_lines.append(
+                        A.color(
+                            "\t",
+                            C.A_SUBHEADER,
+                            "value",
+                            A.RESET,
+                            ": ",
+                            values[0] or (A.BOLD + "UNSET"),
+                        )
+                    )
+                elif len(values) > 1:
+                    field_lines.append(
+                        A.color("\t", C.A_SUBHEADER, "values", A.RESET, ": ")
+                    )
+
+                    for value in values:
+                        field_lines.append(A.color("\t  - ", A.BOLD, value))
+                fields.append("\n".join(field_lines))
+            extensions_tpl.append(
+                "{type_}\n{fields}".format(type_=type_, fields="\n\n".join(fields))
+            )
+
+        items_table = common.Table(
+            self.host, items, headers=(_("entity"), _("node"), _("name")), use_buffer=True
+        )
+
+        template = []
+        fmt_kwargs = {}
+        if features:
+            template.append(A.color(C.A_HEADER, _("Features")) + "\n\n{features}")
+        if identities:
+            template.append(A.color(C.A_HEADER, _("Identities")) + "\n\n{identities}")
+        if extensions:
+            template.append(A.color(C.A_HEADER, _("Extensions")) + "\n\n{extensions}")
+        if items:
+            template.append(A.color(C.A_HEADER, _("Items")) + "\n\n{items}")
+        if external:
+            fmt_lines = []
+            for e in external:
+                data = {k: e[k] for k in sorted(e)}
+                host = data.pop("host")
+                type_ = data.pop("type")
+                fmt_lines.append(A.color(
+                    "\t",
+                    C.A_SUBHEADER,
+                    host,
+                    " ",
+                    A.RESET,
+                    "[",
+                    C.A_LEVEL_COLORS[1],
+                    type_,
+                    A.RESET,
+                    "]",
+                ))
+                extended = data.pop("extended", None)
+                for key, value in data.items():
+                    fmt_lines.append(A.color(
+                        "\t\t",
+                        C.A_LEVEL_COLORS[2],
+                        f"{key}: ",
+                        C.A_LEVEL_COLORS[3],
+                        str(value)
+                    ))
+                if extended:
+                    fmt_lines.append(A.color(
+                        "\t\t",
+                        C.A_HEADER,
+                        "extended",
+                    ))
+                    nb_extended = len(extended)
+                    for idx, form_data in enumerate(extended):
+                        namespace = form_data.get("namespace")
+                        if namespace:
+                            fmt_lines.append(A.color(
+                                "\t\t",
+                                C.A_LEVEL_COLORS[2],
+                                "namespace: ",
+                                C.A_LEVEL_COLORS[3],
+                                A.BOLD,
+                                namespace
+                            ))
+                        for field_data in form_data["fields"]:
+                            name = field_data.get("name")
+                            if not name:
+                                continue
+                            field_type = field_data.get("type")
+                            if "multi" in field_type:
+                                value = ", ".join(field_data.get("values") or [])
+                            else:
+                                value = field_data.get("value")
+                                if value is None:
+                                    continue
+                                if field_type == "boolean":
+                                    value = C.bool(value)
+                            fmt_lines.append(A.color(
+                                "\t\t",
+                                C.A_LEVEL_COLORS[2],
+                                f"{name}: ",
+                                C.A_LEVEL_COLORS[3],
+                                A.BOLD,
+                                str(value)
+                            ))
+                        if nb_extended>1 and idx < nb_extended-1:
+                            fmt_lines.append("\n")
+
+                fmt_lines.append("\n")
+
+            template.append(
+                A.color(C.A_HEADER, _("External")) + "\n\n{external_formatted}"
+            )
+            fmt_kwargs["external_formatted"] = "\n".join(fmt_lines)
+
+        print(
+            "\n\n".join(template).format(
+                features="\n".join(features),
+                identities=identities_table.display().string,
+                extensions="\n".join(extensions_tpl),
+                items=items_table.display().string,
+                **fmt_kwargs,
+            )
+        )
+
+    async def start(self):
+        infos_requested = self.args.type in ("infos", "both", "all")
+        items_requested = self.args.type in ("items", "both", "all")
+        exter_requested = self.args.type in ("external", "all")
+        if self.args.node:
+            if self.args.type == "external":
+                self.parser.error(
+                    '--node can\'t be used with discovery of external services '
+                    '(--type="external")'
+                )
+            else:
+                exter_requested = False
+        jids = await self.host.check_jids([self.args.jid])
+        jid = jids[0]
+        data = {}
+
+        # infos
+        if infos_requested:
+            try:
+                infos = await self.host.bridge.disco_infos(
+                    jid,
+                    node=self.args.node,
+                    use_cache=self.args.use_cache,
+                    profile_key=self.host.profile,
+                )
+            except Exception as e:
+                self.disp(_("error while doing discovery: {e}").format(e=e), error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+
+            else:
+                features, identities, extensions = infos
+                features.sort()
+                identities.sort(key=lambda identity: identity[2])
+                data.update(
+                    {"features": features, "identities": identities, "extensions": extensions}
+                )
+
+        # items
+        if items_requested:
+            try:
+                items = await self.host.bridge.disco_items(
+                    jid,
+                    node=self.args.node,
+                    use_cache=self.args.use_cache,
+                    profile_key=self.host.profile,
+                )
+            except Exception as e:
+                self.disp(_("error while doing discovery: {e}").format(e=e), error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+            else:
+                items.sort(key=lambda item: item[2])
+                data["items"] = items
+
+        # external
+        if exter_requested:
+            try:
+                ext_services_s = await self.host.bridge.external_disco_get(
+                    jid,
+                    self.host.profile,
+                )
+            except Exception as e:
+                self.disp(
+                    _("error while doing external service discovery: {e}").format(e=e),
+                    error=True
+                )
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+            else:
+                data["external"] = data_format.deserialise(
+                    ext_services_s, type_check=list
+                )
+
+        # output
+        await self.output(data)
+        self.host.quit()
+
+
+class Version(base.CommandBase):
+    def __init__(self, host):
+        super(Version, self).__init__(host, "version", help=_("software version"))
+
+    def add_parser_options(self):
+        self.parser.add_argument("jid", type=str, help=_("Entity to request"))
+
+    async def start(self):
+        jids = await self.host.check_jids([self.args.jid])
+        jid = jids[0]
+        try:
+            data = await self.host.bridge.software_version_get(jid, self.host.profile)
+        except Exception as e:
+            self.disp(_("error while trying to get version: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            infos = []
+            name, version, os = data
+            if name:
+                infos.append(_("Software name: {name}").format(name=name))
+            if version:
+                infos.append(_("Software version: {version}").format(version=version))
+            if os:
+                infos.append(_("Operating System: {os}").format(os=os))
+
+            print("\n".join(infos))
+            self.host.quit()
+
+
+class Session(base.CommandBase):
+    def __init__(self, host):
+        extra_outputs = {"default": self.default_output}
+        super(Session, self).__init__(
+            host,
+            "session",
+            use_output="dict",
+            extra_outputs=extra_outputs,
+            help=_("running session"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def default_output(self, data):
+        started = data["started"]
+        data["started"] = "{short} (UTC, {relative})".format(
+            short=date_utils.date_fmt(started),
+            relative=date_utils.date_fmt(started, "relative"),
+        )
+        await self.host.output(C.OUTPUT_DICT, "simple", {}, data)
+
+    async def start(self):
+        try:
+            data = await self.host.bridge.session_infos_get(self.host.profile)
+        except Exception as e:
+            self.disp(_("Error getting session infos: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(data)
+            self.host.quit()
+
+
+class Devices(base.CommandBase):
+    def __init__(self, host):
+        super(Devices, self).__init__(
+            host, "devices", use_output=C.OUTPUT_LIST_DICT, help=_("devices of an entity")
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "jid", type=str, nargs="?", default="", help=_("Entity to request")
+        )
+
+    async def start(self):
+        try:
+            data = await self.host.bridge.devices_infos_get(
+                self.args.jid, self.host.profile
+            )
+        except Exception as e:
+            self.disp(_("Error getting devices infos: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            data = data_format.deserialise(data, type_check=list)
+            await self.output(data)
+            self.host.quit()
+
+
+class Info(base.CommandBase):
+    subcommands = (Disco, Version, Session, Devices)
+
+    def __init__(self, host):
+        super(Info, self).__init__(
+            host,
+            "info",
+            use_profile=False,
+            help=_("Get various pieces of information on entities"),
+        )