comparison libervia/frontends/tools/aio.py @ 4138:5de6f3595380

frontends (tools/aio): add tools to run from a thread, run a maybe-async method, or run the GLib loop: - `run_from_thread` can be used to run a method from a thread. - `maybe_async` can be used to run a method if we are not sure whether it's async or not. - `install_glib_asyncio_iteration` runs the GLib loop from the asyncio one. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:00:48 +0100
parents 5fc26a6ef113
children d01b8d002619
comparison
equal deleted inserted replaced
4137:10979b5c305a 4138:5de6f3595380
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import asyncio 19 import asyncio
20 from typing import Any, Coroutine 20 from typing import Any, Awaitable, Callable, Coroutine
21 21
22 from libervia.backend.core import log as logging 22 from libervia.backend.core import log as logging
23 23
24 log = logging.getLogger(__name__) 24 log = logging.getLogger(__name__)
25 background_tasks = set() 25 background_tasks = set()
52 else: 52 else:
53 task = asyncio.create_task(async_method) 53 task = asyncio.create_task(async_method)
54 54
55 background_tasks.add(task) 55 background_tasks.add(task)
56 task.add_done_callback(_on_task_done) 56 task.add_done_callback(_on_task_done)
57
58
def run_with_args(
    async_method: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
) -> None:
    """Call an async method with the given arguments and schedule the result.

    The callable is invoked immediately with ``args`` and ``kwargs``; the
    coroutine it returns is then handed over to ``run_async``.

    @param async_method: callable returning the coroutine to schedule.
    @param args: positional arguments forwarded to ``async_method``.
    @param kwargs: keyword arguments forwarded to ``async_method``.
    """
    coroutine = async_method(*args, **kwargs)
    run_async(coroutine)
72
73
def run_from_thread(
    async_method: Callable[..., Coroutine[Any, Any, Any]],
    *args,
    loop: asyncio.AbstractEventLoop | None = None,
    **kwargs,
) -> None:
    """Schedule an asynchronous method on an event loop from another thread.

    ``async_method`` is not called here: the call ``async_method(*args, **kwargs)``
    is made later by ``run_with_args``, inside the thread running ``loop``.

    @param async_method: callable returning the coroutine to schedule.
    @param args: positional arguments forwarded to ``async_method``.
    @param loop: event loop to schedule on.
        If None, ``asyncio.get_event_loop()`` is used; it is resolved in the calling
        thread, so a worker thread without its own loop should pass ``loop``
        explicitly.
    @param kwargs: keyword arguments forwarded to ``async_method``.
    """
    from functools import partial

    if loop is None:
        loop = asyncio.get_event_loop()
    # ``call_soon_threadsafe`` only forwards positional arguments to the callback
    # (its sole keyword is ``context``), so everything is bound with ``partial``
    # to let keyword arguments reach ``async_method``.
    loop.call_soon_threadsafe(partial(run_with_args, async_method, *args, **kwargs))
88
89
90 def maybe_async(result: Any | Awaitable[Any]) -> Awaitable[Any]:
91 """
92 Convert the provided result into an awaitable.
93
94 @param result: the result of a function or coroutine call
95 @return: an awaitable object which can be awaited to get the result
96 """
97 if asyncio.iscoroutine(result):
98 return result
99
100 future = asyncio.Future()
101 future.set_result(result)
102 return future
103
104
def install_glib_asyncio_iteration():
    """Import and install GLib context iteration inside our asyncio event loop.

    This is used as soon as GLib is used (like GStreamer).
    Inspired from Kivy's install_gobject_iteration (in ``kivy.support``), thanks to Kivy's
    team.

    The installation is done only once: a marker attribute is set on the ``GLib``
    module, and subsequent calls return immediately.

    @raise ImportError: GLib can't be imported from ``gi.repository``.
    """
    try:
        from gi.repository import GLib
    except ImportError as e:
        # Keep the original error as explicit cause to ease debugging.
        raise ImportError("GLib could not be imported. Ensure it's installed.") from e

    if hasattr(GLib, "_glib_already_installed"):
        # already installed, don't do it twice.
        return

    # The loop is retrieved before setting the marker, so a failure here doesn't
    # leave the marker set without any iteration scheduled.
    loop = asyncio.get_event_loop()

    GLib._glib_already_installed = True

    # Retrieve GLib's default main context; this is the one we iterate over.
    glib_context = GLib.MainContext.default()

    # Maximum number of GLib dispatches per call, so the asyncio loop regains control.
    max_iterations = 20
    # Delay (in seconds) before polling again when GLib had nothing to do.
    idle_delay = 0.01

    # Function to iterate over the GLib main context
    def _glib_iteration():
        # We need to loop over the context to prevent lag
        iteration_count = 0
        while glib_context.pending() and iteration_count < max_iterations:
            # Non-blocking iteration: never wait for GLib events here.
            glib_context.iteration(False)
            iteration_count += 1

        # If no work was done in the GLib loop, add a short delay before
        # scheduling the next iteration, to prevent spinning and high CPU usage.
        if iteration_count == 0:
            loop.call_later(idle_delay, _glib_iteration)
        else:
            # Schedule ourselves to run again on the next asyncio loop iteration
            loop.call_soon(_glib_iteration)

    # Kick off the GLib iteration
    loop.call_soon(_glib_iteration)
148 loop.call_soon(_glib_iteration)