Mercurial > libervia-backend
view src/bridge/bridge_constructor/bridge_contructor.py @ 359:eb9d33ba4e36
bridge: templates' constants can now be overridden
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 06 Jun 2011 18:35:30 +0200 |
parents | 4402ac630712 |
children | 3ea41a199b36 |
line wrap: on
line source
#!/usr/bin/python
#-*- coding: utf-8 -*-

"""
SAT: a jabber client
Copyright (C) 2009, 2010, 2011 Jérôme Poisson (goffi@goffi.org)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""

#consts
NAME = u"bridge_constructor"
VERSION="0.1.0"
DEST_DIR="generated"
ABOUT = NAME+u""" v%s (c) Jérôme Poisson (aka Goffi) 2011

---
"""+NAME+u""" Copyright (C) 2011 Jérôme Poisson (aka Goffi)
This program comes with ABSOLUTELY NO WARRANTY;
This is free software, and you are welcome to redistribute it
under certain conditions.
---

This script construct a SàT bridge using the given protocol
"""
MANAGED_PROTOCOLES=['dbus','mediawiki']
DEFAULT_PROTOCOLE='dbus'
FLAGS=['deprecated', 'async']

ENV_OVERRIDE = "SAT_BRIDGE_CONST_" #Prefix used to override a constant

import sys
import os
from os import path
from optparse import OptionParser
try:
    from ConfigParser import SafeConfigParser as Parser
    from ConfigParser import NoOptionError
except ImportError:
    #the module has been renamed in Python 3, where ConfigParser is the "safe" parser
    from configparser import ConfigParser as Parser
    from configparser import NoOptionError
import re
from datetime import datetime


class ParseError(Exception):
    #Used when the signature parsing is going wrong (invalid signature ?)
    pass


class Constructor:
    """Base class of the bridge generators

    It gives access to the functions described in the bridge template
    (one section per function), and write the generated file."""

    def __init__(self, bridge_template, options):
        """
        @param bridge_template: config parser loaded with the bridge template
        @param options: command line options (force, debug and unicode are read)"""
        self.bridge_template = bridge_template
        self.options = options

    def getValues(self, name):
        """Return values of a function in a dict
        @param name: Name of the function to get
        @return: dict, each key has the config value or None if the value is not set"""
        function={}
        for option in ['type','category','sig_in','sig_out','doc']:
            try:
                value = self.bridge_template.get(name, option)
            except NoOptionError:
                value = None
            function[option] = value
        return function

    def getDefault(self, name):
        """Return default values of a function in a dict
        @param name: Name of the function to get
        @return: dict, each key is the integer param number (no key if no default value)"""
        default_dict={}
        def_re = re.compile(r"param_(\d+)_default")
        for option in self.bridge_template.options(name):
            match = def_re.match(option)
            if match:
                try:
                    idx = int(match.group(1))
                except ValueError:
                    raise ParseError("Invalid value [%s] for parameter number" % match.group(1))
                default_dict[idx] = self.bridge_template.get(name, option)
        return default_dict

    def getFlags(self, name):
        """Return list of flags set for this function
        @param name: Name of the function to get
        @return: List of flags (string)"""
        return [option for option in self.bridge_template.options(name) if option in FLAGS]

    def getArgumentsDoc(self, name):
        """Return documentation of arguments
        @param name: Name of the function to get
        @return: dict, each key is the integer param number (no key if no argument doc), value is a tuple (name, doc)
                 the documentation of the return value, if any, is a string in the 'return' key
        @raise ParseError: a parameter doc is not in the "name: documentation" form"""
        doc_dict={}
        option_re = re.compile(r"doc_param_(\d+)")
        value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL)
        for option in self.bridge_template.options(name):
            if option == 'doc_return':
                doc_dict['return'] = self.bridge_template.get(name, option)
                continue
            match = option_re.match(option)
            if match:
                try:
                    idx = int(match.group(1))
                except ValueError:
                    raise ParseError("Invalid value [%s] for parameter number" % match.group(1))
                value_match = value_re.match(self.bridge_template.get(name, option))
                if not value_match:
                    raise ParseError("Invalid value for parameter doc [%i]" % idx)
                doc_dict[idx]=(value_match.group(1),value_match.group(2))
        return doc_dict

    def getDoc(self, name):
        """Return documentation of the method
        @param name: Name of the function to get
        @return: string documentation, or None"""
        if self.bridge_template.has_option(name, "doc"):
            return self.bridge_template.get(name, "doc")
        return None

    def getArguments(self, signature, name=None, default=None, unicode_protect=False):
        """Return arguments to user given a signature
        @param signature: signature in the short form (using s,a,i,b etc), None is managed as an empty signature
        @param name: dictionary of arguments name like given by getArgumentsDoc
        @param default: dictionary of default values, like given by getDefault
        @param unicode_protect: activate unicode protection on strings (return strings as unicode(str))
        @return: list of arguments that correspond to a signature (e.g.: "sss" return "arg_0, arg_1, arg_2")
        @raise ParseError: the signature use an unmanaged type or is truncated"""
        signature = signature or '' #sig_in is None when it is not set in the template
        i=0 #position in the signature
        idx=0 #number of the current argument
        attr_string=[]
        while i<len(signature):
            if signature[i] not in ['b','y','n','i','x','q','u','t','d','s','a']:
                raise ParseError("Unmanaged attribute type [%c]" % signature[i])

            #give arg_0, arg_1, etc or name1, name2=default, etc.
            arg_name = name[idx][0] if (name and idx in name) else "arg_%i" % idx
            arg_default = "="+default[idx] if (default and idx in default) else ''
            if unicode_protect and signature[i]=='s':
                #give unicode(arg_0), unicode(arg_1), etc.
                arg_name = "unicode(%s)" % arg_name
            attr_string.append(arg_name+arg_default)
            idx+=1

            if signature[i] == 'a':
                #an array is a single argument: we skip the type of its elements
                i+=1
                if i>=len(signature):
                    raise ParseError("missing array type")
                if signature[i] not in ('{','('): #FIXME: must manage tuples out of arrays
                    i+=1
                    continue #we have a simple type for the array
                opening_car = signature[i]
                closing_car = '}' if opening_car == '{' else ')'
                opening_count = 1
                while opening_count: #we have a dict or a list of tuples
                    i+=1
                    if i>=len(signature):
                        raise ParseError("missing %s" % closing_car)
                    if signature[i] == opening_car:
                        opening_count+=1
                    elif signature[i] == closing_car:
                        opening_count-=1
            i+=1
        return ", ".join(attr_string)

    def generateCoreSide(self):
        """create the constructor in SàT core side (backend)"""
        raise NotImplementedError

    def generateFrontendSide(self):
        """create the constructor in SàT frontend side"""
        raise NotImplementedError

    def finalWrite(self, filename, file_buf):
        """Write the final generated file in DEST_DIR/filename
        The script exits with status 1 if the file can't be written, or if it
        already exists and the force option is not set
        @param filename: name of the file to generate
        @param file_buf: list of lines (strings) of the file"""
        if os.path.exists(DEST_DIR) and not os.path.isdir(DEST_DIR):
            print ("The destination dir [%s] can't be created: a file with this name already exists !" % DEST_DIR)
            sys.exit(1)
        try:
            if not os.path.exists(DEST_DIR):
                os.mkdir(DEST_DIR)
            full_path=os.path.join(DEST_DIR,filename)
            if os.path.exists(full_path) and not self.options.force:
                print ("The destination file [%s] already exists ! Use --force to overwrite it" % full_path)
                sys.exit(1) #the existing file must not be replaced without --force
            try:
                with open(full_path,'w') as dest_file:
                    dest_file.write('\n'.join(file_buf))
            except IOError:
                print ("Can't open destination file [%s]" % full_path)
                sys.exit(1)
        except OSError:
            print("It's not possible to generate the file, check your permissions")
            sys.exit(1)


class MediawikiConstructor(Constructor):
    """Generate the documentation of the bridge, using the mediawiki syntax"""

    #NOTE(review): the line breaks of the wiki templates below have been
    #reconstructed, they must be checked against a generated page
    ASYNC_MSG = """<br />'''This method is asynchrone'''"""
    DEPRECATED_MSG = """<br />'''<font color="#FF0000">/!\\ WARNING /!\\ : This method is deprecated, please don't use it !</font>'''"""
    SIGNATURE_SIGNAL = ("! scope=row | signature\n"
                        "| %s\n"
                        "|-")
    SIGNATURE_METHOD = ("! scope=row | signature in\n"
                        "| %s\n"
                        "|-\n"
                        "! scope=row | signature out\n"
                        "| %s\n"
                        "|-")
    FUNCTION_TPL = ("== %(name)s ==\n"
                    "''%(doc)s''\n"
                    "%(deprecated)s\n"
                    "%(async)s\n"
                    "{| class=\"wikitable\" style=\"text-align:left; width:80%%;\"\n"
                    "! scope=row | category\n"
                    "| %(category)s\n"
                    "|-\n"
                    "%(signature)s\n"
                    "! scope=row | parameters\n"
                    "|\n"
                    "%(parameters)s%(return)s\n"
                    "|}\n")

    def __init__(self, bridge_template, options):
        Constructor.__init__(self, bridge_template, options)
        self.core_template="mediawiki_template.tpl"
        self.core_dest="mediawiki.wiki"

    def _wikiParameter(self, name, sig_in):
        """Format parameters with the wiki syntax
        @param name: name of the function
        @param sig_in: signature in
        @return: string of the formated parameters"""
        arg_doc = self.getArgumentsDoc(name)
        arg_default = self.getDefault(name)
        args_str = self.getArguments(sig_in)
        nb_args = len(args_str.split(', ')) if args_str else 0 #ugly but it works :)
        wiki=[]
        for i in range(nb_args):
            if i in arg_doc:
                arg_name,doc=arg_doc[i]
                #continuation lines of a definition start with ":"
                doc='\n:'.join(doc.rstrip('\n').split('\n'))
                wiki.append("; %s: %s" % (arg_name, doc))
            else:
                wiki.append("; arg_%d: " % i)
            if i in arg_default:
                wiki.append(":''DEFAULT: %s''" % arg_default[i])
        return "\n".join(wiki)

    def _wikiReturn(self, name):
        """Format return doc with the wiki syntax
        @param name: name of the function
        @return: string of the formated return doc, empty if there is no doc_return"""
        arg_doc = self.getArgumentsDoc(name)
        wiki=[]
        if 'return' in arg_doc:
            wiki.append('\n|-\n! scope=row | return value\n|')
            wiki.append('<br />\n'.join(arg_doc['return'].rstrip('\n').split('\n')))
        return "\n".join(wiki)

    def generateCoreSide(self):
        """Generate the wiki page which document the signals and methods of the bridge"""
        signals_part = []
        methods_part = []
        sections = self.bridge_template.sections()
        sections.sort()
        for section in sections:
            function = self.getValues(section)
            print ("Adding %s %s" % (section, function["type"]))
            flags = self.getFlags(section)
            is_signal = function['type'] == "signal"
            sig_in = function['sig_in'] or '' #avoid to write "None" in the page
            sig_out = function['sig_out'] or ''
            completion = {
                'signature':self.SIGNATURE_SIGNAL % sig_in if is_signal else self.SIGNATURE_METHOD % (sig_in, sig_out),
                'sig_out':sig_out,
                'category':function['category'],
                'name':section,
                'doc':self.getDoc(section) or "FIXME: No description available",
                'async':self.ASYNC_MSG if "async" in flags else "",
                'deprecated':self.DEPRECATED_MSG if "deprecated" in flags else "",
                'parameters':self._wikiParameter(section, function['sig_in']),
                'return':self._wikiReturn(section) if function['type'] == 'method' else ''
                }

            dest = signals_part if is_signal else methods_part
            dest.append(self.FUNCTION_TPL % completion)

        #at this point, signals_part, and methods_part should be filled,
        #we just have to place them in the right part of the template
        core_bridge = []
        try:
            with open(self.core_template) as core_template:
                for line in core_template:
                    if line.startswith('##SIGNALS_PART##'):
                        core_bridge.extend(signals_part)
                    elif line.startswith('##METHODS_PART##'):
                        core_bridge.extend(methods_part)
                    elif line.startswith('##TIMESTAMP##'):
                        core_bridge.append('Generated on %s' % datetime.now())
                    else:
                        core_bridge.append(line.replace('\n',''))
        except IOError:
            print ("Can't open template file [%s]" % self.core_template)
            sys.exit(1)

        #now we write to final file
        self.finalWrite(self.core_dest, core_bridge)


class DbusConstructor(Constructor):
    """Generate the D-Bus bridge, for the core or the frontend side"""

    #NOTE(review): the line breaks and the indentation of the code templates
    #below have been reconstructed (the body is indented with 8 spaces, like
    #the debug line), they must be checked against a generated file
    SIGNAL_TPL = ("    @dbus.service.signal(const_INT_PREFIX+const_%(category)s_SUFFIX,\n"
                  "                         signature='%(sig_in)s')\n"
                  "    def %(name)s(self, %(args)s):\n"
                  "        %(body)s\n")
    DIRECT_CALL_TPL = ("    def %(name)s(self, %(args)s):\n"
                       "        self.dbus_bridge.%(name)s(%(args)s)\n")
    CORE_METHOD_TPL = ("    @dbus.service.method(const_INT_PREFIX+const_%(category)s_SUFFIX,\n"
                       "                         in_signature='%(sig_in)s', out_signature='%(sig_out)s',\n"
                       "                         async_callbacks=%(async_callbacks)s)\n"
                       "    def %(name)s(self, %(args)s%(async_comma)s%(async_args_def)s):\n"
                       "        %(debug)sreturn self.cb[\"%(name)s\"](%(args_result)s%(async_comma)s%(async_args_call)s)\n")
    FRONTEND_METHOD_TPL = ("    def %(name)s(self, %(args)s%(async_args)s):\n"
                           "        %(debug)sreturn %(result)s\n")

    def __init__(self, bridge_template, options):
        Constructor.__init__(self, bridge_template, options)
        self.core_template="dbus_core_template.py"
        self.frontend_template="dbus_frontend_template.py"
        self.frontend_dest = self.core_dest="DBus.py"

    def _overrideConst(self, line):
        """Check if a constant of a template is overriden by the environment
        A "const_NAME = value" line is overriden when the ENV_OVERRIDE+NAME
        environment variable is set
        @param line: line of the template
        @return: the line to use instead of the template one, or None if there is no override"""
        if not line.startswith('const_'):
            return None
        const_name = line[len('const_'):line.find(' = ')]
        env_name = ENV_OVERRIDE+const_name
        if env_name not in os.environ:
            return None
        print ("const %s overriden" % const_name)
        return 'const_%s = %s' % (const_name, os.environ[env_name])

    def generateCoreSide(self):
        """Generate the bridge used by the backend, from the core template"""
        signals_part = []
        methods_part = []
        direct_calls = []
        sections = self.bridge_template.sections()
        sections.sort()
        for section in sections:
            function = self.getValues(section)
            print ("Adding %s %s" % (section, function["type"]))
            default = self.getDefault(section)
            arg_doc = self.getArgumentsDoc(section)
            is_async = "async" in self.getFlags(section)
            completion = {
                'sig_in':function['sig_in'] or '',
                'sig_out':function['sig_out'] or '',
                'category':'REQ' if function['category'] == 'request' else 'COMM',
                'name':section,
                'args':self.getArguments(function['sig_in'], name=arg_doc, default=default)
                }

            if function["type"] == "signal":
                completion['body'] = "pass" if not self.options.debug else 'debug ("%s")' % section
                signals_part.append(self.SIGNAL_TPL % completion)
                direct_calls.append(self.DIRECT_CALL_TPL % completion)

            elif function["type"] == "method":
                completion['debug'] = "" if not self.options.debug else 'debug ("%s")\n%s' % (section,8*' ')
                completion['args_result'] = self.getArguments(function['sig_in'], name=arg_doc, unicode_protect=self.options.unicode)
                completion['async_comma'] = ', ' if is_async and function['sig_in'] else ''
                completion['async_args_def'] = 'callback=None, errback=None' if is_async else ''
                completion['async_args_call'] = 'callback, errback' if is_async else ''
                completion['async_callbacks'] = "('callback', 'errback')" if is_async else "None"
                methods_part.append(self.CORE_METHOD_TPL % completion)

        #at this point, signals_part, methods_part and direct_calls should be filled,
        #we just have to place them in the right part of the template
        core_bridge = []
        try:
            with open(self.core_template) as core_template:
                for line in core_template:
                    if line.startswith('##SIGNALS_PART##'):
                        core_bridge.extend(signals_part)
                    elif line.startswith('##METHODS_PART##'):
                        core_bridge.extend(methods_part)
                    elif line.startswith('##DIRECT_CALLS##'):
                        core_bridge.extend(direct_calls)
                    else:
                        overriden = self._overrideConst(line)
                        if overriden is None:
                            core_bridge.append(line.replace('\n',''))
                        else:
                            core_bridge.append(overriden)
        except IOError:
            print ("Can't open template file [%s]" % self.core_template)
            sys.exit(1)

        #now we write to final file
        self.finalWrite(self.core_dest, core_bridge)

    def generateFrontendSide(self):
        """Generate the bridge used by the frontends, from the frontend template
        Only the methods are generated on this side"""
        methods_part = []
        sections = self.bridge_template.sections()
        sections.sort()
        for section in sections:
            function = self.getValues(section)
            print ("Adding %s %s" % (section, function["type"]))
            default = self.getDefault(section)
            arg_doc = self.getArgumentsDoc(section)
            is_async = "async" in self.getFlags(section)
            completion = {
                'sig_in':function['sig_in'] or '',
                'sig_out':function['sig_out'] or '',
                'category':'req' if function['category'] == 'request' else 'comm',
                'name':section,
                'args':self.getArguments(function['sig_in'], name=arg_doc, default=default)
                }

            if function["type"] == "method":
                completion['debug'] = "" if not self.options.debug else 'debug ("%s")\n%s' % (section,8*' ')
                completion['args_result'] = self.getArguments(function['sig_in'], name=arg_doc)
                completion['async_args'] = ', callback=None, errback=None' if is_async else ''
                completion['async_comma'] = ', ' if is_async and function['sig_in'] else ''
                completion['async_args_result'] = 'reply_handler=callback, error_handler=errback' if is_async else ''
                result = "self.db_%(category)s_iface.%(name)s(%(args_result)s%(async_comma)s%(async_args_result)s)" % completion
                completion['result'] = ("unicode(%s)" if self.options.unicode and function['sig_out'] == 's' else "%s") % result
                methods_part.append(self.FRONTEND_METHOD_TPL % completion)

        #at this point, methods_part should be filled,
        #we just have to place it in the right part of the template
        frontend_bridge = []
        try:
            with open(self.frontend_template) as frontend_template:
                for line in frontend_template:
                    if line.startswith('##METHODS_PART##'):
                        frontend_bridge.extend(methods_part)
                    else:
                        overriden = self._overrideConst(line)
                        if overriden is None:
                            frontend_bridge.append(line.replace('\n',''))
                        else:
                            frontend_bridge.append(overriden)
        except IOError:
            print ("Can't open template file [%s]" % self.frontend_template)
            sys.exit(1)

        #now we write to final file
        self.finalWrite(self.frontend_dest, frontend_bridge)


class ConstructorError(Exception):
    #Used when no constructor manage the requested protocol
    pass


class ConstructorFactory:
    """Choose the constructor to use according to the requested protocol"""

    def create(self, bridge_template, options):
        """Return the constructor which manage options.protocole
        @param bridge_template: config parser loaded with the bridge template
        @param options: command line options
        @return: instance of a Constructor subclass
        @raise ConstructorError: the protocol is not managed"""
        if options.protocole=='dbus':
            return DbusConstructor(bridge_template, options)
        elif options.protocole=='mediawiki':
            return MediawikiConstructor(bridge_template, options)

        raise ConstructorError('Unknown constructor type')


class BridgeConstructor:
    """Command line entry point: parse the options and launch the generation"""

    def __init__(self):
        self.options = None

    def check_options(self):
        """Check command line options
        The parsed options are stored in self.options
        @return: list of the remaining positional arguments"""
        _usage="""
        %prog [options]

        %prog --help for options list
        """
        parser = OptionParser(usage=_usage,version=ABOUT % VERSION)

        parser.add_option("-p", "--protocole", action="store", type="string", default=DEFAULT_PROTOCOLE,
                    help="Generate bridge using PROTOCOLE (default: %s, possible values: [%s])" % (DEFAULT_PROTOCOLE, ", ".join(MANAGED_PROTOCOLES)))
        parser.add_option("-s", "--side", action="store", type="string", default="core",
                    help="Which side of the bridge do you want to make ? (default: %default, possible values: [core, frontend])")
        parser.add_option("-t", "--template", action="store", type="string", default='bridge_template.ini',
                    help="Use TEMPLATE to generate bridge (default: %default)")
        parser.add_option("-f", "--force", action="store_true", default=False,
                    help=("Force overwritting of existing files"))
        parser.add_option("-d", "--debug", action="store_true", default=False,
                    help=("Add debug information printing"))
        parser.add_option("--no_unicode", action="store_false", dest="unicode", default=True,
                    help=("Remove unicode type protection from string results"))

        (self.options, args) = parser.parse_args()
        return args

    def go(self):
        """Load the bridge template and generate the requested side of the bridge"""
        self.check_options()
        self.template = Parser()
        try:
            with open(self.options.template) as template_file:
                #readfp is the historical name of read_file
                read_file = getattr(self.template, 'read_file', None) or self.template.readfp
                read_file(template_file)
        except IOError:
            print ("The template file doesn't exist or is not accessible")
            sys.exit(1)
        constructor = ConstructorFactory().create(self.template, self.options)
        if self.options.side == "core":
            constructor.generateCoreSide()
        elif self.options.side == "frontend":
            constructor.generateFrontendSide()
        else:
            print ("Unknown side [%s], possible values: [core, frontend]" % self.options.side)
            sys.exit(1)


if __name__ == "__main__":
    bc = BridgeConstructor()
    bc.go()