diff libervia/cli/cmd_debug.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_debug.py@26b7ed2817da
children 4ed8de94c926
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/cli/cmd_debug.py	Fri Jun 02 14:54:26 2023 +0200
@@ -0,0 +1,228 @@
+#!/usr/bin/env python3
+
+
+# Libervia CLI
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+from . import base
+from libervia.backend.core.i18n import _
+from libervia.cli.constants import Const as C
+from libervia.backend.tools.common.ansi import ANSI as A
+import json
+
+__commands__ = ["Debug"]
+
+
+class BridgeCommon(object):
+    def eval_args(self):
+        if self.args.arg:
+            try:
+                return eval("[{}]".format(",".join(self.args.arg)))
+            except SyntaxError as e:
+                self.disp(
+                    "Can't evaluate arguments: {mess}\n{text}\n{offset}^".format(
+                        mess=e, text=e.text, offset=" " * (e.offset - 1)
+                    ),
+                    error=True,
+                )
+                self.host.quit(C.EXIT_BAD_ARG)
+        else:
+            return []
+
+
+class Method(base.CommandBase, BridgeCommon):
+    def __init__(self, host):
+        base.CommandBase.__init__(self, host, "method", help=_("call a bridge method"))
+        BridgeCommon.__init__(self)
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "method", type=str, help=_("name of the method to execute")
+        )
+        self.parser.add_argument("arg", nargs="*", help=_("argument of the method"))
+
+    async def start(self):
+        method = getattr(self.host.bridge, self.args.method)
+        import inspect
+
+        argspec = inspect.getargspec(method)
+
+        kwargs = {}
+        if "profile_key" in argspec.args:
+            kwargs["profile_key"] = self.profile
+        elif "profile" in argspec.args:
+            kwargs["profile"] = self.profile
+
+        args = self.eval_args()
+
+        try:
+            ret = await method(
+                *args,
+                **kwargs,
+            )
+        except Exception as e:
+            self.disp(
+                _("Error while executing {method}: {e}").format(
+                    method=self.args.method, e=e
+                ),
+                error=True,
+            )
+            self.host.quit(C.EXIT_ERROR)
+        else:
+            if ret is not None:
+                self.disp(str(ret))
+            self.host.quit()
+
+
+class Signal(base.CommandBase, BridgeCommon):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self, host, "signal", help=_("send a fake signal from backend")
+        )
+        BridgeCommon.__init__(self)
+
+    def add_parser_options(self):
+        self.parser.add_argument("signal", type=str, help=_("name of the signal to send"))
+        self.parser.add_argument("arg", nargs="*", help=_("argument of the signal"))
+
+    async def start(self):
+        args = self.eval_args()
+        json_args = json.dumps(args)
+        # XXX: we use self.args.profile and not self.profile
+        #      because we want the raw profile_key (so plugin handle C.PROF_KEY_NONE)
+        try:
+            await self.host.bridge.debug_signal_fake(
+                self.args.signal, json_args, self.args.profile
+            )
+        except Exception as e:
+            self.disp(_("Can't send fake signal: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_ERROR)
+        else:
+            self.host.quit()
+
+
+class bridge(base.CommandBase):
+    subcommands = (Method, Signal)
+
+    def __init__(self, host):
+        super(bridge, self).__init__(
+            host, "bridge", use_profile=False, help=_("bridge s(t)imulation")
+        )
+
+
+class Monitor(base.CommandBase):
+    def __init__(self, host):
+        super(Monitor, self).__init__(
+            host,
+            "monitor",
+            use_verbose=True,
+            use_profile=False,
+            use_output=C.OUTPUT_XML,
+            help=_("monitor XML stream"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-d",
+            "--direction",
+            choices=("in", "out", "both"),
+            default="both",
+            help=_("stream direction filter"),
+        )
+
+    async def print_xml(self, direction, xml_data, profile):
+        if self.args.direction == "in" and direction != "IN":
+            return
+        if self.args.direction == "out" and direction != "OUT":
+            return
+        verbosity = self.host.verbosity
+        if not xml_data.strip():
+            if verbosity <= 2:
+                return
+            whiteping = True
+        else:
+            whiteping = False
+
+        if verbosity:
+            profile_disp = f" ({profile})" if verbosity > 1 else ""
+            if direction == "IN":
+                self.disp(
+                    A.color(
+                        A.BOLD, A.FG_YELLOW, "<<<===== IN ====", A.FG_WHITE, profile_disp
+                    )
+                )
+            else:
+                self.disp(
+                    A.color(
+                        A.BOLD, A.FG_CYAN, "==== OUT ====>>>", A.FG_WHITE, profile_disp
+                    )
+                )
+        if whiteping:
+            self.disp("[WHITESPACE PING]")
+        else:
+            try:
+                await self.output(xml_data)
+            except Exception:
+                #  initial stream is not valid XML,
+                # in this case we print directly to data
+                #  FIXME: we should test directly lxml.etree.XMLSyntaxError
+                #        but importing lxml directly here is not clean
+                #        should be wrapped in a custom Exception
+                self.disp(xml_data)
+                self.disp("")
+
+    async def start(self):
+        self.host.bridge.register_signal("xml_log", self.print_xml, "plugin")
+
+
+class Theme(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self, host, "theme", help=_("print colours used with your background")
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        print(f"background currently used: {A.BOLD}{self.host.background}{A.RESET}\n")
+        for attr in dir(C):
+            if not attr.startswith("A_"):
+                continue
+            color = getattr(C, attr)
+            if attr == "A_LEVEL_COLORS":
+                # This constant contains multiple colors
+                self.disp("LEVEL COLORS: ", end=" ")
+                for idx, c in enumerate(color):
+                    last = idx == len(color) - 1
+                    end = "\n" if last else " "
+                    self.disp(
+                        c + f"LEVEL_{idx}" + A.RESET + (", " if not last else ""), end=end
+                    )
+            else:
+                text = attr[2:]
+                self.disp(A.color(color, text))
+        self.host.quit()
+
+
+class Debug(base.CommandBase):
+    subcommands = (bridge, Monitor, Theme)
+
+    def __init__(self, host):
+        super(Debug, self).__init__(
+            host, "debug", use_profile=False, help=_("debugging tools")
+        )