diff libervia/server/tasks/manager.py @ 1245:079e8eb6e327

server (tasks): refactoring: - moved `TasksManager` to `server.tasks.manager` - tasks modules now must have a `Task` class which will be instanciated by TasksManager - `server.tasks.task` has a basis for `Task` class - `Task.prepare` can now be asynchronous - `importlib` is now used to import tasks, instead of exec - tasks are now parsed/run after pages are imported - `server.BackendReady` is now a coroutine - type hinting for Task attributes
author Goffi <goffi@goffi.org>
date Sat, 25 Apr 2020 16:08:55 +0200
parents libervia/server/tasks.py@f511f8fbbf8a
children a6c7f07f1e4d
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/server/tasks/manager.py	Sat Apr 25 16:08:55 2020 +0200
@@ -0,0 +1,160 @@
+#!/usr/bin/env python3
+
+# Libervia: a Salut à Toi frontend
+# Copyright (C) 2011-2020 Jérôme Poisson <goffi@goffi.org>
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+import os
+import os.path
+import importlib.util
+from sat.core.log import getLogger
+from sat.core import exceptions
+from sat.core.i18n import _
+from sat.tools import utils
+from libervia.server.constants import Const as C
+
+log = getLogger(__name__)
+
+
+class TasksManager:
+    """Handle tasks of a Libervia site"""
+    FILE_EXTS = {'py'}
+
+    def __init__(self, host, site_resource):
+        """
+        @param site_resource(LiberviaRootResource): root resource of the site to manage
+        """
+        self.host = host
+        self.resource = site_resource
+        self.tasks_dir = os.path.join(self.resource.site_path, C.TASKS_DIR)
+        self.tasks = {}
+        self._build_path = None
+        self._current_task = None
+
+    @property
+    def site_path(self):
+        return self.resource.site_path
+
+    @property
+    def build_path(self):
+        """path where generated files will be build for this site"""
+        if self._build_path is None:
+            self._build_path = self.host.getBuildPath(self.site_name)
+        return self._build_path
+
+    @property
+    def site_name(self):
+        return self.resource.site_name
+
+    def validateData(self, task):
+        """Check workflow attributes in task"""
+
+        for var, allowed in (("ON_ERROR", ("continue", "stop")),
+                             ("LOG_OUTPUT", bool),
+                             ("WATCH_DIRS", list)):
+            value = getattr(task, var)
+
+            if isinstance(allowed, type):
+                if allowed is list and value is None:
+                    continue
+                if not isinstance(value, allowed):
+                    raise ValueError(
+                        _("Unexpected value for {var}, {allowed} is expected.")
+                        .format(var=var, allowed=allowed))
+            else:
+                if not value in allowed:
+                    raise ValueError(_("Unexpected value for {var}: {value!r}").format(
+                        var=var, value=value))
+
+    async def parseTasks(self):
+        if not os.path.isdir(self.tasks_dir):
+            log.debug(_("{name} has no task to launch.").format(
+                name = self.resource.site_name or "default site"))
+            return
+        filenames = os.listdir(self.tasks_dir)
+        filenames.sort()
+        for filename in filenames:
+            filepath = os.path.join(self.tasks_dir, filename)
+            if not filename.startswith('task_') or not os.path.isfile(filepath):
+                continue
+            task_name, ext = os.path.splitext(filename)
+            task_name = task_name[5:].lower().strip()
+            if not task_name:
+                continue
+            if ext[1:] not in self.FILE_EXTS:
+                continue
+            if task_name in self.tasks:
+                raise exceptions.ConflictError(
+                    "A task with the name [{name}] already exists".format(
+                        name=task_name))
+            module_name = f"{self.site_name}.task.{task_name}"
+
+            spec = importlib.util.spec_from_file_location(module_name, filepath)
+            task_module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(task_module)
+            task = task_module.Task(self)
+            self.tasks[task_name] = task
+
+
+            # we launch prepare, which is a method used to prepare
+            # data at runtime (e.g. set WATCH_DIRS using config)
+            try:
+                prepare = task.prepare
+            except AttributeError:
+                pass
+            else:
+                await utils.asDeferred(prepare)
+            self.validateData(task)
+            if self.host.options['dev_mode']:
+                dirs = task.WATCH_DIRS or []
+                for dir_ in dirs:
+                    self.host.files_watcher.watchDir(
+                        dir_, auto_add=True, recursive=True,
+                        callback=self._autorunTask, task_name=task_name)
+
+    def _autorunTask(self, host, filepath, flags, task_name):
+        """Called when an event is received from a watched directory"""
+        if flags == ['create']:
+            return
+        return defer.ensureDeferred(self.runTask(task_name))
+
+    async def runTask(self, task_name):
+        """Run a single task
+
+        @param task_name(unicode): name of the task to run
+        """
+        task = self.tasks[task_name]
+        self._current_task = task_name
+        log.info(_('== running task "{task_name}" for {site_name} =='.format(
+            task_name=task_name, site_name=self.site_name)))
+        os.chdir(self.site_path)
+        try:
+            await utils.asDeferred(task.start)
+        except Exception as e:
+            on_error = task.ON_ERROR
+            if on_error == 'stop':
+                raise e
+            elif on_error == 'continue':
+                log.warning(_('Task "{task_name}" failed for {site_name}: {reason}')
+                    .format(task_name=task_name, site_name=self.site_name, reason=e))
+            else:
+                raise exceptions.InternalError("we should never reach this point")
+        self._current_task = None
+
+    async def runTasks(self):
+        """Run all the tasks found"""
+        old_path = os.getcwd()
+        for task_name, task_value in self.tasks.items():
+            await self.runTask(task_name)
+        os.chdir(old_path)