Mercurial > libervia-web
view libervia/server/tasks/manager.py @ 1346:cda5537c71d6
browser (photos/album): videos integrations:
videos can now be added to an album, the alternative media player is then used to display
them. Slides options are used to remove pagination and slidebar from slideshow (they don't
play well with media player), and video are reset when its slide is exited.
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 25 Aug 2020 08:31:56 +0200 |
parents | a46d0e0f383b |
children | df40708c4c76 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: a Salut à Toi frontend # Copyright (C) 2011-2020 Jérôme Poisson <goffi@goffi.org> # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import os.path from pathlib import Path from typing import Dict import importlib.util from twisted.internet import defer from sat.core.log import getLogger from sat.core import exceptions from sat.core.i18n import _ from sat.tools import utils from libervia.server.constants import Const as C from . import implicit from .task import Task log = getLogger(__name__) DEFAULT_SITE_LABEL = _("default site") class TasksManager: """Handle tasks of a Libervia site""" def __init__(self, host, site_resource): """ @param site_resource(LiberviaRootResource): root resource of the site to manage """ self.host = host self.resource = site_resource self.tasks_dir = self.site_path / C.TASKS_DIR self.tasks = {} self._build_path = None self._current_task = None @property def site_path(self): return Path(self.resource.site_path) @property def build_path(self): """path where generated files will be build for this site""" if self._build_path is None: self._build_path = self.host.getBuildPath(self.site_name) return self._build_path @property def site_name(self): return self.resource.site_name def validateData(self, task): """Check workflow attributes in task""" for var, allowed in (("ON_ERROR", ("continue", "stop")), ("LOG_OUTPUT", bool), ("WATCH_DIRS", list)): value = getattr(task, var) if isinstance(allowed, type): if allowed is list and value is None: continue if not isinstance(value, allowed): raise ValueError( _("Unexpected value for {var}, {allowed} is expected.") .format(var=var, allowed=allowed)) else: if not value in allowed: raise ValueError(_("Unexpected value for {var}: {value!r}").format( var=var, value=value)) async def importTask( self, task_name: str, task_path: Path, to_import: Dict[str, Path] ) -> None: if task_name in self.tasks: log.debug(f"skipping task {task_name} which is already imported") return module_name = f"{self.site_name or C.SITE_NAME_DEFAULT}.task.{task_name}" spec = importlib.util.spec_from_file_location(module_name, task_path) task_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(task_module) task = task_module.Task(self, task_name) if task.AFTER is not None: for pre_task_name in task.AFTER: log.debug( f"task {task_name!r} must be run after {pre_task_name!r}") try: pre_task_path = to_import[pre_task_name] except KeyError: raise ValueError( f"task {task_name!r} must be run after {pre_task_name!r}, " f"however there is no task with such name") await self.importTask(pre_task_name, pre_task_path, to_import) # we launch prepare, which is a method used to prepare # data at runtime (e.g. set WATCH_DIRS using config) try: prepare = task.prepare except AttributeError: pass else: log.info(_('== preparing task "{task_name}" for {site_name} =='.format( task_name=task_name, site_name=self.site_name or DEFAULT_SITE_LABEL))) try: await utils.asDeferred(prepare) except exceptions.CancelError as e: log.debug(f"Skipping {task_name} which cancelled itself: {e}") return self.tasks[task_name] = task self.validateData(task) if self.host.options['dev_mode']: dirs = task.WATCH_DIRS or [] for dir_ in dirs: self.host.files_watcher.watchDir( dir_, auto_add=True, recursive=True, callback=self._autorunTask, task_name=task_name) async def parseTasksDir(self, dir_path: Path) -> None: log.debug(f"parsing tasks in {dir_path}") tasks_paths = sorted(dir_path.glob('task_*.py')) to_import = {} for task_path in tasks_paths: if not task_path.is_file(): continue task_name = task_path.stem[5:].lower().strip() if not task_name: continue if task_name in self.tasks: raise exceptions.ConflictError( "A task with the name [{name}] already exists".format( name=task_name)) log.debug(f"task {task_name} found") to_import[task_name] = task_path for task_name, task_path in to_import.items(): await self.importTask(task_name, task_path, to_import) async def parseTasks(self): # implicit tasks are always run implicit_path = Path(implicit.__file__).parent await self.parseTasksDir(implicit_path) # now we check if there are tasks specific to this site if not self.tasks_dir.is_dir(): log.debug(_("{name} has no task to launch.").format( name = self.resource.site_name or DEFAULT_SITE_LABEL)) return else: await self.parseTasksDir(self.tasks_dir) def _autorunTask(self, host, filepath, flags, task_name): """Called when an event is received from a watched directory""" if flags == ['create']: return try: task = self.tasks[task_name] on_dir_event_cb = task.onDirEvent except AttributeError: return defer.ensureDeferred(self.runTask(task_name)) else: return utils.asDeferred( on_dir_event_cb, host, Path(filepath.path.decode()), flags) async def runTaskInstance(self, task: Task) -> None: self._current_task = task.name log.info(_('== running task "{task_name}" for {site_name} =='.format( task_name=task.name, site_name=self.site_name or DEFAULT_SITE_LABEL))) os.chdir(self.site_path) try: await utils.asDeferred(task.start) except Exception as e: on_error = task.ON_ERROR if on_error == 'stop': raise e elif on_error == 'continue': log.warning(_('Task "{task_name}" failed for {site_name}: {reason}') .format(task_name=task.name, site_name=self.site_name, reason=e)) else: raise exceptions.InternalError("we should never reach this point") self._current_task = None async def runTask(self, task_name: str) -> None: """Run a single task @param task_name(unicode): name of the task to run """ task = self.tasks[task_name] await self.runTaskInstance(task) async def runTasks(self): """Run all the tasks found""" old_path = os.getcwd() for task_name, task_value in self.tasks.items(): await self.runTask(task_name) os.chdir(old_path)