Mercurial > libervia-backend
diff libervia/backend/bridge/bridge_constructor/base_constructor.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/bridge/bridge_constructor/base_constructor.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/bridge/bridge_constructor/base_constructor.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 + + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""base constructor class""" + +from libervia.backend.bridge.bridge_constructor.constants import Const as C +from configparser import NoOptionError +import sys +import os +import os.path +import re +from importlib import import_module + + +class ParseError(Exception): + # Used when the signature parsing is going wrong (invalid signature ?) + pass + + +class Constructor(object): + NAME = None # used in arguments parsing, filename will be used if not set + # following attribute are used by default generation method + # they can be set to dict of strings using python formatting syntax + # dict keys will be used to select part to replace (e.g. "signals" key will + # replace ##SIGNALS_PART## in template), while the value is the format + # keys starting with "signal" will be used for signals, while ones starting with + # "method" will be used for methods + # check D-Bus constructor for an example + CORE_FORMATS = None + CORE_TEMPLATE = None + CORE_DEST = None + FRONTEND_FORMATS = None + FRONTEND_TEMPLATE = None + FRONTEND_DEST = None + + # set to False if your bridge needs only core + FRONTEND_ACTIVATE = True + + def __init__(self, bridge_template, options): + self.bridge_template = bridge_template + self.args = options + + @property + def constructor_dir(self): + constructor_mod = import_module(self.__module__) + return os.path.dirname(constructor_mod.__file__) + + def getValues(self, name): + """Return values of a function in a dict + @param name: Name of the function to get + @return: dict, each key has the config value or None if the value is not set""" + function = {} + for option in ["type", "category", "sig_in", "sig_out", "doc"]: + try: + value = self.bridge_template.get(name, option) + except NoOptionError: + value = None + function[option] = value + return function + + def get_default(self, name): + """Return default values of a function in a dict + @param name: Name of the function to get + @return: dict, each key is the integer param number (no key if no default value)""" + default_dict = {} + def_re = re.compile(r"param_(\d+)_default") + + for option in self.bridge_template.options(name): + match = def_re.match(option) + if match: + try: + idx = int(match.group(1)) + except ValueError: + raise ParseError( + "Invalid value [%s] for parameter number" % match.group(1) + ) + default_dict[idx] = self.bridge_template.get(name, option) + + return default_dict + + def getFlags(self, name): + """Return list of flags set for this function + + @param name: Name of the function to get + @return: List of flags (string) + """ + flags = [] + for option in self.bridge_template.options(name): + if option in C.DECLARATION_FLAGS: + flags.append(option) + return flags + + def get_arguments_doc(self, name): + """Return documentation of arguments + @param name: Name of the function to get + @return: dict, each key is the integer param number (no key if no argument doc), value is a tuple (name, doc)""" + doc_dict = {} + option_re = re.compile(r"doc_param_(\d+)") + value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL) + for option in self.bridge_template.options(name): + if option == "doc_return": + doc_dict["return"] = self.bridge_template.get(name, option) + continue + match = option_re.match(option) + if match: + try: + idx = int(match.group(1)) + except ValueError: + raise ParseError( + "Invalid value [%s] for parameter number" % match.group(1) + ) + value_match = value_re.match(self.bridge_template.get(name, option)) + if not value_match: + raise ParseError("Invalid value for parameter doc [%i]" % idx) + doc_dict[idx] = (value_match.group(1), value_match.group(2)) + return doc_dict + + def get_doc(self, name): + """Return documentation of the method + @param name: Name of the function to get + @return: string documentation, or None""" + if self.bridge_template.has_option(name, "doc"): + return self.bridge_template.get(name, "doc") + return None + + def arguments_parser(self, signature): + """Generator which return individual arguments signatures from a global signature""" + start = 0 + i = 0 + + while i < len(signature): + if signature[i] not in ["b", "y", "n", "i", "x", "q", "u", "t", "d", "s", + "a"]: + raise ParseError("Unmanaged attribute type [%c]" % signature[i]) + + if signature[i] == "a": + i += 1 + if ( + signature[i] != "{" and signature[i] != "(" + ): # FIXME: must manage tuples out of arrays + i += 1 + yield signature[start:i] + start = i + continue # we have a simple type for the array + opening_car = signature[i] + assert opening_car in ["{", "("] + closing_car = "}" if opening_car == "{" else ")" + opening_count = 1 + while True: # we have a dict or a list of tuples + i += 1 + if i >= len(signature): + raise ParseError("missing }") + if signature[i] == opening_car: + opening_count += 1 + if signature[i] == closing_car: + opening_count -= 1 + if opening_count == 0: + break + i += 1 + yield signature[start:i] + start = i + + def get_arguments(self, signature, name=None, default=None, unicode_protect=False): + """Return arguments to user given a signature + + @param signature: signature in the short form (using s,a,i,b etc) + @param name: dictionary of arguments name like given by get_arguments_doc + @param default: dictionary of default values, like given by get_default + @param unicode_protect: activate unicode protection on strings (return strings as unicode(str)) + @return (str): arguments that correspond to a signature (e.g.: "sss" return "arg1, arg2, arg3") + """ + idx = 0 + attr_string = [] + + for arg in self.arguments_parser(signature): + attr_string.append( + ( + "str(%(name)s)%(default)s" + if (unicode_protect and arg == "s") + else "%(name)s%(default)s" + ) + % { + "name": name[idx][0] if (name and idx in name) else "arg_%i" % idx, + "default": "=" + default[idx] if (default and idx in default) else "", + } + ) + # give arg_1, arg2, etc or name1, name2=default, etc. + # give unicode(arg_1), unicode(arg_2), etc. if unicode_protect is set and arg is a string + idx += 1 + + return ", ".join(attr_string) + + def get_template_path(self, template_file): + """return template path corresponding to file name + + @param template_file(str): name of template file + """ + return os.path.join(self.constructor_dir, template_file) + + def core_completion_method(self, completion, function, default, arg_doc, async_): + """override this method to extend completion""" + pass + + def core_completion_signal(self, completion, function, default, arg_doc, async_): + """override this method to extend completion""" + pass + + def frontend_completion_method(self, completion, function, default, arg_doc, async_): + """override this method to extend completion""" + pass + + def frontend_completion_signal(self, completion, function, default, arg_doc, async_): + """override this method to extend completion""" + pass + + def generate(self, side): + """generate bridge + + call generate_core_side or generateFrontendSide if they exists + else call generic self._generate method + """ + try: + if side == "core": + method = self.generate_core_side + elif side == "frontend": + if not self.FRONTEND_ACTIVATE: + print("This constructor only handle core, please use core side") + sys.exit(1) + method = self.generateFrontendSide + except AttributeError: + self._generate(side) + else: + method() + + def _generate(self, side): + """generate the backend + + this is a generic method which will use formats found in self.CORE_SIGNAL_FORMAT + and self.CORE_METHOD_FORMAT (standard format method will be used) + @param side(str): core or frontend + """ + side_vars = [] + for var in ("FORMATS", "TEMPLATE", "DEST"): + attr = "{}_{}".format(side.upper(), var) + value = getattr(self, attr) + if value is None: + raise NotImplementedError + side_vars.append(value) + + FORMATS, TEMPLATE, DEST = side_vars + del side_vars + + parts = {part.upper(): [] for part in FORMATS} + sections = self.bridge_template.sections() + sections.sort() + for section in sections: + function = self.getValues(section) + print(("Adding %s %s" % (section, function["type"]))) + default = self.get_default(section) + arg_doc = self.get_arguments_doc(section) + async_ = "async" in self.getFlags(section) + completion = { + "sig_in": function["sig_in"] or "", + "sig_out": function["sig_out"] or "", + "category": "plugin" if function["category"] == "plugin" else "core", + "name": section, + # arguments with default values + "args": self.get_arguments( + function["sig_in"], name=arg_doc, default=default + ), + "args_no_default": self.get_arguments(function["sig_in"], name=arg_doc), + } + + extend_method = getattr( + self, "{}_completion_{}".format(side, function["type"]) + ) + extend_method(completion, function, default, arg_doc, async_) + + for part, fmt in FORMATS.items(): + if (part.startswith(function["type"]) + or part.startswith(f"async_{function['type']}")): + parts[part.upper()].append(fmt.format(**completion)) + + # at this point, signals_part, methods_part and direct_calls should be filled, + # we just have to place them in the right part of the template + bridge = [] + const_override = { + env[len(C.ENV_OVERRIDE) :]: v + for env, v in os.environ.items() + if env.startswith(C.ENV_OVERRIDE) + } + template_path = self.get_template_path(TEMPLATE) + try: + with open(template_path) as template: + for line in template: + + for part, extend_list in parts.items(): + if line.startswith("##{}_PART##".format(part)): + bridge.extend(extend_list) + break + else: + # the line is not a magic part replacement + if line.startswith("const_"): + const_name = line[len("const_") : line.find(" = ")].strip() + if const_name in const_override: + print(("const {} overriden".format(const_name))) + bridge.append( + "const_{} = {}".format( + const_name, const_override[const_name] + ) + ) + continue + bridge.append(line.replace("\n", "")) + except IOError: + print(("can't open template file [{}]".format(template_path))) + sys.exit(1) + + # now we write to final file + self.final_write(DEST, bridge) + + def final_write(self, filename, file_buf): + """Write the final generated file in [dest dir]/filename + + @param filename: name of the file to generate + @param file_buf: list of lines (stings) of the file + """ + if os.path.exists(self.args.dest_dir) and not os.path.isdir(self.args.dest_dir): + print( + "The destination dir [%s] can't be created: a file with this name already exists !" + ) + sys.exit(1) + try: + if not os.path.exists(self.args.dest_dir): + os.mkdir(self.args.dest_dir) + full_path = os.path.join(self.args.dest_dir, filename) + if os.path.exists(full_path) and not self.args.force: + print(( + "The destination file [%s] already exists ! Use --force to overwrite it" + % full_path + )) + try: + with open(full_path, "w") as dest_file: + dest_file.write("\n".join(file_buf)) + except IOError: + print(("Can't open destination file [%s]" % full_path)) + except OSError: + print("It's not possible to generate the file, check your permissions") + exit(1)