Mercurial > libervia-backend
changeset 2777:ff1b40823b07
jp (pubsub): new "transform" command:
This command allows to pass all requested items through an external command to filter them (i.e. modify their content).
- created new jp.xml_tools module with some common functions (like lxml parsing)
- new EXIT code EXIT_CMD_ERROR (used when a third party utility returns an error)
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 15 Jan 2019 08:51:56 +0100 |
parents | 838f53730ce4 |
children | f7deb1c36b47 |
files | sat_frontends/jp/cmd_pubsub.py sat_frontends/jp/constants.py sat_frontends/jp/xml_tools.py |
diffstat | 3 files changed, 354 insertions(+), 29 deletions(-) [+] |
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py Tue Jan 15 08:51:54 2019 +0100 +++ b/sat_frontends/jp/cmd_pubsub.py Tue Jan 15 08:51:56 2019 +0100 @@ -24,6 +24,7 @@ from sat_frontends.jp.constants import Const as C from sat_frontends.jp import common from sat_frontends.jp import arg_tools +from sat_frontends.jp import xml_tools from functools import partial from sat.tools.common import uri from sat.tools.common.ansi import ANSI as A @@ -683,26 +684,8 @@ self.host.quit(C.EXIT_OK) def start(self): - try: - from lxml import etree - except ImportError: - self.disp( - u'lxml module must be installed to use edit, please install it with "pip install lxml"', - error=True, - ) - self.host.quit(1) - try: - element = etree.parse(sys.stdin).getroot() - except Exception as e: - self.parser.error( - _(u"Can't parse the payload XML in input: {msg}").format(msg=e) - ) - if element.tag in ("item", "{http://jabber.org/protocol/pubsub}item"): - if len(element) > 1: - self.parser.error( - _(u"<item> can only have one child element (the payload)") - ) - element = element[0] + element, etree = xml_tools.etreeParse(self, sys.stdin) + element = xml_tools.getPayload(self, element) payload = etree.tostring(element, encoding="unicode") self.host.bridge.psItemSend( @@ -1070,7 +1053,7 @@ type=int, default=0, help=_( - u"maximum depth of recursion (will search linked nodes if > 0, default: 0)" + u"maximum depth of recursion (will search linked nodes if > 0, DEFAULT: 0)" ), ) self.parser.add_argument( @@ -1079,7 +1062,7 @@ type=int, default=30, help=_(u"maximum number of items to get per node ({} to get all items, " - u"default: 30)".format( C.NO_LIMIT)), + u"DEFAULT: 30)".format( C.NO_LIMIT)), ) self.parser.add_argument( "-N", @@ -1157,7 +1140,7 @@ const=("ignore-case", True), nargs="?", metavar="BOOLEAN", - help=_(u"(don't) ignore case in following filters (default: case sensitive)"), + help=_(u"(don't) ignore case in following filters (DEFAULT: case sensitive)"), ) flags.add_argument( "-I", @@ -1168,7 +1151,7 @@ const=("invert", True), nargs="?", metavar="BOOLEAN", - help=_(u"(don't) invert effect of following filters (default: don't invert)"), + help=_(u"(don't) invert effect of following filters (DEFAULT: don't invert)"), ) flags.add_argument( "-A", @@ -1179,7 +1162,7 @@ const=("dotall", True), nargs="?", metavar="BOOLEAN", - help=_(u"(don't) use DOTALL option for regex (default: don't use)"), + help=_(u"(don't) use DOTALL option for regex (DEFAULT: don't use)"), ) flags.add_argument( "-k", @@ -1199,7 +1182,7 @@ default="print", nargs="?", choices=("print", "exec", "external"), - help=_(u"action to do on found items (default: print)"), + help=_(u"action to do on found items (DEFAULT: print)"), ) self.parser.add_argument("command", nargs=argparse.REMAINDER) @@ -1474,6 +1457,283 @@ self.getItems(0, self.args.service, self.args.node, self.args.items) +class Transform(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "transform", + use_pubsub=True, + pubsub_flags={C.NODE, C.MULTI_ITEMS}, + help=_(u"modify items of a node using an external command/script"), + ) + self.need_loop = True + + def add_parser_options(self): + self.parser.add_argument( + "--apply", + action="store_true", + help=_(u"apply transformation (DEFAULT: do a dry run)"), + ) + self.parser.add_argument( + "--admin", + action="store_true", + help=_(u"do a pubsub admin request, needed to change publisher"), + ) + self.parser.add_argument( + "-I", + "--ignore_errors", + action="store_true", + help=_( + u"if command return a non zero exit code, ignore the item and continue"), + ) + self.parser.add_argument( + "-A", + "--all", + action="store_true", + help=_(u"get all items by looping over all pages using RSM") + ) + self.parser.add_argument( + "command_path", + help=_(u"path to the command to use. Will be called repetitivly with an " + u"item as input. Output (full item XML) will be used as new one. " + u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), + ) + + def psAdminItemsSendCb(self, item_ids, metadata): + self.disp(_(u'items published with ids {item_ids}').format( + item_ids=u', '.join(item_ids))) + if self.args.all: + return self.handleNextPage(metadata) + else: + self.host.quit() + + def psItemsSendCb(self, item_id, metadata, show_mess=True): + if show_mess: + self.disp(u'item published with id {item_id}'.format(item_id=item_id)) + if self.items_sent == self.items_to_send: + if self.args.all: + return self.handleNextPage(metadata) + self.disp(u'all items published') + self.host.quit() + + def psRetractItemCb(self, item_id, metadata): + self.psItemsSendCb(item_id, metadata, show_mess=False) + + def handleNextPage(self, metadata): + """Retrieve new page through RSM or quit if we're in the last page + + use to handle --all option + @param metadata(dict): metadata as returned by psItemsGet + """ + try: + last = metadata[u'rsm_last'] + index = int(metadata[u'rsm_index']) + count = int(metadata[u'rsm_count']) + except KeyError: + self.disp(_(u"Can't retrieve all items, RSM metadata not available"), + error=True) + self.host.quit(C.EXIT_MISSING_FEATURE) + except ValueError as e: + self.disp(_(u"Can't retrieve all items, bad RSM metadata: {msg}") + .format(msg=e), error=True) + self.host.quit(C.EXIT_ERROR) + + if index + self.args.rsm_max >= count: + self.disp(_(u'All items transformed')) + self.host.quit(0) + + self.disp(_(u'Retrieving next page ({page_idx}/{page_total})').format( + page_idx = int(index/self.args.rsm_max) + 1, + page_total = int(count/self.args.rsm_max), + ) + ) + + extra = self.getPubsubExtra() + extra[u'rsm_after'] = last + self.host.bridge.psItemsGet( + self.args.service, + self.args.node, + self.args.rsm_max, + self.args.items, + "", + extra, + self.profile, + callback=self.psItemsGetCb, + errback=partial( + self.errback, + msg=_(u"can't retrieve items: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + def psItemsGetCb(self, ps_result): + items, metadata = ps_result + if self.args.admin: + new_items = [] + else: + self.items_to_send = len(items) + self.items_sent = 0 + + for item in items: + if self.check_duplicates: + # this is used when we are not ordering by creation + # to avoid infinite loop + item_elt, __ = xml_tools.etreeParse(self, item) + item_id = item_elt.get('id') + if item_id in self.items_ids: + self.disp(_( + u"Duplicate found on item {item_id}, we have probably handled " + u"all items.").format(item_id=item_id)) + self.host.quit() + self.items_ids.append(item_id) + + # we launch the command to filter the item + try: + p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + except OSError as e: + exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR + e = str(e).decode('utf-8', errors="ignore") + self.disp(u"Can't execute the command: {msg}".format(msg=e), error=True) + self.host.quit(exit_code) + cmd_std_out, cmd_std_err = p.communicate(item.encode("utf-8")) + ret = p.wait() + if ret != 0: + self.disp(u"The command returned a non zero status while parsing the " + u"following item:\n\n{item}".format(item=item), error=True) + if self.args.ignore_errors: + if not self.args.admin: + self.items_to_send -= 1 + continue + else: + self.host.quit(C.EXIT_CMD_ERROR) + if cmd_std_err is not None: + cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') + self.disp(cmd_std_err, error=True) + cmd_std_out = cmd_std_out.strip() + if cmd_std_out == "DELETE": + item_elt, __ = xml_tools.etreeParse(self, item) + item_id = item_elt.get('id') + self.disp(_(u"Deleting item {item_id}").format(item_id=item_id)) + if self.args.apply: + if not self.args.admin: + # we need to increase the counter as if the item were re-published + self.items_sent += 1 + self.host.bridge.psRetractItem( + self.args.service, + self.args.node, + item_id, + False, + self.profile, + callback=partial(self.psRetractItemCb, metadata=metadata), + errback=partial( + self.errback, + msg=_(u"can't delete item: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + continue + elif cmd_std_out == "SKIP": + item_elt, __ = xml_tools.etreeParse(self, item) + item_id = item_elt.get('id') + self.disp(_(u"Skipping item {item_id}").format(item_id=item_id)) + if self.args.apply: + if not self.args.admin: + # see above + self.items_sent += 1 + self.psItemsSendCb(item_id, metadata, show_mess=False) + continue + element, etree = xml_tools.etreeParse(self, cmd_std_out) + + # at this point command has been run and we have a etree.Element object + if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"): + self.disp(u"your script must return a whole item, this is not:\n{xml}" + .format(xml=etree.tostring(element, encoding="unicode")), error=True) + self.host.quit(C.EXIT_DATA_ERROR) + + if not self.args.apply: + # we have a dry run, we just display filtered items + serialised = etree.tostring(element, encoding=u'unicode', + pretty_print=True) + self.disp(serialised) + else: + # we will apply the change, either in admin request or as a simple + # pubsub one + if self.args.admin: + new_items.append(etree.tostring(element, encoding="unicode")) + else: + # there is currently no method to send several items at once + # so we publish them one by one + payload = etree.tostring(xml_tools.getPayload(self, element), + encoding="unicode") + item_id = element.get(u'id', '') + self.host.bridge.psItemSend( + self.args.service, + self.args.node, + payload, + item_id, + {}, + self.profile, + callback=partial(self.psItemsSendCb, metadata=metadata), + errback=partial( + self.errback, + msg=_(u"can't send item: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + self.items_sent += 1 + + if not self.args.apply: + # on dry run we have nothing to wait for, we can quit + if self.args.all: + return self.handleNextPage(metadata) + self.host.quit() + elif self.args.admin: + self.host.bridge.psAdminItemsSend( + self.args.service, + self.args.node, + new_items, + u"", + self.profile, + callback=partial(self.psAdminItemsSendCb, metadata=metadata), + errback=partial( + self.errback, + msg=_(u"can't send item: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + def start(self): + if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: + self.check_duplicates = True + self.items_ids = [] + self.disp(A.color( + A.FG_RED, A.BOLD, + u'/!\\ "--all" should be used with "--order-by creation" /!\\\n', + A.RESET, + u"We'll update items, so order may change during transformation,\n" + u"we'll try to mitigate that by stopping on first duplicate,\n" + u"but this method is not safe, and some items may be missed.\n---\n")) + else: + self.check_duplicates = False + self.host.bridge.psItemsGet( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + "", + self.getPubsubExtra(), + self.profile, + callback=self.psItemsGetCb, + errback=partial( + self.errback, + msg=_(u"can't retrieve items: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + class Uri(base.CommandBase): def __init__(self, host): base.CommandBase.__init__( @@ -1604,7 +1864,7 @@ "--type", default=u"", choices=("", "python", "python_file", "python_code"), - help=_(u"hook type to remove, empty to remove all (default: remove all)"), + help=_(u"hook type to remove, empty to remove all (DEFAULT: remove all)"), ) self.parser.add_argument( "-a", @@ -1613,7 +1873,7 @@ type=base.unicode_decoder, default=u"", help=_( - u"argument of the hook to remove, empty to remove all (default: remove all)" + u"argument of the hook to remove, empty to remove all (DEFAULT: remove all)" ), ) @@ -1697,6 +1957,7 @@ Node, Affiliations, Search, + Transform, Hook, Uri, )
--- a/sat_frontends/jp/constants.py Tue Jan 15 08:51:54 2019 +0100 +++ b/sat_frontends/jp/constants.py Tue Jan 15 08:51:56 2019 +0100 @@ -80,10 +80,12 @@ EXIT_BRIDGE_ERROR = 3 # can't connect to bridge EXIT_BRIDGE_ERRBACK = 4 # something went wrong when calling a bridge method EXIT_NOT_FOUND = 16 # an item required by a command was not found - EXIT_DATA_ERROR = 17 # data needed for a command is invalid + EXIT_DATA_ERROR = 17 # data needed for a command is invalid + EXIT_MISSING_FEATURE = 18 # a needed plugin or feature is not available EXIT_USER_CANCELLED = 20 # user cancelled action EXIT_FILE_NOT_EXE = ( 126 ) # a file to be executed was found, but it was not an executable utility (cf. man 1 exit) EXIT_CMD_NOT_FOUND = 127 # a utility to be executed was not found (cf. man 1 exit) + EXIT_CMD_ERROR = 127 # a utility to be executed returned an error exit code EXIT_SIGNAL_INT = 128 # a command was interrupted by a signal (cf. man 1 exit)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_frontends/jp/xml_tools.py Tue Jan 15 08:51:56 2019 +0100 @@ -0,0 +1,62 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# jp: a SàT command line tool +# Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from sat.core.i18n import _ +from sat_frontends.jp.constants import Const as C + +def etreeParse(cmd, raw_xml): + """Import lxml and parse raw XML + + @param cmd(CommandBase): current command instance + @param raw_xml(file, str): an XML bytestring, string or file-like object + @return (tuple(etree.Element, module): parsed element, etree module + """ + try: + from lxml import etree + except ImportError: + cmd.disp( + u'lxml module must be installed, please install it with "pip install lxml"', + error=True, + ) + cmd.host.quit(C.EXIT_ERROR) + try: + if isinstance(raw_xml, basestring): + parser = etree.XMLParser(remove_blank_text=True) + element = etree.fromstring(raw_xml, parser) + else: + element = etree.parse(raw_xml).getroot() + except Exception as e: + cmd.parser.error( + _(u"Can't parse the payload XML in input: {msg}").format(msg=e) + ) + return element, etree + +def getPayload(cmd, element): + """Retrieve payload element and exit with and error if not found + + @param element(etree.Element): root element + @return element(etree.Element): payload element + """ + if element.tag in ("item", "{http://jabber.org/protocol/pubsub}item"): + if len(element) > 1: + cmd.disp(_(u"<item> can only have one child element (the payload)"), + error=True) + cmd.host.quit(C.EXIT_DATA_ERROR) + element = element[0] + return element