Mercurial > libervia-backend
comparison sat_frontends/jp/cmd_pubsub.py @ 3668:0eacda79b5d1
CLI (pubsub/cache): `search` implementation
rel 361
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 08 Sep 2021 17:58:48 +0200 |
parents | 7f503b20597e |
children | 5bda9d2e8b35 |
comparison
equal
deleted
inserted
replaced
3667:9ca19b317293 | 3668:0eacda79b5d1 |
---|---|
22 import os.path | 22 import os.path |
23 import re | 23 import re |
24 import sys | 24 import sys |
25 import subprocess | 25 import subprocess |
26 import asyncio | 26 import asyncio |
27 import json | |
27 from . import base | 28 from . import base |
28 from sat.core.i18n import _ | 29 from sat.core.i18n import _ |
29 from sat.core import exceptions | 30 from sat.core import exceptions |
30 from sat_frontends.jp.constants import Const as C | 31 from sat_frontends.jp.constants import Const as C |
31 from sat_frontends.jp import common | 32 from sat_frontends.jp import common |
33 from sat_frontends.jp import xml_tools | 34 from sat_frontends.jp import xml_tools |
34 from functools import partial | 35 from functools import partial |
35 from sat.tools.common import data_format | 36 from sat.tools.common import data_format |
36 from sat.tools.common import uri | 37 from sat.tools.common import uri |
37 from sat.tools.common.ansi import ANSI as A | 38 from sat.tools.common.ansi import ANSI as A |
39 from sat.tools.common import date_utils | |
38 from sat_frontends.tools import jid, strings | 40 from sat_frontends.tools import jid, strings |
39 from sat_frontends.bridge.bridge_frontend import BridgeException | 41 from sat_frontends.bridge.bridge_frontend import BridgeException |
40 | 42 |
41 __commands__ = ["Pubsub"] | 43 __commands__ = ["Pubsub"] |
42 | 44 |
async def start(self):
    """Reset the pubsub cache through the ``psCacheReset`` bridge method.

    Unless ``self.args.force`` is set, ``host.confirmOrQuit`` is first called
    with a warning message and a cancellation message.
    The command quits with ``C.EXIT_OK`` when the bridge call returns, or with
    ``C.EXIT_INTERNAL_ERROR`` (after displaying the error) if it raises.
    """
    if not self.args.force:
        warning = _(
            "Are you sure to reset cache? All nodes and items will be removed "
            "from it, then it will be progressively refilled as if it were new. "
            "This may be resources intensive."
        )
        cancelled = _("Pubsub cache reset has been cancelled.")
        await self.host.confirmOrQuit(warning, cancelled)

    try:
        await self.host.bridge.psCacheReset()
    except Exception as err:
        # any failure of the bridge call is reported as an internal error
        self.disp(f"Internal error: {err}", error=True)
        self.host.quit(C.EXIT_INTERNAL_ERROR)
        return

    self.host.quit(C.EXIT_OK)
966 | |
967 | |
class CacheSearch(base.CommandBase):
    """``search`` sub-command: look for pubsub items in the cache.

    Command line options are converted to a query dict, which is serialised
    and sent to the ``psCacheSearch`` bridge method. Found items are then
    printed with the selected output.
    """

    # operators for which ``TP(<time pattern>)`` is expanded in the filter value.
    # NOTE(review): the list used to contain "le" but not "lt", although every
    # other symbol has its textual twin (">"/"gt"). "lt" has been added and "le"
    # kept for backward compatibility -- confirm operator names with the backend.
    _TIME_PATTERN_OPERATORS = (">", "gt", "<", "lt", "le", "between")
    # operators whose value is always sent as a list
    _LIST_VALUE_OPERATORS = ("overlap", "ioverlap", "disjoint", "idisjoint")
    # matches ``TP(<time pattern>)``, the pattern itself is in group 1
    _TIME_PATTERN_RE = re.compile(r"\bTP\(([^)]+)\)")

    def __init__(self, host):
        extra_outputs = {
            "default": self.default_output,
            "xml": self.xml_output,
            "xml-raw": self.xml_raw_output,
        }
        super().__init__(
            host,
            "search",
            use_profile=False,
            use_output=C.OUTPUT_LIST_DICT,
            extra_outputs=extra_outputs,
            help=_("search for pubsub items in cache"),
        )

    def add_parser_options(self):
        """Declare the command line options of the ``search`` command"""
        self.parser.add_argument(
            "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
        )
        self.parser.add_argument(
            "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
            help=_("search items only from these profiles. May be used several times.")
        )
        self.parser.add_argument(
            "-s", "--service", action="append", dest="services", metavar="SERVICE",
            help=_("items must be from specified service. May be used several times.")
        )
        self.parser.add_argument(
            "-n", "--node", action="append", dest="nodes", metavar="NODE",
            help=_("items must be in the specified node. May be used several times.")
        )
        self.parser.add_argument(
            "-t", "--type", action="append", dest="types", metavar="TYPE",
            help=_("items must be of specified type. May be used several times.")
        )
        self.parser.add_argument(
            "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
            help=_("items must be of specified subtype. May be used several times.")
        )
        self.parser.add_argument(
            "-P", "--payload", action="store_true", help=_("include item XML payload")
        )
        self.parser.add_argument(
            "-o", "--order-by", action="append", nargs="+",
            metavar=("ORDER", "[FIELD] [DIRECTION]"),
            help=_("how items must be ordered. May be used several times.")
        )
        self.parser.add_argument(
            "-l", "--limit", type=int, help=_("maximum number of items to return")
        )
        self.parser.add_argument(
            "-i", "--index", type=int, help=_("return results starting from this index")
        )
        self.parser.add_argument(
            "-F",
            "--field",
            action="append",
            nargs=3,
            dest="fields",
            default=[],
            metavar=("PATH", "OPERATOR", "VALUE"),
            help=_("parsed data field filter. May be used several times."),
        )
        self.parser.add_argument(
            "-k",
            "--key",
            action="append",
            dest="keys",
            metavar="KEY",
            help=_(
                "data key(s) to display. May be used several times. DEFAULT: show all "
                "keys"
            ),
        )

    @staticmethod
    def _json_or_str(raw):
        """Decode ``raw`` as JSON, or return it unchanged if it is not valid JSON"""
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            # this is not a JSON encoded value, we keep it as a string
            return raw

    @staticmethod
    def _time_pattern_sub(match):
        """Replace a ``TP(<time pattern>)`` match by the parsed date, as a string"""
        return str(date_utils.date_parse_ext(
            match.group(1), default_tz=date_utils.TZ_LOCAL
        ))

    def _build_order_by(self):
        """Convert ``--order-by`` arguments to a list of order queries

        Each ``--order-by`` is either ``field PATH [DIRECTION]`` or
        ``ORDER [DIRECTION]``, direction being "asc" when not specified.
        ``self.parser.error`` is called on invalid arguments.

        @return (list[dict]): one dict per ``--order-by`` option, with either a
            "path" or an "order" key, and a "direction" key
        """
        order_by = []
        for order, *args in self.args.order_by:
            if order == "field":
                if not args:
                    self.parser.error(_("field data must be specified in --order-by"))
                elif len(args) == 1:
                    path = args[0]
                    direction = "asc"
                elif len(args) == 2:
                    path, direction = args
                else:
                    self.parser.error(_(
                        "You can't specify more that 2 arguments for a field in "
                        "--order-by"
                    ))
                order_query = {
                    "path": self._json_or_str(path),
                }
            else:
                order_query = {
                    "order": order
                }
                if not args:
                    direction = "asc"
                elif len(args) == 1:
                    direction = args[0]
                else:
                    self.parser.error(_(
                        "there are too many arguments in --order-by option"
                    ))
            if direction.lower() not in ("asc", "desc"):
                # the message must be formatted explicitly, otherwise the literal
                # "{direction!r}" placeholder is shown to the user
                self.parser.error(
                    _("invalid --order-by direction: {direction!r}").format(
                        direction=direction
                    )
                )
            order_query["direction"] = direction
            order_by.append(order_query)
        return order_by

    def _build_field_filters(self):
        """Convert ``--field`` arguments to a list of parsed data filters

        Path and value are JSON decoded when possible (and kept as strings
        otherwise), a path which is not a list is wrapped in one.

        @return (list[dict]): one dict per ``--field`` option, with "path", "op"
            and "value" keys
        """
        filters = []
        for path, operator, value in self.args.fields:
            path = self._json_or_str(path)
            if not isinstance(path, list):
                path = [path]

            # handling of TP(<time pattern>)
            if operator in self._TIME_PATTERN_OPERATORS:
                value = self._TIME_PATTERN_RE.sub(self._time_pattern_sub, value)

            value = self._json_or_str(value)

            if operator in self._LIST_VALUE_OPERATORS and not isinstance(value, list):
                value = [value]

            filters.append({
                "path": path,
                "op": operator,
                "value": value
            })
        return filters

    async def start(self):
        """Build the query from arguments, run the search and output the result"""
        query = {}
        for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"):
            value = getattr(self.args, arg)
            if value:
                if arg in ("types", "subtypes"):
                    # empty string is used to find items without type and/or subtype
                    value = [v or None for v in value]
                query[arg] = value
        for arg in ("limit", "index"):
            value = getattr(self.args, arg)
            if value is not None:
                query[arg] = value

        if self.args.order_by is not None:
            order_by = self._build_order_by()
            if order_by:
                query["order-by"] = order_by

        if self.args.fields:
            query["parsed"] = self._build_field_filters()

        if self.args.payload or "xml" in self.args.output:
            query["with_payload"] = True
            if self.args.keys:
                # the payload has been requested, it must not be filtered out below
                self.args.keys.append("item_payload")
        try:
            found_items = data_format.deserialise(
                await self.host.bridge.psCacheSearch(
                    data_format.serialise(query)
                ),
                type_check=list,
            )
        except BridgeException as e:
            self.disp(f"can't search for pubsub items in cache: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            if self.args.keys:
                # only requested keys are kept
                found_items = [
                    {k: v for k, v in item.items() if k in self.args.keys}
                    for item in found_items
                ]
            await self.output(found_items)
            self.host.quit(C.EXIT_OK)

    def default_output(self, found_items):
        """Output items with the "simple" list/dict output, after formatting dates

        "created", "published" and "updated" values are replaced in place by the
        result of ``common.format_time``; a value raising ValueError is left as is.
        """
        for item in found_items:
            for field in ("created", "published", "updated"):
                try:
                    timestamp = item[field]
                except KeyError:
                    pass
                else:
                    try:
                        item[field] = common.format_time(timestamp)
                    except ValueError:
                        pass
        self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items)

    def xml_output(self, found_items):
        """Output prettified item payload"""
        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
        for item in found_items:
            cb(item["item_payload"])

    def xml_raw_output(self, found_items):
        """Output item payload without prettifying"""
        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
        for item in found_items:
            cb(item["item_payload"])
963 | 1187 |
964 | 1188 |
965 class Cache(base.CommandBase): | 1189 class Cache(base.CommandBase): |
966 subcommands = ( | 1190 subcommands = ( |
967 CacheGet, | 1191 CacheGet, |
968 CacheSync, | 1192 CacheSync, |
969 CachePurge, | 1193 CachePurge, |
970 CacheReset, | 1194 CacheReset, |
1195 CacheSearch, | |
971 ) | 1196 ) |
972 | 1197 |
973 def __init__(self, host): | 1198 def __init__(self, host): |
974 super(Cache, self).__init__( | 1199 super(Cache, self).__init__( |
975 host, "cache", use_profile=False, help=_("pubsub cache handling") | 1200 host, "cache", use_profile=False, help=_("pubsub cache handling") |