libervia-backend: diff sat_frontends/jp/cmd_pubsub.py @ 3715:b9718216a1c0 (bookmark: 0.9)
merge bookmark 0.9
author:    Goffi <goffi@goffi.org>
date:      Wed, 01 Dec 2021 16:13:31 +0100
parents:   0eacda79b5d1
children:  5bda9d2e8b35
--- a/sat_frontends/jp/cmd_pubsub.py	Tue Nov 30 23:31:09 2021 +0100
+++ b/sat_frontends/jp/cmd_pubsub.py	Wed Dec 01 16:13:31 2021 +0100
@@ -24,6 +24,7 @@
 import sys
 import subprocess
 import asyncio
+import json
 from . import base
 from sat.core.i18n import _
 from sat.core import exceptions
@@ -35,6 +36,7 @@
 from sat.tools.common import data_format
 from sat.tools.common import uri
 from sat.tools.common.ansi import ANSI as A
+from sat.tools.common import date_utils
 from sat_frontends.tools import jid, strings
 from sat_frontends.bridge.bridge_frontend import BridgeException

@@ -762,6 +764,443 @@
         )


+class CacheGet(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "get",
+            use_output=C.OUTPUT_LIST_XML,
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
+            help=_("get pubsub item(s) from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-S",
+            "--sub-id",
+            default="",
+            help=_("subscription id"),
+        )
+
+    async def start(self):
+        try:
+            ps_result = data_format.deserialise(
+                await self.host.bridge.psCacheGet(
+                    self.args.service,
+                    self.args.node,
+                    self.args.max,
+                    self.args.items,
+                    self.args.sub_id,
+                    self.getPubsubExtra(),
+                    self.profile,
+                )
+            )
+        except BridgeException as e:
+            if e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} from {self.args.service} is not in cache "
+                    f"for {self.profile}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't get pubsub items from cache: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            await self.output(ps_result["items"])
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheSync(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "sync",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("(re)synchronise a pubsub node"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            await self.host.bridge.psCacheSync(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except BridgeException as e:
+            if e.condition == "item-not-found" or e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} doesn't exist on {self.args.service}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't synchronise pubsub node: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CachePurge(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "purge",
+            use_profile=False,
+            help=_("purge (delete) items from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-s", "--service", action="append", metavar="JID", dest="services",
+            help="purge items only for these services. If not specified, items from ALL "
+            "services will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-n", "--node", action="append", dest="nodes",
+            help="purge items only for these nodes. If not specified, items from ALL "
+            "nodes will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-p", "--profile", action="append", dest="profiles",
+            help="purge items only for these profiles. If not specified, items from ALL "
+            "profiles will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
+            help="purge items which have been last updated before given time."
+        )
+        self.parser.add_argument(
+            "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
+            help="purge items which have been created before given time."
+        )
+        self.parser.add_argument(
+            "-t", "--type", action="append", dest="types",
+            help="purge items flagged with TYPE. May be used several times."
+        )
+        self.parser.add_argument(
+            "-S", "--subtype", action="append", dest="subtypes",
+            help="purge items flagged with SUBTYPE. May be used several times."
+        )
+        self.parser.add_argument(
+            "-f", "--force", action="store_true",
+            help=_("purge items without confirmation")
+        )
+
+    async def start(self):
+        if not self.args.force:
+            await self.host.confirmOrQuit(
+                _(
+                    "Are you sure you want to purge items from cache? You'll have to "
+                    "bypass cache or resynchronise nodes to access deleted items again."
+                ),
+                _("Items purging has been cancelled.")
+            )
+        purge_data = {}
+        for key in (
+            "services", "nodes", "profiles", "updated_before", "created_before",
+            "types", "subtypes"
+        ):
+            value = getattr(self.args, key)
+            if value is not None:
+                purge_data[key] = value
+        try:
+            await self.host.bridge.psCachePurge(
+                data_format.serialise(
+                    purge_data
+                )
+            )
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheReset(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "reset",
+            use_profile=False,
+            help=_("remove everything from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f", "--force", action="store_true",
+            help=_("reset cache without confirmation")
+        )
+
+    async def start(self):
+        if not self.args.force:
+            await self.host.confirmOrQuit(
+                _(
+                    "Are you sure you want to reset the cache? All nodes and items will "
+                    "be removed from it, then it will be progressively refilled as if "
+                    "it were new. This may be resource intensive."
+                ),
+                _("Pubsub cache reset has been cancelled.")
+            )
+        try:
+            await self.host.bridge.psCacheReset()
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheSearch(base.CommandBase):
+    def __init__(self, host):
+        extra_outputs = {
+            "default": self.default_output,
+            "xml": self.xml_output,
+            "xml-raw": self.xml_raw_output,
+        }
+        super().__init__(
+            host,
+            "search",
+            use_profile=False,
+            use_output=C.OUTPUT_LIST_DICT,
+            extra_outputs=extra_outputs,
+            help=_("search for pubsub items in cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
+        )
+        self.parser.add_argument(
+            "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
+            help="search items only from these profiles. May be used several times."
+        )
+        self.parser.add_argument(
+            "-s", "--service", action="append", dest="services", metavar="SERVICE",
+            help="items must be from specified service. May be used several times."
+        )
+        self.parser.add_argument(
+            "-n", "--node", action="append", dest="nodes", metavar="NODE",
+            help="items must be in the specified node. May be used several times."
+        )
+        self.parser.add_argument(
+            "-t", "--type", action="append", dest="types", metavar="TYPE",
+            help="items must be of specified type. May be used several times."
+        )
+        self.parser.add_argument(
+            "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
+            help="items must be of specified subtype. May be used several times."
+        )
+        self.parser.add_argument(
+            "-P", "--payload", action="store_true", help=_("include item XML payload")
+        )
+        self.parser.add_argument(
+            "-o", "--order-by", action="append", nargs="+",
+            metavar=("ORDER", "[FIELD] [DIRECTION]"),
+            help=_("how items must be ordered. May be used several times.")
+        )
+        self.parser.add_argument(
+            "-l", "--limit", type=int, help=_("maximum number of items to return")
+        )
+        self.parser.add_argument(
+            "-i", "--index", type=int, help=_("return results starting from this index")
+        )
+        self.parser.add_argument(
+            "-F",
+            "--field",
+            action="append",
+            nargs=3,
+            dest="fields",
+            default=[],
+            metavar=("PATH", "OPERATOR", "VALUE"),
+            help=_("parsed data field filter. May be used several times."),
+        )
+        self.parser.add_argument(
+            "-k",
+            "--key",
+            action="append",
+            dest="keys",
+            metavar="KEY",
+            help=_(
+                "data key(s) to display. May be used several times. DEFAULT: show all "
+                "keys"
+            ),
+        )
+
+    async def start(self):
+        query = {}
+        for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"):
+            value = getattr(self.args, arg)
+            if value:
+                if arg in ("types", "subtypes"):
+                    # empty string is used to find items without type and/or subtype
+                    value = [v or None for v in value]
+                query[arg] = value
+        for arg in ("limit", "index"):
+            value = getattr(self.args, arg)
+            if value is not None:
+                query[arg] = value
+        if self.args.order_by is not None:
+            for order_data in self.args.order_by:
+                order, *args = order_data
+                if order == "field":
+                    if not args:
+                        self.parser.error(_("field data must be specified in --order-by"))
+                    elif len(args) == 1:
+                        path = args[0]
+                        direction = "asc"
+                    elif len(args) == 2:
+                        path, direction = args
+                    else:
+                        self.parser.error(_(
+                            "You can't specify more than 2 arguments for a field in "
+                            "--order-by"
+                        ))
+                    try:
+                        path = json.loads(path)
+                    except json.JSONDecodeError:
+                        pass
+                    order_query = {
+                        "path": path,
+                    }
+                else:
+                    order_query = {
+                        "order": order
+                    }
+                    if not args:
+                        direction = "asc"
+                    elif len(args) == 1:
+                        direction = args[0]
+                    else:
+                        self.parser.error(_(
+                            "there are too many arguments in --order-by option"
+                        ))
+                if direction.lower() not in ("asc", "desc"):
+                    self.parser.error(f"invalid --order-by direction: {direction!r}")
+                order_query["direction"] = direction
+                query.setdefault("order-by", []).append(order_query)
+
+        if self.args.fields:
+            parsed = []
+            for field in self.args.fields:
+                path, operator, value = field
+                try:
+                    path = json.loads(path)
+                except json.JSONDecodeError:
+                    # this is not a JSON encoded value, we keep it as a string
+                    pass
+
+                if not isinstance(path, list):
+                    path = [path]
+
+                # handling of TP(<time pattern>)
+                if operator in (">", "gt", "<", "le", "between"):
+                    def datetime_sub(match):
+                        return str(date_utils.date_parse_ext(
+                            match.group(1), default_tz=date_utils.TZ_LOCAL
+                        ))
+                    value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value)
+
+                try:
+                    value = json.loads(value)
+                except json.JSONDecodeError:
+                    # not JSON, as above we keep it as a string
+                    pass
+
+                if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
+                    if not isinstance(value, list):
+                        value = [value]
+
+                parsed.append({
+                    "path": path,
+                    "op": operator,
+                    "value": value
+                })
+
+            query["parsed"] = parsed
+
+        if self.args.payload or "xml" in self.args.output:
+            query["with_payload"] = True
+            if self.args.keys:
+                self.args.keys.append("item_payload")
+        try:
+            found_items = data_format.deserialise(
+                await self.host.bridge.psCacheSearch(
+                    data_format.serialise(query)
+                ),
+                type_check=list,
+            )
+        except BridgeException as e:
+            self.disp(f"can't search for pubsub items in cache: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            if self.args.keys:
+                found_items = [
+                    {k: v for k, v in item.items() if k in self.args.keys}
+                    for item in found_items
+                ]
+            await self.output(found_items)
+            self.host.quit(C.EXIT_OK)
+
+    def default_output(self, found_items):
+        for item in found_items:
+            for field in ("created", "published", "updated"):
+                try:
+                    timestamp = item[field]
+                except KeyError:
+                    pass
+                else:
+                    try:
+                        item[field] = common.format_time(timestamp)
+                    except ValueError:
+                        pass
+        self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items)
+
+    def xml_output(self, found_items):
+        """Output prettified item payload"""
+        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
+        for item in found_items:
+            cb(item["item_payload"])
+
+    def xml_raw_output(self, found_items):
+        """Output item payload without prettifying"""
+        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
+        for item in found_items:
+            cb(item["item_payload"])
+
+
+class Cache(base.CommandBase):
+    subcommands = (
+        CacheGet,
+        CacheSync,
+        CachePurge,
+        CacheReset,
+        CacheSearch,
+    )
+
+    def __init__(self, host):
+        super(Cache, self).__init__(
+            host, "cache", use_profile=False, help=_("pubsub cache handling")
+        )
+
+
 class Set(base.CommandBase):
     def __init__(self, host):
         base.CommandBase.__init__(
@@ -823,7 +1262,7 @@
             "get",
             use_output=C.OUTPUT_LIST_XML,
             use_pubsub=True,
-            pubsub_flags={C.NODE, C.MULTI_ITEMS},
+            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
            help=_("get pubsub item(s)"),
        )

@@ -835,7 +1274,6 @@
             help=_("subscription id"),
         )
         # TODO: a key(s) argument to select keys to display
-        # TODO: add MAM filters

     async def start(self):
         try:
@@ -884,7 +1322,8 @@
             "-f", "--force", action="store_true", help=_("delete without confirmation")
         )
         self.parser.add_argument(
-            "-N", "--notify", action="store_true", help=_("notify deletion")
+            "--no-notification", dest="notify", action="store_false",
+            help=_("do not send notification (not recommended)")
         )

     async def start(self):
@@ -955,7 +1394,7 @@
             items = [item] if item else []
             ps_result = data_format.deserialise(
                 await self.host.bridge.psItemsGet(
-                    service, node, 1, items, "", {}, self.profile
+                    service, node, 1, items, "", data_format.serialise({}), self.profile
                 )
             )
             item_raw = ps_result["items"][0]
@@ -1687,7 +2126,7 @@
                 self.args.rsm_max,
                 self.args.items,
                 "",
-                extra,
+                data_format.serialise(extra),
                 self.profile,
             )
         )
@@ -2054,12 +2493,13 @@
         Subscribe,
         Unsubscribe,
         Subscriptions,
-        Node,
         Affiliations,
         Search,
         Transform,
         Hook,
         Uri,
+        Node,
+        Cache,
    )

    def __init__(self, host):
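
The new commands are exposed under "jp pubsub cache". A few illustrative invocations (these are sketches, not taken from the project documentation: the "jp" launcher name follows from the sat_frontends/jp path, the shared "-s"/"-n" pubsub options come from the common pubsub parser, and all service, node, type and query values below are made up):

    # (re)fill the cache for one node
    jp pubsub cache sync -s pubsub.example.org -n urn:xmpp:microblog:0

    # full-text search restricted to items flagged with type "blog",
    # showing only the "id" and "published" keys
    jp pubsub cache search -f "cats" -t blog -k id -k published

    # purge old "blog" items without confirmation; "-b" takes a time
    # pattern handled by base.date_decoder
    jp pubsub cache purge -t blog -b "6 months ago" -f

    # wipe the whole cache
    jp pubsub cache reset -f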
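In CacheSearch, the VALUE part of a "-F/--field" filter may embed TP(<time pattern>) markers when a comparison operator is used: they are expanded to Unix timestamps by date_utils.date_parse_ext before the JSON decoding attempt. A minimal, self-contained sketch of that substitution (the regex is the one from the patch; parse_time_pattern is a hypothetical stand-in for date_parse_ext that only accepts ISO dates):

    import re
    from datetime import datetime, timezone

    def parse_time_pattern(pattern: str) -> float:
        # stand-in for date_utils.date_parse_ext: the real helper also
        # understands relative patterns such as "1 week ago"
        return datetime.fromisoformat(pattern).replace(tzinfo=timezone.utc).timestamp()

    # same regex as in CacheSearch.start: each TP(...) marker is replaced
    # by a raw timestamp, so the resulting value can be JSON-decoded
    value = "TP(2021-12-01)"
    expanded = re.sub(
        r"\bTP\(([^)]+)\)",
        lambda match: str(parse_time_pattern(match.group(1))),
        value,
    )
    print(expanded)  # 1638316800.0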