Mercurial > libervia-backend
comparison sat_frontends/jp/cmd_pubsub.py @ 2777:ff1b40823b07
jp (pubsub): new "transform" command:
This command allows passing all requested items through an external command to filter them (i.e. modify their content).
- created new jp.xml_tools module with some common functions (like lxml parsing)
- new EXIT code EXIT_CMD_ERROR (used when a third party utility returns an error)
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 15 Jan 2019 08:51:56 +0100 |
parents | b7974814dd5b |
children | f61a50790fae |
comparison
equal
deleted
inserted
replaced
2776:838f53730ce4 | 2777:ff1b40823b07 |
---|---|
22 from sat.core.i18n import _ | 22 from sat.core.i18n import _ |
23 from sat.core import exceptions | 23 from sat.core import exceptions |
24 from sat_frontends.jp.constants import Const as C | 24 from sat_frontends.jp.constants import Const as C |
25 from sat_frontends.jp import common | 25 from sat_frontends.jp import common |
26 from sat_frontends.jp import arg_tools | 26 from sat_frontends.jp import arg_tools |
27 from sat_frontends.jp import xml_tools | |
27 from functools import partial | 28 from functools import partial |
28 from sat.tools.common import uri | 29 from sat.tools.common import uri |
29 from sat.tools.common.ansi import ANSI as A | 30 from sat.tools.common.ansi import ANSI as A |
30 from sat_frontends.tools import jid, strings | 31 from sat_frontends.tools import jid, strings |
31 import argparse | 32 import argparse |
681 else: | 682 else: |
682 self.disp(u"Item published") | 683 self.disp(u"Item published") |
683 self.host.quit(C.EXIT_OK) | 684 self.host.quit(C.EXIT_OK) |
684 | 685 |
685 def start(self): | 686 def start(self): |
686 try: | 687 element, etree = xml_tools.etreeParse(self, sys.stdin) |
687 from lxml import etree | 688 element = xml_tools.getPayload(self, element) |
688 except ImportError: | |
689 self.disp( | |
690 u'lxml module must be installed to use edit, please install it with "pip install lxml"', | |
691 error=True, | |
692 ) | |
693 self.host.quit(1) | |
694 try: | |
695 element = etree.parse(sys.stdin).getroot() | |
696 except Exception as e: | |
697 self.parser.error( | |
698 _(u"Can't parse the payload XML in input: {msg}").format(msg=e) | |
699 ) | |
700 if element.tag in ("item", "{http://jabber.org/protocol/pubsub}item"): | |
701 if len(element) > 1: | |
702 self.parser.error( | |
703 _(u"<item> can only have one child element (the payload)") | |
704 ) | |
705 element = element[0] | |
706 payload = etree.tostring(element, encoding="unicode") | 689 payload = etree.tostring(element, encoding="unicode") |
707 | 690 |
708 self.host.bridge.psItemSend( | 691 self.host.bridge.psItemSend( |
709 self.args.service, | 692 self.args.service, |
710 self.args.node, | 693 self.args.node, |
1068 "-D", | 1051 "-D", |
1069 "--max-depth", | 1052 "--max-depth", |
1070 type=int, | 1053 type=int, |
1071 default=0, | 1054 default=0, |
1072 help=_( | 1055 help=_( |
1073 u"maximum depth of recursion (will search linked nodes if > 0, default: 0)" | 1056 u"maximum depth of recursion (will search linked nodes if > 0, DEFAULT: 0)" |
1074 ), | 1057 ), |
1075 ) | 1058 ) |
1076 self.parser.add_argument( | 1059 self.parser.add_argument( |
1077 "-M", | 1060 "-M", |
1078 "--node-max", | 1061 "--node-max", |
1079 type=int, | 1062 type=int, |
1080 default=30, | 1063 default=30, |
1081 help=_(u"maximum number of items to get per node ({} to get all items, " | 1064 help=_(u"maximum number of items to get per node ({} to get all items, " |
1082 u"default: 30)".format( C.NO_LIMIT)), | 1065 u"DEFAULT: 30)".format( C.NO_LIMIT)), |
1083 ) | 1066 ) |
1084 self.parser.add_argument( | 1067 self.parser.add_argument( |
1085 "-N", | 1068 "-N", |
1086 "--namespace", | 1069 "--namespace", |
1087 action="append", | 1070 action="append", |
1155 dest="filters", | 1138 dest="filters", |
1156 type=flag_case, | 1139 type=flag_case, |
1157 const=("ignore-case", True), | 1140 const=("ignore-case", True), |
1158 nargs="?", | 1141 nargs="?", |
1159 metavar="BOOLEAN", | 1142 metavar="BOOLEAN", |
1160 help=_(u"(don't) ignore case in following filters (default: case sensitive)"), | 1143 help=_(u"(don't) ignore case in following filters (DEFAULT: case sensitive)"), |
1161 ) | 1144 ) |
1162 flags.add_argument( | 1145 flags.add_argument( |
1163 "-I", | 1146 "-I", |
1164 "--invert", | 1147 "--invert", |
1165 action="append", | 1148 action="append", |
1166 dest="filters", | 1149 dest="filters", |
1167 type=flag_invert, | 1150 type=flag_invert, |
1168 const=("invert", True), | 1151 const=("invert", True), |
1169 nargs="?", | 1152 nargs="?", |
1170 metavar="BOOLEAN", | 1153 metavar="BOOLEAN", |
1171 help=_(u"(don't) invert effect of following filters (default: don't invert)"), | 1154 help=_(u"(don't) invert effect of following filters (DEFAULT: don't invert)"), |
1172 ) | 1155 ) |
1173 flags.add_argument( | 1156 flags.add_argument( |
1174 "-A", | 1157 "-A", |
1175 "--dot-all", | 1158 "--dot-all", |
1176 action="append", | 1159 action="append", |
1177 dest="filters", | 1160 dest="filters", |
1178 type=flag_dotall, | 1161 type=flag_dotall, |
1179 const=("dotall", True), | 1162 const=("dotall", True), |
1180 nargs="?", | 1163 nargs="?", |
1181 metavar="BOOLEAN", | 1164 metavar="BOOLEAN", |
1182 help=_(u"(don't) use DOTALL option for regex (default: don't use)"), | 1165 help=_(u"(don't) use DOTALL option for regex (DEFAULT: don't use)"), |
1183 ) | 1166 ) |
1184 flags.add_argument( | 1167 flags.add_argument( |
1185 "-k", | 1168 "-k", |
1186 "--only-matching", | 1169 "--only-matching", |
1187 action="append", | 1170 action="append", |
1197 self.parser.add_argument( | 1180 self.parser.add_argument( |
1198 "action", | 1181 "action", |
1199 default="print", | 1182 default="print", |
1200 nargs="?", | 1183 nargs="?", |
1201 choices=("print", "exec", "external"), | 1184 choices=("print", "exec", "external"), |
1202 help=_(u"action to do on found items (default: print)"), | 1185 help=_(u"action to do on found items (DEFAULT: print)"), |
1203 ) | 1186 ) |
1204 self.parser.add_argument("command", nargs=argparse.REMAINDER) | 1187 self.parser.add_argument("command", nargs=argparse.REMAINDER) |
1205 | 1188 |
1206 def psItemsGetEb(self, failure_, service, node): | 1189 def psItemsGetEb(self, failure_, service, node): |
1207 self.disp( | 1190 self.disp( |
1472 self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")] | 1455 self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")] |
1473 ) | 1456 ) |
1474 self.getItems(0, self.args.service, self.args.node, self.args.items) | 1457 self.getItems(0, self.args.service, self.args.node, self.args.items) |
1475 | 1458 |
1476 | 1459 |
1460 class Transform(base.CommandBase): | |
1461 def __init__(self, host): | |
1462 base.CommandBase.__init__( | |
1463 self, | |
1464 host, | |
1465 "transform", | |
1466 use_pubsub=True, | |
1467 pubsub_flags={C.NODE, C.MULTI_ITEMS}, | |
1468 help=_(u"modify items of a node using an external command/script"), | |
1469 ) | |
1470 self.need_loop = True | |
1471 | |
1472 def add_parser_options(self): | |
1473 self.parser.add_argument( | |
1474 "--apply", | |
1475 action="store_true", | |
1476 help=_(u"apply transformation (DEFAULT: do a dry run)"), | |
1477 ) | |
1478 self.parser.add_argument( | |
1479 "--admin", | |
1480 action="store_true", | |
1481 help=_(u"do a pubsub admin request, needed to change publisher"), | |
1482 ) | |
1483 self.parser.add_argument( | |
1484 "-I", | |
1485 "--ignore_errors", | |
1486 action="store_true", | |
1487 help=_( | |
1488 u"if command return a non zero exit code, ignore the item and continue"), | |
1489 ) | |
1490 self.parser.add_argument( | |
1491 "-A", | |
1492 "--all", | |
1493 action="store_true", | |
1494 help=_(u"get all items by looping over all pages using RSM") | |
1495 ) | |
1496 self.parser.add_argument( | |
1497 "command_path", | |
1498 help=_(u"path to the command to use. Will be called repetitivly with an " | |
1499 u"item as input. Output (full item XML) will be used as new one. " | |
1500 u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), | |
1501 ) | |
1502 | |
1503 def psAdminItemsSendCb(self, item_ids, metadata): | |
1504 self.disp(_(u'items published with ids {item_ids}').format( | |
1505 item_ids=u', '.join(item_ids))) | |
1506 if self.args.all: | |
1507 return self.handleNextPage(metadata) | |
1508 else: | |
1509 self.host.quit() | |
1510 | |
1511 def psItemsSendCb(self, item_id, metadata, show_mess=True): | |
1512 if show_mess: | |
1513 self.disp(u'item published with id {item_id}'.format(item_id=item_id)) | |
1514 if self.items_sent == self.items_to_send: | |
1515 if self.args.all: | |
1516 return self.handleNextPage(metadata) | |
1517 self.disp(u'all items published') | |
1518 self.host.quit() | |
1519 | |
1520 def psRetractItemCb(self, item_id, metadata): | |
1521 self.psItemsSendCb(item_id, metadata, show_mess=False) | |
1522 | |
1523 def handleNextPage(self, metadata): | |
1524 """Retrieve new page through RSM or quit if we're in the last page | |
1525 | |
1526 use to handle --all option | |
1527 @param metadata(dict): metadata as returned by psItemsGet | |
1528 """ | |
1529 try: | |
1530 last = metadata[u'rsm_last'] | |
1531 index = int(metadata[u'rsm_index']) | |
1532 count = int(metadata[u'rsm_count']) | |
1533 except KeyError: | |
1534 self.disp(_(u"Can't retrieve all items, RSM metadata not available"), | |
1535 error=True) | |
1536 self.host.quit(C.EXIT_MISSING_FEATURE) | |
1537 except ValueError as e: | |
1538 self.disp(_(u"Can't retrieve all items, bad RSM metadata: {msg}") | |
1539 .format(msg=e), error=True) | |
1540 self.host.quit(C.EXIT_ERROR) | |
1541 | |
1542 if index + self.args.rsm_max >= count: | |
1543 self.disp(_(u'All items transformed')) | |
1544 self.host.quit(0) | |
1545 | |
1546 self.disp(_(u'Retrieving next page ({page_idx}/{page_total})').format( | |
1547 page_idx = int(index/self.args.rsm_max) + 1, | |
1548 page_total = int(count/self.args.rsm_max), | |
1549 ) | |
1550 ) | |
1551 | |
1552 extra = self.getPubsubExtra() | |
1553 extra[u'rsm_after'] = last | |
1554 self.host.bridge.psItemsGet( | |
1555 self.args.service, | |
1556 self.args.node, | |
1557 self.args.rsm_max, | |
1558 self.args.items, | |
1559 "", | |
1560 extra, | |
1561 self.profile, | |
1562 callback=self.psItemsGetCb, | |
1563 errback=partial( | |
1564 self.errback, | |
1565 msg=_(u"can't retrieve items: {}"), | |
1566 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1567 ), | |
1568 ) | |
1569 | |
1570 def psItemsGetCb(self, ps_result): | |
1571 items, metadata = ps_result | |
1572 if self.args.admin: | |
1573 new_items = [] | |
1574 else: | |
1575 self.items_to_send = len(items) | |
1576 self.items_sent = 0 | |
1577 | |
1578 for item in items: | |
1579 if self.check_duplicates: | |
1580 # this is used when we are not ordering by creation | |
1581 # to avoid infinite loop | |
1582 item_elt, __ = xml_tools.etreeParse(self, item) | |
1583 item_id = item_elt.get('id') | |
1584 if item_id in self.items_ids: | |
1585 self.disp(_( | |
1586 u"Duplicate found on item {item_id}, we have probably handled " | |
1587 u"all items.").format(item_id=item_id)) | |
1588 self.host.quit() | |
1589 self.items_ids.append(item_id) | |
1590 | |
1591 # we launch the command to filter the item | |
1592 try: | |
1593 p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE, | |
1594 stdout=subprocess.PIPE) | |
1595 except OSError as e: | |
1596 exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR | |
1597 e = str(e).decode('utf-8', errors="ignore") | |
1598 self.disp(u"Can't execute the command: {msg}".format(msg=e), error=True) | |
1599 self.host.quit(exit_code) | |
1600 cmd_std_out, cmd_std_err = p.communicate(item.encode("utf-8")) | |
1601 ret = p.wait() | |
1602 if ret != 0: | |
1603 self.disp(u"The command returned a non zero status while parsing the " | |
1604 u"following item:\n\n{item}".format(item=item), error=True) | |
1605 if self.args.ignore_errors: | |
1606 if not self.args.admin: | |
1607 self.items_to_send -= 1 | |
1608 continue | |
1609 else: | |
1610 self.host.quit(C.EXIT_CMD_ERROR) | |
1611 if cmd_std_err is not None: | |
1612 cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') | |
1613 self.disp(cmd_std_err, error=True) | |
1614 cmd_std_out = cmd_std_out.strip() | |
1615 if cmd_std_out == "DELETE": | |
1616 item_elt, __ = xml_tools.etreeParse(self, item) | |
1617 item_id = item_elt.get('id') | |
1618 self.disp(_(u"Deleting item {item_id}").format(item_id=item_id)) | |
1619 if self.args.apply: | |
1620 if not self.args.admin: | |
1621 # we need to increase the counter as if the item were re-published | |
1622 self.items_sent += 1 | |
1623 self.host.bridge.psRetractItem( | |
1624 self.args.service, | |
1625 self.args.node, | |
1626 item_id, | |
1627 False, | |
1628 self.profile, | |
1629 callback=partial(self.psRetractItemCb, metadata=metadata), | |
1630 errback=partial( | |
1631 self.errback, | |
1632 msg=_(u"can't delete item: {}"), | |
1633 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1634 ), | |
1635 ) | |
1636 continue | |
1637 elif cmd_std_out == "SKIP": | |
1638 item_elt, __ = xml_tools.etreeParse(self, item) | |
1639 item_id = item_elt.get('id') | |
1640 self.disp(_(u"Skipping item {item_id}").format(item_id=item_id)) | |
1641 if self.args.apply: | |
1642 if not self.args.admin: | |
1643 # see above | |
1644 self.items_sent += 1 | |
1645 self.psItemsSendCb(item_id, metadata, show_mess=False) | |
1646 continue | |
1647 element, etree = xml_tools.etreeParse(self, cmd_std_out) | |
1648 | |
1649 # at this point command has been run and we have a etree.Element object | |
1650 if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"): | |
1651 self.disp(u"your script must return a whole item, this is not:\n{xml}" | |
1652 .format(xml=etree.tostring(element, encoding="unicode")), error=True) | |
1653 self.host.quit(C.EXIT_DATA_ERROR) | |
1654 | |
1655 if not self.args.apply: | |
1656 # we have a dry run, we just display filtered items | |
1657 serialised = etree.tostring(element, encoding=u'unicode', | |
1658 pretty_print=True) | |
1659 self.disp(serialised) | |
1660 else: | |
1661 # we will apply the change, either in admin request or as a simple | |
1662 # pubsub one | |
1663 if self.args.admin: | |
1664 new_items.append(etree.tostring(element, encoding="unicode")) | |
1665 else: | |
1666 # there is currently no method to send several items at once | |
1667 # so we publish them one by one | |
1668 payload = etree.tostring(xml_tools.getPayload(self, element), | |
1669 encoding="unicode") | |
1670 item_id = element.get(u'id', '') | |
1671 self.host.bridge.psItemSend( | |
1672 self.args.service, | |
1673 self.args.node, | |
1674 payload, | |
1675 item_id, | |
1676 {}, | |
1677 self.profile, | |
1678 callback=partial(self.psItemsSendCb, metadata=metadata), | |
1679 errback=partial( | |
1680 self.errback, | |
1681 msg=_(u"can't send item: {}"), | |
1682 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1683 ), | |
1684 ) | |
1685 self.items_sent += 1 | |
1686 | |
1687 if not self.args.apply: | |
1688 # on dry run we have nothing to wait for, we can quit | |
1689 if self.args.all: | |
1690 return self.handleNextPage(metadata) | |
1691 self.host.quit() | |
1692 elif self.args.admin: | |
1693 self.host.bridge.psAdminItemsSend( | |
1694 self.args.service, | |
1695 self.args.node, | |
1696 new_items, | |
1697 u"", | |
1698 self.profile, | |
1699 callback=partial(self.psAdminItemsSendCb, metadata=metadata), | |
1700 errback=partial( | |
1701 self.errback, | |
1702 msg=_(u"can't send item: {}"), | |
1703 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1704 ), | |
1705 ) | |
1706 | |
1707 def start(self): | |
1708 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: | |
1709 self.check_duplicates = True | |
1710 self.items_ids = [] | |
1711 self.disp(A.color( | |
1712 A.FG_RED, A.BOLD, | |
1713 u'/!\\ "--all" should be used with "--order-by creation" /!\\\n', | |
1714 A.RESET, | |
1715 u"We'll update items, so order may change during transformation,\n" | |
1716 u"we'll try to mitigate that by stopping on first duplicate,\n" | |
1717 u"but this method is not safe, and some items may be missed.\n---\n")) | |
1718 else: | |
1719 self.check_duplicates = False | |
1720 self.host.bridge.psItemsGet( | |
1721 self.args.service, | |
1722 self.args.node, | |
1723 self.args.max, | |
1724 self.args.items, | |
1725 "", | |
1726 self.getPubsubExtra(), | |
1727 self.profile, | |
1728 callback=self.psItemsGetCb, | |
1729 errback=partial( | |
1730 self.errback, | |
1731 msg=_(u"can't retrieve items: {}"), | |
1732 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1733 ), | |
1734 ) | |
1735 | |
1736 | |
1477 class Uri(base.CommandBase): | 1737 class Uri(base.CommandBase): |
1478 def __init__(self, host): | 1738 def __init__(self, host): |
1479 base.CommandBase.__init__( | 1739 base.CommandBase.__init__( |
1480 self, | 1740 self, |
1481 host, | 1741 host, |
1602 self.parser.add_argument( | 1862 self.parser.add_argument( |
1603 "-t", | 1863 "-t", |
1604 "--type", | 1864 "--type", |
1605 default=u"", | 1865 default=u"", |
1606 choices=("", "python", "python_file", "python_code"), | 1866 choices=("", "python", "python_file", "python_code"), |
1607 help=_(u"hook type to remove, empty to remove all (default: remove all)"), | 1867 help=_(u"hook type to remove, empty to remove all (DEFAULT: remove all)"), |
1608 ) | 1868 ) |
1609 self.parser.add_argument( | 1869 self.parser.add_argument( |
1610 "-a", | 1870 "-a", |
1611 "--arg", | 1871 "--arg", |
1612 dest="hook_arg", | 1872 dest="hook_arg", |
1613 type=base.unicode_decoder, | 1873 type=base.unicode_decoder, |
1614 default=u"", | 1874 default=u"", |
1615 help=_( | 1875 help=_( |
1616 u"argument of the hook to remove, empty to remove all (default: remove all)" | 1876 u"argument of the hook to remove, empty to remove all (DEFAULT: remove all)" |
1617 ), | 1877 ), |
1618 ) | 1878 ) |
1619 | 1879 |
1620 def psHookRemoveCb(self, nb_deleted): | 1880 def psHookRemoveCb(self, nb_deleted): |
1621 self.disp( | 1881 self.disp( |
1695 Unsubscribe, | 1955 Unsubscribe, |
1696 Subscriptions, | 1956 Subscriptions, |
1697 Node, | 1957 Node, |
1698 Affiliations, | 1958 Affiliations, |
1699 Search, | 1959 Search, |
1960 Transform, | |
1700 Hook, | 1961 Hook, |
1701 Uri, | 1962 Uri, |
1702 ) | 1963 ) |
1703 | 1964 |
1704 def __init__(self, host): | 1965 def __init__(self, host): |