diff sat_frontends/jp/cmd_pubsub.py @ 3040:fee60f17ebac

jp: jp asyncio port: /!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\ This patch implements the port of jp to asyncio, so it is now correctly using the bridge asynchronously, and it can be used with bridges like `pb`. This also simplify the code, notably for things which were previously implemented with many callbacks (like pagination with RSM). During the process, some behaviours have been modified/fixed, in jp and backends, check diff for details.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:56:41 +0200
parents ab2696e34d29
children cea52c9ddfd9
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pubsub.py	Wed Sep 25 08:53:38 2019 +0200
+++ b/sat_frontends/jp/cmd_pubsub.py	Wed Sep 25 08:56:41 2019 +0200
@@ -18,6 +18,12 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
+import argparse
+import os.path
+import re
+import sys
+import subprocess
+import asyncio
 from . import base
 from sat.core.i18n import _
 from sat.core import exceptions
@@ -29,11 +35,6 @@
 from sat.tools.common import uri
 from sat.tools.common.ansi import ANSI as A
 from sat_frontends.tools import jid, strings
-import argparse
-import os.path
-import re
-import subprocess
-import sys
 
 __commands__ = ["Pubsub"]
 
@@ -55,7 +56,6 @@
             pubsub_flags={C.NODE},
             help=_("retrieve node configuration"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -72,28 +72,23 @@
     def filterKey(self, key):
         return any((key == k or key == "pubsub#" + k) for k in self.args.keys)
 
-    def psNodeConfigurationGetCb(self, config_dict):
-        key_filter = (lambda k: True) if not self.args.keys else self.filterKey
-        config_dict = {
-            self.removePrefix(k): v for k, v in config_dict.items() if key_filter(k)
-        }
-        self.output(config_dict)
-        self.host.quit()
-
-    def psNodeConfigurationGetEb(self, failure_):
-        self.disp(
-            "can't get node configuration: {reason}".format(reason=failure_), error=True
-        )
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
-
-    def start(self):
-        self.host.bridge.psNodeConfigurationGet(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psNodeConfigurationGetCb,
-            errback=self.psNodeConfigurationGetEb,
-        )
+    async def start(self):
+        try:
+            config_dict = await self.host.bridge.psNodeConfigurationGet(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get node configuration: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            key_filter = (lambda k: True) if not self.args.keys else self.filterKey
+            config_dict = {
+                self.removePrefix(k): v for k, v in config_dict.items() if key_filter(k)
+            }
+            await self.output(config_dict)
+            self.host.quit()
 
 
 class NodeCreate(base.CommandBase):
@@ -108,7 +103,6 @@
             use_verbose=True,
             help=_("create a node"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -128,35 +122,28 @@
             help=_('don\'t prepend "pubsub#" prefix to field names'),
         )
 
-    def psNodeCreateCb(self, node_id):
-        if self.host.verbosity:
-            announce = _("node created successfully: ")
-        else:
-            announce = ""
-        self.disp(announce + node_id)
-        self.host.quit()
-
-    def psNodeCreateEb(self, failure_):
-        self.disp("can't create: {reason}".format(reason=failure_), error=True)
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
-
-    def start(self):
+    async def start(self):
         if not self.args.full_prefix:
             options = {"pubsub#" + k: v for k, v in self.args.fields}
         else:
             options = dict(self.args.fields)
-        self.host.bridge.psNodeCreate(
-            self.args.service,
-            self.args.node,
-            options,
-            self.profile,
-            callback=self.psNodeCreateCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't create node: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+        try:
+            node_id = await self.host.bridge.psNodeCreate(
+                self.args.service,
+                self.args.node,
+                options,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(msg=_(f"can't create node: {e}"), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if self.host.verbosity:
+                announce = _("node created successfully: ")
+            else:
+                announce = ""
+            self.disp(announce + node_id)
+            self.host.quit()
 
 
 class NodePurge(base.CommandBase):
@@ -169,7 +156,6 @@
             pubsub_flags={C.NODE},
             help=_("purge a node (i.e. remove all items from it)"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -179,35 +165,30 @@
             help=_("purge node without confirmation"),
         )
 
-    def psNodePurgeCb(self):
-        self.disp(_("node [{node}] purged successfully").format(node=self.args.node))
-        self.host.quit()
-
-    def start(self):
+    async def start(self):
         if not self.args.force:
             if not self.args.service:
-                message = _("Are you sure to purge PEP node [{node_id}]? "
-                            "This will delete ALL items from it!").format(
-                    node_id=self.args.node
-                )
+                message = _(
+                    f"Are you sure to purge PEP node [{self.args.node}]? This will "
+                    f"delete ALL items from it!")
             else:
                 message = _(
-                    "Are you sure to delete node [{node_id}] on service [{service}]? "
-                    "This will delete ALL items from it!"
-                ).format(node_id=self.args.node, service=self.args.service)
-            self.host.confirmOrQuit(message, _("node purge cancelled"))
+                    f"Are you sure to delete node [{self.args.node}] on service "
+                    f"[{self.args.service}]? This will delete ALL items from it!")
+            await self.host.confirmOrQuit(message, _("node purge cancelled"))
 
-        self.host.bridge.psNodePurge(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psNodePurgeCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't purge node: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+        try:
+            await self.host.bridge.psNodePurge(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(msg=_(f"can't purge node: {e}"), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_(f"node [{self.args.node}] purged successfully"))
+            self.host.quit()
 
 
 class NodeDelete(base.CommandBase):
@@ -220,7 +201,6 @@
             pubsub_flags={C.NODE},
             help=_("delete a node"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -230,33 +210,27 @@
             help=_("delete node without confirmation"),
         )
 
-    def psNodeDeleteCb(self):
-        self.disp(_("node [{node}] deleted successfully").format(node=self.args.node))
-        self.host.quit()
-
-    def start(self):
+    async def start(self):
         if not self.args.force:
             if not self.args.service:
-                message = _("Are you sure to delete PEP node [{node_id}] ?").format(
-                    node_id=self.args.node
-                )
+                message = _(f"Are you sure to delete PEP node [{self.args.node}] ?")
             else:
-                message = _(
-                    "Are you sure to delete node [{node_id}] on service [{service}] ?"
-                ).format(node_id=self.args.node, service=self.args.service)
-            self.host.confirmOrQuit(message, _("node deletion cancelled"))
+                message = _(f"Are you sure to delete node [{self.args.node}] on "
+                            f"service [{self.args.service}]?")
+            await self.host.confirmOrQuit(message, _("node deletion cancelled"))
 
-        self.host.bridge.psNodeDelete(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psNodeDeleteCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't delete node: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+        try:
+            await self.host.bridge.psNodeDelete(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't delete node: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_(f"node [{self.args.node}] deleted successfully"))
+            self.host.quit()
 
 
 class NodeSet(base.CommandBase):
@@ -271,7 +245,6 @@
             use_verbose=True,
             help=_("set node configuration"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -284,32 +257,33 @@
             metavar=("KEY", "VALUE"),
             help=_("configuration field to set (required)"),
         )
-
-    def psNodeConfigurationSetCb(self):
-        self.disp(_("node configuration successful"), 1)
-        self.host.quit()
-
-    def psNodeConfigurationSetEb(self, failure_):
-        self.disp(
-            "can't set node configuration: {reason}".format(reason=failure_), error=True
+        self.parser.add_argument(
+            "-F",
+            "--full-prefix",
+            action="store_true",
+            help=_('don\'t prepend "pubsub#" prefix to field names'),
         )
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
 
     def getKeyName(self, k):
-        if not k.startswith("pubsub#"):
+        if self.args.full_prefix or k.startswith("pubsub#"):
+            return k
+        else:
             return "pubsub#" + k
-        else:
-            return k
 
-    def start(self):
-        self.host.bridge.psNodeConfigurationSet(
-            self.args.service,
-            self.args.node,
-            {self.getKeyName(k): v for k, v in self.args.fields},
-            self.profile,
-            callback=self.psNodeConfigurationSetCb,
-            errback=self.psNodeConfigurationSetEb,
-        )
+    async def start(self):
+        try:
+            await self.host.bridge.psNodeConfigurationSet(
+                self.args.service,
+                self.args.node,
+                {self.getKeyName(k): v for k, v in self.args.fields},
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set node configuration: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("node configuration successful"), 1)
+            self.host.quit()
 
 
 class NodeImport(base.CommandBase):
@@ -322,7 +296,6 @@
             pubsub_flags={C.NODE},
             help=_("import raw XML to a node"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -337,12 +310,7 @@
                    "whole XML of each item to import."),
         )
 
-    def psItemsSendCb(self, item_ids):
-        self.disp(_('items published with id(s) {item_ids}').format(
-            item_ids=', '.join(item_ids)))
-        self.host.quit()
-
-    def start(self):
+    async def start(self):
         try:
             element, etree = xml_tools.etreeParse(self, self.args.import_file,
                                                   reraise=True)
@@ -364,38 +332,34 @@
                 _("You are not using list of pubsub items, we can't import this file"),
                 error=True)
             self.host.quit(C.EXIT_DATA_ERROR)
+            return
 
-        items = [etree.tostring(i, encoding="utf-8") for i in element]
+        items = [etree.tostring(i, encoding="unicode") for i in element]
         if self.args.admin:
-            self.host.bridge.psAdminItemsSend(
+            method = self.host.bridge.psAdminItemsSend
+        else:
+            self.disp(_("Items are imported without using admin mode, publisher can't "
+                        "be changed"))
+            method = self.host.bridge.psItemsSend
+
+        try:
+            items_ids = await method(
                 self.args.service,
                 self.args.node,
                 items,
                 "",
                 self.profile,
-                callback=partial(self.psItemsSendCb),
-                errback=partial(
-                    self.errback,
-                    msg=_("can't send item: {}"),
-                    exit_code=C.EXIT_BRIDGE_ERRBACK,
-                ),
             )
+        except Exception as e:
+            self.disp(f"can't send items: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
         else:
-            self.disp(_("Items are imported without using admin mode, publisher can't "
-                        "be changed"))
-            self.host.bridge.psItemsSend(
-                self.args.service,
-                self.args.node,
-                items,
-                "",
-                self.profile,
-                callback=partial(self.psItemsSendCb),
-                errback=partial(
-                    self.errback,
-                    msg=_("can't send item: {}"),
-                    exit_code=C.EXIT_BRIDGE_ERRBACK,
-                ),
-            )
+            if items_ids:
+                self.disp(_('items published with id(s) {items_ids}').format(
+                    items_ids=', '.join(items_ids)))
+            else:
+                self.disp(_('items published'))
+            self.host.quit()
 
 
 class NodeAffiliationsGet(base.CommandBase):
@@ -409,29 +373,23 @@
             pubsub_flags={C.NODE},
             help=_("retrieve node affiliations (for node owner)"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psNodeAffiliationsGetCb(self, affiliations):
-        self.output(affiliations)
-        self.host.quit()
-
-    def psNodeAffiliationsGetEb(self, failure_):
-        self.disp(
-            "can't get node affiliations: {reason}".format(reason=failure_), error=True
-        )
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
-
-    def start(self):
-        self.host.bridge.psNodeAffiliationsGet(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psNodeAffiliationsGetCb,
-            errback=self.psNodeAffiliationsGetEb,
-        )
+    async def start(self):
+        try:
+            affiliations = await self.host.bridge.psNodeAffiliationsGet(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get node affiliations: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(affiliations)
+            self.host.quit()
 
 
 class NodeAffiliationsSet(base.CommandBase):
@@ -445,7 +403,6 @@
             use_verbose=True,
             help=_("set affiliations (for node owner)"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         # XXX: we use optional argument syntax for a required one because list of list of 2 elements
@@ -461,26 +418,21 @@
             help=_("entity/affiliation couple(s)"),
         )
 
-    def psNodeAffiliationsSetCb(self):
-        self.disp(_("affiliations have been set"), 1)
-        self.host.quit()
-
-    def psNodeAffiliationsSetEb(self, failure_):
-        self.disp(
-            "can't set node affiliations: {reason}".format(reason=failure_), error=True
-        )
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
-
-    def start(self):
+    async def start(self):
         affiliations = dict(self.args.affiliations)
-        self.host.bridge.psNodeAffiliationsSet(
-            self.args.service,
-            self.args.node,
-            affiliations,
-            self.profile,
-            callback=self.psNodeAffiliationsSetCb,
-            errback=self.psNodeAffiliationsSetEb,
-        )
+        try:
+            await self.host.bridge.psNodeAffiliationsSet(
+                self.args.service,
+                self.args.node,
+                affiliations,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set node affiliations: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("affiliations have been set"), 1)
+            self.host.quit()
 
 
 class NodeAffiliations(base.CommandBase):
@@ -506,29 +458,23 @@
             pubsub_flags={C.NODE},
             help=_("retrieve node subscriptions (for node owner)"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psNodeSubscriptionsGetCb(self, subscriptions):
-        self.output(subscriptions)
-        self.host.quit()
-
-    def psNodeSubscriptionsGetEb(self, failure_):
-        self.disp(
-            "can't get node subscriptions: {reason}".format(reason=failure_), error=True
-        )
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
-
-    def start(self):
-        self.host.bridge.psNodeSubscriptionsGet(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psNodeSubscriptionsGetCb,
-            errback=self.psNodeSubscriptionsGetEb,
-        )
+    async def start(self):
+        try:
+            subscriptions = await self.host.bridge.psNodeSubscriptionsGet(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get node subscriptions: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(subscriptions)
+            self.host.quit()
 
 
 class StoreSubscriptionAction(argparse.Action):
@@ -566,7 +512,6 @@
             use_verbose=True,
             help=_("set/modify subscriptions (for node owner)"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         # XXX: we use optional argument syntax for a required one because list of list of 2 elements
@@ -583,25 +528,20 @@
             help=_("entity/subscription couple(s)"),
         )
 
-    def psNodeSubscriptionsSetCb(self):
-        self.disp(_("subscriptions have been set"), 1)
-        self.host.quit()
-
-    def psNodeSubscriptionsSetEb(self, failure_):
-        self.disp(
-            "can't set node subscriptions: {reason}".format(reason=failure_), error=True
-        )
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
-
-    def start(self):
-        self.host.bridge.psNodeSubscriptionsSet(
-            self.args.service,
-            self.args.node,
-            self.args.subscriptions,
-            self.profile,
-            callback=self.psNodeSubscriptionsSetCb,
-            errback=self.psNodeSubscriptionsSetEb,
-        )
+    async def start(self):
+        try:
+            self.host.bridge.psNodeSubscriptionsSet(
+                self.args.service,
+                self.args.node,
+                self.args.subscriptions,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set node subscriptions: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("subscriptions have been set"), 1)
+            self.host.quit()
 
 
 class NodeSubscriptions(base.CommandBase):
@@ -627,28 +567,24 @@
             use_verbose=True,
             help=_("set/replace a schema"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument("schema", help=_("schema to set (must be XML)"))
 
-    def psSchemaSetCb(self):
-        self.disp(_("schema has been set"), 1)
-        self.host.quit()
-
-    def start(self):
-        self.host.bridge.psSchemaSet(
-            self.args.service,
-            self.args.node,
-            self.args.schema,
-            self.profile,
-            callback=self.psSchemaSetCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't set schema: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+    async def start(self):
+        try:
+            await self.host.bridge.psSchemaSet(
+                self.args.service,
+                self.args.node,
+                self.args.schema,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set schema: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("schema has been set"), 1)
+            self.host.quit()
 
 
 class NodeSchemaEdit(base.CommandBase, common.BaseEdit):
@@ -666,30 +602,26 @@
             help=_("edit a schema"),
         )
         common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR)
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psSchemaSetCb(self):
-        self.disp(_("schema has been set"), 1)
-        self.host.quit()
+    async def publish(self, schema):
+        try:
+            await self.host.bridge.psSchemaSet(
+                self.args.service,
+                self.args.node,
+                schema,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set schema: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("schema has been set"), 1)
+            self.host.quit()
 
-    def publish(self, schema):
-        self.host.bridge.psSchemaSet(
-            self.args.service,
-            self.args.node,
-            schema,
-            self.profile,
-            callback=self.psSchemaSetCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't set schema: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
-
-    def psSchemaGetCb(self, schema):
+    async def psSchemaGetCb(self, schema):
         try:
             from lxml import etree
         except ImportError:
@@ -707,20 +639,20 @@
                 etree.tostring(schema_elt, encoding="utf-8", pretty_print=True)
             )
             content_file_obj.seek(0)
-        self.runEditor("pubsub_schema_editor_args", content_file_path, content_file_obj)
+        await self.runEditor("pubsub_schema_editor_args", content_file_path, content_file_obj)
 
-    def start(self):
-        self.host.bridge.psSchemaGet(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psSchemaGetCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't edit schema: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+    async def start(self):
+        try:
+            schema = await self.host.bridge.psSchemaGet(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't edit schema: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.psSchemaGetCb(schema)
 
 
 class NodeSchemaGet(base.CommandBase):
@@ -735,30 +667,27 @@
             use_verbose=True,
             help=_("get schema"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psSchemaGetCb(self, schema):
-        if not schema:
-            self.disp(_("no schema found"), 1)
-            self.host.quit(1)
-        self.output(schema)
-        self.host.quit()
-
-    def start(self):
-        self.host.bridge.psSchemaGet(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psSchemaGetCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't get schema: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+    async def start(self):
+        try:
+            schema = await self.host.bridge.psSchemaGet(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get schema: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if schema:
+                await self.output(schema)
+                self.host.quit()
+            else:
+                self.disp(_("no schema found"), 1)
+                self.host.quit(1)
 
 
 class NodeSchema(base.CommandBase):
@@ -799,7 +728,6 @@
             pubsub_flags={C.NODE},
             help=_("publish a new item or update an existing one"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -809,32 +737,29 @@
             help=_("id, URL of the item to update, keyword, or nothing for new item"),
         )
 
-    def psItemsSendCb(self, published_id):
-        if published_id:
-            self.disp("Item published at {pub_id}".format(pub_id=published_id))
-        else:
-            self.disp("Item published")
-        self.host.quit(C.EXIT_OK)
-
-    def start(self):
+    async def start(self):
         element, etree = xml_tools.etreeParse(self, sys.stdin)
         element = xml_tools.getPayload(self, element)
         payload = etree.tostring(element, encoding="unicode")
 
-        self.host.bridge.psItemSend(
-            self.args.service,
-            self.args.node,
-            payload,
-            self.args.item,
-            {},
-            self.profile,
-            callback=self.psItemsSendCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't send item: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+        try:
+            published_id = await self.host.bridge.psItemSend(
+                self.args.service,
+                self.args.node,
+                payload,
+                self.args.item,
+                {},
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_(f"can't send item: {e}"), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if published_id:
+                self.disp("Item published at {pub_id}".format(pub_id=published_id))
+            else:
+                self.disp("Item published")
+            self.host.quit(C.EXIT_OK)
 
 
 class Get(base.CommandBase):
@@ -848,7 +773,6 @@
             pubsub_flags={C.NODE, C.MULTI_ITEMS},
             help=_("get pubsub item(s)"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -860,26 +784,23 @@
         #  TODO: a key(s) argument to select keys to display
         # TODO: add MAM filters
 
-    def psItemsGetCb(self, ps_result):
-        self.output(ps_result[0])
-        self.host.quit(C.EXIT_OK)
-
-    def psItemsGetEb(self, failure_):
-        self.disp("can't get pubsub items: {reason}".format(reason=failure_), error=True)
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
-
-    def start(self):
-        self.host.bridge.psItemsGet(
-            self.args.service,
-            self.args.node,
-            self.args.max,
-            self.args.items,
-            self.args.sub_id,
-            self.getPubsubExtra(),
-            self.profile,
-            callback=self.psItemsGetCb,
-            errback=self.psItemsGetEb,
-        )
+    async def start(self):
+        try:
+            ps_result = await self.host.bridge.psItemsGet(
+                self.args.service,
+                self.args.node,
+                self.args.max,
+                self.args.items,
+                self.args.sub_id,
+                self.getPubsubExtra(),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get pubsub items: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(ps_result[0])
+            self.host.quit(C.EXIT_OK)
 
 
 class Delete(base.CommandBase):
@@ -892,7 +813,6 @@
             pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM},
             help=_("delete an item"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -902,31 +822,28 @@
             "-N", "--notify", action="store_true", help=_("notify deletion")
         )
 
-    def psItemsDeleteCb(self):
-        self.disp(_("item {item_id} has been deleted").format(item_id=self.args.item))
-        self.host.quit(C.EXIT_OK)
-
-    def start(self):
+    async def start(self):
         if not self.args.item:
             self.parser.error(_("You need to specify an item to delete"))
         if not self.args.force:
             message = _("Are you sure to delete item {item_id} ?").format(
                 item_id=self.args.item
             )
-            self.host.confirmOrQuit(message, _("item deletion cancelled"))
-        self.host.bridge.psRetractItem(
-            self.args.service,
-            self.args.node,
-            self.args.item,
-            self.args.notify,
-            self.profile,
-            callback=self.psItemsDeleteCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't delete item: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+            await self.host.confirmOrQuit(message, _("item deletion cancelled"))
+        try:
+            await self.host.bridge.psRetractItem(
+                self.args.service,
+                self.args.node,
+                self.args.item,
+                self.args.notify,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_(f"can't delete item: {e}"), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_(f"item {self.args.item} has been deleted"))
+            self.host.quit(C.EXIT_OK)
 
 
 class Edit(base.CommandBase, common.BaseEdit):
@@ -946,12 +863,8 @@
     def add_parser_options(self):
         pass
 
-    def edit(self, content_file_path, content_file_obj):
-        # we launch editor
-        self.runEditor("pubsub_editor_args", content_file_path, content_file_obj)
-
-    def publish(self, content):
-        published_id = self.host.bridge.psItemSend(
+    async def publish(self, content):
+        published_id = await self.host.bridge.psItemSend(
             self.pubsub_service,
             self.pubsub_node,
             content,
@@ -964,7 +877,7 @@
         else:
             self.disp("Item published")
 
-    def getItemData(self, service, node, item):
+    async def getItemData(self, service, node, item):
         try:
             from lxml import etree
         except ImportError:
@@ -974,10 +887,10 @@
             )
             self.host.quit(1)
         items = [item] if item else []
-        item_raw = self.host.bridge.psItemsGet(
+        item_raw = (await self.host.bridge.psItemsGet(
             service, node, 1, items, "", {}, self.profile
-        )[0][0]
-        parser = etree.XMLParser(remove_blank_text=True)
+        ))[0][0]
+        parser = etree.XMLParser(remove_blank_text=True, recover=True)
         item_elt = etree.fromstring(item_raw, parser)
         item_id = item_elt.get("id")
         try:
@@ -987,11 +900,14 @@
             return ""
         return etree.tostring(payload, encoding="unicode", pretty_print=True), item_id
 
-    def start(self):
-        self.pubsub_service, self.pubsub_node, self.pubsub_item, content_file_path, content_file_obj = (
-            self.getItemPath()
-        )
-        self.edit(content_file_path, content_file_obj)
+    async def start(self):
+        (self.pubsub_service,
+         self.pubsub_node,
+         self.pubsub_item,
+         content_file_path,
+         content_file_obj) = await self.getItemPath()
+        await self.runEditor("pubsub_editor_args", content_file_path, content_file_obj)
+        self.host.quit()
 
 
 class Subscribe(base.CommandBase):
@@ -1005,34 +921,30 @@
             use_verbose=True,
             help=_("subscribe to a node"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psSubscribeCb(self, sub_id):
-        self.disp(_("subscription done"), 1)
-        if sub_id:
-            self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id))
-        self.host.quit()
-
-    def start(self):
-        self.host.bridge.psSubscribe(
-            self.args.service,
-            self.args.node,
-            {},
-            self.profile,
-            callback=self.psSubscribeCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't subscribe to node: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+    async def start(self):
+        try:
+            sub_id = await self.host.bridge.psSubscribe(
+                self.args.service,
+                self.args.node,
+                {},
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_(f"can't subscribe to node: {e}"), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("subscription done"), 1)
+            if sub_id:
+                self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id))
+            self.host.quit()
 
 
 class Unsubscribe(base.CommandBase):
-    # TODO: voir pourquoi NodeNotFound sur subscribe juste après unsubscribe
+    # FIXME: check why we get a a NodeNotFound on subscribe just after unsubscribe
 
     def __init__(self, host):
         base.CommandBase.__init__(
@@ -1044,27 +956,23 @@
             use_verbose=True,
             help=_("unsubscribe from a node"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psUnsubscribeCb(self):
-        self.disp(_("subscription removed"), 1)
-        self.host.quit()
-
-    def start(self):
-        self.host.bridge.psUnsubscribe(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psUnsubscribeCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't unsubscribe from node: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+    async def start(self):
+        try:
+            await self.host.bridge.psUnsubscribe(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_(f"can't unsubscribe from node: {e}"), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("subscription removed"), 1)
+            self.host.quit()
 
 
 class Subscriptions(base.CommandBase):
@@ -1077,27 +985,23 @@
             use_pubsub=True,
             help=_("retrieve all subscriptions on a service"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psSubscriptionsGetCb(self, subscriptions):
-        self.output(subscriptions)
-        self.host.quit()
-
-    def start(self):
-        self.host.bridge.psSubscriptionsGet(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psSubscriptionsGetCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't retrieve subscriptions: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+    async def start(self):
+        try:
+            subscriptions = await self.host.bridge.psSubscriptionsGet(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_(f"can't retrieve subscriptions: {e}"), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(subscriptions)
+            self.host.quit()
 
 
 class Affiliations(base.CommandBase):
@@ -1110,33 +1014,29 @@
             use_pubsub=True,
             help=_("retrieve all affiliations on a service"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psAffiliationsGetCb(self, affiliations):
-        self.output(affiliations)
-        self.host.quit()
-
-    def psAffiliationsGetEb(self, failure_):
-        self.disp(
-            "can't get node affiliations: {reason}".format(reason=failure_), error=True
-        )
-        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
-
-    def start(self):
-        self.host.bridge.psAffiliationsGet(
-            self.args.service,
-            self.args.node,
-            self.profile,
-            callback=self.psAffiliationsGetCb,
-            errback=self.psAffiliationsGetEb,
-        )
+    async def start(self):
+        try:
+            affiliations = await self.host.bridge.psAffiliationsGet(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(
+                f"can't get node affiliations: {e}", error=True
+            )
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(affiliations)
+            self.host.quit()
 
 
 class Search(base.CommandBase):
-    """this command do a search without using MAM
+    """This command do a search without using MAM
 
     This commands checks every items it finds by itself,
     so it may be heavy in resources both for server and client
@@ -1159,7 +1059,6 @@
             use_verbose=True,
             help=_("search items corresponding to filters"),
         )
-        self.need_loop = True
 
     @property
     def etree(self):
@@ -1317,30 +1216,26 @@
         )
         self.parser.add_argument("command", nargs=argparse.REMAINDER)
 
-    def psItemsGetEb(self, failure_, service, node):
-        self.disp(
-            "can't get pubsub items at {service} (node: {node}): {reason}".format(
-                service=service, node=node, reason=failure_
-            ),
-            error=True,
-        )
-        self.to_get -= 1
-
-    def getItems(self, depth, service, node, items):
-        search = partial(self.search, depth=depth)
-        errback = partial(self.psItemsGetEb, service=service, node=node)
-        self.host.bridge.psItemsGet(
-            service,
-            node,
-            self.args.node_max,
-            items,
-            "",
-            self.getPubsubExtra(),
-            self.profile,
-            callback=search,
-            errback=errback,
-        )
+    async def getItems(self, depth, service, node, items):
         self.to_get += 1
+        try:
+            items_data = await self.host.bridge.psItemsGet(
+                service,
+                node,
+                self.args.node_max,
+                items,
+                "",
+                self.getPubsubExtra(),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(
+                f"can't get pubsub items at {service} (node: {node}): {e}",
+                error=True,
+            )
+            self.to_get -= 1
+        else:
+            await self.search(items_data, depth)
 
     def _checkPubsubURL(self, match, found_nodes):
         """check that the matched URL is an xmpp: one
@@ -1360,13 +1255,13 @@
                     found_node["item"] = url_data["item"]
                 found_nodes.append(found_node)
 
-    def getSubNodes(self, item, depth):
+    async def getSubNodes(self, item, depth):
         """look for pubsub URIs in item, and getItems on the linked nodes"""
         found_nodes = []
         checkURI = partial(self._checkPubsubURL, found_nodes=found_nodes)
         strings.RE_URL.sub(checkURI, item)
         for data in found_nodes:
-            self.getItems(
+            await self.getItems(
                 depth + 1,
                 data["service"],
                 data["node"],
@@ -1452,7 +1347,11 @@
             elif type_ == "python":
                 if item_xml is None:
                     item_xml = self.parseXml(item)
-                cmd_ns = {"item": item, "item_xml": item_xml}
+                cmd_ns = {
+                    "etree": self.etree,
+                    "item": item,
+                    "item_xml": item_xml
+                    }
                 try:
                     keep = eval(value, cmd_ns)
                 except SyntaxError as e:
@@ -1483,7 +1382,7 @@
 
         return True, item
 
-    def doItemAction(self, item, metadata):
+    async def doItemAction(self, item, metadata):
         """called when item has been kepts and the action need to be done
 
         @param item(unicode): accepted item
@@ -1491,7 +1390,7 @@
         action = self.args.action
         if action == "print" or self.host.verbosity > 0:
             try:
-                self.output(item)
+                await self.output(item)
             except self.etree.XMLSyntaxError:
                 # item is not valid XML, but a string
                 # can happen when --only-matching is used
@@ -1521,22 +1420,22 @@
                 2,
             )
             if action == "exec":
-                ret = subprocess.call(cmd_args)
+                p = await asyncio.create_subprocess_exec(*cmd_args)
+                ret = await p.wait()
             else:
-                p = subprocess.Popen(cmd_args, stdin=subprocess.PIPE)
-                p.communicate(item.encode("utf-8"))
-                ret = p.wait()
+                p = await asyncio.create_subprocess_exec(*cmd_args,
+                                                         stdin=subprocess.PIPE)
+                await p.communicate(item.encode(sys.getfilesystemencoding()))
+                ret = p.returncode
             if ret != 0:
                 self.disp(
                     A.color(
                         C.A_FAILURE,
-                        _("executed command failed with exit code {code}").format(
-                            code=ret
-                        ),
+                        _(f"executed command failed with exit code {ret}"),
                     )
                 )
 
-    def search(self, items_data, depth):
+    async def search(self, items_data, depth):
         """callback of getItems
 
         this method filters items, get sub nodes if needed,
@@ -1548,11 +1447,11 @@
         items, metadata = items_data
         for item in items:
             if depth < self.args.max_depth:
-                self.getSubNodes(item, depth)
+                await self.getSubNodes(item, depth)
             keep, item = self.filter(item)
             if not keep:
                 continue
-            self.doItemAction(item, metadata)
+            await self.doItemAction(item, metadata)
 
         #  we check if we got all getItems results
         self.to_get -= 1
@@ -1561,7 +1460,7 @@
             self.host.quit()
         assert self.to_get > 0
 
-    def start(self):
+    async def start(self):
         if self.args.command:
             if self.args.action not in self.EXEC_ACTIONS:
                 self.parser.error(
@@ -1584,7 +1483,7 @@
         self.args.namespace = dict(
             self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")]
         )
-        self.getItems(0, self.args.service, self.args.node, self.args.items)
+        await self.getItems(0, self.args.service, self.args.node, self.args.items)
 
 
 class Transform(base.CommandBase):
@@ -1597,7 +1496,6 @@
             pubsub_flags={C.NODE, C.MULTI_ITEMS},
             help=_("modify items of a node using an external command/script"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -1630,18 +1528,18 @@
                    'Return "DELETE" string to delete the item, and "SKIP" to ignore it'),
         )
 
-    def psItemsSendCb(self, item_ids, metadata):
+    async def psItemsSendCb(self, item_ids, metadata):
         if item_ids:
             self.disp(_('items published with ids {item_ids}').format(
                 item_ids=', '.join(item_ids)))
         else:
             self.disp(_('items published'))
         if self.args.all:
-            return self.handleNextPage(metadata)
+            return await self.handleNextPage(metadata)
         else:
             self.host.quit()
 
-    def handleNextPage(self, metadata):
+    async def handleNextPage(self, metadata):
         """Retrieve new page through RSM or quit if we're in the last page
 
         use to handle --all option
@@ -1672,24 +1570,27 @@
 
         extra = self.getPubsubExtra()
         extra['rsm_after'] = last
-        self.host.bridge.psItemsGet(
-            self.args.service,
-            self.args.node,
-            self.args.rsm_max,
-            self.args.items,
-            "",
-            extra,
-            self.profile,
-            callback=self.psItemsGetCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't retrieve items: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+        try:
+            ps_result = await self.host.bridge.psItemsGet(
+                self.args.service,
+                self.args.node,
+                self.args.rsm_max,
+                self.args.items,
+                "",
+                extra,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(
+                f"can't retrieve items: {e}", error=True
+            )
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.psItemsGetCb(ps_result)
 
-    def psItemsGetCb(self, ps_result):
+    async def psItemsGetCb(self, ps_result):
         items, metadata = ps_result
+        encoding = 'utf-8'
         new_items = []
 
         for item in items:
@@ -1707,48 +1608,46 @@
 
             # we launch the command to filter the item
             try:
-                p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE,
-                                                             stdout=subprocess.PIPE)
+                p = await asyncio.create_subprocess_exec(
+                    self.args.command_path,
+                    stdin=subprocess.PIPE,
+                    stdout=subprocess.PIPE)
             except OSError as e:
                 exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR
-                e = str(e).decode('utf-8', errors="ignore")
-                self.disp("Can't execute the command: {msg}".format(msg=e), error=True)
+                self.disp(f"Can't execute the command: {e}", error=True)
                 self.host.quit(exit_code)
-            cmd_std_out, cmd_std_err = p.communicate(item.encode("utf-8"))
-            ret = p.wait()
+            encoding = "utf-8"
+            cmd_std_out, cmd_std_err = await p.communicate(item.encode(encoding))
+            ret = p.returncode
             if ret != 0:
-                self.disp("The command returned a non zero status while parsing the "
-                          "following item:\n\n{item}".format(item=item), error=True)
+                self.disp(f"The command returned a non zero status while parsing the "
+                          f"following item:\n\n{item}", error=True)
                 if self.args.ignore_errors:
                     continue
                 else:
                     self.host.quit(C.EXIT_CMD_ERROR)
             if cmd_std_err is not None:
-                cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore')
+                cmd_std_err = cmd_std_err.decode(encoding, errors='ignore')
                 self.disp(cmd_std_err, error=True)
-            cmd_std_out = cmd_std_out.strip()
+            cmd_std_out = cmd_std_out.decode(encoding).strip()
             if cmd_std_out == "DELETE":
                 item_elt, __ = xml_tools.etreeParse(self, item)
                 item_id = item_elt.get('id')
-                self.disp(_("Deleting item {item_id}").format(item_id=item_id))
+                self.disp(_(f"Deleting item {item_id}"))
                 if self.args.apply:
-                    # FIXME: we don't wait for item to be retracted which can cause
-                    #        trouble in case of error just before the end of the command
-                    #        (the error message may be missed).
-                    #        Once moved to Python 3, we must wait for it by using a
-                    #        coroutine.
-                    self.host.bridge.psRetractItem(
-                        self.args.service,
-                        self.args.node,
-                        item_id,
-                        False,
-                        self.profile,
-                        errback=partial(
-                            self.errback,
-                            msg=_("can't delete item [%s]: {}" % item_id),
-                            exit_code=C.EXIT_BRIDGE_ERRBACK,
-                        ),
-                    )
+                    try:
+                        await self.host.bridge.psRetractItem(
+                            self.args.service,
+                            self.args.node,
+                            item_id,
+                            False,
+                            self.profile,
+                        )
+                    except Exception as e:
+                        self.disp(
+                            f"can't delete item {item_id}: {e}", error=True
+                        )
+                        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
                 continue
             elif cmd_std_out == "SKIP":
                 item_elt, __ = xml_tools.etreeParse(self, item)
@@ -1774,39 +1673,29 @@
         if not self.args.apply:
             # on dry run we have nothing to wait for, we can quit
             if self.args.all:
-                return self.handleNextPage(metadata)
+                return await self.handleNextPage(metadata)
             self.host.quit()
         else:
             if self.args.admin:
-                self.host.bridge.psAdminItemsSend(
+                bridge_method = self.host.bridge.psAdminItemsSend
+            else:
+                bridge_method = self.host.bridge.psItemsSend
+
+            try:
+                ps_result = await bridge_method(
                     self.args.service,
                     self.args.node,
                     new_items,
                     "",
                     self.profile,
-                    callback=partial(self.psItemsSendCb, metadata=metadata),
-                    errback=partial(
-                        self.errback,
-                        msg=_("can't send item: {}"),
-                        exit_code=C.EXIT_BRIDGE_ERRBACK,
-                    ),
                 )
+            except Exception as e:
+                self.disp(f"can't send item: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
             else:
-                self.host.bridge.psItemsSend(
-                    self.args.service,
-                    self.args.node,
-                    new_items,
-                    "",
-                    self.profile,
-                    callback=partial(self.psItemsSendCb, metadata=metadata),
-                    errback=partial(
-                        self.errback,
-                        msg=_("can't send item: {}"),
-                        exit_code=C.EXIT_BRIDGE_ERRBACK,
-                    ),
-                )
+                await self.psItemsSendCb(ps_result, metadata=metadata)
 
-    def start(self):
+    async def start(self):
         if self.args.all and self.args.order_by != C.ORDER_BY_CREATION:
             self.check_duplicates = True
             self.items_ids = []
@@ -1819,21 +1708,22 @@
                 "but this method is not safe, and some items may be missed.\n---\n"))
         else:
             self.check_duplicates = False
-        self.host.bridge.psItemsGet(
-            self.args.service,
-            self.args.node,
-            self.args.max,
-            self.args.items,
-            "",
-            self.getPubsubExtra(),
-            self.profile,
-            callback=self.psItemsGetCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't retrieve items: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+
+        try:
+            ps_result = await self.host.bridge.psItemsGet(
+                self.args.service,
+                self.args.node,
+                self.args.max,
+                self.args.items,
+                "",
+                self.getPubsubExtra(),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't retrieve items: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.psItemsGetCb(ps_result)
 
 
 class Uri(base.CommandBase):
@@ -1847,7 +1737,6 @@
             pubsub_flags={C.NODE, C.SINGLE_ITEM},
             help=_("build URI"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -1871,19 +1760,19 @@
         self.disp(uri.buildXMPPUri("pubsub", **uri_args))
         self.host.quit()
 
-    def start(self):
+    async def start(self):
         if not self.args.service:
-            self.host.bridge.asyncGetParamA(
-                "JabberID",
-                "Connection",
-                profile_key=self.args.profile,
-                callback=self.display_uri,
-                errback=partial(
-                    self.errback,
-                    msg=_("can't retrieve jid: {}"),
-                    exit_code=C.EXIT_BRIDGE_ERRBACK,
-                ),
-            )
+            try:
+                jid_ = await self.host.bridge.asyncGetParamA(
+                    "JabberID",
+                    "Connection",
+                    profile_key=self.args.profile
+                )
+            except Exception as e:
+                self.disp(f"can't retrieve jid: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+            else:
+                self.display_uri(jid_)
         else:
             self.display_uri(None)
 
@@ -1898,7 +1787,6 @@
             pubsub_flags={C.NODE},
             help=_("create a Pubsub hook"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -1928,22 +1816,22 @@
                     _("{path} is not a file").format(path=self.args.hook_arg)
                 )
 
-    def start(self):
+    async def start(self):
         self.checkArgs(self)
-        self.host.bridge.psHookAdd(
-            self.args.service,
-            self.args.node,
-            self.args.type,
-            self.args.hook_arg,
-            self.args.persistent,
-            self.profile,
-            callback=self.host.quit,
-            errback=partial(
-                self.errback,
-                msg=_("can't create hook: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+        try:
+            await self.host.bridge.psHookAdd(
+                self.args.service,
+                self.args.node,
+                self.args.type,
+                self.args.hook_arg,
+                self.args.persistent,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't create hook: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.host.quit()
 
 
 class HookDelete(base.CommandBase):
@@ -1956,7 +1844,6 @@
             pubsub_flags={C.NODE},
             help=_("delete a Pubsub hook"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
@@ -1976,27 +1863,22 @@
             ),
         )
 
-    def psHookRemoveCb(self, nb_deleted):
-        self.disp(
-            _("{nb_deleted} hook(s) have been deleted").format(nb_deleted=nb_deleted)
-        )
-        self.host.quit()
-
-    def start(self):
+    async def start(self):
         HookCreate.checkArgs(self)
-        self.host.bridge.psHookRemove(
-            self.args.service,
-            self.args.node,
-            self.args.type,
-            self.args.hook_arg,
-            self.profile,
-            callback=self.psHookRemoveCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't delete hook: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+        try:
+            nb_deleted = await self.host.bridge.psHookRemove(
+                self.args.service,
+                self.args.node,
+                self.args.type,
+                self.args.hook_arg,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't delete hook: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_(f"{nb_deleted} hook(s) have been deleted"))
+            self.host.quit()
 
 
 class HookList(base.CommandBase):
@@ -2008,27 +1890,23 @@
             use_output=C.OUTPUT_LIST_DICT,
             help=_("list hooks of a profile"),
         )
-        self.need_loop = True
 
     def add_parser_options(self):
         pass
 
-    def psHookListCb(self, data):
-        if not data:
-            self.disp(_("No hook found."))
-        self.output(data)
-        self.host.quit()
-
-    def start(self):
-        self.host.bridge.psHookList(
-            self.profile,
-            callback=self.psHookListCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't list hooks: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+    async def start(self):
+        try:
+            data = await self.host.bridge.psHookList(
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't list hooks: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if not data:
+                self.disp(_("No hook found."))
+            await self.output(data)
+            self.host.quit()
 
 
 class Hook(base.CommandBase):