Mercurial > libervia-web
comparison libervia/server/tasks/manager.py @ 1261:a46d0e0f383b
tasks: `AFTER` attribute to handle tasks order:
A Task instance can now put a list of tasks names in its `AFTER` attribute, if will then
be prepared and launched after it.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 03 May 2020 21:07:23 +0200 |
parents | 6161076193e0 |
children | df40708c4c76 |
comparison
equal
deleted
inserted
replaced
1260:6161076193e0 | 1261:a46d0e0f383b |
---|---|
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 import os | 18 import os |
19 import os.path | 19 import os.path |
20 from pathlib import Path | 20 from pathlib import Path |
21 from typing import Dict | |
21 import importlib.util | 22 import importlib.util |
22 from twisted.internet import defer | 23 from twisted.internet import defer |
23 from sat.core.log import getLogger | 24 from sat.core.log import getLogger |
24 from sat.core import exceptions | 25 from sat.core import exceptions |
25 from sat.core.i18n import _ | 26 from sat.core.i18n import _ |
26 from sat.tools import utils | 27 from sat.tools import utils |
27 from libervia.server.constants import Const as C | 28 from libervia.server.constants import Const as C |
28 from . import implicit | 29 from . import implicit |
30 from .task import Task | |
29 | 31 |
30 log = getLogger(__name__) | 32 log = getLogger(__name__) |
31 | 33 |
32 DEFAULT_SITE_LABEL = _("default site") | 34 DEFAULT_SITE_LABEL = _("default site") |
33 | 35 |
79 else: | 81 else: |
80 if not value in allowed: | 82 if not value in allowed: |
81 raise ValueError(_("Unexpected value for {var}: {value!r}").format( | 83 raise ValueError(_("Unexpected value for {var}: {value!r}").format( |
82 var=var, value=value)) | 84 var=var, value=value)) |
83 | 85 |
84 async def parseTasksDir(self, dir_path): | 86 async def importTask( |
87 self, | |
88 task_name: str, | |
89 task_path: Path, | |
90 to_import: Dict[str, Path] | |
91 ) -> None: | |
92 if task_name in self.tasks: | |
93 log.debug(f"skipping task {task_name} which is already imported") | |
94 return | |
95 module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}" | |
96 | |
97 spec = importlib.util.spec_from_file_location(module_name, task_path) | |
98 task_module = importlib.util.module_from_spec(spec) | |
99 spec.loader.exec_module(task_module) | |
100 task = task_module.Task(self, task_name) | |
101 if task.AFTER is not None: | |
102 for pre_task_name in task.AFTER: | |
103 log.debug( | |
104 f"task {task_name!r} must be run after {pre_task_name!r}") | |
105 try: | |
106 pre_task_path = to_import[pre_task_name] | |
107 except KeyError: | |
108 raise ValueError( | |
109 f"task {task_name!r} must be run after {pre_task_name!r}, " | |
110 f"however there is no task with such name") | |
111 await self.importTask(pre_task_name, pre_task_path, to_import) | |
112 | |
113 # we launch prepare, which is a method used to prepare | |
114 # data at runtime (e.g. set WATCH_DIRS using config) | |
115 try: | |
116 prepare = task.prepare | |
117 except AttributeError: | |
118 pass | |
119 else: | |
120 log.info(_('== preparing task "{task_name}" for {site_name} =='.format( | |
121 task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL))) | |
122 try: | |
123 await utils.asDeferred(prepare) | |
124 except exceptions.CancelError as e: | |
125 log.debug(f"Skipping {task_name} which cancelled itself: {e}") | |
126 return | |
127 | |
128 self.tasks[task_name] = task | |
129 self.validateData(task) | |
130 if self.host.options['dev_mode']: | |
131 dirs = task.WATCH_DIRS or [] | |
132 for dir_ in dirs: | |
133 self.host.files_watcher.watchDir( | |
134 dir_, auto_add=True, recursive=True, | |
135 callback=self._autorunTask, task_name=task_name) | |
136 | |
137 async def parseTasksDir(self, dir_path: Path) -> None: | |
85 log.debug(f"parsing tasks in {dir_path}") | 138 log.debug(f"parsing tasks in {dir_path}") |
86 tasks_paths = sorted(dir_path.glob('task_*.py')) | 139 tasks_paths = sorted(dir_path.glob('task_*.py')) |
140 to_import = {} | |
87 for task_path in tasks_paths: | 141 for task_path in tasks_paths: |
88 if not task_path.is_file(): | 142 if not task_path.is_file(): |
89 continue | 143 continue |
90 task_name = task_path.stem[5:].lower().strip() | 144 task_name = task_path.stem[5:].lower().strip() |
91 if not task_name: | 145 if not task_name: |
93 if task_name in self.tasks: | 147 if task_name in self.tasks: |
94 raise exceptions.ConflictError( | 148 raise exceptions.ConflictError( |
95 "A task with the name [{name}] already exists".format( | 149 "A task with the name [{name}] already exists".format( |
96 name=task_name)) | 150 name=task_name)) |
97 log.debug(f"task {task_name} found") | 151 log.debug(f"task {task_name} found") |
98 module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}" | 152 to_import[task_name] = task_path |
99 | 153 |
100 spec = importlib.util.spec_from_file_location(module_name, task_path) | 154 for task_name, task_path in to_import.items(): |
101 task_module = importlib.util.module_from_spec(spec) | 155 await self.importTask(task_name, task_path, to_import) |
102 spec.loader.exec_module(task_module) | |
103 task = task_module.Task(self) | |
104 | |
105 # we launch prepare, which is a method used to prepare | |
106 # data at runtime (e.g. set WATCH_DIRS using config) | |
107 try: | |
108 prepare = task.prepare | |
109 except AttributeError: | |
110 pass | |
111 else: | |
112 try: | |
113 await utils.asDeferred(prepare) | |
114 except exceptions.CancelError as e: | |
115 log.debug(f"Skipping {task_name} which cancelled itself: {e}") | |
116 continue | |
117 | |
118 self.tasks[task_name] = task | |
119 self.validateData(task) | |
120 if self.host.options['dev_mode']: | |
121 dirs = task.WATCH_DIRS or [] | |
122 for dir_ in dirs: | |
123 self.host.files_watcher.watchDir( | |
124 dir_, auto_add=True, recursive=True, | |
125 callback=self._autorunTask, task_name=task_name) | |
126 | 156 |
127 async def parseTasks(self): | 157 async def parseTasks(self): |
128 # implicit tasks are always run | 158 # implicit tasks are always run |
129 implicit_path = Path(implicit.__file__).parent | 159 implicit_path = Path(implicit.__file__).parent |
130 await self.parseTasksDir(implicit_path) | 160 await self.parseTasksDir(implicit_path) |