Mercurial > libervia-backend
view libervia/frontends/tools/aio.py @ 4152:23d21daed216
core (memory/sqla_mapping): add a `version_id` column to detect race conditions.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:52:00 +0100 |
parents | 5de6f3595380 |
children | d01b8d002619 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia AsyncIO helper methods
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
import functools
from typing import Any, Awaitable, Callable, Coroutine

from libervia.backend.core import log as logging


log = logging.getLogger(__name__)
# Strong references to running tasks: the event loop itself only keeps weak
# references, so an untracked task could be garbage-collected mid-execution.
background_tasks: set[asyncio.Future] = set()


def _on_task_done(task: asyncio.Future) -> None:
    """Callback function to execute when a task is done.

    @param task: The completed task.

    Note: The function removes the task from the tracking set and logs any
    exceptions that might have occurred. Cancelled tasks are silently dropped.
    """
    background_tasks.discard(task)
    # ``Future.exception()`` raises ``CancelledError`` on a cancelled task, which
    # would surface as an "Exception in callback" error from the event loop.
    if task.cancelled():
        return
    e = task.exception()
    if e is not None:
        exc_info = (type(e), e, e.__traceback__)
        log.error("Task failed:", exc_info=exc_info)


def run_async(async_method: Coroutine | asyncio.Future) -> None:
    """Schedules and tracks an asynchronous method.

    @param async_method: The coroutine object or future to be scheduled for
        execution.

    Note: The function keeps a strong reference to the task to prevent it from
    being garbage-collected. In case of task failure, logs the error.
    cf. https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
    """
    if isinstance(async_method, asyncio.Future):
        task = asyncio.ensure_future(async_method)
    else:
        task = asyncio.create_task(async_method)
    background_tasks.add(task)
    task.add_done_callback(_on_task_done)


def run_with_args(
    async_method: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
) -> None:
    """Schedules and tracks an asynchronous method with arguments.

    This function wraps the provided asynchronous method with its arguments
    and then schedules it for execution.

    @param async_method: The asynchronous method to be scheduled.
    @param args: Positional arguments to pass to the async_method.
    @param kwargs: Keyword arguments to pass to the async_method.
    """
    run_async(async_method(*args, **kwargs))


def run_from_thread(
    async_method: Callable[..., Coroutine[Any, Any, Any]],
    *args: Any,
    loop: asyncio.AbstractEventLoop | None = None,
    **kwargs: Any,
) -> None:
    """Schedules an asynchronous method from another thread.

    @param async_method: The asynchronous method to be scheduled for execution.
        It is called with ``args`` and ``kwargs`` inside the event loop thread.
    @param args: Positional arguments to pass to the async_method.
    @param loop: Event loop in which the method must be run. If None,
        ``asyncio.get_event_loop()`` is called from the calling thread.
        NOTE(review): a secondary thread usually has no current event loop, so
        callers running outside of the loop thread should probably pass ``loop``
        explicitly.
    @param kwargs: Keyword arguments to pass to the async_method.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    # ``call_soon_threadsafe`` only forwards positional arguments (its sole keyword
    # is ``context``), hence the arguments are bound with ``functools.partial``.
    loop.call_soon_threadsafe(
        functools.partial(run_with_args, async_method, *args, **kwargs)
    )


def maybe_async(result: Any | Awaitable[Any]) -> Awaitable[Any]:
    """Convert the provided result into an awaitable.

    Coroutine objects are returned unchanged; any other value is wrapped in an
    already resolved future.

    @param result: the result of a function or coroutine call
    @return: an awaitable object which can be awaited to get the result
    """
    if asyncio.iscoroutine(result):
        return result
    future = asyncio.Future()
    future.set_result(result)
    return future


def install_glib_asyncio_iteration():
    """Import and install GLib context iteration inside our asyncio event loop.

    This is used as soon as GLib is used (like GStreamer).
    Inspired from Kivy's install_gobject_iteration (in ``kivy.support``), thanks to
    Kivy's team.

    @raise ImportError: GLib bindings can't be imported.
    """
    try:
        from gi.repository import GLib
    except ImportError as e:
        raise ImportError("GLib could not be imported. Ensure it's installed.") from e

    if hasattr(GLib, "_glib_already_installed"):
        # already installed, don't do it twice.
        return

    GLib._glib_already_installed = True

    loop = asyncio.get_event_loop()

    # Use GLib's default main context
    glib_context = GLib.MainContext.default()

    # Function to iterate over the GLib main context
    def _glib_iteration():
        # We need to loop over the context to prevent lag, the number of iterations
        # is capped so the asyncio loop is not starved.
        iteration_count = 0
        while glib_context.pending() and iteration_count < 20:
            glib_context.iteration(False)
            iteration_count += 1

        # If no work was done in the GLib loop, add a short delay before
        # scheduling the next iteration, to prevent spinning and high CPU usage.
        if iteration_count == 0:
            loop.call_later(0.01, _glib_iteration)
        else:
            # Schedule ourselves to run again on the next asyncio loop iteration
            loop.call_soon(_glib_iteration)

    # Kick off the GLib iteration
    loop.call_soon(_glib_iteration)