diff sat_frontends/jp/cmd_pubsub.py @ 2777:ff1b40823b07

jp (pubsub): new "transform" command: This command allows to pass all requested items through an external command to filter them (i.e. modify their content). - created new jp.xml_tools module with some common functions (like lxml parsing) - new EXIT code EXIT_CMD_ERROR (used when a third party utility returns an error)
author Goffi <goffi@goffi.org>
date Tue, 15 Jan 2019 08:51:56 +0100
parents b7974814dd5b
children f61a50790fae
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py	Tue Jan 15 08:51:54 2019 +0100
+++ b/sat_frontends/jp/cmd_pubsub.py	Tue Jan 15 08:51:56 2019 +0100
@@ -24,6 +24,7 @@
 from sat_frontends.jp.constants import Const as C
 from sat_frontends.jp import common
 from sat_frontends.jp import arg_tools
+from sat_frontends.jp import xml_tools
 from functools import partial
 from sat.tools.common import uri
 from sat.tools.common.ansi import ANSI as A
@@ -683,26 +684,8 @@
         self.host.quit(C.EXIT_OK)
 
     def start(self):
-        try:
-            from lxml import etree
-        except ImportError:
-            self.disp(
-                u'lxml module must be installed to use edit, please install it with "pip install lxml"',
-                error=True,
-            )
-            self.host.quit(1)
-        try:
-            element = etree.parse(sys.stdin).getroot()
-        except Exception as e:
-            self.parser.error(
-                _(u"Can't parse the payload XML in input: {msg}").format(msg=e)
-            )
-        if element.tag in ("item", "{http://jabber.org/protocol/pubsub}item"):
-            if len(element) > 1:
-                self.parser.error(
-                    _(u"<item> can only have one child element (the payload)")
-                )
-            element = element[0]
+        element, etree = xml_tools.etreeParse(self, sys.stdin)
+        element = xml_tools.getPayload(self, element)
         payload = etree.tostring(element, encoding="unicode")
 
         self.host.bridge.psItemSend(
@@ -1070,7 +1053,7 @@
             type=int,
             default=0,
             help=_(
-                u"maximum depth of recursion (will search linked nodes if > 0, default: 0)"
+                u"maximum depth of recursion (will search linked nodes if > 0, DEFAULT: 0)"
             ),
         )
         self.parser.add_argument(
@@ -1079,7 +1062,7 @@
             type=int,
             default=30,
             help=_(u"maximum number of items to get per node ({} to get all items, "
-                   u"default: 30)".format( C.NO_LIMIT)),
+                   u"DEFAULT: 30)".format( C.NO_LIMIT)),
         )
         self.parser.add_argument(
             "-N",
@@ -1157,7 +1140,7 @@
             const=("ignore-case", True),
             nargs="?",
             metavar="BOOLEAN",
-            help=_(u"(don't) ignore case in following filters (default: case sensitive)"),
+            help=_(u"(don't) ignore case in following filters (DEFAULT: case sensitive)"),
         )
         flags.add_argument(
             "-I",
@@ -1168,7 +1151,7 @@
             const=("invert", True),
             nargs="?",
             metavar="BOOLEAN",
-            help=_(u"(don't) invert effect of following filters (default: don't invert)"),
+            help=_(u"(don't) invert effect of following filters (DEFAULT: don't invert)"),
         )
         flags.add_argument(
             "-A",
@@ -1179,7 +1162,7 @@
             const=("dotall", True),
             nargs="?",
             metavar="BOOLEAN",
-            help=_(u"(don't) use DOTALL option for regex (default: don't use)"),
+            help=_(u"(don't) use DOTALL option for regex (DEFAULT: don't use)"),
         )
         flags.add_argument(
             "-k",
@@ -1199,7 +1182,7 @@
             default="print",
             nargs="?",
             choices=("print", "exec", "external"),
-            help=_(u"action to do on found items (default: print)"),
+            help=_(u"action to do on found items (DEFAULT: print)"),
         )
         self.parser.add_argument("command", nargs=argparse.REMAINDER)
 
@@ -1474,6 +1457,283 @@
         self.getItems(0, self.args.service, self.args.node, self.args.items)
 
 
+class Transform(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "transform",
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.MULTI_ITEMS},
+            help=_(u"modify items of a node using an external command/script"),
+        )
+        self.need_loop = True
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--apply",
+            action="store_true",
+            help=_(u"apply transformation (DEFAULT: do a dry run)"),
+        )
+        self.parser.add_argument(
+            "--admin",
+            action="store_true",
+            help=_(u"do a pubsub admin request, needed to change publisher"),
+        )
+        self.parser.add_argument(
+            "-I",
+            "--ignore_errors",
+            action="store_true",
+            help=_(
+                u"if command return a non zero exit code, ignore the item and continue"),
+        )
+        self.parser.add_argument(
+            "-A",
+            "--all",
+            action="store_true",
+            help=_(u"get all items by looping over all pages using RSM")
+        )
+        self.parser.add_argument(
+            "command_path",
+            help=_(u"path to the command to use. Will be called repetitivly with an "
+                   u"item as input. Output (full item XML) will be used as new one. "
+                   u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'),
+        )
+
+    def psAdminItemsSendCb(self, item_ids, metadata):
+        self.disp(_(u'items published with ids {item_ids}').format(
+            item_ids=u', '.join(item_ids)))
+        if self.args.all:
+            return self.handleNextPage(metadata)
+        else:
+            self.host.quit()
+
+    def psItemsSendCb(self, item_id, metadata, show_mess=True):
+        if show_mess:
+            self.disp(u'item published with id {item_id}'.format(item_id=item_id))
+        if self.items_sent == self.items_to_send:
+            if self.args.all:
+                return self.handleNextPage(metadata)
+            self.disp(u'all items published')
+            self.host.quit()
+
+    def psRetractItemCb(self, item_id, metadata):
+        self.psItemsSendCb(item_id, metadata, show_mess=False)
+
+    def handleNextPage(self, metadata):
+        """Retrieve new page through RSM or quit if we're in the last page
+
+        use to handle --all option
+        @param metadata(dict): metadata as returned by psItemsGet
+        """
+        try:
+            last = metadata[u'rsm_last']
+            index = int(metadata[u'rsm_index'])
+            count = int(metadata[u'rsm_count'])
+        except KeyError:
+            self.disp(_(u"Can't retrieve all items, RSM metadata not available"),
+                      error=True)
+            self.host.quit(C.EXIT_MISSING_FEATURE)
+        except ValueError as e:
+            self.disp(_(u"Can't retrieve all items, bad RSM metadata: {msg}")
+                      .format(msg=e), error=True)
+            self.host.quit(C.EXIT_ERROR)
+
+        if index + self.args.rsm_max >= count:
+            self.disp(_(u'All items transformed'))
+            self.host.quit(0)
+
+        self.disp(_(u'Retrieving next page ({page_idx}/{page_total})').format(
+            page_idx = int(index/self.args.rsm_max) + 1,
+            page_total = int(count/self.args.rsm_max),
+            )
+        )
+
+        extra = self.getPubsubExtra()
+        extra[u'rsm_after'] = last
+        self.host.bridge.psItemsGet(
+            self.args.service,
+            self.args.node,
+            self.args.rsm_max,
+            self.args.items,
+            "",
+            extra,
+            self.profile,
+            callback=self.psItemsGetCb,
+            errback=partial(
+                self.errback,
+                msg=_(u"can't retrieve items: {}"),
+                exit_code=C.EXIT_BRIDGE_ERRBACK,
+            ),
+        )
+
+    def psItemsGetCb(self, ps_result):
+        items, metadata = ps_result
+        if self.args.admin:
+            new_items = []
+        else:
+            self.items_to_send = len(items)
+            self.items_sent = 0
+
+        for item in items:
+            if self.check_duplicates:
+                # this is used when we are not ordering by creation
+                # to avoid infinite loop
+                item_elt, __ = xml_tools.etreeParse(self, item)
+                item_id = item_elt.get('id')
+                if item_id in self.items_ids:
+                    self.disp(_(
+                        u"Duplicate found on item {item_id}, we have probably handled "
+                        u"all items.").format(item_id=item_id))
+                    self.host.quit()
+                self.items_ids.append(item_id)
+
+            # we launch the command to filter the item
+            try:
+                p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE,
+                                                             stdout=subprocess.PIPE)
+            except OSError as e:
+                exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR
+                e = str(e).decode('utf-8', errors="ignore")
+                self.disp(u"Can't execute the command: {msg}".format(msg=e), error=True)
+                self.host.quit(exit_code)
+            cmd_std_out, cmd_std_err = p.communicate(item.encode("utf-8"))
+            ret = p.wait()
+            if ret != 0:
+                self.disp(u"The command returned a non zero status while parsing the "
+                          u"following item:\n\n{item}".format(item=item), error=True)
+                if self.args.ignore_errors:
+                    if not self.args.admin:
+                        self.items_to_send -= 1
+                    continue
+                else:
+                    self.host.quit(C.EXIT_CMD_ERROR)
+            if cmd_std_err is not None:
+                cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore')
+                self.disp(cmd_std_err, error=True)
+            cmd_std_out = cmd_std_out.strip()
+            if cmd_std_out == "DELETE":
+                item_elt, __ = xml_tools.etreeParse(self, item)
+                item_id = item_elt.get('id')
+                self.disp(_(u"Deleting item {item_id}").format(item_id=item_id))
+                if self.args.apply:
+                    if not self.args.admin:
+                        # we need to increase the counter as if the item were re-published
+                        self.items_sent += 1
+                    self.host.bridge.psRetractItem(
+                        self.args.service,
+                        self.args.node,
+                        item_id,
+                        False,
+                        self.profile,
+                        callback=partial(self.psRetractItemCb, metadata=metadata),
+                        errback=partial(
+                            self.errback,
+                            msg=_(u"can't delete item: {}"),
+                            exit_code=C.EXIT_BRIDGE_ERRBACK,
+                        ),
+                    )
+                continue
+            elif cmd_std_out == "SKIP":
+                item_elt, __ = xml_tools.etreeParse(self, item)
+                item_id = item_elt.get('id')
+                self.disp(_(u"Skipping item {item_id}").format(item_id=item_id))
+                if self.args.apply:
+                    if not self.args.admin:
+                        # see above
+                        self.items_sent += 1
+                        self.psItemsSendCb(item_id, metadata, show_mess=False)
+                continue
+            element, etree = xml_tools.etreeParse(self, cmd_std_out)
+
+            # at this point command has been run and we have a etree.Element object
+            if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"):
+                self.disp(u"your script must return a whole item, this is not:\n{xml}"
+                    .format(xml=etree.tostring(element, encoding="unicode")), error=True)
+                self.host.quit(C.EXIT_DATA_ERROR)
+
+            if not self.args.apply:
+                # we have a dry run, we just display filtered items
+                serialised = etree.tostring(element, encoding=u'unicode',
+                                            pretty_print=True)
+                self.disp(serialised)
+            else:
+                # we will apply the change, either in admin request or as a simple
+                # pubsub one
+                if self.args.admin:
+                    new_items.append(etree.tostring(element, encoding="unicode"))
+                else:
+                    # there is currently no method to send several items at once
+                    # so we publish them one by one
+                    payload = etree.tostring(xml_tools.getPayload(self, element),
+                                             encoding="unicode")
+                    item_id = element.get(u'id', '')
+                    self.host.bridge.psItemSend(
+                        self.args.service,
+                        self.args.node,
+                        payload,
+                        item_id,
+                        {},
+                        self.profile,
+                        callback=partial(self.psItemsSendCb, metadata=metadata),
+                        errback=partial(
+                            self.errback,
+                            msg=_(u"can't send item: {}"),
+                            exit_code=C.EXIT_BRIDGE_ERRBACK,
+                        ),
+                    )
+                    self.items_sent += 1
+
+        if not self.args.apply:
+            # on dry run we have nothing to wait for, we can quit
+            if self.args.all:
+                return self.handleNextPage(metadata)
+            self.host.quit()
+        elif self.args.admin:
+            self.host.bridge.psAdminItemsSend(
+                self.args.service,
+                self.args.node,
+                new_items,
+                u"",
+                self.profile,
+                callback=partial(self.psAdminItemsSendCb, metadata=metadata),
+                errback=partial(
+                    self.errback,
+                    msg=_(u"can't send item: {}"),
+                    exit_code=C.EXIT_BRIDGE_ERRBACK,
+                ),
+            )
+
+    def start(self):
+        if self.args.all and self.args.order_by != C.ORDER_BY_CREATION:
+            self.check_duplicates = True
+            self.items_ids = []
+            self.disp(A.color(
+                A.FG_RED, A.BOLD,
+                u'/!\\ "--all" should be used with "--order-by creation" /!\\\n',
+                A.RESET,
+                u"We'll update items, so order may change during transformation,\n"
+                u"we'll try to mitigate that by stopping on first duplicate,\n"
+                u"but this method is not safe, and some items may be missed.\n---\n"))
+        else:
+            self.check_duplicates = False
+        self.host.bridge.psItemsGet(
+            self.args.service,
+            self.args.node,
+            self.args.max,
+            self.args.items,
+            "",
+            self.getPubsubExtra(),
+            self.profile,
+            callback=self.psItemsGetCb,
+            errback=partial(
+                self.errback,
+                msg=_(u"can't retrieve items: {}"),
+                exit_code=C.EXIT_BRIDGE_ERRBACK,
+            ),
+        )
+
+
 class Uri(base.CommandBase):
     def __init__(self, host):
         base.CommandBase.__init__(
@@ -1604,7 +1864,7 @@
             "--type",
             default=u"",
             choices=("", "python", "python_file", "python_code"),
-            help=_(u"hook type to remove, empty to remove all (default: remove all)"),
+            help=_(u"hook type to remove, empty to remove all (DEFAULT: remove all)"),
         )
         self.parser.add_argument(
             "-a",
@@ -1613,7 +1873,7 @@
             type=base.unicode_decoder,
             default=u"",
             help=_(
-                u"argument of the hook to remove, empty to remove all (default: remove all)"
+                u"argument of the hook to remove, empty to remove all (DEFAULT: remove all)"
             ),
         )
 
@@ -1697,6 +1957,7 @@
         Node,
         Affiliations,
         Search,
+        Transform,
         Hook,
         Uri,
     )