Mercurial > libervia-web
comparison libervia/server/tasks/manager.py @ 1260:6161076193e0
tasks: dir event filter:
If a Task instance implements `onDirEvent`, that method is called instead of `start` when a
directory event is received, so the task can filter the events or modify the workflow.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 03 May 2020 21:07:22 +0200 |
parents | 80a92eb82b7f |
children | a46d0e0f383b |
comparison
equal
deleted
inserted
replaced
1259:0b269d4a46a3 | 1260:6161076193e0 |
---|---|
138 | 138 |
139 def _autorunTask(self, host, filepath, flags, task_name): | 139 def _autorunTask(self, host, filepath, flags, task_name): |
140 """Called when an event is received from a watched directory""" | 140 """Called when an event is received from a watched directory""" |
141 if flags == ['create']: | 141 if flags == ['create']: |
142 return | 142 return |
143 return defer.ensureDeferred(self.runTask(task_name)) | 143 try: |
144 task = self.tasks[task_name] | |
145 on_dir_event_cb = task.onDirEvent | |
146 except AttributeError: | |
147 return defer.ensureDeferred(self.runTask(task_name)) | |
148 else: | |
149 return utils.asDeferred( | |
150 on_dir_event_cb, host, Path(filepath.path.decode()), flags) | |
144 | 151 |
145 async def runTask(self, task_name): | 152 async def runTaskInstance(self, task: Task) -> None: |
146 """Run a single task | 153 self._current_task = task.name |
147 | |
148 @param task_name(unicode): name of the task to run | |
149 """ | |
150 task = self.tasks[task_name] | |
151 self._current_task = task_name | |
152 log.info(_('== running task "{task_name}" for {site_name} =='.format( | 154 log.info(_('== running task "{task_name}" for {site_name} =='.format( |
153 task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL))) | 155 task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL))) |
154 os.chdir(self.site_path) | 156 os.chdir(self.site_path) |
155 try: | 157 try: |
156 await utils.asDeferred(task.start) | 158 await utils.asDeferred(task.start) |
157 except Exception as e: | 159 except Exception as e: |
158 on_error = task.ON_ERROR | 160 on_error = task.ON_ERROR |
159 if on_error == 'stop': | 161 if on_error == 'stop': |
160 raise e | 162 raise e |
161 elif on_error == 'continue': | 163 elif on_error == 'continue': |
162 log.warning(_('Task "{task_name}" failed for {site_name}: {reason}') | 164 log.warning(_('Task "{task_name}" failed for {site_name}: {reason}') |
163 .format(task_name=task_name, site_name=self.site_name, reason=e)) | 165 .format(task_name=task.name, site_name=self.site_name, reason=e)) |
164 else: | 166 else: |
165 raise exceptions.InternalError("we should never reach this point") | 167 raise exceptions.InternalError("we should never reach this point") |
166 self._current_task = None | 168 self._current_task = None |
169 | |
170 async def runTask(self, task_name: str) -> None: | |
171 """Run a single task | |
172 | |
173 @param task_name(unicode): name of the task to run | |
174 """ | |
175 task = self.tasks[task_name] | |
176 await self.runTaskInstance(task) | |
167 | 177 |
168 async def runTasks(self): | 178 async def runTasks(self): |
169 """Run all the tasks found""" | 179 """Run all the tasks found""" |
170 old_path = os.getcwd() | 180 old_path = os.getcwd() |
171 for task_name, task_value in self.tasks.items(): | 181 for task_name, task_value in self.tasks.items(): |