comparison libervia/web/server/tasks/manager.py @ 1518:eb00d593801d

refactoring: rename `libervia` to `libervia.web` + update imports following backend changes
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 16:49:28 +0200
parents libervia/server/tasks/manager.py@106bae41f5c8
children
comparison
equal deleted inserted replaced
1517:b8ed9726525b 1518:eb00d593801d
1 #!/usr/bin/env python3
2
3 # Libervia: a Salut à Toi frontend
4 # Copyright (C) 2011-2021 Jérôme Poisson <goffi@goffi.org>
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os
19 import os.path
20 from pathlib import Path
21 from typing import Dict
22 import importlib.util
23 from twisted.internet import defer
24 from libervia.backend.core.log import getLogger
25 from libervia.backend.core import exceptions
26 from libervia.backend.core.i18n import _
27 from libervia.backend.tools import utils
28 from libervia.web.server.constants import Const as C
29 from . import implicit
30 from .task import Task
31
32 log = getLogger(__name__)
33
34 DEFAULT_SITE_LABEL = _("default site")
35
36
37 class TasksManager:
38 """Handle tasks of a Libervia site"""
39
40 def __init__(self, host, site_resource):
41 """
42 @param site_resource(LiberviaRootResource): root resource of the site to manage
43 """
44 self.host = host
45 self.resource = site_resource
46 self.tasks_dir = self.site_path / C.TASKS_DIR
47 self.tasks = {}
48 self._build_path = None
49 self._current_task = None
50
51 @property
52 def site_path(self):
53 return Path(self.resource.site_path)
54
55 @property
56 def build_path(self):
57 """path where generated files will be build for this site"""
58 if self._build_path is None:
59 self._build_path = self.host.get_build_path(self.site_name)
60 return self._build_path
61
62 @property
63 def site_name(self):
64 return self.resource.site_name
65
66 def validate_data(self, task):
67 """Check workflow attributes in task"""
68
69 for var, allowed in (("ON_ERROR", ("continue", "stop")),
70 ("LOG_OUTPUT", bool),
71 ("WATCH_DIRS", list)):
72 value = getattr(task, var)
73
74 if isinstance(allowed, type):
75 if allowed is list and value is None:
76 continue
77 if not isinstance(value, allowed):
78 raise ValueError(
79 _("Unexpected value for {var}, {allowed} is expected.")
80 .format(var=var, allowed=allowed))
81 else:
82 if not value in allowed:
83 raise ValueError(_("Unexpected value for {var}: {value!r}").format(
84 var=var, value=value))
85
86 async def import_task(
87 self,
88 task_name: str,
89 task_path: Path,
90 to_import: Dict[str, Path]
91 ) -> None:
92 if task_name in self.tasks:
93 log.debug(f"skipping task {task_name} which is already imported")
94 return
95 module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}"
96
97 spec = importlib.util.spec_from_file_location(module_name, task_path)
98 task_module = importlib.util.module_from_spec(spec)
99 spec.loader.exec_module(task_module)
100 task = task_module.Task(self, task_name)
101 if task.AFTER is not None:
102 for pre_task_name in task.AFTER:
103 log.debug(
104 f"task {task_name!r} must be run after {pre_task_name!r}")
105 try:
106 pre_task_path = to_import[pre_task_name]
107 except KeyError:
108 raise ValueError(
109 f"task {task_name!r} must be run after {pre_task_name!r}, "
110 f"however there is no task with such name")
111 await self.import_task(pre_task_name, pre_task_path, to_import)
112
113 # we launch prepare, which is a method used to prepare
114 # data at runtime (e.g. set WATCH_DIRS using config)
115 try:
116 prepare = task.prepare
117 except AttributeError:
118 pass
119 else:
120 log.info(_('== preparing task "{task_name}" for {site_name} =='.format(
121 task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL)))
122 try:
123 await utils.as_deferred(prepare)
124 except exceptions.CancelError as e:
125 log.debug(f"Skipping {task_name} which cancelled itself: {e}")
126 return
127
128 self.tasks[task_name] = task
129 self.validate_data(task)
130 if self.host.options['dev-mode']:
131 dirs = task.WATCH_DIRS or []
132 for dir_ in dirs:
133 self.host.files_watcher.watch_dir(
134 dir_, auto_add=True, recursive=True,
135 callback=self._autorun_task, task_name=task_name)
136
137 async def parse_tasks_dir(self, dir_path: Path) -> None:
138 log.debug(f"parsing tasks in {dir_path}")
139 tasks_paths = sorted(dir_path.glob('task_*.py'))
140 to_import = {}
141 for task_path in tasks_paths:
142 if not task_path.is_file():
143 continue
144 task_name = task_path.stem[5:].lower().strip()
145 if not task_name:
146 continue
147 if task_name in self.tasks:
148 raise exceptions.ConflictError(
149 "A task with the name [{name}] already exists".format(
150 name=task_name))
151 log.debug(f"task {task_name} found")
152 to_import[task_name] = task_path
153
154 for task_name, task_path in to_import.items():
155 await self.import_task(task_name, task_path, to_import)
156
157 async def parse_tasks(self):
158 # implicit tasks are always run
159 implicit_path = Path(implicit.__file__).parent
160 await self.parse_tasks_dir(implicit_path)
161 # now we check if there are tasks specific to this site
162 if not self.tasks_dir.is_dir():
163 log.debug(_("{name} has no task to launch.").format(
164 name = self.resource.site_name or DEFAULT_SITE_LABEL))
165 return
166 else:
167 await self.parse_tasks_dir(self.tasks_dir)
168
169 def _autorun_task(self, host, filepath, flags, task_name):
170 """Called when an event is received from a watched directory"""
171 if flags == ['create']:
172 return
173 try:
174 task = self.tasks[task_name]
175 on_dir_event_cb = task.on_dir_event
176 except AttributeError:
177 return defer.ensureDeferred(self.run_task(task_name))
178 else:
179 return utils.as_deferred(
180 on_dir_event_cb, host, Path(filepath.path.decode()), flags)
181
182 async def run_task_instance(self, task: Task) -> None:
183 self._current_task = task.name
184 log.info(_('== running task "{task_name}" for {site_name} =='.format(
185 task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL)))
186 os.chdir(self.site_path)
187 try:
188 await utils.as_deferred(task.start)
189 except Exception as e:
190 on_error = task.ON_ERROR
191 if on_error == 'stop':
192 raise e
193 elif on_error == 'continue':
194 log.warning(_('Task "{task_name}" failed for {site_name}: {reason}')
195 .format(task_name=task.name, site_name=self.site_name, reason=e))
196 else:
197 raise exceptions.InternalError("we should never reach this point")
198 self._current_task = None
199
200 async def run_task(self, task_name: str) -> None:
201 """Run a single task
202
203 @param task_name(unicode): name of the task to run
204 """
205 task = self.tasks[task_name]
206 await self.run_task_instance(task)
207
208 async def run_tasks(self):
209 """Run all the tasks found"""
210 old_path = os.getcwd()
211 for task_name, task_value in self.tasks.items():
212 await self.run_task(task_name)
213 os.chdir(old_path)