changeset 1260:6161076193e0

tasks: dir event filter: if a Task instance implements `onDirEvent`, it will be called instead of `start`, this way the task can filter the events or modify the workflow.
author Goffi <goffi@goffi.org>
date Sun, 03 May 2020 21:07:22 +0200
parents 0b269d4a46a3
children a46d0e0f383b
files libervia/server/tasks/manager.py libervia/server/tasks/task.py
diffstat 2 files changed, 22 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/server/tasks/manager.py	Sun May 03 19:32:20 2020 +0200
+++ b/libervia/server/tasks/manager.py	Sun May 03 21:07:22 2020 +0200
@@ -140,17 +140,19 @@
         """Called when an event is received from a watched directory"""
         if flags == ['create']:
             return
-        return defer.ensureDeferred(self.runTask(task_name))
-
-    async def runTask(self, task_name):
-        """Run a single task
+        try:
+            task = self.tasks[task_name]
+            on_dir_event_cb = task.onDirEvent
+        except AttributeError:
+            return defer.ensureDeferred(self.runTask(task_name))
+        else:
+            return utils.asDeferred(
+                on_dir_event_cb, host, Path(filepath.path.decode()), flags)
 
-        @param task_name(unicode): name of the task to run
-        """
-        task = self.tasks[task_name]
-        self._current_task = task_name
+    async def runTaskInstance(self, task: Task) -> None:
+        self._current_task = task.name
         log.info(_('== running task "{task_name}" for {site_name} =='.format(
-            task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL)))
+            task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL)))
         os.chdir(self.site_path)
         try:
             await utils.asDeferred(task.start)
@@ -160,11 +162,19 @@
                 raise e
             elif on_error == 'continue':
                 log.warning(_('Task "{task_name}" failed for {site_name}: {reason}')
-                    .format(task_name=task_name, site_name=self.site_name, reason=e))
+                    .format(task_name=task.name, site_name=self.site_name, reason=e))
             else:
                 raise exceptions.InternalError("we should never reach this point")
         self._current_task = None
 
+    async def runTask(self, task_name: str) -> None:
+        """Run a single task
+
+        @param task_name(unicode): name of the task to run
+        """
+        task = self.tasks[task_name]
+        await self.runTaskInstance(task)
+
     async def runTasks(self):
         """Run all the tasks found"""
         old_path = os.getcwd()
--- a/libervia/server/tasks/task.py	Sun May 03 19:32:20 2020 +0200
+++ b/libervia/server/tasks/task.py	Sun May 03 21:07:22 2020 +0200
@@ -31,6 +31,8 @@
     ON_ERROR: str = "stop"
     LOG_OUTPUT: bool = True
     # list of directories to check for restarting this task
+    # Task.onDirEvent will be called if it exists, otherwise
+    # the task will be run and Task.start will be called
     WATCH_DIRS: Optional[list] = None
 
     def __init__(self, manager):