comparison sat_frontends/jp/cmd_pubsub.py @ 3668:0eacda79b5d1

CLI (pubsub/cache): `search` implementation rel 361
author Goffi <goffi@goffi.org>
date Wed, 08 Sep 2021 17:58:48 +0200
parents 7f503b20597e
children 5bda9d2e8b35
3667:9ca19b317293 3668:0eacda79b5d1
22 import os.path 22 import os.path
23 import re 23 import re
24 import sys 24 import sys
25 import subprocess 25 import subprocess
26 import asyncio 26 import asyncio
27 import json
27 from . import base 28 from . import base
28 from sat.core.i18n import _ 29 from sat.core.i18n import _
29 from sat.core import exceptions 30 from sat.core import exceptions
30 from sat_frontends.jp.constants import Const as C 31 from sat_frontends.jp.constants import Const as C
31 from sat_frontends.jp import common 32 from sat_frontends.jp import common
33 from sat_frontends.jp import xml_tools 34 from sat_frontends.jp import xml_tools
34 from functools import partial 35 from functools import partial
35 from sat.tools.common import data_format 36 from sat.tools.common import data_format
36 from sat.tools.common import uri 37 from sat.tools.common import uri
37 from sat.tools.common.ansi import ANSI as A 38 from sat.tools.common.ansi import ANSI as A
39 from sat.tools.common import date_utils
38 from sat_frontends.tools import jid, strings 40 from sat_frontends.tools import jid, strings
39 from sat_frontends.bridge.bridge_frontend import BridgeException 41 from sat_frontends.bridge.bridge_frontend import BridgeException
40 42
41 __commands__ = ["Pubsub"] 43 __commands__ = ["Pubsub"]
42 44
947 async def start(self): 949 async def start(self):
948 if not self.args.force: 950 if not self.args.force:
949 await self.host.confirmOrQuit( 951 await self.host.confirmOrQuit(
950 _( 952 _(
951 "Are you sure to reset cache? All nodes and items will be removed " 953 "Are you sure to reset cache? All nodes and items will be removed "
952 "from it, then it will be progressively refilled as if it were new." 954 "from it, then it will be progressively refilled as if it were new. "
955 "This may be resources intensive."
953 ), 956 ),
954 _("Pubsub cache reset has been cancelled.") 957 _("Pubsub cache reset has been cancelled.")
955 ) 958 )
956 try: 959 try:
957 await self.host.bridge.psCacheReset() 960 await self.host.bridge.psCacheReset()
958 except Exception as e: 961 except Exception as e:
959 self.disp(f"Internal error: {e}", error=True) 962 self.disp(f"Internal error: {e}", error=True)
960 self.host.quit(C.EXIT_INTERNAL_ERROR) 963 self.host.quit(C.EXIT_INTERNAL_ERROR)
961 else: 964 else:
962 self.host.quit(C.EXIT_OK) 965 self.host.quit(C.EXIT_OK)
966
967
968 class CacheSearch(base.CommandBase):
969 def __init__(self, host):
970 extra_outputs = {
971 "default": self.default_output,
972 "xml": self.xml_output,
973 "xml-raw": self.xml_raw_output,
974 }
975 super().__init__(
976 host,
977 "search",
978 use_profile=False,
979 use_output=C.OUTPUT_LIST_DICT,
980 extra_outputs=extra_outputs,
981 help=_("search for pubsub items in cache"),
982 )
983
984 def add_parser_options(self):
985 self.parser.add_argument(
986 "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
987 )
988 self.parser.add_argument(
989 "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
990 help="search items only from these profiles. May be used several times."
991 )
992 self.parser.add_argument(
993 "-s", "--service", action="append", dest="services", metavar="SERVICE",
994 help="items must be from specified service. May be used several times."
995 )
996 self.parser.add_argument(
997 "-n", "--node", action="append", dest="nodes", metavar="NODE",
998 help="items must be in the specified node. May be used several times."
999 )
1000 self.parser.add_argument(
1001 "-t", "--type", action="append", dest="types", metavar="TYPE",
1002 help="items must be of specified type. May be used several times."
1003 )
1004 self.parser.add_argument(
1005 "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
1006 help="items must be of specified subtype. May be used several times."
1007 )
1008 self.parser.add_argument(
1009 "-P", "--payload", action="store_true", help=_("include item XML payload")
1010 )
1011 self.parser.add_argument(
1012 "-o", "--order-by", action="append", nargs="+",
1013 metavar=("ORDER", "[FIELD] [DIRECTION]"),
1014 help=_("how items must be ordered. May be used several times.")
1015 )
1016 self.parser.add_argument(
1017 "-l", "--limit", type=int, help=_("maximum number of items to return")
1018 )
1019 self.parser.add_argument(
1020 "-i", "--index", type=int, help=_("return results starting from this index")
1021 )
1022 self.parser.add_argument(
1023 "-F",
1024 "--field",
1025 action="append",
1026 nargs=3,
1027 dest="fields",
1028 default=[],
1029 metavar=("PATH", "OPERATOR", "VALUE"),
1030 help=_("parsed data field filter. May be used several times."),
1031 )
1032 self.parser.add_argument(
1033 "-k",
1034 "--key",
1035 action="append",
1036 dest="keys",
1037 metavar="KEY",
1038 help=_(
1039 "data key(s) to display. May be used several times. DEFAULT: show all "
1040 "keys"
1041 ),
1042 )
1043
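For reference, the class chain (Pubsub → Cache → CacheSearch, registered as "search") suggests these options are reached through a `jp pubsub cache search` invocation. A minimal, self-contained sketch of how a few of the declarations above behave, using only the standard library and made-up option values (illustrative, not part of the changeset):

    import argparse

    # reduced reproduction of some of the options declared above
    parser = argparse.ArgumentParser(prog="search")
    parser.add_argument("-f", "--fts")
    parser.add_argument("-t", "--type", action="append", dest="types")
    parser.add_argument("-o", "--order-by", action="append", nargs="+")
    parser.add_argument(
        "-F", "--field", action="append", nargs=3, dest="fields", default=[]
    )

    args = parser.parse_args([
        "-f", "cat", "-t", "blog",
        "-o", "field", "published", "desc",
        "-F", "tags", "overlap", '["pets"]',
    ])
    assert args.fts == "cat"
    assert args.types == ["blog"]
    assert args.order_by == [["field", "published", "desc"]]
    assert args.fields == [["tags", "overlap", '["pets"]']]
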
1044 async def start(self):
1045 query = {}
1046 for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"):
1047 value = getattr(self.args, arg)
1048 if value:
1049 if arg in ("types", "subtypes"):
1050 # empty string is used to find items without type and/or subtype
1051 value = [v or None for v in value]
1052 query[arg] = value
1053 for arg in ("limit", "index"):
1054 value = getattr(self.args, arg)
1055 if value is not None:
1056 query[arg] = value
1057 if self.args.order_by is not None:
1058 for order_data in self.args.order_by:
1059 order, *args = order_data
1060 if order == "field":
1061 if not args:
1062 self.parser.error(_("field data must be specified in --order-by"))
1063 elif len(args) == 1:
1064 path = args[0]
1065 direction = "asc"
1066 elif len(args) == 2:
1067 path, direction = args
1068 else:
1069 self.parser.error(_(
1070 "You can't specify more that 2 arguments for a field in "
1071 "--order-by"
1072 ))
1073 try:
1074 path = json.loads(path)
1075 except json.JSONDecodeError:
1076 pass
1077 order_query = {
1078 "path": path,
1079 }
1080 else:
1081 order_query = {
1082 "order": order
1083 }
1084 if not args:
1085 direction = "asc"
1086 elif len(args) == 1:
1087 direction = args[0]
1088 else:
1089 self.parser.error(_(
1090 "there are too many arguments in --order-by option"
1091 ))
1092 if direction.lower() not in ("asc", "desc"):
1093 self.parser.error(_("invalid --order-by direction: {direction!r}"))
1094 order_query["direction"] = direction
1095 query.setdefault("order-by", []).append(order_query)
1096
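A condensed, runnable restatement of the --order-by handling above (the json.loads() step on the field path is skipped, and "creation" is a hypothetical order keyword used only to exercise the non-field branch):

    # illustrative values, mirroring the loop above
    query = {}
    for order_data in (["field", "published", "desc"], ["creation"]):
        order, *args = order_data
        if order == "field":
            order_query = {"path": args[0]}
            direction = args[1] if len(args) == 2 else "asc"
        else:
            order_query = {"order": order}
            direction = args[0] if args else "asc"
        order_query["direction"] = direction
        query.setdefault("order-by", []).append(order_query)

    assert query == {"order-by": [
        {"path": "published", "direction": "desc"},
        {"order": "creation", "direction": "asc"},
    ]}
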
1097 if self.args.fields:
1098 parsed = []
1099 for field in self.args.fields:
1100 path, operator, value = field
1101 try:
1102 path = json.loads(path)
1103 except json.JSONDecodeError:
1104 # this is not a JSON-encoded value; we keep it as a string
1105 pass
1106
1107 if not isinstance(path, list):
1108 path = [path]
1109
1110 # handling of TP(<time pattern>)
1111 if operator in (">", "gt", "<", "le", "between"):
1112 def datetime_sub(match):
1113 return str(date_utils.date_parse_ext(
1114 match.group(1), default_tz=date_utils.TZ_LOCAL
1115 ))
1116 value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value)
1117
1118 try:
1119 value = json.loads(value)
1120 except json.JSONDecodeError:
1121 # not JSON; as above we keep it as a string
1122 pass
1123
1124 if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
1125 if not isinstance(value, list):
1126 value = [value]
1127
1128 parsed.append({
1129 "path": path,
1130 "op": operator,
1131 "value": value
1132 })
1133
1134 query["parsed"] = parsed
1135
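A minimal sketch of the TP(<time pattern>) substitution above; it assumes, based on its use here, that date_utils.date_parse_ext accepts a free-form date string and returns a unix timestamp:

    import re
    from sat.tools.common import date_utils

    def datetime_sub(match):
        return str(date_utils.date_parse_ext(
            match.group(1), default_tz=date_utils.TZ_LOCAL
        ))

    # e.g. a filter given as: -F published ">" "TP(2021-09-08)"
    value = "TP(2021-09-08)"
    value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value)
    # value is now a plain numeric string, so the json.loads() that follows in
    # the command turns it into a number the backend can compare against.
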
1136 if self.args.payload or "xml" in self.args.output:
1137 query["with_payload"] = True
1138 if self.args.keys:
1139 self.args.keys.append("item_payload")
1140 try:
1141 found_items = data_format.deserialise(
1142 await self.host.bridge.psCacheSearch(
1143 data_format.serialise(query)
1144 ),
1145 type_check=list,
1146 )
1147 except BridgeException as e:
1148 self.disp(f"can't search for pubsub items in cache: {e}", error=True)
1149 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1150 except Exception as e:
1151 self.disp(f"Internal error: {e}", error=True)
1152 self.host.quit(C.EXIT_INTERNAL_ERROR)
1153 else:
1154 if self.args.keys:
1155 found_items = [
1156 {k: v for k, v in item.items() if k in self.args.keys}
1157 for item in found_items
1158 ]
1159 await self.output(found_items)
1160 self.host.quit(C.EXIT_OK)
1161
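The bridge exchange in start() sends a serialised query and expects a serialised list back; a small sketch, assuming data_format.serialise/deserialise are the usual JSON helpers (their use with type_check=list above suggests so):

    from sat.tools.common import data_format

    query = {"fts": "cat", "limit": 10, "with_payload": True}
    serialised = data_format.serialise(query)  # JSON string passed to psCacheSearch
    # psCacheSearch is expected to return a serialised list of item dicts, e.g.:
    found_items = data_format.deserialise('[{"id": "item1"}]', type_check=list)
    assert found_items == [{"id": "item1"}]
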
1162 def default_output(self, found_items):
1163 for item in found_items:
1164 for field in ("created", "published", "updated"):
1165 try:
1166 timestamp = item[field]
1167 except KeyError:
1168 pass
1169 else:
1170 try:
1171 item[field] = common.format_time(timestamp)
1172 except ValueError:
1173 pass
1174 self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items)
1175
1176 def xml_output(self, found_items):
1177 """Output prettified item payload"""
1178 cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
1179 for item in found_items:
1180 cb(item["item_payload"])
1181
1182 def xml_raw_output(self, found_items):
1183 """Output item payload without prettifying"""
1184 cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
1185 for item in found_items:
1186 cb(item["item_payload"])
963 1187
964 1188
965 class Cache(base.CommandBase): 1189 class Cache(base.CommandBase):
966 subcommands = ( 1190 subcommands = (
967 CacheGet, 1191 CacheGet,
968 CacheSync, 1192 CacheSync,
969 CachePurge, 1193 CachePurge,
970 CacheReset, 1194 CacheReset,
1195 CacheSearch,
971 ) 1196 )
972 1197
973 def __init__(self, host): 1198 def __init__(self, host):
974 super(Cache, self).__init__( 1199 super(Cache, self).__init__(
975 host, "cache", use_profile=False, help=_("pubsub cache handling") 1200 host, "cache", use_profile=False, help=_("pubsub cache handling")