Mercurial > libervia-backend
comparison sat_frontends/jp/cmd_pubsub.py @ 2803:d4a9a60bc650
jp (pubsub/transform): use new psItemsSend method, it is not needed anymore to send items one by one when --admin is not used
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 15 Feb 2019 22:13:43 +0100 |
parents | f61a50790fae |
children | 710de41da2f2 |
comparison
equal
deleted
inserted
replaced
2802:f61a50790fae | 2803:d4a9a60bc650 |
---|---|
1550 help=_(u"path to the command to use. Will be called repetitivly with an " | 1550 help=_(u"path to the command to use. Will be called repetitivly with an " |
1551 u"item as input. Output (full item XML) will be used as new one. " | 1551 u"item as input. Output (full item XML) will be used as new one. " |
1552 u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), | 1552 u'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), |
1553 ) | 1553 ) |
1554 | 1554 |
1555 def psAdminItemsSendCb(self, item_ids, metadata): | 1555 def psItemsSendCb(self, item_ids, metadata): |
1556 self.disp(_(u'items published with ids {item_ids}').format( | 1556 if item_ids: |
1557 item_ids=u', '.join(item_ids))) | 1557 self.disp(_(u'items published with ids {item_ids}').format( |
1558 item_ids=u', '.join(item_ids))) | |
1559 else: | |
1560 self.disp(_(u'items published')) | |
1558 if self.args.all: | 1561 if self.args.all: |
1559 return self.handleNextPage(metadata) | 1562 return self.handleNextPage(metadata) |
1560 else: | 1563 else: |
1561 self.host.quit() | 1564 self.host.quit() |
1562 | |
1563 def psItemsSendCb(self, item_id, metadata, show_mess=True): | |
1564 if show_mess: | |
1565 self.disp(u'item published with id {item_id}'.format(item_id=item_id)) | |
1566 if self.items_sent == self.items_to_send: | |
1567 if self.args.all: | |
1568 return self.handleNextPage(metadata) | |
1569 self.disp(u'all items published') | |
1570 self.host.quit() | |
1571 | |
1572 def psRetractItemCb(self, item_id, metadata): | |
1573 self.psItemsSendCb(item_id, metadata, show_mess=False) | |
1574 | 1565 |
1575 def handleNextPage(self, metadata): | 1566 def handleNextPage(self, metadata): |
1576 """Retrieve new page through RSM or quit if we're in the last page | 1567 """Retrieve new page through RSM or quit if we're in the last page |
1577 | 1568 |
1578 use to handle --all option | 1569 use to handle --all option |
1619 ), | 1610 ), |
1620 ) | 1611 ) |
1621 | 1612 |
1622 def psItemsGetCb(self, ps_result): | 1613 def psItemsGetCb(self, ps_result): |
1623 items, metadata = ps_result | 1614 items, metadata = ps_result |
1624 if self.args.admin: | 1615 new_items = [] |
1625 new_items = [] | |
1626 else: | |
1627 self.items_to_send = len(items) | |
1628 self.items_sent = 0 | |
1629 | 1616 |
1630 for item in items: | 1617 for item in items: |
1631 if self.check_duplicates: | 1618 if self.check_duplicates: |
1632 # this is used when we are not ordering by creation | 1619 # this is used when we are not ordering by creation |
1633 # to avoid infinite loop | 1620 # to avoid infinite loop |
1653 ret = p.wait() | 1640 ret = p.wait() |
1654 if ret != 0: | 1641 if ret != 0: |
1655 self.disp(u"The command returned a non zero status while parsing the " | 1642 self.disp(u"The command returned a non zero status while parsing the " |
1656 u"following item:\n\n{item}".format(item=item), error=True) | 1643 u"following item:\n\n{item}".format(item=item), error=True) |
1657 if self.args.ignore_errors: | 1644 if self.args.ignore_errors: |
1658 if not self.args.admin: | |
1659 self.items_to_send -= 1 | |
1660 continue | 1645 continue |
1661 else: | 1646 else: |
1662 self.host.quit(C.EXIT_CMD_ERROR) | 1647 self.host.quit(C.EXIT_CMD_ERROR) |
1663 if cmd_std_err is not None: | 1648 if cmd_std_err is not None: |
1664 cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') | 1649 cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') |
1667 if cmd_std_out == "DELETE": | 1652 if cmd_std_out == "DELETE": |
1668 item_elt, __ = xml_tools.etreeParse(self, item) | 1653 item_elt, __ = xml_tools.etreeParse(self, item) |
1669 item_id = item_elt.get('id') | 1654 item_id = item_elt.get('id') |
1670 self.disp(_(u"Deleting item {item_id}").format(item_id=item_id)) | 1655 self.disp(_(u"Deleting item {item_id}").format(item_id=item_id)) |
1671 if self.args.apply: | 1656 if self.args.apply: |
1672 if not self.args.admin: | 1657 # FIXME: we don't wait for item to be retracted which can cause |
1673 # we need to increase the counter as if the item were re-published | 1658 # trouble in case of error just before the end of the command |
1674 self.items_sent += 1 | 1659 # (the error message may be missed). |
1660 # Once moved to Python 3, we must wait for it by using a | |
1661 # coroutine. | |
1675 self.host.bridge.psRetractItem( | 1662 self.host.bridge.psRetractItem( |
1676 self.args.service, | 1663 self.args.service, |
1677 self.args.node, | 1664 self.args.node, |
1678 item_id, | 1665 item_id, |
1679 False, | 1666 False, |
1680 self.profile, | 1667 self.profile, |
1681 callback=partial(self.psRetractItemCb, metadata=metadata), | |
1682 errback=partial( | 1668 errback=partial( |
1683 self.errback, | 1669 self.errback, |
1684 msg=_(u"can't delete item: {}"), | 1670 msg=_(u"can't delete item [%s]: {}" % item_id), |
1685 exit_code=C.EXIT_BRIDGE_ERRBACK, | 1671 exit_code=C.EXIT_BRIDGE_ERRBACK, |
1686 ), | 1672 ), |
1687 ) | 1673 ) |
1688 continue | 1674 continue |
1689 elif cmd_std_out == "SKIP": | 1675 elif cmd_std_out == "SKIP": |
1690 item_elt, __ = xml_tools.etreeParse(self, item) | 1676 item_elt, __ = xml_tools.etreeParse(self, item) |
1691 item_id = item_elt.get('id') | 1677 item_id = item_elt.get('id') |
1692 self.disp(_(u"Skipping item {item_id}").format(item_id=item_id)) | 1678 self.disp(_(u"Skipping item {item_id}").format(item_id=item_id)) |
1693 if self.args.apply: | |
1694 if not self.args.admin: | |
1695 # see above | |
1696 self.items_sent += 1 | |
1697 self.psItemsSendCb(item_id, metadata, show_mess=False) | |
1698 continue | 1679 continue |
1699 element, etree = xml_tools.etreeParse(self, cmd_std_out) | 1680 element, etree = xml_tools.etreeParse(self, cmd_std_out) |
1700 | 1681 |
1701 # at this point command has been run and we have a etree.Element object | 1682 # at this point command has been run and we have a etree.Element object |
1702 if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"): | 1683 if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"): |
1708 # we have a dry run, we just display filtered items | 1689 # we have a dry run, we just display filtered items |
1709 serialised = etree.tostring(element, encoding=u'unicode', | 1690 serialised = etree.tostring(element, encoding=u'unicode', |
1710 pretty_print=True) | 1691 pretty_print=True) |
1711 self.disp(serialised) | 1692 self.disp(serialised) |
1712 else: | 1693 else: |
1713 # we will apply the change, either in admin request or as a simple | 1694 new_items.append(etree.tostring(element, encoding="unicode")) |
1714 # pubsub one | |
1715 if self.args.admin: | |
1716 new_items.append(etree.tostring(element, encoding="unicode")) | |
1717 else: | |
1718 # there is currently no method to send several items at once | |
1719 # so we publish them one by one | |
1720 payload = etree.tostring(xml_tools.getPayload(self, element), | |
1721 encoding="unicode") | |
1722 item_id = element.get(u'id', '') | |
1723 self.host.bridge.psItemSend( | |
1724 self.args.service, | |
1725 self.args.node, | |
1726 payload, | |
1727 item_id, | |
1728 {}, | |
1729 self.profile, | |
1730 callback=partial(self.psItemsSendCb, metadata=metadata), | |
1731 errback=partial( | |
1732 self.errback, | |
1733 msg=_(u"can't send item: {}"), | |
1734 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1735 ), | |
1736 ) | |
1737 self.items_sent += 1 | |
1738 | 1695 |
1739 if not self.args.apply: | 1696 if not self.args.apply: |
1740 # on dry run we have nothing to wait for, we can quit | 1697 # on dry run we have nothing to wait for, we can quit |
1741 if self.args.all: | 1698 if self.args.all: |
1742 return self.handleNextPage(metadata) | 1699 return self.handleNextPage(metadata) |
1743 self.host.quit() | 1700 self.host.quit() |
1744 elif self.args.admin: | 1701 else: |
1745 self.host.bridge.psAdminItemsSend( | 1702 if self.args.admin: |
1746 self.args.service, | 1703 self.host.bridge.psAdminItemsSend( |
1747 self.args.node, | 1704 self.args.service, |
1748 new_items, | 1705 self.args.node, |
1749 u"", | 1706 new_items, |
1750 self.profile, | 1707 u"", |
1751 callback=partial(self.psAdminItemsSendCb, metadata=metadata), | 1708 self.profile, |
1752 errback=partial( | 1709 callback=partial(self.psItemsSendCb, metadata=metadata), |
1753 self.errback, | 1710 errback=partial( |
1754 msg=_(u"can't send item: {}"), | 1711 self.errback, |
1755 exit_code=C.EXIT_BRIDGE_ERRBACK, | 1712 msg=_(u"can't send item: {}"), |
1756 ), | 1713 exit_code=C.EXIT_BRIDGE_ERRBACK, |
1757 ) | 1714 ), |
1715 ) | |
1716 else: | |
1717 self.host.bridge.psItemsSend( | |
1718 self.args.service, | |
1719 self.args.node, | |
1720 new_items, | |
1721 u"", | |
1722 self.profile, | |
1723 callback=partial(self.psItemsSendCb, metadata=metadata), | |
1724 errback=partial( | |
1725 self.errback, | |
1726 msg=_(u"can't send item: {}"), | |
1727 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1728 ), | |
1729 ) | |
1758 | 1730 |
1759 def start(self): | 1731 def start(self): |
1760 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: | 1732 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: |
1761 self.check_duplicates = True | 1733 self.check_duplicates = True |
1762 self.items_ids = [] | 1734 self.items_ids = [] |