Mercurial > libervia-backend
view src/bridge/bridge_constructor/bridge_contructor.py @ 347:ea3e1b82dd79
core: contact deletion from roster if we have no subscription to it (behaviour may change in futur)
quick frontend: groups is updated in contactList in case of roster push
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 29 May 2011 16:12:08 +0200 |
parents | 4402ac630712 |
children | eb9d33ba4e36 |
line wrap: on
line source
#!/usr/bin/python #-*- coding: utf-8 -*- """ SAT: a jabber client Copyright (C) 2009, 2010, 2011 Jérôme Poisson (goffi@goffi.org) This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. """ #consts NAME = u"bridge_constructor" VERSION="0.1.0" DEST_DIR="generated" ABOUT = NAME+u""" v%s (c) Jérôme Poisson (aka Goffi) 2011 --- """+NAME+u""" Copyright (C) 2011 Jérôme Poisson (aka Goffi) This program comes with ABSOLUTELY NO WARRANTY; This is free software, and you are welcome to redistribute it under certain conditions. --- This script construct a SàT bridge using the given protocol """ MANAGED_PROTOCOLES=['dbus','mediawiki'] DEFAULT_PROTOCOLE='dbus' FLAGS=['deprecated', 'async'] import sys import os from os import path from optparse import OptionParser from ConfigParser import SafeConfigParser as Parser from ConfigParser import NoOptionError import re from datetime import datetime class ParseError(Exception): #Used when the signature parsing is going wrong (invalid signature ?) pass class Constructor: def __init__(self, bridge_template, options): self.bridge_template = bridge_template self.options = options def getValues(self, name): """Return values of a function in a dict @param name: Name of the function to get @return: dict, each key has the config value or None if the value is not set""" function={} for option in ['type','category','sig_in','sig_out','doc']: try: value = self.bridge_template.get(name, option) except NoOptionError: value = None function[option] = value return function def getDefault(self, name): """Return default values of a function in a dict @param name: Name of the function to get @return: dict, each key is the integer param number (no key if no default value)""" default_dict={} def_re = re.compile(r"param_(\d+)_default") for option in self.bridge_template.options(name): match = def_re.match(option) if match: try: idx = int(match.group(1)) except ValueError: raise ParseError("Invalid value [%s] for parameter number" % match.group(1)) default_dict[idx] = self.bridge_template.get(name, option) return default_dict def getFlags(self, name): """Return list of flags set for this function @param name: Name of the function to get @return: List of flags (string)""" flags=[] for option in self.bridge_template.options(name): if option in FLAGS: flags.append(option) return flags def getArgumentsDoc(self, name): """Return documentation of arguments @param name: Name of the function to get @return: dict, each key is the integer param number (no key if no argument doc), value is a tuple (name, doc)""" doc_dict={} option_re = re.compile(r"doc_param_(\d+)") value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL) for option in self.bridge_template.options(name): if option == 'doc_return': doc_dict['return'] = self.bridge_template.get(name, option) continue match = option_re.match(option) if match: try: idx = int(match.group(1)) except ValueError: raise ParseError("Invalid value [%s] for parameter number" % match.group(1)) value_match = value_re.match(self.bridge_template.get(name, option)) if not value_match: raise ParseError("Invalid value for parameter doc [%i]" % idx) doc_dict[idx]=(value_match.group(1),value_match.group(2)) return doc_dict def getDoc(self, name): """Return documentation of the method @param name: Name of the function to get @return: string documentation, or None""" if self.bridge_template.has_option(name, "doc"): return self.bridge_template.get(name, "doc") return None def getArguments(self, signature, name=None, default=None, unicode_protect=False): """Return arguments to user given a signature @param signature: signature in the short form (using s,a,i,b etc) @param name: dictionary of arguments name like given by getArguments @param default: dictionary of default values, like given by getDefault @param unicode_protect: activate unicode protection on strings (return strings as unicode(str)) @return: list of arguments that correspond to a signature (e.g.: "sss" return "arg1, arg2, arg3")""" i=0 idx=0 attr_string=[] while i<len(signature): if signature[i] not in ['b','y','n','i','x','q','u','t','d','s','a']: raise ParseError("Unmanaged attribute type [%c]" % signature[i]) attr_string.append(("unicode(%(name)s)%(default)s" if (unicode_protect and signature[i]=='s') else "%(name)s%(default)s") % { 'name':name[idx][0] if (name and name.has_key(idx)) else "arg_%i" % idx, 'default':"="+default[idx] if (default and default.has_key(idx)) else '' }) #give arg_1, arg2, etc or name1, name2=default, etc. \ #give unicode(arg_1), unicode(arg_2), etc. if unicode_protect is set and arg is a string idx+=1 if signature[i] == 'a': i+=1 if signature[i]!='{' and signature[i]!='(': #FIXME: must manage tuples out of arrays i+=1 continue #we have a simple type for the array opening_car = signature[i] assert(opening_car in ['{','(']) closing_car = '}' if opening_car == '{' else ')' opening_count = 1 while (True): #we have a dict or a list of tuples i+=1 if i>=len(signature): raise ParseError("missing }") if signature[i] == opening_car: opening_count+=1 if signature[i] == closing_car: opening_count-=1 if opening_count == 0: break i+=1 return ", ".join(attr_string) def generateCoreSide(self): """create the constructor in SàT core side (backend)""" raise NotImplementedError def generateFrontendSide(self): """create the constructor in SàT frontend side""" raise NotImplementedError def finalWrite(self, filename, file_buf): """Write the final generated file in DEST_DIR/filename @param filename: name of the file to generate @param file_buf: list of lines (stings) of the file""" if os.path.exists(DEST_DIR) and not os.path.isdir(DEST_DIR): print ("The destination dir [%s] can't be created: a file with this name already exists !") sys.exit(1) try: if not os.path.exists(DEST_DIR): os.mkdir(DEST_DIR) full_path=os.path.join(DEST_DIR,filename) if os.path.exists(full_path) and not self.options.force: print ("The destination file [%s] already exists ! Use --force to overwrite it" % full_path) try: with open(full_path,'w') as dest_file: dest_file.write('\n'.join(file_buf)) except IOError: print ("Can't open destination file [%s]" % full_path) except OSError: print("It's not possible to generate the file, check your permissions") exit(1) class MediawikiConstructor(Constructor): def __init__(self, bridge_template, options): Constructor.__init__(self, bridge_template, options) self.core_template="mediawiki_template.tpl" self.core_dest="mediawiki.wiki" def _wikiParameter(self, name, sig_in): """Format parameters with the wiki syntax @param name: name of the function @param sig_in: signature in @return: string of the formated parameters""" arg_doc = self.getArgumentsDoc(name) arg_default = self.getDefault(name) args_str = self.getArguments(sig_in) args = args_str.split(', ') if args_str else [] #ugly but it works :) wiki=[] for i in range(len(args)): if arg_doc.has_key(i): name,doc=arg_doc[i] doc='\n:'.join(doc.rstrip('\n').split('\n')) wiki.append("; %s: %s" % (name, doc)) else: wiki.append("; arg_%d: " % i) if arg_default.has_key(i): wiki.append(":''DEFAULT: %s''" % arg_default[i]) return "\n".join(wiki) def _wikiReturn(self, name): """Format return doc with the wiki syntax @param name: name of the function """ arg_doc = self.getArgumentsDoc(name) wiki=[] if arg_doc.has_key('return'): wiki.append('\n|-\n! scope=row | return value\n|') wiki.append('<br />\n'.join(arg_doc['return'].rstrip('\n').split('\n'))) return "\n".join(wiki) def generateCoreSide(self): signals_part = [] methods_part = [] sections = self.bridge_template.sections() sections.sort() for section in sections: function = self.getValues(section) print ("Adding %s %s" % (section, function["type"])) default = self.getDefault(section) async_msg = """<br />'''This method is asynchrone'''""" deprecated_msg = """<br />'''<font color="#FF0000">/!\ WARNING /!\ : This method is deprecated, please don't use it !</font>'''""" signature_signal = \ """\ ! scope=row | signature | %s |-\ """ % function['sig_in'] signature_method = \ """\ ! scope=row | signature in | %s |- ! scope=row | signature out | %s |-\ """ % (function['sig_in'], function['sig_out']) completion = { 'signature':signature_signal if function['type']=="signal" else signature_method, 'sig_out':function['sig_out'] or '', 'category':function['category'], 'name':section, 'doc':self.getDoc(section) or "FIXME: No description available", 'async':async_msg if "async" in self.getFlags(section) else "", 'deprecated':deprecated_msg if "deprecated" in self.getFlags(section) else "", 'parameters':self._wikiParameter(section, function['sig_in']), 'return':self._wikiReturn(section) if function['type'] == 'method' else '' } dest = signals_part if function['type'] == "signal" else methods_part dest.append("""\ == %(name)s == ''%(doc)s'' %(deprecated)s %(async)s {| class="wikitable" style="text-align:left; width:80%%;" ! scope=row | category | %(category)s |- %(signature)s ! scope=row | parameters | %(parameters)s%(return)s |} """ % completion) #at this point, signals_part, and methods_part should be filled, #we just have to place them in the right part of the template core_bridge = [] try: with open(self.core_template) as core_template: for line in core_template: if line.startswith('##SIGNALS_PART##'): core_bridge.extend(signals_part) elif line.startswith('##METHODS_PART##'): core_bridge.extend(methods_part) elif line.startswith('##TIMESTAMP##'): core_bridge.append('Generated on %s' % datetime.now()) else: core_bridge.append(line.replace('\n','')) except IOError: print ("Can't open template file [%s]" % self.core_template) sys.exit(1) #now we write to final file self.finalWrite(self.core_dest, core_bridge) class DbusConstructor(Constructor): def __init__(self, bridge_template, options): Constructor.__init__(self, bridge_template, options) self.core_template="dbus_core_template.py" self.frontend_template="dbus_frontend_template.py" self.frontend_dest = self.core_dest="DBus.py" def generateCoreSide(self): signals_part = [] methods_part = [] direct_calls = [] sections = self.bridge_template.sections() sections.sort() for section in sections: function = self.getValues(section) print ("Adding %s %s" % (section, function["type"])) default = self.getDefault(section) arg_doc = self.getArgumentsDoc(section) async = "async" in self.getFlags(section) completion = { 'sig_in':function['sig_in'] or '', 'sig_out':function['sig_out'] or '', 'category':'REQ' if function['category'] == 'request' else 'COMM', 'name':section, 'args':self.getArguments(function['sig_in'], name=arg_doc, default=default ) } if function["type"] == "signal": completion['body'] = "pass" if not self.options.debug else 'debug ("%s")' % section signals_part.append("""\ @dbus.service.signal(const_INT_PREFIX+const_%(category)s_SUFFIX, signature='%(sig_in)s') def %(name)s(self, %(args)s): %(body)s """ % completion) direct_calls.append("""\ def %(name)s(self, %(args)s): self.dbus_bridge.%(name)s(%(args)s) """ % completion) elif function["type"] == "method": completion['debug'] = "" if not self.options.debug else 'debug ("%s")\n%s' % (section,8*' ') completion['args_result'] = self.getArguments(function['sig_in'], name=arg_doc, unicode_protect=self.options.unicode) completion['async_comma'] = ', ' if async and function['sig_in'] else '' completion['async_args_def'] = 'callback=None, errback=None' if async else '' completion['async_args_call'] = 'callback, errback' if async else '' completion['async_callbacks'] = "('callback', 'errback')" if async else "None" methods_part.append("""\ @dbus.service.method(const_INT_PREFIX+const_%(category)s_SUFFIX, in_signature='%(sig_in)s', out_signature='%(sig_out)s', async_callbacks=%(async_callbacks)s) def %(name)s(self, %(args)s%(async_comma)s%(async_args_def)s): %(debug)sreturn self.cb["%(name)s"](%(args_result)s%(async_comma)s%(async_args_call)s) """ % completion) #at this point, signals_part, methods_part and direct_calls should be filled, #we just have to place them in the right part of the template core_bridge = [] try: with open(self.core_template) as core_template: for line in core_template: if line.startswith('##SIGNALS_PART##'): core_bridge.extend(signals_part) elif line.startswith('##METHODS_PART##'): core_bridge.extend(methods_part) elif line.startswith('##DIRECT_CALLS##'): core_bridge.extend(direct_calls) else: core_bridge.append(line.replace('\n','')) except IOError: print ("Can't open template file [%s]" % self.core_template) sys.exit(1) #now we write to final file self.finalWrite(self.core_dest, core_bridge) def generateFrontendSide(self): methods_part = [] sections = self.bridge_template.sections() sections.sort() for section in sections: function = self.getValues(section) print ("Adding %s %s" % (section, function["type"])) default = self.getDefault(section) arg_doc = self.getArgumentsDoc(section) async = "async" in self.getFlags(section) completion = { 'sig_in':function['sig_in'] or '', 'sig_out':function['sig_out'] or '', 'category':'req' if function['category'] == 'request' else 'comm', 'name':section, 'args':self.getArguments(function['sig_in'], name=arg_doc, default=default) } if function["type"] == "method": completion['debug'] = "" if not self.options.debug else 'debug ("%s")\n%s' % (section,8*' ') completion['args_result'] = self.getArguments(function['sig_in'], name=arg_doc) completion['async_args'] = ', callback=None, errback=None' if async else '' completion['async_comma'] = ', ' if async and function['sig_in'] else '' completion['async_args_result'] = 'reply_handler=callback, error_handler=errback' if async else '' result = "self.db_%(category)s_iface.%(name)s(%(args_result)s%(async_comma)s%(async_args_result)s)" % completion completion['result'] = ("unicode(%s)" if self.options.unicode and function['sig_out'] == 's' else "%s") % result methods_part.append("""\ def %(name)s(self, %(args)s%(async_args)s): %(debug)sreturn %(result)s """ % completion) #at this point, methods_part should be filled, #we just have to place it in the right part of the template frontend_bridge = [] try: with open(self.frontend_template) as frontend_template: for line in frontend_template: if line.startswith('##METHODS_PART##'): frontend_bridge.extend(methods_part) else: frontend_bridge.append(line.replace('\n','')) except IOError: print ("Can't open template file [%s]" % self.frontend_template) sys.exit(1) #now we write to final file self.finalWrite(self.frontend_dest, frontend_bridge) class ConstructorError(Exception): pass class ConstructorFactory: def create(self, bridge_template, options): if options.protocole=='dbus': return DbusConstructor(bridge_template, options) elif options.protocole=='mediawiki': return MediawikiConstructor(bridge_template, options) raise ConstructorError('Unknown constructor type') class BridgeConstructor: def __init__(self): self.options = None def check_options(self): """Check command line options""" _usage=""" %prog [options] %prog --help for options list """ parser = OptionParser(usage=_usage,version=ABOUT % VERSION) parser.add_option("-p", "--protocole", action="store", type="string", default=DEFAULT_PROTOCOLE, help="Generate bridge using PROTOCOLE (default: %s, possible values: [%s])" % (DEFAULT_PROTOCOLE, ", ".join(MANAGED_PROTOCOLES))) parser.add_option("-s", "--side", action="store", type="string", default="core", help="Which side of the bridge do you want to make ? (default: %default, possible values: [core, frontend])") parser.add_option("-t", "--template", action="store", type="string", default='bridge_template.ini', help="Use TEMPLATE to generate bridge (default: %default)") parser.add_option("-f", "--force", action="store_true", default=False, help=("Force overwritting of existing files")) parser.add_option("-d", "--debug", action="store_true", default=False, help=("Add debug information printing")) parser.add_option("--no_unicode", action="store_false", dest="unicode", default=True, help=("Remove unicode type protection from string results")) (self.options, args) = parser.parse_args() return args def go(self): self.check_options() self.template = Parser() try: self.template.readfp(open(self.options.template)) except IOError: print ("The template file doesn't exist or is not accessible") exit(1) constructor = ConstructorFactory().create(self.template, self.options) if self.options.side == "core": constructor.generateCoreSide() elif self.options.side == "frontend": constructor.generateFrontendSide() if __name__ == "__main__": bc = BridgeConstructor() bc.go()