Mercurial > libervia-backend
comparison sat_frontends/jp/cmd_pubsub.py @ 3715:b9718216a1c0 0.9
merge bookmark 0.9
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Dec 2021 16:13:31 +0100 |
parents | 0eacda79b5d1 |
children | 5bda9d2e8b35 |
comparison
equal
deleted
inserted
replaced
3714:af09b5aaa5d7 | 3715:b9718216a1c0 |
---|---|
22 import os.path | 22 import os.path |
23 import re | 23 import re |
24 import sys | 24 import sys |
25 import subprocess | 25 import subprocess |
26 import asyncio | 26 import asyncio |
27 import json | |
27 from . import base | 28 from . import base |
28 from sat.core.i18n import _ | 29 from sat.core.i18n import _ |
29 from sat.core import exceptions | 30 from sat.core import exceptions |
30 from sat_frontends.jp.constants import Const as C | 31 from sat_frontends.jp.constants import Const as C |
31 from sat_frontends.jp import common | 32 from sat_frontends.jp import common |
33 from sat_frontends.jp import xml_tools | 34 from sat_frontends.jp import xml_tools |
34 from functools import partial | 35 from functools import partial |
35 from sat.tools.common import data_format | 36 from sat.tools.common import data_format |
36 from sat.tools.common import uri | 37 from sat.tools.common import uri |
37 from sat.tools.common.ansi import ANSI as A | 38 from sat.tools.common.ansi import ANSI as A |
39 from sat.tools.common import date_utils | |
38 from sat_frontends.tools import jid, strings | 40 from sat_frontends.tools import jid, strings |
39 from sat_frontends.bridge.bridge_frontend import BridgeException | 41 from sat_frontends.bridge.bridge_frontend import BridgeException |
40 | 42 |
41 __commands__ = ["Pubsub"] | 43 __commands__ = ["Pubsub"] |
42 | 44 |
760 super(Node, self).__init__( | 762 super(Node, self).__init__( |
761 host, "node", use_profile=False, help=_("node handling") | 763 host, "node", use_profile=False, help=_("node handling") |
762 ) | 764 ) |
763 | 765 |
764 | 766 |
class CacheGet(base.CommandBase):
    """Command outputting pubsub item(s) of a node as found in cache."""

    def __init__(self, host):
        options = dict(
            use_output=C.OUTPUT_LIST_XML,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
            help=_("get pubsub item(s) from cache"),
        )
        super().__init__(host, "get", **options)

    def add_parser_options(self):
        """Add the subscription id option to the parser."""
        self.parser.add_argument(
            "-S", "--sub-id", default="", help=_("subscription id")
        )

    async def start(self):
        """Request cached items through the bridge, output them, then quit.

        A bridge error whose class name is "NotFound" ends with
        C.EXIT_NOT_FOUND, any other bridge error with C.EXIT_BRIDGE_ERRBACK and
        any other exception with C.EXIT_INTERNAL_ERROR.
        """
        try:
            serialised = await self.host.bridge.psCacheGet(
                self.args.service,
                self.args.node,
                self.args.max,
                self.args.items,
                self.args.sub_id,
                self.getPubsubExtra(),
                self.profile,
            )
            ps_result = data_format.deserialise(serialised)
        except BridgeException as exc:
            if exc.classname != "NotFound":
                self.disp(f"can't get pubsub items from cache: {exc}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} from {self.args.service} is not in cache "
                    f"for {self.profile}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as exc:
            self.disp(f"Internal error: {exc}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            # only the "items" entry of the deserialised result is displayed
            await self.output(ps_result["items"])
            self.host.quit(C.EXIT_OK)
816 | |
817 | |
class CacheSync(base.CommandBase):
    """Command asking the backend to (re)synchronise a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "sync",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("(re)synchronise a pubsub node"),
        )

    def add_parser_options(self):
        """This command adds no option of its own."""

    async def start(self):
        """Call the bridge synchronisation method and quit with a matching code."""
        try:
            await self.host.bridge.psCacheSync(
                self.args.service, self.args.node, self.profile
            )
        except BridgeException as exc:
            # both the error condition and the exception class name may signal
            # a missing node
            node_missing = (
                exc.condition == "item-not-found" or exc.classname == "NotFound"
            )
            if not node_missing:
                self.disp(f"can't synchronise pubsub node: {exc}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} doesn't exist on {self.args.service}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as exc:
            self.disp(f"Internal error: {exc}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
854 | |
855 | |
class CachePurge(base.CommandBase):
    """Command purging (deleting) items from the pubsub cache."""

    # parsed argument names which are forwarded as purge filters when they are set
    _FILTER_KEYS = (
        "services", "nodes", "profiles", "updated_before", "created_before",
        "types", "subtypes",
    )

    def __init__(self, host):
        super().__init__(
            host,
            "purge",
            use_profile=False,
            help=_("purge (delete) items from cache"),
        )

    def add_parser_options(self):
        """Add the purge filters and the ``--force`` flag to the parser.

        Help strings are all passed through ``_()`` so that they are handled like
        the help of ``--force`` and of the sibling commands.
        """
        self.parser.add_argument(
            "-s", "--service", action="append", metavar="JID", dest="services",
            help=_(
                "purge items only for these services. If not specified, items from ALL "
                "services will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-n", "--node", action="append", dest="nodes",
            help=_(
                "purge items only for these nodes. If not specified, items from ALL "
                "nodes will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-p", "--profile", action="append", dest="profiles",
            help=_(
                "purge items only for these profiles. If not specified, items from ALL "
                "profiles will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("purge items which have been last updated before given time.")
        )
        self.parser.add_argument(
            "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("purge items which have been last created before given time.")
        )
        self.parser.add_argument(
            "-t", "--type", action="append", dest="types",
            help=_("purge items flagged with TYPE. May be used several times.")
        )
        self.parser.add_argument(
            "-S", "--subtype", action="append", dest="subtypes",
            help=_("purge items flagged with SUBTYPE. May be used several times.")
        )
        self.parser.add_argument(
            "-f", "--force", action="store_true",
            help=_("purge items without confirmation")
        )

    async def start(self):
        """Ask for confirmation (unless ``--force`` is used), then purge the cache.

        Only the filters which have been set on the command line (i.e. which are
        not None) are sent, serialised, to the bridge.
        """
        if not self.args.force:
            await self.host.confirmOrQuit(
                _(
                    "Are you sure to purge items from cache? You'll have to bypass cache "
                    "or resynchronise nodes to access deleted items again."
                ),
                # typo fixed: was "Items purgins has been cancelled."
                _("Items purging has been cancelled.")
            )
        candidates = ((key, getattr(self.args, key)) for key in self._FILTER_KEYS)
        purge_data = {key: value for key, value in candidates if value is not None}
        try:
            await self.host.bridge.psCachePurge(data_format.serialise(purge_data))
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
931 | |
932 | |
class CacheReset(base.CommandBase):
    """Command removing everything from the pubsub cache."""

    def __init__(self, host):
        super().__init__(
            host, "reset", use_profile=False, help=_("remove everything from cache")
        )

    def add_parser_options(self):
        """Add the flag allowing to skip the confirmation."""
        self.parser.add_argument(
            "-f",
            "--force",
            action="store_true",
            help=_("reset cache without confirmation"),
        )

    async def start(self):
        """Confirm the reset unless forced, then call the bridge and quit."""
        if not self.args.force:
            question = _(
                "Are you sure to reset cache? All nodes and items will be removed "
                "from it, then it will be progressively refilled as if it were new. "
                "This may be resources intensive."
            )
            cancelled = _("Pubsub cache reset has been cancelled.")
            await self.host.confirmOrQuit(question, cancelled)
        try:
            await self.host.bridge.psCacheReset()
        except Exception as exc:
            self.disp(f"Internal error: {exc}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
966 | |
967 | |
class CacheSearch(base.CommandBase):
    """Command searching for pubsub items in cache."""

    # arguments copied to the query when they have a truthy value
    _FILTER_ARGS = ("fts", "profiles", "services", "nodes", "types", "subtypes")
    # operators for which TP(<time pattern>) is expanded in the filter value
    # NOTE(review): "le" sits next to "<" where "lt" would be expected; kept
    #   unchanged, to be confirmed against the operators accepted by the backend
    _TIME_PATTERN_OPERATORS = (">", "gt", "<", "le", "between")
    # operators whose value is always sent as a list
    _LIST_VALUE_OPERATORS = ("overlap", "ioverlap", "disjoint", "idisjoint")
    _TIME_PATTERN_RE = re.compile(r"\bTP\(([^)]+)\)")

    def __init__(self, host):
        extra_outputs = {
            "default": self.default_output,
            "xml": self.xml_output,
            "xml-raw": self.xml_raw_output,
        }
        super().__init__(
            host,
            "search",
            use_profile=False,
            use_output=C.OUTPUT_LIST_DICT,
            extra_outputs=extra_outputs,
            help=_("search for pubsub items in cache"),
        )

    def add_parser_options(self):
        """Add search filters, ordering, pagination and display options."""
        self.parser.add_argument(
            "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
        )
        self.parser.add_argument(
            "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
            help=_("search items only from these profiles. May be used several times.")
        )
        self.parser.add_argument(
            "-s", "--service", action="append", dest="services", metavar="SERVICE",
            help=_("items must be from specified service. May be used several times.")
        )
        self.parser.add_argument(
            "-n", "--node", action="append", dest="nodes", metavar="NODE",
            help=_("items must be in the specified node. May be used several times.")
        )
        self.parser.add_argument(
            "-t", "--type", action="append", dest="types", metavar="TYPE",
            help=_("items must be of specified type. May be used several times.")
        )
        self.parser.add_argument(
            "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
            help=_("items must be of specified subtype. May be used several times.")
        )
        self.parser.add_argument(
            "-P", "--payload", action="store_true", help=_("include item XML payload")
        )
        self.parser.add_argument(
            "-o", "--order-by", action="append", nargs="+",
            metavar=("ORDER", "[FIELD] [DIRECTION]"),
            help=_("how items must be ordered. May be used several times.")
        )
        self.parser.add_argument(
            "-l", "--limit", type=int, help=_("maximum number of items to return")
        )
        self.parser.add_argument(
            "-i", "--index", type=int, help=_("return results starting from this index")
        )
        self.parser.add_argument(
            "-F",
            "--field",
            action="append",
            nargs=3,
            dest="fields",
            default=[],
            metavar=("PATH", "OPERATOR", "VALUE"),
            help=_("parsed data field filter. May be used several times."),
        )
        self.parser.add_argument(
            "-k",
            "--key",
            action="append",
            dest="keys",
            metavar="KEY",
            help=_(
                "data key(s) to display. May be used several times. DEFAULT: show all "
                "keys"
            ),
        )

    @staticmethod
    def _json_or_str(text):
        """Return ``text`` decoded as JSON, or ``text`` itself if it is not JSON."""
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text

    @staticmethod
    def _expand_time_pattern(match):
        """Replace a ``TP(<time pattern>)`` match by the string of its parsed date."""
        return str(date_utils.date_parse_ext(
            match.group(1), default_tz=date_utils.TZ_LOCAL
        ))

    def _build_order_by(self):
        """Convert ``--order-by`` values to a list of order queries.

        Each value is ``ORDER [DIRECTION]`` or ``field PATH [DIRECTION]``, the
        direction defaulting to "asc". Invalid values are reported with
        ``self.parser.error``.
        """
        order_by = []
        for order, *args in self.args.order_by or []:
            if order == "field":
                if not args:
                    self.parser.error(_("field data must be specified in --order-by"))
                elif len(args) == 1:
                    path = args[0]
                    direction = "asc"
                elif len(args) == 2:
                    path, direction = args
                else:
                    self.parser.error(_(
                        "You can't specify more than 2 arguments for a field in "
                        "--order-by"
                    ))
                order_query = {"path": self._json_or_str(path)}
            else:
                order_query = {"order": order}
                if not args:
                    direction = "asc"
                elif len(args) == 1:
                    direction = args[0]
                else:
                    self.parser.error(_(
                        "there are too many arguments in --order-by option"
                    ))
            if direction.lower() not in ("asc", "desc"):
                # the placeholder is filled after translation: the original plain
                # string displayed "{direction!r}" verbatim
                self.parser.error(
                    _("invalid --order-by direction: {direction!r}").format(
                        direction=direction
                    )
                )
            order_query["direction"] = direction
            order_by.append(order_query)
        return order_by

    def _build_field_filters(self):
        """Convert ``--field PATH OPERATOR VALUE`` triplets to parsed data filters."""
        parsed = []
        for path, operator, value in self.args.fields:
            # a path which is not JSON encoded is kept as a string
            path = self._json_or_str(path)
            if not isinstance(path, list):
                path = [path]

            # handling of TP(<time pattern>)
            if operator in self._TIME_PATTERN_OPERATORS:
                value = self._TIME_PATTERN_RE.sub(self._expand_time_pattern, value)

            # as for the path, a non JSON value is kept as a string
            value = self._json_or_str(value)

            if operator in self._LIST_VALUE_OPERATORS and not isinstance(value, list):
                value = [value]

            parsed.append({"path": path, "op": operator, "value": value})
        return parsed

    async def start(self):
        """Build the search query from arguments, run it and output found items.

        Quit with C.EXIT_BRIDGE_ERRBACK on a bridge error and with
        C.EXIT_INTERNAL_ERROR on any other exception.
        """
        query = {}
        for arg in self._FILTER_ARGS:
            value = getattr(self.args, arg)
            if value:
                if arg in ("types", "subtypes"):
                    # empty string is used to find items without type and/or subtype
                    value = [v or None for v in value]
                query[arg] = value
        for arg in ("limit", "index"):
            value = getattr(self.args, arg)
            if value is not None:
                query[arg] = value

        order_by = self._build_order_by()
        if order_by:
            query["order-by"] = order_by

        if self.args.fields:
            query["parsed"] = self._build_field_filters()

        if self.args.payload or "xml" in self.args.output:
            query["with_payload"] = True
            if self.args.keys:
                # the payload is kept when displayed keys are filtered below
                self.args.keys.append("item_payload")
        try:
            found_items = data_format.deserialise(
                await self.host.bridge.psCacheSearch(
                    data_format.serialise(query)
                ),
                type_check=list,
            )
        except BridgeException as e:
            self.disp(f"can't search for pubsub items in cache: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            keys = self.args.keys
            if keys:
                found_items = [
                    {k: v for k, v in item.items() if k in keys}
                    for item in found_items
                ]
            await self.output(found_items)
            self.host.quit(C.EXIT_OK)

    def default_output(self, found_items):
        """Format date fields in place, then use the "simple" list of dicts output.

        "created", "published" and "updated" values are replaced by the result of
        ``common.format_time``; a missing field, or a value rejected with
        ValueError, is left untouched.
        """
        for item in found_items:
            for field in ("created", "published", "updated"):
                try:
                    timestamp = item[field]
                except KeyError:
                    pass
                else:
                    try:
                        item[field] = common.format_time(timestamp)
                    except ValueError:
                        pass
        self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items)

    def xml_output(self, found_items):
        """Output prettified item payload"""
        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
        for item in found_items:
            cb(item["item_payload"])

    def xml_raw_output(self, found_items):
        """Output item payload without prettifying"""
        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
        for item in found_items:
            cb(item["item_payload"])
1187 | |
1188 | |
class Cache(base.CommandBase):
    """Parent "cache" command grouping the pubsub cache subcommands."""

    subcommands = (CacheGet, CacheSync, CachePurge, CacheReset, CacheSearch)

    def __init__(self, host):
        super().__init__(
            host,
            "cache",
            use_profile=False,
            help=_("pubsub cache handling"),
        )
1202 | |
1203 | |
765 class Set(base.CommandBase): | 1204 class Set(base.CommandBase): |
766 def __init__(self, host): | 1205 def __init__(self, host): |
767 base.CommandBase.__init__( | 1206 base.CommandBase.__init__( |
768 self, | 1207 self, |
769 host, | 1208 host, |
821 self, | 1260 self, |
822 host, | 1261 host, |
823 "get", | 1262 "get", |
824 use_output=C.OUTPUT_LIST_XML, | 1263 use_output=C.OUTPUT_LIST_XML, |
825 use_pubsub=True, | 1264 use_pubsub=True, |
826 pubsub_flags={C.NODE, C.MULTI_ITEMS}, | 1265 pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE}, |
827 help=_("get pubsub item(s)"), | 1266 help=_("get pubsub item(s)"), |
828 ) | 1267 ) |
829 | 1268 |
830 def add_parser_options(self): | 1269 def add_parser_options(self): |
831 self.parser.add_argument( | 1270 self.parser.add_argument( |
833 "--sub-id", | 1272 "--sub-id", |
834 default="", | 1273 default="", |
835 help=_("subscription id"), | 1274 help=_("subscription id"), |
836 ) | 1275 ) |
837 # TODO: a key(s) argument to select keys to display | 1276 # TODO: a key(s) argument to select keys to display |
838 # TODO: add MAM filters | |
839 | 1277 |
840 async def start(self): | 1278 async def start(self): |
841 try: | 1279 try: |
842 ps_result = data_format.deserialise( | 1280 ps_result = data_format.deserialise( |
843 await self.host.bridge.psItemsGet( | 1281 await self.host.bridge.psItemsGet( |
882 def add_parser_options(self): | 1320 def add_parser_options(self): |
883 self.parser.add_argument( | 1321 self.parser.add_argument( |
884 "-f", "--force", action="store_true", help=_("delete without confirmation") | 1322 "-f", "--force", action="store_true", help=_("delete without confirmation") |
885 ) | 1323 ) |
886 self.parser.add_argument( | 1324 self.parser.add_argument( |
887 "-N", "--notify", action="store_true", help=_("notify deletion") | 1325 "--no-notification", dest="notify", action="store_false", |
1326 help=_("do not send notification (not recommended)") | |
888 ) | 1327 ) |
889 | 1328 |
890 async def start(self): | 1329 async def start(self): |
891 if not self.args.item: | 1330 if not self.args.item: |
892 self.parser.error(_("You need to specify an item to delete")) | 1331 self.parser.error(_("You need to specify an item to delete")) |
953 ) | 1392 ) |
954 self.host.quit(1) | 1393 self.host.quit(1) |
955 items = [item] if item else [] | 1394 items = [item] if item else [] |
956 ps_result = data_format.deserialise( | 1395 ps_result = data_format.deserialise( |
957 await self.host.bridge.psItemsGet( | 1396 await self.host.bridge.psItemsGet( |
958 service, node, 1, items, "", {}, self.profile | 1397 service, node, 1, items, "", data_format.serialise({}), self.profile |
959 ) | 1398 ) |
960 ) | 1399 ) |
961 item_raw = ps_result["items"][0] | 1400 item_raw = ps_result["items"][0] |
962 parser = etree.XMLParser(remove_blank_text=True, recover=True) | 1401 parser = etree.XMLParser(remove_blank_text=True, recover=True) |
963 item_elt = etree.fromstring(item_raw, parser) | 1402 item_elt = etree.fromstring(item_raw, parser) |
1685 self.args.service, | 2124 self.args.service, |
1686 self.args.node, | 2125 self.args.node, |
1687 self.args.rsm_max, | 2126 self.args.rsm_max, |
1688 self.args.items, | 2127 self.args.items, |
1689 "", | 2128 "", |
1690 extra, | 2129 data_format.serialise(extra), |
1691 self.profile, | 2130 self.profile, |
1692 ) | 2131 ) |
1693 ) | 2132 ) |
1694 except Exception as e: | 2133 except Exception as e: |
1695 self.disp(f"can't retrieve items: {e}", error=True) | 2134 self.disp(f"can't retrieve items: {e}", error=True) |
2052 Edit, | 2491 Edit, |
2053 Rename, | 2492 Rename, |
2054 Subscribe, | 2493 Subscribe, |
2055 Unsubscribe, | 2494 Unsubscribe, |
2056 Subscriptions, | 2495 Subscriptions, |
2057 Node, | |
2058 Affiliations, | 2496 Affiliations, |
2059 Search, | 2497 Search, |
2060 Transform, | 2498 Transform, |
2061 Hook, | 2499 Hook, |
2062 Uri, | 2500 Uri, |
2501 Node, | |
2502 Cache, | |
2063 ) | 2503 ) |
2064 | 2504 |
2065 def __init__(self, host): | 2505 def __init__(self, host): |
2066 super(Pubsub, self).__init__( | 2506 super(Pubsub, self).__init__( |
2067 host, "pubsub", use_profile=False, help=_("PubSub nodes/items management") | 2507 host, "pubsub", use_profile=False, help=_("PubSub nodes/items management") |