comparison sat_frontends/jp/cmd_pubsub.py @ 3715:b9718216a1c0 0.9

merge bookmark 0.9
author Goffi <goffi@goffi.org>
date Wed, 01 Dec 2021 16:13:31 +0100
parents 0eacda79b5d1
children 5bda9d2e8b35
comparison of 3714:af09b5aaa5d7 and 3715:b9718216a1c0
22 import os.path 22 import os.path
23 import re 23 import re
24 import sys 24 import sys
25 import subprocess 25 import subprocess
26 import asyncio 26 import asyncio
27 import json
27 from . import base 28 from . import base
28 from sat.core.i18n import _ 29 from sat.core.i18n import _
29 from sat.core import exceptions 30 from sat.core import exceptions
30 from sat_frontends.jp.constants import Const as C 31 from sat_frontends.jp.constants import Const as C
31 from sat_frontends.jp import common 32 from sat_frontends.jp import common
33 from sat_frontends.jp import xml_tools 34 from sat_frontends.jp import xml_tools
34 from functools import partial 35 from functools import partial
35 from sat.tools.common import data_format 36 from sat.tools.common import data_format
36 from sat.tools.common import uri 37 from sat.tools.common import uri
37 from sat.tools.common.ansi import ANSI as A 38 from sat.tools.common.ansi import ANSI as A
39 from sat.tools.common import date_utils
38 from sat_frontends.tools import jid, strings 40 from sat_frontends.tools import jid, strings
39 from sat_frontends.bridge.bridge_frontend import BridgeException 41 from sat_frontends.bridge.bridge_frontend import BridgeException
40 42
41 __commands__ = ["Pubsub"] 43 __commands__ = ["Pubsub"]
42 44
760 super(Node, self).__init__( 762 super(Node, self).__init__(
761 host, "node", use_profile=False, help=_("node handling") 763 host, "node", use_profile=False, help=_("node handling")
762 ) 764 )
763 765
764 766
767 class CacheGet(base.CommandBase):
768 def __init__(self, host):
769 super().__init__(
770 host,
771 "get",
772 use_output=C.OUTPUT_LIST_XML,
773 use_pubsub=True,
774 pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
775 help=_("get pubsub item(s) from cache"),
776 )
777
778 def add_parser_options(self):
779 self.parser.add_argument(
780 "-S",
781 "--sub-id",
782 default="",
783 help=_("subscription id"),
784 )
785
786 async def start(self):
787 try:
788 ps_result = data_format.deserialise(
789 await self.host.bridge.psCacheGet(
790 self.args.service,
791 self.args.node,
792 self.args.max,
793 self.args.items,
794 self.args.sub_id,
795 self.getPubsubExtra(),
796 self.profile,
797 )
798 )
799 except BridgeException as e:
800 if e.classname == "NotFound":
801 self.disp(
802 f"The node {self.args.node} from {self.args.service} is not in cache "
803 f"for {self.profile}",
804 error=True,
805 )
806 self.host.quit(C.EXIT_NOT_FOUND)
807 else:
808 self.disp(f"can't get pubsub items from cache: {e}", error=True)
809 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
810 except Exception as e:
811 self.disp(f"Internal error: {e}", error=True)
812 self.host.quit(C.EXIT_INTERNAL_ERROR)
813 else:
814 await self.output(ps_result["items"])
815 self.host.quit(C.EXIT_OK)
816
817
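For context on the new cache API used by CacheGet, here is a minimal standalone sketch of the same bridge call, assuming an already connected asynchronous bridge handle named `bridge` and a hypothetical profile name; whether the extra argument must be serialised is an assumption based on the psItemsGet hunks later in this changeset.

    from sat.tools.common import data_format

    async def get_cached_items(bridge, service, node, profile="test_profile"):
        # hypothetical helper mirroring CacheGet.start() above
        ps_result = data_format.deserialise(
            await bridge.psCacheGet(
                service,                     # pubsub service JID
                node,                        # node name
                10,                          # maximum number of items
                [],                          # item ids (empty: latest items)
                "",                          # subscription id
                data_format.serialise({}),   # extra options (serialisation assumed)
                profile,
            )
        )
        # "items" holds the raw XML payloads, as used by self.output() above
        return ps_result["items"]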
818 class CacheSync(base.CommandBase):
819
820 def __init__(self, host):
821 super().__init__(
822 host,
823 "sync",
824 use_pubsub=True,
825 pubsub_flags={C.NODE},
826 help=_("(re)synchronise a pubsub node"),
827 )
828
829 def add_parser_options(self):
830 pass
831
832 async def start(self):
833 try:
834 await self.host.bridge.psCacheSync(
835 self.args.service,
836 self.args.node,
837 self.profile,
838 )
839 except BridgeException as e:
840 if e.condition == "item-not-found" or e.classname == "NotFound":
841 self.disp(
842 f"The node {self.args.node} doesn't exist on {self.args.service}",
843 error=True,
844 )
845 self.host.quit(C.EXIT_NOT_FOUND)
846 else:
847 self.disp(f"can't synchronise pubsub node: {e}", error=True)
848 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
849 except Exception as e:
850 self.disp(f"Internal error: {e}", error=True)
851 self.host.quit(C.EXIT_INTERNAL_ERROR)
852 else:
853 self.host.quit(C.EXIT_OK)
854
855
856 class CachePurge(base.CommandBase):
857
858 def __init__(self, host):
859 super().__init__(
860 host,
861 "purge",
862 use_profile=False,
863 help=_("purge (delete) items from cache"),
864 )
865
866 def add_parser_options(self):
867 self.parser.add_argument(
868 "-s", "--service", action="append", metavar="JID", dest="services",
869 help="purge items only for these services. If not specified, items from ALL "
870 "services will be purged. May be used several times."
871 )
872 self.parser.add_argument(
873 "-n", "--node", action="append", dest="nodes",
874 help="purge items only for these nodes. If not specified, items from ALL "
875 "nodes will be purged. May be used several times."
876 )
877 self.parser.add_argument(
878 "-p", "--profile", action="append", dest="profiles",
879 help="purge items only for these profiles. If not specified, items from ALL "
880 "profiles will be purged. May be used several times."
881 )
882 self.parser.add_argument(
883 "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
884 help="purge items which have been last updated before given time."
885 )
886 self.parser.add_argument(
887 "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
888 help="purge items which have been last created before given time."
889 )
890 self.parser.add_argument(
891 "-t", "--type", action="append", dest="types",
892 help="purge items flagged with TYPE. May be used several times."
893 )
894 self.parser.add_argument(
895 "-S", "--subtype", action="append", dest="subtypes",
896 help="purge items flagged with SUBTYPE. May be used several times."
897 )
898 self.parser.add_argument(
899 "-f", "--force", action="store_true",
900 help=_("purge items without confirmation")
901 )
902
903 async def start(self):
904 if not self.args.force:
905 await self.host.confirmOrQuit(
906 _(
907 "Are you sure to purge items from cache? You'll have to bypass cache "
908 "or resynchronise nodes to access deleted items again."
909 ),
910 _("Items purgins has been cancelled.")
911 )
912 purge_data = {}
913 for key in (
914 "services", "nodes", "profiles", "updated_before", "created_before",
915 "types", "subtypes"
916 ):
917 value = getattr(self.args, key)
918 if value is not None:
919 purge_data[key] = value
920 try:
921 await self.host.bridge.psCachePurge(
922 data_format.serialise(
923 purge_data
924 )
925 )
926 except Exception as e:
927 self.disp(f"Internal error: {e}", error=True)
928 self.host.quit(C.EXIT_INTERNAL_ERROR)
929 else:
930 self.host.quit(C.EXIT_OK)
931
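To make the purge request's shape concrete, here is a hedged sketch of a payload like the one CachePurge.start() serialises for psCachePurge; the key names come from the loop above, while the concrete values (profile, node, type, timestamp) are purely illustrative.

    from sat.tools.common import data_format

    # every key is optional; names match the argparse destinations collected above
    purge_data = {
        "profiles": ["test_profile"],        # hypothetical profile
        "nodes": ["urn:xmpp:microblog:0"],   # limit the purge to these nodes
        "types": ["blog"],                   # items flagged with this type (illustrative)
        "updated_before": 1609459200,        # value from base.date_decoder (assumed to be a timestamp)
    }

    serialised = data_format.serialise(purge_data)
    # the bridge method then receives this single serialised string:
    # await host.bridge.psCachePurge(serialised)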
932
933 class CacheReset(base.CommandBase):
934
935 def __init__(self, host):
936 super().__init__(
937 host,
938 "reset",
939 use_profile=False,
940 help=_("remove everything from cache"),
941 )
942
943 def add_parser_options(self):
944 self.parser.add_argument(
945 "-f", "--force", action="store_true",
946 help=_("reset cache without confirmation")
947 )
948
949 async def start(self):
950 if not self.args.force:
951 await self.host.confirmOrQuit(
952 _(
953 "Are you sure to reset cache? All nodes and items will be removed "
954 "from it, then it will be progressively refilled as if it were new. "
955 "This may be resources intensive."
956 ),
957 _("Pubsub cache reset has been cancelled.")
958 )
959 try:
960 await self.host.bridge.psCacheReset()
961 except Exception as e:
962 self.disp(f"Internal error: {e}", error=True)
963 self.host.quit(C.EXIT_INTERNAL_ERROR)
964 else:
965 self.host.quit(C.EXIT_OK)
966
967
968 class CacheSearch(base.CommandBase):
969 def __init__(self, host):
970 extra_outputs = {
971 "default": self.default_output,
972 "xml": self.xml_output,
973 "xml-raw": self.xml_raw_output,
974 }
975 super().__init__(
976 host,
977 "search",
978 use_profile=False,
979 use_output=C.OUTPUT_LIST_DICT,
980 extra_outputs=extra_outputs,
981 help=_("search for pubsub items in cache"),
982 )
983
984 def add_parser_options(self):
985 self.parser.add_argument(
986 "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
987 )
988 self.parser.add_argument(
989 "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
990 help="search items only from these profiles. May be used several times."
991 )
992 self.parser.add_argument(
993 "-s", "--service", action="append", dest="services", metavar="SERVICE",
994 help="items must be from specified service. May be used several times."
995 )
996 self.parser.add_argument(
997 "-n", "--node", action="append", dest="nodes", metavar="NODE",
998 help="items must be in the specified node. May be used several times."
999 )
1000 self.parser.add_argument(
1001 "-t", "--type", action="append", dest="types", metavar="TYPE",
1002 help="items must be of specified type. May be used several times."
1003 )
1004 self.parser.add_argument(
1005 "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
1006 help="items must be of specified subtype. May be used several times."
1007 )
1008 self.parser.add_argument(
1009 "-P", "--payload", action="store_true", help=_("include item XML payload")
1010 )
1011 self.parser.add_argument(
1012 "-o", "--order-by", action="append", nargs="+",
1013 metavar=("ORDER", "[FIELD] [DIRECTION]"),
1014 help=_("how items must be ordered. May be used several times.")
1015 )
1016 self.parser.add_argument(
1017 "-l", "--limit", type=int, help=_("maximum number of items to return")
1018 )
1019 self.parser.add_argument(
1020 "-i", "--index", type=int, help=_("return results starting from this index")
1021 )
1022 self.parser.add_argument(
1023 "-F",
1024 "--field",
1025 action="append",
1026 nargs=3,
1027 dest="fields",
1028 default=[],
1029 metavar=("PATH", "OPERATOR", "VALUE"),
1030 help=_("parsed data field filter. May be used several times."),
1031 )
1032 self.parser.add_argument(
1033 "-k",
1034 "--key",
1035 action="append",
1036 dest="keys",
1037 metavar="KEY",
1038 help=_(
1039 "data key(s) to display. May be used several times. DEFAULT: show all "
1040 "keys"
1041 ),
1042 )
1043
1044 async def start(self):
1045 query = {}
1046 for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"):
1047 value = getattr(self.args, arg)
1048 if value:
1049 if arg in ("types", "subtypes"):
1050 # empty string is used to find items without type and/or subtype
1051 value = [v or None for v in value]
1052 query[arg] = value
1053 for arg in ("limit", "index"):
1054 value = getattr(self.args, arg)
1055 if value is not None:
1056 query[arg] = value
1057 if self.args.order_by is not None:
1058 for order_data in self.args.order_by:
1059 order, *args = order_data
1060 if order == "field":
1061 if not args:
1062 self.parser.error(_("field data must be specified in --order-by"))
1063 elif len(args) == 1:
1064 path = args[0]
1065 direction = "asc"
1066 elif len(args) == 2:
1067 path, direction = args
1068 else:
1069 self.parser.error(_(
1070 "You can't specify more that 2 arguments for a field in "
1071 "--order-by"
1072 ))
1073 try:
1074 path = json.loads(path)
1075 except json.JSONDecodeError:
1076 pass
1077 order_query = {
1078 "path": path,
1079 }
1080 else:
1081 order_query = {
1082 "order": order
1083 }
1084 if not args:
1085 direction = "asc"
1086 elif len(args) == 1:
1087 direction = args[0]
1088 else:
1089 self.parser.error(_(
1090 "there are too many arguments in --order-by option"
1091 ))
1092 if direction.lower() not in ("asc", "desc"):
1093 self.parser.error(_("invalid --order-by direction: {direction!r}"))
1094 order_query["direction"] = direction
1095 query.setdefault("order-by", []).append(order_query)
1096
1097 if self.args.fields:
1098 parsed = []
1099 for field in self.args.fields:
1100 path, operator, value = field
1101 try:
1102 path = json.loads(path)
1103 except json.JSONDecodeError:
1104 # this is not a JSON encoded value; we keep it as a string
1105 pass
1106
1107 if not isinstance(path, list):
1108 path = [path]
1109
1110 # handling of TP(<time pattern>)
1111 if operator in (">", "gt", "<", "le", "between"):
1112 def datetime_sub(match):
1113 return str(date_utils.date_parse_ext(
1114 match.group(1), default_tz=date_utils.TZ_LOCAL
1115 ))
1116 value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value)
1117
1118 try:
1119 value = json.loads(value)
1120 except json.JSONDecodeError:
1121 # not JSON; as above, we keep it as a string
1122 pass
1123
1124 if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
1125 if not isinstance(value, list):
1126 value = [value]
1127
1128 parsed.append({
1129 "path": path,
1130 "op": operator,
1131 "value": value
1132 })
1133
1134 query["parsed"] = parsed
1135
1136 if self.args.payload or "xml" in self.args.output:
1137 query["with_payload"] = True
1138 if self.args.keys:
1139 self.args.keys.append("item_payload")
1140 try:
1141 found_items = data_format.deserialise(
1142 await self.host.bridge.psCacheSearch(
1143 data_format.serialise(query)
1144 ),
1145 type_check=list,
1146 )
1147 except BridgeException as e:
1148 self.disp(f"can't search for pubsub items in cache: {e}", error=True)
1149 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1150 except Exception as e:
1151 self.disp(f"Internal error: {e}", error=True)
1152 self.host.quit(C.EXIT_INTERNAL_ERROR)
1153 else:
1154 if self.args.keys:
1155 found_items = [
1156 {k: v for k, v in item.items() if k in self.args.keys}
1157 for item in found_items
1158 ]
1159 await self.output(found_items)
1160 self.host.quit(C.EXIT_OK)
1161
1162 def default_output(self, found_items):
1163 for item in found_items:
1164 for field in ("created", "published", "updated"):
1165 try:
1166 timestamp = item[field]
1167 except KeyError:
1168 pass
1169 else:
1170 try:
1171 item[field] = common.format_time(timestamp)
1172 except ValueError:
1173 pass
1174 self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items)
1175
1176 def xml_output(self, found_items):
1177 """Output prettified item payload"""
1178 cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
1179 for item in found_items:
1180 cb(item["item_payload"])
1181
1182 def xml_raw_output(self, found_items):
1183 """Output item payload without prettifying"""
1184 cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
1185 for item in found_items:
1186 cb(item["item_payload"])
1187
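As an illustration of what CacheSearch ultimately sends over the bridge, here is a hedged sketch of a query dict; the key names and the shape of the "order-by" and "parsed" entries follow the code above, while the concrete values (profile, order name, field path, date) are assumptions for the example.

    from sat.tools.common import data_format

    # hypothetical query: cached blog items updated after 2021-01-01, newest first
    query = {
        "profiles": ["test_profile"],            # as with -p/--profile
        "types": ["blog"],                       # as with -t/--type (type name is illustrative)
        "fts": "pubsub cache",                   # Full-Text Search query (-f/--fts)
        "order-by": [
            {"order": "updated", "direction": "desc"}   # order name is an assumption
        ],
        "parsed": [
            {
                "path": ["updated"],             # parsed-data field, as with -F/--field
                "op": ">",
                # on the command line the value could be written as TP(2021-01-01);
                # CacheSearch expands it with date_utils.date_parse_ext before sending
                "value": 1609459200,
            }
        ],
        "limit": 20,
        "with_payload": True,                    # set when --payload or an XML output is used
    }

    # found_items = data_format.deserialise(
    #     await host.bridge.psCacheSearch(data_format.serialise(query)),
    #     type_check=list,
    # )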
1188
1189 class Cache(base.CommandBase):
1190 subcommands = (
1191 CacheGet,
1192 CacheSync,
1193 CachePurge,
1194 CacheReset,
1195 CacheSearch,
1196 )
1197
1198 def __init__(self, host):
1199 super(Cache, self).__init__(
1200 host, "cache", use_profile=False, help=_("pubsub cache handling")
1201 )
1202
1203
765 class Set(base.CommandBase): 1204 class Set(base.CommandBase):
766 def __init__(self, host): 1205 def __init__(self, host):
767 base.CommandBase.__init__( 1206 base.CommandBase.__init__(
768 self, 1207 self,
769 host, 1208 host,
821 self, 1260 self,
822 host, 1261 host,
823 "get", 1262 "get",
824 use_output=C.OUTPUT_LIST_XML, 1263 use_output=C.OUTPUT_LIST_XML,
825 use_pubsub=True, 1264 use_pubsub=True,
826 pubsub_flags={C.NODE, C.MULTI_ITEMS}, 1265 pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
827 help=_("get pubsub item(s)"), 1266 help=_("get pubsub item(s)"),
828 ) 1267 )
829 1268
830 def add_parser_options(self): 1269 def add_parser_options(self):
831 self.parser.add_argument( 1270 self.parser.add_argument(
833 "--sub-id", 1272 "--sub-id",
834 default="", 1273 default="",
835 help=_("subscription id"), 1274 help=_("subscription id"),
836 ) 1275 )
837 #  TODO: a key(s) argument to select keys to display 1276 #  TODO: a key(s) argument to select keys to display
838 # TODO: add MAM filters
839 1277
840 async def start(self): 1278 async def start(self):
841 try: 1279 try:
842 ps_result = data_format.deserialise( 1280 ps_result = data_format.deserialise(
843 await self.host.bridge.psItemsGet( 1281 await self.host.bridge.psItemsGet(
882 def add_parser_options(self): 1320 def add_parser_options(self):
883 self.parser.add_argument( 1321 self.parser.add_argument(
884 "-f", "--force", action="store_true", help=_("delete without confirmation") 1322 "-f", "--force", action="store_true", help=_("delete without confirmation")
885 ) 1323 )
886 self.parser.add_argument( 1324 self.parser.add_argument(
887 "-N", "--notify", action="store_true", help=_("notify deletion") 1325 "--no-notification", dest="notify", action="store_false",
1326 help=_("do not send notification (not recommended)")
888 ) 1327 )
889 1328
890 async def start(self): 1329 async def start(self):
891 if not self.args.item: 1330 if not self.args.item:
892 self.parser.error(_("You need to specify an item to delete")) 1331 self.parser.error(_("You need to specify an item to delete"))
953 ) 1392 )
954 self.host.quit(1) 1393 self.host.quit(1)
955 items = [item] if item else [] 1394 items = [item] if item else []
956 ps_result = data_format.deserialise( 1395 ps_result = data_format.deserialise(
957 await self.host.bridge.psItemsGet( 1396 await self.host.bridge.psItemsGet(
958 service, node, 1, items, "", {}, self.profile 1397 service, node, 1, items, "", data_format.serialise({}), self.profile
959 ) 1398 )
960 ) 1399 )
961 item_raw = ps_result["items"][0] 1400 item_raw = ps_result["items"][0]
962 parser = etree.XMLParser(remove_blank_text=True, recover=True) 1401 parser = etree.XMLParser(remove_blank_text=True, recover=True)
963 item_elt = etree.fromstring(item_raw, parser) 1402 item_elt = etree.fromstring(item_raw, parser)
1685 self.args.service, 2124 self.args.service,
1686 self.args.node, 2125 self.args.node,
1687 self.args.rsm_max, 2126 self.args.rsm_max,
1688 self.args.items, 2127 self.args.items,
1689 "", 2128 "",
1690 extra, 2129 data_format.serialise(extra),
1691 self.profile, 2130 self.profile,
1692 ) 2131 )
1693 ) 2132 )
1694 except Exception as e: 2133 except Exception as e:
1695 self.disp(f"can't retrieve items: {e}", error=True) 2134 self.disp(f"can't retrieve items: {e}", error=True)
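The two psItemsGet hunks above show the same API adjustment as the rest of this changeset: the extra parameter is now passed as a string serialised with data_format.serialise rather than as a plain dict. A minimal sketch of the new calling convention, assuming a connected bridge handle and hypothetical values:

    from sat.tools.common import data_format

    async def get_items(bridge, service, node, profile):
        extra = {}  # extra options dict; left empty in this sketch
        ps_result = data_format.deserialise(
            await bridge.psItemsGet(
                service,
                node,
                10,                            # maximum number of items
                [],                            # item ids
                "",                            # subscription id
                data_format.serialise(extra),  # serialised, as in the hunks above
                profile,
            )
        )
        return ps_result["items"]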
2052 Edit, 2491 Edit,
2053 Rename, 2492 Rename,
2054 Subscribe, 2493 Subscribe,
2055 Unsubscribe, 2494 Unsubscribe,
2056 Subscriptions, 2495 Subscriptions,
2057 Node,
2058 Affiliations, 2496 Affiliations,
2059 Search, 2497 Search,
2060 Transform, 2498 Transform,
2061 Hook, 2499 Hook,
2062 Uri, 2500 Uri,
2501 Node,
2502 Cache,
2063 ) 2503 )
2064 2504
2065 def __init__(self, host): 2505 def __init__(self, host):
2066 super(Pubsub, self).__init__( 2506 super(Pubsub, self).__init__(
2067 host, "pubsub", use_profile=False, help=_("PubSub nodes/items management") 2507 host, "pubsub", use_profile=False, help=_("PubSub nodes/items management")