Mercurial > libervia-backend
diff src/bridge/bridge_constructor/base_constructor.py @ 2085:da4097de5a95
bridge (constructor): refactoring:
- constructors are now in separate modules
- constructors are discovered dynamically
- factorised generation code from D-Bus in base Constructor.
- A generic generation method is now available in base Constructor, using python formatting.
- removed bridge/bridge.py in core as it was useless, may come back in the future if needed
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 02 Oct 2016 22:44:33 +0200 |
parents | |
children | 159250d66407 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/bridge/bridge_constructor/base_constructor.py Sun Oct 02 22:44:33 2016 +0200 @@ -0,0 +1,324 @@ +#!/usr/bin/env python2 +#-*- coding: utf-8 -*- + +# SàT: a XMPP client +# Copyright (C) 2009-2016 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""base constructor class""" + +from sat.bridge.bridge_constructor.constants import Const as C +from ConfigParser import NoOptionError +import sys +import os +import os.path +import re +from importlib import import_module + + +class ParseError(Exception): + #Used when the signature parsing is going wrong (invalid signature ?) + pass + + +class Constructor(object): + NAME = None # used in arguments parsing, filename will be used if not set + # following attribute are used by default generation method + # they can be set to dict of strings using python formatting syntax + # dict keys will be used to select part to replace (e.g. "signals" key will + # replace ##SIGNALS_PART## in template), while the value is the format + # keys starting with "signal" will be used for signals, while ones starting with + # "method" will be used for methods + # check D-Bus constructor for an example + CORE_FORMATS = None + CORE_TEMPLATE = None + CORE_DEST = None + FRONTEND_FORMATS = None + FRONTEND_TEMPLATE = None + FRONTEND_DEST = None + + def __init__(self, bridge_template, options): + self.bridge_template = bridge_template + self.args = options + + @property + def constructor_dir(self): + constructor_mod = import_module(self.__module__) + return os.path.dirname(constructor_mod.__file__) + + def getValues(self, name): + """Return values of a function in a dict + @param name: Name of the function to get + @return: dict, each key has the config value or None if the value is not set""" + function = {} + for option in ['type', 'category', 'sig_in', 'sig_out', 'doc']: + try: + value = self.bridge_template.get(name, option) + except NoOptionError: + value = None + function[option] = value + return function + + def getDefault(self, name): + """Return default values of a function in a dict + @param name: Name of the function to get + @return: dict, each key is the integer param number (no key if no default value)""" + default_dict = {} + def_re = re.compile(r"param_(\d+)_default") + + for option in self.bridge_template.options(name): + match = def_re.match(option) + if match: + try: + idx = int(match.group(1)) + except ValueError: + raise ParseError("Invalid value [%s] for parameter number" % match.group(1)) + default_dict[idx] = self.bridge_template.get(name, option) + + return default_dict + + def getFlags(self, name): + """Return list of flags set for this function + + @param name: Name of the function to get + @return: List of flags (string) + """ + flags = [] + for option in self.bridge_template.options(name): + if option in C.DECLARATION_FLAGS: + flags.append(option) + return flags + + def getArgumentsDoc(self, name): + """Return documentation of arguments + @param name: Name of the function to get + @return: dict, each key is the integer param number (no key if no argument doc), value is a tuple (name, doc)""" + doc_dict = {} + option_re = re.compile(r"doc_param_(\d+)") + value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL) + for option in self.bridge_template.options(name): + if option == 'doc_return': + doc_dict['return'] = self.bridge_template.get(name, option) + continue + match = option_re.match(option) + if match: + try: + idx = int(match.group(1)) + except ValueError: + raise ParseError("Invalid value [%s] for parameter number" % match.group(1)) + value_match = value_re.match(self.bridge_template.get(name, option)) + if not value_match: + raise ParseError("Invalid value for parameter doc [%i]" % idx) + doc_dict[idx] = (value_match.group(1), value_match.group(2)) + return doc_dict + + def getDoc(self, name): + """Return documentation of the method + @param name: Name of the function to get + @return: string documentation, or None""" + if self.bridge_template.has_option(name, "doc"): + return self.bridge_template.get(name, "doc") + return None + + def argumentsParser(self, signature): + """Generator which return individual arguments signatures from a global signature""" + start = 0 + i = 0 + + while i < len(signature): + if signature[i] not in ['b', 'y', 'n', 'i', 'x', 'q', 'u', 't', 'd', 's', 'a']: + raise ParseError("Unmanaged attribute type [%c]" % signature[i]) + + if signature[i] == 'a': + i += 1 + if signature[i] != '{' and signature[i] != '(': # FIXME: must manage tuples out of arrays + i += 1 + yield signature[start:i] + start = i + continue # we have a simple type for the array + opening_car = signature[i] + assert(opening_car in ['{', '(']) + closing_car = '}' if opening_car == '{' else ')' + opening_count = 1 + while (True): # we have a dict or a list of tuples + i += 1 + if i >= len(signature): + raise ParseError("missing }") + if signature[i] == opening_car: + opening_count += 1 + if signature[i] == closing_car: + opening_count -= 1 + if opening_count == 0: + break + i += 1 + yield signature[start:i] + start = i + + def getArguments(self, signature, name=None, default=None, unicode_protect=False): + """Return arguments to user given a signature + + @param signature: signature in the short form (using s,a,i,b etc) + @param name: dictionary of arguments name like given by getArguments + @param default: dictionary of default values, like given by getDefault + @param unicode_protect: activate unicode protection on strings (return strings as unicode(str)) + @return: list of arguments that correspond to a signature (e.g.: "sss" return "arg1, arg2, arg3") + """ + idx = 0 + attr_string = [] + + for arg in self.argumentsParser(signature): + attr_string.append(("unicode(%(name)s)%(default)s" if (unicode_protect and arg == 's') else "%(name)s%(default)s") % { + 'name': name[idx][0] if (name and idx in name) else "arg_%i" % idx, + 'default': "=" + default[idx] if (default and idx in default) else ''}) + # give arg_1, arg2, etc or name1, name2=default, etc. + #give unicode(arg_1), unicode(arg_2), etc. if unicode_protect is set and arg is a string + idx += 1 + + return ", ".join(attr_string) + + def getTemplatePath(self, template_file): + """return template path corresponding to file name + + @param template_file(str): name of template file + """ + return os.path.join(self.constructor_dir, template_file) + + def core_completion_method(self, completion, function, default, arg_doc, async_): + """override this method to extend completion""" + pass + + def core_completion_signal(self, completion, function, default, arg_doc, async_): + """override this method to extend completion""" + pass + + def frontend_completion_method(self, completion, function, default, arg_doc, async_): + """override this method to extend completion""" + pass + + def frontend_completion_signal(self, completion, function, default, arg_doc, async_): + """override this method to extend completion""" + pass + + + def generate(self, side): + """generate bridge + + call generateCoreSide or generateFrontendSide if they exists + else call generic self._generate method + """ + try: + if side == "core": + method = self.generateCoreSide + elif side == "frontend": + method = self.generateFrontendSide + except AttributeError: + self._generate(side) + else: + method() + + def _generate(self, side): + """generate the backend + + this is a generic method which will use formats found in self.CORE_SIGNAL_FORMAT + and self.CORE_METHOD_FORMAT (standard format method will be used) + @param side(str): core or frontend + """ + side_vars = [] + for var in ('FORMATS', 'TEMPLATE', 'DEST'): + attr = "{}_{}".format(side.upper(), var) + value = getattr(self, attr) + if value is None: + raise NotImplementedError + side_vars.append(value) + + FORMATS, TEMPLATE, DEST = side_vars + del side_vars + + parts = {part.upper():[] for part in FORMATS} + sections = self.bridge_template.sections() + sections.sort() + for section in sections: + function = self.getValues(section) + print ("Adding %s %s" % (section, function["type"])) + default = self.getDefault(section) + arg_doc = self.getArgumentsDoc(section) + async_ = "async" in self.getFlags(section) + completion = { + 'sig_in': function['sig_in'] or '', + 'sig_out': function['sig_out'] or '', + 'category': 'plugin' if function['category'] == 'plugin' else 'core', + 'name': section, + 'args': self.getArguments(function['sig_in'], name=arg_doc, default=default)} + + extend_method = getattr(self, "{}_completion_{}".format(side, function["type"])) + extend_method(completion, function, default, arg_doc, async_) + + for part, fmt in FORMATS.iteritems(): + if part.startswith(function["type"]): + parts[part.upper()].append(fmt.format(**completion)) + + + #at this point, signals_part, methods_part and direct_calls should be filled, + #we just have to place them in the right part of the template + bridge = [] + const_override = {env[len(C.ENV_OVERRIDE):]:v for env,v in os.environ.iteritems() if env.startswith(C.ENV_OVERRIDE)} + template_path = self.getTemplatePath(TEMPLATE) + try: + with open(template_path) as template: + for line in template: + + for part, extend_list in parts.iteritems(): + if line.startswith('##{}_PART##'.format(part)): + bridge.extend(extend_list) + break + else: + # the line is not a magic part replacement + if line.startswith('const_'): + const_name = line[len('const_'):line.find(' = ')].strip() + if const_name in const_override: + print("const {} overriden".format(const_name)) + bridge.append('const_{} = {}'.format(const_name, const_override[const_name])) + continue + bridge.append(line.replace('\n', '')) + except IOError: + print ("can't open template file [{}]".format(template_path)) + sys.exit(1) + + #now we write to final file + self.finalWrite(DEST, bridge) + + def finalWrite(self, filename, file_buf): + """Write the final generated file in [dest dir]/filename + + @param filename: name of the file to generate + @param file_buf: list of lines (stings) of the file + """ + if os.path.exists(self.args.dest_dir) and not os.path.isdir(self.args.dest_dir): + print ("The destination dir [%s] can't be created: a file with this name already exists !") + sys.exit(1) + try: + if not os.path.exists(self.args.dest_dir): + os.mkdir(self.args.dest_dir) + full_path = os.path.join(self.args.dest_dir, filename) + if os.path.exists(full_path) and not self.args.force: + print ("The destination file [%s] already exists ! Use --force to overwrite it" % full_path) + try: + with open(full_path, 'w') as dest_file: + dest_file.write('\n'.join(file_buf)) + except IOError: + print ("Can't open destination file [%s]" % full_path) + except OSError: + print("It's not possible to generate the file, check your permissions") + exit(1)