Mercurial > libervia-backend
comparison sat_frontends/jp/cmd_pubsub.py @ 3312:77177b13ff54
plugin XEP-0060: serialise psItemsGet result with data_format
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 17 Jul 2020 12:57:23 +0200 |
parents | 55eeb0dfd313 |
children | 7ebda4b54170 |
comparison
equal
deleted
inserted
replaced
3311:29f8122f00f3 | 3312:77177b13ff54 |
---|---|
798 # TODO: a key(s) argument to select keys to display | 798 # TODO: a key(s) argument to select keys to display |
799 # TODO: add MAM filters | 799 # TODO: add MAM filters |
800 | 800 |
801 async def start(self): | 801 async def start(self): |
802 try: | 802 try: |
803 ps_result = await self.host.bridge.psItemsGet( | 803 ps_result = data_format.deserialise( |
804 self.args.service, | 804 await self.host.bridge.psItemsGet( |
805 self.args.node, | 805 self.args.service, |
806 self.args.max, | 806 self.args.node, |
807 self.args.items, | 807 self.args.max, |
808 self.args.sub_id, | 808 self.args.items, |
809 self.getPubsubExtra(), | 809 self.args.sub_id, |
810 self.profile, | 810 self.getPubsubExtra(), |
811 self.profile, | |
812 ) | |
811 ) | 813 ) |
812 except Exception as e: | 814 except Exception as e: |
813 self.disp(f"can't get pubsub items: {e}", error=True) | 815 self.disp(f"can't get pubsub items: {e}", error=True) |
814 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 816 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
815 else: | 817 else: |
816 await self.output(ps_result[0]) | 818 await self.output(ps_result['items']) |
817 self.host.quit(C.EXIT_OK) | 819 self.host.quit(C.EXIT_OK) |
818 | 820 |
819 | 821 |
820 class Delete(base.CommandBase): | 822 class Delete(base.CommandBase): |
821 def __init__(self, host): | 823 def __init__(self, host): |
899 'with "pip install lxml"', | 901 'with "pip install lxml"', |
900 error=True, | 902 error=True, |
901 ) | 903 ) |
902 self.host.quit(1) | 904 self.host.quit(1) |
903 items = [item] if item else [] | 905 items = [item] if item else [] |
904 item_raw = (await self.host.bridge.psItemsGet( | 906 ps_result = data_format.deserialise( |
905 service, node, 1, items, "", {}, self.profile | 907 await self.host.bridge.psItemsGet( |
906 ))[0][0] | 908 service, node, 1, items, "", {}, self.profile |
909 ) | |
910 ) | |
911 item_raw = ps_result['items'][0] | |
907 parser = etree.XMLParser(remove_blank_text=True, recover=True) | 912 parser = etree.XMLParser(remove_blank_text=True, recover=True) |
908 item_elt = etree.fromstring(item_raw, parser) | 913 item_elt = etree.fromstring(item_raw, parser) |
909 item_id = item_elt.get("id") | 914 item_id = item_elt.get("id") |
910 try: | 915 try: |
911 payload = item_elt[0] | 916 payload = item_elt[0] |
1231 self.parser.add_argument("command", nargs=argparse.REMAINDER) | 1236 self.parser.add_argument("command", nargs=argparse.REMAINDER) |
1232 | 1237 |
1233 async def getItems(self, depth, service, node, items): | 1238 async def getItems(self, depth, service, node, items): |
1234 self.to_get += 1 | 1239 self.to_get += 1 |
1235 try: | 1240 try: |
1236 items_data = await self.host.bridge.psItemsGet( | 1241 ps_result = data_format.deserialise( |
1237 service, | 1242 await self.host.bridge.psItemsGet( |
1238 node, | 1243 service, |
1239 self.args.node_max, | 1244 node, |
1240 items, | 1245 self.args.node_max, |
1241 "", | 1246 items, |
1242 self.getPubsubExtra(), | 1247 "", |
1243 self.profile, | 1248 self.getPubsubExtra(), |
1249 self.profile, | |
1250 ) | |
1244 ) | 1251 ) |
1245 except Exception as e: | 1252 except Exception as e: |
1246 self.disp( | 1253 self.disp( |
1247 f"can't get pubsub items at {service} (node: {node}): {e}", | 1254 f"can't get pubsub items at {service} (node: {node}): {e}", |
1248 error=True, | 1255 error=True, |
1249 ) | 1256 ) |
1250 self.to_get -= 1 | 1257 self.to_get -= 1 |
1251 else: | 1258 else: |
1252 await self.search(items_data, depth) | 1259 await self.search(ps_result, depth) |
1253 | 1260 |
1254 def _checkPubsubURL(self, match, found_nodes): | 1261 def _checkPubsubURL(self, match, found_nodes): |
1255 """check that the matched URL is an xmpp: one | 1262 """check that the matched URL is an xmpp: one |
1256 | 1263 |
1257 @param found_nodes(list[unicode]): found_nodes | 1264 @param found_nodes(list[unicode]): found_nodes |
1447 C.A_FAILURE, | 1454 C.A_FAILURE, |
1448 _(f"executed command failed with exit code {ret}"), | 1455 _(f"executed command failed with exit code {ret}"), |
1449 ) | 1456 ) |
1450 ) | 1457 ) |
1451 | 1458 |
1452 async def search(self, items_data, depth): | 1459 async def search(self, ps_result, depth): |
1453 """callback of getItems | 1460 """callback of getItems |
1454 | 1461 |
1455 this method filters items, get sub nodes if needed, | 1462 this method filters items, get sub nodes if needed, |
1456 do the requested action, and exit the command when everything is done | 1463 do the requested action, and exit the command when everything is done |
1457 @param items_data(tuple): result of getItems | 1464 @param ps_result(dict): result of getItems |
1458 @param depth(int): current depth level | 1465 @param depth(int): current depth level |
1459 0 for first node, 1 for first children, and so on | 1466 0 for first node, 1 for first children, and so on |
1460 """ | 1467 """ |
1461 items, metadata = items_data | 1468 for item in ps_result['items']: |
1462 for item in items: | |
1463 if depth < self.args.max_depth: | 1469 if depth < self.args.max_depth: |
1464 await self.getSubNodes(item, depth) | 1470 await self.getSubNodes(item, depth) |
1465 keep, item = self.filter(item) | 1471 keep, item = self.filter(item) |
1466 if not keep: | 1472 if not keep: |
1467 continue | 1473 continue |
1468 await self.doItemAction(item, metadata) | 1474 await self.doItemAction(item, ps_result) |
1469 | 1475 |
1470 # we check if we got all getItems results | 1476 # we check if we got all getItems results |
1471 self.to_get -= 1 | 1477 self.to_get -= 1 |
1472 if self.to_get == 0: | 1478 if self.to_get == 0: |
1473 # yes, we can quit | 1479 # yes, we can quit |
1558 | 1564 |
1559 use to handle --all option | 1565 use to handle --all option |
1560 @param metadata(dict): metadata as returned by psItemsGet | 1566 @param metadata(dict): metadata as returned by psItemsGet |
1561 """ | 1567 """ |
1562 try: | 1568 try: |
1563 last = metadata['rsm_last'] | 1569 last = metadata['rsm']['last'] |
1564 index = int(metadata['rsm_index']) | 1570 index = int(metadata['rsm']['index']) |
1565 count = int(metadata['rsm_count']) | 1571 count = int(metadata['rsm']['count']) |
1566 except KeyError: | 1572 except KeyError: |
1567 self.disp(_("Can't retrieve all items, RSM metadata not available"), | 1573 self.disp(_("Can't retrieve all items, RSM metadata not available"), |
1568 error=True) | 1574 error=True) |
1569 self.host.quit(C.EXIT_MISSING_FEATURE) | 1575 self.host.quit(C.EXIT_MISSING_FEATURE) |
1570 except ValueError as e: | 1576 except ValueError as e: |
1583 ) | 1589 ) |
1584 | 1590 |
1585 extra = self.getPubsubExtra() | 1591 extra = self.getPubsubExtra() |
1586 extra['rsm_after'] = last | 1592 extra['rsm_after'] = last |
1587 try: | 1593 try: |
1588 ps_result = await self.host.bridge.psItemsGet( | 1594 ps_result = data_format.deserialise( |
1589 self.args.service, | 1595 await self.host.bridge.psItemsGet( |
1590 self.args.node, | 1596 self.args.service, |
1591 self.args.rsm_max, | 1597 self.args.node, |
1592 self.args.items, | 1598 self.args.rsm_max, |
1593 "", | 1599 self.args.items, |
1594 extra, | 1600 "", |
1595 self.profile, | 1601 extra, |
1602 self.profile, | |
1603 ) | |
1596 ) | 1604 ) |
1597 except Exception as e: | 1605 except Exception as e: |
1598 self.disp( | 1606 self.disp( |
1599 f"can't retrieve items: {e}", error=True | 1607 f"can't retrieve items: {e}", error=True |
1600 ) | 1608 ) |
1601 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 1609 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1602 else: | 1610 else: |
1603 await self.psItemsGetCb(ps_result) | 1611 await self.psItemsGetCb(ps_result) |
1604 | 1612 |
1605 async def psItemsGetCb(self, ps_result): | 1613 async def psItemsGetCb(self, ps_result): |
1606 items, metadata = ps_result | |
1607 encoding = 'utf-8' | 1614 encoding = 'utf-8' |
1608 new_items = [] | 1615 new_items = [] |
1609 | 1616 |
1610 for item in items: | 1617 for item in ps_result['items']: |
1611 if self.check_duplicates: | 1618 if self.check_duplicates: |
1612 # this is used when we are not ordering by creation | 1619 # this is used when we are not ordering by creation |
1613 # to avoid infinite loop | 1620 # to avoid infinite loop |
1614 item_elt, __ = xml_tools.etreeParse(self, item) | 1621 item_elt, __ = xml_tools.etreeParse(self, item) |
1615 item_id = item_elt.get('id') | 1622 item_id = item_elt.get('id') |
1685 new_items.append(etree.tostring(element, encoding="unicode")) | 1692 new_items.append(etree.tostring(element, encoding="unicode")) |
1686 | 1693 |
1687 if not self.args.apply: | 1694 if not self.args.apply: |
1688 # on dry run we have nothing to wait for, we can quit | 1695 # on dry run we have nothing to wait for, we can quit |
1689 if self.args.all: | 1696 if self.args.all: |
1690 return await self.handleNextPage(metadata) | 1697 return await self.handleNextPage(ps_result) |
1691 self.host.quit() | 1698 self.host.quit() |
1692 else: | 1699 else: |
1693 if self.args.admin: | 1700 if self.args.admin: |
1694 bridge_method = self.host.bridge.psAdminItemsSend | 1701 bridge_method = self.host.bridge.psAdminItemsSend |
1695 else: | 1702 else: |
1696 bridge_method = self.host.bridge.psItemsSend | 1703 bridge_method = self.host.bridge.psItemsSend |
1697 | 1704 |
1698 try: | 1705 try: |
1699 ps_result = await bridge_method( | 1706 ps_items_send_result = await bridge_method( |
1700 self.args.service, | 1707 self.args.service, |
1701 self.args.node, | 1708 self.args.node, |
1702 new_items, | 1709 new_items, |
1703 "", | 1710 "", |
1704 self.profile, | 1711 self.profile, |
1705 ) | 1712 ) |
1706 except Exception as e: | 1713 except Exception as e: |
1707 self.disp(f"can't send item: {e}", error=True) | 1714 self.disp(f"can't send item: {e}", error=True) |
1708 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 1715 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1709 else: | 1716 else: |
1710 await self.psItemsSendCb(ps_result, metadata=metadata) | 1717 await self.psItemsSendCb(ps_items_send_result, metadata=ps_result) |
1711 | 1718 |
1712 async def start(self): | 1719 async def start(self): |
1713 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: | 1720 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: |
1714 self.check_duplicates = True | 1721 self.check_duplicates = True |
1715 self.items_ids = [] | 1722 self.items_ids = [] |
1722 "but this method is not safe, and some items may be missed.\n---\n")) | 1729 "but this method is not safe, and some items may be missed.\n---\n")) |
1723 else: | 1730 else: |
1724 self.check_duplicates = False | 1731 self.check_duplicates = False |
1725 | 1732 |
1726 try: | 1733 try: |
1727 ps_result = await self.host.bridge.psItemsGet( | 1734 ps_result = data_format.deserialise( |
1728 self.args.service, | 1735 await self.host.bridge.psItemsGet( |
1729 self.args.node, | 1736 self.args.service, |
1730 self.args.max, | 1737 self.args.node, |
1731 self.args.items, | 1738 self.args.max, |
1732 "", | 1739 self.args.items, |
1733 self.getPubsubExtra(), | 1740 "", |
1734 self.profile, | 1741 self.getPubsubExtra(), |
1742 self.profile, | |
1743 ) | |
1735 ) | 1744 ) |
1736 except Exception as e: | 1745 except Exception as e: |
1737 self.disp(f"can't retrieve items: {e}", error=True) | 1746 self.disp(f"can't retrieve items: {e}", error=True) |
1738 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 1747 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1739 else: | 1748 else: |