comparison sat_frontends/jp/cmd_pubsub.py @ 3312:77177b13ff54

plugin XEP-0060: serialise psItemsGet result with data_format
author Goffi <goffi@goffi.org>
date Fri, 17 Jul 2020 12:57:23 +0200
parents 55eeb0dfd313
children 7ebda4b54170
comparison
equal deleted inserted replaced
3311:29f8122f00f3 3312:77177b13ff54
798 #  TODO: a key(s) argument to select keys to display 798 #  TODO: a key(s) argument to select keys to display
799 # TODO: add MAM filters 799 # TODO: add MAM filters
800 800
801 async def start(self): 801 async def start(self):
802 try: 802 try:
803 ps_result = await self.host.bridge.psItemsGet( 803 ps_result = data_format.deserialise(
804 self.args.service, 804 await self.host.bridge.psItemsGet(
805 self.args.node, 805 self.args.service,
806 self.args.max, 806 self.args.node,
807 self.args.items, 807 self.args.max,
808 self.args.sub_id, 808 self.args.items,
809 self.getPubsubExtra(), 809 self.args.sub_id,
810 self.profile, 810 self.getPubsubExtra(),
811 self.profile,
812 )
811 ) 813 )
812 except Exception as e: 814 except Exception as e:
813 self.disp(f"can't get pubsub items: {e}", error=True) 815 self.disp(f"can't get pubsub items: {e}", error=True)
814 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 816 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
815 else: 817 else:
816 await self.output(ps_result[0]) 818 await self.output(ps_result['items'])
817 self.host.quit(C.EXIT_OK) 819 self.host.quit(C.EXIT_OK)
818 820
819 821
820 class Delete(base.CommandBase): 822 class Delete(base.CommandBase):
821 def __init__(self, host): 823 def __init__(self, host):
899 'with "pip install lxml"', 901 'with "pip install lxml"',
900 error=True, 902 error=True,
901 ) 903 )
902 self.host.quit(1) 904 self.host.quit(1)
903 items = [item] if item else [] 905 items = [item] if item else []
904 item_raw = (await self.host.bridge.psItemsGet( 906 ps_result = data_format.deserialise(
905 service, node, 1, items, "", {}, self.profile 907 await self.host.bridge.psItemsGet(
906 ))[0][0] 908 service, node, 1, items, "", {}, self.profile
909 )
910 )
911 item_raw = ps_result['items'][0]
907 parser = etree.XMLParser(remove_blank_text=True, recover=True) 912 parser = etree.XMLParser(remove_blank_text=True, recover=True)
908 item_elt = etree.fromstring(item_raw, parser) 913 item_elt = etree.fromstring(item_raw, parser)
909 item_id = item_elt.get("id") 914 item_id = item_elt.get("id")
910 try: 915 try:
911 payload = item_elt[0] 916 payload = item_elt[0]
1231 self.parser.add_argument("command", nargs=argparse.REMAINDER) 1236 self.parser.add_argument("command", nargs=argparse.REMAINDER)
1232 1237
1233 async def getItems(self, depth, service, node, items): 1238 async def getItems(self, depth, service, node, items):
1234 self.to_get += 1 1239 self.to_get += 1
1235 try: 1240 try:
1236 items_data = await self.host.bridge.psItemsGet( 1241 ps_result = data_format.deserialise(
1237 service, 1242 await self.host.bridge.psItemsGet(
1238 node, 1243 service,
1239 self.args.node_max, 1244 node,
1240 items, 1245 self.args.node_max,
1241 "", 1246 items,
1242 self.getPubsubExtra(), 1247 "",
1243 self.profile, 1248 self.getPubsubExtra(),
1249 self.profile,
1250 )
1244 ) 1251 )
1245 except Exception as e: 1252 except Exception as e:
1246 self.disp( 1253 self.disp(
1247 f"can't get pubsub items at {service} (node: {node}): {e}", 1254 f"can't get pubsub items at {service} (node: {node}): {e}",
1248 error=True, 1255 error=True,
1249 ) 1256 )
1250 self.to_get -= 1 1257 self.to_get -= 1
1251 else: 1258 else:
1252 await self.search(items_data, depth) 1259 await self.search(ps_result, depth)
1253 1260
1254 def _checkPubsubURL(self, match, found_nodes): 1261 def _checkPubsubURL(self, match, found_nodes):
1255 """check that the matched URL is an xmpp: one 1262 """check that the matched URL is an xmpp: one
1256 1263
1257 @param found_nodes(list[unicode]): found_nodes 1264 @param found_nodes(list[unicode]): found_nodes
1447 C.A_FAILURE, 1454 C.A_FAILURE,
1448 _(f"executed command failed with exit code {ret}"), 1455 _(f"executed command failed with exit code {ret}"),
1449 ) 1456 )
1450 ) 1457 )
1451 1458
1452 async def search(self, items_data, depth): 1459 async def search(self, ps_result, depth):
1453 """callback of getItems 1460 """callback of getItems
1454 1461
1455 this method filters items, get sub nodes if needed, 1462 this method filters items, get sub nodes if needed,
1456 do the requested action, and exit the command when everything is done 1463 do the requested action, and exit the command when everything is done
1457 @param items_data(tuple): result of getItems 1464 @param items_data(tuple): result of getItems
1458 @param depth(int): current depth level 1465 @param depth(int): current depth level
1459 0 for first node, 1 for first children, and so on 1466 0 for first node, 1 for first children, and so on
1460 """ 1467 """
1461 items, metadata = items_data 1468 for item in ps_result['items']:
1462 for item in items:
1463 if depth < self.args.max_depth: 1469 if depth < self.args.max_depth:
1464 await self.getSubNodes(item, depth) 1470 await self.getSubNodes(item, depth)
1465 keep, item = self.filter(item) 1471 keep, item = self.filter(item)
1466 if not keep: 1472 if not keep:
1467 continue 1473 continue
1468 await self.doItemAction(item, metadata) 1474 await self.doItemAction(item, ps_result)
1469 1475
1470 #  we check if we got all getItems results 1476 #  we check if we got all getItems results
1471 self.to_get -= 1 1477 self.to_get -= 1
1472 if self.to_get == 0: 1478 if self.to_get == 0:
1473 # yes, we can quit 1479 # yes, we can quit
1558 1564
1559 use to handle --all option 1565 use to handle --all option
1560 @param metadata(dict): metadata as returned by psItemsGet 1566 @param metadata(dict): metadata as returned by psItemsGet
1561 """ 1567 """
1562 try: 1568 try:
1563 last = metadata['rsm_last'] 1569 last = metadata['rsm']['last']
1564 index = int(metadata['rsm_index']) 1570 index = int(metadata['rsm']['index'])
1565 count = int(metadata['rsm_count']) 1571 count = int(metadata['rsm']['count'])
1566 except KeyError: 1572 except KeyError:
1567 self.disp(_("Can't retrieve all items, RSM metadata not available"), 1573 self.disp(_("Can't retrieve all items, RSM metadata not available"),
1568 error=True) 1574 error=True)
1569 self.host.quit(C.EXIT_MISSING_FEATURE) 1575 self.host.quit(C.EXIT_MISSING_FEATURE)
1570 except ValueError as e: 1576 except ValueError as e:
1583 ) 1589 )
1584 1590
1585 extra = self.getPubsubExtra() 1591 extra = self.getPubsubExtra()
1586 extra['rsm_after'] = last 1592 extra['rsm_after'] = last
1587 try: 1593 try:
1588 ps_result = await self.host.bridge.psItemsGet( 1594 ps_result = await data_format.deserialise(
1589 self.args.service, 1595 self.host.bridge.psItemsGet(
1590 self.args.node, 1596 self.args.service,
1591 self.args.rsm_max, 1597 self.args.node,
1592 self.args.items, 1598 self.args.rsm_max,
1593 "", 1599 self.args.items,
1594 extra, 1600 "",
1595 self.profile, 1601 extra,
1602 self.profile,
1603 )
1596 ) 1604 )
1597 except Exception as e: 1605 except Exception as e:
1598 self.disp( 1606 self.disp(
1599 f"can't retrieve items: {e}", error=True 1607 f"can't retrieve items: {e}", error=True
1600 ) 1608 )
1601 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 1609 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1602 else: 1610 else:
1603 await self.psItemsGetCb(ps_result) 1611 await self.psItemsGetCb(ps_result)
1604 1612
1605 async def psItemsGetCb(self, ps_result): 1613 async def psItemsGetCb(self, ps_result):
1606 items, metadata = ps_result
1607 encoding = 'utf-8' 1614 encoding = 'utf-8'
1608 new_items = [] 1615 new_items = []
1609 1616
1610 for item in items: 1617 for item in ps_result['items']:
1611 if self.check_duplicates: 1618 if self.check_duplicates:
1612 # this is used when we are not ordering by creation 1619 # this is used when we are not ordering by creation
1613 # to avoid infinite loop 1620 # to avoid infinite loop
1614 item_elt, __ = xml_tools.etreeParse(self, item) 1621 item_elt, __ = xml_tools.etreeParse(self, item)
1615 item_id = item_elt.get('id') 1622 item_id = item_elt.get('id')
1685 new_items.append(etree.tostring(element, encoding="unicode")) 1692 new_items.append(etree.tostring(element, encoding="unicode"))
1686 1693
1687 if not self.args.apply: 1694 if not self.args.apply:
1688 # on dry run we have nothing to wait for, we can quit 1695 # on dry run we have nothing to wait for, we can quit
1689 if self.args.all: 1696 if self.args.all:
1690 return await self.handleNextPage(metadata) 1697 return await self.handleNextPage(ps_result)
1691 self.host.quit() 1698 self.host.quit()
1692 else: 1699 else:
1693 if self.args.admin: 1700 if self.args.admin:
1694 bridge_method = self.host.bridge.psAdminItemsSend 1701 bridge_method = self.host.bridge.psAdminItemsSend
1695 else: 1702 else:
1696 bridge_method = self.host.bridge.psItemsSend 1703 bridge_method = self.host.bridge.psItemsSend
1697 1704
1698 try: 1705 try:
1699 ps_result = await bridge_method( 1706 ps_items_send_result = await bridge_method(
1700 self.args.service, 1707 self.args.service,
1701 self.args.node, 1708 self.args.node,
1702 new_items, 1709 new_items,
1703 "", 1710 "",
1704 self.profile, 1711 self.profile,
1705 ) 1712 )
1706 except Exception as e: 1713 except Exception as e:
1707 self.disp(f"can't send item: {e}", error=True) 1714 self.disp(f"can't send item: {e}", error=True)
1708 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 1715 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1709 else: 1716 else:
1710 await self.psItemsSendCb(ps_result, metadata=metadata) 1717 await self.psItemsSendCb(ps_items_send_result, metadata=ps_result)
1711 1718
1712 async def start(self): 1719 async def start(self):
1713 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: 1720 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION:
1714 self.check_duplicates = True 1721 self.check_duplicates = True
1715 self.items_ids = [] 1722 self.items_ids = []
1722 "but this method is not safe, and some items may be missed.\n---\n")) 1729 "but this method is not safe, and some items may be missed.\n---\n"))
1723 else: 1730 else:
1724 self.check_duplicates = False 1731 self.check_duplicates = False
1725 1732
1726 try: 1733 try:
1727 ps_result = await self.host.bridge.psItemsGet( 1734 ps_result = data_format.deserialise(
1728 self.args.service, 1735 await self.host.bridge.psItemsGet(
1729 self.args.node, 1736 self.args.service,
1730 self.args.max, 1737 self.args.node,
1731 self.args.items, 1738 self.args.max,
1732 "", 1739 self.args.items,
1733 self.getPubsubExtra(), 1740 "",
1734 self.profile, 1741 self.getPubsubExtra(),
1742 self.profile,
1743 )
1735 ) 1744 )
1736 except Exception as e: 1745 except Exception as e:
1737 self.disp(f"can't retrieve items: {e}", error=True) 1746 self.disp(f"can't retrieve items: {e}", error=True)
1738 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 1747 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1739 else: 1748 else: