comparison sat_frontends/jp/cmd_pubsub.py @ 3602:7241ce3b79dd

jp (pubsub): new `cache` subcommand with commands to `get`, `sync`, `purge` and `reset` pubsub cache
author Goffi <goffi@goffi.org>
date Thu, 29 Jul 2021 22:51:01 +0200
parents b46e9791168f
children 7f503b20597e
comparison
equal deleted inserted replaced
3601:b46e9791168f 3602:7241ce3b79dd
757 ) 757 )
758 758
759 def __init__(self, host): 759 def __init__(self, host):
760 super(Node, self).__init__( 760 super(Node, self).__init__(
761 host, "node", use_profile=False, help=_("node handling") 761 host, "node", use_profile=False, help=_("node handling")
762 )
763
764
class CacheGet(base.CommandBase):
    """``get`` subcommand: output pubsub item(s) retrieved from the cache."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_XML,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
            help=_("get pubsub item(s) from cache"),
        )

    def add_parser_options(self):
        """Register the optional subscription id argument."""
        self.parser.add_argument(
            "-S", "--sub-id", default="", help=_("subscription id")
        )

    async def start(self):
        """Query the bridge for cached items and output them.

        Quits with ``C.EXIT_NOT_FOUND`` when the bridge reports ``NotFound``,
        ``C.EXIT_BRIDGE_ERRBACK`` on any other bridge error,
        ``C.EXIT_INTERNAL_ERROR`` on any other exception, and ``C.EXIT_OK``
        once the items have been output.
        """
        try:
            args = self.args
            serialised = await self.host.bridge.psCacheGet(
                args.service,
                args.node,
                args.max,
                args.items,
                args.sub_id,
                self.getPubsubExtra(),
                self.profile,
            )
            # deserialisation stays inside the ``try`` so that a malformed
            # answer is reported as an internal error
            ps_result = data_format.deserialise(serialised)
        except BridgeException as e:
            if e.classname != "NotFound":
                self.disp(f"can't get pubsub items from cache: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} from {self.args.service} is not in cache "
                    f"for {self.profile}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            await self.output(ps_result["items"])
            self.host.quit(C.EXIT_OK)
814
815
class CacheSync(base.CommandBase):
    """``sync`` subcommand: ask the bridge to (re)synchronise a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "sync",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("(re)synchronise a pubsub node"),
        )

    def add_parser_options(self):
        """No option is added by this command itself."""

    async def start(self):
        """Request the synchronisation of the selected node, then quit.

        Quits with ``C.EXIT_NOT_FOUND`` when the bridge reports that the node
        is missing, ``C.EXIT_BRIDGE_ERRBACK`` on any other bridge error,
        ``C.EXIT_INTERNAL_ERROR`` on any other exception, and ``C.EXIT_OK`` on
        success.
        """
        try:
            await self.host.bridge.psCacheSync(
                self.args.service, self.args.node, self.profile
            )
        except BridgeException as e:
            # a missing node is reported either through the error condition
            # or through the exception class name
            node_missing = (
                e.condition == "item-not-found" or e.classname == "NotFound"
            )
            if not node_missing:
                self.disp(f"can't synchronise pubsub node: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} doesn't exist on {self.args.service}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
852
853
class CachePurge(base.CommandBase):
    """``purge`` subcommand: delete items from the pubsub cache.

    Every filter option is optional; only the filters actually given on the
    command line are sent to the bridge.
    """

    def __init__(self, host):
        super().__init__(
            host,
            "purge",
            use_profile=False,
            help=_("purge (delete) items from cache"),
        )

    def add_parser_options(self):
        """Register the purge filters and the ``--force`` flag."""
        self.parser.add_argument(
            "-s", "--service", action="append", metavar="JID", dest="services",
            help=_(
                "purge items only for these services. If not specified, items from ALL "
                "services will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-n", "--node", action="append", dest="nodes",
            help=_(
                "purge items only for these nodes. If not specified, items from ALL "
                "nodes will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-p", "--profile", action="append", dest="profiles",
            help=_(
                "purge items only for these profiles. If not specified, items from ALL "
                "profiles will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("purge items which have been last updated before given time.")
        )
        self.parser.add_argument(
            "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("purge items which have been last created before given time.")
        )
        self.parser.add_argument(
            "-t", "--type", action="append", dest="types",
            help=_("purge items flagged with TYPE. May be used several times.")
        )
        # "-S" and not "-s": "-s" is already taken by --service, and argparse
        # refuses to register the same option string twice on one parser.
        self.parser.add_argument(
            "-S", "--subtype", action="append", dest="subtypes",
            help=_("purge items flagged with SUBTYPE. May be used several times.")
        )
        self.parser.add_argument(
            "-f", "--force", action="store_true",
            help=_("purge items without confirmation")
        )

    async def start(self):
        """Ask for confirmation unless ``--force`` is set, then purge the cache.

        Quits with ``C.EXIT_INTERNAL_ERROR`` if the bridge call raises, and with
        ``C.EXIT_OK`` otherwise.
        """
        if not self.args.force:
            await self.host.confirmOrQuit(
                _(
                    "Are you sure to purge items from cache? You'll have to bypass cache "
                    "or resynchronise nodes to access deleted items again."
                ),
                _("Items purging has been cancelled.")
            )
        # only forward the filters which were set on the command line
        purge_data = {}
        for key in (
            "services", "nodes", "profiles", "updated_before", "created_before",
            "types", "subtypes"
        ):
            value = getattr(self.args, key)
            if value is not None:
                purge_data[key] = value
        try:
            await self.host.bridge.psCachePurge(
                data_format.serialise(purge_data)
            )
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
929
930
class CacheReset(base.CommandBase):
    """``reset`` subcommand: remove everything from the pubsub cache."""

    def __init__(self, host):
        super().__init__(
            host, "reset", use_profile=False, help=_("remove everything from cache")
        )

    def add_parser_options(self):
        """Register the ``--force`` flag which skips the confirmation."""
        self.parser.add_argument(
            "-f",
            "--force",
            action="store_true",
            help=_("reset cache without confirmation"),
        )

    async def start(self):
        """Ask for confirmation unless ``--force`` is set, then reset the cache.

        Quits with ``C.EXIT_INTERNAL_ERROR`` if the bridge call raises, and with
        ``C.EXIT_OK`` otherwise.
        """
        if not self.args.force:
            question = _(
                "Are you sure to reset cache? All nodes and items will be removed "
                "from it, then it will be progressively refilled as if it were new."
            )
            cancel_message = _("Pubsub cache reset has been cancelled.")
            await self.host.confirmOrQuit(question, cancel_message)
        try:
            await self.host.bridge.psCacheReset()
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
963
964
class Cache(base.CommandBase):
    """``cache`` command grouping the pubsub cache subcommands."""

    subcommands = (CacheGet, CacheSync, CachePurge, CacheReset)

    def __init__(self, host):
        super().__init__(
            host,
            "cache",
            use_profile=False,
            help=_("pubsub cache handling"),
        )
763 977
764 978
765 class Set(base.CommandBase): 979 class Set(base.CommandBase):
766 def __init__(self, host): 980 def __init__(self, host):
2052 Edit, 2266 Edit,
2053 Rename, 2267 Rename,
2054 Subscribe, 2268 Subscribe,
2055 Unsubscribe, 2269 Unsubscribe,
2056 Subscriptions, 2270 Subscriptions,
2057 Node,
2058 Affiliations, 2271 Affiliations,
2059 Search, 2272 Search,
2060 Transform, 2273 Transform,
2061 Hook, 2274 Hook,
2062 Uri, 2275 Uri,
2276 Node,
2277 Cache,
2063 ) 2278 )
2064 2279
2065 def __init__(self, host): 2280 def __init__(self, host):
2066 super(Pubsub, self).__init__( 2281 super(Pubsub, self).__init__(
2067 host, "pubsub", use_profile=False, help=_("PubSub nodes/items management") 2282 host, "pubsub", use_profile=False, help=_("PubSub nodes/items management")