comparison libervia/server/tasks/manager.py @ 1245:079e8eb6e327

server (tasks): refactoring: - moved `TasksManager` to `server.tasks.manager` - tasks modules now must have a `Task` class which will be instantiated by TasksManager - `server.tasks.task` has a basis for `Task` class - `Task.prepare` can now be asynchronous - `importlib` is now used to import tasks, instead of exec - tasks are now parsed/run after pages are imported - `server.BackendReady` is now a coroutine - type hinting for Task attributes
author Goffi <goffi@goffi.org>
date Sat, 25 Apr 2020 16:08:55 +0200
parents libervia/server/tasks.py@f511f8fbbf8a
children a6c7f07f1e4d
comparison
equal deleted inserted replaced
1244:2ed4e399e1d4 1245:079e8eb6e327
1 #!/usr/bin/env python3
2
3 # Libervia: a Salut à Toi frontend
4 # Copyright (C) 2011-2020 Jérôme Poisson <goffi@goffi.org>
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os
19 import os.path
20 import importlib.util
21 from sat.core.log import getLogger
22 from sat.core import exceptions
23 from sat.core.i18n import _
24 from sat.tools import utils
25 from libervia.server.constants import Const as C
26
27 log = getLogger(__name__)
28
29
class TasksManager:
    """Handle tasks of a Libervia site

    Tasks are modules found in the tasks directory of the site. Their file name
    must start with ``task_`` and use one of the extensions in FILE_EXTS. Each
    module must expose a ``Task`` class, which is instantiated with this manager
    as its only argument.
    """
    # extensions (without the leading dot) of the files which may hold a task
    FILE_EXTS = {'py'}

    def __init__(self, host, site_resource):
        """
        @param host: main service instance, used here to get the build path, the
            options and the files watcher
        @param site_resource(LiberviaRootResource): root resource of the site to manage
        """
        self.host = host
        self.resource = site_resource
        self.tasks_dir = os.path.join(self.resource.site_path, C.TASKS_DIR)
        # task name => Task instance, filled by parseTasks
        self.tasks = {}
        # computed on first access of the build_path property
        self._build_path = None
        # name of the task being run by runTask, None when no task is running
        self._current_task = None

    @property
    def site_path(self):
        """path of the managed site, as given by the site resource"""
        return self.resource.site_path

    @property
    def build_path(self):
        """path where generated files are built for this site"""
        if self._build_path is None:
            self._build_path = self.host.getBuildPath(self.site_name)
        return self._build_path

    @property
    def site_name(self):
        """name of the managed site, as given by the site resource"""
        return self.resource.site_name

    def validateData(self, task):
        """Check workflow attributes in task

        @param task: task instance to check, it must have ON_ERROR, LOG_OUTPUT and
            WATCH_DIRS attributes
        @raise ValueError: an attribute has an unexpected type or value
        """
        # "allowed" is either a type which the value must be an instance of, or a
        # tuple of the accepted values
        for var, allowed in (("ON_ERROR", ("continue", "stop")),
                             ("LOG_OUTPUT", bool),
                             ("WATCH_DIRS", list)):
            value = getattr(task, var)

            if isinstance(allowed, type):
                # None is accepted in place of a list (parseTasks handles it as
                # "no directory to watch")
                if allowed is list and value is None:
                    continue
                if not isinstance(value, allowed):
                    raise ValueError(
                        _("Unexpected value for {var}, {allowed} is expected.")
                        .format(var=var, allowed=allowed))
            elif value not in allowed:
                raise ValueError(_("Unexpected value for {var}: {value!r}").format(
                    var=var, value=value))

    async def parseTasks(self):
        """Import, instantiate and prepare the tasks of the site

        Each task is registered in self.tasks, then its optional "prepare" method
        is awaited and its workflow attributes are validated. In dev mode, the
        directories listed in WATCH_DIRS are watched to re-run the task on change.

        @raise exceptions.ConflictError: two files resolve to the same task name
        @raise ValueError: a task has invalid workflow attributes
        """
        if not os.path.isdir(self.tasks_dir):
            log.debug(_("{name} has no task to launch.").format(
                name=self.resource.site_name or "default site"))
            return

        # file names are sorted so tasks are registered in a deterministic order
        for filename in sorted(os.listdir(self.tasks_dir)):
            filepath = os.path.join(self.tasks_dir, filename)
            if not filename.startswith('task_') or not os.path.isfile(filepath):
                continue
            task_name, ext = os.path.splitext(filename)
            # we remove the "task_" prefix to get the name of the task
            task_name = task_name[5:].lower().strip()
            if not task_name:
                continue
            if ext[1:] not in self.FILE_EXTS:
                continue
            if task_name in self.tasks:
                raise exceptions.ConflictError(
                    "A task with the name [{name}] already exists".format(
                        name=task_name))
            module_name = f"{self.site_name}.task.{task_name}"

            spec = importlib.util.spec_from_file_location(module_name, filepath)
            task_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(task_module)
            task = task_module.Task(self)
            self.tasks[task_name] = task

            # we launch prepare, which is a method used to prepare
            # data at runtime (e.g. set WATCH_DIRS using config)
            try:
                prepare = task.prepare
            except AttributeError:
                pass
            else:
                await utils.asDeferred(prepare)
            self.validateData(task)
            if self.host.options['dev_mode']:
                for dir_ in task.WATCH_DIRS or []:
                    self.host.files_watcher.watchDir(
                        dir_, auto_add=True, recursive=True,
                        callback=self._autorunTask, task_name=task_name)

    def _autorunTask(self, host, filepath, flags, task_name):
        """Called when an event is received from a watched directory

        @param host: unused
        @param filepath: unused
        @param flags: flags of the received event, the event is ignored when they
            are exactly ['create']
        @param task_name(str): name of the task to run
        @return: None if the event is ignored, else the result of
            utils.asDeferred for the task run
        """
        if flags == ['create']:
            return
        # "defer" is not imported in this module, so we use the helper already
        # used to launch task callables
        # NOTE(review): assumes utils.asDeferred forwards positional arguments
        #   to the callable — confirm against sat.tools.utils
        return utils.asDeferred(self.runTask, task_name)

    async def runTask(self, task_name):
        """Run a single task

        The working directory is changed to the site path before the task is
        started, and is not restored by this method.

        @param task_name(str): name of the task to run
        @raise KeyError: no task is registered with this name
        @raise Exception: the task failed and its ON_ERROR is "stop", the original
            exception is then propagated
        """
        task = self.tasks[task_name]
        self._current_task = task_name
        try:
            # the message must be translated first, then formatted, otherwise
            # the translation lookup is done on the already formatted text
            log.info(_('== running task "{task_name}" for {site_name} ==').format(
                task_name=task_name, site_name=self.site_name))
            os.chdir(self.site_path)
            try:
                await utils.asDeferred(task.start)
            except Exception as e:
                on_error = task.ON_ERROR
                if on_error == 'stop':
                    raise
                elif on_error == 'continue':
                    log.warning(
                        _('Task "{task_name}" failed for {site_name}: {reason}')
                        .format(task_name=task_name, site_name=self.site_name,
                                reason=e))
                else:
                    raise exceptions.InternalError(
                        "we should never reach this point")
        finally:
            # no task is running anymore, even if the error is propagated
            self._current_task = None

    async def runTasks(self):
        """Run all the tasks found

        Tasks are run one after the other. The working directory is restored once
        done, even if a task fails with ON_ERROR set to "stop".
        """
        old_path = os.getcwd()
        try:
            for task_name in self.tasks:
                await self.runTask(task_name)
        finally:
            os.chdir(old_path)