changeset 1245:079e8eb6e327

server (tasks): refactoring: - moved `TasksManager` to `server.tasks.manager` - tasks modules now must have a `Task` class which will be instanciated by TasksManager - `server.tasks.task` has a basis for `Task` class - `Task.prepare` can now be asynchronous - `importlib` is now used to import tasks, instead of exec - tasks are now parsed/run after pages are imported - `server.BackendReady` is now a coroutine - type hinting for Task attributes
author Goffi <goffi@goffi.org>
date Sat, 25 Apr 2020 16:08:55 +0200
parents 2ed4e399e1d4
children aaf28d45ae67
files libervia/server/server.py libervia/server/tasks.py libervia/server/tasks/manager.py libervia/server/tasks/task.py
diffstat 4 files changed, 254 insertions(+), 207 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/server/server.py	Mon Apr 20 17:33:41 2020 +0200
+++ b/libervia/server/server.py	Sat Apr 25 16:08:55 2020 +0200
@@ -54,7 +54,7 @@
 from libervia.server import websockets
 from libervia.server.pages import LiberviaPage
 from libervia.server.utils import quote, ProgressHandler
-from libervia.server.tasks import TasksManager
+from libervia.server.tasks.manager import TasksManager
 from functools import partial
 
 try:
@@ -734,8 +734,7 @@
         if default_dict:
             conf[''] = default_dict
 
-    @defer.inlineCallbacks
-    def backendReady(self, __):
+    async def backendReady(self):
         log.info(f"Libervia v{self.full_version}")
         if self.options['dev_mode']:
             log.info(_("Developer mode activated"))
@@ -763,9 +762,10 @@
                 default_site_path, auto_add=True, recursive=True,
                 callback=LiberviaPage.onFileChange, site_root=self.sat_root,
                 site_path=default_site_path)
+        LiberviaPage.importPages(self, self.sat_root)
         tasks_manager = TasksManager(self, self.sat_root)
-        yield tasks_manager.runTasks()
-        LiberviaPage.importPages(self, self.sat_root)
+        await tasks_manager.parseTasks()
+        await tasks_manager.runTasks()
         # FIXME: handle _setMenu in a more generic way, taking care of external sites
         self.sat_root._setMenu(self.options["menu_json"])
         self.vhost_root.default = default_root
@@ -805,8 +805,6 @@
                         site_path, auto_add=True, recursive=True,
                         callback=LiberviaPage.onFileChange, site_root=res,
                         site_path=site_path)
-                tasks_manager = TasksManager(self, res)
-                yield tasks_manager.runTasks()
                 res.putChild(
                     C.BUILD_DIR.encode('utf-8'),
                     ProtectedFile(
@@ -820,6 +818,9 @@
                 #        we may want to disable access to the page by direct URL
                 #        (e.g. /blog disabled except if called by external site)
                 LiberviaPage.importPages(self, res, root_path=default_site_path)
+                tasks_manager = TasksManager(self, res)
+                await tasks_manager.parseTasks()
+                await tasks_manager.runTasks()
                 res._setMenu(self.options["menu_json"])
 
             self.vhost_root.addHost(host_name.encode('utf-8'), res)
@@ -895,7 +896,7 @@
             lambda: self.initialised.callback(None),
             lambda failure: self.initialised.errback(Exception(failure)),
         )
-        self.initialised.addCallback(self.backendReady)
+        self.initialised.addCallback(lambda __: defer.ensureDeferred(self.backendReady()))
         self.initialised.addErrback(self.initEb)
 
     def _bridgeEb(self, failure_):
--- a/libervia/server/tasks.py	Mon Apr 20 17:33:41 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,199 +0,0 @@
-#!/usr/bin/env python3
-
-
-# Libervia: a Salut à Toi frontend
-# Copyright (C) 2011-2020 Jérôme Poisson <goffi@goffi.org>
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU Affero General Public License for more details.
-
-# You should have received a copy of the GNU Affero General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-import os
-import os.path
-from twisted.internet import defer
-from twisted.python.procutils import which
-from sat.core import exceptions
-from sat.core.i18n import _
-from libervia.server.constants import Const as C
-from collections import OrderedDict
-from sat.core.log import getLogger
-from sat.tools.common import async_process
-
-log = getLogger(__name__)
-
-
-class TasksManager(object):
-    """Handle tasks of a Libervia site"""
-    FILE_EXTS = {'py'}
-
-    def __init__(self, host, site_resource):
-        """
-        @param site_resource(LiberviaRootResource): root resource of the site to manage
-        """
-        self.host = host
-        self.resource = site_resource
-        self.tasks_dir = os.path.join(self.resource.site_path, C.TASKS_DIR)
-        self.tasks = OrderedDict()
-        self.parseTasks()
-        self._build_path = None
-        self._current_task = None
-
-    @property
-    def site_path(self):
-        return self.resource.site_path
-
-    @property
-    def build_path(self):
-        """path where generated files will be build for this site"""
-        if self._build_path is None:
-            self._build_path = self.host.getBuildPath(self.site_name)
-        return self._build_path
-
-    def getConfig(self, key, default=None, value_type=None):
-        return self.host.getConfig(self.resource, key=key, default=default,
-                                   value_type=value_type)
-
-    @property
-    def site_name(self):
-        return self.resource.site_name
-
-    @property
-    def task_data(self):
-        return self.tasks[self._current_task]['data']
-
-    def validateData(self, data):
-        """Check values in data"""
-
-        for var, default, allowed in (("ON_ERROR", "stop", ("continue", "stop")),
-                                      ("LOG_OUTPUT", True, bool),
-                                      ("WATCH_DIRS", [], list)):
-            value = data.setdefault(var, default)
-            if isinstance(allowed, type):
-                if not isinstance(value, allowed):
-                    raise ValueError(
-                        _("Unexpected value for {var}, {allowed} is expected.")
-                        .format(var = var, allowed = allowed))
-            else:
-                if not value in allowed:
-                    raise ValueError(_("Unexpected value for {var}: {value}").format(
-                        var = var, value = value))
-
-        for var, default, allowed in [["ON_ERROR", "stop", ("continue", "stop")]]:
-            value = data.setdefault(var, default)
-            if not value in allowed:
-                raise ValueError(_("Unexpected value for {var}: {value}").format(
-                    var = var, value = value))
-
-    def parseTasks(self):
-        if not os.path.isdir(self.tasks_dir):
-            log.debug(_("{name} has no task to launch.").format(
-                name = self.resource.site_name or "default site"))
-            return
-        filenames = os.listdir(self.tasks_dir)
-        filenames.sort()
-        for filename in filenames:
-            filepath = os.path.join(self.tasks_dir, filename)
-            if not filename.startswith('task_') or not os.path.isfile(filepath):
-                continue
-            task_name, ext = os.path.splitext(filename)
-            task_name = task_name[5:].lower().strip()
-            if not task_name:
-                continue
-            if ext[1:] not in self.FILE_EXTS:
-                continue
-            if task_name in self.tasks:
-                raise exceptions.ConflictError(
-                    "A task with the name [{name}] already exists".format(
-                        name=task_name))
-            task_data = {"__name__": "{site_name}.task.{name}".format(
-                site_name=self.site_name, name=task_name)}
-            self.tasks[task_name] = {
-                'path': filepath,
-                'data': task_data,
-            }
-            exec(compile(open(filepath, "rb").read(), filepath, 'exec'), task_data)
-            # we launch prepare, which is a method used to prepare
-            # data at runtime (e.g. set WATCH_DIRS using config)
-            try:
-                prepare = task_data['prepare']
-            except KeyError:
-                pass
-            else:
-                prepare(self)
-            self.validateData(task_data)
-            if self.host.options['dev_mode']:
-                dirs = task_data.get('WATCH_DIRS', [])
-                for dir_ in dirs:
-                    self.host.files_watcher.watchDir(
-                        dir_, auto_add=True, recursive=True,
-                        callback=self._autorunTask, task_name=task_name)
-
-    def _autorunTask(self, host, filepath, flags, task_name):
-        """Called when an event is received from a watched directory"""
-        if flags == ['create']:
-            return
-        return self.runTask(task_name)
-
-    @defer.inlineCallbacks
-    def runTask(self, task_name):
-        """Run a single task
-
-        @param task_name(unicode): name of the task to run
-        """
-        task_value = self.tasks[task_name]
-        self._current_task = task_name
-        log.info(_('== running task "{task_name}" for {site_name} =='.format(
-            task_name=task_name, site_name=self.site_name)))
-        data = task_value['data']
-        os.chdir(self.site_path)
-        try:
-            yield data['start'](self)
-        except Exception as e:
-            on_error = data['ON_ERROR']
-            if on_error == 'stop':
-                raise e
-            elif on_error == 'continue':
-                log.warning(_('Task "{task_name}" failed for {site_name}: {reason}')
-                    .format(task_name=task_name, site_name=self.site_name, reason=e))
-            else:
-                raise exceptions.InternalError("we should never reach this point")
-        self._current_task = None
-
-    @defer.inlineCallbacks
-    def runTasks(self):
-        """Run all the tasks found"""
-        old_path = os.getcwd()
-        for task_name, task_value in self.tasks.items():
-            yield self.runTask(task_name)
-        os.chdir(old_path)
-
-    def findCommand(self, name, *args):
-        """Find full path of a shell command
-
-        @param name(unicode): name of the command to find
-        @param *args(unicode): extra names the command may have
-        @return (unicode): full path of the command
-        @raise exceptions.NotFound: can't find this command
-        """
-        names = (name,) + args
-        for n in names:
-            try:
-                cmd_path = which(n)[0]
-            except IndexError:
-                pass
-            else:
-                return cmd_path
-        raise exceptions.NotFound(_(
-            "Can't find {name} command, did you install it?").format(name=name))
-
-    def runCommand(self, command, *args, **kwargs):
-        kwargs['verbose'] = self.task_data["LOG_OUTPUT"]
-        return async_process.CommandProtocol.run(command, *args, **kwargs)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/server/tasks/manager.py	Sat Apr 25 16:08:55 2020 +0200
@@ -0,0 +1,160 @@
+#!/usr/bin/env python3
+
+# Libervia: a Salut à Toi frontend
+# Copyright (C) 2011-2020 Jérôme Poisson <goffi@goffi.org>
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+import os
+import os.path
+import importlib.util
+from sat.core.log import getLogger
+from sat.core import exceptions
+from sat.core.i18n import _
+from sat.tools import utils
+from libervia.server.constants import Const as C
+
+log = getLogger(__name__)
+
+
+class TasksManager:
+    """Handle tasks of a Libervia site"""
+    FILE_EXTS = {'py'}
+
+    def __init__(self, host, site_resource):
+        """
+        @param site_resource(LiberviaRootResource): root resource of the site to manage
+        """
+        self.host = host
+        self.resource = site_resource
+        self.tasks_dir = os.path.join(self.resource.site_path, C.TASKS_DIR)
+        self.tasks = {}
+        self._build_path = None
+        self._current_task = None
+
+    @property
+    def site_path(self):
+        return self.resource.site_path
+
+    @property
+    def build_path(self):
+        """path where generated files will be build for this site"""
+        if self._build_path is None:
+            self._build_path = self.host.getBuildPath(self.site_name)
+        return self._build_path
+
+    @property
+    def site_name(self):
+        return self.resource.site_name
+
+    def validateData(self, task):
+        """Check workflow attributes in task"""
+
+        for var, allowed in (("ON_ERROR", ("continue", "stop")),
+                             ("LOG_OUTPUT", bool),
+                             ("WATCH_DIRS", list)):
+            value = getattr(task, var)
+
+            if isinstance(allowed, type):
+                if allowed is list and value is None:
+                    continue
+                if not isinstance(value, allowed):
+                    raise ValueError(
+                        _("Unexpected value for {var}, {allowed} is expected.")
+                        .format(var=var, allowed=allowed))
+            else:
+                if not value in allowed:
+                    raise ValueError(_("Unexpected value for {var}: {value!r}").format(
+                        var=var, value=value))
+
+    async def parseTasks(self):
+        if not os.path.isdir(self.tasks_dir):
+            log.debug(_("{name} has no task to launch.").format(
+                name = self.resource.site_name or "default site"))
+            return
+        filenames = os.listdir(self.tasks_dir)
+        filenames.sort()
+        for filename in filenames:
+            filepath = os.path.join(self.tasks_dir, filename)
+            if not filename.startswith('task_') or not os.path.isfile(filepath):
+                continue
+            task_name, ext = os.path.splitext(filename)
+            task_name = task_name[5:].lower().strip()
+            if not task_name:
+                continue
+            if ext[1:] not in self.FILE_EXTS:
+                continue
+            if task_name in self.tasks:
+                raise exceptions.ConflictError(
+                    "A task with the name [{name}] already exists".format(
+                        name=task_name))
+            module_name = f"{self.site_name}.task.{task_name}"
+
+            spec = importlib.util.spec_from_file_location(module_name, filepath)
+            task_module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(task_module)
+            task = task_module.Task(self)
+            self.tasks[task_name] = task
+
+
+            # we launch prepare, which is a method used to prepare
+            # data at runtime (e.g. set WATCH_DIRS using config)
+            try:
+                prepare = task.prepare
+            except AttributeError:
+                pass
+            else:
+                await utils.asDeferred(prepare)
+            self.validateData(task)
+            if self.host.options['dev_mode']:
+                dirs = task.WATCH_DIRS or []
+                for dir_ in dirs:
+                    self.host.files_watcher.watchDir(
+                        dir_, auto_add=True, recursive=True,
+                        callback=self._autorunTask, task_name=task_name)
+
+    def _autorunTask(self, host, filepath, flags, task_name):
+        """Called when an event is received from a watched directory"""
+        if flags == ['create']:
+            return
+        return defer.ensureDeferred(self.runTask(task_name))
+
+    async def runTask(self, task_name):
+        """Run a single task
+
+        @param task_name(unicode): name of the task to run
+        """
+        task = self.tasks[task_name]
+        self._current_task = task_name
+        log.info(_('== running task "{task_name}" for {site_name} =='.format(
+            task_name=task_name, site_name=self.site_name)))
+        os.chdir(self.site_path)
+        try:
+            await utils.asDeferred(task.start)
+        except Exception as e:
+            on_error = task.ON_ERROR
+            if on_error == 'stop':
+                raise e
+            elif on_error == 'continue':
+                log.warning(_('Task "{task_name}" failed for {site_name}: {reason}')
+                    .format(task_name=task_name, site_name=self.site_name, reason=e))
+            else:
+                raise exceptions.InternalError("we should never reach this point")
+        self._current_task = None
+
+    async def runTasks(self):
+        """Run all the tasks found"""
+        old_path = os.getcwd()
+        for task_name, task_value in self.tasks.items():
+            await self.runTask(task_name)
+        os.chdir(old_path)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/server/tasks/task.py	Sat Apr 25 16:08:55 2020 +0200
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+
+# Libervia: a Salut à Toi frontend
+# Copyright (C) 2011-2020 Jérôme Poisson <goffi@goffi.org>
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+from twisted.python.procutils import which
+from sat.core.log import getLogger
+from sat.tools.common import async_process
+from sat.core import exceptions
+from sat.core.i18n import _
+from typing import Optional
+
+log = getLogger(__name__)
+
+
+class Task:
+    """Handle tasks of a Libervia site"""
+    # can be "stop" or "continue"
+    ON_ERROR: str = "stop"
+    LOG_OUTPUT: bool = True
+    # list of directories to check for restarting this task
+    WATCH_DIRS: Optional[list] = None
+
+    def __init__(self, manager):
+        self.manager = manager
+
+    @property
+    def host(self):
+        return self.manager.host
+
+    @property
+    def resource(self):
+        return self.manager.resource
+
+    @property
+    def site_path(self):
+        return self.manager.site_path
+
+    @property
+    def build_path(self):
+        """path where generated files will be build for this site"""
+        return self.manager.build_path
+
+    def getConfig(self, key, default=None, value_type=None):
+        return self.host.getConfig(self.resource, key=key, default=default,
+                                   value_type=value_type)
+
+    @property
+    def site_name(self):
+        return self.resource.site_name
+
+    def findCommand(self, name, *args):
+        """Find full path of a shell command
+
+        @param name(unicode): name of the command to find
+        @param *args(unicode): extra names the command may have
+        @return (unicode): full path of the command
+        @raise exceptions.NotFound: can't find this command
+        """
+        names = (name,) + args
+        for n in names:
+            try:
+                cmd_path = which(n)[0]
+            except IndexError:
+                pass
+            else:
+                return cmd_path
+        raise exceptions.NotFound(_(
+            "Can't find {name} command, did you install it?").format(name=name))
+
+    def runCommand(self, command, *args, **kwargs):
+        kwargs['verbose'] = self.LOG_OUTPUT
+        return async_process.CommandProtocol.run(command, *args, **kwargs)