comparison libervia/server/tasks/manager.py @ 1261:a46d0e0f383b

tasks: `AFTER` attribute to handle tasks order: A Task instance can now put a list of tasks names in its `AFTER` attribute, if will then be prepared and launched after it.
author Goffi <goffi@goffi.org>
date Sun, 03 May 2020 21:07:23 +0200
parents 6161076193e0
children df40708c4c76
comparison
equal deleted inserted replaced
1260:6161076193e0 1261:a46d0e0f383b
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os 18 import os
19 import os.path 19 import os.path
20 from pathlib import Path 20 from pathlib import Path
21 from typing import Dict
21 import importlib.util 22 import importlib.util
22 from twisted.internet import defer 23 from twisted.internet import defer
23 from sat.core.log import getLogger 24 from sat.core.log import getLogger
24 from sat.core import exceptions 25 from sat.core import exceptions
25 from sat.core.i18n import _ 26 from sat.core.i18n import _
26 from sat.tools import utils 27 from sat.tools import utils
27 from libervia.server.constants import Const as C 28 from libervia.server.constants import Const as C
28 from . import implicit 29 from . import implicit
30 from .task import Task
29 31
30 log = getLogger(__name__) 32 log = getLogger(__name__)
31 33
32 DEFAULT_SITE_LABEL = _("default site") 34 DEFAULT_SITE_LABEL = _("default site")
33 35
79 else: 81 else:
80 if not value in allowed: 82 if not value in allowed:
81 raise ValueError(_("Unexpected value for {var}: {value!r}").format( 83 raise ValueError(_("Unexpected value for {var}: {value!r}").format(
82 var=var, value=value)) 84 var=var, value=value))
83 85
84 async def parseTasksDir(self, dir_path): 86 async def importTask(
87 self,
88 task_name: str,
89 task_path: Path,
90 to_import: Dict[str, Path]
91 ) -> None:
92 if task_name in self.tasks:
93 log.debug(f"skipping task {task_name} which is already imported")
94 return
95 module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}"
96
97 spec = importlib.util.spec_from_file_location(module_name, task_path)
98 task_module = importlib.util.module_from_spec(spec)
99 spec.loader.exec_module(task_module)
100 task = task_module.Task(self, task_name)
101 if task.AFTER is not None:
102 for pre_task_name in task.AFTER:
103 log.debug(
104 f"task {task_name!r} must be run after {pre_task_name!r}")
105 try:
106 pre_task_path = to_import[pre_task_name]
107 except KeyError:
108 raise ValueError(
109 f"task {task_name!r} must be run after {pre_task_name!r}, "
110 f"however there is no task with such name")
111 await self.importTask(pre_task_name, pre_task_path, to_import)
112
113 # we launch prepare, which is a method used to prepare
114 # data at runtime (e.g. set WATCH_DIRS using config)
115 try:
116 prepare = task.prepare
117 except AttributeError:
118 pass
119 else:
120 log.info(_('== preparing task "{task_name}" for {site_name} =='.format(
121 task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL)))
122 try:
123 await utils.asDeferred(prepare)
124 except exceptions.CancelError as e:
125 log.debug(f"Skipping {task_name} which cancelled itself: {e}")
126 return
127
128 self.tasks[task_name] = task
129 self.validateData(task)
130 if self.host.options['dev_mode']:
131 dirs = task.WATCH_DIRS or []
132 for dir_ in dirs:
133 self.host.files_watcher.watchDir(
134 dir_, auto_add=True, recursive=True,
135 callback=self._autorunTask, task_name=task_name)
136
137 async def parseTasksDir(self, dir_path: Path) -> None:
85 log.debug(f"parsing tasks in {dir_path}") 138 log.debug(f"parsing tasks in {dir_path}")
86 tasks_paths = sorted(dir_path.glob('task_*.py')) 139 tasks_paths = sorted(dir_path.glob('task_*.py'))
140 to_import = {}
87 for task_path in tasks_paths: 141 for task_path in tasks_paths:
88 if not task_path.is_file(): 142 if not task_path.is_file():
89 continue 143 continue
90 task_name = task_path.stem[5:].lower().strip() 144 task_name = task_path.stem[5:].lower().strip()
91 if not task_name: 145 if not task_name:
93 if task_name in self.tasks: 147 if task_name in self.tasks:
94 raise exceptions.ConflictError( 148 raise exceptions.ConflictError(
95 "A task with the name [{name}] already exists".format( 149 "A task with the name [{name}] already exists".format(
96 name=task_name)) 150 name=task_name))
97 log.debug(f"task {task_name} found") 151 log.debug(f"task {task_name} found")
98 module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}" 152 to_import[task_name] = task_path
99 153
100 spec = importlib.util.spec_from_file_location(module_name, task_path) 154 for task_name, task_path in to_import.items():
101 task_module = importlib.util.module_from_spec(spec) 155 await self.importTask(task_name, task_path, to_import)
102 spec.loader.exec_module(task_module)
103 task = task_module.Task(self)
104
105 # we launch prepare, which is a method used to prepare
106 # data at runtime (e.g. set WATCH_DIRS using config)
107 try:
108 prepare = task.prepare
109 except AttributeError:
110 pass
111 else:
112 try:
113 await utils.asDeferred(prepare)
114 except exceptions.CancelError as e:
115 log.debug(f"Skipping {task_name} which cancelled itself: {e}")
116 continue
117
118 self.tasks[task_name] = task
119 self.validateData(task)
120 if self.host.options['dev_mode']:
121 dirs = task.WATCH_DIRS or []
122 for dir_ in dirs:
123 self.host.files_watcher.watchDir(
124 dir_, auto_add=True, recursive=True,
125 callback=self._autorunTask, task_name=task_name)
126 156
127 async def parseTasks(self): 157 async def parseTasks(self):
128 # implicit tasks are always run 158 # implicit tasks are always run
129 implicit_path = Path(implicit.__file__).parent 159 implicit_path = Path(implicit.__file__).parent
130 await self.parseTasksDir(implicit_path) 160 await self.parseTasksDir(implicit_path)