diff sat_frontends/jp/base.py @ 3040:fee60f17ebac

jp: jp asyncio port: /!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\ This patch implements the port of jp to asyncio, so it is now correctly using the bridge asynchronously, and it can be used with bridges like `pb`. This also simplify the code, notably for things which were previously implemented with many callbacks (like pagination with RSM). During the process, some behaviours have been modified/fixed, in jp and backends, check diff for details.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:56:41 +0200
parents ab2696e34d29
children 3df611adb598
line wrap: on
line diff
--- a/sat_frontends/jp/base.py	Wed Sep 25 08:53:38 2019 +0200
+++ b/sat_frontends/jp/base.py	Wed Sep 25 08:56:41 2019 +0200
@@ -17,18 +17,21 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import asyncio
 from sat.core.i18n import _
 
 ### logging ###
 import logging as log
-log.basicConfig(level=log.DEBUG,
-                format='%(message)s')
+log.basicConfig(level=log.WARNING,
+                format='[%(name)s] %(message)s')
 ###
 
 import sys
-import locale
+import os
 import os.path
 import argparse
+import inspect
+from pathlib import Path
 from glob import iglob
 from importlib import import_module
 from sat_frontends.tools.jid import JID
@@ -41,13 +44,20 @@
 from sat_frontends.jp.constants import Const as C
 from sat_frontends.tools import misc
 import xml.etree.ElementTree as ET  # FIXME: used temporarily to manage XMLUI
-import shlex
 from collections import OrderedDict
 
 ## bridge handling
 # we get bridge name from conf and initialise the right class accordingly
 main_config = config.parseMainConf()
 bridge_name = config.getConfig(main_config, '', 'bridge', 'dbus')
+USER_INTER_MSG = _("User interruption: good bye")
+
+
+class QuitException(BaseException):
+    """Quitting is requested
+
+    This is used to stop execution when host.quit() is called
+    """
 
 
 # TODO: move loops handling in a separated module
@@ -63,8 +73,9 @@
         def run(self):
             self.loop.run()
 
-        def quit(self):
+        def quit(self, exit_code):
             self.loop.quit()
+            sys.exit(exit_code)
 
         def call_later(self, delay, callback, *args):
             """call a callback repeatedly
@@ -78,27 +89,66 @@
             GLib.timeout_add(delay, callback, *args)
 
 else:
-    print("can't start jp: only D-Bus bridge is currently handled")
-    sys.exit(C.EXIT_ERROR)
-    # FIXME: twisted loop can be used when jp can handle fully async bridges
-    # from twisted.internet import reactor
+    import signal
+    from twisted.internet import asyncioreactor
+    asyncioreactor.install()
+    from twisted.internet import reactor, defer
+
+    class JPLoop(object):
 
-    # class JPLoop(object):
+        def __init__(self):
+            # exit code must be set when using quit, so if it's not set
+            # something got wrong and we must report it
+            self._exit_code = C.EXIT_INTERNAL_ERROR
 
-    #     def run(self):
-    #         reactor.run()
+        def run(self, jp, *args):
+            self.jp = jp
+            signal.signal(signal.SIGINT, self._on_sigint)
+            defer.ensureDeferred(self._start(jp, *args))
+            try:
+                reactor.run(installSignalHandlers=False)
+            except SystemExit as e:
+                self._exit_code = e.code
+            sys.exit(self._exit_code)
 
-    #     def quit(self):
-    #         reactor.stop()
+        async def _start(self, jp, *args):
+            fut = asyncio.ensure_future(jp.main(*args))
+            try:
+                await defer.Deferred.fromFuture(fut)
+            except BaseException:
+                import traceback
+                traceback.print_exc()
+                jp.quit(1)
+
+        def quit(self, exit_code):
+            self._exit_code = exit_code
+            reactor.stop()
 
-    #     def _timeout_cb(self, args, callback, delay):
-    #         ret = callback(*args)
-    #         if ret:
-    #             reactor.callLater(delay, self._timeout_cb, args, callback, delay)
+        def _timeout_cb(self, args, callback, delay):
+            try:
+                ret = callback(*args)
+            # FIXME: temporary hack to avoid traceback when using XMLUI
+            #        to be removed once create_task is not used anymore in
+            #        xmlui_manager (i.e. once sat_frontends.tools.xmlui fully supports
+            #        async syntax)
+            except QuitException:
+                return
+            if ret:
+                reactor.callLater(delay, self._timeout_cb, args, callback, delay)
 
-    #     def call_later(self, delay, callback, *args):
-    #         delay = float(delay) / 1000
-    #         reactor.callLater(delay, self._timeout_cb, args, callback, delay)
+        def call_later(self, delay, callback, *args):
+            delay = float(delay) / 1000
+            reactor.callLater(delay, self._timeout_cb, args, callback, delay)
+
+        def _on_sigint(self, sig_number, stack_frame):
+            """Called on keyboard interruption
+
+            Print user interruption message, set exit code and stop reactor
+            """
+            print("\r" + USER_INTER_MSG)
+            self._exit_code = C.EXIT_USER_CANCELLED
+            reactor.callFromThread(reactor.stop)
+
 
 if bridge_name == "embedded":
     from sat.core import sat_main
@@ -107,9 +157,10 @@
 try:
     import progressbar
 except ImportError:
-    msg = (_('ProgressBar not available, please download it at http://pypi.python.org/pypi/progressbar\n') +
-           _('Progress bar deactivated\n--\n'))
-    print(msg.encode('utf-8'), file=sys.stderr)
+    msg = (_('ProgressBar not available, please download it at '
+             'http://pypi.python.org/pypi/progressbar\n'
+             'Progress bar deactivated\n--\n'))
+    print(msg, file=sys.stderr)
     progressbar=None
 
 #consts
@@ -122,7 +173,7 @@
 This is free software, and you are welcome to redistribute it under certain conditions.
 """
 
-PROGRESS_DELAY = 10 # the progression will be checked every PROGRESS_DELAY ms
+PROGRESS_DELAY = 0.1 # the progression will be checked every PROGRESS_DELAY s
 
 
 def date_decoder(arg):
@@ -141,33 +192,30 @@
     def __init__(self):
         """
 
-        @attribute quit_on_progress_end (bool): set to False if you manage yourself exiting,
-            or if you want the user to stop by himself
+        @attribute quit_on_progress_end (bool): set to False if you manage yourself
+            exiting, or if you want the user to stop by himself
         @attribute progress_success(callable): method to call when progress just started
             by default display a message
-        @attribute progress_success(callable): method to call when progress is successfully finished
-            by default display a message
+        @attribute progress_success(callable): method to call when progress is
+            successfully finished by default display a message
         @attribute progress_failure(callable): method to call when progress failed
             by default display a message
         """
-        # FIXME: need_loop should be removed, everything must be async in bridge so
-        #        loop will always be needed
         bridge_module = dynamic_import.bridge(bridge_name, 'sat_frontends.bridge')
         if bridge_module is None:
             log.error("Can't import {} bridge".format(bridge_name))
             sys.exit(1)
 
-        self.bridge = bridge_module.Bridge()
-        self.bridge.bridgeConnect(callback=self._bridgeCb, errback=self._bridgeEb)
+        self.bridge = bridge_module.AIOBridge()
+        self._onQuitCallbacks = []
 
-    def _bridgeCb(self):
-        self.parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
-                                              description=DESCRIPTION)
+    def _bridgeConnected(self):
+        self.parser = argparse.ArgumentParser(
+            formatter_class=argparse.RawDescriptionHelpFormatter, description=DESCRIPTION)
         self._make_parents()
         self.add_parser_options()
-        self.subparsers = self.parser.add_subparsers(title=_('Available commands'), dest='subparser_name')
-        self._auto_loop = False # when loop is used for internal reasons
-        self._need_loop = False
+        self.subparsers = self.parser.add_subparsers(
+            title=_('Available commands'), dest='command', required=True)
 
         # progress attributes
         self._progress_id = None # TODO: manage several progress ids
@@ -181,27 +229,14 @@
 
         self.own_jid = None  # must be filled at runtime if needed
 
-    def _bridgeEb(self, failure):
-        if isinstance(failure, exceptions.BridgeExceptionNoService):
-            print((_("Can't connect to SàT backend, are you sure it's launched ?")))
-        elif isinstance(failure, exceptions.BridgeInitError):
-            print((_("Can't init bridge")))
-        else:
-            print((_("Error while initialising bridge: {}".format(failure))))
-        sys.exit(C.EXIT_BRIDGE_ERROR)
-
-    @property
-    def version(self):
-        return self.bridge.getVersion()
-
     @property
     def progress_id(self):
         return self._progress_id
 
-    @progress_id.setter
-    def progress_id(self, value):
-        self._progress_id = value
-        self.replayCache('progress_ids_cache')
+    async def set_progress_id(self, progress_id):
+        # because we use async, we need an explicit setter
+        self._progress_id = progress_id
+        await self.replayCache('progress_ids_cache')
 
     @property
     def watch_progress(self):
@@ -224,13 +259,13 @@
         except AttributeError:
             return 0
 
-    def replayCache(self, cache_attribute):
+    async def replayCache(self, cache_attribute):
         """Replay cached signals
 
         @param cache_attribute(str): name of the attribute containing the cache
             if the attribute doesn't exist, there is no cache and the call is ignored
-            else the cache must be a list of tuples containing the replay callback as first item,
-            then the arguments to use
+            else the cache must be a list of tuples containing the replay callback as
+            first item, then the arguments to use
         """
         try:
             cache = getattr(self, cache_attribute)
@@ -238,7 +273,7 @@
             pass
         else:
             for cache_data in cache:
-                cache_data[0](*cache_data[1:])
+                await cache_data[0](*cache_data[1:])
 
     def disp(self, msg, verbosity=0, error=False, no_lf=False):
         """Print a message to user
@@ -260,23 +295,22 @@
                 else:
                     print(msg)
 
-    def output(self, type_, name, extra_outputs, data):
+    async def output(self, type_, name, extra_outputs, data):
         if name in extra_outputs:
-            extra_outputs[name](data)
+            method = extra_outputs[name]
         else:
-            self._outputs[type_][name]['callback'](data)
+            method = self._outputs[type_][name]['callback']
+
+        ret = method(data)
+        if inspect.isawaitable(ret):
+            await ret
 
     def addOnQuitCallback(self, callback, *args, **kwargs):
         """Add a callback which will be called on quit command
 
         @param callback(callback): method to call
         """
-        try:
-            callbacks_list = self._onQuitCallbacks
-        except AttributeError:
-            callbacks_list = self._onQuitCallbacks = []
-        finally:
-            callbacks_list.append((callback, args, kwargs))
+        self._onQuitCallbacks.append((callback, args, kwargs))
 
     def getOutputChoices(self, output_type):
         """Return valid output filters for output_type
@@ -289,33 +323,52 @@
     def _make_parents(self):
         self.parents = {}
 
-        # we have a special case here as the start-session option is present only if connection is not needed,
-        # so we create two similar parents, one with the option, the other one without it
+        # we have a special case here as the start-session option is present only if
+        # connection is not needed, so we create two similar parents, one with the
+        # option, the other one without it
         for parent_name in ('profile', 'profile_session'):
             parent = self.parents[parent_name] = argparse.ArgumentParser(add_help=False)
-            parent.add_argument("-p", "--profile", action="store", type=str, default='@DEFAULT@', help=_("Use PROFILE profile key (default: %(default)s)"))
-            parent.add_argument("--pwd", action="store", default='', metavar='PASSWORD', help=_("Password used to connect profile, if necessary"))
+            parent.add_argument(
+                "-p", "--profile", action="store", type=str, default='@DEFAULT@',
+                help=_("Use PROFILE profile key (default: %(default)s)"))
+            parent.add_argument(
+                "--pwd", action="store", default='', metavar='PASSWORD',
+                help=_("Password used to connect profile, if necessary"))
 
-        profile_parent, profile_session_parent = self.parents['profile'], self.parents['profile_session']
+        profile_parent, profile_session_parent = (self.parents['profile'],
+                                                  self.parents['profile_session'])
 
-        connect_short, connect_long, connect_action, connect_help = "-c", "--connect", "store_true", _("Connect the profile before doing anything else")
-        profile_parent.add_argument(connect_short, connect_long, action=connect_action, help=connect_help)
+        connect_short, connect_long, connect_action, connect_help = (
+            "-c", "--connect", "store_true",
+            _("Connect the profile before doing anything else")
+        )
+        profile_parent.add_argument(
+            connect_short, connect_long, action=connect_action, help=connect_help)
 
         profile_session_connect_group = profile_session_parent.add_mutually_exclusive_group()
-        profile_session_connect_group.add_argument(connect_short, connect_long, action=connect_action, help=connect_help)
-        profile_session_connect_group.add_argument("--start-session", action="store_true", help=_("Start a profile session without connecting"))
+        profile_session_connect_group.add_argument(
+            connect_short, connect_long, action=connect_action, help=connect_help)
+        profile_session_connect_group.add_argument(
+            "--start-session", action="store_true",
+            help=_("Start a profile session without connecting"))
 
-        progress_parent = self.parents['progress'] = argparse.ArgumentParser(add_help=False)
+        progress_parent = self.parents['progress'] = argparse.ArgumentParser(
+            add_help=False)
         if progressbar:
-            progress_parent.add_argument("-P", "--progress", action="store_true", help=_("Show progress bar"))
+            progress_parent.add_argument(
+                "-P", "--progress", action="store_true", help=_("Show progress bar"))
 
         verbose_parent = self.parents['verbose'] = argparse.ArgumentParser(add_help=False)
-        verbose_parent.add_argument('--verbose', '-v', action='count', default=0, help=_("Add a verbosity level (can be used multiple times)"))
+        verbose_parent.add_argument(
+            '--verbose', '-v', action='count', default=0,
+            help=_("Add a verbosity level (can be used multiple times)"))
 
         draft_parent = self.parents['draft'] = argparse.ArgumentParser(add_help=False)
         draft_group = draft_parent.add_argument_group(_('draft handling'))
-        draft_group.add_argument("-D", "--current", action="store_true", help=_("load current draft"))
-        draft_group.add_argument("-F", "--draft-path", help=_("path to a draft file to retrieve"))
+        draft_group.add_argument(
+            "-D", "--current", action="store_true", help=_("load current draft"))
+        draft_group.add_argument(
+            "-F", "--draft-path", type=Path, help=_("path to a draft file to retrieve"))
 
 
     def make_pubsub_group(self, flags, defaults):
@@ -356,11 +409,14 @@
                     item_help += _(" (DEFAULT: {default})".format(default=default))
             pubsub_group.add_argument("-i", "--item", default='',
                                       help=item_help)
-            pubsub_group.add_argument("-L", "--last-item", action='store_true', help=_('retrieve last item'))
+            pubsub_group.add_argument(
+                "-L", "--last-item", action='store_true', help=_('retrieve last item'))
         elif flags.multi_items:
             # mutiple items, this activate several features: max-items, RSM, MAM
             # and Orbder-by
-            pubsub_group.add_argument("-i", "--item", action='append', dest='items', default=[], help=_("items to retrieve (DEFAULT: all)"))
+            pubsub_group.add_argument(
+                "-i", "--item", action='append', dest='items', default=[],
+                help=_("items to retrieve (DEFAULT: all)"))
             if not flags.no_max:
                 max_group = pubsub_group.add_mutually_exclusive_group()
                 # XXX: defaut value for --max-items or --max is set in parse_pubsub_args
@@ -409,14 +465,22 @@
                 help=_("how items should be ordered"))
 
         if not flags.all_used:
-            raise exceptions.InternalError('unknown flags: {flags}'.format(flags=', '.join(flags.unused)))
+            raise exceptions.InternalError('unknown flags: {flags}'.format(
+                flags=', '.join(flags.unused)))
         if defaults:
-            raise exceptions.InternalError('unused defaults: {defaults}'.format(defaults=defaults))
+            raise exceptions.InternalError(f'unused defaults: {defaults}')
 
         return parent
 
     def add_parser_options(self):
-        self.parser.add_argument('--version', action='version', version=("%(name)s %(version)s %(copyleft)s" % {'name': PROG_NAME, 'version': self.version, 'copyleft': COPYLEFT}))
+        self.parser.add_argument(
+            '--version',
+            action='version',
+            version=("{name} {version} {copyleft}".format(
+                name = PROG_NAME,
+                version = self.version,
+                copyleft = COPYLEFT))
+        )
 
     def register_output(self, type_, name, callback, description="", default=False):
         if type_ not in C.OUTPUT_TYPES:
@@ -427,7 +491,9 @@
                                      }
         if default:
             if type_ in self.default_output:
-                self.disp(_('there is already a default output for {}, ignoring new one').format(type_))
+                self.disp(
+                    _(f'there is already a default output for {type_}, ignoring new one')
+                )
             else:
                 self.default_output[type_] = name
 
@@ -445,7 +511,8 @@
 
     def check_output_options(self, accepted_set, options):
         if not accepted_set.issuperset(options):
-            self.disp("The following output options are invalid: {invalid_options}".format(
+            self.disp(
+                _("The following output options are invalid: {invalid_options}").format(
                 invalid_options = ', '.join(set(options).difference(accepted_set))),
                 error=True)
             self.quit(C.EXIT_BAD_ARG)
@@ -457,17 +524,20 @@
         """
         path = os.path.dirname(sat_frontends.jp.__file__)
         # XXX: outputs must be imported before commands as they are used for arguments
-        for type_, pattern in ((C.PLUGIN_OUTPUT, 'output_*.py'), (C.PLUGIN_CMD, 'cmd_*.py')):
-            modules = (os.path.splitext(module)[0] for module in map(os.path.basename, iglob(os.path.join(path, pattern))))
+        for type_, pattern in ((C.PLUGIN_OUTPUT, 'output_*.py'),
+                               (C.PLUGIN_CMD, 'cmd_*.py')):
+            modules = (
+                os.path.splitext(module)[0]
+                for module in map(os.path.basename, iglob(os.path.join(path, pattern))))
             for module_name in modules:
                 module_path = "sat_frontends.jp." + module_name
                 try:
                     module = import_module(module_path)
                     self.import_plugin_module(module, type_)
                 except ImportError as e:
-                    self.disp(_("Can't import {module_path} plugin, ignoring it: {msg}".format(
-                    module_path = module_path,
-                    msg = e)), error=True)
+                    self.disp(
+                        _(f"Can't import {module_path} plugin, ignoring it: {e}"),
+                        error=True)
                 except exceptions.CancelError:
                     continue
                 except exceptions.MissingModule as e:
@@ -485,7 +555,7 @@
         try:
             class_names =  getattr(module, '__{}__'.format(type_))
         except AttributeError:
-            log.disp(_("Invalid plugin module [{type}] {module}").format(type=type_, module=module), error=True)
+            log.disp(_(f"Invalid plugin module [{type_}] {module}"), error=True)
             raise ImportError
         else:
             for class_name in class_names:
@@ -500,12 +570,15 @@
             scheme = 'http'
         else:
             raise exceptions.InternalError('An HTTP scheme is expected in this method')
-        self.disp("{scheme} URL found, trying to find associated xmpp: URI".format(scheme=scheme.upper()),1)
+        self.disp(f"{scheme.upper()} URL found, trying to find associated xmpp: URI", 1)
         # HTTP URL, we try to find xmpp: links
         try:
             from lxml import etree
         except ImportError:
-            self.disp("lxml module must be installed to use http(s) scheme, please install it with \"pip install lxml\"", error=True)
+            self.disp(
+                "lxml module must be installed to use http(s) scheme, please install it "
+                "with \"pip install lxml\"",
+                error=True)
             self.quit(1)
         import urllib.request, urllib.error, urllib.parse
         parser = etree.HTMLParser()
@@ -517,7 +590,10 @@
         else:
             links = root.xpath("//link[@rel='alternate' and starts-with(@href, 'xmpp:')]")
         if not links:
-            self.disp('Could not find alternate "xmpp:" URI, can\'t find associated XMPP PubSub node/item', error=True)
+            self.disp(
+                _('Could not find alternate "xmpp:" URI, can\'t find associated XMPP '
+                  'PubSub node/item'),
+                error=True)
             self.quit(1)
         xmpp_uri = links[0].get('href')
         return xmpp_uri
@@ -552,7 +628,10 @@
                             try:
                                 items = self.args.items
                             except AttributeError:
-                                self.disp(_("item specified in URL but not needed in command, ignoring it"), error=True)
+                                self.disp(
+                                    _("item specified in URL but not needed in command, "
+                                      "ignoring it"),
+                                    error=True)
                             else:
                                 if not items:
                                     self.args.items = [uri_item]
@@ -565,7 +644,7 @@
                                 if not item_last:
                                     self.args.item = uri_item
                 else:
-                    self.parser.error(_('XMPP URL is not a pubsub one: {url}').format(url=url))
+                    self.parser.error(_(f'XMPP URL is not a pubsub one: {url}'))
         flags = self.args._cmd._pubsub_flags
         # we check required arguments here instead of using add_arguments' required option
         # because the required argument can be set in URL
@@ -580,7 +659,8 @@
         #        so we check conflict here. This may be fixed in Python 3, to be checked
         try:
             if self.args.item and self.args.item_last:
-                self.parser.error(_("--item and --item-last can't be used at the same time"))
+                self.parser.error(
+                    _("--item and --item-last can't be used at the same time"))
         except AttributeError:
             pass
 
@@ -605,46 +685,78 @@
             if self.args.max is None:
                 self.args.max = C.NO_LIMIT
 
-    def run(self, args=None, namespace=None):
-        self.args = self.parser.parse_args(args, namespace=None)
-        if self.args._cmd._use_pubsub:
-            self.parse_pubsub_args()
+    async def main(self, args, namespace):
         try:
-            self.args._cmd.run()
-            if self._need_loop or self._auto_loop:
-                self._start_loop()
-        except KeyboardInterrupt:
-            self.disp(_("User interruption: good bye"))
+            await self.bridge.bridgeConnect()
+        except Exception as e:
+            if isinstance(e, exceptions.BridgeExceptionNoService):
+                print((_("Can't connect to SàT backend, are you sure it's launched ?")))
+            elif isinstance(e, exceptions.BridgeInitError):
+                print((_("Can't init bridge")))
+            else:
+                print((_(f"Error while initialising bridge: {e}")))
+            self.quit(C.EXIT_BRIDGE_ERROR, raise_exc=False)
+            return
+        self.version = await self.bridge.getVersion()
+        self._bridgeConnected()
+        self.import_plugins()
+        try:
+            self.args = self.parser.parse_args(args, namespace=None)
+            if self.args._cmd._use_pubsub:
+                self.parse_pubsub_args()
+            await self.args._cmd.run()
+        except SystemExit as e:
+            self.quit(e.code, raise_exc=False)
+            return
+        except QuitException:
+            return
 
-    def _start_loop(self):
+    def run(self, args=None, namespace=None):
         self.loop = JPLoop()
-        self.loop.run()
+        self.loop.run(self, args, namespace)
 
-    def stop_loop(self):
-        try:
-            self.loop.quit()
-        except AttributeError:
-            pass
+    def _read_stdin(self, stdin_fut):
+        """Callback called by ainput to read stdin"""
+        line = sys.stdin.readline()
+        if line:
+            stdin_fut.set_result(line.rstrip(os.linesep))
+        else:
+            stdin_fut.set_exception(EOFError())
 
-    def confirmOrQuit(self, message, cancel_message=_("action cancelled by user")):
+    async def ainput(self, msg=''):
+        """Asynchronous version of buildin "input" function"""
+        self.disp(msg, no_lf=True)
+        sys.stdout.flush()
+        loop = asyncio.get_running_loop()
+        stdin_fut = loop.create_future()
+        loop.add_reader(sys.stdin, self._read_stdin, stdin_fut)
+        return await stdin_fut
+
+    async def confirmOrQuit(self, message, cancel_message=_("action cancelled by user")):
         """Request user to confirm action, and quit if he doesn't"""
 
-        res = input("{} (y/N)? ".format(message))
+        res = await self.ainput(f"{message} (y/N)? ")
         if res not in ("y", "Y"):
             self.disp(cancel_message)
             self.quit(C.EXIT_USER_CANCELLED)
 
-    def quitFromSignal(self, errcode=0):
-        """Same as self.quit, but from a signal handler
+    def quitFromSignal(self, exit_code=0):
+        r"""Same as self.quit, but from a signal handler
 
         /!\: return must be used after calling this method !
         """
-        assert self._need_loop
         # XXX: python-dbus will show a traceback if we exit in a signal handler
         # so we use this little timeout trick to avoid it
-        self.loop.call_later(0, self.quit, errcode)
+        self.loop.call_later(0, self.quit, exit_code)
+
+    def quit(self, exit_code=0, raise_exc=True):
+        """Terminate the execution with specified exit_code
 
-    def quit(self, errcode=0):
+        This will stop the loop.
+        @param exit_code(int): code to return when quitting the program
+        @param raise_exp(boolean): if True raise a QuitException to stop code execution
+            The default value should be used most of time.
+        """
         # first the onQuitCallbacks
         try:
             callbacks_list = self._onQuitCallbacks
@@ -654,10 +766,11 @@
             for callback, args, kwargs in callbacks_list:
                 callback(*args, **kwargs)
 
-        self.stop_loop()
-        sys.exit(errcode)
+        self.loop.quit(exit_code)
+        if raise_exc:
+            raise QuitException
 
-    def check_jids(self, jids):
+    async def check_jids(self, jids):
         """Check jids validity, transform roster name to corresponding jids
 
         @param profile: profile name
@@ -668,7 +781,7 @@
         names2jid = {}
         nodes2jid = {}
 
-        for contact in self.bridge.getContacts(self.profile):
+        for contact in await self.bridge.getContacts(self.profile):
             jid_s, attr, groups = contact
             _jid = JID(jid_s)
             try:
@@ -704,29 +817,20 @@
 
         return dest_jids
 
-    def connect_profile(self, callback):
-        """ Check if the profile is connected and do it if requested
+    async def connect_profile(self):
+        """Check if the profile is connected and do it if requested
 
-        @param callback: method to call when profile is connected
         @exit: - 1 when profile is not connected and --connect is not set
                - 1 when the profile doesn't exists
                - 1 when there is a connection error
         """
         # FIXME: need better exit codes
 
-        def cant_connect(failure):
-            log.error(_("Can't connect profile: {reason}").format(reason=failure))
-            self.quit(1)
-
-        def cant_start_session(failure):
-            log.error(_("Can't start {profile}'s session: {reason}").format(profile=self.profile, reason=failure))
-            self.quit(1)
-
-        self.profile = self.bridge.profileNameGet(self.args.profile)
+        self.profile = await self.bridge.profileNameGet(self.args.profile)
 
         if not self.profile:
-            log.error(_("The profile [{profile}] doesn't exist").format(profile=self.args.profile))
-            self.quit(1)
+            log.error(_(f"The profile [{self.args.profile}] doesn't exist"))
+            self.quit(C.EXIT_ERROR)
 
         try:
             start_session = self.args.start_session
@@ -734,40 +838,49 @@
             pass
         else:
             if start_session:
-                self.bridge.profileStartSession(self.args.pwd, self.profile, lambda __: callback(), cant_start_session)
-                self._auto_loop = True
+                try:
+                    await self.bridge.profileStartSession(self.args.pwd, self.profile)
+                except Exception as e:
+                    self.disp(_(f"Can't start {self.profile}'s session: {e}"), err=True)
+                    self.quit(1)
                 return
-            elif not self.bridge.profileIsSessionStarted(self.profile):
+            elif not await self.bridge.profileIsSessionStarted(self.profile):
                 if not self.args.connect:
-                    log.error(_("Session for [{profile}] is not started, please start it before using jp, or use either --start-session or --connect option").format(profile=self.profile))
+                    self.disp(_(
+                        f"Session for [{self.profile}] is not started, please start it "
+                        f"before using jp, or use either --start-session or --connect "
+                        f"option"), error=True)
                     self.quit(1)
             elif not getattr(self.args, "connect", False):
-                callback()
                 return
 
 
         if not hasattr(self.args, 'connect'):
-            # a profile can be present without connect option (e.g. on profile creation/deletion)
+            # a profile can be present without connect option (e.g. on profile
+            # creation/deletion)
             return
         elif self.args.connect is True:  # if connection is asked, we connect the profile
-            self.bridge.connect(self.profile, self.args.pwd, {}, lambda __: callback(), cant_connect)
-            self._auto_loop = True
+            try:
+                await self.bridge.connect(self.profile, self.args.pwd, {})
+            except Exception as e:
+                self.disp(_(f"Can't connect profile: {e}"), error=True)
+                self.quit(1)
             return
         else:
-            if not self.bridge.isConnected(self.profile):
-                log.error(_("Profile [{profile}] is not connected, please connect it before using jp, or use --connect option").format(profile=self.profile))
+            if not await self.bridge.isConnected(self.profile):
+                log.error(
+                    _(f"Profile [{self.profile}] is not connected, please connect it "
+                      f"before using jp, or use --connect option"))
                 self.quit(1)
 
-        callback()
-
-    def get_full_jid(self, param_jid):
+    async def get_full_jid(self, param_jid):
         """Return the full jid if possible (add main resource when find a bare jid)"""
         _jid = JID(param_jid)
         if not _jid.resource:
             #if the resource is not given, we try to add the main resource
-            main_resource = self.bridge.getMainResource(param_jid, self.profile)
+            main_resource = await self.bridge.getMainResource(param_jid, self.profile)
             if main_resource:
-                return "%s/%s" % (_jid.bare, main_resource)
+                return f"{_jid.bare}/{main_resource}"
         return param_jid
 
 
@@ -783,7 +896,8 @@
         @param use_output(bool, unicode): if not False, add --output option
         @param extra_outputs(dict): list of command specific outputs:
             key is output name ("default" to use as main output)
-            value is a callable which will format the output (data will be used as only argument)
+            value is a callable which will format the output (data will be used as only
+            argument)
             if a key already exists with normal outputs, the extra one will be used
         @param need_connect(bool, None): True if profile connection is needed
             False else (profile session must still be started)
@@ -799,14 +913,13 @@
                 mandatory arguments are controlled by pubsub_req
             - use_draft(bool): if True, add draft handling options
             ** other arguments **
-            - pubsub_flags(iterable[unicode]): tuple of flags to set pubsub options, can be:
+            - pubsub_flags(iterable[unicode]): tuple of flags to set pubsub options,
+              can be:
                 C.SERVICE: service is required
                 C.NODE: node is required
                 C.ITEM: item is required
                 C.SINGLE_ITEM: only one item is allowed
-        @attribute need_loop(bool): to set by commands when loop is needed
         """
-        self.need_loop = False # to be set by commands when loop is needed
         try: # If we have subcommands, host is a CommandBase and we need to use host.host
             self.host = host.host
         except AttributeError:
@@ -815,10 +928,12 @@
         # --profile option
         parents = kwargs.setdefault('parents', set())
         if use_profile:
-            #self.host.parents['profile'] is an ArgumentParser with profile connection arguments
+            # self.host.parents['profile'] is an ArgumentParser with profile connection
+            # arguments
             if need_connect is None:
                 need_connect = True
-            parents.add(self.host.parents['profile' if need_connect else 'profile_session'])
+            parents.add(
+                self.host.parents['profile' if need_connect else 'profile_session'])
         else:
             assert need_connect is None
         self.need_connect = need_connect
@@ -838,7 +953,8 @@
             choices = set(self.host.getOutputChoices(use_output))
             choices.update(extra_outputs)
             if not choices:
-                raise exceptions.InternalError("No choice found for {} output type".format(use_output))
+                raise exceptions.InternalError(
+                    "No choice found for {} output type".format(use_output))
             try:
                 default = self.host.default_output[use_output]
             except KeyError:
@@ -848,8 +964,12 @@
                     default = 'simple'
                 else:
                     default = list(choices)[0]
-            output_parent.add_argument('--output', '-O', choices=sorted(choices), default=default, help=_("select output format (default: {})".format(default)))
-            output_parent.add_argument('--output-option', '--oo', action="append", dest='output_opts', default=[], help=_("output specific option"))
+            output_parent.add_argument(
+                '--output', '-O', choices=sorted(choices), default=default,
+                help=_("select output format (default: {})".format(default)))
+            output_parent.add_argument(
+                '--output-option', '--oo', action="append", dest='output_opts',
+                default=[], help=_("output specific option"))
             parents.add(output_parent)
         else:
             assert extra_outputs is None
@@ -873,7 +993,7 @@
 
         self.parser = host.subparsers.add_parser(name, help=help, **kwargs)
         if hasattr(self, "subcommands"):
-            self.subparsers = self.parser.add_subparsers()
+            self.subparsers = self.parser.add_subparsers(dest='subcommand', required=True)
         else:
             self.parser.set_defaults(_cmd=self)
         self.add_parser_options()
@@ -894,11 +1014,10 @@
     def progress_id(self):
         return self.host.progress_id
 
-    @progress_id.setter
-    def progress_id(self, value):
-        self.host.progress_id = value
+    async def set_progress_id(self, progress_id):
+        return await self.host.set_progress_id(progress_id)
 
-    def progressStartedHandler(self, uid, metadata, profile):
+    async def progressStartedHandler(self, uid, metadata, profile):
         if profile != self.profile:
             return
         if self.progress_id is None:
@@ -907,15 +1026,20 @@
             # when the progress_id is received
             cache_data = (self.progressStartedHandler, uid, metadata, profile)
             try:
-                self.host.progress_ids_cache.append(cache_data)
+                cache = self.host.progress_ids_cache
             except AttributeError:
-                self.host.progress_ids_cache = [cache_data]
+                cache = self.host.progress_ids_cache = []
+            cache.append(cache_data)
         else:
             if self.host.watch_progress and uid == self.progress_id:
-                self.onProgressStarted(metadata)
-                self.host.loop.call_later(PROGRESS_DELAY, self.progressUpdate)
+                await self.onProgressStarted(metadata)
+                while True:
+                    await asyncio.sleep(PROGRESS_DELAY)
+                    cont = await self.progressUpdate()
+                    if not cont:
+                        break
 
-    def progressFinishedHandler(self, uid, metadata, profile):
+    async def progressFinishedHandler(self, uid, metadata, profile):
         if profile != self.profile:
             return
         if uid == self.progress_id:
@@ -923,39 +1047,58 @@
                 self.host.pbar.finish()
             except AttributeError:
                 pass
-            self.onProgressFinished(metadata)
+            await self.onProgressFinished(metadata)
             if self.host.quit_on_progress_end:
                 self.host.quitFromSignal()
 
-    def progressErrorHandler(self, uid, message, profile):
+    async def progressErrorHandler(self, uid, message, profile):
         if profile != self.profile:
             return
         if uid == self.progress_id:
             if self.args.progress:
                 self.disp('') # progress is not finished, so we skip a line
             if self.host.quit_on_progress_end:
-                self.onProgressError(message)
-                self.host.quitFromSignal(1)
+                await self.onProgressError(message)
+                self.host.quitFromSignal(C.EXIT_ERROR)
 
-    def progressUpdate(self):
-        """This method is continualy called to update the progress bar"""
-        data = self.host.bridge.progressGet(self.progress_id, self.profile)
+    async def progressUpdate(self):
+        """This method is continualy called to update the progress bar
+
+        @return (bool): False to stop being called
+        """
+        data = await self.host.bridge.progressGet(self.progress_id, self.profile)
         if data:
             try:
                 size = data['size']
             except KeyError:
-                self.disp(_("file size is not known, we can't show a progress bar"), 1, error=True)
+                self.disp(_("file size is not known, we can't show a progress bar"), 1,
+                          error=True)
                 return False
             if self.host.pbar is None:
                 #first answer, we must construct the bar
-                self.host.pbar = progressbar.ProgressBar(max_value=int(size),
-                                                         widgets=[_("Progress: "),progressbar.Percentage(),
-                                                         " ",
-                                                         progressbar.Bar(),
-                                                         " ",
-                                                         progressbar.FileTransferSpeed(),
-                                                         " ",
-                                                         progressbar.ETA()])
+
+                # if the instance has a pbar_template attribute, it is used has model,
+                # else default one is used
+                # template is a list of part, where part can be either a str to show directly
+                # or a list where first argument is a name of a progressbar widget, and others
+                # are used as widget arguments
+                try:
+                    template = self.pbar_template
+                except AttributeError:
+                    template = [
+                        _("Progress: "), ["Percentage"], " ", ["Bar"], " ",
+                        ["FileTransferSpeed"], " ", ["ETA"]
+                    ]
+
+                widgets = []
+                for part in template:
+                    if isinstance(part, str):
+                        widgets.append(part)
+                    else:
+                        widget = getattr(progressbar, part.pop(0))
+                        widgets.append(widget(*part))
+
+                self.host.pbar = progressbar.ProgressBar(max_value=int(size), widgets=widgets)
                 self.host.pbar.start()
 
             self.host.pbar.update(int(data['position']))
@@ -963,11 +1106,11 @@
         elif self.host.pbar is not None:
             return False
 
-        self.onProgressUpdate(data)
+        await self.onProgressUpdate(data)
 
         return True
 
-    def onProgressStarted(self, metadata):
+    async def onProgressStarted(self, metadata):
         """Called when progress has just started
 
         can be overidden by a command
@@ -975,7 +1118,7 @@
         """
         self.disp(_("Operation started"), 2)
 
-    def onProgressUpdate(self, metadata):
+    async def onProgressUpdate(self, metadata):
         """Method called on each progress updata
 
         can be overidden by a command to handle progress metadata
@@ -983,7 +1126,7 @@
         """
         pass
 
-    def onProgressFinished(self, metadata):
+    async def onProgressFinished(self, metadata):
         """Called when progress has just finished
 
         can be overidden by a command
@@ -991,12 +1134,12 @@
         """
         self.disp(_("Operation successfully finished"), 2)
 
-    def onProgressError(self, error_msg):
+    async def onProgressError(self, e):
         """Called when a progress failed
 
         @param error_msg(unicode): error message as sent by bridge.progressError
         """
-        self.disp(_("Error while doing operation: {}").format(error_msg), error=True)
+        self.disp(_(f"Error while doing operation: {e}"), error=True)
 
     def disp(self, msg, verbosity=0, error=False, no_lf=False):
         return self.host.disp(msg, verbosity, error, no_lf)
@@ -1005,33 +1148,10 @@
         try:
             output_type = self._output_type
         except AttributeError:
-            raise exceptions.InternalError(_('trying to use output when use_output has not been set'))
+            raise exceptions.InternalError(
+                _('trying to use output when use_output has not been set'))
         return self.host.output(output_type, self.args.output, self.extra_outputs, data)
 
-    def exitCb(self, msg=None):
-        """generic callback for success
-
-        optionally print a message, and quit
-        msg(None, unicode): if not None, print this message
-        """
-        if msg is not None:
-            self.disp(msg)
-        self.host.quit(C.EXIT_OK)
-
-    def errback(self, failure_, msg=None, exit_code=C.EXIT_ERROR):
-        """generic callback for errbacks
-
-        display failure_ then quit with generic error
-        @param failure_: arguments returned by errback
-        @param msg(unicode, None): message template
-            use {} if you want to display failure message
-        @param exit_code(int): shell exit code
-        """
-        if msg is None:
-            msg = _("error: {}")
-        self.disp(msg.format(failure_), error=True)
-        self.host.quit(exit_code)
-
     def getPubsubExtra(self, extra=None):
         """Helper method to compute extra data from pubsub arguments
 
@@ -1090,7 +1210,7 @@
         for cls in subcommands:
             cls(self)
 
-    def run(self):
+    async def run(self):
         """this method is called when a command is actually run
 
         It set stuff like progression callbacks and profile connection
@@ -1098,9 +1218,6 @@
         """
         # we keep a reference to run command, it may be useful e.g. for outputs
         self.host.command = self
-        # host._need_loop is set here from our current value and not before
-        # as the need_loop decision must be taken only by then running command
-        self.host._need_loop = self.need_loop
 
         try:
             show_progress = self.args.progress
@@ -1110,34 +1227,25 @@
         else:
             if show_progress:
                 self.host.watch_progress = True
-            # we need to register the following signal even if we don't display the progress bar
-            self.host.bridge.register_signal("progressStarted", self.progressStartedHandler)
-            self.host.bridge.register_signal("progressFinished", self.progressFinishedHandler)
-            self.host.bridge.register_signal("progressError", self.progressErrorHandler)
+            # we need to register the following signal even if we don't display the
+            # progress bar
+            self.host.bridge.register_signal(
+                "progressStarted", self.progressStartedHandler)
+            self.host.bridge.register_signal(
+                "progressFinished", self.progressFinishedHandler)
+            self.host.bridge.register_signal(
+                "progressError", self.progressErrorHandler)
 
         if self.need_connect is not None:
-             self.host.connect_profile(self.connected)
-        else:
-            self.start()
-
-    def connected(self):
-        """this method is called when profile is connected (or session is started)
+            await self.host.connect_profile()
+        await self.start()
 
-        this method is only called when use_profile is True
-        most of time you should override self.start instead of this method, but if loop
-        if not always needed depending on your arguments, you may override this method,
-        but don't forget to call the parent one (i.e. this one) after self.need_loop is set
-        """
-        if not self.need_loop:
-            self.host.stop_loop()
-        self.start()
-
-    def start(self):
-        """This is the starting point of the command, this method should be overriden
+    async def start(self):
+        """This is the starting point of the command, this method must be overriden
 
         at this point, profile are connected if needed
         """
-        pass
+        raise NotImplementedError
 
 
 class CommandAnswering(CommandBase):
@@ -1152,9 +1260,8 @@
 
     def __init__(self, *args, **kwargs):
         super(CommandAnswering, self).__init__(*args, **kwargs)
-        self.need_loop = True
 
-    def onActionNew(self, action_data, action_id, security_limit, profile):
+    async def onActionNew(self, action_data, action_id, security_limit, profile):
         if profile != self.profile:
             return
         try:
@@ -1172,7 +1279,7 @@
             except KeyError:
                 pass
             else:
-                callback(action_data, action_id, security_limit, profile)
+                await callback(action_data, action_id, security_limit, profile)
 
     def onXMLUI(self, xml_ui):
         """Display a dialog received from the backend.
@@ -1187,11 +1294,9 @@
         if dialog is not None:
             self.disp(dialog.findtext("message"), error=dialog.get("level") == "error")
 
-    def connected(self):
-        """Auto reply to confirmations requests"""
-        self.need_loop = True
-        super(CommandAnswering, self).connected()
+    async def start_answering(self):
+        """Auto reply to confirmation requests"""
         self.host.bridge.register_signal("actionNew", self.onActionNew)
-        actions = self.host.bridge.actionsGet(self.profile)
+        actions = await self.host.bridge.actionsGet(self.profile)
         for action_data, action_id, security_limit in actions:
-            self.onActionNew(action_data, action_id, security_limit, self.profile)
+            await self.onActionNew(action_data, action_id, security_limit, self.profile)