Mercurial > libervia-web
diff libervia/web/server/tasks/manager.py @ 1518:eb00d593801d
refactoring: rename `libervia` to `libervia.web` + update imports following backend changes
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 16:49:28 +0200 |
parents | libervia/server/tasks/manager.py@106bae41f5c8 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/web/server/tasks/manager.py Fri Jun 02 16:49:28 2023 +0200 @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 + +# Libervia: a Salut à Toi frontend +# Copyright (C) 2011-2021 Jérôme Poisson <goffi@goffi.org> + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import os.path +from pathlib import Path +from typing import Dict +import importlib.util +from twisted.internet import defer +from libervia.backend.core.log import getLogger +from libervia.backend.core import exceptions +from libervia.backend.core.i18n import _ +from libervia.backend.tools import utils +from libervia.web.server.constants import Const as C +from . import implicit +from .task import Task + +log = getLogger(__name__) + +DEFAULT_SITE_LABEL = _("default site") + + +class TasksManager: + """Handle tasks of a Libervia site""" + + def __init__(self, host, site_resource): + """ + @param site_resource(LiberviaRootResource): root resource of the site to manage + """ + self.host = host + self.resource = site_resource + self.tasks_dir = self.site_path / C.TASKS_DIR + self.tasks = {} + self._build_path = None + self._current_task = None + + @property + def site_path(self): + return Path(self.resource.site_path) + + @property + def build_path(self): + """path where generated files will be build for this site""" + if self._build_path is None: + self._build_path = self.host.get_build_path(self.site_name) + return self._build_path + + @property + def site_name(self): + return self.resource.site_name + + def validate_data(self, task): + """Check workflow attributes in task""" + + for var, allowed in (("ON_ERROR", ("continue", "stop")), + ("LOG_OUTPUT", bool), + ("WATCH_DIRS", list)): + value = getattr(task, var) + + if isinstance(allowed, type): + if allowed is list and value is None: + continue + if not isinstance(value, allowed): + raise ValueError( + _("Unexpected value for {var}, {allowed} is expected.") + .format(var=var, allowed=allowed)) + else: + if not value in allowed: + raise ValueError(_("Unexpected value for {var}: {value!r}").format( + var=var, value=value)) + + async def import_task( + self, + task_name: str, + task_path: Path, + to_import: Dict[str, Path] + ) -> None: + if task_name in self.tasks: + log.debug(f"skipping task {task_name} which is already imported") + return + module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}" + + spec = importlib.util.spec_from_file_location(module_name, task_path) + task_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(task_module) + task = task_module.Task(self, task_name) + if task.AFTER is not None: + for pre_task_name in task.AFTER: + log.debug( + f"task {task_name!r} must be run after {pre_task_name!r}") + try: + pre_task_path = to_import[pre_task_name] + except KeyError: + raise ValueError( + f"task {task_name!r} must be run after {pre_task_name!r}, " + f"however there is no task with such name") + await self.import_task(pre_task_name, pre_task_path, to_import) + + # we launch prepare, which is a method used to prepare + # data at runtime (e.g. set WATCH_DIRS using config) + try: + prepare = task.prepare + except AttributeError: + pass + else: + log.info(_('== preparing task "{task_name}" for {site_name} =='.format( + task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL))) + try: + await utils.as_deferred(prepare) + except exceptions.CancelError as e: + log.debug(f"Skipping {task_name} which cancelled itself: {e}") + return + + self.tasks[task_name] = task + self.validate_data(task) + if self.host.options['dev-mode']: + dirs = task.WATCH_DIRS or [] + for dir_ in dirs: + self.host.files_watcher.watch_dir( + dir_, auto_add=True, recursive=True, + callback=self._autorun_task, task_name=task_name) + + async def parse_tasks_dir(self, dir_path: Path) -> None: + log.debug(f"parsing tasks in {dir_path}") + tasks_paths = sorted(dir_path.glob('task_*.py')) + to_import = {} + for task_path in tasks_paths: + if not task_path.is_file(): + continue + task_name = task_path.stem[5:].lower().strip() + if not task_name: + continue + if task_name in self.tasks: + raise exceptions.ConflictError( + "A task with the name [{name}] already exists".format( + name=task_name)) + log.debug(f"task {task_name} found") + to_import[task_name] = task_path + + for task_name, task_path in to_import.items(): + await self.import_task(task_name, task_path, to_import) + + async def parse_tasks(self): + # implicit tasks are always run + implicit_path = Path(implicit.__file__).parent + await self.parse_tasks_dir(implicit_path) + # now we check if there are tasks specific to this site + if not self.tasks_dir.is_dir(): + log.debug(_("{name} has no task to launch.").format( + name = self.resource.site_name or DEFAULT_SITE_LABEL)) + return + else: + await self.parse_tasks_dir(self.tasks_dir) + + def _autorun_task(self, host, filepath, flags, task_name): + """Called when an event is received from a watched directory""" + if flags == ['create']: + return + try: + task = self.tasks[task_name] + on_dir_event_cb = task.on_dir_event + except AttributeError: + return defer.ensureDeferred(self.run_task(task_name)) + else: + return utils.as_deferred( + on_dir_event_cb, host, Path(filepath.path.decode()), flags) + + async def run_task_instance(self, task: Task) -> None: + self._current_task = task.name + log.info(_('== running task "{task_name}" for {site_name} =='.format( + task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL))) + os.chdir(self.site_path) + try: + await utils.as_deferred(task.start) + except Exception as e: + on_error = task.ON_ERROR + if on_error == 'stop': + raise e + elif on_error == 'continue': + log.warning(_('Task "{task_name}" failed for {site_name}: {reason}') + .format(task_name=task.name, site_name=self.site_name, reason=e)) + else: + raise exceptions.InternalError("we should never reach this point") + self._current_task = None + + async def run_task(self, task_name: str) -> None: + """Run a single task + + @param task_name(unicode): name of the task to run + """ + task = self.tasks[task_name] + await self.run_task_instance(task) + + async def run_tasks(self): + """Run all the tasks found""" + old_path = os.getcwd() + for task_name, task_value in self.tasks.items(): + await self.run_task(task_name) + os.chdir(old_path)