Mercurial > libervia-web
comparison libervia/web/server/tasks/manager.py @ 1518:eb00d593801d
refactoring: rename `libervia` to `libervia.web` + update imports following backend changes
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 16:49:28 +0200 |
parents | libervia/server/tasks/manager.py@106bae41f5c8 |
children |
comparison
equal
deleted
inserted
replaced
1517:b8ed9726525b | 1518:eb00d593801d |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia: a Salut à Toi frontend | |
4 # Copyright (C) 2011-2021 Jérôme Poisson <goffi@goffi.org> | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 import os | |
19 import os.path | |
20 from pathlib import Path | |
21 from typing import Dict | |
22 import importlib.util | |
23 from twisted.internet import defer | |
24 from libervia.backend.core.log import getLogger | |
25 from libervia.backend.core import exceptions | |
26 from libervia.backend.core.i18n import _ | |
27 from libervia.backend.tools import utils | |
28 from libervia.web.server.constants import Const as C | |
29 from . import implicit | |
30 from .task import Task | |
31 | |
32 log = getLogger(__name__) | |
33 | |
34 DEFAULT_SITE_LABEL = _("default site") | |
35 | |
36 | |
37 class TasksManager: | |
38 """Handle tasks of a Libervia site""" | |
39 | |
40 def __init__(self, host, site_resource): | |
41 """ | |
42 @param site_resource(LiberviaRootResource): root resource of the site to manage | |
43 """ | |
44 self.host = host | |
45 self.resource = site_resource | |
46 self.tasks_dir = self.site_path / C.TASKS_DIR | |
47 self.tasks = {} | |
48 self._build_path = None | |
49 self._current_task = None | |
50 | |
51 @property | |
52 def site_path(self): | |
53 return Path(self.resource.site_path) | |
54 | |
55 @property | |
56 def build_path(self): | |
57 """path where generated files will be build for this site""" | |
58 if self._build_path is None: | |
59 self._build_path = self.host.get_build_path(self.site_name) | |
60 return self._build_path | |
61 | |
62 @property | |
63 def site_name(self): | |
64 return self.resource.site_name | |
65 | |
66 def validate_data(self, task): | |
67 """Check workflow attributes in task""" | |
68 | |
69 for var, allowed in (("ON_ERROR", ("continue", "stop")), | |
70 ("LOG_OUTPUT", bool), | |
71 ("WATCH_DIRS", list)): | |
72 value = getattr(task, var) | |
73 | |
74 if isinstance(allowed, type): | |
75 if allowed is list and value is None: | |
76 continue | |
77 if not isinstance(value, allowed): | |
78 raise ValueError( | |
79 _("Unexpected value for {var}, {allowed} is expected.") | |
80 .format(var=var, allowed=allowed)) | |
81 else: | |
82 if not value in allowed: | |
83 raise ValueError(_("Unexpected value for {var}: {value!r}").format( | |
84 var=var, value=value)) | |
85 | |
86 async def import_task( | |
87 self, | |
88 task_name: str, | |
89 task_path: Path, | |
90 to_import: Dict[str, Path] | |
91 ) -> None: | |
92 if task_name in self.tasks: | |
93 log.debug(f"skipping task {task_name} which is already imported") | |
94 return | |
95 module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}" | |
96 | |
97 spec = importlib.util.spec_from_file_location(module_name, task_path) | |
98 task_module = importlib.util.module_from_spec(spec) | |
99 spec.loader.exec_module(task_module) | |
100 task = task_module.Task(self, task_name) | |
101 if task.AFTER is not None: | |
102 for pre_task_name in task.AFTER: | |
103 log.debug( | |
104 f"task {task_name!r} must be run after {pre_task_name!r}") | |
105 try: | |
106 pre_task_path = to_import[pre_task_name] | |
107 except KeyError: | |
108 raise ValueError( | |
109 f"task {task_name!r} must be run after {pre_task_name!r}, " | |
110 f"however there is no task with such name") | |
111 await self.import_task(pre_task_name, pre_task_path, to_import) | |
112 | |
113 # we launch prepare, which is a method used to prepare | |
114 # data at runtime (e.g. set WATCH_DIRS using config) | |
115 try: | |
116 prepare = task.prepare | |
117 except AttributeError: | |
118 pass | |
119 else: | |
120 log.info(_('== preparing task "{task_name}" for {site_name} =='.format( | |
121 task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL))) | |
122 try: | |
123 await utils.as_deferred(prepare) | |
124 except exceptions.CancelError as e: | |
125 log.debug(f"Skipping {task_name} which cancelled itself: {e}") | |
126 return | |
127 | |
128 self.tasks[task_name] = task | |
129 self.validate_data(task) | |
130 if self.host.options['dev-mode']: | |
131 dirs = task.WATCH_DIRS or [] | |
132 for dir_ in dirs: | |
133 self.host.files_watcher.watch_dir( | |
134 dir_, auto_add=True, recursive=True, | |
135 callback=self._autorun_task, task_name=task_name) | |
136 | |
137 async def parse_tasks_dir(self, dir_path: Path) -> None: | |
138 log.debug(f"parsing tasks in {dir_path}") | |
139 tasks_paths = sorted(dir_path.glob('task_*.py')) | |
140 to_import = {} | |
141 for task_path in tasks_paths: | |
142 if not task_path.is_file(): | |
143 continue | |
144 task_name = task_path.stem[5:].lower().strip() | |
145 if not task_name: | |
146 continue | |
147 if task_name in self.tasks: | |
148 raise exceptions.ConflictError( | |
149 "A task with the name [{name}] already exists".format( | |
150 name=task_name)) | |
151 log.debug(f"task {task_name} found") | |
152 to_import[task_name] = task_path | |
153 | |
154 for task_name, task_path in to_import.items(): | |
155 await self.import_task(task_name, task_path, to_import) | |
156 | |
157 async def parse_tasks(self): | |
158 # implicit tasks are always run | |
159 implicit_path = Path(implicit.__file__).parent | |
160 await self.parse_tasks_dir(implicit_path) | |
161 # now we check if there are tasks specific to this site | |
162 if not self.tasks_dir.is_dir(): | |
163 log.debug(_("{name} has no task to launch.").format( | |
164 name = self.resource.site_name or DEFAULT_SITE_LABEL)) | |
165 return | |
166 else: | |
167 await self.parse_tasks_dir(self.tasks_dir) | |
168 | |
169 def _autorun_task(self, host, filepath, flags, task_name): | |
170 """Called when an event is received from a watched directory""" | |
171 if flags == ['create']: | |
172 return | |
173 try: | |
174 task = self.tasks[task_name] | |
175 on_dir_event_cb = task.on_dir_event | |
176 except AttributeError: | |
177 return defer.ensureDeferred(self.run_task(task_name)) | |
178 else: | |
179 return utils.as_deferred( | |
180 on_dir_event_cb, host, Path(filepath.path.decode()), flags) | |
181 | |
182 async def run_task_instance(self, task: Task) -> None: | |
183 self._current_task = task.name | |
184 log.info(_('== running task "{task_name}" for {site_name} =='.format( | |
185 task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL))) | |
186 os.chdir(self.site_path) | |
187 try: | |
188 await utils.as_deferred(task.start) | |
189 except Exception as e: | |
190 on_error = task.ON_ERROR | |
191 if on_error == 'stop': | |
192 raise e | |
193 elif on_error == 'continue': | |
194 log.warning(_('Task "{task_name}" failed for {site_name}: {reason}') | |
195 .format(task_name=task.name, site_name=self.site_name, reason=e)) | |
196 else: | |
197 raise exceptions.InternalError("we should never reach this point") | |
198 self._current_task = None | |
199 | |
200 async def run_task(self, task_name: str) -> None: | |
201 """Run a single task | |
202 | |
203 @param task_name(unicode): name of the task to run | |
204 """ | |
205 task = self.tasks[task_name] | |
206 await self.run_task_instance(task) | |
207 | |
208 async def run_tasks(self): | |
209 """Run all the tasks found""" | |
210 old_path = os.getcwd() | |
211 for task_name, task_value in self.tasks.items(): | |
212 await self.run_task(task_name) | |
213 os.chdir(old_path) |