view libervia/cli/loops.py @ 4338:7c0b7ecb816f

component email gateway: Add a pubsub service: a pubsub service is implemented to retrieve and manage attachments using XEP-0498. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import sys
import asyncio
import logging as log
from libervia.backend.core.i18n import _
from libervia.cli.constants import Const as C
from libervia.frontends.tools import aio

log.basicConfig(level=log.WARNING, format="[%(name)s] %(message)s")

USER_INTER_MSG = _("User interruption: good bye")


class QuitException(BaseException):
    """Quitting is requested

    This is used to stop execution when host.quit() is called
    """


def get_libervia_cli_loop(bridge_name):
    if "dbus" in bridge_name:
        import signal

        class LiberviaCLILoop:

            def __init__(self):
                self.loop = asyncio.get_event_loop()

            def run(self, libervia_cli, args, namespace):
                aio.install_glib_asyncio_iteration()
                signal.signal(signal.SIGINT, self._on_sigint)
                loop = self.loop
                loop.run_until_complete(libervia_cli.main(args=args, namespace=namespace))
                loop.run_forever()

            def quit(self, exit_code):
                loop = asyncio.get_event_loop()
                loop.stop()
                sys.exit(exit_code)

            def call_later(self, delay, callback, *args):
                """call a callback repeatedly

                @param delay(int): delay between calls in s
                @param callback(callable): method to call
                    if the callback return True, the call will continue
                    else the calls will stop
                @param *args: args of the callbac
                """
                loop = asyncio.get_event_loop()
                loop.call_later(delay, callback, *args)

            def _on_sigint(self, sig_number, stack_frame):
                """Called on keyboard interruption

                Print user interruption message, set exit code and stop reactor
                """
                print("\r" + USER_INTER_MSG)
                self.quit(C.EXIT_USER_CANCELLED)

    else:
        import signal
        from twisted.internet import asyncioreactor

        asyncioreactor.install()
        from twisted.internet import reactor, defer

        class LiberviaCLILoop:

            def __init__(self):
                # exit code must be set when using quit, so if it's not set
                # something got wrong and we must report it
                self._exit_code = C.EXIT_INTERNAL_ERROR

            def run(self, libervia_cli, *args):
                self.libervia_cli = libervia_cli
                signal.signal(signal.SIGINT, self._on_sigint)
                defer.ensureDeferred(self._start(libervia_cli, *args))
                try:
                    reactor.run(installSignalHandlers=False)
                except SystemExit as e:
                    self._exit_code = e.code
                sys.exit(self._exit_code)

            async def _start(self, libervia_cli, *args):
                fut = asyncio.ensure_future(libervia_cli.main(*args))
                try:
                    await defer.Deferred.fromFuture(fut)
                except BaseException:
                    import traceback

                    traceback.print_exc()
                    libervia_cli.quit(1)

            def quit(self, exit_code):
                self._exit_code = exit_code
                reactor.stop()

            def _timeout_cb(self, args, callback, delay):
                try:
                    ret = callback(*args)
                # FIXME: temporary hack to avoid traceback when using XMLUI
                #        to be removed once create_task is not used anymore in
                #        xmlui_manager (i.e. once libervia.frontends.tools.xmlui fully supports
                #        async syntax)
                except QuitException:
                    return
                if ret:
                    reactor.callLater(delay, self._timeout_cb, args, callback, delay)

            def call_later(self, delay, callback, *args):
                reactor.callLater(delay, self._timeout_cb, args, callback, delay)

            def _on_sigint(self, sig_number, stack_frame):
                """Called on keyboard interruption

                Print user interruption message, set exit code and stop reactor
                """
                print("\r" + USER_INTER_MSG)
                self._exit_code = C.EXIT_USER_CANCELLED
                reactor.callFromThread(reactor.stop)

    if bridge_name == "embedded":
        raise NotImplementedError
        # from sat.core import sat_main
        # sat = sat_main.SAT()

    return LiberviaCLILoop