Mercurial > libervia-backend
view libervia/backend/plugins/plugin_misc_app_manager/__init__.py @ 4309:b56b1eae7994
component email gateway: add multicasting:
XEP-0033 multicasting is now supported both for incoming and outgoing messages. XEP-0033
metadata are converted to suitable Email headers and vice versa.
Email address and JID are both supported, and delivery is done by the gateway when
suitable on incoming messages.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin to manage external applications
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from functools import partial, reduce
from pathlib import Path
import secrets
import string
import tempfile
import time
from typing import Any, Callable, List, Optional
from urllib.parse import urlparse, urlunparse

import shortuuid
from twisted.internet import defer
from twisted.python.procutils import which

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.plugins.plugin_misc_app_manager.models import AppManagerBackend
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import async_process

log = getLogger(__name__)

try:
    import yaml

    # imported inside the guarded block so that a missing PyYAML is reported with the
    # explicit MissingModule error below instead of a bare ImportError
    from yaml.constructor import ConstructorError
except ImportError:
    raise exceptions.MissingModule(
        "Missing module PyYAML, please download/install it. You can use "
        '"pip install pyyaml"'
    )

try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    log.warning(
        "Can't use LibYAML binding (is libyaml installed?), pure Python version will be "
        "used, but it is slower"
    )
    from yaml import Loader, Dumper


PLUGIN_INFO = {
    C.PI_NAME: "Applications Manager",
    C.PI_IMPORT_NAME: "APP_MANAGER",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_MAIN: "AppManager",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _(
        """Applications Manager

Manage external applications using packagers, OS virtualization/containers or other
software management tools.
"""
    ),
}

APP_FILE_PREFIX = "libervia_app_"


class AppManager:
    """Discover, parse, start and stop external applications.

    Applications are described by YAML files (see [discover] and [parse]) and are
    actually run by "managers" registered with [register].
    """

    # YAML helpers bound to the loader/dumper selected at import time
    load = partial(yaml.load, Loader=Loader)
    dump = partial(yaml.dump, Dumper=Dumper)

    def __init__(self, host):
        log.info(_("plugin Applications Manager initialization"))
        self.host = host
        # manager name to manager instance
        self._managers = {}
        # canonical app name => {manager: set of app file paths}
        self._apps = {}
        # canonical app name => list of app data of started (or starting) instances
        self._started = {}
        # instance id to app data map
        self._instances = {}
        # parameters of the instance being parsed, only set while [parse] is loading
        self._params: Optional[dict] = None
        # persistent data of the application being started, used by the
        # "!libervia_generate_pwd" constructor
        self._app_persistent_data: dict = {}

        self.persistent_data = persistent.LazyPersistentBinaryDict("app_manager")

        host.bridge.add_method(
            "applications_list",
            ".plugin",
            in_sign="as",
            out_sign="as",
            method=self.list_applications,
        )
        host.bridge.add_method(
            "application_start",
            ".plugin",
            in_sign="ss",
            out_sign="s",
            method=self._start,
            async_=True,
        )
        host.bridge.add_method(
            "application_stop",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._stop,
            async_=True,
        )
        host.bridge.add_method(
            "application_exposed_get",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._get_exposed,
            async_=True,
        )
        # application has been started successfully,
        # args: name, instance_id, extra
        host.bridge.add_signal("application_started", ".plugin", signature="sss")
        # something went wrong with the application
        # args: name, instance_id, extra
        host.bridge.add_signal("application_error", ".plugin", signature="sss")
        yaml.add_constructor("!libervia_conf", self._libervia_conf_constr, Loader=Loader)
        yaml.add_constructor(
            "!libervia_generate_pwd", self._libervia_generate_pwd_constr, Loader=Loader
        )
        yaml.add_constructor(
            "!libervia_param", self._libervia_param_constr, Loader=Loader
        )

    def unload(self):
        """Clean temporary directories of started multi-instances applications"""
        log.debug("unloading applications manager")
        for instances in self._started.values():
            # ``self._started`` values are lists of app data, as appended in [start]
            for data in instances:
                if not data["single_instance"]:
                    log.debug(
                        f"cleaning temporary directory at {data['_instance_dir_path']}"
                    )
                    data["_instance_dir_obj"].cleanup()

    def _libervia_conf_constr(self, loader, node) -> str:
        """Get a value from Libervia configuration

        A list is expected with either only the "name" of a config parameter, or
        the first 2 to 4 of those items, in this order:
            - section
            - name
            - default value
            - filter

        filter can be:
            - "first": get the first item of the value
            - "not": get the opposite value (to be used with booleans)

        @raise ValueError: the list doesn't have 1 to 4 items, or the filter is unknown
        """
        config_data = loader.construct_sequence(node)
        if len(config_data) == 1:
            section, name, default, filter_ = "", config_data[0], None, None
        elif len(config_data) == 2:
            (section, name), default, filter_ = config_data, None, None
        elif len(config_data) == 3:
            (section, name, default), filter_ = config_data, None
        elif len(config_data) == 4:
            section, name, default, filter_ = config_data
        else:
            raise ValueError(
                f"invalid !libervia_conf value ({config_data!r}), a list of 1 to 4 items "
                "is expected"
            )

        value = self.host.memory.config_get(section, name, default)
        # FIXME: "public_url" is used only here and doesn't take multi-sites into account
        if name == "public_url" and (not value or value.startswith("http")):
            if not value:
                log.warning(
                    _(
                        'No value found for "public_url", using "example.org" for '
                        "now, please set the proper value in libervia.conf"
                    )
                )
            else:
                log.warning(
                    _(
                        'invalid value for "public_url" ({value}), it musts not start with '
                        'schema ("http"), ignoring it and using "example.org" '
                        "instead"
                    ).format(value=value)
                )
            value = "example.org"

        if filter_ is None:
            pass
        elif filter_ == "first":
            value = value[0]
        elif filter_ == "not":
            value = C.bool(value)
            value = C.bool_const(not value).lower()
        else:
            raise ValueError(f"unmanaged filter: {filter_}")

        return value

    def _libervia_generate_pwd_constr(self, loader, node) -> str:
        """Generate a password and store it in persistent data.

        If the password has already been generated previously, it is reused.
        Mapping arguments are:

        ``name`` (str) **required**
            Name of the password. Will be used to store it.
        ``size`` (int)
            Size of the password to generate. Default to 30

        @raise ValueError: the node is not a mapping, or "name" is missing
        """
        try:
            kwargs = loader.construct_mapping(node)
            name = kwargs["name"]
        except (ConstructorError, KeyError):
            raise ValueError(
                '!libervia_generate_pwd map arguments is missing. At least "name" '
                "must be specified."
            )
        pwd_data_key = f"pwd_{name}"
        try:
            key = self._app_persistent_data[pwd_data_key]
        except KeyError:
            alphabet = string.ascii_letters + string.digits
            key_size = int(kwargs.get("size", 30))
            key = "".join(secrets.choice(alphabet) for __ in range(key_size))
            self._app_persistent_data[pwd_data_key] = key
        else:
            log.debug(f"Re-using existing key for {name!r} password.")
        return key

    def _libervia_param_constr(self, loader, node) -> str:
        """Get a parameter specified when starting the application

        The value can be either the name of the parameter to get, or a list as
        [name, default_value]
        """
        try:
            name, default = loader.construct_sequence(node)
        except ConstructorError:
            # not a sequence: the node is the parameter name itself
            name, default = loader.construct_scalar(node), None
        # ``_params`` is only set by [parse] while the file is being loaded
        assert self._params is not None
        return self._params.get(name, default)

    def register(self, manager):
        """Register a manager, and discover its applications if it has a discover path

        @raise exceptions.ConflictError: a manager with this name is already registered
        """
        name = manager.name
        if name in self._managers:
            raise exceptions.ConflictError(
                f"There is already a manager with the name {name}"
            )
        self._managers[manager.name] = manager
        if manager.discover_path is not None:
            self.discover(manager.discover_path, manager)

    def get_manager(self, app_data: dict) -> AppManagerBackend:
        """Get manager instance needed for this app

        @raise exceptions.DataError: something is wrong with the type
        @raise exceptions.NotFound: manager is not registered
        """
        try:
            app_type = app_data["type"]
        except KeyError:
            raise exceptions.DataError('app file doesn\'t have the mandatory "type" key')
        if not isinstance(app_type, str):
            raise exceptions.DataError(f"invalid app data type: {app_type!r}")
        app_type = app_type.strip()
        try:
            return self._managers[app_type]
        except KeyError:
            raise exceptions.NotFound(
                f"No manager found to manage app of type {app_type!r}"
            )

    def get_app_data(self, id_type: Optional[str], identifier: str) -> dict:
        """Retrieve instance's app_data from identifier

        @param id_type: type of the identifier, can be:
            - "name": identifier is a canonical application name
                the first found instance of this application is returned
            - "instance": identifier is an instance id
            an empty value is treated as "name"
        @param identifier: identifier according to id_type
        @return: instance application data
        @raise exceptions.NotFound: no instance with this id can be found
        @raise ValueError: id_type is invalid
        """
        if not id_type:
            id_type = "name"
        if id_type == "name":
            identifier = identifier.lower().strip()
            try:
                return next(iter(self._started[identifier]))
            except (KeyError, StopIteration):
                raise exceptions.NotFound(
                    f"No instance of {identifier!r} is currently running"
                )
        elif id_type == "instance":
            instance_id = identifier
            try:
                return self._instances[instance_id]
            except KeyError:
                raise exceptions.NotFound(
                    f"There is no application instance running with id {instance_id!r}"
                )
        else:
            raise ValueError(f"invalid id_type: {id_type!r}")

    def discover(self, dir_path: Path, manager: AppManagerBackend | None = None) -> None:
        """Search for app configuration file.

        App configuration files must start with [APP_FILE_PREFIX] and have a ``.yaml``
        extension.

        @param dir_path: directory where the application files are looked for
        @param manager: manager to associate with every found file
            if None, each file is parsed and its manager is retrieved from its "type"
        """
        for file_path in dir_path.glob(f"{APP_FILE_PREFIX}*.yaml"):
            # the manager must be resolved per file: re-binding the ``manager`` argument
            # here would apply the manager of the first file to all following ones
            file_manager = manager
            if file_manager is None:
                try:
                    app_data = self.parse(file_path)
                    file_manager = self.get_manager(app_data)
                except (exceptions.DataError, exceptions.NotFound) as e:
                    log.warning(f"Can't parse {file_path}, skipping: {e}")
                    continue
            app_name = file_path.stem[len(APP_FILE_PREFIX) :].strip().lower()
            if not app_name:
                log.warning(f"invalid app file name at {file_path}")
                continue
            app_dict = self._apps.setdefault(app_name, {})
            manager_set = app_dict.setdefault(file_manager, set())
            manager_set.add(file_path)
            log.debug(f"{app_name!r} {file_manager.name} application found")

    def parse(self, file_path: Path, params: Optional[dict] = None) -> dict:
        """Parse Libervia application file

        @param file_path: path of the application file to load
        @param params: parameters for running this instance
        @return: application data, with "name" and "single_instance" always set
        @raise exceptions.DataError: the file content is not a mapping
        @raise ValueError: "single_instance" is not a boolean
        """
        if params is None:
            params = {}
        with file_path.open() as f:
            # we set parameters to be used only with this instance
            # no async method must used between this assignation and `load`
            self._params = params
            try:
                app_data = self.load(f)
            finally:
                # always reset, even if loading fails, so that stale parameters can't
                # leak into a later parsing
                self._params = None
        if not isinstance(app_data, dict):
            # e.g. empty file, or a top-level list/scalar
            raise exceptions.DataError(
                f"invalid application file at {file_path}: a mapping is expected"
            )
        if "name" not in app_data:
            # note that we don't use lower() here as we want human readable name and
            # uppercase may be set on purpose
            app_data["name"] = file_path.stem[len(APP_FILE_PREFIX) :].strip()
        single_instance = app_data.setdefault("single_instance", True)
        if not isinstance(single_instance, bool):
            raise ValueError(
                f'"single_instance" must be a boolean, but it is {type(single_instance)}'
            )
        return app_data

    def list_applications(self, filters: Optional[List[str]]) -> List[str]:
        """List available application

        @param filters: only show applications matching those filters.
            using None (or an empty list) will list all known applications
            a filter can be:
                - available: applications available locally
                - running: only show launched applications
        @return: canonical names of matching applications
        @raise ValueError: a filter is unknown
        """
        if not filters:
            return list(self._apps)
        found = set()
        for filter_ in filters:
            if filter_ == "available":
                found.update(self._apps)
            elif filter_ == "running":
                found.update(self._started)
            else:
                raise ValueError(f"Unknown filter: {filter_}")
        return list(found)

    def _start(self, app_name, extra):
        """Bridge wrapper around [start], (de)serialising ``extra`` and the result"""
        extra = data_format.deserialise(extra)
        d = defer.ensureDeferred(self.start(str(app_name), extra))
        d.addCallback(data_format.serialise)
        return d

    async def start(
        self,
        app_name: str,
        extra: Optional[dict] = None,
    ) -> dict:
        """Start an application

        @param app_name: name of the application to start
        @param extra: extra parameters
        @return: data with following keys:
            - name (str): canonical application name
            - instance (str): instance ID
            - started (bool): True if the application is already started
                if False, the "application_started" signal should be used to get notified
                when the application is actually started
            - expose (dict): exposed data as given by [self.get_exposed]
                exposed data which need to be computed are NOT returned, they will
                available when the app will be fully started, through the
                [self.get_exposed] method.
        @raise exceptions.NotFound: no application is known with this name
        @raise exceptions.InternalError: the manager has no "start" method
        """
        # FIXME: for now we use the first app manager available for the requested app_name
        # TODO: implement running multiple instance of the same app if some metadata
        #   to be defined in app_data allows explicitly it.
        app_name = app_name.lower().strip()
        try:
            app_file_path = next(iter(next(iter(self._apps[app_name].values()))))
        except KeyError:
            raise exceptions.NotFound(f"No application found with the name {app_name!r}")
        log.info(f"starting {app_name!r}")
        self._app_persistent_data = await self.persistent_data.get(app_name) or {}
        self._app_persistent_data["last_started"] = time.time()
        started_data = self._started.setdefault(app_name, [])
        app_data = self.parse(app_file_path, extra)
        # parsing may have generated new passwords, we save them
        await self.persistent_data.aset(app_name, self._app_persistent_data)
        app_data["_started"] = False
        app_data["_file_path"] = app_file_path
        app_data["_name_canonical"] = app_name
        single_instance = app_data["single_instance"]
        ret_data = {"name": app_name, "started": False}
        if single_instance:
            if started_data:
                # an instance is already there, we return its data
                instance_data = started_data[0]
                instance_id = instance_data["_instance_id"]
                ret_data["instance"] = instance_id
                ret_data["started"] = instance_data["_started"]
                ret_data["expose"] = await self.get_exposed(
                    instance_id, "instance", {"skip_compute": True}
                )
                log.info(f"{app_name!r} is already started or being started")
                return ret_data
            else:
                cache_path = self.host.memory.get_cache_path(
                    PLUGIN_INFO[C.PI_IMPORT_NAME], app_name
                )
                cache_path.mkdir(0o700, parents=True, exist_ok=True)
                app_data["_instance_dir_path"] = cache_path
        else:
            # the directory object is kept so it can be cleaned up in [unload]
            dest_dir_obj = tempfile.TemporaryDirectory(prefix="libervia_app_")
            app_data["_instance_dir_obj"] = dest_dir_obj
            app_data["_instance_dir_path"] = Path(dest_dir_obj.name)
        instance_id = ret_data["instance"] = app_data["_instance_id"] = shortuuid.uuid()
        manager = self.get_manager(app_data)
        app_data["_manager"] = manager
        started_data.append(app_data)
        self._instances[instance_id] = app_data
        # we retrieve exposed data such as url_prefix which can be useful computed exposed
        # data must wait for the app to be started, so we skip them for now
        ret_data["expose"] = await self.get_exposed(
            instance_id, "instance", {"skip_compute": True}
        )

        try:
            start = manager.start
        except AttributeError:
            raise exceptions.InternalError(
                f'{manager.name} doesn\'t have the mandatory "start" method'
            )
        else:
            # the application is started in background, "application_started" or
            # "application_error" signal is sent when done
            defer.ensureDeferred(self.start_app(start, app_data))
        return ret_data

    async def start_app(self, start_cb: Callable, app_data: dict) -> None:
        """Run manager's start callback and send the resulting signal

        @param start_cb: manager's "start" method, awaited with ``app_data``
        @param app_data: data of the instance to start
        """
        app_name = app_data["_name_canonical"]
        instance_id = app_data["_instance_id"]
        try:
            await start_cb(app_data)
        except Exception as e:
            log.exception(f"Can't start libervia app {app_name!r}")
            self.host.bridge.application_error(
                app_name,
                instance_id,
                data_format.serialise({"class": str(type(e)), "msg": str(e)}),
            )
        else:
            app_data["_started"] = True
            self.host.bridge.application_started(app_name, instance_id, "")
            log.info(f"{app_name!r} started")

    def _stop(self, identifier, id_type, extra):
        """Bridge wrapper around [stop]"""
        extra = data_format.deserialise(extra)
        return defer.ensureDeferred(
            self.stop(str(identifier), str(id_type) or None, extra)
        )

    async def stop(
        self,
        identifier: str,
        id_type: Optional[str] = None,
        extra: Optional[dict] = None,
    ) -> None:
        """Stop an application instance

        @param identifier: identifier according to id_type (see [get_app_data])
        @param id_type: type of the identifier
        @param extra: extra parameters (unused for now)
        @raise exceptions.NotFound: no matching instance is running
        @raise exceptions.InternalError: the manager has no "stop" method
        """
        if extra is None:
            extra = {}

        app_data = self.get_app_data(id_type, identifier)

        log.info(f"stopping {app_data['name']!r}")

        app_name = app_data["_name_canonical"]
        instance_id = app_data["_instance_id"]
        manager = app_data["_manager"]

        try:
            stop = manager.stop
        except AttributeError:
            raise exceptions.InternalError(
                f'{manager.name} doesn\'t have the mandatory "stop" method'
            )
        else:
            try:
                await stop(app_data)
            except Exception as e:
                # the instance is kept registered as it may still be running
                log.warning(
                    f"Instance {instance_id} of application {app_name} can't be stopped "
                    f"properly: {e}"
                )
                return

        try:
            del self._instances[instance_id]
        except KeyError:
            log.error(
                f"INTERNAL ERROR: {instance_id!r} is not present in self._instances"
            )

        try:
            self._started[app_name].remove(app_data)
        except (KeyError, ValueError):
            log.error(
                "INTERNAL ERROR: there is no app data in self._started with id "
                f"{instance_id!r}"
            )

        log.info(f"{app_name!r} stopped")

    def _get_exposed(self, identifier, id_type, extra):
        """Bridge wrapper around [get_exposed], (de)serialising ``extra`` and the result"""
        extra = data_format.deserialise(extra)
        d = defer.ensureDeferred(self.get_exposed(identifier, id_type, extra))
        d.addCallback(lambda d: data_format.serialise(d))
        return d

    def get_app_data_value(self, path: list[str], data: dict[str, Any]) -> Any:
        """Return a value from app data from its path.

        @param path: sequence of keys to get to retrieve the value
        @param data: data where the value is looked for
        @return: requested value
        @raise KeyError: Can't find the requested value.
        """
        return reduce(lambda l, k: l[k], path, data)

    async def get_exposed(
        self,
        identifier: str,
        id_type: Optional[str] = None,
        extra: Optional[dict] = None,
    ) -> dict:
        """Get data exposed by the application

        The manager's "compute_expose" method will be called if it exists. It can be used
        to handle manager specific conventions.

        @param identifier: identifier according to id_type (see [get_app_data])
        @param id_type: type of the identifier
        @param extra: extra parameters, following key is used:
            - skip_compute (bool): if True, manager's "compute_expose" is not called
        @return: exposed data
        @raise exceptions.NotFound: no matching instance is running
        """
        app_data = self.get_app_data(id_type, identifier)
        if app_data.get("_exposed_computed", False):
            return app_data["expose"]
        if extra is None:
            extra = {}
        expose = app_data.setdefault("expose", {})
        if "passwords" in expose:
            passwords = expose["passwords"]
            # we iterate on a copy as entries may be deleted
            for name, value in list(passwords.items()):
                if isinstance(value, list):
                    # if we have a list, is the sequence of keys leading to the value
                    # to expose.
                    try:
                        passwords[name] = self.get_app_data_value(value, app_data)
                    except KeyError as e:
                        log.warning(
                            f"Can't retrieve exposed value for password {name!r}: {e}"
                        )
                        del passwords[name]

        for key in ("url_prefix", "front_url"):
            value = expose.get(key)
            if isinstance(value, list):
                try:
                    expose[key] = self.get_app_data_value(value, app_data)
                except KeyError:
                    log.warning(f"Can't retrieve exposed value for {key!r} at {value}")
                    del expose[key]

        front_url = expose.get("front_url")
        if front_url:
            # we want to be sure that a scheme is defined, defaulting to ``https``
            parsed_url = urlparse(front_url)
            if not parsed_url.scheme:
                if not parsed_url.netloc:
                    # without scheme, urlparse puts the host in the path: we move the
                    # first path element to netloc
                    path_elt = parsed_url.path.split("/", 1)
                    parsed_url = parsed_url._replace(
                        netloc=path_elt[0],
                        path=f"/{path_elt[1]}" if len(path_elt) > 1 else "",
                    )
                parsed_url = parsed_url._replace(scheme="https")
                expose["front_url"] = urlunparse(parsed_url)

        if extra.get("skip_compute", False):
            return expose

        try:
            compute_expose = app_data["_manager"].compute_expose
        except AttributeError:
            # "compute_expose" is optional
            pass
        else:
            await compute_expose(app_data)

        app_data["_exposed_computed"] = True
        return expose

    async def _do_prepare(
        self,
        app_data: dict,
    ) -> None:
        """Run the "prepare" actions of the application in its instance directory

        Nothing is done if the instance directory is not empty.

        @raise exceptions.NotFound: an executable needed by an action is missing
        @raise NotImplementedError: an action is not managed
        """
        name = app_data["name"]
        dest_path = app_data["_instance_dir_path"]
        if next(dest_path.iterdir(), None) is not None:
            log.debug(f"There is already a prepared dir at {dest_path}, nothing to do")
            return
        try:
            prepare = app_data["prepare"].copy()
        except KeyError:
            prepare = {}

        if not prepare:
            log.debug(f"Nothing to prepare for {name!r}")
            return

        # we iterate on a copy as handled actions are removed from ``prepare``
        for action, value in list(prepare.items()):
            log.debug(f"[{name}] [prepare] running {action!r} action")
            if action == "git":
                try:
                    git_path = which("git")[0]
                except IndexError:
                    raise exceptions.NotFound(
                        f"Can't find \"git\" executable, {name} can't be started without it"
                    )
                await async_process.run(git_path, "clone", value, str(dest_path))
                log.debug(f"{value!r} git repository cloned at {dest_path}")
            else:
                raise NotImplementedError(
                    f"{action!r} is not managed, can't start {name}"
                )
            del prepare[action]

        if prepare:
            raise exceptions.InternalError('"prepare" should be empty')

    async def _do_create_files(
        self,
        app_data: dict,
    ) -> None:
        """Create the files declared in "files" in the instance directory

        Existing files are left untouched.

        @raise ValueError: "files" is not a dictionary
        """
        dest_path = app_data["_instance_dir_path"]
        files = app_data.get("files")
        if not files:
            return
        if not isinstance(files, dict):
            raise ValueError('"files" must be a dictionary')
        for filename, data in files.items():
            path = dest_path / filename
            if path.is_file():
                log.info(f"{path} already exists, skipping")
                continue
            with path.open("w") as f:
                f.write(data.get("content", ""))
            log.debug(f"{path} created")

    async def start_common(self, app_data: dict) -> None:
        """Method running common action when starting a manager

        It should be called by managers in "start" method.
        """
        await self._do_prepare(app_data)
        await self._do_create_files(app_data)