Mercurial > libervia-backend
view src/bridge/bridge_constructor/bridge_constructor.py @ 2084:e1015a5df6f5
bridge(constructor): constructor now uses argparse instead of optparse
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 02 Oct 2016 15:56:20 +0200 |
parents | 046449cc2bff |
children | da4097de5a95 |
line wrap: on
line source
#!/usr/bin/env python2 #-*- coding: utf-8 -*- # SàT: a XMPP client # Copyright (C) 2009-2016 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sat.core.constants import Const as C import sys import os import argparse from ConfigParser import SafeConfigParser as Parser from ConfigParser import NoOptionError import re from datetime import datetime from xml.dom import minidom #consts NAME = u"bridge_constructor" __version__ = C.APP_VERSION DEST_DIR_DEFAULT = "generated" DESCRIPTION = u"""{name} Copyright (C) 2009-2016 Jérôme Poisson (aka Goffi) This script construct a SàT bridge using the given protocol This program comes with ABSOLUTELY NO WARRANTY; This is free software, and you are welcome to redistribute it under certain conditions. """.format(name=NAME, version=__version__) # TODO: move protocoles in separate files (plugins?) MANAGED_PROTOCOLES = ['dbus', 'mediawiki', 'dbus-xml'] DEFAULT_PROTOCOLE = 'dbus' # flags used method/signal declaration (not to be confused with constructor flags) DECLARATION_FLAGS = ['deprecated', 'async'] ENV_OVERRIDE = "SAT_BRIDGE_CONST_" # Prefix used to override a constant class ParseError(Exception): #Used when the signature parsing is going wrong (invalid signature ?) pass class Constructor(object): def __init__(self, bridge_template, options): self.bridge_template = bridge_template self.args = options def getValues(self, name): """Return values of a function in a dict @param name: Name of the function to get @return: dict, each key has the config value or None if the value is not set""" function = {} for option in ['type', 'category', 'sig_in', 'sig_out', 'doc']: try: value = self.bridge_template.get(name, option) except NoOptionError: value = None function[option] = value return function def getDefault(self, name): """Return default values of a function in a dict @param name: Name of the function to get @return: dict, each key is the integer param number (no key if no default value)""" default_dict = {} def_re = re.compile(r"param_(\d+)_default") for option in self.bridge_template.options(name): match = def_re.match(option) if match: try: idx = int(match.group(1)) except ValueError: raise ParseError("Invalid value [%s] for parameter number" % match.group(1)) default_dict[idx] = self.bridge_template.get(name, option) return default_dict def getFlags(self, name): """Return list of flags set for this function @param name: Name of the function to get @return: List of flags (string)""" flags = [] for option in self.bridge_template.options(name): if option in DECLARATION_FLAGS: flags.append(option) return flags def getArgumentsDoc(self, name): """Return documentation of arguments @param name: Name of the function to get @return: dict, each key is the integer param number (no key if no argument doc), value is a tuple (name, doc)""" doc_dict = {} option_re = re.compile(r"doc_param_(\d+)") value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL) for option in self.bridge_template.options(name): if option == 'doc_return': doc_dict['return'] = self.bridge_template.get(name, option) continue match = option_re.match(option) if match: try: idx = int(match.group(1)) except ValueError: raise ParseError("Invalid value [%s] for parameter number" % match.group(1)) value_match = value_re.match(self.bridge_template.get(name, option)) if not value_match: raise ParseError("Invalid value for parameter doc [%i]" % idx) doc_dict[idx] = (value_match.group(1), value_match.group(2)) return doc_dict def getDoc(self, name): """Return documentation of the method @param name: Name of the function to get @return: string documentation, or None""" if self.bridge_template.has_option(name, "doc"): return self.bridge_template.get(name, "doc") return None def argumentsParser(self, signature): """Generator which return individual arguments signatures from a global signature""" start = 0 i = 0 while i < len(signature): if signature[i] not in ['b', 'y', 'n', 'i', 'x', 'q', 'u', 't', 'd', 's', 'a']: raise ParseError("Unmanaged attribute type [%c]" % signature[i]) if signature[i] == 'a': i += 1 if signature[i] != '{' and signature[i] != '(': # FIXME: must manage tuples out of arrays i += 1 yield signature[start:i] start = i continue # we have a simple type for the array opening_car = signature[i] assert(opening_car in ['{', '(']) closing_car = '}' if opening_car == '{' else ')' opening_count = 1 while (True): # we have a dict or a list of tuples i += 1 if i >= len(signature): raise ParseError("missing }") if signature[i] == opening_car: opening_count += 1 if signature[i] == closing_car: opening_count -= 1 if opening_count == 0: break i += 1 yield signature[start:i] start = i def getArguments(self, signature, name=None, default=None, unicode_protect=False): """Return arguments to user given a signature @param signature: signature in the short form (using s,a,i,b etc) @param name: dictionary of arguments name like given by getArguments @param default: dictionary of default values, like given by getDefault @param unicode_protect: activate unicode protection on strings (return strings as unicode(str)) @return: list of arguments that correspond to a signature (e.g.: "sss" return "arg1, arg2, arg3") """ idx = 0 attr_string = [] for arg in self.argumentsParser(signature): attr_string.append(("unicode(%(name)s)%(default)s" if (unicode_protect and arg == 's') else "%(name)s%(default)s") % { 'name': name[idx][0] if (name and idx in name) else "arg_%i" % idx, 'default': "=" + default[idx] if (default and idx in default) else ''}) # give arg_1, arg2, etc or name1, name2=default, etc. #give unicode(arg_1), unicode(arg_2), etc. if unicode_protect is set and arg is a string idx += 1 return ", ".join(attr_string) def generateCoreSide(self): """create the constructor in SàT core side (backend)""" raise NotImplementedError def generateFrontendSide(self): """create the constructor in SàT frontend side""" raise NotImplementedError def finalWrite(self, filename, file_buf): """Write the final generated file in [dest dir]/filename @param filename: name of the file to generate @param file_buf: list of lines (stings) of the file """ if os.path.exists(self.args.dest_dir) and not os.path.isdir(self.args.dest_dir): print ("The destination dir [%s] can't be created: a file with this name already exists !") sys.exit(1) try: if not os.path.exists(self.args.dest_dir): os.mkdir(self.args.dest_dir) full_path = os.path.join(self.args.dest_dir, filename) if os.path.exists(full_path) and not self.args.force: print ("The destination file [%s] already exists ! Use --force to overwrite it" % full_path) try: with open(full_path, 'w') as dest_file: dest_file.write('\n'.join(file_buf)) except IOError: print ("Can't open destination file [%s]" % full_path) except OSError: print("It's not possible to generate the file, check your permissions") exit(1) class MediawikiConstructor(Constructor): def __init__(self, bridge_template, options): Constructor.__init__(self, bridge_template, options) self.core_template = "mediawiki_template.tpl" self.core_dest = "mediawiki.wiki" def _addTextDecorations(self, text): """Add text decorations like coloration or shortcuts""" def anchor_link(match): link = match.group(1) #we add anchor_link for [method_name] syntax: if link in self.bridge_template.sections(): return "[[#%s|%s]]" % (link, link) print ("WARNING: found an anchor link to an unknown method") return link return re.sub(r"\[(\w+)\]", anchor_link, text) def _wikiParameter(self, name, sig_in): """Format parameters with the wiki syntax @param name: name of the function @param sig_in: signature in @return: string of the formated parameters""" arg_doc = self.getArgumentsDoc(name) arg_default = self.getDefault(name) args_str = self.getArguments(sig_in) args = args_str.split(', ') if args_str else [] # ugly but it works :) wiki = [] for i in range(len(args)): if i in arg_doc: name, doc = arg_doc[i] doc = '\n:'.join(doc.rstrip('\n').split('\n')) wiki.append("; %s: %s" % (name, self._addTextDecorations(doc))) else: wiki.append("; arg_%d: " % i) if i in arg_default: wiki.append(":''DEFAULT: %s''" % arg_default[i]) return "\n".join(wiki) def _wikiReturn(self, name): """Format return doc with the wiki syntax @param name: name of the function """ arg_doc = self.getArgumentsDoc(name) wiki = [] if 'return' in arg_doc: wiki.append('\n|-\n! scope=row | return value\n|') wiki.append('<br />\n'.join(self._addTextDecorations(arg_doc['return']).rstrip('\n').split('\n'))) return "\n".join(wiki) def generateCoreSide(self): signals_part = [] methods_part = [] sections = self.bridge_template.sections() sections.sort() for section in sections: function = self.getValues(section) print ("Adding %s %s" % (section, function["type"])) async_msg = """<br />'''This method is asynchronous'''""" deprecated_msg = """<br />'''<font color="#FF0000">/!\ WARNING /!\ : This method is deprecated, please don't use it !</font>'''""" signature_signal = \ """\ ! scope=row | signature | %s |-\ """ % function['sig_in'] signature_method = \ """\ ! scope=row | signature in | %s |- ! scope=row | signature out | %s |-\ """ % (function['sig_in'], function['sig_out']) completion = { 'signature': signature_signal if function['type'] == "signal" else signature_method, 'sig_out': function['sig_out'] or '', 'category': function['category'], 'name': section, 'doc': self.getDoc(section) or "FIXME: No description available", 'async': async_msg if "async" in self.getFlags(section) else "", 'deprecated': deprecated_msg if "deprecated" in self.getFlags(section) else "", 'parameters': self._wikiParameter(section, function['sig_in']), 'return': self._wikiReturn(section) if function['type'] == 'method' else ''} dest = signals_part if function['type'] == "signal" else methods_part dest.append("""\ == %(name)s == ''%(doc)s'' %(deprecated)s %(async)s {| class="wikitable" style="text-align:left; width:80%%;" ! scope=row | category | %(category)s |- %(signature)s ! scope=row | parameters | %(parameters)s%(return)s |} """ % completion) #at this point, signals_part, and methods_part should be filled, #we just have to place them in the right part of the template core_bridge = [] try: with open(self.core_template) as core_template: for line in core_template: if line.startswith('##SIGNALS_PART##'): core_bridge.extend(signals_part) elif line.startswith('##METHODS_PART##'): core_bridge.extend(methods_part) elif line.startswith('##TIMESTAMP##'): core_bridge.append('Generated on %s' % datetime.now()) else: core_bridge.append(line.replace('\n', '')) except IOError: print ("Can't open template file [%s]" % self.core_template) sys.exit(1) #now we write to final file self.finalWrite(self.core_dest, core_bridge) class DbusConstructor(Constructor): def __init__(self, bridge_template, options): Constructor.__init__(self, bridge_template, options) self.core_template = "dbus_core_template.py" self.frontend_template = "dbus_frontend_template.py" self.frontend_dest = self.core_dest = "DBus.py" def generateCoreSide(self): signals_part = [] methods_part = [] direct_calls = [] sections = self.bridge_template.sections() sections.sort() for section in sections: function = self.getValues(section) print ("Adding %s %s" % (section, function["type"])) default = self.getDefault(section) arg_doc = self.getArgumentsDoc(section) async = "async" in self.getFlags(section) completion = { 'sig_in': function['sig_in'] or '', 'sig_out': function['sig_out'] or '', 'category': 'PLUGIN' if function['category'] == 'plugin' else 'CORE', 'name': section, 'args': self.getArguments(function['sig_in'], name=arg_doc, default=default)} if function["type"] == "signal": completion['body'] = "pass" if not self.args.debug else 'log.debug ("%s")' % section signals_part.append("""\ @dbus.service.signal(const_INT_PREFIX+const_%(category)s_SUFFIX, signature='%(sig_in)s') def %(name)s(self, %(args)s): %(body)s """ % completion) direct_calls.append("""\ def %(name)s(self, %(args)s): self.dbus_bridge.%(name)s(%(args)s) """ % completion) elif function["type"] == "method": completion['debug'] = "" if not self.args.debug else 'log.debug ("%s")\n%s' % (section, 8 * ' ') completion['args_result'] = self.getArguments(function['sig_in'], name=arg_doc, unicode_protect=self.args.unicode) completion['async_comma'] = ', ' if async and function['sig_in'] else '' completion['async_args_def'] = 'callback=None, errback=None' if async else '' completion['async_args_call'] = 'callback=callback, errback=errback' if async else '' completion['async_callbacks'] = "('callback', 'errback')" if async else "None" methods_part.append("""\ @dbus.service.method(const_INT_PREFIX+const_%(category)s_SUFFIX, in_signature='%(sig_in)s', out_signature='%(sig_out)s', async_callbacks=%(async_callbacks)s) def %(name)s(self, %(args)s%(async_comma)s%(async_args_def)s): %(debug)sreturn self._callback("%(name)s", %(args_result)s%(async_comma)s%(async_args_call)s) """ % completion) #at this point, signals_part, methods_part and direct_calls should be filled, #we just have to place them in the right part of the template core_bridge = [] const_override_pref = filter(lambda env: env.startswith(ENV_OVERRIDE), os.environ) const_override = [env[len(ENV_OVERRIDE):] for env in const_override_pref] try: with open(self.core_template) as core_template: for line in core_template: if line.startswith('##SIGNALS_PART##'): core_bridge.extend(signals_part) elif line.startswith('##METHODS_PART##'): core_bridge.extend(methods_part) elif line.startswith('##DIRECT_CALLS##'): core_bridge.extend(direct_calls) else: if line.startswith('const_'): const_name = line[len('const_'):line.find(' = ')] if const_name in const_override: print ("const %s overriden" % const_name) core_bridge.append('const_%s = %s' % (const_name, os.environ[ENV_OVERRIDE + const_name])) continue core_bridge.append(line.replace('\n', '')) except IOError: print ("Can't open template file [%s]" % self.core_template) sys.exit(1) #now we write to final file self.finalWrite(self.core_dest, core_bridge) def generateFrontendSide(self): methods_part = [] sections = self.bridge_template.sections() sections.sort() for section in sections: function = self.getValues(section) print ("Adding %s %s" % (section, function["type"])) default = self.getDefault(section) arg_doc = self.getArgumentsDoc(section) async = "async" in self.getFlags(section) completion = { 'sig_in': function['sig_in'] or '', 'sig_out': function['sig_out'] or '', 'category': 'plugin' if function['category'] == 'plugin' else 'core', 'name': section, 'args': self.getArguments(function['sig_in'], name=arg_doc, default=default)} if function["type"] == "method": # XXX: we can manage blocking call in the same way as async one: if callback is None the call will be blocking completion['debug'] = "" if not self.args.debug else 'log.debug ("%s")\n%s' % (section, 8 * ' ') completion['args_result'] = self.getArguments(function['sig_in'], name=arg_doc) completion['async_args'] = 'callback=None, errback=None' completion['async_comma'] = ', ' if function['sig_in'] else '' completion['error_handler'] = """if callback is None: error_handler = None else: if errback is None: errback = log.error error_handler = lambda err:errback(dbus_to_bridge_exception(err)) """ if async: completion['blocking_call'] = '' completion['async_args_result'] = 'timeout=const_TIMEOUT, reply_handler=callback, error_handler=error_handler' else: # XXX: To have a blocking call, we must have not reply_handler, so we test if callback exists, and add reply_handler only in this case completion['blocking_call'] = """kwargs={} if callback is not None: kwargs['timeout'] = const_TIMEOUT kwargs['reply_handler'] = callback kwargs['error_handler'] = error_handler """ completion['async_args_result'] = '**kwargs' result = "self.db_%(category)s_iface.%(name)s(%(args_result)s%(async_comma)s%(async_args_result)s)" % completion completion['result'] = ("unicode(%s)" if self.args.unicode and function['sig_out'] == 's' else "%s") % result methods_part.append("""\ def %(name)s(self, %(args)s%(async_comma)s%(async_args)s): %(error_handler)s%(blocking_call)s%(debug)sreturn %(result)s """ % completion) #at this point, methods_part should be filled, #we just have to place it in the right part of the template frontend_bridge = [] const_override_pref = filter(lambda env: env.startswith(ENV_OVERRIDE), os.environ) const_override = [env[len(ENV_OVERRIDE):] for env in const_override_pref] try: with open(self.frontend_template) as frontend_template: for line in frontend_template: if line.startswith('##METHODS_PART##'): frontend_bridge.extend(methods_part) else: if line.startswith('const_'): const_name = line[len('const_'):line.find(' = ')] if const_name in const_override: print ("const %s overriden" % const_name) frontend_bridge.append('const_%s = %s' % (const_name, os.environ[ENV_OVERRIDE + const_name])) continue frontend_bridge.append(line.replace('\n', '')) except IOError: print ("Can't open template file [%s]" % self.frontend_template) sys.exit(1) #now we write to final file self.finalWrite(self.frontend_dest, frontend_bridge) class DbusXmlConstructor(Constructor): """Constructor for DBus XML syntaxt (used by Qt frontend)""" def __init__(self, bridge_template, options): Constructor.__init__(self, bridge_template, options) self.template = "dbus_xml_template.xml" self.core_dest = "org.goffi.sat.xml" self.default_annotation = {'a{ss}': 'StringDict', 'a(sa{ss}as)': 'QList<Contact>', 'a{i(ss)}': 'HistoryT', 'a(sss)': 'QList<MenuT>', 'a{sa{s(sia{ss})}}': 'PresenceStatusT', } def generateCoreSide(self): try: doc = minidom.parse(self.template) interface_elt = doc.getElementsByTagName('interface')[0] except IOError: print ("Can't access template") sys.exit(1) except IndexError: print ("Template error") sys.exit(1) sections = self.bridge_template.sections() sections.sort() for section in sections: function = self.getValues(section) print ("Adding %s %s" % (section, function["type"])) new_elt = doc.createElement('method' if function["type"] == 'method' else 'signal') new_elt.setAttribute('name', section) idx = 0 args_doc = self.getArgumentsDoc(section) for arg in self.argumentsParser(function['sig_in'] or ''): arg_elt = doc.createElement('arg') arg_elt.setAttribute('name', args_doc[idx][0] if idx in args_doc else "arg_%i" % idx) arg_elt.setAttribute('type', arg) _direction = 'in' if function["type"] == 'method' else 'out' arg_elt.setAttribute('direction', _direction) new_elt.appendChild(arg_elt) if "annotation" in self.args.flags: if arg in self.default_annotation: annot_elt = doc.createElement("annotation") annot_elt.setAttribute('name', "com.trolltech.QtDBus.QtTypeName.In%d" % idx) annot_elt.setAttribute('value', self.default_annotation[arg]) new_elt.appendChild(annot_elt) idx += 1 if function['sig_out']: arg_elt = doc.createElement('arg') arg_elt.setAttribute('type', function['sig_out']) arg_elt.setAttribute('direction', 'out') new_elt.appendChild(arg_elt) if "annotation" in self.args.flags: if function['sig_out'] in self.default_annotation: annot_elt = doc.createElement("annotation") annot_elt.setAttribute('name', "com.trolltech.QtDBus.QtTypeName.Out0") annot_elt.setAttribute('value', self.default_annotation[function['sig_out']]) new_elt.appendChild(annot_elt) interface_elt.appendChild(new_elt) #now we write to final file self.finalWrite(self.core_dest, [doc.toprettyxml()]) class ConstructorError(Exception): pass class ConstructorFactory(object): def create(self, bridge_template, options): if options.protocole == 'dbus': return DbusConstructor(bridge_template, options) elif options.protocole == 'mediawiki': return MediawikiConstructor(bridge_template, options) elif options.protocole == 'dbus-xml': return DbusXmlConstructor(bridge_template, options) raise ConstructorError('Unknown constructor type') class BridgeConstructor(object): def __init__(self): self.args = None def parse_args(self): """Check command line options""" parser = argparse.ArgumentParser(description=DESCRIPTION, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--version", action="version", version= __version__) parser.add_argument("-p", "--protocole", choices=MANAGED_PROTOCOLES, default=DEFAULT_PROTOCOLE, help="generate bridge using PROTOCOLE (default: %(default)s)") # (default: %s, possible values: [%s])" % (DEFAULT_PROTOCOLE, ", ".join(MANAGED_PROTOCOLES))) parser.add_argument("-s", "--side", choices=("core", "frontend"), default="core", help="which side of the bridge do you want to make ?") # (default: %default, possible values: [core, frontend])") parser.add_argument("-t", "--template", type=file, default='bridge_template.ini', help="use TEMPLATE to generate bridge (default: %(default)s)") parser.add_argument("-f", "--force", action="store_true", help=("force overwritting of existing files")) parser.add_argument("-d", "--debug", action="store_true", help=("add debug information printing")) parser.add_argument("--no-unicode", action="store_false", dest="unicode", help=("remove unicode type protection from string results")) parser.add_argument("--flags", nargs='+', default=[], help=("constructors' specific flags")) parser.add_argument("--dest-dir", default=DEST_DIR_DEFAULT, help=("directory when the generated files will be written (default: %(default)s")) return parser.parse_args() def go(self): args = self.parse_args() self.template = Parser() try: self.template.readfp(args.template) except IOError: print ("The template file doesn't exist or is not accessible") exit(1) constructor = ConstructorFactory().create(self.template, args) if args.side == "core": constructor.generateCoreSide() elif args.side == "frontend": constructor.generateFrontendSide() if __name__ == "__main__": bc = BridgeConstructor() bc.go()