comparison libervia/server/tasks/manager.py @ 1509:106bae41f5c8

massive refactoring from camelCase -> snake_case. See backend commit log for more details
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:44:11 +0200
parents 822bd0139769
children
comparison
equal deleted inserted replaced
1508:ec3ad9abf9f9 1509:106bae41f5c8
54 54
55 @property 55 @property
56 def build_path(self): 56 def build_path(self):
57 """path where generated files will be build for this site""" 57 """path where generated files will be build for this site"""
58 if self._build_path is None: 58 if self._build_path is None:
59 self._build_path = self.host.getBuildPath(self.site_name) 59 self._build_path = self.host.get_build_path(self.site_name)
60 return self._build_path 60 return self._build_path
61 61
62 @property 62 @property
63 def site_name(self): 63 def site_name(self):
64 return self.resource.site_name 64 return self.resource.site_name
65 65
66 def validateData(self, task): 66 def validate_data(self, task):
67 """Check workflow attributes in task""" 67 """Check workflow attributes in task"""
68 68
69 for var, allowed in (("ON_ERROR", ("continue", "stop")), 69 for var, allowed in (("ON_ERROR", ("continue", "stop")),
70 ("LOG_OUTPUT", bool), 70 ("LOG_OUTPUT", bool),
71 ("WATCH_DIRS", list)): 71 ("WATCH_DIRS", list)):
81 else: 81 else:
82 if not value in allowed: 82 if not value in allowed:
83 raise ValueError(_("Unexpected value for {var}: {value!r}").format( 83 raise ValueError(_("Unexpected value for {var}: {value!r}").format(
84 var=var, value=value)) 84 var=var, value=value))
85 85
86 async def importTask( 86 async def import_task(
87 self, 87 self,
88 task_name: str, 88 task_name: str,
89 task_path: Path, 89 task_path: Path,
90 to_import: Dict[str, Path] 90 to_import: Dict[str, Path]
91 ) -> None: 91 ) -> None:
106 pre_task_path = to_import[pre_task_name] 106 pre_task_path = to_import[pre_task_name]
107 except KeyError: 107 except KeyError:
108 raise ValueError( 108 raise ValueError(
109 f"task {task_name!r} must be run after {pre_task_name!r}, " 109 f"task {task_name!r} must be run after {pre_task_name!r}, "
110 f"however there is no task with such name") 110 f"however there is no task with such name")
111 await self.importTask(pre_task_name, pre_task_path, to_import) 111 await self.import_task(pre_task_name, pre_task_path, to_import)
112 112
113 # we launch prepare, which is a method used to prepare 113 # we launch prepare, which is a method used to prepare
114 # data at runtime (e.g. set WATCH_DIRS using config) 114 # data at runtime (e.g. set WATCH_DIRS using config)
115 try: 115 try:
116 prepare = task.prepare 116 prepare = task.prepare
118 pass 118 pass
119 else: 119 else:
120 log.info(_('== preparing task "{task_name}" for {site_name} =='.format( 120 log.info(_('== preparing task "{task_name}" for {site_name} =='.format(
121 task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL))) 121 task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL)))
122 try: 122 try:
123 await utils.asDeferred(prepare) 123 await utils.as_deferred(prepare)
124 except exceptions.CancelError as e: 124 except exceptions.CancelError as e:
125 log.debug(f"Skipping {task_name} which cancelled itself: {e}") 125 log.debug(f"Skipping {task_name} which cancelled itself: {e}")
126 return 126 return
127 127
128 self.tasks[task_name] = task 128 self.tasks[task_name] = task
129 self.validateData(task) 129 self.validate_data(task)
130 if self.host.options['dev-mode']: 130 if self.host.options['dev-mode']:
131 dirs = task.WATCH_DIRS or [] 131 dirs = task.WATCH_DIRS or []
132 for dir_ in dirs: 132 for dir_ in dirs:
133 self.host.files_watcher.watchDir( 133 self.host.files_watcher.watch_dir(
134 dir_, auto_add=True, recursive=True, 134 dir_, auto_add=True, recursive=True,
135 callback=self._autorunTask, task_name=task_name) 135 callback=self._autorun_task, task_name=task_name)
136 136
137 async def parseTasksDir(self, dir_path: Path) -> None: 137 async def parse_tasks_dir(self, dir_path: Path) -> None:
138 log.debug(f"parsing tasks in {dir_path}") 138 log.debug(f"parsing tasks in {dir_path}")
139 tasks_paths = sorted(dir_path.glob('task_*.py')) 139 tasks_paths = sorted(dir_path.glob('task_*.py'))
140 to_import = {} 140 to_import = {}
141 for task_path in tasks_paths: 141 for task_path in tasks_paths:
142 if not task_path.is_file(): 142 if not task_path.is_file():
150 name=task_name)) 150 name=task_name))
151 log.debug(f"task {task_name} found") 151 log.debug(f"task {task_name} found")
152 to_import[task_name] = task_path 152 to_import[task_name] = task_path
153 153
154 for task_name, task_path in to_import.items(): 154 for task_name, task_path in to_import.items():
155 await self.importTask(task_name, task_path, to_import) 155 await self.import_task(task_name, task_path, to_import)
156 156
157 async def parseTasks(self): 157 async def parse_tasks(self):
158 # implicit tasks are always run 158 # implicit tasks are always run
159 implicit_path = Path(implicit.__file__).parent 159 implicit_path = Path(implicit.__file__).parent
160 await self.parseTasksDir(implicit_path) 160 await self.parse_tasks_dir(implicit_path)
161 # now we check if there are tasks specific to this site 161 # now we check if there are tasks specific to this site
162 if not self.tasks_dir.is_dir(): 162 if not self.tasks_dir.is_dir():
163 log.debug(_("{name} has no task to launch.").format( 163 log.debug(_("{name} has no task to launch.").format(
164 name = self.resource.site_name or DEFAULT_SITE_LABEL)) 164 name = self.resource.site_name or DEFAULT_SITE_LABEL))
165 return 165 return
166 else: 166 else:
167 await self.parseTasksDir(self.tasks_dir) 167 await self.parse_tasks_dir(self.tasks_dir)
168 168
169 def _autorunTask(self, host, filepath, flags, task_name): 169 def _autorun_task(self, host, filepath, flags, task_name):
170 """Called when an event is received from a watched directory""" 170 """Called when an event is received from a watched directory"""
171 if flags == ['create']: 171 if flags == ['create']:
172 return 172 return
173 try: 173 try:
174 task = self.tasks[task_name] 174 task = self.tasks[task_name]
175 on_dir_event_cb = task.onDirEvent 175 on_dir_event_cb = task.on_dir_event
176 except AttributeError: 176 except AttributeError:
177 return defer.ensureDeferred(self.runTask(task_name)) 177 return defer.ensureDeferred(self.run_task(task_name))
178 else: 178 else:
179 return utils.asDeferred( 179 return utils.as_deferred(
180 on_dir_event_cb, host, Path(filepath.path.decode()), flags) 180 on_dir_event_cb, host, Path(filepath.path.decode()), flags)
181 181
182 async def runTaskInstance(self, task: Task) -> None: 182 async def run_task_instance(self, task: Task) -> None:
183 self._current_task = task.name 183 self._current_task = task.name
184 log.info(_('== running task "{task_name}" for {site_name} =='.format( 184 log.info(_('== running task "{task_name}" for {site_name} =='.format(
185 task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL))) 185 task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL)))
186 os.chdir(self.site_path) 186 os.chdir(self.site_path)
187 try: 187 try:
188 await utils.asDeferred(task.start) 188 await utils.as_deferred(task.start)
189 except Exception as e: 189 except Exception as e:
190 on_error = task.ON_ERROR 190 on_error = task.ON_ERROR
191 if on_error == 'stop': 191 if on_error == 'stop':
192 raise e 192 raise e
193 elif on_error == 'continue': 193 elif on_error == 'continue':
195 .format(task_name=task.name, site_name=self.site_name, reason=e)) 195 .format(task_name=task.name, site_name=self.site_name, reason=e))
196 else: 196 else:
197 raise exceptions.InternalError("we should never reach this point") 197 raise exceptions.InternalError("we should never reach this point")
198 self._current_task = None 198 self._current_task = None
199 199
200 async def runTask(self, task_name: str) -> None: 200 async def run_task(self, task_name: str) -> None:
201 """Run a single task 201 """Run a single task
202 202
203 @param task_name(unicode): name of the task to run 203 @param task_name(unicode): name of the task to run
204 """ 204 """
205 task = self.tasks[task_name] 205 task = self.tasks[task_name]
206 await self.runTaskInstance(task) 206 await self.run_task_instance(task)
207 207
208 async def runTasks(self): 208 async def run_tasks(self):
209 """Run all the tasks found""" 209 """Run all the tasks found"""
210 old_path = os.getcwd() 210 old_path = os.getcwd()
211 for task_name, task_value in self.tasks.items(): 211 for task_name, task_value in self.tasks.items():
212 await self.runTask(task_name) 212 await self.run_task(task_name)
213 os.chdir(old_path) 213 os.chdir(old_path)