changeset 4138:5de6f3595380

frontends (tools/aio): add tool to run from thread, run maybe async method, or run GLib loop: - `run_from_thread` can be used to run a method from a thread. - `maybe_async` can be used to run a method if we are not sure if it's async or not. - `install_glib_asyncio_iteration` run the GLib loop from asyncio one's. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:00:48 +0100
parents 10979b5c305a
children 6745c6bd4c7a
files libervia/frontends/tools/aio.py
diffstat 1 files changed, 93 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/frontends/tools/aio.py	Wed Nov 01 13:57:08 2023 +0100
+++ b/libervia/frontends/tools/aio.py	Wed Nov 01 14:00:48 2023 +0100
@@ -17,7 +17,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import asyncio
-from typing import Any, Coroutine
+from typing import Any, Awaitable, Callable, Coroutine
 
 from libervia.backend.core import log as logging
 
@@ -54,3 +54,95 @@
 
     background_tasks.add(task)
     task.add_done_callback(_on_task_done)
+
+
+def run_with_args(
+    async_method: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
+) -> None:
+    """Schedules and tracks an asynchronous method with arguments.
+
+    This function wraps the provided asynchronous method with its arguments
+    and then schedules it for execution.
+
+    @param async_method: The asynchronous method to be scheduled.
+    @param args: Positional arguments to pass to the async_method.
+    @param kwargs: Keyword arguments to pass to the async_method.
+    """
+    run_async(async_method(*args, **kwargs))
+
+
+def run_from_thread(
+    async_method: Coroutine | asyncio.Future,
+    *args,
+    loop: asyncio.AbstractEventLoop | None = None,
+    **kwargs,
+) -> None:
+    """Schedules an asynchronous method from another thread.
+
+    @param async_method: The method to be scheduled for execution.
+    """
+    if loop is None:
+        loop = asyncio.get_event_loop()
+        assert loop is not None
+    loop.call_soon_threadsafe(run_with_args, async_method, *args, **kwargs)
+
+
+def maybe_async(result: Any | Awaitable[Any]) -> Awaitable[Any]:
+    """
+    Convert the provided result into an awaitable.
+
+    @param result: the result of a function or coroutine call
+    @return: an awaitable object which can be awaited to get the result
+    """
+    if asyncio.iscoroutine(result):
+        return result
+
+    future = asyncio.Future()
+    future.set_result(result)
+    return future
+
+
+def install_glib_asyncio_iteration():
+    """Import and install GLib context iteration inside our asyncio event loop.
+
+    This is used as soon as GLib is used (like GStreamer).
+    Inspired from Kivy's install_gobject_iteration (in ``kivy.support``), thanks to Kivy's
+    team.
+    """
+
+    import asyncio
+
+    try:
+        from gi.repository import GLib
+    except ImportError:
+        raise ImportError("GLib could not be imported. Ensure it's installed.")
+
+    if hasattr(GLib, "_glib_already_installed"):
+        # already installed, don't do it twice.
+        return
+
+    GLib._glib_already_installed = True
+
+    loop = asyncio.get_event_loop()
+
+    # Create a GLib MainContext and make it the default
+    glib_context = GLib.MainContext.default()
+
+    # Function to iterate over the GLib main context
+    def _glib_iteration():
+        # We need to loop over the context to prevent lag
+        iteration_count = 0
+        while glib_context.pending() and iteration_count < 20:
+            glib_context.iteration(False)
+            iteration_count += 1
+
+        # If no work was done in the GLib loop, add a short delay before
+        # scheduling the next iteration, to prevent spinning and high CPU usage.
+        if iteration_count == 0:
+            loop.call_later(0.01, _glib_iteration)
+        else:
+            # Schedule ourselves to run again on the next asyncio loop iteration
+            loop.call_soon(_glib_iteration)
+
+    # Kick off the GLib iteration
+    loop.call_soon(_glib_iteration)