# HG changeset patch # User Goffi # Date 1587823735 -7200 # Node ID 079e8eb6e327017e3506daeb92b1ea3bec07c95b # Parent 2ed4e399e1d4832ae4f6b0563bd4ee0ef6dc8657 server (tasks): refactoring: - moved `TasksManager` to `server.tasks.manager` - tasks modules now must have a `Task` class which will be instanciated by TasksManager - `server.tasks.task` has a basis for `Task` class - `Task.prepare` can now be asynchronous - `importlib` is now used to import tasks, instead of exec - tasks are now parsed/run after pages are imported - `server.BackendReady` is now a coroutine - type hinting for Task attributes diff -r 2ed4e399e1d4 -r 079e8eb6e327 libervia/server/server.py --- a/libervia/server/server.py Mon Apr 20 17:33:41 2020 +0200 +++ b/libervia/server/server.py Sat Apr 25 16:08:55 2020 +0200 @@ -54,7 +54,7 @@ from libervia.server import websockets from libervia.server.pages import LiberviaPage from libervia.server.utils import quote, ProgressHandler -from libervia.server.tasks import TasksManager +from libervia.server.tasks.manager import TasksManager from functools import partial try: @@ -734,8 +734,7 @@ if default_dict: conf[''] = default_dict - @defer.inlineCallbacks - def backendReady(self, __): + async def backendReady(self): log.info(f"Libervia v{self.full_version}") if self.options['dev_mode']: log.info(_("Developer mode activated")) @@ -763,9 +762,10 @@ default_site_path, auto_add=True, recursive=True, callback=LiberviaPage.onFileChange, site_root=self.sat_root, site_path=default_site_path) + LiberviaPage.importPages(self, self.sat_root) tasks_manager = TasksManager(self, self.sat_root) - yield tasks_manager.runTasks() - LiberviaPage.importPages(self, self.sat_root) + await tasks_manager.parseTasks() + await tasks_manager.runTasks() # FIXME: handle _setMenu in a more generic way, taking care of external sites self.sat_root._setMenu(self.options["menu_json"]) self.vhost_root.default = default_root @@ -805,8 +805,6 @@ site_path, auto_add=True, recursive=True, callback=LiberviaPage.onFileChange, site_root=res, site_path=site_path) - tasks_manager = TasksManager(self, res) - yield tasks_manager.runTasks() res.putChild( C.BUILD_DIR.encode('utf-8'), ProtectedFile( @@ -820,6 +818,9 @@ # we may want to disable access to the page by direct URL # (e.g. /blog disabled except if called by external site) LiberviaPage.importPages(self, res, root_path=default_site_path) + tasks_manager = TasksManager(self, res) + await tasks_manager.parseTasks() + await tasks_manager.runTasks() res._setMenu(self.options["menu_json"]) self.vhost_root.addHost(host_name.encode('utf-8'), res) @@ -895,7 +896,7 @@ lambda: self.initialised.callback(None), lambda failure: self.initialised.errback(Exception(failure)), ) - self.initialised.addCallback(self.backendReady) + self.initialised.addCallback(lambda __: defer.ensureDeferred(self.backendReady())) self.initialised.addErrback(self.initEb) def _bridgeEb(self, failure_): diff -r 2ed4e399e1d4 -r 079e8eb6e327 libervia/server/tasks.py --- a/libervia/server/tasks.py Mon Apr 20 17:33:41 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,199 +0,0 @@ -#!/usr/bin/env python3 - - -# Libervia: a Salut à Toi frontend -# Copyright (C) 2011-2020 Jérôme Poisson - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -import os -import os.path -from twisted.internet import defer -from twisted.python.procutils import which -from sat.core import exceptions -from sat.core.i18n import _ -from libervia.server.constants import Const as C -from collections import OrderedDict -from sat.core.log import getLogger -from sat.tools.common import async_process - -log = getLogger(__name__) - - -class TasksManager(object): - """Handle tasks of a Libervia site""" - FILE_EXTS = {'py'} - - def __init__(self, host, site_resource): - """ - @param site_resource(LiberviaRootResource): root resource of the site to manage - """ - self.host = host - self.resource = site_resource - self.tasks_dir = os.path.join(self.resource.site_path, C.TASKS_DIR) - self.tasks = OrderedDict() - self.parseTasks() - self._build_path = None - self._current_task = None - - @property - def site_path(self): - return self.resource.site_path - - @property - def build_path(self): - """path where generated files will be build for this site""" - if self._build_path is None: - self._build_path = self.host.getBuildPath(self.site_name) - return self._build_path - - def getConfig(self, key, default=None, value_type=None): - return self.host.getConfig(self.resource, key=key, default=default, - value_type=value_type) - - @property - def site_name(self): - return self.resource.site_name - - @property - def task_data(self): - return self.tasks[self._current_task]['data'] - - def validateData(self, data): - """Check values in data""" - - for var, default, allowed in (("ON_ERROR", "stop", ("continue", "stop")), - ("LOG_OUTPUT", True, bool), - ("WATCH_DIRS", [], list)): - value = data.setdefault(var, default) - if isinstance(allowed, type): - if not isinstance(value, allowed): - raise ValueError( - _("Unexpected value for {var}, {allowed} is expected.") - .format(var = var, allowed = allowed)) - else: - if not value in allowed: - raise ValueError(_("Unexpected value for {var}: {value}").format( - var = var, value = value)) - - for var, default, allowed in [["ON_ERROR", "stop", ("continue", "stop")]]: - value = data.setdefault(var, default) - if not value in allowed: - raise ValueError(_("Unexpected value for {var}: {value}").format( - var = var, value = value)) - - def parseTasks(self): - if not os.path.isdir(self.tasks_dir): - log.debug(_("{name} has no task to launch.").format( - name = self.resource.site_name or "default site")) - return - filenames = os.listdir(self.tasks_dir) - filenames.sort() - for filename in filenames: - filepath = os.path.join(self.tasks_dir, filename) - if not filename.startswith('task_') or not os.path.isfile(filepath): - continue - task_name, ext = os.path.splitext(filename) - task_name = task_name[5:].lower().strip() - if not task_name: - continue - if ext[1:] not in self.FILE_EXTS: - continue - if task_name in self.tasks: - raise exceptions.ConflictError( - "A task with the name [{name}] already exists".format( - name=task_name)) - task_data = {"__name__": "{site_name}.task.{name}".format( - site_name=self.site_name, name=task_name)} - self.tasks[task_name] = { - 'path': filepath, - 'data': task_data, - } - exec(compile(open(filepath, "rb").read(), filepath, 'exec'), task_data) - # we launch prepare, which is a method used to prepare - # data at runtime (e.g. set WATCH_DIRS using config) - try: - prepare = task_data['prepare'] - except KeyError: - pass - else: - prepare(self) - self.validateData(task_data) - if self.host.options['dev_mode']: - dirs = task_data.get('WATCH_DIRS', []) - for dir_ in dirs: - self.host.files_watcher.watchDir( - dir_, auto_add=True, recursive=True, - callback=self._autorunTask, task_name=task_name) - - def _autorunTask(self, host, filepath, flags, task_name): - """Called when an event is received from a watched directory""" - if flags == ['create']: - return - return self.runTask(task_name) - - @defer.inlineCallbacks - def runTask(self, task_name): - """Run a single task - - @param task_name(unicode): name of the task to run - """ - task_value = self.tasks[task_name] - self._current_task = task_name - log.info(_('== running task "{task_name}" for {site_name} =='.format( - task_name=task_name, site_name=self.site_name))) - data = task_value['data'] - os.chdir(self.site_path) - try: - yield data['start'](self) - except Exception as e: - on_error = data['ON_ERROR'] - if on_error == 'stop': - raise e - elif on_error == 'continue': - log.warning(_('Task "{task_name}" failed for {site_name}: {reason}') - .format(task_name=task_name, site_name=self.site_name, reason=e)) - else: - raise exceptions.InternalError("we should never reach this point") - self._current_task = None - - @defer.inlineCallbacks - def runTasks(self): - """Run all the tasks found""" - old_path = os.getcwd() - for task_name, task_value in self.tasks.items(): - yield self.runTask(task_name) - os.chdir(old_path) - - def findCommand(self, name, *args): - """Find full path of a shell command - - @param name(unicode): name of the command to find - @param *args(unicode): extra names the command may have - @return (unicode): full path of the command - @raise exceptions.NotFound: can't find this command - """ - names = (name,) + args - for n in names: - try: - cmd_path = which(n)[0] - except IndexError: - pass - else: - return cmd_path - raise exceptions.NotFound(_( - "Can't find {name} command, did you install it?").format(name=name)) - - def runCommand(self, command, *args, **kwargs): - kwargs['verbose'] = self.task_data["LOG_OUTPUT"] - return async_process.CommandProtocol.run(command, *args, **kwargs) diff -r 2ed4e399e1d4 -r 079e8eb6e327 libervia/server/tasks/manager.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/server/tasks/manager.py Sat Apr 25 16:08:55 2020 +0200 @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 + +# Libervia: a Salut à Toi frontend +# Copyright (C) 2011-2020 Jérôme Poisson + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +import os +import os.path +import importlib.util +from sat.core.log import getLogger +from sat.core import exceptions +from sat.core.i18n import _ +from sat.tools import utils +from libervia.server.constants import Const as C + +log = getLogger(__name__) + + +class TasksManager: + """Handle tasks of a Libervia site""" + FILE_EXTS = {'py'} + + def __init__(self, host, site_resource): + """ + @param site_resource(LiberviaRootResource): root resource of the site to manage + """ + self.host = host + self.resource = site_resource + self.tasks_dir = os.path.join(self.resource.site_path, C.TASKS_DIR) + self.tasks = {} + self._build_path = None + self._current_task = None + + @property + def site_path(self): + return self.resource.site_path + + @property + def build_path(self): + """path where generated files will be build for this site""" + if self._build_path is None: + self._build_path = self.host.getBuildPath(self.site_name) + return self._build_path + + @property + def site_name(self): + return self.resource.site_name + + def validateData(self, task): + """Check workflow attributes in task""" + + for var, allowed in (("ON_ERROR", ("continue", "stop")), + ("LOG_OUTPUT", bool), + ("WATCH_DIRS", list)): + value = getattr(task, var) + + if isinstance(allowed, type): + if allowed is list and value is None: + continue + if not isinstance(value, allowed): + raise ValueError( + _("Unexpected value for {var}, {allowed} is expected.") + .format(var=var, allowed=allowed)) + else: + if not value in allowed: + raise ValueError(_("Unexpected value for {var}: {value!r}").format( + var=var, value=value)) + + async def parseTasks(self): + if not os.path.isdir(self.tasks_dir): + log.debug(_("{name} has no task to launch.").format( + name = self.resource.site_name or "default site")) + return + filenames = os.listdir(self.tasks_dir) + filenames.sort() + for filename in filenames: + filepath = os.path.join(self.tasks_dir, filename) + if not filename.startswith('task_') or not os.path.isfile(filepath): + continue + task_name, ext = os.path.splitext(filename) + task_name = task_name[5:].lower().strip() + if not task_name: + continue + if ext[1:] not in self.FILE_EXTS: + continue + if task_name in self.tasks: + raise exceptions.ConflictError( + "A task with the name [{name}] already exists".format( + name=task_name)) + module_name = f"{self.site_name}.task.{task_name}" + + spec = importlib.util.spec_from_file_location(module_name, filepath) + task_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(task_module) + task = task_module.Task(self) + self.tasks[task_name] = task + + + # we launch prepare, which is a method used to prepare + # data at runtime (e.g. set WATCH_DIRS using config) + try: + prepare = task.prepare + except AttributeError: + pass + else: + await utils.asDeferred(prepare) + self.validateData(task) + if self.host.options['dev_mode']: + dirs = task.WATCH_DIRS or [] + for dir_ in dirs: + self.host.files_watcher.watchDir( + dir_, auto_add=True, recursive=True, + callback=self._autorunTask, task_name=task_name) + + def _autorunTask(self, host, filepath, flags, task_name): + """Called when an event is received from a watched directory""" + if flags == ['create']: + return + return defer.ensureDeferred(self.runTask(task_name)) + + async def runTask(self, task_name): + """Run a single task + + @param task_name(unicode): name of the task to run + """ + task = self.tasks[task_name] + self._current_task = task_name + log.info(_('== running task "{task_name}" for {site_name} =='.format( + task_name=task_name, site_name=self.site_name))) + os.chdir(self.site_path) + try: + await utils.asDeferred(task.start) + except Exception as e: + on_error = task.ON_ERROR + if on_error == 'stop': + raise e + elif on_error == 'continue': + log.warning(_('Task "{task_name}" failed for {site_name}: {reason}') + .format(task_name=task_name, site_name=self.site_name, reason=e)) + else: + raise exceptions.InternalError("we should never reach this point") + self._current_task = None + + async def runTasks(self): + """Run all the tasks found""" + old_path = os.getcwd() + for task_name, task_value in self.tasks.items(): + await self.runTask(task_name) + os.chdir(old_path) diff -r 2ed4e399e1d4 -r 079e8eb6e327 libervia/server/tasks/task.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/server/tasks/task.py Sat Apr 25 16:08:55 2020 +0200 @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 + +# Libervia: a Salut à Toi frontend +# Copyright (C) 2011-2020 Jérôme Poisson + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from twisted.python.procutils import which +from sat.core.log import getLogger +from sat.tools.common import async_process +from sat.core import exceptions +from sat.core.i18n import _ +from typing import Optional + +log = getLogger(__name__) + + +class Task: + """Handle tasks of a Libervia site""" + # can be "stop" or "continue" + ON_ERROR: str = "stop" + LOG_OUTPUT: bool = True + # list of directories to check for restarting this task + WATCH_DIRS: Optional[list] = None + + def __init__(self, manager): + self.manager = manager + + @property + def host(self): + return self.manager.host + + @property + def resource(self): + return self.manager.resource + + @property + def site_path(self): + return self.manager.site_path + + @property + def build_path(self): + """path where generated files will be build for this site""" + return self.manager.build_path + + def getConfig(self, key, default=None, value_type=None): + return self.host.getConfig(self.resource, key=key, default=default, + value_type=value_type) + + @property + def site_name(self): + return self.resource.site_name + + def findCommand(self, name, *args): + """Find full path of a shell command + + @param name(unicode): name of the command to find + @param *args(unicode): extra names the command may have + @return (unicode): full path of the command + @raise exceptions.NotFound: can't find this command + """ + names = (name,) + args + for n in names: + try: + cmd_path = which(n)[0] + except IndexError: + pass + else: + return cmd_path + raise exceptions.NotFound(_( + "Can't find {name} command, did you install it?").format(name=name)) + + def runCommand(self, command, *args, **kwargs): + kwargs['verbose'] = self.LOG_OUTPUT + return async_process.CommandProtocol.run(command, *args, **kwargs)