Mercurial > libervia-web
annotate libervia/server/tasks/manager.py @ 1274:eb4f03da0d7d
server: re-usable Twisted TLS code has been moved to SàT backend
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 29 May 2020 21:56:42 +0200 |
parents | a46d0e0f383b |
children | df40708c4c76 |
rev | line source |
---|---|
1239 | 1 #!/usr/bin/env python3 |
2 | |
1146 | 3 # Libervia: a Salut à Toi frontend |
1237 | 4 # Copyright (C) 2011-2020 Jérôme Poisson <goffi@goffi.org> |
1146 | 5 |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 import os | |
19 import os.path | |
1247
a6c7f07f1e4d
tasks: implicit tasks + Brython task:
Goffi <goffi@goffi.org>
parents:
1245
diff
changeset
|
20 from pathlib import Path |
1261
a46d0e0f383b
tasks: `AFTER` attribute to handle tasks order:
Goffi <goffi@goffi.org>
parents:
1260
diff
changeset
|
21 from typing import Dict |
1245 | 22 import importlib.util |
1247
a6c7f07f1e4d
tasks: implicit tasks + Brython task:
Goffi <goffi@goffi.org>
parents:
1245
diff
changeset
|
23 from twisted.internet import defer |
1245 | 24 from sat.core.log import getLogger |
1146 | 25 from sat.core import exceptions |
26 from sat.core.i18n import _ | |
1245 | 27 from sat.tools import utils |
1146 | 28 from libervia.server.constants import Const as C |
1247
a6c7f07f1e4d
tasks: implicit tasks + Brython task:
Goffi <goffi@goffi.org>
parents:
1245
diff
changeset
|
29 from . import implicit |
1261
a46d0e0f383b
tasks: `AFTER` attribute to handle tasks order:
Goffi <goffi@goffi.org>
parents:
1260
diff
changeset
|
30 from .task import Task |
1146 | 31 |
# module-level logger, named after this module
log = getLogger(__name__)

# translated label used in log messages in place of the site name when the
# site name is empty (see the `site_name or DEFAULT_SITE_LABEL` expressions below)
DEFAULT_SITE_LABEL = _("default site")
80a92eb82b7f
server (tasks manager): added a label for default site
Goffi <goffi@goffi.org>
parents:
1247
diff
changeset
|
35 |
1146 | 36 |
class TasksManager:
    """Handle tasks of a Libervia site

    Tasks are looked for in ``task_*.py`` files; each of those modules must expose
    a ``Task`` class which is instantiated with this manager and the task name.
    Implicit tasks (located next to the ``implicit`` package) are always parsed,
    then the tasks found in the tasks directory of the site, if any.
    """

    def __init__(self, host, site_resource):
        """
        @param host: main Libervia instance (used for build path, options and
            files watcher)
        @param site_resource(LiberviaRootResource): root resource of the site to manage
        """
        self.host = host
        self.resource = site_resource
        self.tasks_dir = self.site_path / C.TASKS_DIR
        # imported tasks, by name; insertion order is the order in which the tasks
        # are run by runTasks
        self.tasks: Dict[str, Task] = {}
        # names of the tasks whose import is in progress, used to detect circular
        # ``AFTER`` dependencies
        self._importing = set()
        self._build_path = None
        self._current_task = None

    @property
    def site_path(self):
        """path of the managed site, as a Path instance"""
        return Path(self.resource.site_path)

    @property
    def build_path(self):
        """path where generated files will be build for this site"""
        # the path is requested to the host only once, then cached
        if self._build_path is None:
            self._build_path = self.host.getBuildPath(self.site_name)
        return self._build_path

    @property
    def site_name(self):
        """name of the managed site, as set on the root resource"""
        return self.resource.site_name

    def validateData(self, task):
        """Check workflow attributes in task

        @param task(Task): task instance to check
        @raise ValueError: one of ON_ERROR, LOG_OUTPUT or WATCH_DIRS has an
            unexpected value
        """
        # for each attribute, ``allowed`` is either the expected type, or the
        # collection of accepted values
        for var, allowed in (("ON_ERROR", ("continue", "stop")),
                             ("LOG_OUTPUT", bool),
                             ("WATCH_DIRS", list)):
            value = getattr(task, var)

            if isinstance(allowed, type):
                # None is accepted in place of a list (i.e. nothing is set)
                if allowed is list and value is None:
                    continue
                if not isinstance(value, allowed):
                    raise ValueError(
                        _("Unexpected value for {var}, {allowed} is expected.")
                        .format(var=var, allowed=allowed))
            elif value not in allowed:
                raise ValueError(_("Unexpected value for {var}: {value!r}").format(
                    var=var, value=value))

    async def importTask(
        self,
        task_name: str,
        task_path: Path,
        to_import: Dict[str, Path]
    ) -> None:
        """Import a task and the tasks it must be run after

        Nothing is done if the task has already been imported.

        @param task_name: name of the task to import
        @param task_path: path of the Python file of the task
        @param to_import: map of task names to paths of all tasks which may be
            imported from the directory being parsed
        @raise ValueError: a task specified in ``AFTER`` can't be found, ``AFTER``
            attributes are describing a circular dependency, or workflow attributes
            of the task are invalid
        """
        if task_name in self.tasks:
            log.debug(f"skipping task {task_name} which is already imported")
            return
        if task_name in self._importing:
            # we are already importing this task higher in the call stack: going
            # further would recurse until the interpreter limit is reached
            raise ValueError(
                f"circular dependency detected while importing task {task_name!r}, "
                f"please check AFTER attributes of the tasks")
        self._importing.add(task_name)
        try:
            await self._loadTask(task_name, task_path, to_import)
        finally:
            self._importing.discard(task_name)

    async def _loadTask(
        self,
        task_name: str,
        task_path: Path,
        to_import: Dict[str, Path]
    ) -> None:
        """Load the module of a task, prepare the task then register it

        Arguments are the same as for [importTask].
        """
        module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}"

        spec = importlib.util.spec_from_file_location(module_name, task_path)
        task_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(task_module)
        task = task_module.Task(self, task_name)
        if task.AFTER is not None:
            for pre_task_name in task.AFTER:
                log.debug(
                    f"task {task_name!r} must be run after {pre_task_name!r}")
                if pre_task_name in self.tasks:
                    # already imported (possibly from a directory parsed earlier,
                    # e.g. an implicit task), so it will be run before this one
                    continue
                try:
                    pre_task_path = to_import[pre_task_name]
                except KeyError:
                    raise ValueError(
                        f"task {task_name!r} must be run after {pre_task_name!r}, "
                        f"however there is no task with such name") from None
                await self.importTask(pre_task_name, pre_task_path, to_import)

        # we launch prepare, which is a method used to prepare
        # data at runtime (e.g. set WATCH_DIRS using config)
        prepare = getattr(task, "prepare", None)
        if prepare is not None:
            # the message is translated first and formatted afterwards, otherwise
            # the translation can't be found in the catalogue
            log.info(_('== preparing task "{task_name}" for {site_name} ==').format(
                task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL))
            try:
                await utils.asDeferred(prepare)
            except exceptions.CancelError as e:
                log.debug(f"Skipping {task_name} which cancelled itself: {e}")
                return

        # the task is validated before being registered, so a task with invalid
        # attributes is never kept in self.tasks
        self.validateData(task)
        self.tasks[task_name] = task
        if self.host.options['dev_mode']:
            # in dev mode, watched directories trigger the task automatically
            for dir_ in task.WATCH_DIRS or []:
                self.host.files_watcher.watchDir(
                    dir_, auto_add=True, recursive=True,
                    callback=self._autorunTask, task_name=task_name)

    async def parseTasksDir(self, dir_path: Path) -> None:
        """Import all tasks found in a directory

        Tasks are the ``task_*.py`` files, the name of a task is the lower-cased
        file name without the ``task_`` prefix and the extension.

        @param dir_path: directory where the tasks are looked for
        @raise exceptions.ConflictError: a task with the same name is already
            imported
        """
        log.debug(f"parsing tasks in {dir_path}")
        to_import = {}
        for task_path in sorted(dir_path.glob('task_*.py')):
            if not task_path.is_file():
                continue
            # we remove the "task_" prefix
            task_name = task_path.stem[5:].lower().strip()
            if not task_name:
                continue
            if task_name in self.tasks:
                raise exceptions.ConflictError(
                    "A task with the name [{name}] already exists".format(
                        name=task_name))
            log.debug(f"task {task_name} found")
            to_import[task_name] = task_path

        # all names must be known before importing, as a task may have to be
        # imported after another one of the same directory
        for task_name, task_path in to_import.items():
            await self.importTask(task_name, task_path, to_import)

    async def parseTasks(self):
        """Import implicit tasks, then tasks specific to this site if any"""
        # implicit tasks are always run
        implicit_path = Path(implicit.__file__).parent
        await self.parseTasksDir(implicit_path)
        # now we check if there are tasks specific to this site
        if not self.tasks_dir.is_dir():
            log.debug(_("{name} has no task to launch.").format(
                name=self.site_name or DEFAULT_SITE_LABEL))
            return
        await self.parseTasksDir(self.tasks_dir)

    def _autorunTask(self, host, filepath, flags, task_name):
        """Called when an event is received from a watched directory

        If the task has an ``onDirEvent`` method, it is called with the host, the
        path and the flags of the event, otherwise the task is run.

        @param host: value transmitted by the files watcher, given as is to
            ``onDirEvent``
        @param filepath: location where the event happened; its ``path`` attribute
            is expected to be bytes
        @param flags(list): flags of the event
        @param task_name(str): name of the task associated with the watched
            directory
        @return: Deferred of the launched callback, or None if the event is ignored
        """
        # events with "create" as only flag are ignored
        if flags == ['create']:
            return
        task = self.tasks[task_name]
        try:
            on_dir_event_cb = task.onDirEvent
        except AttributeError:
            return defer.ensureDeferred(self.runTask(task_name))
        else:
            return utils.asDeferred(
                on_dir_event_cb, host, Path(filepath.path.decode()), flags)

    async def runTaskInstance(self, task: Task) -> None:
        """Run a task from its instance

        The current working directory is set to the path of the site before
        starting the task.

        @param task: instance of the task to run
        @raise Exception: the task has failed, and its ``ON_ERROR`` is "stop"
        @raise exceptions.InternalError: the task has failed, and its ``ON_ERROR``
            has an unexpected value
        """
        site_label = self.site_name or DEFAULT_SITE_LABEL
        self._current_task = task.name
        try:
            # the message is translated first and formatted afterwards, otherwise
            # the translation can't be found in the catalogue
            log.info(_('== running task "{task_name}" for {site_name} ==').format(
                task_name=task.name, site_name=site_label))
            os.chdir(self.site_path)
            try:
                await utils.asDeferred(task.start)
            except Exception as e:
                on_error = task.ON_ERROR
                if on_error == 'stop':
                    raise
                elif on_error == 'continue':
                    log.warning(
                        _('Task "{task_name}" failed for {site_name}: {reason}')
                        .format(task_name=task.name, site_name=site_label, reason=e))
                else:
                    raise exceptions.InternalError(
                        "we should never reach this point") from e
        finally:
            # must be reset even if the task has failed
            self._current_task = None

    async def runTask(self, task_name: str) -> None:
        """Run a single task

        @param task_name(unicode): name of the task to run
        @raise KeyError: there is no imported task with this name
        """
        await self.runTaskInstance(self.tasks[task_name])

    async def runTasks(self):
        """Run all the tasks found

        The working directory in use before the call is restored once the tasks
        have been run, even if one of them has failed.
        """
        old_path = os.getcwd()
        try:
            for task_name in self.tasks:
                await self.runTask(task_name)
        finally:
            # running a task changes the working directory to the site path
            os.chdir(old_path)