Mercurial > libervia-backend
changeset 3602:7241ce3b79dd
jp (pubsub): new `cache` subcommand with commands to `get`, `sync`, `purge` and `reset` pubsub cache
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 29 Jul 2021 22:51:01 +0200 (2021-07-29) |
parents | b46e9791168f |
children | 25cbaf047728 |
files | sat_frontends/jp/cmd_pubsub.py |
diffstat | 1 files changed, 216 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py Thu Jul 29 22:51:01 2021 +0200 +++ b/sat_frontends/jp/cmd_pubsub.py Thu Jul 29 22:51:01 2021 +0200 @@ -762,6 +762,220 @@ ) +class CacheGet(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "get", + use_output=C.OUTPUT_LIST_XML, + use_pubsub=True, + pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE}, + help=_("get pubsub item(s) from cache"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-S", + "--sub-id", + default="", + help=_("subscription id"), + ) + + async def start(self): + try: + ps_result = data_format.deserialise( + await self.host.bridge.psCacheGet( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + self.args.sub_id, + self.getPubsubExtra(), + self.profile, + ) + ) + except BridgeException as e: + if e.classname == "NotFound": + self.disp( + f"The node {self.args.node} from {self.args.service} is not in cache " + f"for {self.profile}", + error=True, + ) + self.host.quit(C.EXIT_NOT_FOUND) + else: + self.disp(f"can't get pubsub items from cache: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + await self.output(ps_result["items"]) + self.host.quit(C.EXIT_OK) + + +class CacheSync(base.CommandBase): + + def __init__(self, host): + super().__init__( + host, + "sync", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("(re)synchronise a pubsub node"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + await self.host.bridge.psCacheSync( + self.args.service, + self.args.node, + self.profile, + ) + except BridgeException as e: + if e.condition == "item-not-found" or e.classname == "NotFound": + self.disp( + f"The node {self.args.node} doesn't exist on {self.args.service}", + error=True, + ) + self.host.quit(C.EXIT_NOT_FOUND) + else: + self.disp(f"can't synchronise pubsub node: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + self.host.quit(C.EXIT_OK) + + +class CachePurge(base.CommandBase): + + def __init__(self, host): + super().__init__( + host, + "purge", + use_profile=False, + help=_("purge (delete) items from cache"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-s", "--service", action="append", metavar="JID", dest="services", + help="purge items only for these services. If not specified, items from ALL " + "services will be purged. May be used several times." + ) + self.parser.add_argument( + "-n", "--node", action="append", dest="nodes", + help="purge items only for these nodes. If not specified, items from ALL " + "nodes will be purged. May be used several times." + ) + self.parser.add_argument( + "-p", "--profile", action="append", dest="profiles", + help="purge items only for these profiles. If not specified, items from ALL " + "profiles will be purged. May be used several times." + ) + self.parser.add_argument( + "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN", + help="purge items which have been last updated before given time." + ) + self.parser.add_argument( + "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN", + help="purge items which have been last created before given time." + ) + self.parser.add_argument( + "-t", "--type", action="append", dest="types", + help="purge items flagged with TYPE. May be used several times." + ) + self.parser.add_argument( + "-s", "--subtype", action="append", dest="subtypes", + help="purge items flagged with SUBTYPE. May be used several times." + ) + self.parser.add_argument( + "-f", "--force", action="store_true", + help=_("purge items without confirmation") + ) + + async def start(self): + if not self.args.force: + await self.host.confirmOrQuit( + _( + "Are you sure to purge items from cache? You'll have to bypass cache " + "or resynchronise nodes to access deleted items again." + ), + _("Items purgins has been cancelled.") + ) + purge_data = {} + for key in ( + "services", "nodes", "profiles", "updated_before", "created_before", + "types", "subtypes" + ): + value = getattr(self.args, key) + if value is not None: + purge_data[key] = value + try: + await self.host.bridge.psCachePurge( + data_format.serialise( + purge_data + ) + ) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + self.host.quit(C.EXIT_OK) + + +class CacheReset(base.CommandBase): + + def __init__(self, host): + super().__init__( + host, + "reset", + use_profile=False, + help=_("remove everything from cache"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", "--force", action="store_true", + help=_("reset cache without confirmation") + ) + + async def start(self): + if not self.args.force: + await self.host.confirmOrQuit( + _( + "Are you sure to reset cache? All nodes and items will be removed " + "from it, then it will be progressively refilled as if it were new." + ), + _("Pubsub cache reset has been cancelled.") + ) + try: + await self.host.bridge.psCacheReset() + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + self.host.quit(C.EXIT_OK) + + +class Cache(base.CommandBase): + subcommands = ( + CacheGet, + CacheSync, + CachePurge, + CacheReset, + ) + + def __init__(self, host): + super(Cache, self).__init__( + host, "cache", use_profile=False, help=_("pubsub cache handling") + ) + + class Set(base.CommandBase): def __init__(self, host): base.CommandBase.__init__( @@ -2054,12 +2268,13 @@ Subscribe, Unsubscribe, Subscriptions, - Node, Affiliations, Search, Transform, Hook, Uri, + Node, + Cache, ) def __init__(self, host):