Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_misc_app_manager.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_misc_app_manager.py@524856bd7b19 |
children | c93b02000ae4 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_misc_app_manager.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,636 @@ +#!/usr/bin/env python3 + +# Libervia plugin to manage external applications +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from typing import Optional, List, Callable +from functools import partial, reduce +import tempfile +import secrets +import string +import shortuuid +from twisted.internet import defer +from twisted.python.procutils import which +from libervia.backend.core.i18n import _ +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.log import getLogger +from libervia.backend.tools.common import data_format +from libervia.backend.tools.common import async_process + +log = getLogger(__name__) + +try: + import yaml +except ImportError: + raise exceptions.MissingModule( + 'Missing module PyYAML, please download/install it. You can use ' + '"pip install pyyaml"' + ) + +try: + from yaml import CLoader as Loader, CDumper as Dumper +except ImportError: + log.warning( + "Can't use LibYAML binding (is libyaml installed?), pure Python version will be " + "used, but it is slower" + ) + from yaml import Loader, Dumper + +from yaml.constructor import ConstructorError + + +PLUGIN_INFO = { + C.PI_NAME: "Applications Manager", + C.PI_IMPORT_NAME: "APP_MANAGER", + C.PI_TYPE: C.PLUG_TYPE_MISC, + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_MAIN: "AppManager", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _( + """Applications Manager + +Manage external applications using packagers, OS virtualization/containers or other +software management tools. +"""), +} + +APP_FILE_PREFIX = "sat_app_" + + +class AppManager: + load = partial(yaml.load, Loader=Loader) + dump = partial(yaml.dump, Dumper=Dumper) + + def __init__(self, host): + log.info(_("plugin Applications Manager initialization")) + self.host = host + self._managers = {} + self._apps = {} + self._started = {} + # instance id to app data map + self._instances = {} + host.bridge.add_method( + "applications_list", + ".plugin", + in_sign="as", + out_sign="as", + method=self.list_applications, + ) + host.bridge.add_method( + "application_start", + ".plugin", + in_sign="ss", + out_sign="s", + method=self._start, + async_=True, + ) + host.bridge.add_method( + "application_stop", + ".plugin", + in_sign="sss", + out_sign="", + method=self._stop, + async_=True, + ) + host.bridge.add_method( + "application_exposed_get", + ".plugin", + in_sign="sss", + out_sign="s", + method=self._get_exposed, + async_=True, + ) + # application has been started succeesfully, + # args: name, instance_id, extra + host.bridge.add_signal( + "application_started", ".plugin", signature="sss" + ) + # application went wrong with the application + # args: name, instance_id, extra + host.bridge.add_signal( + "application_error", ".plugin", signature="sss" + ) + yaml.add_constructor( + "!sat_conf", self._sat_conf_constr, Loader=Loader) + yaml.add_constructor( + "!sat_generate_pwd", self._sat_generate_pwd_constr, Loader=Loader) + yaml.add_constructor( + "!sat_param", self._sat_param_constr, Loader=Loader) + + def unload(self): + log.debug("unloading applications manager") + for instances in self._started.values(): + for instance in instances: + data = instance['data'] + if not data['single_instance']: + log.debug( + f"cleaning temporary directory at {data['_instance_dir_path']}") + data['_instance_dir_obj'].cleanup() + + def _sat_conf_constr(self, loader, node): + """Get a value from Libervia configuration + + A list is expected with either "name" of a config parameter, a one or more of + those parameters: + - section + - name + - default value + - filter + filter can be: + - "first": get the first item of the value + """ + config_data = loader.construct_sequence(node) + if len(config_data) == 1: + section, name, default, filter_ = "", config_data[0], None, None + if len(config_data) == 2: + (section, name), default, filter_ = config_data, None, None + elif len(config_data) == 3: + (section, name, default), filter_ = config_data, None + elif len(config_data) == 4: + section, name, default, filter_ = config_data + else: + raise ValueError( + f"invalid !sat_conf value ({config_data!r}), a list of 1 to 4 items is " + "expected" + ) + + value = self.host.memory.config_get(section, name, default) + # FIXME: "public_url" is used only here and doesn't take multi-sites into account + if name == "public_url" and (not value or value.startswith('http')): + if not value: + log.warning(_( + 'No value found for "public_url", using "example.org" for ' + 'now, please set the proper value in libervia.conf')) + else: + log.warning(_( + 'invalid value for "public_url" ({value}), it musts not start with ' + 'schema ("http"), ignoring it and using "example.org" ' + 'instead') + .format(value=value)) + value = "example.org" + + if filter_ is None: + pass + elif filter_ == 'first': + value = value[0] + else: + raise ValueError(f"unmanaged filter: {filter_}") + + return value + + def _sat_generate_pwd_constr(self, loader, node): + alphabet = string.ascii_letters + string.digits + return ''.join(secrets.choice(alphabet) for i in range(30)) + + def _sat_param_constr(self, loader, node): + """Get a parameter specified when starting the application + + The value can be either the name of the parameter to get, or a list as + [name, default_value] + """ + try: + name, default = loader.construct_sequence(node) + except ConstructorError: + name, default = loader.construct_scalar(node), None + return self._params.get(name, default) + + def register(self, manager): + name = manager.name + if name in self._managers: + raise exceptions.ConflictError( + f"There is already a manager with the name {name}") + self._managers[manager.name] = manager + if hasattr(manager, "discover_path"): + self.discover(manager.discover_path, manager) + + def get_manager(self, app_data: dict) -> object: + """Get manager instance needed for this app + + @raise exceptions.DataError: something is wrong with the type + @raise exceptions.NotFound: manager is not registered + """ + try: + app_type = app_data["type"] + except KeyError: + raise exceptions.DataError( + "app file doesn't have the mandatory \"type\" key" + ) + if not isinstance(app_type, str): + raise exceptions.DataError( + f"invalid app data type: {app_type!r}" + ) + app_type = app_type.strip() + try: + return self._managers[app_type] + except KeyError: + raise exceptions.NotFound( + f"No manager found to manage app of type {app_type!r}") + + def get_app_data( + self, + id_type: Optional[str], + identifier: str + ) -> dict: + """Retrieve instance's app_data from identifier + + @param id_type: type of the identifier, can be: + - "name": identifier is a canonical application name + the first found instance of this application is returned + - "instance": identifier is an instance id + @param identifier: identifier according to id_type + @return: instance application data + @raise exceptions.NotFound: no instance with this id can be found + @raise ValueError: id_type is invalid + """ + if not id_type: + id_type = 'name' + if id_type == 'name': + identifier = identifier.lower().strip() + try: + return next(iter(self._started[identifier])) + except (KeyError, StopIteration): + raise exceptions.NotFound( + f"No instance of {identifier!r} is currently running" + ) + elif id_type == 'instance': + instance_id = identifier + try: + return self._instances[instance_id] + except KeyError: + raise exceptions.NotFound( + f"There is no application instance running with id {instance_id!r}" + ) + else: + raise ValueError(f"invalid id_type: {id_type!r}") + + def discover( + self, + dir_path: Path, + manager: Optional = None + ) -> None: + for file_path in dir_path.glob(f"{APP_FILE_PREFIX}*.yaml"): + if manager is None: + try: + app_data = self.parse(file_path) + manager = self.get_manager(app_data) + except (exceptions.DataError, exceptions.NotFound) as e: + log.warning( + f"Can't parse {file_path}, skipping: {e}") + app_name = file_path.stem[len(APP_FILE_PREFIX):].strip().lower() + if not app_name: + log.warning( + f"invalid app file name at {file_path}") + continue + app_dict = self._apps.setdefault(app_name, {}) + manager_set = app_dict.setdefault(manager, set()) + manager_set.add(file_path) + log.debug( + f"{app_name!r} {manager.name} application found" + ) + + def parse(self, file_path: Path, params: Optional[dict] = None) -> dict: + """Parse Libervia application file + + @param params: parameters for running this instance + @raise exceptions.DataError: something is wrong in the file + """ + if params is None: + params = {} + with file_path.open() as f: + # we set parameters to be used only with this instance + # no async method must used between this assignation and `load` + self._params = params + app_data = self.load(f) + self._params = None + if "name" not in app_data: + # note that we don't use lower() here as we want human readable name and + # uppercase may be set on purpose + app_data['name'] = file_path.stem[len(APP_FILE_PREFIX):].strip() + single_instance = app_data.setdefault("single_instance", True) + if not isinstance(single_instance, bool): + raise ValueError( + f'"single_instance" must be a boolean, but it is {type(single_instance)}' + ) + return app_data + + def list_applications(self, filters: Optional[List[str]]) -> List[str]: + """List available application + + @param filters: only show applications matching those filters. + using None will list all known applications + a filter can be: + - available: applications available locally + - running: only show launched applications + """ + if not filters: + return list(self.apps) + found = set() + for filter_ in filters: + if filter_ == "available": + found.update(self._apps) + elif filter_ == "running": + found.update(self._started) + else: + raise ValueError(f"Unknown filter: {filter_}") + return list(found) + + def _start(self, app_name, extra): + extra = data_format.deserialise(extra) + d = defer.ensureDeferred(self.start(str(app_name), extra)) + d.addCallback(data_format.serialise) + return d + + async def start( + self, + app_name: str, + extra: Optional[dict] = None, + ) -> dict: + """Start an application + + @param app_name: name of the application to start + @param extra: extra parameters + @return: data with following keys: + - name (str): canonical application name + - instance (str): instance ID + - started (bool): True if the application is already started + if False, the "application_started" signal should be used to get notificed + when the application is actually started + - expose (dict): exposed data as given by [self.get_exposed] + exposed data which need to be computed are NOT returned, they will + available when the app will be fully started, throught the + [self.get_exposed] method. + """ + # FIXME: for now we use the first app manager available for the requested app_name + # TODO: implement running multiple instance of the same app if some metadata + # to be defined in app_data allows explicitly it. + app_name = app_name.lower().strip() + try: + app_file_path = next(iter(next(iter(self._apps[app_name].values())))) + except KeyError: + raise exceptions.NotFound( + f"No application found with the name {app_name!r}" + ) + log.info(f"starting {app_name!r}") + started_data = self._started.setdefault(app_name, []) + app_data = self.parse(app_file_path, extra) + app_data["_started"] = False + app_data['_file_path'] = app_file_path + app_data['_name_canonical'] = app_name + single_instance = app_data['single_instance'] + ret_data = { + "name": app_name, + "started": False + } + if single_instance: + if started_data: + instance_data = started_data[0] + instance_id = instance_data["_instance_id"] + ret_data["instance"] = instance_id + ret_data["started"] = instance_data["_started"] + ret_data["expose"] = await self.get_exposed( + instance_id, "instance", {"skip_compute": True} + ) + log.info(f"{app_name!r} is already started or being started") + return ret_data + else: + cache_path = self.host.memory.get_cache_path( + PLUGIN_INFO[C.PI_IMPORT_NAME], app_name + ) + cache_path.mkdir(0o700, parents=True, exist_ok=True) + app_data['_instance_dir_path'] = cache_path + else: + dest_dir_obj = tempfile.TemporaryDirectory(prefix="sat_app_") + app_data['_instance_dir_obj'] = dest_dir_obj + app_data['_instance_dir_path'] = Path(dest_dir_obj.name) + instance_id = ret_data["instance"] = app_data['_instance_id'] = shortuuid.uuid() + manager = self.get_manager(app_data) + app_data['_manager'] = manager + started_data.append(app_data) + self._instances[instance_id] = app_data + # we retrieve exposed data such as url_prefix which can be useful computed exposed + # data must wait for the app to be started, so we skip them for now + ret_data["expose"] = await self.get_exposed( + instance_id, "instance", {"skip_compute": True} + ) + + try: + start = manager.start + except AttributeError: + raise exceptions.InternalError( + f"{manager.name} doesn't have the mandatory \"start\" method" + ) + else: + defer.ensureDeferred(self.start_app(start, app_data)) + return ret_data + + async def start_app(self, start_cb: Callable, app_data: dict) -> None: + app_name = app_data["_name_canonical"] + instance_id = app_data["_instance_id"] + try: + await start_cb(app_data) + except Exception as e: + log.exception(f"Can't start libervia app {app_name!r}") + self.host.bridge.application_error( + app_name, + instance_id, + data_format.serialise( + { + "class": str(type(e)), + "msg": str(e) + } + )) + else: + app_data["_started"] = True + self.host.bridge.application_started(app_name, instance_id, "") + log.info(f"{app_name!r} started") + + def _stop(self, identifier, id_type, extra): + extra = data_format.deserialise(extra) + return defer.ensureDeferred( + self.stop(str(identifier), str(id_type) or None, extra)) + + async def stop( + self, + identifier: str, + id_type: Optional[str] = None, + extra: Optional[dict] = None, + ) -> None: + if extra is None: + extra = {} + + app_data = self.get_app_data(id_type, identifier) + + log.info(f"stopping {app_data['name']!r}") + + app_name = app_data['_name_canonical'] + instance_id = app_data['_instance_id'] + manager = app_data['_manager'] + + try: + stop = manager.stop + except AttributeError: + raise exceptions.InternalError( + f"{manager.name} doesn't have the mandatory \"stop\" method" + ) + else: + try: + await stop(app_data) + except Exception as e: + log.warning( + f"Instance {instance_id} of application {app_name} can't be stopped " + f"properly: {e}" + ) + return + + try: + del self._instances[instance_id] + except KeyError: + log.error( + f"INTERNAL ERROR: {instance_id!r} is not present in self._instances") + + try: + self._started[app_name].remove(app_data) + except ValueError: + log.error( + "INTERNAL ERROR: there is no app data in self._started with id " + f"{instance_id!r}" + ) + + log.info(f"{app_name!r} stopped") + + def _get_exposed(self, identifier, id_type, extra): + extra = data_format.deserialise(extra) + d = defer.ensureDeferred(self.get_exposed(identifier, id_type, extra)) + d.addCallback(lambda d: data_format.serialise(d)) + return d + + async def get_exposed( + self, + identifier: str, + id_type: Optional[str] = None, + extra: Optional[dict] = None, + ) -> dict: + """Get data exposed by the application + + The manager's "compute_expose" method will be called if it exists. It can be used + to handle manager specific conventions. + """ + app_data = self.get_app_data(id_type, identifier) + if app_data.get('_exposed_computed', False): + return app_data['expose'] + if extra is None: + extra = {} + expose = app_data.setdefault("expose", {}) + if "passwords" in expose: + passwords = expose['passwords'] + for name, value in list(passwords.items()): + if isinstance(value, list): + # if we have a list, is the sequence of keys leading to the value + # to expose. We use "reduce" to retrieve the desired value + try: + passwords[name] = reduce(lambda l, k: l[k], value, app_data) + except Exception as e: + log.warning( + f"Can't retrieve exposed value for password {name!r}: {e}") + del passwords[name] + + url_prefix = expose.get("url_prefix") + if isinstance(url_prefix, list): + try: + expose["url_prefix"] = reduce(lambda l, k: l[k], url_prefix, app_data) + except Exception as e: + log.warning( + f"Can't retrieve exposed value for url_prefix: {e}") + del expose["url_prefix"] + + if extra.get("skip_compute", False): + return expose + + try: + compute_expose = app_data['_manager'].compute_expose + except AttributeError: + pass + else: + await compute_expose(app_data) + + app_data['_exposed_computed'] = True + return expose + + async def _do_prepare( + self, + app_data: dict, + ) -> None: + name = app_data['name'] + dest_path = app_data['_instance_dir_path'] + if next(dest_path.iterdir(), None) != None: + log.debug(f"There is already a prepared dir at {dest_path}, nothing to do") + return + try: + prepare = app_data['prepare'].copy() + except KeyError: + prepare = {} + + if not prepare: + log.debug("Nothing to prepare for {name!r}") + return + + for action, value in list(prepare.items()): + log.debug(f"[{name}] [prepare] running {action!r} action") + if action == "git": + try: + git_path = which('git')[0] + except IndexError: + raise exceptions.NotFound( + "Can't find \"git\" executable, {name} can't be started without it" + ) + await async_process.run(git_path, "clone", value, str(dest_path)) + log.debug(f"{value!r} git repository cloned at {dest_path}") + else: + raise NotImplementedError( + f"{action!r} is not managed, can't start {name}" + ) + del prepare[action] + + if prepare: + raise exceptions.InternalError('"prepare" should be empty') + + async def _do_create_files( + self, + app_data: dict, + ) -> None: + dest_path = app_data['_instance_dir_path'] + files = app_data.get('files') + if not files: + return + if not isinstance(files, dict): + raise ValueError('"files" must be a dictionary') + for filename, data in files.items(): + path = dest_path / filename + if path.is_file(): + log.info(f"{path} already exists, skipping") + with path.open("w") as f: + f.write(data.get("content", "")) + log.debug(f"{path} created") + + async def start_common(self, app_data: dict) -> None: + """Method running common action when starting a manager + + It should be called by managers in "start" method. + """ + await self._do_prepare(app_data) + await self._do_create_files(app_data)