comparison libervia/server/tasks/manager.py @ 1260:6161076193e0

tasks: dir event filter: if a Task instance implements `onDirEvent`, it is called instead of `start`; this way, the task can filter the events or modify the workflow.
author Goffi <goffi@goffi.org>
date Sun, 03 May 2020 21:07:22 +0200
parents 80a92eb82b7f
children a46d0e0f383b
comparison
equal deleted inserted replaced
1259:0b269d4a46a3 1260:6161076193e0
138 138
139 def _autorunTask(self, host, filepath, flags, task_name): 139 def _autorunTask(self, host, filepath, flags, task_name):
140 """Called when an event is received from a watched directory""" 140 """Called when an event is received from a watched directory"""
141 if flags == ['create']: 141 if flags == ['create']:
142 return 142 return
143 return defer.ensureDeferred(self.runTask(task_name)) 143 try:
144 task = self.tasks[task_name]
145 on_dir_event_cb = task.onDirEvent
146 except AttributeError:
147 return defer.ensureDeferred(self.runTask(task_name))
148 else:
149 return utils.asDeferred(
150 on_dir_event_cb, host, Path(filepath.path.decode()), flags)
144 151
145 async def runTask(self, task_name): 152 async def runTaskInstance(self, task: Task) -> None:
146 """Run a single task 153 self._current_task = task.name
147
148 @param task_name(unicode): name of the task to run
149 """
150 task = self.tasks[task_name]
151 self._current_task = task_name
152 log.info(_('== running task "{task_name}" for {site_name} =='.format( 154 log.info(_('== running task "{task_name}" for {site_name} =='.format(
153 task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL))) 155 task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL)))
154 os.chdir(self.site_path) 156 os.chdir(self.site_path)
155 try: 157 try:
156 await utils.asDeferred(task.start) 158 await utils.asDeferred(task.start)
157 except Exception as e: 159 except Exception as e:
158 on_error = task.ON_ERROR 160 on_error = task.ON_ERROR
159 if on_error == 'stop': 161 if on_error == 'stop':
160 raise e 162 raise e
161 elif on_error == 'continue': 163 elif on_error == 'continue':
162 log.warning(_('Task "{task_name}" failed for {site_name}: {reason}') 164 log.warning(_('Task "{task_name}" failed for {site_name}: {reason}')
163 .format(task_name=task_name, site_name=self.site_name, reason=e)) 165 .format(task_name=task.name, site_name=self.site_name, reason=e))
164 else: 166 else:
165 raise exceptions.InternalError("we should never reach this point") 167 raise exceptions.InternalError("we should never reach this point")
166 self._current_task = None 168 self._current_task = None
169
170 async def runTask(self, task_name: str) -> None:
171 """Run a single task
172
173 @param task_name(unicode): name of the task to run
174 """
175 task = self.tasks[task_name]
176 await self.runTaskInstance(task)
167 177
168 async def runTasks(self): 178 async def runTasks(self):
169 """Run all the tasks found""" 179 """Run all the tasks found"""
170 old_path = os.getcwd() 180 old_path = os.getcwd()
171 for task_name, task_value in self.tasks.items(): 181 for task_name, task_value in self.tasks.items():