# HG changeset patch # User Goffi # Date 1698843648 -3600 # Node ID 5de6f35953804a9d6319b4b363e75ad4d05ad496 # Parent 10979b5c305a2fa9b1bd7ef2c6835d1abc5a3e87 frontends (tools/aio): add tool to run from thread, run maybe async method, or run GLib loop: - `run_from_thread` can be used to run a method from a thread. - `maybe_async` can be used to run a method if we are not sure if it's async or not. - `install_glib_asyncio_iteration` run the GLib loop from asyncio one's. rel 426 diff -r 10979b5c305a -r 5de6f3595380 libervia/frontends/tools/aio.py --- a/libervia/frontends/tools/aio.py Wed Nov 01 13:57:08 2023 +0100 +++ b/libervia/frontends/tools/aio.py Wed Nov 01 14:00:48 2023 +0100 @@ -17,7 +17,7 @@ # along with this program. If not, see . import asyncio -from typing import Any, Coroutine +from typing import Any, Awaitable, Callable, Coroutine from libervia.backend.core import log as logging @@ -54,3 +54,95 @@ background_tasks.add(task) task.add_done_callback(_on_task_done) + + +def run_with_args( + async_method: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any +) -> None: + """Schedules and tracks an asynchronous method with arguments. + + This function wraps the provided asynchronous method with its arguments + and then schedules it for execution. + + @param async_method: The asynchronous method to be scheduled. + @param args: Positional arguments to pass to the async_method. + @param kwargs: Keyword arguments to pass to the async_method. + """ + run_async(async_method(*args, **kwargs)) + + +def run_from_thread( + async_method: Coroutine | asyncio.Future, + *args, + loop: asyncio.AbstractEventLoop | None = None, + **kwargs, +) -> None: + """Schedules an asynchronous method from another thread. + + @param async_method: The method to be scheduled for execution. + """ + if loop is None: + loop = asyncio.get_event_loop() + assert loop is not None + loop.call_soon_threadsafe(run_with_args, async_method, *args, **kwargs) + + +def maybe_async(result: Any | Awaitable[Any]) -> Awaitable[Any]: + """ + Convert the provided result into an awaitable. + + @param result: the result of a function or coroutine call + @return: an awaitable object which can be awaited to get the result + """ + if asyncio.iscoroutine(result): + return result + + future = asyncio.Future() + future.set_result(result) + return future + + +def install_glib_asyncio_iteration(): + """Import and install GLib context iteration inside our asyncio event loop. + + This is used as soon as GLib is used (like GStreamer). + Inspired from Kivy's install_gobject_iteration (in ``kivy.support``), thanks to Kivy's + team. + """ + + import asyncio + + try: + from gi.repository import GLib + except ImportError: + raise ImportError("GLib could not be imported. Ensure it's installed.") + + if hasattr(GLib, "_glib_already_installed"): + # already installed, don't do it twice. + return + + GLib._glib_already_installed = True + + loop = asyncio.get_event_loop() + + # Create a GLib MainContext and make it the default + glib_context = GLib.MainContext.default() + + # Function to iterate over the GLib main context + def _glib_iteration(): + # We need to loop over the context to prevent lag + iteration_count = 0 + while glib_context.pending() and iteration_count < 20: + glib_context.iteration(False) + iteration_count += 1 + + # If no work was done in the GLib loop, add a short delay before + # scheduling the next iteration, to prevent spinning and high CPU usage. + if iteration_count == 0: + loop.call_later(0.01, _glib_iteration) + else: + # Schedule ourselves to run again on the next asyncio loop iteration + loop.call_soon(_glib_iteration) + + # Kick off the GLib iteration + loop.call_soon(_glib_iteration)