Mercurial > libervia-backend
diff sat_frontends/jp/cmd_pubsub.py @ 3040:fee60f17ebac
jp: jp asyncio port:
/!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\
This patch implements the port of jp to asyncio, so it is now correctly using the bridge
asynchronously, and it can be used with bridges like `pb`. This also simplify the code,
notably for things which were previously implemented with many callbacks (like pagination
with RSM).
During the process, some behaviours have been modified/fixed, in jp and backends, check
diff for details.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:56:41 +0200 |
parents | ab2696e34d29 |
children | cea52c9ddfd9 |
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py Wed Sep 25 08:53:38 2019 +0200 +++ b/sat_frontends/jp/cmd_pubsub.py Wed Sep 25 08:56:41 2019 +0200 @@ -18,6 +18,12 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. +import argparse +import os.path +import re +import sys +import subprocess +import asyncio from . import base from sat.core.i18n import _ from sat.core import exceptions @@ -29,11 +35,6 @@ from sat.tools.common import uri from sat.tools.common.ansi import ANSI as A from sat_frontends.tools import jid, strings -import argparse -import os.path -import re -import subprocess -import sys __commands__ = ["Pubsub"] @@ -55,7 +56,6 @@ pubsub_flags={C.NODE}, help=_("retrieve node configuration"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -72,28 +72,23 @@ def filterKey(self, key): return any((key == k or key == "pubsub#" + k) for k in self.args.keys) - def psNodeConfigurationGetCb(self, config_dict): - key_filter = (lambda k: True) if not self.args.keys else self.filterKey - config_dict = { - self.removePrefix(k): v for k, v in config_dict.items() if key_filter(k) - } - self.output(config_dict) - self.host.quit() - - def psNodeConfigurationGetEb(self, failure_): - self.disp( - "can't get node configuration: {reason}".format(reason=failure_), error=True - ) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - - def start(self): - self.host.bridge.psNodeConfigurationGet( - self.args.service, - self.args.node, - self.profile, - callback=self.psNodeConfigurationGetCb, - errback=self.psNodeConfigurationGetEb, - ) + async def start(self): + try: + config_dict = await self.host.bridge.psNodeConfigurationGet( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get node configuration: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + key_filter = (lambda k: True) if not self.args.keys else self.filterKey + config_dict = { + self.removePrefix(k): v for k, v in config_dict.items() if key_filter(k) + } + await self.output(config_dict) + self.host.quit() class NodeCreate(base.CommandBase): @@ -108,7 +103,6 @@ use_verbose=True, help=_("create a node"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -128,35 +122,28 @@ help=_('don\'t prepend "pubsub#" prefix to field names'), ) - def psNodeCreateCb(self, node_id): - if self.host.verbosity: - announce = _("node created successfully: ") - else: - announce = "" - self.disp(announce + node_id) - self.host.quit() - - def psNodeCreateEb(self, failure_): - self.disp("can't create: {reason}".format(reason=failure_), error=True) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - - def start(self): + async def start(self): if not self.args.full_prefix: options = {"pubsub#" + k: v for k, v in self.args.fields} else: options = dict(self.args.fields) - self.host.bridge.psNodeCreate( - self.args.service, - self.args.node, - options, - self.profile, - callback=self.psNodeCreateCb, - errback=partial( - self.errback, - msg=_("can't create node: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + node_id = await self.host.bridge.psNodeCreate( + self.args.service, + self.args.node, + options, + self.profile, + ) + except Exception as e: + self.disp(msg=_(f"can't create node: {e}"), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if self.host.verbosity: + announce = _("node created successfully: ") + else: + announce = "" + self.disp(announce + node_id) + self.host.quit() class NodePurge(base.CommandBase): @@ -169,7 +156,6 @@ pubsub_flags={C.NODE}, help=_("purge a node (i.e. remove all items from it)"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -179,35 +165,30 @@ help=_("purge node without confirmation"), ) - def psNodePurgeCb(self): - self.disp(_("node [{node}] purged successfully").format(node=self.args.node)) - self.host.quit() - - def start(self): + async def start(self): if not self.args.force: if not self.args.service: - message = _("Are you sure to purge PEP node [{node_id}]? " - "This will delete ALL items from it!").format( - node_id=self.args.node - ) + message = _( + f"Are you sure to purge PEP node [{self.args.node}]? This will " + f"delete ALL items from it!") else: message = _( - "Are you sure to delete node [{node_id}] on service [{service}]? " - "This will delete ALL items from it!" - ).format(node_id=self.args.node, service=self.args.service) - self.host.confirmOrQuit(message, _("node purge cancelled")) + f"Are you sure to delete node [{self.args.node}] on service " + f"[{self.args.service}]? This will delete ALL items from it!") + await self.host.confirmOrQuit(message, _("node purge cancelled")) - self.host.bridge.psNodePurge( - self.args.service, - self.args.node, - self.profile, - callback=self.psNodePurgeCb, - errback=partial( - self.errback, - msg=_("can't purge node: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + await self.host.bridge.psNodePurge( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(msg=_(f"can't purge node: {e}"), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_(f"node [{self.args.node}] purged successfully")) + self.host.quit() class NodeDelete(base.CommandBase): @@ -220,7 +201,6 @@ pubsub_flags={C.NODE}, help=_("delete a node"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -230,33 +210,27 @@ help=_("delete node without confirmation"), ) - def psNodeDeleteCb(self): - self.disp(_("node [{node}] deleted successfully").format(node=self.args.node)) - self.host.quit() - - def start(self): + async def start(self): if not self.args.force: if not self.args.service: - message = _("Are you sure to delete PEP node [{node_id}] ?").format( - node_id=self.args.node - ) + message = _(f"Are you sure to delete PEP node [{self.args.node}] ?") else: - message = _( - "Are you sure to delete node [{node_id}] on service [{service}] ?" - ).format(node_id=self.args.node, service=self.args.service) - self.host.confirmOrQuit(message, _("node deletion cancelled")) + message = _(f"Are you sure to delete node [{self.args.node}] on " + f"service [{self.args.service}]?") + await self.host.confirmOrQuit(message, _("node deletion cancelled")) - self.host.bridge.psNodeDelete( - self.args.service, - self.args.node, - self.profile, - callback=self.psNodeDeleteCb, - errback=partial( - self.errback, - msg=_("can't delete node: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + await self.host.bridge.psNodeDelete( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't delete node: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_(f"node [{self.args.node}] deleted successfully")) + self.host.quit() class NodeSet(base.CommandBase): @@ -271,7 +245,6 @@ use_verbose=True, help=_("set node configuration"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -284,32 +257,33 @@ metavar=("KEY", "VALUE"), help=_("configuration field to set (required)"), ) - - def psNodeConfigurationSetCb(self): - self.disp(_("node configuration successful"), 1) - self.host.quit() - - def psNodeConfigurationSetEb(self, failure_): - self.disp( - "can't set node configuration: {reason}".format(reason=failure_), error=True + self.parser.add_argument( + "-F", + "--full-prefix", + action="store_true", + help=_('don\'t prepend "pubsub#" prefix to field names'), ) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) def getKeyName(self, k): - if not k.startswith("pubsub#"): + if self.args.full_prefix or k.startswith("pubsub#"): + return k + else: return "pubsub#" + k - else: - return k - def start(self): - self.host.bridge.psNodeConfigurationSet( - self.args.service, - self.args.node, - {self.getKeyName(k): v for k, v in self.args.fields}, - self.profile, - callback=self.psNodeConfigurationSetCb, - errback=self.psNodeConfigurationSetEb, - ) + async def start(self): + try: + await self.host.bridge.psNodeConfigurationSet( + self.args.service, + self.args.node, + {self.getKeyName(k): v for k, v in self.args.fields}, + self.profile, + ) + except Exception as e: + self.disp(f"can't set node configuration: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("node configuration successful"), 1) + self.host.quit() class NodeImport(base.CommandBase): @@ -322,7 +296,6 @@ pubsub_flags={C.NODE}, help=_("import raw XML to a node"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -337,12 +310,7 @@ "whole XML of each item to import."), ) - def psItemsSendCb(self, item_ids): - self.disp(_('items published with id(s) {item_ids}').format( - item_ids=', '.join(item_ids))) - self.host.quit() - - def start(self): + async def start(self): try: element, etree = xml_tools.etreeParse(self, self.args.import_file, reraise=True) @@ -364,38 +332,34 @@ _("You are not using list of pubsub items, we can't import this file"), error=True) self.host.quit(C.EXIT_DATA_ERROR) + return - items = [etree.tostring(i, encoding="utf-8") for i in element] + items = [etree.tostring(i, encoding="unicode") for i in element] if self.args.admin: - self.host.bridge.psAdminItemsSend( + method = self.host.bridge.psAdminItemsSend + else: + self.disp(_("Items are imported without using admin mode, publisher can't " + "be changed")) + method = self.host.bridge.psItemsSend + + try: + items_ids = await method( self.args.service, self.args.node, items, "", self.profile, - callback=partial(self.psItemsSendCb), - errback=partial( - self.errback, - msg=_("can't send item: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), ) + except Exception as e: + self.disp(f"can't send items: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) else: - self.disp(_("Items are imported without using admin mode, publisher can't " - "be changed")) - self.host.bridge.psItemsSend( - self.args.service, - self.args.node, - items, - "", - self.profile, - callback=partial(self.psItemsSendCb), - errback=partial( - self.errback, - msg=_("can't send item: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + if items_ids: + self.disp(_('items published with id(s) {items_ids}').format( + items_ids=', '.join(items_ids))) + else: + self.disp(_('items published')) + self.host.quit() class NodeAffiliationsGet(base.CommandBase): @@ -409,29 +373,23 @@ pubsub_flags={C.NODE}, help=_("retrieve node affiliations (for node owner)"), ) - self.need_loop = True def add_parser_options(self): pass - def psNodeAffiliationsGetCb(self, affiliations): - self.output(affiliations) - self.host.quit() - - def psNodeAffiliationsGetEb(self, failure_): - self.disp( - "can't get node affiliations: {reason}".format(reason=failure_), error=True - ) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - - def start(self): - self.host.bridge.psNodeAffiliationsGet( - self.args.service, - self.args.node, - self.profile, - callback=self.psNodeAffiliationsGetCb, - errback=self.psNodeAffiliationsGetEb, - ) + async def start(self): + try: + affiliations = await self.host.bridge.psNodeAffiliationsGet( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get node affiliations: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(affiliations) + self.host.quit() class NodeAffiliationsSet(base.CommandBase): @@ -445,7 +403,6 @@ use_verbose=True, help=_("set affiliations (for node owner)"), ) - self.need_loop = True def add_parser_options(self): # XXX: we use optional argument syntax for a required one because list of list of 2 elements @@ -461,26 +418,21 @@ help=_("entity/affiliation couple(s)"), ) - def psNodeAffiliationsSetCb(self): - self.disp(_("affiliations have been set"), 1) - self.host.quit() - - def psNodeAffiliationsSetEb(self, failure_): - self.disp( - "can't set node affiliations: {reason}".format(reason=failure_), error=True - ) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - - def start(self): + async def start(self): affiliations = dict(self.args.affiliations) - self.host.bridge.psNodeAffiliationsSet( - self.args.service, - self.args.node, - affiliations, - self.profile, - callback=self.psNodeAffiliationsSetCb, - errback=self.psNodeAffiliationsSetEb, - ) + try: + await self.host.bridge.psNodeAffiliationsSet( + self.args.service, + self.args.node, + affiliations, + self.profile, + ) + except Exception as e: + self.disp(f"can't set node affiliations: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("affiliations have been set"), 1) + self.host.quit() class NodeAffiliations(base.CommandBase): @@ -506,29 +458,23 @@ pubsub_flags={C.NODE}, help=_("retrieve node subscriptions (for node owner)"), ) - self.need_loop = True def add_parser_options(self): pass - def psNodeSubscriptionsGetCb(self, subscriptions): - self.output(subscriptions) - self.host.quit() - - def psNodeSubscriptionsGetEb(self, failure_): - self.disp( - "can't get node subscriptions: {reason}".format(reason=failure_), error=True - ) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - - def start(self): - self.host.bridge.psNodeSubscriptionsGet( - self.args.service, - self.args.node, - self.profile, - callback=self.psNodeSubscriptionsGetCb, - errback=self.psNodeSubscriptionsGetEb, - ) + async def start(self): + try: + subscriptions = await self.host.bridge.psNodeSubscriptionsGet( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get node subscriptions: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(subscriptions) + self.host.quit() class StoreSubscriptionAction(argparse.Action): @@ -566,7 +512,6 @@ use_verbose=True, help=_("set/modify subscriptions (for node owner)"), ) - self.need_loop = True def add_parser_options(self): # XXX: we use optional argument syntax for a required one because list of list of 2 elements @@ -583,25 +528,20 @@ help=_("entity/subscription couple(s)"), ) - def psNodeSubscriptionsSetCb(self): - self.disp(_("subscriptions have been set"), 1) - self.host.quit() - - def psNodeSubscriptionsSetEb(self, failure_): - self.disp( - "can't set node subscriptions: {reason}".format(reason=failure_), error=True - ) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - - def start(self): - self.host.bridge.psNodeSubscriptionsSet( - self.args.service, - self.args.node, - self.args.subscriptions, - self.profile, - callback=self.psNodeSubscriptionsSetCb, - errback=self.psNodeSubscriptionsSetEb, - ) + async def start(self): + try: + self.host.bridge.psNodeSubscriptionsSet( + self.args.service, + self.args.node, + self.args.subscriptions, + self.profile, + ) + except Exception as e: + self.disp(f"can't set node subscriptions: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("subscriptions have been set"), 1) + self.host.quit() class NodeSubscriptions(base.CommandBase): @@ -627,28 +567,24 @@ use_verbose=True, help=_("set/replace a schema"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument("schema", help=_("schema to set (must be XML)")) - def psSchemaSetCb(self): - self.disp(_("schema has been set"), 1) - self.host.quit() - - def start(self): - self.host.bridge.psSchemaSet( - self.args.service, - self.args.node, - self.args.schema, - self.profile, - callback=self.psSchemaSetCb, - errback=partial( - self.errback, - msg=_("can't set schema: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + async def start(self): + try: + await self.host.bridge.psSchemaSet( + self.args.service, + self.args.node, + self.args.schema, + self.profile, + ) + except Exception as e: + self.disp(f"can't set schema: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("schema has been set"), 1) + self.host.quit() class NodeSchemaEdit(base.CommandBase, common.BaseEdit): @@ -666,30 +602,26 @@ help=_("edit a schema"), ) common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR) - self.need_loop = True def add_parser_options(self): pass - def psSchemaSetCb(self): - self.disp(_("schema has been set"), 1) - self.host.quit() + async def publish(self, schema): + try: + await self.host.bridge.psSchemaSet( + self.args.service, + self.args.node, + schema, + self.profile, + ) + except Exception as e: + self.disp(f"can't set schema: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("schema has been set"), 1) + self.host.quit() - def publish(self, schema): - self.host.bridge.psSchemaSet( - self.args.service, - self.args.node, - schema, - self.profile, - callback=self.psSchemaSetCb, - errback=partial( - self.errback, - msg=_("can't set schema: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) - - def psSchemaGetCb(self, schema): + async def psSchemaGetCb(self, schema): try: from lxml import etree except ImportError: @@ -707,20 +639,20 @@ etree.tostring(schema_elt, encoding="utf-8", pretty_print=True) ) content_file_obj.seek(0) - self.runEditor("pubsub_schema_editor_args", content_file_path, content_file_obj) + await self.runEditor("pubsub_schema_editor_args", content_file_path, content_file_obj) - def start(self): - self.host.bridge.psSchemaGet( - self.args.service, - self.args.node, - self.profile, - callback=self.psSchemaGetCb, - errback=partial( - self.errback, - msg=_("can't edit schema: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + async def start(self): + try: + schema = await self.host.bridge.psSchemaGet( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't edit schema: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.psSchemaGetCb(schema) class NodeSchemaGet(base.CommandBase): @@ -735,30 +667,27 @@ use_verbose=True, help=_("get schema"), ) - self.need_loop = True def add_parser_options(self): pass - def psSchemaGetCb(self, schema): - if not schema: - self.disp(_("no schema found"), 1) - self.host.quit(1) - self.output(schema) - self.host.quit() - - def start(self): - self.host.bridge.psSchemaGet( - self.args.service, - self.args.node, - self.profile, - callback=self.psSchemaGetCb, - errback=partial( - self.errback, - msg=_("can't get schema: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + async def start(self): + try: + schema = await self.host.bridge.psSchemaGet( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get schema: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if schema: + await self.output(schema) + self.host.quit() + else: + self.disp(_("no schema found"), 1) + self.host.quit(1) class NodeSchema(base.CommandBase): @@ -799,7 +728,6 @@ pubsub_flags={C.NODE}, help=_("publish a new item or update an existing one"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -809,32 +737,29 @@ help=_("id, URL of the item to update, keyword, or nothing for new item"), ) - def psItemsSendCb(self, published_id): - if published_id: - self.disp("Item published at {pub_id}".format(pub_id=published_id)) - else: - self.disp("Item published") - self.host.quit(C.EXIT_OK) - - def start(self): + async def start(self): element, etree = xml_tools.etreeParse(self, sys.stdin) element = xml_tools.getPayload(self, element) payload = etree.tostring(element, encoding="unicode") - self.host.bridge.psItemSend( - self.args.service, - self.args.node, - payload, - self.args.item, - {}, - self.profile, - callback=self.psItemsSendCb, - errback=partial( - self.errback, - msg=_("can't send item: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + published_id = await self.host.bridge.psItemSend( + self.args.service, + self.args.node, + payload, + self.args.item, + {}, + self.profile, + ) + except Exception as e: + self.disp(_(f"can't send item: {e}"), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if published_id: + self.disp("Item published at {pub_id}".format(pub_id=published_id)) + else: + self.disp("Item published") + self.host.quit(C.EXIT_OK) class Get(base.CommandBase): @@ -848,7 +773,6 @@ pubsub_flags={C.NODE, C.MULTI_ITEMS}, help=_("get pubsub item(s)"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -860,26 +784,23 @@ # TODO: a key(s) argument to select keys to display # TODO: add MAM filters - def psItemsGetCb(self, ps_result): - self.output(ps_result[0]) - self.host.quit(C.EXIT_OK) - - def psItemsGetEb(self, failure_): - self.disp("can't get pubsub items: {reason}".format(reason=failure_), error=True) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - - def start(self): - self.host.bridge.psItemsGet( - self.args.service, - self.args.node, - self.args.max, - self.args.items, - self.args.sub_id, - self.getPubsubExtra(), - self.profile, - callback=self.psItemsGetCb, - errback=self.psItemsGetEb, - ) + async def start(self): + try: + ps_result = await self.host.bridge.psItemsGet( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + self.args.sub_id, + self.getPubsubExtra(), + self.profile, + ) + except Exception as e: + self.disp(f"can't get pubsub items: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(ps_result[0]) + self.host.quit(C.EXIT_OK) class Delete(base.CommandBase): @@ -892,7 +813,6 @@ pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM}, help=_("delete an item"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -902,31 +822,28 @@ "-N", "--notify", action="store_true", help=_("notify deletion") ) - def psItemsDeleteCb(self): - self.disp(_("item {item_id} has been deleted").format(item_id=self.args.item)) - self.host.quit(C.EXIT_OK) - - def start(self): + async def start(self): if not self.args.item: self.parser.error(_("You need to specify an item to delete")) if not self.args.force: message = _("Are you sure to delete item {item_id} ?").format( item_id=self.args.item ) - self.host.confirmOrQuit(message, _("item deletion cancelled")) - self.host.bridge.psRetractItem( - self.args.service, - self.args.node, - self.args.item, - self.args.notify, - self.profile, - callback=self.psItemsDeleteCb, - errback=partial( - self.errback, - msg=_("can't delete item: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + await self.host.confirmOrQuit(message, _("item deletion cancelled")) + try: + await self.host.bridge.psRetractItem( + self.args.service, + self.args.node, + self.args.item, + self.args.notify, + self.profile, + ) + except Exception as e: + self.disp(_(f"can't delete item: {e}"), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_(f"item {self.args.item} has been deleted")) + self.host.quit(C.EXIT_OK) class Edit(base.CommandBase, common.BaseEdit): @@ -946,12 +863,8 @@ def add_parser_options(self): pass - def edit(self, content_file_path, content_file_obj): - # we launch editor - self.runEditor("pubsub_editor_args", content_file_path, content_file_obj) - - def publish(self, content): - published_id = self.host.bridge.psItemSend( + async def publish(self, content): + published_id = await self.host.bridge.psItemSend( self.pubsub_service, self.pubsub_node, content, @@ -964,7 +877,7 @@ else: self.disp("Item published") - def getItemData(self, service, node, item): + async def getItemData(self, service, node, item): try: from lxml import etree except ImportError: @@ -974,10 +887,10 @@ ) self.host.quit(1) items = [item] if item else [] - item_raw = self.host.bridge.psItemsGet( + item_raw = (await self.host.bridge.psItemsGet( service, node, 1, items, "", {}, self.profile - )[0][0] - parser = etree.XMLParser(remove_blank_text=True) + ))[0][0] + parser = etree.XMLParser(remove_blank_text=True, recover=True) item_elt = etree.fromstring(item_raw, parser) item_id = item_elt.get("id") try: @@ -987,11 +900,14 @@ return "" return etree.tostring(payload, encoding="unicode", pretty_print=True), item_id - def start(self): - self.pubsub_service, self.pubsub_node, self.pubsub_item, content_file_path, content_file_obj = ( - self.getItemPath() - ) - self.edit(content_file_path, content_file_obj) + async def start(self): + (self.pubsub_service, + self.pubsub_node, + self.pubsub_item, + content_file_path, + content_file_obj) = await self.getItemPath() + await self.runEditor("pubsub_editor_args", content_file_path, content_file_obj) + self.host.quit() class Subscribe(base.CommandBase): @@ -1005,34 +921,30 @@ use_verbose=True, help=_("subscribe to a node"), ) - self.need_loop = True def add_parser_options(self): pass - def psSubscribeCb(self, sub_id): - self.disp(_("subscription done"), 1) - if sub_id: - self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id)) - self.host.quit() - - def start(self): - self.host.bridge.psSubscribe( - self.args.service, - self.args.node, - {}, - self.profile, - callback=self.psSubscribeCb, - errback=partial( - self.errback, - msg=_("can't subscribe to node: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + async def start(self): + try: + sub_id = await self.host.bridge.psSubscribe( + self.args.service, + self.args.node, + {}, + self.profile, + ) + except Exception as e: + self.disp(_(f"can't subscribe to node: {e}"), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("subscription done"), 1) + if sub_id: + self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id)) + self.host.quit() class Unsubscribe(base.CommandBase): - # TODO: voir pourquoi NodeNotFound sur subscribe juste après unsubscribe + # FIXME: check why we get a a NodeNotFound on subscribe just after unsubscribe def __init__(self, host): base.CommandBase.__init__( @@ -1044,27 +956,23 @@ use_verbose=True, help=_("unsubscribe from a node"), ) - self.need_loop = True def add_parser_options(self): pass - def psUnsubscribeCb(self): - self.disp(_("subscription removed"), 1) - self.host.quit() - - def start(self): - self.host.bridge.psUnsubscribe( - self.args.service, - self.args.node, - self.profile, - callback=self.psUnsubscribeCb, - errback=partial( - self.errback, - msg=_("can't unsubscribe from node: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + async def start(self): + try: + await self.host.bridge.psUnsubscribe( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(_(f"can't unsubscribe from node: {e}"), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("subscription removed"), 1) + self.host.quit() class Subscriptions(base.CommandBase): @@ -1077,27 +985,23 @@ use_pubsub=True, help=_("retrieve all subscriptions on a service"), ) - self.need_loop = True def add_parser_options(self): pass - def psSubscriptionsGetCb(self, subscriptions): - self.output(subscriptions) - self.host.quit() - - def start(self): - self.host.bridge.psSubscriptionsGet( - self.args.service, - self.args.node, - self.profile, - callback=self.psSubscriptionsGetCb, - errback=partial( - self.errback, - msg=_("can't retrieve subscriptions: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + async def start(self): + try: + subscriptions = await self.host.bridge.psSubscriptionsGet( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(_(f"can't retrieve subscriptions: {e}"), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(subscriptions) + self.host.quit() class Affiliations(base.CommandBase): @@ -1110,33 +1014,29 @@ use_pubsub=True, help=_("retrieve all affiliations on a service"), ) - self.need_loop = True def add_parser_options(self): pass - def psAffiliationsGetCb(self, affiliations): - self.output(affiliations) - self.host.quit() - - def psAffiliationsGetEb(self, failure_): - self.disp( - "can't get node affiliations: {reason}".format(reason=failure_), error=True - ) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - - def start(self): - self.host.bridge.psAffiliationsGet( - self.args.service, - self.args.node, - self.profile, - callback=self.psAffiliationsGetCb, - errback=self.psAffiliationsGetEb, - ) + async def start(self): + try: + affiliations = await self.host.bridge.psAffiliationsGet( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp( + f"can't get node affiliations: {e}", error=True + ) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(affiliations) + self.host.quit() class Search(base.CommandBase): - """this command do a search without using MAM + """This command do a search without using MAM This commands checks every items it finds by itself, so it may be heavy in resources both for server and client @@ -1159,7 +1059,6 @@ use_verbose=True, help=_("search items corresponding to filters"), ) - self.need_loop = True @property def etree(self): @@ -1317,30 +1216,26 @@ ) self.parser.add_argument("command", nargs=argparse.REMAINDER) - def psItemsGetEb(self, failure_, service, node): - self.disp( - "can't get pubsub items at {service} (node: {node}): {reason}".format( - service=service, node=node, reason=failure_ - ), - error=True, - ) - self.to_get -= 1 - - def getItems(self, depth, service, node, items): - search = partial(self.search, depth=depth) - errback = partial(self.psItemsGetEb, service=service, node=node) - self.host.bridge.psItemsGet( - service, - node, - self.args.node_max, - items, - "", - self.getPubsubExtra(), - self.profile, - callback=search, - errback=errback, - ) + async def getItems(self, depth, service, node, items): self.to_get += 1 + try: + items_data = await self.host.bridge.psItemsGet( + service, + node, + self.args.node_max, + items, + "", + self.getPubsubExtra(), + self.profile, + ) + except Exception as e: + self.disp( + f"can't get pubsub items at {service} (node: {node}): {e}", + error=True, + ) + self.to_get -= 1 + else: + await self.search(items_data, depth) def _checkPubsubURL(self, match, found_nodes): """check that the matched URL is an xmpp: one @@ -1360,13 +1255,13 @@ found_node["item"] = url_data["item"] found_nodes.append(found_node) - def getSubNodes(self, item, depth): + async def getSubNodes(self, item, depth): """look for pubsub URIs in item, and getItems on the linked nodes""" found_nodes = [] checkURI = partial(self._checkPubsubURL, found_nodes=found_nodes) strings.RE_URL.sub(checkURI, item) for data in found_nodes: - self.getItems( + await self.getItems( depth + 1, data["service"], data["node"], @@ -1452,7 +1347,11 @@ elif type_ == "python": if item_xml is None: item_xml = self.parseXml(item) - cmd_ns = {"item": item, "item_xml": item_xml} + cmd_ns = { + "etree": self.etree, + "item": item, + "item_xml": item_xml + } try: keep = eval(value, cmd_ns) except SyntaxError as e: @@ -1483,7 +1382,7 @@ return True, item - def doItemAction(self, item, metadata): + async def doItemAction(self, item, metadata): """called when item has been kepts and the action need to be done @param item(unicode): accepted item @@ -1491,7 +1390,7 @@ action = self.args.action if action == "print" or self.host.verbosity > 0: try: - self.output(item) + await self.output(item) except self.etree.XMLSyntaxError: # item is not valid XML, but a string # can happen when --only-matching is used @@ -1521,22 +1420,22 @@ 2, ) if action == "exec": - ret = subprocess.call(cmd_args) + p = await asyncio.create_subprocess_exec(*cmd_args) + ret = await p.wait() else: - p = subprocess.Popen(cmd_args, stdin=subprocess.PIPE) - p.communicate(item.encode("utf-8")) - ret = p.wait() + p = await asyncio.create_subprocess_exec(*cmd_args, + stdin=subprocess.PIPE) + await p.communicate(item.encode(sys.getfilesystemencoding())) + ret = p.returncode if ret != 0: self.disp( A.color( C.A_FAILURE, - _("executed command failed with exit code {code}").format( - code=ret - ), + _(f"executed command failed with exit code {ret}"), ) ) - def search(self, items_data, depth): + async def search(self, items_data, depth): """callback of getItems this method filters items, get sub nodes if needed, @@ -1548,11 +1447,11 @@ items, metadata = items_data for item in items: if depth < self.args.max_depth: - self.getSubNodes(item, depth) + await self.getSubNodes(item, depth) keep, item = self.filter(item) if not keep: continue - self.doItemAction(item, metadata) + await self.doItemAction(item, metadata) # we check if we got all getItems results self.to_get -= 1 @@ -1561,7 +1460,7 @@ self.host.quit() assert self.to_get > 0 - def start(self): + async def start(self): if self.args.command: if self.args.action not in self.EXEC_ACTIONS: self.parser.error( @@ -1584,7 +1483,7 @@ self.args.namespace = dict( self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")] ) - self.getItems(0, self.args.service, self.args.node, self.args.items) + await self.getItems(0, self.args.service, self.args.node, self.args.items) class Transform(base.CommandBase): @@ -1597,7 +1496,6 @@ pubsub_flags={C.NODE, C.MULTI_ITEMS}, help=_("modify items of a node using an external command/script"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -1630,18 +1528,18 @@ 'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), ) - def psItemsSendCb(self, item_ids, metadata): + async def psItemsSendCb(self, item_ids, metadata): if item_ids: self.disp(_('items published with ids {item_ids}').format( item_ids=', '.join(item_ids))) else: self.disp(_('items published')) if self.args.all: - return self.handleNextPage(metadata) + return await self.handleNextPage(metadata) else: self.host.quit() - def handleNextPage(self, metadata): + async def handleNextPage(self, metadata): """Retrieve new page through RSM or quit if we're in the last page use to handle --all option @@ -1672,24 +1570,27 @@ extra = self.getPubsubExtra() extra['rsm_after'] = last - self.host.bridge.psItemsGet( - self.args.service, - self.args.node, - self.args.rsm_max, - self.args.items, - "", - extra, - self.profile, - callback=self.psItemsGetCb, - errback=partial( - self.errback, - msg=_("can't retrieve items: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + ps_result = await self.host.bridge.psItemsGet( + self.args.service, + self.args.node, + self.args.rsm_max, + self.args.items, + "", + extra, + self.profile, + ) + except Exception as e: + self.disp( + f"can't retrieve items: {e}", error=True + ) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.psItemsGetCb(ps_result) - def psItemsGetCb(self, ps_result): + async def psItemsGetCb(self, ps_result): items, metadata = ps_result + encoding = 'utf-8' new_items = [] for item in items: @@ -1707,48 +1608,46 @@ # we launch the command to filter the item try: - p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE, - stdout=subprocess.PIPE) + p = await asyncio.create_subprocess_exec( + self.args.command_path, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE) except OSError as e: exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR - e = str(e).decode('utf-8', errors="ignore") - self.disp("Can't execute the command: {msg}".format(msg=e), error=True) + self.disp(f"Can't execute the command: {e}", error=True) self.host.quit(exit_code) - cmd_std_out, cmd_std_err = p.communicate(item.encode("utf-8")) - ret = p.wait() + encoding = "utf-8" + cmd_std_out, cmd_std_err = await p.communicate(item.encode(encoding)) + ret = p.returncode if ret != 0: - self.disp("The command returned a non zero status while parsing the " - "following item:\n\n{item}".format(item=item), error=True) + self.disp(f"The command returned a non zero status while parsing the " + f"following item:\n\n{item}", error=True) if self.args.ignore_errors: continue else: self.host.quit(C.EXIT_CMD_ERROR) if cmd_std_err is not None: - cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') + cmd_std_err = cmd_std_err.decode(encoding, errors='ignore') self.disp(cmd_std_err, error=True) - cmd_std_out = cmd_std_out.strip() + cmd_std_out = cmd_std_out.decode(encoding).strip() if cmd_std_out == "DELETE": item_elt, __ = xml_tools.etreeParse(self, item) item_id = item_elt.get('id') - self.disp(_("Deleting item {item_id}").format(item_id=item_id)) + self.disp(_(f"Deleting item {item_id}")) if self.args.apply: - # FIXME: we don't wait for item to be retracted which can cause - # trouble in case of error just before the end of the command - # (the error message may be missed). - # Once moved to Python 3, we must wait for it by using a - # coroutine. - self.host.bridge.psRetractItem( - self.args.service, - self.args.node, - item_id, - False, - self.profile, - errback=partial( - self.errback, - msg=_("can't delete item [%s]: {}" % item_id), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + await self.host.bridge.psRetractItem( + self.args.service, + self.args.node, + item_id, + False, + self.profile, + ) + except Exception as e: + self.disp( + f"can't delete item {item_id}: {e}", error=True + ) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) continue elif cmd_std_out == "SKIP": item_elt, __ = xml_tools.etreeParse(self, item) @@ -1774,39 +1673,29 @@ if not self.args.apply: # on dry run we have nothing to wait for, we can quit if self.args.all: - return self.handleNextPage(metadata) + return await self.handleNextPage(metadata) self.host.quit() else: if self.args.admin: - self.host.bridge.psAdminItemsSend( + bridge_method = self.host.bridge.psAdminItemsSend + else: + bridge_method = self.host.bridge.psItemsSend + + try: + ps_result = await bridge_method( self.args.service, self.args.node, new_items, "", self.profile, - callback=partial(self.psItemsSendCb, metadata=metadata), - errback=partial( - self.errback, - msg=_("can't send item: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), ) + except Exception as e: + self.disp(f"can't send item: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) else: - self.host.bridge.psItemsSend( - self.args.service, - self.args.node, - new_items, - "", - self.profile, - callback=partial(self.psItemsSendCb, metadata=metadata), - errback=partial( - self.errback, - msg=_("can't send item: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + await self.psItemsSendCb(ps_result, metadata=metadata) - def start(self): + async def start(self): if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: self.check_duplicates = True self.items_ids = [] @@ -1819,21 +1708,22 @@ "but this method is not safe, and some items may be missed.\n---\n")) else: self.check_duplicates = False - self.host.bridge.psItemsGet( - self.args.service, - self.args.node, - self.args.max, - self.args.items, - "", - self.getPubsubExtra(), - self.profile, - callback=self.psItemsGetCb, - errback=partial( - self.errback, - msg=_("can't retrieve items: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + + try: + ps_result = await self.host.bridge.psItemsGet( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + "", + self.getPubsubExtra(), + self.profile, + ) + except Exception as e: + self.disp(f"can't retrieve items: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.psItemsGetCb(ps_result) class Uri(base.CommandBase): @@ -1847,7 +1737,6 @@ pubsub_flags={C.NODE, C.SINGLE_ITEM}, help=_("build URI"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -1871,19 +1760,19 @@ self.disp(uri.buildXMPPUri("pubsub", **uri_args)) self.host.quit() - def start(self): + async def start(self): if not self.args.service: - self.host.bridge.asyncGetParamA( - "JabberID", - "Connection", - profile_key=self.args.profile, - callback=self.display_uri, - errback=partial( - self.errback, - msg=_("can't retrieve jid: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + jid_ = await self.host.bridge.asyncGetParamA( + "JabberID", + "Connection", + profile_key=self.args.profile + ) + except Exception as e: + self.disp(f"can't retrieve jid: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.display_uri(jid_) else: self.display_uri(None) @@ -1898,7 +1787,6 @@ pubsub_flags={C.NODE}, help=_("create a Pubsub hook"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -1928,22 +1816,22 @@ _("{path} is not a file").format(path=self.args.hook_arg) ) - def start(self): + async def start(self): self.checkArgs(self) - self.host.bridge.psHookAdd( - self.args.service, - self.args.node, - self.args.type, - self.args.hook_arg, - self.args.persistent, - self.profile, - callback=self.host.quit, - errback=partial( - self.errback, - msg=_("can't create hook: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + await self.host.bridge.psHookAdd( + self.args.service, + self.args.node, + self.args.type, + self.args.hook_arg, + self.args.persistent, + self.profile, + ) + except Exception as e: + self.disp(f"can't create hook: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.host.quit() class HookDelete(base.CommandBase): @@ -1956,7 +1844,6 @@ pubsub_flags={C.NODE}, help=_("delete a Pubsub hook"), ) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( @@ -1976,27 +1863,22 @@ ), ) - def psHookRemoveCb(self, nb_deleted): - self.disp( - _("{nb_deleted} hook(s) have been deleted").format(nb_deleted=nb_deleted) - ) - self.host.quit() - - def start(self): + async def start(self): HookCreate.checkArgs(self) - self.host.bridge.psHookRemove( - self.args.service, - self.args.node, - self.args.type, - self.args.hook_arg, - self.profile, - callback=self.psHookRemoveCb, - errback=partial( - self.errback, - msg=_("can't delete hook: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + try: + nb_deleted = await self.host.bridge.psHookRemove( + self.args.service, + self.args.node, + self.args.type, + self.args.hook_arg, + self.profile, + ) + except Exception as e: + self.disp(f"can't delete hook: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_(f"{nb_deleted} hook(s) have been deleted")) + self.host.quit() class HookList(base.CommandBase): @@ -2008,27 +1890,23 @@ use_output=C.OUTPUT_LIST_DICT, help=_("list hooks of a profile"), ) - self.need_loop = True def add_parser_options(self): pass - def psHookListCb(self, data): - if not data: - self.disp(_("No hook found.")) - self.output(data) - self.host.quit() - - def start(self): - self.host.bridge.psHookList( - self.profile, - callback=self.psHookListCb, - errback=partial( - self.errback, - msg=_("can't list hooks: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + async def start(self): + try: + data = await self.host.bridge.psHookList( + self.profile, + ) + except Exception as e: + self.disp(f"can't list hooks: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if not data: + self.disp(_("No hook found.")) + await self.output(data) + self.host.quit() class Hook(base.CommandBase):