comparison sat_frontends/jp/cmd_pubsub.py @ 2777:ff1b40823b07

jp (pubsub): new "transform" command: this command passes every requested item through an external command which filters it (i.e. can modify its content). - created a new jp.xml_tools module with some common functions (like lxml parsing) - new exit code EXIT_CMD_ERROR (used when a third-party utility returns an error)
author Goffi <goffi@goffi.org>
date Tue, 15 Jan 2019 08:51:56 +0100
parents b7974814dd5b
children f61a50790fae
comparison
equal deleted inserted replaced
2776:838f53730ce4 2777:ff1b40823b07
22 from sat.core.i18n import _ 22 from sat.core.i18n import _
23 from sat.core import exceptions 23 from sat.core import exceptions
24 from sat_frontends.jp.constants import Const as C 24 from sat_frontends.jp.constants import Const as C
25 from sat_frontends.jp import common 25 from sat_frontends.jp import common
26 from sat_frontends.jp import arg_tools 26 from sat_frontends.jp import arg_tools
27 from sat_frontends.jp import xml_tools
27 from functools import partial 28 from functools import partial
28 from sat.tools.common import uri 29 from sat.tools.common import uri
29 from sat.tools.common.ansi import ANSI as A 30 from sat.tools.common.ansi import ANSI as A
30 from sat_frontends.tools import jid, strings 31 from sat_frontends.tools import jid, strings
31 import argparse 32 import argparse
681 else: 682 else:
682 self.disp(u"Item published") 683 self.disp(u"Item published")
683 self.host.quit(C.EXIT_OK) 684 self.host.quit(C.EXIT_OK)
684 685
685 def start(self): 686 def start(self):
686 try: 687 element, etree = xml_tools.etreeParse(self, sys.stdin)
687 from lxml import etree 688 element = xml_tools.getPayload(self, element)
688 except ImportError:
689 self.disp(
690 u'lxml module must be installed to use edit, please install it with "pip install lxml"',
691 error=True,
692 )
693 self.host.quit(1)
694 try:
695 element = etree.parse(sys.stdin).getroot()
696 except Exception as e:
697 self.parser.error(
698 _(u"Can't parse the payload XML in input: {msg}").format(msg=e)
699 )
700 if element.tag in ("item", "{http://jabber.org/protocol/pubsub}item"):
701 if len(element) > 1:
702 self.parser.error(
703 _(u"<item> can only have one child element (the payload)")
704 )
705 element = element[0]
706 payload = etree.tostring(element, encoding="unicode") 689 payload = etree.tostring(element, encoding="unicode")
707 690
708 self.host.bridge.psItemSend( 691 self.host.bridge.psItemSend(
709 self.args.service, 692 self.args.service,
710 self.args.node, 693 self.args.node,
1068 "-D", 1051 "-D",
1069 "--max-depth", 1052 "--max-depth",
1070 type=int, 1053 type=int,
1071 default=0, 1054 default=0,
1072 help=_( 1055 help=_(
1073 u"maximum depth of recursion (will search linked nodes if > 0, default: 0)" 1056 u"maximum depth of recursion (will search linked nodes if > 0, DEFAULT: 0)"
1074 ), 1057 ),
1075 ) 1058 )
1076 self.parser.add_argument( 1059 self.parser.add_argument(
1077 "-M", 1060 "-M",
1078 "--node-max", 1061 "--node-max",
1079 type=int, 1062 type=int,
1080 default=30, 1063 default=30,
1081 help=_(u"maximum number of items to get per node ({} to get all items, " 1064 help=_(u"maximum number of items to get per node ({} to get all items, "
1082 u"default: 30)".format( C.NO_LIMIT)), 1065 u"DEFAULT: 30)".format( C.NO_LIMIT)),
1083 ) 1066 )
1084 self.parser.add_argument( 1067 self.parser.add_argument(
1085 "-N", 1068 "-N",
1086 "--namespace", 1069 "--namespace",
1087 action="append", 1070 action="append",
1155 dest="filters", 1138 dest="filters",
1156 type=flag_case, 1139 type=flag_case,
1157 const=("ignore-case", True), 1140 const=("ignore-case", True),
1158 nargs="?", 1141 nargs="?",
1159 metavar="BOOLEAN", 1142 metavar="BOOLEAN",
1160 help=_(u"(don't) ignore case in following filters (default: case sensitive)"), 1143 help=_(u"(don't) ignore case in following filters (DEFAULT: case sensitive)"),
1161 ) 1144 )
1162 flags.add_argument( 1145 flags.add_argument(
1163 "-I", 1146 "-I",
1164 "--invert", 1147 "--invert",
1165 action="append", 1148 action="append",
1166 dest="filters", 1149 dest="filters",
1167 type=flag_invert, 1150 type=flag_invert,
1168 const=("invert", True), 1151 const=("invert", True),
1169 nargs="?", 1152 nargs="?",
1170 metavar="BOOLEAN", 1153 metavar="BOOLEAN",
1171 help=_(u"(don't) invert effect of following filters (default: don't invert)"), 1154 help=_(u"(don't) invert effect of following filters (DEFAULT: don't invert)"),
1172 ) 1155 )
1173 flags.add_argument( 1156 flags.add_argument(
1174 "-A", 1157 "-A",
1175 "--dot-all", 1158 "--dot-all",
1176 action="append", 1159 action="append",
1177 dest="filters", 1160 dest="filters",
1178 type=flag_dotall, 1161 type=flag_dotall,
1179 const=("dotall", True), 1162 const=("dotall", True),
1180 nargs="?", 1163 nargs="?",
1181 metavar="BOOLEAN", 1164 metavar="BOOLEAN",
1182 help=_(u"(don't) use DOTALL option for regex (default: don't use)"), 1165 help=_(u"(don't) use DOTALL option for regex (DEFAULT: don't use)"),
1183 ) 1166 )
1184 flags.add_argument( 1167 flags.add_argument(
1185 "-k", 1168 "-k",
1186 "--only-matching", 1169 "--only-matching",
1187 action="append", 1170 action="append",
1197 self.parser.add_argument( 1180 self.parser.add_argument(
1198 "action", 1181 "action",
1199 default="print", 1182 default="print",
1200 nargs="?", 1183 nargs="?",
1201 choices=("print", "exec", "external"), 1184 choices=("print", "exec", "external"),
1202 help=_(u"action to do on found items (default: print)"), 1185 help=_(u"action to do on found items (DEFAULT: print)"),
1203 ) 1186 )
1204 self.parser.add_argument("command", nargs=argparse.REMAINDER) 1187 self.parser.add_argument("command", nargs=argparse.REMAINDER)
1205 1188
1206 def psItemsGetEb(self, failure_, service, node): 1189 def psItemsGetEb(self, failure_, service, node):
1207 self.disp( 1190 self.disp(
1472 self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")] 1455 self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")]
1473 ) 1456 )
1474 self.getItems(0, self.args.service, self.args.node, self.args.items) 1457 self.getItems(0, self.args.service, self.args.node, self.args.items)
1475 1458
1476 1459
class Transform(base.CommandBase):
    """Modify the items of a node by piping each of them through an external command.

    Every retrieved item (full <item/> XML) is written to the command's stdin, and
    the command's (stripped) stdout is interpreted as:
        - "DELETE": the item is retracted
        - "SKIP": the item is left untouched
        - anything else: the XML of a whole new item, which replaces the original

    Without --apply nothing is modified on the server: transformed items are only
    displayed (dry run).
    """

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "transform",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.MULTI_ITEMS},
            help=_(u"modify items of a node using an external command/script"),
        )
        self.need_loop = True

    def add_parser_options(self):
        self.parser.add_argument(
            "--apply",
            action="store_true",
            help=_(u"apply transformation (DEFAULT: do a dry run)"),
        )
        self.parser.add_argument(
            "--admin",
            action="store_true",
            help=_(u"do a pubsub admin request, needed to change publisher"),
        )
        self.parser.add_argument(
            "-I",
            # "--ignore-errors" follows the dash style of the other options,
            # "--ignore_errors" is kept for backward compatibility
            "--ignore-errors",
            "--ignore_errors",
            dest="ignore_errors",
            action="store_true",
            help=_(
                u"if command return a non zero exit code, ignore the item and continue"),
        )
        self.parser.add_argument(
            "-A",
            "--all",
            action="store_true",
            help=_(u"get all items by looping over all pages using RSM")
        )
        self.parser.add_argument(
            "command_path",
            help=_(u"path to the command to use. Will be called repeatedly with an "
                   u"item as input. Output (full item XML) will be used as new one. "
                   u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'),
        )

    def psAdminItemsSendCb(self, item_ids, metadata):
        """Called when the admin request publishing transformed items succeeded

        @param item_ids(list[unicode]): ids of the published items
        @param metadata(dict): metadata of the current page, as returned by psItemsGet
        """
        self.disp(_(u'items published with ids {item_ids}').format(
            item_ids=u', '.join(item_ids)))
        self._requestDone(metadata)

    def psItemsSendCb(self, item_id, metadata, show_mess=True):
        """Called when a single item has been published (or retracted)

        @param item_id(unicode): id of the item
        @param metadata(dict): metadata of the current page, as returned by psItemsGet
        @param show_mess(bool): if True, display a publication message
        """
        if show_mess:
            self.disp(u'item published with id {item_id}'.format(item_id=item_id))
        self._requestDone(metadata)

    def psRetractItemCb(self, item_id, metadata, *__):
        """Called when an item has been retracted

        @param item_id(unicode): id of the retracted item
        @param metadata(dict): metadata of the current page, as returned by psItemsGet
        extra positional arguments (bridge result, if any) are ignored
        """
        self.psItemsSendCb(item_id, metadata, show_mess=False)

    def _requestDone(self, metadata):
        """Account for one answered bridge request, and finish the page if it was the last"""
        self.items_sent += 1
        self._finishPageIfDone(metadata)

    def _finishPageIfDone(self, metadata):
        """Go to next page (--all) or quit once every request of the page is answered

        @param metadata(dict): metadata of the current page, as returned by psItemsGet
        """
        if self._dispatching or self.items_sent < self.items_to_send:
            # items are still being processed, or some requests are still pending
            return
        if self.args.all:
            return self.handleNextPage(metadata)
        self.disp(u'all items published')
        self.host.quit()

    def handleNextPage(self, metadata):
        """Retrieve new page through RSM or quit if we're in the last page

        use to handle --all option
        @param metadata(dict): metadata as returned by psItemsGet
        """
        try:
            last = metadata[u'rsm_last']
            index = int(metadata[u'rsm_index'])
            count = int(metadata[u'rsm_count'])
        except KeyError:
            self.disp(_(u"Can't retrieve all items, RSM metadata not available"),
                      error=True)
            self.host.quit(C.EXIT_MISSING_FEATURE)
            return
        except ValueError as e:
            self.disp(_(u"Can't retrieve all items, bad RSM metadata: {msg}")
                      .format(msg=e), error=True)
            self.host.quit(C.EXIT_ERROR)
            return

        page_size = self.args.rsm_max
        if index + page_size >= count:
            self.disp(_(u'All items transformed'))
            self.host.quit(0)
            return

        self.disp(_(u'Retrieving next page ({page_idx}/{page_total})').format(
            # rsm_index is the position of the first item of the current page,
            # the page we are about to retrieve is the following one (1-based)
            page_idx=index // page_size + 2,
            # last page may be incomplete, so we round up
            page_total=(count + page_size - 1) // page_size,
            )
        )

        extra = self.getPubsubExtra()
        extra[u'rsm_after'] = last
        self.host.bridge.psItemsGet(
            self.args.service,
            self.args.node,
            self.args.rsm_max,
            self.args.items,
            "",
            extra,
            self.profile,
            callback=self.psItemsGetCb,
            errback=partial(
                self.errback,
                msg=_(u"can't retrieve items: {}"),
                exit_code=C.EXIT_BRIDGE_ERRBACK,
            ),
        )

    def _getItemId(self, item):
        """Return the "id" attribute of an item XML (None if missing)

        @param item(unicode): XML of the item
        """
        item_elt, __ = xml_tools.etreeParse(self, item)
        return item_elt.get('id')

    def _runCommand(self, item):
        """Pipe an item through the external command

        @param item(unicode): XML of the item to transform
        @return (str, None): stripped stdout of the command, or None if the command
            failed and --ignore-errors is set
        """
        import errno
        try:
            p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE)
        except OSError as e:
            exit_code = (C.EXIT_CMD_NOT_FOUND if e.errno == errno.ENOENT
                         else C.EXIT_ERROR)
            e = str(e).decode('utf-8', errors="ignore")
            self.disp(u"Can't execute the command: {msg}".format(msg=e), error=True)
            self.host.quit(exit_code)
        # stderr is not captured: it is inherited, so the command's diagnostics
        # go straight to the user's terminal
        cmd_std_out, __ = p.communicate(item.encode("utf-8"))
        if p.returncode != 0:
            self.disp(u"The command returned a non zero status while parsing the "
                      u"following item:\n\n{item}".format(item=item), error=True)
            if self.args.ignore_errors:
                return None
            self.host.quit(C.EXIT_CMD_ERROR)
        return cmd_std_out.strip()

    def psItemsGetCb(self, ps_result):
        """Transform a page of items with the external command

        @param ps_result(tuple): (items, metadata) as returned by psItemsGet
        """
        items, metadata = ps_result
        # XML of transformed items, sent in a single request with --admin
        new_items = []
        # number of bridge requests issued for this page, and answered so far;
        # counted on completion so the page only ends when everything is done
        self.items_to_send = 0
        self.items_sent = 0
        # prevents the page from being considered finished while we are still
        # issuing requests
        self._dispatching = True

        for item in items:
            if self.check_duplicates:
                # this is used when we are not ordering by creation
                # to avoid infinite loop
                item_id = self._getItemId(item)
                if item_id in self.items_ids:
                    self.disp(_(
                        u"Duplicate found on item {item_id}, we have probably handled "
                        u"all items.").format(item_id=item_id))
                    self.host.quit()
                self.items_ids.add(item_id)

            # we launch the command to filter the item
            cmd_std_out = self._runCommand(item)
            if cmd_std_out is None:
                # command failed and errors are ignored: nothing is sent
                continue

            if cmd_std_out == "DELETE":
                item_id = self._getItemId(item)
                self.disp(_(u"Deleting item {item_id}").format(item_id=item_id))
                if self.args.apply:
                    self.items_to_send += 1
                    self.host.bridge.psRetractItem(
                        self.args.service,
                        self.args.node,
                        item_id,
                        False,
                        self.profile,
                        callback=partial(self.psRetractItemCb, item_id, metadata),
                        errback=partial(
                            self.errback,
                            msg=_(u"can't delete item: {}"),
                            exit_code=C.EXIT_BRIDGE_ERRBACK,
                        ),
                    )
                continue

            if cmd_std_out == "SKIP":
                item_id = self._getItemId(item)
                self.disp(_(u"Skipping item {item_id}").format(item_id=item_id))
                continue

            element, etree = xml_tools.etreeParse(self, cmd_std_out)

            # at this point command has been run and we have a etree.Element object
            if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"):
                self.disp(u"your script must return a whole item, this is not:\n{xml}"
                    .format(xml=etree.tostring(element, encoding="unicode")), error=True)
                self.host.quit(C.EXIT_DATA_ERROR)

            if not self.args.apply:
                # we have a dry run, we just display filtered items
                serialised = etree.tostring(element, encoding=u'unicode',
                                            pretty_print=True)
                self.disp(serialised)
            elif self.args.admin:
                # all items are sent at once after the loop
                new_items.append(etree.tostring(element, encoding="unicode"))
            else:
                # there is currently no method to send several items at once
                # so we publish them one by one
                payload = etree.tostring(xml_tools.getPayload(self, element),
                                         encoding="unicode")
                item_id = element.get(u'id', '')
                self.items_to_send += 1
                self.host.bridge.psItemSend(
                    self.args.service,
                    self.args.node,
                    payload,
                    item_id,
                    {},
                    self.profile,
                    callback=partial(self.psItemsSendCb, metadata=metadata),
                    errback=partial(
                        self.errback,
                        msg=_(u"can't send item: {}"),
                        exit_code=C.EXIT_BRIDGE_ERRBACK,
                    ),
                )

        if self.args.apply and self.args.admin and new_items:
            self.items_to_send += 1
            self.host.bridge.psAdminItemsSend(
                self.args.service,
                self.args.node,
                new_items,
                u"",
                self.profile,
                callback=partial(self.psAdminItemsSendCb, metadata=metadata),
                errback=partial(
                    self.errback,
                    msg=_(u"can't send item: {}"),
                    exit_code=C.EXIT_BRIDGE_ERRBACK,
                ),
            )

        self._dispatching = False

        if not self.args.apply:
            # on dry run we have nothing to wait for, we can go on
            if self.args.all:
                return self.handleNextPage(metadata)
            self.host.quit()
        else:
            # if no request has been issued (empty page, everything skipped or
            # ignored) this finishes the page now, otherwise last callback will
            self._finishPageIfDone(metadata)

    def start(self):
        if self.args.all and self.args.order_by != C.ORDER_BY_CREATION:
            self.check_duplicates = True
            # ids of already handled items
            self.items_ids = set()
            self.disp(A.color(
                A.FG_RED, A.BOLD,
                u'/!\\ "--all" should be used with "--order-by creation" /!\\\n',
                A.RESET,
                u"We'll update items, so order may change during transformation,\n"
                u"we'll try to mitigate that by stopping on first duplicate,\n"
                u"but this method is not safe, and some items may be missed.\n---\n"))
        else:
            self.check_duplicates = False
        self.host.bridge.psItemsGet(
            self.args.service,
            self.args.node,
            self.args.max,
            self.args.items,
            "",
            self.getPubsubExtra(),
            self.profile,
            callback=self.psItemsGetCb,
            errback=partial(
                self.errback,
                msg=_(u"can't retrieve items: {}"),
                exit_code=C.EXIT_BRIDGE_ERRBACK,
            ),
        )
1736
1477 class Uri(base.CommandBase): 1737 class Uri(base.CommandBase):
1478 def __init__(self, host): 1738 def __init__(self, host):
1479 base.CommandBase.__init__( 1739 base.CommandBase.__init__(
1480 self, 1740 self,
1481 host, 1741 host,
1602 self.parser.add_argument( 1862 self.parser.add_argument(
1603 "-t", 1863 "-t",
1604 "--type", 1864 "--type",
1605 default=u"", 1865 default=u"",
1606 choices=("", "python", "python_file", "python_code"), 1866 choices=("", "python", "python_file", "python_code"),
1607 help=_(u"hook type to remove, empty to remove all (default: remove all)"), 1867 help=_(u"hook type to remove, empty to remove all (DEFAULT: remove all)"),
1608 ) 1868 )
1609 self.parser.add_argument( 1869 self.parser.add_argument(
1610 "-a", 1870 "-a",
1611 "--arg", 1871 "--arg",
1612 dest="hook_arg", 1872 dest="hook_arg",
1613 type=base.unicode_decoder, 1873 type=base.unicode_decoder,
1614 default=u"", 1874 default=u"",
1615 help=_( 1875 help=_(
1616 u"argument of the hook to remove, empty to remove all (default: remove all)" 1876 u"argument of the hook to remove, empty to remove all (DEFAULT: remove all)"
1617 ), 1877 ),
1618 ) 1878 )
1619 1879
1620 def psHookRemoveCb(self, nb_deleted): 1880 def psHookRemoveCb(self, nb_deleted):
1621 self.disp( 1881 self.disp(
1695 Unsubscribe, 1955 Unsubscribe,
1696 Subscriptions, 1956 Subscriptions,
1697 Node, 1957 Node,
1698 Affiliations, 1958 Affiliations,
1699 Search, 1959 Search,
1960 Transform,
1700 Hook, 1961 Hook,
1701 Uri, 1962 Uri,
1702 ) 1963 )
1703 1964
1704 def __init__(self, host): 1965 def __init__(self, host):