Mercurial > libervia-backend
view libervia/cli/loops.py @ 4230:314d3c02bb67
core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:21:04 +0200 |
parents | 0f8ea0768a3b |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia CLI # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import sys import asyncio import logging as log from libervia.backend.core.i18n import _ from libervia.cli.constants import Const as C from libervia.frontends.tools import aio log.basicConfig(level=log.WARNING, format='[%(name)s] %(message)s') USER_INTER_MSG = _("User interruption: good bye") class QuitException(BaseException): """Quitting is requested This is used to stop execution when host.quit() is called """ def get_libervia_cli_loop(bridge_name): if 'dbus' in bridge_name: import signal class LiberviaCLILoop: def __init__(self): self.loop = asyncio.get_event_loop() def run(self, libervia_cli, args, namespace): aio.install_glib_asyncio_iteration() signal.signal(signal.SIGINT, self._on_sigint) loop = self.loop loop.run_until_complete(libervia_cli.main(args=args, namespace=namespace)) loop.run_forever() def quit(self, exit_code): loop = asyncio.get_event_loop() loop.stop() sys.exit(exit_code) def call_later(self, delay, callback, *args): """call a callback repeatedly @param delay(int): delay between calls in s @param callback(callable): method to call if the callback return True, the call will continue else the calls will stop @param *args: args of the callbac """ loop = asyncio.get_event_loop() loop.call_later(delay, callback, *args) def _on_sigint(self, sig_number, stack_frame): """Called on keyboard interruption Print user interruption message, set exit code and stop reactor """ print("\r" + USER_INTER_MSG) self.quit(C.EXIT_USER_CANCELLED) else: import signal from twisted.internet import asyncioreactor asyncioreactor.install() from twisted.internet import reactor, defer class LiberviaCLILoop: def __init__(self): # exit code must be set when using quit, so if it's not set # something got wrong and we must report it self._exit_code = C.EXIT_INTERNAL_ERROR def run(self, libervia_cli, *args): self.libervia_cli = libervia_cli signal.signal(signal.SIGINT, self._on_sigint) defer.ensureDeferred(self._start(libervia_cli, *args)) try: reactor.run(installSignalHandlers=False) except SystemExit as e: self._exit_code = e.code sys.exit(self._exit_code) async def _start(self, libervia_cli, *args): fut = asyncio.ensure_future(libervia_cli.main(*args)) try: await defer.Deferred.fromFuture(fut) except BaseException: import traceback traceback.print_exc() libervia_cli.quit(1) def quit(self, exit_code): self._exit_code = exit_code reactor.stop() def _timeout_cb(self, args, callback, delay): try: ret = callback(*args) # FIXME: temporary hack to avoid traceback when using XMLUI # to be removed once create_task is not used anymore in # xmlui_manager (i.e. once libervia.frontends.tools.xmlui fully supports # async syntax) except QuitException: return if ret: reactor.callLater(delay, self._timeout_cb, args, callback, delay) def call_later(self, delay, callback, *args): reactor.callLater(delay, self._timeout_cb, args, callback, delay) def _on_sigint(self, sig_number, stack_frame): """Called on keyboard interruption Print user interruption message, set exit code and stop reactor """ print("\r" + USER_INTER_MSG) self._exit_code = C.EXIT_USER_CANCELLED reactor.callFromThread(reactor.stop) if bridge_name == "embedded": raise NotImplementedError # from sat.core import sat_main # sat = sat_main.SAT() return LiberviaCLILoop