Mercurial > libervia-backend
diff sat_frontends/jp/cmd_pubsub.py @ 2777:ff1b40823b07
jp (pubsub): new "transform" command:
This command allows passing all requested items through an external command to filter them (i.e. modify their content).
- created new jp.xml_tools module with some common functions (like lxml parsing)
- new EXIT code EXIT_CMD_ERROR (used when a third party utility returns an error)
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 15 Jan 2019 08:51:56 +0100 |
parents | b7974814dd5b |
children | f61a50790fae |
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py Tue Jan 15 08:51:54 2019 +0100 +++ b/sat_frontends/jp/cmd_pubsub.py Tue Jan 15 08:51:56 2019 +0100 @@ -24,6 +24,7 @@ from sat_frontends.jp.constants import Const as C from sat_frontends.jp import common from sat_frontends.jp import arg_tools +from sat_frontends.jp import xml_tools from functools import partial from sat.tools.common import uri from sat.tools.common.ansi import ANSI as A @@ -683,26 +684,8 @@ self.host.quit(C.EXIT_OK) def start(self): - try: - from lxml import etree - except ImportError: - self.disp( - u'lxml module must be installed to use edit, please install it with "pip install lxml"', - error=True, - ) - self.host.quit(1) - try: - element = etree.parse(sys.stdin).getroot() - except Exception as e: - self.parser.error( - _(u"Can't parse the payload XML in input: {msg}").format(msg=e) - ) - if element.tag in ("item", "{http://jabber.org/protocol/pubsub}item"): - if len(element) > 1: - self.parser.error( - _(u"<item> can only have one child element (the payload)") - ) - element = element[0] + element, etree = xml_tools.etreeParse(self, sys.stdin) + element = xml_tools.getPayload(self, element) payload = etree.tostring(element, encoding="unicode") self.host.bridge.psItemSend( @@ -1070,7 +1053,7 @@ type=int, default=0, help=_( - u"maximum depth of recursion (will search linked nodes if > 0, default: 0)" + u"maximum depth of recursion (will search linked nodes if > 0, DEFAULT: 0)" ), ) self.parser.add_argument( @@ -1079,7 +1062,7 @@ type=int, default=30, help=_(u"maximum number of items to get per node ({} to get all items, " - u"default: 30)".format( C.NO_LIMIT)), + u"DEFAULT: 30)".format( C.NO_LIMIT)), ) self.parser.add_argument( "-N", @@ -1157,7 +1140,7 @@ const=("ignore-case", True), nargs="?", metavar="BOOLEAN", - help=_(u"(don't) ignore case in following filters (default: case sensitive)"), + help=_(u"(don't) ignore case in following filters (DEFAULT: case sensitive)"), ) flags.add_argument( "-I", @@ -1168,7 
+1151,7 @@ const=("invert", True), nargs="?", metavar="BOOLEAN", - help=_(u"(don't) invert effect of following filters (default: don't invert)"), + help=_(u"(don't) invert effect of following filters (DEFAULT: don't invert)"), ) flags.add_argument( "-A", @@ -1179,7 +1162,7 @@ const=("dotall", True), nargs="?", metavar="BOOLEAN", - help=_(u"(don't) use DOTALL option for regex (default: don't use)"), + help=_(u"(don't) use DOTALL option for regex (DEFAULT: don't use)"), ) flags.add_argument( "-k", @@ -1199,7 +1182,7 @@ default="print", nargs="?", choices=("print", "exec", "external"), - help=_(u"action to do on found items (default: print)"), + help=_(u"action to do on found items (DEFAULT: print)"), ) self.parser.add_argument("command", nargs=argparse.REMAINDER) @@ -1474,6 +1457,283 @@ self.getItems(0, self.args.service, self.args.node, self.args.items) +class Transform(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "transform", + use_pubsub=True, + pubsub_flags={C.NODE, C.MULTI_ITEMS}, + help=_(u"modify items of a node using an external command/script"), + ) + self.need_loop = True + + def add_parser_options(self): + self.parser.add_argument( + "--apply", + action="store_true", + help=_(u"apply transformation (DEFAULT: do a dry run)"), + ) + self.parser.add_argument( + "--admin", + action="store_true", + help=_(u"do a pubsub admin request, needed to change publisher"), + ) + self.parser.add_argument( + "-I", + "--ignore_errors", + action="store_true", + help=_( + u"if command return a non zero exit code, ignore the item and continue"), + ) + self.parser.add_argument( + "-A", + "--all", + action="store_true", + help=_(u"get all items by looping over all pages using RSM") + ) + self.parser.add_argument( + "command_path", + help=_(u"path to the command to use. Will be called repetitivly with an " + u"item as input. Output (full item XML) will be used as new one. 
" + u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), + ) + + def psAdminItemsSendCb(self, item_ids, metadata): + self.disp(_(u'items published with ids {item_ids}').format( + item_ids=u', '.join(item_ids))) + if self.args.all: + return self.handleNextPage(metadata) + else: + self.host.quit() + + def psItemsSendCb(self, item_id, metadata, show_mess=True): + if show_mess: + self.disp(u'item published with id {item_id}'.format(item_id=item_id)) + if self.items_sent == self.items_to_send: + if self.args.all: + return self.handleNextPage(metadata) + self.disp(u'all items published') + self.host.quit() + + def psRetractItemCb(self, item_id, metadata): + self.psItemsSendCb(item_id, metadata, show_mess=False) + + def handleNextPage(self, metadata): + """Retrieve new page through RSM or quit if we're in the last page + + use to handle --all option + @param metadata(dict): metadata as returned by psItemsGet + """ + try: + last = metadata[u'rsm_last'] + index = int(metadata[u'rsm_index']) + count = int(metadata[u'rsm_count']) + except KeyError: + self.disp(_(u"Can't retrieve all items, RSM metadata not available"), + error=True) + self.host.quit(C.EXIT_MISSING_FEATURE) + except ValueError as e: + self.disp(_(u"Can't retrieve all items, bad RSM metadata: {msg}") + .format(msg=e), error=True) + self.host.quit(C.EXIT_ERROR) + + if index + self.args.rsm_max >= count: + self.disp(_(u'All items transformed')) + self.host.quit(0) + + self.disp(_(u'Retrieving next page ({page_idx}/{page_total})').format( + page_idx = int(index/self.args.rsm_max) + 1, + page_total = int(count/self.args.rsm_max), + ) + ) + + extra = self.getPubsubExtra() + extra[u'rsm_after'] = last + self.host.bridge.psItemsGet( + self.args.service, + self.args.node, + self.args.rsm_max, + self.args.items, + "", + extra, + self.profile, + callback=self.psItemsGetCb, + errback=partial( + self.errback, + msg=_(u"can't retrieve items: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + def 
psItemsGetCb(self, ps_result): + items, metadata = ps_result + if self.args.admin: + new_items = [] + else: + self.items_to_send = len(items) + self.items_sent = 0 + + for item in items: + if self.check_duplicates: + # this is used when we are not ordering by creation + # to avoid infinite loop + item_elt, __ = xml_tools.etreeParse(self, item) + item_id = item_elt.get('id') + if item_id in self.items_ids: + self.disp(_( + u"Duplicate found on item {item_id}, we have probably handled " + u"all items.").format(item_id=item_id)) + self.host.quit() + self.items_ids.append(item_id) + + # we launch the command to filter the item + try: + p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + except OSError as e: + exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR + e = str(e).decode('utf-8', errors="ignore") + self.disp(u"Can't execute the command: {msg}".format(msg=e), error=True) + self.host.quit(exit_code) + cmd_std_out, cmd_std_err = p.communicate(item.encode("utf-8")) + ret = p.wait() + if ret != 0: + self.disp(u"The command returned a non zero status while parsing the " + u"following item:\n\n{item}".format(item=item), error=True) + if self.args.ignore_errors: + if not self.args.admin: + self.items_to_send -= 1 + continue + else: + self.host.quit(C.EXIT_CMD_ERROR) + if cmd_std_err is not None: + cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') + self.disp(cmd_std_err, error=True) + cmd_std_out = cmd_std_out.strip() + if cmd_std_out == "DELETE": + item_elt, __ = xml_tools.etreeParse(self, item) + item_id = item_elt.get('id') + self.disp(_(u"Deleting item {item_id}").format(item_id=item_id)) + if self.args.apply: + if not self.args.admin: + # we need to increase the counter as if the item were re-published + self.items_sent += 1 + self.host.bridge.psRetractItem( + self.args.service, + self.args.node, + item_id, + False, + self.profile, + callback=partial(self.psRetractItemCb, metadata=metadata), + 
errback=partial( + self.errback, + msg=_(u"can't delete item: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + continue + elif cmd_std_out == "SKIP": + item_elt, __ = xml_tools.etreeParse(self, item) + item_id = item_elt.get('id') + self.disp(_(u"Skipping item {item_id}").format(item_id=item_id)) + if self.args.apply: + if not self.args.admin: + # see above + self.items_sent += 1 + self.psItemsSendCb(item_id, metadata, show_mess=False) + continue + element, etree = xml_tools.etreeParse(self, cmd_std_out) + + # at this point command has been run and we have a etree.Element object + if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"): + self.disp(u"your script must return a whole item, this is not:\n{xml}" + .format(xml=etree.tostring(element, encoding="unicode")), error=True) + self.host.quit(C.EXIT_DATA_ERROR) + + if not self.args.apply: + # we have a dry run, we just display filtered items + serialised = etree.tostring(element, encoding=u'unicode', + pretty_print=True) + self.disp(serialised) + else: + # we will apply the change, either in admin request or as a simple + # pubsub one + if self.args.admin: + new_items.append(etree.tostring(element, encoding="unicode")) + else: + # there is currently no method to send several items at once + # so we publish them one by one + payload = etree.tostring(xml_tools.getPayload(self, element), + encoding="unicode") + item_id = element.get(u'id', '') + self.host.bridge.psItemSend( + self.args.service, + self.args.node, + payload, + item_id, + {}, + self.profile, + callback=partial(self.psItemsSendCb, metadata=metadata), + errback=partial( + self.errback, + msg=_(u"can't send item: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + self.items_sent += 1 + + if not self.args.apply: + # on dry run we have nothing to wait for, we can quit + if self.args.all: + return self.handleNextPage(metadata) + self.host.quit() + elif self.args.admin: + self.host.bridge.psAdminItemsSend( + self.args.service, + 
self.args.node, + new_items, + u"", + self.profile, + callback=partial(self.psAdminItemsSendCb, metadata=metadata), + errback=partial( + self.errback, + msg=_(u"can't send item: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + def start(self): + if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: + self.check_duplicates = True + self.items_ids = [] + self.disp(A.color( + A.FG_RED, A.BOLD, + u'/!\\ "--all" should be used with "--order-by creation" /!\\\n', + A.RESET, + u"We'll update items, so order may change during transformation,\n" + u"we'll try to mitigate that by stopping on first duplicate,\n" + u"but this method is not safe, and some items may be missed.\n---\n")) + else: + self.check_duplicates = False + self.host.bridge.psItemsGet( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + "", + self.getPubsubExtra(), + self.profile, + callback=self.psItemsGetCb, + errback=partial( + self.errback, + msg=_(u"can't retrieve items: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + class Uri(base.CommandBase): def __init__(self, host): base.CommandBase.__init__( @@ -1604,7 +1864,7 @@ "--type", default=u"", choices=("", "python", "python_file", "python_code"), - help=_(u"hook type to remove, empty to remove all (default: remove all)"), + help=_(u"hook type to remove, empty to remove all (DEFAULT: remove all)"), ) self.parser.add_argument( "-a", @@ -1613,7 +1873,7 @@ type=base.unicode_decoder, default=u"", help=_( - u"argument of the hook to remove, empty to remove all (default: remove all)" + u"argument of the hook to remove, empty to remove all (DEFAULT: remove all)" ), ) @@ -1697,6 +1957,7 @@ Node, Affiliations, Search, + Transform, Hook, Uri, )