Mercurial > libervia-backend
diff sat_frontends/jp/cmd_pubsub.py @ 3668:0eacda79b5d1
CLI (pubsub/cache): `search` implementation
rel 361
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 08 Sep 2021 17:58:48 +0200 |
parents | 7f503b20597e |
children | 5bda9d2e8b35 |
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py Wed Sep 08 17:58:48 2021 +0200 +++ b/sat_frontends/jp/cmd_pubsub.py Wed Sep 08 17:58:48 2021 +0200 @@ -24,6 +24,7 @@ import sys import subprocess import asyncio +import json from . import base from sat.core.i18n import _ from sat.core import exceptions @@ -35,6 +36,7 @@ from sat.tools.common import data_format from sat.tools.common import uri from sat.tools.common.ansi import ANSI as A +from sat.tools.common import date_utils from sat_frontends.tools import jid, strings from sat_frontends.bridge.bridge_frontend import BridgeException @@ -949,7 +951,8 @@ await self.host.confirmOrQuit( _( "Are you sure to reset cache? All nodes and items will be removed " - "from it, then it will be progressively refilled as if it were new." + "from it, then it will be progressively refilled as if it were new. " + "This may be resources intensive." ), _("Pubsub cache reset has been cancelled.") ) @@ -962,12 +965,234 @@ self.host.quit(C.EXIT_OK) +class CacheSearch(base.CommandBase): + def __init__(self, host): + extra_outputs = { + "default": self.default_output, + "xml": self.xml_output, + "xml-raw": self.xml_raw_output, + } + super().__init__( + host, + "search", + use_profile=False, + use_output=C.OUTPUT_LIST_DICT, + extra_outputs=extra_outputs, + help=_("search for pubsub items in cache"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY" + ) + self.parser.add_argument( + "-p", "--profile", action="append", dest="profiles", metavar="PROFILE", + help="search items only from these profiles. May be used several times." + ) + self.parser.add_argument( + "-s", "--service", action="append", dest="services", metavar="SERVICE", + help="items must be from specified service. May be used several times." + ) + self.parser.add_argument( + "-n", "--node", action="append", dest="nodes", metavar="NODE", + help="items must be in the specified node. May be used several times." + ) + self.parser.add_argument( + "-t", "--type", action="append", dest="types", metavar="TYPE", + help="items must be of specified type. May be used several times." + ) + self.parser.add_argument( + "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE", + help="items must be of specified subtype. May be used several times." + ) + self.parser.add_argument( + "-P", "--payload", action="store_true", help=_("include item XML payload") + ) + self.parser.add_argument( + "-o", "--order-by", action="append", nargs="+", + metavar=("ORDER", "[FIELD] [DIRECTION]"), + help=_("how items must be ordered. May be used several times.") + ) + self.parser.add_argument( + "-l", "--limit", type=int, help=_("maximum number of items to return") + ) + self.parser.add_argument( + "-i", "--index", type=int, help=_("return results starting from this index") + ) + self.parser.add_argument( + "-F", + "--field", + action="append", + nargs=3, + dest="fields", + default=[], + metavar=("PATH", "OPERATOR", "VALUE"), + help=_("parsed data field filter. May be used several times."), + ) + self.parser.add_argument( + "-k", + "--key", + action="append", + dest="keys", + metavar="KEY", + help=_( + "data key(s) to display. May be used several times. DEFAULT: show all " + "keys" + ), + ) + + async def start(self): + query = {} + for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"): + value = getattr(self.args, arg) + if value: + if arg in ("types", "subtypes"): + # empty string is used to find items without type and/or subtype + value = [v or None for v in value] + query[arg] = value + for arg in ("limit", "index"): + value = getattr(self.args, arg) + if value is not None: + query[arg] = value + if self.args.order_by is not None: + for order_data in self.args.order_by: + order, *args = order_data + if order == "field": + if not args: + self.parser.error(_("field data must be specified in --order-by")) + elif len(args) == 1: + path = args[0] + direction = "asc" + elif len(args) == 2: + path, direction = args + else: + self.parser.error(_( + "You can't specify more that 2 arguments for a field in " + "--order-by" + )) + try: + path = json.loads(path) + except json.JSONDecodeError: + pass + order_query = { + "path": path, + } + else: + order_query = { + "order": order + } + if not args: + direction = "asc" + elif len(args) == 1: + direction = args[0] + else: + self.parser.error(_( + "there are too many arguments in --order-by option" + )) + if direction.lower() not in ("asc", "desc"): + self.parser.error(_("invalid --order-by direction: {direction!r}")) + order_query["direction"] = direction + query.setdefault("order-by", []).append(order_query) + + if self.args.fields: + parsed = [] + for field in self.args.fields: + path, operator, value = field + try: + path = json.loads(path) + except json.JSONDecodeError: + # this is not a JSON encoded value, we keep it as a string + pass + + if not isinstance(path, list): + path = [path] + + # handling of TP(<time pattern>) + if operator in (">", "gt", "<", "le", "between"): + def datetime_sub(match): + return str(date_utils.date_parse_ext( + match.group(1), default_tz=date_utils.TZ_LOCAL + )) + value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value) + + try: + value = json.loads(value) + except json.JSONDecodeError: + # not JSON, as above we keep it as string + pass + + if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"): + if not isinstance(value, list): + value = [value] + + parsed.append({ + "path": path, + "op": operator, + "value": value + }) + + query["parsed"] = parsed + + if self.args.payload or "xml" in self.args.output: + query["with_payload"] = True + if self.args.keys: + self.args.keys.append("item_payload") + try: + found_items = data_format.deserialise( + await self.host.bridge.psCacheSearch( + data_format.serialise(query) + ), + type_check=list, + ) + except BridgeException as e: + self.disp(f"can't search for pubsub items in cache: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + if self.args.keys: + found_items = [ + {k: v for k,v in item.items() if k in self.args.keys} + for item in found_items + ] + await self.output(found_items) + self.host.quit(C.EXIT_OK) + + def default_output(self, found_items): + for item in found_items: + for field in ("created", "published", "updated"): + try: + timestamp = item[field] + except KeyError: + pass + else: + try: + item[field] = common.format_time(timestamp) + except ValueError: + pass + self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items) + + def xml_output(self, found_items): + """Output prettified item payload""" + cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"] + for item in found_items: + cb(item["item_payload"]) + + def xml_raw_output(self, found_items): + """Output item payload without prettifying""" + cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"] + for item in found_items: + cb(item["item_payload"]) + + class Cache(base.CommandBase): subcommands = ( CacheGet, CacheSync, CachePurge, CacheReset, + CacheSearch, ) def __init__(self, host):