comparison sat_frontends/jp/cmd_pubsub.py @ 2803:d4a9a60bc650

jp (pubsub/transform): use the new psItemsSend method; sending items one by one is no longer needed when --admin is not used
author Goffi <goffi@goffi.org>
date Fri, 15 Feb 2019 22:13:43 +0100
parents f61a50790fae
children 710de41da2f2
comparison
equal deleted inserted replaced
2802:f61a50790fae 2803:d4a9a60bc650
1550 help=_(u"path to the command to use. Will be called repetitivly with an " 1550 help=_(u"path to the command to use. Will be called repetitivly with an "
1551 u"item as input. Output (full item XML) will be used as new one. " 1551 u"item as input. Output (full item XML) will be used as new one. "
1552 u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), 1552 u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'),
1553 ) 1553 )
1554 1554
1555 def psAdminItemsSendCb(self, item_ids, metadata): 1555 def psItemsSendCb(self, item_ids, metadata):
1556 self.disp(_(u'items published with ids {item_ids}').format( 1556 if item_ids:
1557 item_ids=u', '.join(item_ids))) 1557 self.disp(_(u'items published with ids {item_ids}').format(
1558 item_ids=u', '.join(item_ids)))
1559 else:
1560 self.disp(_(u'items published'))
1558 if self.args.all: 1561 if self.args.all:
1559 return self.handleNextPage(metadata) 1562 return self.handleNextPage(metadata)
1560 else: 1563 else:
1561 self.host.quit() 1564 self.host.quit()
1562
1563 def psItemsSendCb(self, item_id, metadata, show_mess=True):
1564 if show_mess:
1565 self.disp(u'item published with id {item_id}'.format(item_id=item_id))
1566 if self.items_sent == self.items_to_send:
1567 if self.args.all:
1568 return self.handleNextPage(metadata)
1569 self.disp(u'all items published')
1570 self.host.quit()
1571
1572 def psRetractItemCb(self, item_id, metadata):
1573 self.psItemsSendCb(item_id, metadata, show_mess=False)
1574 1565
1575 def handleNextPage(self, metadata): 1566 def handleNextPage(self, metadata):
1576 """Retrieve new page through RSM or quit if we're in the last page 1567 """Retrieve new page through RSM or quit if we're in the last page
1577 1568
1578 use to handle --all option 1569 use to handle --all option
1619 ), 1610 ),
1620 ) 1611 )
1621 1612
1622 def psItemsGetCb(self, ps_result): 1613 def psItemsGetCb(self, ps_result):
1623 items, metadata = ps_result 1614 items, metadata = ps_result
1624 if self.args.admin: 1615 new_items = []
1625 new_items = []
1626 else:
1627 self.items_to_send = len(items)
1628 self.items_sent = 0
1629 1616
1630 for item in items: 1617 for item in items:
1631 if self.check_duplicates: 1618 if self.check_duplicates:
1632 # this is used when we are not ordering by creation 1619 # this is used when we are not ordering by creation
1633 # to avoid infinite loop 1620 # to avoid infinite loop
1653 ret = p.wait() 1640 ret = p.wait()
1654 if ret != 0: 1641 if ret != 0:
1655 self.disp(u"The command returned a non zero status while parsing the " 1642 self.disp(u"The command returned a non zero status while parsing the "
1656 u"following item:\n\n{item}".format(item=item), error=True) 1643 u"following item:\n\n{item}".format(item=item), error=True)
1657 if self.args.ignore_errors: 1644 if self.args.ignore_errors:
1658 if not self.args.admin:
1659 self.items_to_send -= 1
1660 continue 1645 continue
1661 else: 1646 else:
1662 self.host.quit(C.EXIT_CMD_ERROR) 1647 self.host.quit(C.EXIT_CMD_ERROR)
1663 if cmd_std_err is not None: 1648 if cmd_std_err is not None:
1664 cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') 1649 cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore')
1667 if cmd_std_out == "DELETE": 1652 if cmd_std_out == "DELETE":
1668 item_elt, __ = xml_tools.etreeParse(self, item) 1653 item_elt, __ = xml_tools.etreeParse(self, item)
1669 item_id = item_elt.get('id') 1654 item_id = item_elt.get('id')
1670 self.disp(_(u"Deleting item {item_id}").format(item_id=item_id)) 1655 self.disp(_(u"Deleting item {item_id}").format(item_id=item_id))
1671 if self.args.apply: 1656 if self.args.apply:
1672 if not self.args.admin: 1657 # FIXME: we don't wait for item to be retracted which can cause
1673 # we need to increase the counter as if the item were re-published 1658 # trouble in case of error just before the end of the command
1674 self.items_sent += 1 1659 # (the error message may be missed).
1660 # Once moved to Python 3, we must wait for it by using a
1661 # coroutine.
1675 self.host.bridge.psRetractItem( 1662 self.host.bridge.psRetractItem(
1676 self.args.service, 1663 self.args.service,
1677 self.args.node, 1664 self.args.node,
1678 item_id, 1665 item_id,
1679 False, 1666 False,
1680 self.profile, 1667 self.profile,
1681 callback=partial(self.psRetractItemCb, metadata=metadata),
1682 errback=partial( 1668 errback=partial(
1683 self.errback, 1669 self.errback,
1684 msg=_(u"can't delete item: {}"), 1670 msg=_(u"can't delete item [%s]: {}" % item_id),
1685 exit_code=C.EXIT_BRIDGE_ERRBACK, 1671 exit_code=C.EXIT_BRIDGE_ERRBACK,
1686 ), 1672 ),
1687 ) 1673 )
1688 continue 1674 continue
1689 elif cmd_std_out == "SKIP": 1675 elif cmd_std_out == "SKIP":
1690 item_elt, __ = xml_tools.etreeParse(self, item) 1676 item_elt, __ = xml_tools.etreeParse(self, item)
1691 item_id = item_elt.get('id') 1677 item_id = item_elt.get('id')
1692 self.disp(_(u"Skipping item {item_id}").format(item_id=item_id)) 1678 self.disp(_(u"Skipping item {item_id}").format(item_id=item_id))
1693 if self.args.apply:
1694 if not self.args.admin:
1695 # see above
1696 self.items_sent += 1
1697 self.psItemsSendCb(item_id, metadata, show_mess=False)
1698 continue 1679 continue
1699 element, etree = xml_tools.etreeParse(self, cmd_std_out) 1680 element, etree = xml_tools.etreeParse(self, cmd_std_out)
1700 1681
1701 # at this point command has been run and we have a etree.Element object 1682 # at this point command has been run and we have a etree.Element object
1702 if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"): 1683 if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"):
1708 # we have a dry run, we just display filtered items 1689 # we have a dry run, we just display filtered items
1709 serialised = etree.tostring(element, encoding=u'unicode', 1690 serialised = etree.tostring(element, encoding=u'unicode',
1710 pretty_print=True) 1691 pretty_print=True)
1711 self.disp(serialised) 1692 self.disp(serialised)
1712 else: 1693 else:
1713 # we will apply the change, either in admin request or as a simple 1694 new_items.append(etree.tostring(element, encoding="unicode"))
1714 # pubsub one
1715 if self.args.admin:
1716 new_items.append(etree.tostring(element, encoding="unicode"))
1717 else:
1718 # there is currently no method to send several items at once
1719 # so we publish them one by one
1720 payload = etree.tostring(xml_tools.getPayload(self, element),
1721 encoding="unicode")
1722 item_id = element.get(u'id', '')
1723 self.host.bridge.psItemSend(
1724 self.args.service,
1725 self.args.node,
1726 payload,
1727 item_id,
1728 {},
1729 self.profile,
1730 callback=partial(self.psItemsSendCb, metadata=metadata),
1731 errback=partial(
1732 self.errback,
1733 msg=_(u"can't send item: {}"),
1734 exit_code=C.EXIT_BRIDGE_ERRBACK,
1735 ),
1736 )
1737 self.items_sent += 1
1738 1695
1739 if not self.args.apply: 1696 if not self.args.apply:
1740 # on dry run we have nothing to wait for, we can quit 1697 # on dry run we have nothing to wait for, we can quit
1741 if self.args.all: 1698 if self.args.all:
1742 return self.handleNextPage(metadata) 1699 return self.handleNextPage(metadata)
1743 self.host.quit() 1700 self.host.quit()
1744 elif self.args.admin: 1701 else:
1745 self.host.bridge.psAdminItemsSend( 1702 if self.args.admin:
1746 self.args.service, 1703 self.host.bridge.psAdminItemsSend(
1747 self.args.node, 1704 self.args.service,
1748 new_items, 1705 self.args.node,
1749 u"", 1706 new_items,
1750 self.profile, 1707 u"",
1751 callback=partial(self.psAdminItemsSendCb, metadata=metadata), 1708 self.profile,
1752 errback=partial( 1709 callback=partial(self.psItemsSendCb, metadata=metadata),
1753 self.errback, 1710 errback=partial(
1754 msg=_(u"can't send item: {}"), 1711 self.errback,
1755 exit_code=C.EXIT_BRIDGE_ERRBACK, 1712 msg=_(u"can't send item: {}"),
1756 ), 1713 exit_code=C.EXIT_BRIDGE_ERRBACK,
1757 ) 1714 ),
1715 )
1716 else:
1717 self.host.bridge.psItemsSend(
1718 self.args.service,
1719 self.args.node,
1720 new_items,
1721 u"",
1722 self.profile,
1723 callback=partial(self.psItemsSendCb, metadata=metadata),
1724 errback=partial(
1725 self.errback,
1726 msg=_(u"can't send item: {}"),
1727 exit_code=C.EXIT_BRIDGE_ERRBACK,
1728 ),
1729 )
1758 1730
1759 def start(self): 1731 def start(self):
1760 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: 1732 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION:
1761 self.check_duplicates = True 1733 self.check_duplicates = True
1762 self.items_ids = [] 1734 self.items_ids = []