diff sat_frontends/jp/cmd_pubsub.py @ 3602:7241ce3b79dd

jp (pubsub): new `cache` subcommand with commands to `get`, `sync`, `purge` and `reset` pubsub cache
author Goffi <goffi@goffi.org>
date Thu, 29 Jul 2021 22:51:01 +0200
parents b46e9791168f
children 7f503b20597e
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py	Thu Jul 29 22:51:01 2021 +0200
+++ b/sat_frontends/jp/cmd_pubsub.py	Thu Jul 29 22:51:01 2021 +0200
@@ -762,6 +762,220 @@
         )
 
 
+class CacheGet(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "get",
+            use_output=C.OUTPUT_LIST_XML,
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
+            help=_("get pubsub item(s) from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-S",
+            "--sub-id",
+            default="",
+            help=_("subscription id"),
+        )
+
+    async def start(self):
+        try:
+            ps_result = data_format.deserialise(
+                await self.host.bridge.psCacheGet(
+                    self.args.service,
+                    self.args.node,
+                    self.args.max,
+                    self.args.items,
+                    self.args.sub_id,
+                    self.getPubsubExtra(),
+                    self.profile,
+                )
+            )
+        except BridgeException as e:
+            if e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} from {self.args.service} is not in cache "
+                    f"for {self.profile}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't get pubsub items from cache: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            await self.output(ps_result["items"])
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheSync(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "sync",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("(re)synchronise a pubsub node"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            await self.host.bridge.psCacheSync(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except BridgeException as e:
+            if e.condition == "item-not-found" or e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} doesn't exist on {self.args.service}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't synchronise pubsub node: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CachePurge(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "purge",
+            use_profile=False,
+            help=_("purge (delete) items from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-s", "--service", action="append", metavar="JID", dest="services",
+            help="purge items only for these services. If not specified, items from ALL "
+            "services will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-n", "--node", action="append", dest="nodes",
+            help="purge items only for these nodes. If not specified, items from ALL "
+            "nodes will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-p", "--profile", action="append", dest="profiles",
+            help="purge items only for these profiles. If not specified, items from ALL "
+            "profiles will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
+            help="purge items which have been last updated before given time."
+        )
+        self.parser.add_argument(
+            "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
+            help="purge items which have been last created before given time."
+        )
+        self.parser.add_argument(
+            "-t", "--type", action="append", dest="types",
+            help="purge items flagged with TYPE. May be used several times."
+        )
+        self.parser.add_argument(
+            "-s", "--subtype", action="append", dest="subtypes",
+            help="purge items flagged with SUBTYPE. May be used several times."
+        )
+        self.parser.add_argument(
+            "-f", "--force", action="store_true",
+            help=_("purge items without confirmation")
+        )
+
+    async def start(self):
+        if not self.args.force:
+            await self.host.confirmOrQuit(
+                _(
+                    "Are you sure to purge items from cache? You'll have to bypass cache "
+                    "or resynchronise nodes to access deleted items again."
+                ),
+                _("Items purgins has been cancelled.")
+            )
+        purge_data = {}
+        for key in (
+                "services", "nodes", "profiles", "updated_before", "created_before",
+                "types", "subtypes"
+        ):
+            value = getattr(self.args, key)
+            if value is not None:
+                purge_data[key] = value
+        try:
+            await self.host.bridge.psCachePurge(
+                data_format.serialise(
+                    purge_data
+                )
+            )
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheReset(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "reset",
+            use_profile=False,
+            help=_("remove everything from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f", "--force", action="store_true",
+            help=_("reset cache without confirmation")
+        )
+
+    async def start(self):
+        if not self.args.force:
+            await self.host.confirmOrQuit(
+                _(
+                    "Are you sure to reset cache? All nodes and items will be removed "
+                    "from it, then it will be progressively refilled as if it were new."
+                ),
+                _("Pubsub cache reset has been cancelled.")
+            )
+        try:
+            await self.host.bridge.psCacheReset()
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class Cache(base.CommandBase):
+    subcommands = (
+        CacheGet,
+        CacheSync,
+        CachePurge,
+        CacheReset,
+    )
+
+    def __init__(self, host):
+        super(Cache, self).__init__(
+            host, "cache", use_profile=False, help=_("pubsub cache handling")
+        )
+
+
 class Set(base.CommandBase):
     def __init__(self, host):
         base.CommandBase.__init__(
@@ -2054,12 +2268,13 @@
         Subscribe,
         Unsubscribe,
         Subscriptions,
-        Node,
         Affiliations,
         Search,
         Transform,
         Hook,
         Uri,
+        Node,
+        Cache,
     )
 
     def __init__(self, host):