diff sat_frontends/jp/cmd_pubsub.py @ 3715:b9718216a1c0 0.9

merge bookmark 0.9
author Goffi <goffi@goffi.org>
date Wed, 01 Dec 2021 16:13:31 +0100
parents 0eacda79b5d1
children 5bda9d2e8b35
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py	Tue Nov 30 23:31:09 2021 +0100
+++ b/sat_frontends/jp/cmd_pubsub.py	Wed Dec 01 16:13:31 2021 +0100
@@ -24,6 +24,7 @@
 import sys
 import subprocess
 import asyncio
+import json
 from . import base
 from sat.core.i18n import _
 from sat.core import exceptions
@@ -35,6 +36,7 @@
 from sat.tools.common import data_format
 from sat.tools.common import uri
 from sat.tools.common.ansi import ANSI as A
+from sat.tools.common import date_utils
 from sat_frontends.tools import jid, strings
 from sat_frontends.bridge.bridge_frontend import BridgeException
 
@@ -762,6 +764,443 @@
         )
 
 
+class CacheGet(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "get",
+            use_output=C.OUTPUT_LIST_XML,
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
+            help=_("get pubsub item(s) from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-S",
+            "--sub-id",
+            default="",
+            help=_("subscription id"),
+        )
+
+    async def start(self):
+        try:
+            ps_result = data_format.deserialise(
+                await self.host.bridge.psCacheGet(
+                    self.args.service,
+                    self.args.node,
+                    self.args.max,
+                    self.args.items,
+                    self.args.sub_id,
+                    self.getPubsubExtra(),
+                    self.profile,
+                )
+            )
+        except BridgeException as e:
+            if e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} from {self.args.service} is not in cache "
+                    f"for {self.profile}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't get pubsub items from cache: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            await self.output(ps_result["items"])
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheSync(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "sync",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("(re)synchronise a pubsub node"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            await self.host.bridge.psCacheSync(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except BridgeException as e:
+            if e.condition == "item-not-found" or e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} doesn't exist on {self.args.service}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't synchronise pubsub node: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CachePurge(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "purge",
+            use_profile=False,
+            help=_("purge (delete) items from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-s", "--service", action="append", metavar="JID", dest="services",
+            help="purge items only for these services. If not specified, items from ALL "
+            "services will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-n", "--node", action="append", dest="nodes",
+            help="purge items only for these nodes. If not specified, items from ALL "
+            "nodes will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-p", "--profile", action="append", dest="profiles",
+            help="purge items only for these profiles. If not specified, items from ALL "
+            "profiles will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
+            help="purge items which have been last updated before given time."
+        )
+        self.parser.add_argument(
+            "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
+            help="purge items which have been last created before given time."
+        )
+        self.parser.add_argument(
+            "-t", "--type", action="append", dest="types",
+            help="purge items flagged with TYPE. May be used several times."
+        )
+        self.parser.add_argument(
+            "-S", "--subtype", action="append", dest="subtypes",
+            help="purge items flagged with SUBTYPE. May be used several times."
+        )
+        self.parser.add_argument(
+            "-f", "--force", action="store_true",
+            help=_("purge items without confirmation")
+        )
+
+    async def start(self):
+        if not self.args.force:
+            await self.host.confirmOrQuit(
+                _(
+                    "Are you sure to purge items from cache? You'll have to bypass cache "
+                    "or resynchronise nodes to access deleted items again."
+                ),
+                _("Items purgins has been cancelled.")
+            )
+        purge_data = {}
+        for key in (
+                "services", "nodes", "profiles", "updated_before", "created_before",
+                "types", "subtypes"
+        ):
+            value = getattr(self.args, key)
+            if value is not None:
+                purge_data[key] = value
+        try:
+            await self.host.bridge.psCachePurge(
+                data_format.serialise(
+                    purge_data
+                )
+            )
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheReset(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "reset",
+            use_profile=False,
+            help=_("remove everything from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f", "--force", action="store_true",
+            help=_("reset cache without confirmation")
+        )
+
+    async def start(self):
+        if not self.args.force:
+            await self.host.confirmOrQuit(
+                _(
+                    "Are you sure to reset cache? All nodes and items will be removed "
+                    "from it, then it will be progressively refilled as if it were new. "
+                    "This may be resources intensive."
+                ),
+                _("Pubsub cache reset has been cancelled.")
+            )
+        try:
+            await self.host.bridge.psCacheReset()
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheSearch(base.CommandBase):
+    def __init__(self, host):
+        extra_outputs = {
+            "default": self.default_output,
+            "xml": self.xml_output,
+            "xml-raw": self.xml_raw_output,
+        }
+        super().__init__(
+            host,
+            "search",
+            use_profile=False,
+            use_output=C.OUTPUT_LIST_DICT,
+            extra_outputs=extra_outputs,
+            help=_("search for pubsub items in cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
+        )
+        self.parser.add_argument(
+            "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
+            help="search items only from these profiles. May be used several times."
+        )
+        self.parser.add_argument(
+            "-s", "--service", action="append", dest="services", metavar="SERVICE",
+            help="items must be from specified service. May be used several times."
+        )
+        self.parser.add_argument(
+            "-n", "--node", action="append", dest="nodes", metavar="NODE",
+            help="items must be in the specified node. May be used several times."
+        )
+        self.parser.add_argument(
+            "-t", "--type", action="append", dest="types", metavar="TYPE",
+            help="items must be of specified type. May be used several times."
+        )
+        self.parser.add_argument(
+            "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
+            help="items must be of specified subtype. May be used several times."
+        )
+        self.parser.add_argument(
+            "-P", "--payload", action="store_true", help=_("include item XML payload")
+        )
+        self.parser.add_argument(
+            "-o", "--order-by", action="append", nargs="+",
+            metavar=("ORDER", "[FIELD] [DIRECTION]"),
+            help=_("how items must be ordered. May be used several times.")
+        )
+        self.parser.add_argument(
+            "-l", "--limit", type=int, help=_("maximum number of items to return")
+        )
+        self.parser.add_argument(
+            "-i", "--index", type=int, help=_("return results starting from this index")
+        )
+        self.parser.add_argument(
+            "-F",
+            "--field",
+            action="append",
+            nargs=3,
+            dest="fields",
+            default=[],
+            metavar=("PATH", "OPERATOR", "VALUE"),
+            help=_("parsed data field filter. May be used several times."),
+        )
+        self.parser.add_argument(
+            "-k",
+            "--key",
+            action="append",
+            dest="keys",
+            metavar="KEY",
+            help=_(
+                "data key(s) to display. May be used several times. DEFAULT: show all "
+                "keys"
+            ),
+        )
+
+    async def start(self):
+        query = {}
+        for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"):
+            value = getattr(self.args, arg)
+            if value:
+                if arg in ("types", "subtypes"):
+                    # empty string is used to find items without type and/or subtype
+                    value = [v or None for v in value]
+                query[arg] = value
+        for arg in ("limit", "index"):
+            value = getattr(self.args, arg)
+            if value is not None:
+                query[arg] = value
+        if self.args.order_by is not None:
+            for order_data in self.args.order_by:
+                order, *args = order_data
+                if order == "field":
+                    if not args:
+                        self.parser.error(_("field data must be specified in --order-by"))
+                    elif len(args) == 1:
+                        path = args[0]
+                        direction = "asc"
+                    elif len(args) == 2:
+                        path, direction = args
+                    else:
+                        self.parser.error(_(
+                            "You can't specify more that 2 arguments for a field in "
+                            "--order-by"
+                        ))
+                    try:
+                        path = json.loads(path)
+                    except json.JSONDecodeError:
+                        pass
+                    order_query = {
+                        "path": path,
+                    }
+                else:
+                    order_query = {
+                        "order": order
+                    }
+                    if not args:
+                        direction = "asc"
+                    elif len(args) == 1:
+                        direction = args[0]
+                    else:
+                        self.parser.error(_(
+                            "there are too many arguments in --order-by option"
+                        ))
+                if direction.lower() not in ("asc", "desc"):
+                    self.parser.error(_("invalid --order-by direction: {direction!r}"))
+                order_query["direction"] = direction
+                query.setdefault("order-by", []).append(order_query)
+
+        if self.args.fields:
+            parsed = []
+            for field in self.args.fields:
+                path, operator, value = field
+                try:
+                    path = json.loads(path)
+                except json.JSONDecodeError:
+                    # this is not a JSON encoded value, we keep it as a string
+                    pass
+
+                if not isinstance(path, list):
+                    path = [path]
+
+                # handling of TP(<time pattern>)
+                if operator in (">", "gt", "<", "le", "between"):
+                    def datetime_sub(match):
+                        return str(date_utils.date_parse_ext(
+                            match.group(1), default_tz=date_utils.TZ_LOCAL
+                        ))
+                    value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value)
+
+                try:
+                    value = json.loads(value)
+                except json.JSONDecodeError:
+                    # not JSON, as above we keep it as string
+                    pass
+
+                if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
+                    if not isinstance(value, list):
+                        value = [value]
+
+                parsed.append({
+                    "path": path,
+                    "op": operator,
+                    "value": value
+                })
+
+            query["parsed"] = parsed
+
+        if self.args.payload or "xml" in self.args.output:
+            query["with_payload"] = True
+            if self.args.keys:
+                self.args.keys.append("item_payload")
+        try:
+            found_items = data_format.deserialise(
+                await self.host.bridge.psCacheSearch(
+                    data_format.serialise(query)
+                ),
+                type_check=list,
+            )
+        except BridgeException as e:
+            self.disp(f"can't search for pubsub items in cache: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            if self.args.keys:
+                found_items = [
+                    {k: v for k,v in item.items() if k in self.args.keys}
+                    for item in found_items
+                ]
+            await self.output(found_items)
+            self.host.quit(C.EXIT_OK)
+
+    def default_output(self, found_items):
+        for item in found_items:
+            for field in ("created", "published", "updated"):
+                try:
+                    timestamp = item[field]
+                except KeyError:
+                    pass
+                else:
+                    try:
+                        item[field] = common.format_time(timestamp)
+                    except ValueError:
+                        pass
+        self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items)
+
+    def xml_output(self, found_items):
+        """Output prettified item payload"""
+        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
+        for item in found_items:
+            cb(item["item_payload"])
+
+    def xml_raw_output(self, found_items):
+        """Output item payload without prettifying"""
+        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
+        for item in found_items:
+            cb(item["item_payload"])
+
+
+class Cache(base.CommandBase):
+    subcommands = (
+        CacheGet,
+        CacheSync,
+        CachePurge,
+        CacheReset,
+        CacheSearch,
+    )
+
+    def __init__(self, host):
+        super(Cache, self).__init__(
+            host, "cache", use_profile=False, help=_("pubsub cache handling")
+        )
+
+
 class Set(base.CommandBase):
     def __init__(self, host):
         base.CommandBase.__init__(
@@ -823,7 +1262,7 @@
             "get",
             use_output=C.OUTPUT_LIST_XML,
             use_pubsub=True,
-            pubsub_flags={C.NODE, C.MULTI_ITEMS},
+            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
             help=_("get pubsub item(s)"),
         )
 
@@ -835,7 +1274,6 @@
             help=_("subscription id"),
         )
         #  TODO: a key(s) argument to select keys to display
-        # TODO: add MAM filters
 
     async def start(self):
         try:
@@ -884,7 +1322,8 @@
             "-f", "--force", action="store_true", help=_("delete without confirmation")
         )
         self.parser.add_argument(
-            "-N", "--notify", action="store_true", help=_("notify deletion")
+            "--no-notification", dest="notify", action="store_false",
+            help=_("do not send notification (not recommended)")
         )
 
     async def start(self):
@@ -955,7 +1394,7 @@
         items = [item] if item else []
         ps_result = data_format.deserialise(
             await self.host.bridge.psItemsGet(
-                service, node, 1, items, "", {}, self.profile
+                service, node, 1, items, "", data_format.serialise({}), self.profile
             )
         )
         item_raw = ps_result["items"][0]
@@ -1687,7 +2126,7 @@
                     self.args.rsm_max,
                     self.args.items,
                     "",
-                    extra,
+                    data_format.serialise(extra),
                     self.profile,
                 )
             )
@@ -2054,12 +2493,13 @@
         Subscribe,
         Unsubscribe,
         Subscriptions,
-        Node,
         Affiliations,
         Search,
         Transform,
         Hook,
         Uri,
+        Node,
+        Cache,
     )
 
     def __init__(self, host):