Mercurial > libervia-backend
diff sat_frontends/jp/base.py @ 3040:fee60f17ebac
jp: jp asyncio port:
/!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\
This patch implements the port of jp to asyncio, so it is now correctly using the bridge
asynchronously, and it can be used with bridges like `pb`. This also simplify the code,
notably for things which were previously implemented with many callbacks (like pagination
with RSM).
During the process, some behaviours have been modified/fixed, in jp and backends, check
diff for details.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:56:41 +0200 |
parents | ab2696e34d29 |
children | 3df611adb598 |
line wrap: on
line diff
--- a/sat_frontends/jp/base.py Wed Sep 25 08:53:38 2019 +0200 +++ b/sat_frontends/jp/base.py Wed Sep 25 08:56:41 2019 +0200 @@ -17,18 +17,21 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import asyncio from sat.core.i18n import _ ### logging ### import logging as log -log.basicConfig(level=log.DEBUG, - format='%(message)s') +log.basicConfig(level=log.WARNING, + format='[%(name)s] %(message)s') ### import sys -import locale +import os import os.path import argparse +import inspect +from pathlib import Path from glob import iglob from importlib import import_module from sat_frontends.tools.jid import JID @@ -41,13 +44,20 @@ from sat_frontends.jp.constants import Const as C from sat_frontends.tools import misc import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI -import shlex from collections import OrderedDict ## bridge handling # we get bridge name from conf and initialise the right class accordingly main_config = config.parseMainConf() bridge_name = config.getConfig(main_config, '', 'bridge', 'dbus') +USER_INTER_MSG = _("User interruption: good bye") + + +class QuitException(BaseException): + """Quitting is requested + + This is used to stop execution when host.quit() is called + """ # TODO: move loops handling in a separated module @@ -63,8 +73,9 @@ def run(self): self.loop.run() - def quit(self): + def quit(self, exit_code): self.loop.quit() + sys.exit(exit_code) def call_later(self, delay, callback, *args): """call a callback repeatedly @@ -78,27 +89,66 @@ GLib.timeout_add(delay, callback, *args) else: - print("can't start jp: only D-Bus bridge is currently handled") - sys.exit(C.EXIT_ERROR) - # FIXME: twisted loop can be used when jp can handle fully async bridges - # from twisted.internet import reactor + import signal + from twisted.internet import asyncioreactor + asyncioreactor.install() + from twisted.internet import reactor, defer + + class JPLoop(object): - # class JPLoop(object): + def __init__(self): + # exit code must be set when using quit, so if it's not set + # something got wrong and we must report it + self._exit_code = C.EXIT_INTERNAL_ERROR - # def run(self): - # reactor.run() + def run(self, jp, *args): + self.jp = jp + signal.signal(signal.SIGINT, self._on_sigint) + defer.ensureDeferred(self._start(jp, *args)) + try: + reactor.run(installSignalHandlers=False) + except SystemExit as e: + self._exit_code = e.code + sys.exit(self._exit_code) - # def quit(self): - # reactor.stop() + async def _start(self, jp, *args): + fut = asyncio.ensure_future(jp.main(*args)) + try: + await defer.Deferred.fromFuture(fut) + except BaseException: + import traceback + traceback.print_exc() + jp.quit(1) + + def quit(self, exit_code): + self._exit_code = exit_code + reactor.stop() - # def _timeout_cb(self, args, callback, delay): - # ret = callback(*args) - # if ret: - # reactor.callLater(delay, self._timeout_cb, args, callback, delay) + def _timeout_cb(self, args, callback, delay): + try: + ret = callback(*args) + # FIXME: temporary hack to avoid traceback when using XMLUI + # to be removed once create_task is not used anymore in + # xmlui_manager (i.e. once sat_frontends.tools.xmlui fully supports + # async syntax) + except QuitException: + return + if ret: + reactor.callLater(delay, self._timeout_cb, args, callback, delay) - # def call_later(self, delay, callback, *args): - # delay = float(delay) / 1000 - # reactor.callLater(delay, self._timeout_cb, args, callback, delay) + def call_later(self, delay, callback, *args): + delay = float(delay) / 1000 + reactor.callLater(delay, self._timeout_cb, args, callback, delay) + + def _on_sigint(self, sig_number, stack_frame): + """Called on keyboard interruption + + Print user interruption message, set exit code and stop reactor + """ + print("\r" + USER_INTER_MSG) + self._exit_code = C.EXIT_USER_CANCELLED + reactor.callFromThread(reactor.stop) + if bridge_name == "embedded": from sat.core import sat_main @@ -107,9 +157,10 @@ try: import progressbar except ImportError: - msg = (_('ProgressBar not available, please download it at http://pypi.python.org/pypi/progressbar\n') + - _('Progress bar deactivated\n--\n')) - print(msg.encode('utf-8'), file=sys.stderr) + msg = (_('ProgressBar not available, please download it at ' + 'http://pypi.python.org/pypi/progressbar\n' + 'Progress bar deactivated\n--\n')) + print(msg, file=sys.stderr) progressbar=None #consts @@ -122,7 +173,7 @@ This is free software, and you are welcome to redistribute it under certain conditions. """ -PROGRESS_DELAY = 10 # the progression will be checked every PROGRESS_DELAY ms +PROGRESS_DELAY = 0.1 # the progression will be checked every PROGRESS_DELAY s def date_decoder(arg): @@ -141,33 +192,30 @@ def __init__(self): """ - @attribute quit_on_progress_end (bool): set to False if you manage yourself exiting, - or if you want the user to stop by himself + @attribute quit_on_progress_end (bool): set to False if you manage yourself + exiting, or if you want the user to stop by himself @attribute progress_success(callable): method to call when progress just started by default display a message - @attribute progress_success(callable): method to call when progress is successfully finished - by default display a message + @attribute progress_success(callable): method to call when progress is + successfully finished by default display a message @attribute progress_failure(callable): method to call when progress failed by default display a message """ - # FIXME: need_loop should be removed, everything must be async in bridge so - # loop will always be needed bridge_module = dynamic_import.bridge(bridge_name, 'sat_frontends.bridge') if bridge_module is None: log.error("Can't import {} bridge".format(bridge_name)) sys.exit(1) - self.bridge = bridge_module.Bridge() - self.bridge.bridgeConnect(callback=self._bridgeCb, errback=self._bridgeEb) + self.bridge = bridge_module.AIOBridge() + self._onQuitCallbacks = [] - def _bridgeCb(self): - self.parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, - description=DESCRIPTION) + def _bridgeConnected(self): + self.parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, description=DESCRIPTION) self._make_parents() self.add_parser_options() - self.subparsers = self.parser.add_subparsers(title=_('Available commands'), dest='subparser_name') - self._auto_loop = False # when loop is used for internal reasons - self._need_loop = False + self.subparsers = self.parser.add_subparsers( + title=_('Available commands'), dest='command', required=True) # progress attributes self._progress_id = None # TODO: manage several progress ids @@ -181,27 +229,14 @@ self.own_jid = None # must be filled at runtime if needed - def _bridgeEb(self, failure): - if isinstance(failure, exceptions.BridgeExceptionNoService): - print((_("Can't connect to SàT backend, are you sure it's launched ?"))) - elif isinstance(failure, exceptions.BridgeInitError): - print((_("Can't init bridge"))) - else: - print((_("Error while initialising bridge: {}".format(failure)))) - sys.exit(C.EXIT_BRIDGE_ERROR) - - @property - def version(self): - return self.bridge.getVersion() - @property def progress_id(self): return self._progress_id - @progress_id.setter - def progress_id(self, value): - self._progress_id = value - self.replayCache('progress_ids_cache') + async def set_progress_id(self, progress_id): + # because we use async, we need an explicit setter + self._progress_id = progress_id + await self.replayCache('progress_ids_cache') @property def watch_progress(self): @@ -224,13 +259,13 @@ except AttributeError: return 0 - def replayCache(self, cache_attribute): + async def replayCache(self, cache_attribute): """Replay cached signals @param cache_attribute(str): name of the attribute containing the cache if the attribute doesn't exist, there is no cache and the call is ignored - else the cache must be a list of tuples containing the replay callback as first item, - then the arguments to use + else the cache must be a list of tuples containing the replay callback as + first item, then the arguments to use """ try: cache = getattr(self, cache_attribute) @@ -238,7 +273,7 @@ pass else: for cache_data in cache: - cache_data[0](*cache_data[1:]) + await cache_data[0](*cache_data[1:]) def disp(self, msg, verbosity=0, error=False, no_lf=False): """Print a message to user @@ -260,23 +295,22 @@ else: print(msg) - def output(self, type_, name, extra_outputs, data): + async def output(self, type_, name, extra_outputs, data): if name in extra_outputs: - extra_outputs[name](data) + method = extra_outputs[name] else: - self._outputs[type_][name]['callback'](data) + method = self._outputs[type_][name]['callback'] + + ret = method(data) + if inspect.isawaitable(ret): + await ret def addOnQuitCallback(self, callback, *args, **kwargs): """Add a callback which will be called on quit command @param callback(callback): method to call """ - try: - callbacks_list = self._onQuitCallbacks - except AttributeError: - callbacks_list = self._onQuitCallbacks = [] - finally: - callbacks_list.append((callback, args, kwargs)) + self._onQuitCallbacks.append((callback, args, kwargs)) def getOutputChoices(self, output_type): """Return valid output filters for output_type @@ -289,33 +323,52 @@ def _make_parents(self): self.parents = {} - # we have a special case here as the start-session option is present only if connection is not needed, - # so we create two similar parents, one with the option, the other one without it + # we have a special case here as the start-session option is present only if + # connection is not needed, so we create two similar parents, one with the + # option, the other one without it for parent_name in ('profile', 'profile_session'): parent = self.parents[parent_name] = argparse.ArgumentParser(add_help=False) - parent.add_argument("-p", "--profile", action="store", type=str, default='@DEFAULT@', help=_("Use PROFILE profile key (default: %(default)s)")) - parent.add_argument("--pwd", action="store", default='', metavar='PASSWORD', help=_("Password used to connect profile, if necessary")) + parent.add_argument( + "-p", "--profile", action="store", type=str, default='@DEFAULT@', + help=_("Use PROFILE profile key (default: %(default)s)")) + parent.add_argument( + "--pwd", action="store", default='', metavar='PASSWORD', + help=_("Password used to connect profile, if necessary")) - profile_parent, profile_session_parent = self.parents['profile'], self.parents['profile_session'] + profile_parent, profile_session_parent = (self.parents['profile'], + self.parents['profile_session']) - connect_short, connect_long, connect_action, connect_help = "-c", "--connect", "store_true", _("Connect the profile before doing anything else") - profile_parent.add_argument(connect_short, connect_long, action=connect_action, help=connect_help) + connect_short, connect_long, connect_action, connect_help = ( + "-c", "--connect", "store_true", + _("Connect the profile before doing anything else") + ) + profile_parent.add_argument( + connect_short, connect_long, action=connect_action, help=connect_help) profile_session_connect_group = profile_session_parent.add_mutually_exclusive_group() - profile_session_connect_group.add_argument(connect_short, connect_long, action=connect_action, help=connect_help) - profile_session_connect_group.add_argument("--start-session", action="store_true", help=_("Start a profile session without connecting")) + profile_session_connect_group.add_argument( + connect_short, connect_long, action=connect_action, help=connect_help) + profile_session_connect_group.add_argument( + "--start-session", action="store_true", + help=_("Start a profile session without connecting")) - progress_parent = self.parents['progress'] = argparse.ArgumentParser(add_help=False) + progress_parent = self.parents['progress'] = argparse.ArgumentParser( + add_help=False) if progressbar: - progress_parent.add_argument("-P", "--progress", action="store_true", help=_("Show progress bar")) + progress_parent.add_argument( + "-P", "--progress", action="store_true", help=_("Show progress bar")) verbose_parent = self.parents['verbose'] = argparse.ArgumentParser(add_help=False) - verbose_parent.add_argument('--verbose', '-v', action='count', default=0, help=_("Add a verbosity level (can be used multiple times)")) + verbose_parent.add_argument( + '--verbose', '-v', action='count', default=0, + help=_("Add a verbosity level (can be used multiple times)")) draft_parent = self.parents['draft'] = argparse.ArgumentParser(add_help=False) draft_group = draft_parent.add_argument_group(_('draft handling')) - draft_group.add_argument("-D", "--current", action="store_true", help=_("load current draft")) - draft_group.add_argument("-F", "--draft-path", help=_("path to a draft file to retrieve")) + draft_group.add_argument( + "-D", "--current", action="store_true", help=_("load current draft")) + draft_group.add_argument( + "-F", "--draft-path", type=Path, help=_("path to a draft file to retrieve")) def make_pubsub_group(self, flags, defaults): @@ -356,11 +409,14 @@ item_help += _(" (DEFAULT: {default})".format(default=default)) pubsub_group.add_argument("-i", "--item", default='', help=item_help) - pubsub_group.add_argument("-L", "--last-item", action='store_true', help=_('retrieve last item')) + pubsub_group.add_argument( + "-L", "--last-item", action='store_true', help=_('retrieve last item')) elif flags.multi_items: # mutiple items, this activate several features: max-items, RSM, MAM # and Orbder-by - pubsub_group.add_argument("-i", "--item", action='append', dest='items', default=[], help=_("items to retrieve (DEFAULT: all)")) + pubsub_group.add_argument( + "-i", "--item", action='append', dest='items', default=[], + help=_("items to retrieve (DEFAULT: all)")) if not flags.no_max: max_group = pubsub_group.add_mutually_exclusive_group() # XXX: defaut value for --max-items or --max is set in parse_pubsub_args @@ -409,14 +465,22 @@ help=_("how items should be ordered")) if not flags.all_used: - raise exceptions.InternalError('unknown flags: {flags}'.format(flags=', '.join(flags.unused))) + raise exceptions.InternalError('unknown flags: {flags}'.format( + flags=', '.join(flags.unused))) if defaults: - raise exceptions.InternalError('unused defaults: {defaults}'.format(defaults=defaults)) + raise exceptions.InternalError(f'unused defaults: {defaults}') return parent def add_parser_options(self): - self.parser.add_argument('--version', action='version', version=("%(name)s %(version)s %(copyleft)s" % {'name': PROG_NAME, 'version': self.version, 'copyleft': COPYLEFT})) + self.parser.add_argument( + '--version', + action='version', + version=("{name} {version} {copyleft}".format( + name = PROG_NAME, + version = self.version, + copyleft = COPYLEFT)) + ) def register_output(self, type_, name, callback, description="", default=False): if type_ not in C.OUTPUT_TYPES: @@ -427,7 +491,9 @@ } if default: if type_ in self.default_output: - self.disp(_('there is already a default output for {}, ignoring new one').format(type_)) + self.disp( + _(f'there is already a default output for {type_}, ignoring new one') + ) else: self.default_output[type_] = name @@ -445,7 +511,8 @@ def check_output_options(self, accepted_set, options): if not accepted_set.issuperset(options): - self.disp("The following output options are invalid: {invalid_options}".format( + self.disp( + _("The following output options are invalid: {invalid_options}").format( invalid_options = ', '.join(set(options).difference(accepted_set))), error=True) self.quit(C.EXIT_BAD_ARG) @@ -457,17 +524,20 @@ """ path = os.path.dirname(sat_frontends.jp.__file__) # XXX: outputs must be imported before commands as they are used for arguments - for type_, pattern in ((C.PLUGIN_OUTPUT, 'output_*.py'), (C.PLUGIN_CMD, 'cmd_*.py')): - modules = (os.path.splitext(module)[0] for module in map(os.path.basename, iglob(os.path.join(path, pattern)))) + for type_, pattern in ((C.PLUGIN_OUTPUT, 'output_*.py'), + (C.PLUGIN_CMD, 'cmd_*.py')): + modules = ( + os.path.splitext(module)[0] + for module in map(os.path.basename, iglob(os.path.join(path, pattern)))) for module_name in modules: module_path = "sat_frontends.jp." + module_name try: module = import_module(module_path) self.import_plugin_module(module, type_) except ImportError as e: - self.disp(_("Can't import {module_path} plugin, ignoring it: {msg}".format( - module_path = module_path, - msg = e)), error=True) + self.disp( + _(f"Can't import {module_path} plugin, ignoring it: {e}"), + error=True) except exceptions.CancelError: continue except exceptions.MissingModule as e: @@ -485,7 +555,7 @@ try: class_names = getattr(module, '__{}__'.format(type_)) except AttributeError: - log.disp(_("Invalid plugin module [{type}] {module}").format(type=type_, module=module), error=True) + log.disp(_(f"Invalid plugin module [{type_}] {module}"), error=True) raise ImportError else: for class_name in class_names: @@ -500,12 +570,15 @@ scheme = 'http' else: raise exceptions.InternalError('An HTTP scheme is expected in this method') - self.disp("{scheme} URL found, trying to find associated xmpp: URI".format(scheme=scheme.upper()),1) + self.disp(f"{scheme.upper()} URL found, trying to find associated xmpp: URI", 1) # HTTP URL, we try to find xmpp: links try: from lxml import etree except ImportError: - self.disp("lxml module must be installed to use http(s) scheme, please install it with \"pip install lxml\"", error=True) + self.disp( + "lxml module must be installed to use http(s) scheme, please install it " + "with \"pip install lxml\"", + error=True) self.quit(1) import urllib.request, urllib.error, urllib.parse parser = etree.HTMLParser() @@ -517,7 +590,10 @@ else: links = root.xpath("//link[@rel='alternate' and starts-with(@href, 'xmpp:')]") if not links: - self.disp('Could not find alternate "xmpp:" URI, can\'t find associated XMPP PubSub node/item', error=True) + self.disp( + _('Could not find alternate "xmpp:" URI, can\'t find associated XMPP ' + 'PubSub node/item'), + error=True) self.quit(1) xmpp_uri = links[0].get('href') return xmpp_uri @@ -552,7 +628,10 @@ try: items = self.args.items except AttributeError: - self.disp(_("item specified in URL but not needed in command, ignoring it"), error=True) + self.disp( + _("item specified in URL but not needed in command, " + "ignoring it"), + error=True) else: if not items: self.args.items = [uri_item] @@ -565,7 +644,7 @@ if not item_last: self.args.item = uri_item else: - self.parser.error(_('XMPP URL is not a pubsub one: {url}').format(url=url)) + self.parser.error(_(f'XMPP URL is not a pubsub one: {url}')) flags = self.args._cmd._pubsub_flags # we check required arguments here instead of using add_arguments' required option # because the required argument can be set in URL @@ -580,7 +659,8 @@ # so we check conflict here. This may be fixed in Python 3, to be checked try: if self.args.item and self.args.item_last: - self.parser.error(_("--item and --item-last can't be used at the same time")) + self.parser.error( + _("--item and --item-last can't be used at the same time")) except AttributeError: pass @@ -605,46 +685,78 @@ if self.args.max is None: self.args.max = C.NO_LIMIT - def run(self, args=None, namespace=None): - self.args = self.parser.parse_args(args, namespace=None) - if self.args._cmd._use_pubsub: - self.parse_pubsub_args() + async def main(self, args, namespace): try: - self.args._cmd.run() - if self._need_loop or self._auto_loop: - self._start_loop() - except KeyboardInterrupt: - self.disp(_("User interruption: good bye")) + await self.bridge.bridgeConnect() + except Exception as e: + if isinstance(e, exceptions.BridgeExceptionNoService): + print((_("Can't connect to SàT backend, are you sure it's launched ?"))) + elif isinstance(e, exceptions.BridgeInitError): + print((_("Can't init bridge"))) + else: + print((_(f"Error while initialising bridge: {e}"))) + self.quit(C.EXIT_BRIDGE_ERROR, raise_exc=False) + return + self.version = await self.bridge.getVersion() + self._bridgeConnected() + self.import_plugins() + try: + self.args = self.parser.parse_args(args, namespace=None) + if self.args._cmd._use_pubsub: + self.parse_pubsub_args() + await self.args._cmd.run() + except SystemExit as e: + self.quit(e.code, raise_exc=False) + return + except QuitException: + return - def _start_loop(self): + def run(self, args=None, namespace=None): self.loop = JPLoop() - self.loop.run() + self.loop.run(self, args, namespace) - def stop_loop(self): - try: - self.loop.quit() - except AttributeError: - pass + def _read_stdin(self, stdin_fut): + """Callback called by ainput to read stdin""" + line = sys.stdin.readline() + if line: + stdin_fut.set_result(line.rstrip(os.linesep)) + else: + stdin_fut.set_exception(EOFError()) - def confirmOrQuit(self, message, cancel_message=_("action cancelled by user")): + async def ainput(self, msg=''): + """Asynchronous version of buildin "input" function""" + self.disp(msg, no_lf=True) + sys.stdout.flush() + loop = asyncio.get_running_loop() + stdin_fut = loop.create_future() + loop.add_reader(sys.stdin, self._read_stdin, stdin_fut) + return await stdin_fut + + async def confirmOrQuit(self, message, cancel_message=_("action cancelled by user")): """Request user to confirm action, and quit if he doesn't""" - res = input("{} (y/N)? ".format(message)) + res = await self.ainput(f"{message} (y/N)? ") if res not in ("y", "Y"): self.disp(cancel_message) self.quit(C.EXIT_USER_CANCELLED) - def quitFromSignal(self, errcode=0): - """Same as self.quit, but from a signal handler + def quitFromSignal(self, exit_code=0): + r"""Same as self.quit, but from a signal handler /!\: return must be used after calling this method ! """ - assert self._need_loop # XXX: python-dbus will show a traceback if we exit in a signal handler # so we use this little timeout trick to avoid it - self.loop.call_later(0, self.quit, errcode) + self.loop.call_later(0, self.quit, exit_code) + + def quit(self, exit_code=0, raise_exc=True): + """Terminate the execution with specified exit_code - def quit(self, errcode=0): + This will stop the loop. + @param exit_code(int): code to return when quitting the program + @param raise_exp(boolean): if True raise a QuitException to stop code execution + The default value should be used most of time. + """ # first the onQuitCallbacks try: callbacks_list = self._onQuitCallbacks @@ -654,10 +766,11 @@ for callback, args, kwargs in callbacks_list: callback(*args, **kwargs) - self.stop_loop() - sys.exit(errcode) + self.loop.quit(exit_code) + if raise_exc: + raise QuitException - def check_jids(self, jids): + async def check_jids(self, jids): """Check jids validity, transform roster name to corresponding jids @param profile: profile name @@ -668,7 +781,7 @@ names2jid = {} nodes2jid = {} - for contact in self.bridge.getContacts(self.profile): + for contact in await self.bridge.getContacts(self.profile): jid_s, attr, groups = contact _jid = JID(jid_s) try: @@ -704,29 +817,20 @@ return dest_jids - def connect_profile(self, callback): - """ Check if the profile is connected and do it if requested + async def connect_profile(self): + """Check if the profile is connected and do it if requested - @param callback: method to call when profile is connected @exit: - 1 when profile is not connected and --connect is not set - 1 when the profile doesn't exists - 1 when there is a connection error """ # FIXME: need better exit codes - def cant_connect(failure): - log.error(_("Can't connect profile: {reason}").format(reason=failure)) - self.quit(1) - - def cant_start_session(failure): - log.error(_("Can't start {profile}'s session: {reason}").format(profile=self.profile, reason=failure)) - self.quit(1) - - self.profile = self.bridge.profileNameGet(self.args.profile) + self.profile = await self.bridge.profileNameGet(self.args.profile) if not self.profile: - log.error(_("The profile [{profile}] doesn't exist").format(profile=self.args.profile)) - self.quit(1) + log.error(_(f"The profile [{self.args.profile}] doesn't exist")) + self.quit(C.EXIT_ERROR) try: start_session = self.args.start_session @@ -734,40 +838,49 @@ pass else: if start_session: - self.bridge.profileStartSession(self.args.pwd, self.profile, lambda __: callback(), cant_start_session) - self._auto_loop = True + try: + await self.bridge.profileStartSession(self.args.pwd, self.profile) + except Exception as e: + self.disp(_(f"Can't start {self.profile}'s session: {e}"), err=True) + self.quit(1) return - elif not self.bridge.profileIsSessionStarted(self.profile): + elif not await self.bridge.profileIsSessionStarted(self.profile): if not self.args.connect: - log.error(_("Session for [{profile}] is not started, please start it before using jp, or use either --start-session or --connect option").format(profile=self.profile)) + self.disp(_( + f"Session for [{self.profile}] is not started, please start it " + f"before using jp, or use either --start-session or --connect " + f"option"), error=True) self.quit(1) elif not getattr(self.args, "connect", False): - callback() return if not hasattr(self.args, 'connect'): - # a profile can be present without connect option (e.g. on profile creation/deletion) + # a profile can be present without connect option (e.g. on profile + # creation/deletion) return elif self.args.connect is True: # if connection is asked, we connect the profile - self.bridge.connect(self.profile, self.args.pwd, {}, lambda __: callback(), cant_connect) - self._auto_loop = True + try: + await self.bridge.connect(self.profile, self.args.pwd, {}) + except Exception as e: + self.disp(_(f"Can't connect profile: {e}"), error=True) + self.quit(1) return else: - if not self.bridge.isConnected(self.profile): - log.error(_("Profile [{profile}] is not connected, please connect it before using jp, or use --connect option").format(profile=self.profile)) + if not await self.bridge.isConnected(self.profile): + log.error( + _(f"Profile [{self.profile}] is not connected, please connect it " + f"before using jp, or use --connect option")) self.quit(1) - callback() - - def get_full_jid(self, param_jid): + async def get_full_jid(self, param_jid): """Return the full jid if possible (add main resource when find a bare jid)""" _jid = JID(param_jid) if not _jid.resource: #if the resource is not given, we try to add the main resource - main_resource = self.bridge.getMainResource(param_jid, self.profile) + main_resource = await self.bridge.getMainResource(param_jid, self.profile) if main_resource: - return "%s/%s" % (_jid.bare, main_resource) + return f"{_jid.bare}/{main_resource}" return param_jid @@ -783,7 +896,8 @@ @param use_output(bool, unicode): if not False, add --output option @param extra_outputs(dict): list of command specific outputs: key is output name ("default" to use as main output) - value is a callable which will format the output (data will be used as only argument) + value is a callable which will format the output (data will be used as only + argument) if a key already exists with normal outputs, the extra one will be used @param need_connect(bool, None): True if profile connection is needed False else (profile session must still be started) @@ -799,14 +913,13 @@ mandatory arguments are controlled by pubsub_req - use_draft(bool): if True, add draft handling options ** other arguments ** - - pubsub_flags(iterable[unicode]): tuple of flags to set pubsub options, can be: + - pubsub_flags(iterable[unicode]): tuple of flags to set pubsub options, + can be: C.SERVICE: service is required C.NODE: node is required C.ITEM: item is required C.SINGLE_ITEM: only one item is allowed - @attribute need_loop(bool): to set by commands when loop is needed """ - self.need_loop = False # to be set by commands when loop is needed try: # If we have subcommands, host is a CommandBase and we need to use host.host self.host = host.host except AttributeError: @@ -815,10 +928,12 @@ # --profile option parents = kwargs.setdefault('parents', set()) if use_profile: - #self.host.parents['profile'] is an ArgumentParser with profile connection arguments + # self.host.parents['profile'] is an ArgumentParser with profile connection + # arguments if need_connect is None: need_connect = True - parents.add(self.host.parents['profile' if need_connect else 'profile_session']) + parents.add( + self.host.parents['profile' if need_connect else 'profile_session']) else: assert need_connect is None self.need_connect = need_connect @@ -838,7 +953,8 @@ choices = set(self.host.getOutputChoices(use_output)) choices.update(extra_outputs) if not choices: - raise exceptions.InternalError("No choice found for {} output type".format(use_output)) + raise exceptions.InternalError( + "No choice found for {} output type".format(use_output)) try: default = self.host.default_output[use_output] except KeyError: @@ -848,8 +964,12 @@ default = 'simple' else: default = list(choices)[0] - output_parent.add_argument('--output', '-O', choices=sorted(choices), default=default, help=_("select output format (default: {})".format(default))) - output_parent.add_argument('--output-option', '--oo', action="append", dest='output_opts', default=[], help=_("output specific option")) + output_parent.add_argument( + '--output', '-O', choices=sorted(choices), default=default, + help=_("select output format (default: {})".format(default))) + output_parent.add_argument( + '--output-option', '--oo', action="append", dest='output_opts', + default=[], help=_("output specific option")) parents.add(output_parent) else: assert extra_outputs is None @@ -873,7 +993,7 @@ self.parser = host.subparsers.add_parser(name, help=help, **kwargs) if hasattr(self, "subcommands"): - self.subparsers = self.parser.add_subparsers() + self.subparsers = self.parser.add_subparsers(dest='subcommand', required=True) else: self.parser.set_defaults(_cmd=self) self.add_parser_options() @@ -894,11 +1014,10 @@ def progress_id(self): return self.host.progress_id - @progress_id.setter - def progress_id(self, value): - self.host.progress_id = value + async def set_progress_id(self, progress_id): + return await self.host.set_progress_id(progress_id) - def progressStartedHandler(self, uid, metadata, profile): + async def progressStartedHandler(self, uid, metadata, profile): if profile != self.profile: return if self.progress_id is None: @@ -907,15 +1026,20 @@ # when the progress_id is received cache_data = (self.progressStartedHandler, uid, metadata, profile) try: - self.host.progress_ids_cache.append(cache_data) + cache = self.host.progress_ids_cache except AttributeError: - self.host.progress_ids_cache = [cache_data] + cache = self.host.progress_ids_cache = [] + cache.append(cache_data) else: if self.host.watch_progress and uid == self.progress_id: - self.onProgressStarted(metadata) - self.host.loop.call_later(PROGRESS_DELAY, self.progressUpdate) + await self.onProgressStarted(metadata) + while True: + await asyncio.sleep(PROGRESS_DELAY) + cont = await self.progressUpdate() + if not cont: + break - def progressFinishedHandler(self, uid, metadata, profile): + async def progressFinishedHandler(self, uid, metadata, profile): if profile != self.profile: return if uid == self.progress_id: @@ -923,39 +1047,58 @@ self.host.pbar.finish() except AttributeError: pass - self.onProgressFinished(metadata) + await self.onProgressFinished(metadata) if self.host.quit_on_progress_end: self.host.quitFromSignal() - def progressErrorHandler(self, uid, message, profile): + async def progressErrorHandler(self, uid, message, profile): if profile != self.profile: return if uid == self.progress_id: if self.args.progress: self.disp('') # progress is not finished, so we skip a line if self.host.quit_on_progress_end: - self.onProgressError(message) - self.host.quitFromSignal(1) + await self.onProgressError(message) + self.host.quitFromSignal(C.EXIT_ERROR) - def progressUpdate(self): - """This method is continualy called to update the progress bar""" - data = self.host.bridge.progressGet(self.progress_id, self.profile) + async def progressUpdate(self): + """This method is continualy called to update the progress bar + + @return (bool): False to stop being called + """ + data = await self.host.bridge.progressGet(self.progress_id, self.profile) if data: try: size = data['size'] except KeyError: - self.disp(_("file size is not known, we can't show a progress bar"), 1, error=True) + self.disp(_("file size is not known, we can't show a progress bar"), 1, + error=True) return False if self.host.pbar is None: #first answer, we must construct the bar - self.host.pbar = progressbar.ProgressBar(max_value=int(size), - widgets=[_("Progress: "),progressbar.Percentage(), - " ", - progressbar.Bar(), - " ", - progressbar.FileTransferSpeed(), - " ", - progressbar.ETA()]) + + # if the instance has a pbar_template attribute, it is used has model, + # else default one is used + # template is a list of part, where part can be either a str to show directly + # or a list where first argument is a name of a progressbar widget, and others + # are used as widget arguments + try: + template = self.pbar_template + except AttributeError: + template = [ + _("Progress: "), ["Percentage"], " ", ["Bar"], " ", + ["FileTransferSpeed"], " ", ["ETA"] + ] + + widgets = [] + for part in template: + if isinstance(part, str): + widgets.append(part) + else: + widget = getattr(progressbar, part.pop(0)) + widgets.append(widget(*part)) + + self.host.pbar = progressbar.ProgressBar(max_value=int(size), widgets=widgets) self.host.pbar.start() self.host.pbar.update(int(data['position'])) @@ -963,11 +1106,11 @@ elif self.host.pbar is not None: return False - self.onProgressUpdate(data) + await self.onProgressUpdate(data) return True - def onProgressStarted(self, metadata): + async def onProgressStarted(self, metadata): """Called when progress has just started can be overidden by a command @@ -975,7 +1118,7 @@ """ self.disp(_("Operation started"), 2) - def onProgressUpdate(self, metadata): + async def onProgressUpdate(self, metadata): """Method called on each progress updata can be overidden by a command to handle progress metadata @@ -983,7 +1126,7 @@ """ pass - def onProgressFinished(self, metadata): + async def onProgressFinished(self, metadata): """Called when progress has just finished can be overidden by a command @@ -991,12 +1134,12 @@ """ self.disp(_("Operation successfully finished"), 2) - def onProgressError(self, error_msg): + async def onProgressError(self, e): """Called when a progress failed @param error_msg(unicode): error message as sent by bridge.progressError """ - self.disp(_("Error while doing operation: {}").format(error_msg), error=True) + self.disp(_(f"Error while doing operation: {e}"), error=True) def disp(self, msg, verbosity=0, error=False, no_lf=False): return self.host.disp(msg, verbosity, error, no_lf) @@ -1005,33 +1148,10 @@ try: output_type = self._output_type except AttributeError: - raise exceptions.InternalError(_('trying to use output when use_output has not been set')) + raise exceptions.InternalError( + _('trying to use output when use_output has not been set')) return self.host.output(output_type, self.args.output, self.extra_outputs, data) - def exitCb(self, msg=None): - """generic callback for success - - optionally print a message, and quit - msg(None, unicode): if not None, print this message - """ - if msg is not None: - self.disp(msg) - self.host.quit(C.EXIT_OK) - - def errback(self, failure_, msg=None, exit_code=C.EXIT_ERROR): - """generic callback for errbacks - - display failure_ then quit with generic error - @param failure_: arguments returned by errback - @param msg(unicode, None): message template - use {} if you want to display failure message - @param exit_code(int): shell exit code - """ - if msg is None: - msg = _("error: {}") - self.disp(msg.format(failure_), error=True) - self.host.quit(exit_code) - def getPubsubExtra(self, extra=None): """Helper method to compute extra data from pubsub arguments @@ -1090,7 +1210,7 @@ for cls in subcommands: cls(self) - def run(self): + async def run(self): """this method is called when a command is actually run It set stuff like progression callbacks and profile connection @@ -1098,9 +1218,6 @@ """ # we keep a reference to run command, it may be useful e.g. for outputs self.host.command = self - # host._need_loop is set here from our current value and not before - # as the need_loop decision must be taken only by then running command - self.host._need_loop = self.need_loop try: show_progress = self.args.progress @@ -1110,34 +1227,25 @@ else: if show_progress: self.host.watch_progress = True - # we need to register the following signal even if we don't display the progress bar - self.host.bridge.register_signal("progressStarted", self.progressStartedHandler) - self.host.bridge.register_signal("progressFinished", self.progressFinishedHandler) - self.host.bridge.register_signal("progressError", self.progressErrorHandler) + # we need to register the following signal even if we don't display the + # progress bar + self.host.bridge.register_signal( + "progressStarted", self.progressStartedHandler) + self.host.bridge.register_signal( + "progressFinished", self.progressFinishedHandler) + self.host.bridge.register_signal( + "progressError", self.progressErrorHandler) if self.need_connect is not None: - self.host.connect_profile(self.connected) - else: - self.start() - - def connected(self): - """this method is called when profile is connected (or session is started) + await self.host.connect_profile() + await self.start() - this method is only called when use_profile is True - most of time you should override self.start instead of this method, but if loop - if not always needed depending on your arguments, you may override this method, - but don't forget to call the parent one (i.e. this one) after self.need_loop is set - """ - if not self.need_loop: - self.host.stop_loop() - self.start() - - def start(self): - """This is the starting point of the command, this method should be overriden + async def start(self): + """This is the starting point of the command, this method must be overriden at this point, profile are connected if needed """ - pass + raise NotImplementedError class CommandAnswering(CommandBase): @@ -1152,9 +1260,8 @@ def __init__(self, *args, **kwargs): super(CommandAnswering, self).__init__(*args, **kwargs) - self.need_loop = True - def onActionNew(self, action_data, action_id, security_limit, profile): + async def onActionNew(self, action_data, action_id, security_limit, profile): if profile != self.profile: return try: @@ -1172,7 +1279,7 @@ except KeyError: pass else: - callback(action_data, action_id, security_limit, profile) + await callback(action_data, action_id, security_limit, profile) def onXMLUI(self, xml_ui): """Display a dialog received from the backend. @@ -1187,11 +1294,9 @@ if dialog is not None: self.disp(dialog.findtext("message"), error=dialog.get("level") == "error") - def connected(self): - """Auto reply to confirmations requests""" - self.need_loop = True - super(CommandAnswering, self).connected() + async def start_answering(self): + """Auto reply to confirmation requests""" self.host.bridge.register_signal("actionNew", self.onActionNew) - actions = self.host.bridge.actionsGet(self.profile) + actions = await self.host.bridge.actionsGet(self.profile) for action_data, action_id, security_limit in actions: - self.onActionNew(action_data, action_id, security_limit, self.profile) + await self.onActionNew(action_data, action_id, security_limit, self.profile)