Mercurial > libervia-backend
comparison libervia/frontends/tools/aio.py @ 4138:5de6f3595380
frontends (tools/aio): add tool to run from thread, run maybe async method, or run GLib loop:
- `run_from_thread` can be used to run a method from a thread.
- `maybe_async` can be used to run a method if we are not sure if it's async or not.
- `install_glib_asyncio_iteration` run the GLib loop from asyncio one's.
rel 426
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Nov 2023 14:00:48 +0100 |
parents | 5fc26a6ef113 |
children | d01b8d002619 |
comparison
equal
deleted
inserted
replaced
4137:10979b5c305a | 4138:5de6f3595380 |
---|---|
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 import asyncio | 19 import asyncio |
20 from typing import Any, Coroutine | 20 from typing import Any, Awaitable, Callable, Coroutine |
21 | 21 |
22 from libervia.backend.core import log as logging | 22 from libervia.backend.core import log as logging |
23 | 23 |
24 log = logging.getLogger(__name__) | 24 log = logging.getLogger(__name__) |
25 background_tasks = set() | 25 background_tasks = set() |
52 else: | 52 else: |
53 task = asyncio.create_task(async_method) | 53 task = asyncio.create_task(async_method) |
54 | 54 |
55 background_tasks.add(task) | 55 background_tasks.add(task) |
56 task.add_done_callback(_on_task_done) | 56 task.add_done_callback(_on_task_done) |
57 | |
58 | |
59 def run_with_args( | |
60 async_method: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any | |
61 ) -> None: | |
62 """Schedules and tracks an asynchronous method with arguments. | |
63 | |
64 This function wraps the provided asynchronous method with its arguments | |
65 and then schedules it for execution. | |
66 | |
67 @param async_method: The asynchronous method to be scheduled. | |
68 @param args: Positional arguments to pass to the async_method. | |
69 @param kwargs: Keyword arguments to pass to the async_method. | |
70 """ | |
71 run_async(async_method(*args, **kwargs)) | |
72 | |
73 | |
74 def run_from_thread( | |
75 async_method: Coroutine | asyncio.Future, | |
76 *args, | |
77 loop: asyncio.AbstractEventLoop | None = None, | |
78 **kwargs, | |
79 ) -> None: | |
80 """Schedules an asynchronous method from another thread. | |
81 | |
82 @param async_method: The method to be scheduled for execution. | |
83 """ | |
84 if loop is None: | |
85 loop = asyncio.get_event_loop() | |
86 assert loop is not None | |
87 loop.call_soon_threadsafe(run_with_args, async_method, *args, **kwargs) | |
88 | |
89 | |
90 def maybe_async(result: Any | Awaitable[Any]) -> Awaitable[Any]: | |
91 """ | |
92 Convert the provided result into an awaitable. | |
93 | |
94 @param result: the result of a function or coroutine call | |
95 @return: an awaitable object which can be awaited to get the result | |
96 """ | |
97 if asyncio.iscoroutine(result): | |
98 return result | |
99 | |
100 future = asyncio.Future() | |
101 future.set_result(result) | |
102 return future | |
103 | |
104 | |
105 def install_glib_asyncio_iteration(): | |
106 """Import and install GLib context iteration inside our asyncio event loop. | |
107 | |
108 This is used as soon as GLib is used (like GStreamer). | |
109 Inspired from Kivy's install_gobject_iteration (in ``kivy.support``), thanks to Kivy's | |
110 team. | |
111 """ | |
112 | |
113 import asyncio | |
114 | |
115 try: | |
116 from gi.repository import GLib | |
117 except ImportError: | |
118 raise ImportError("GLib could not be imported. Ensure it's installed.") | |
119 | |
120 if hasattr(GLib, "_glib_already_installed"): | |
121 # already installed, don't do it twice. | |
122 return | |
123 | |
124 GLib._glib_already_installed = True | |
125 | |
126 loop = asyncio.get_event_loop() | |
127 | |
128 # Create a GLib MainContext and make it the default | |
129 glib_context = GLib.MainContext.default() | |
130 | |
131 # Function to iterate over the GLib main context | |
132 def _glib_iteration(): | |
133 # We need to loop over the context to prevent lag | |
134 iteration_count = 0 | |
135 while glib_context.pending() and iteration_count < 20: | |
136 glib_context.iteration(False) | |
137 iteration_count += 1 | |
138 | |
139 # If no work was done in the GLib loop, add a short delay before | |
140 # scheduling the next iteration, to prevent spinning and high CPU usage. | |
141 if iteration_count == 0: | |
142 loop.call_later(0.01, _glib_iteration) | |
143 else: | |
144 # Schedule ourselves to run again on the next asyncio loop iteration | |
145 loop.call_soon(_glib_iteration) | |
146 | |
147 # Kick off the GLib iteration | |
148 loop.call_soon(_glib_iteration) |