comparison libervia/backend/bridge/bridge_constructor/base_constructor.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/bridge/bridge_constructor/base_constructor.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # Libervia: an XMPP client
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 """base constructor class"""
21
22 from libervia.backend.bridge.bridge_constructor.constants import Const as C
23 from configparser import NoOptionError
24 import sys
25 import os
26 import os.path
27 import re
28 from importlib import import_module
29
30
31 class ParseError(Exception):
32 # Used when the signature parsing is going wrong (invalid signature ?)
33 pass
34
35
36 class Constructor(object):
37 NAME = None # used in arguments parsing, filename will be used if not set
38 # following attribute are used by default generation method
39 # they can be set to dict of strings using python formatting syntax
40 # dict keys will be used to select part to replace (e.g. "signals" key will
41 # replace ##SIGNALS_PART## in template), while the value is the format
42 # keys starting with "signal" will be used for signals, while ones starting with
43 # "method" will be used for methods
44 #  check D-Bus constructor for an example
45 CORE_FORMATS = None
46 CORE_TEMPLATE = None
47 CORE_DEST = None
48 FRONTEND_FORMATS = None
49 FRONTEND_TEMPLATE = None
50 FRONTEND_DEST = None
51
52 # set to False if your bridge needs only core
53 FRONTEND_ACTIVATE = True
54
55 def __init__(self, bridge_template, options):
56 self.bridge_template = bridge_template
57 self.args = options
58
59 @property
60 def constructor_dir(self):
61 constructor_mod = import_module(self.__module__)
62 return os.path.dirname(constructor_mod.__file__)
63
64 def getValues(self, name):
65 """Return values of a function in a dict
66 @param name: Name of the function to get
67 @return: dict, each key has the config value or None if the value is not set"""
68 function = {}
69 for option in ["type", "category", "sig_in", "sig_out", "doc"]:
70 try:
71 value = self.bridge_template.get(name, option)
72 except NoOptionError:
73 value = None
74 function[option] = value
75 return function
76
77 def get_default(self, name):
78 """Return default values of a function in a dict
79 @param name: Name of the function to get
80 @return: dict, each key is the integer param number (no key if no default value)"""
81 default_dict = {}
82 def_re = re.compile(r"param_(\d+)_default")
83
84 for option in self.bridge_template.options(name):
85 match = def_re.match(option)
86 if match:
87 try:
88 idx = int(match.group(1))
89 except ValueError:
90 raise ParseError(
91 "Invalid value [%s] for parameter number" % match.group(1)
92 )
93 default_dict[idx] = self.bridge_template.get(name, option)
94
95 return default_dict
96
97 def getFlags(self, name):
98 """Return list of flags set for this function
99
100 @param name: Name of the function to get
101 @return: List of flags (string)
102 """
103 flags = []
104 for option in self.bridge_template.options(name):
105 if option in C.DECLARATION_FLAGS:
106 flags.append(option)
107 return flags
108
109 def get_arguments_doc(self, name):
110 """Return documentation of arguments
111 @param name: Name of the function to get
112 @return: dict, each key is the integer param number (no key if no argument doc), value is a tuple (name, doc)"""
113 doc_dict = {}
114 option_re = re.compile(r"doc_param_(\d+)")
115 value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL)
116 for option in self.bridge_template.options(name):
117 if option == "doc_return":
118 doc_dict["return"] = self.bridge_template.get(name, option)
119 continue
120 match = option_re.match(option)
121 if match:
122 try:
123 idx = int(match.group(1))
124 except ValueError:
125 raise ParseError(
126 "Invalid value [%s] for parameter number" % match.group(1)
127 )
128 value_match = value_re.match(self.bridge_template.get(name, option))
129 if not value_match:
130 raise ParseError("Invalid value for parameter doc [%i]" % idx)
131 doc_dict[idx] = (value_match.group(1), value_match.group(2))
132 return doc_dict
133
134 def get_doc(self, name):
135 """Return documentation of the method
136 @param name: Name of the function to get
137 @return: string documentation, or None"""
138 if self.bridge_template.has_option(name, "doc"):
139 return self.bridge_template.get(name, "doc")
140 return None
141
142 def arguments_parser(self, signature):
143 """Generator which return individual arguments signatures from a global signature"""
144 start = 0
145 i = 0
146
147 while i < len(signature):
148 if signature[i] not in ["b", "y", "n", "i", "x", "q", "u", "t", "d", "s",
149 "a"]:
150 raise ParseError("Unmanaged attribute type [%c]" % signature[i])
151
152 if signature[i] == "a":
153 i += 1
154 if (
155 signature[i] != "{" and signature[i] != "("
156 ): # FIXME: must manage tuples out of arrays
157 i += 1
158 yield signature[start:i]
159 start = i
160 continue # we have a simple type for the array
161 opening_car = signature[i]
162 assert opening_car in ["{", "("]
163 closing_car = "}" if opening_car == "{" else ")"
164 opening_count = 1
165 while True: # we have a dict or a list of tuples
166 i += 1
167 if i >= len(signature):
168 raise ParseError("missing }")
169 if signature[i] == opening_car:
170 opening_count += 1
171 if signature[i] == closing_car:
172 opening_count -= 1
173 if opening_count == 0:
174 break
175 i += 1
176 yield signature[start:i]
177 start = i
178
179 def get_arguments(self, signature, name=None, default=None, unicode_protect=False):
180 """Return arguments to user given a signature
181
182 @param signature: signature in the short form (using s,a,i,b etc)
183 @param name: dictionary of arguments name like given by get_arguments_doc
184 @param default: dictionary of default values, like given by get_default
185 @param unicode_protect: activate unicode protection on strings (return strings as unicode(str))
186 @return (str): arguments that correspond to a signature (e.g.: "sss" return "arg1, arg2, arg3")
187 """
188 idx = 0
189 attr_string = []
190
191 for arg in self.arguments_parser(signature):
192 attr_string.append(
193 (
194 "str(%(name)s)%(default)s"
195 if (unicode_protect and arg == "s")
196 else "%(name)s%(default)s"
197 )
198 % {
199 "name": name[idx][0] if (name and idx in name) else "arg_%i" % idx,
200 "default": "=" + default[idx] if (default and idx in default) else "",
201 }
202 )
203 # give arg_1, arg2, etc or name1, name2=default, etc.
204 # give unicode(arg_1), unicode(arg_2), etc. if unicode_protect is set and arg is a string
205 idx += 1
206
207 return ", ".join(attr_string)
208
209 def get_template_path(self, template_file):
210 """return template path corresponding to file name
211
212 @param template_file(str): name of template file
213 """
214 return os.path.join(self.constructor_dir, template_file)
215
216 def core_completion_method(self, completion, function, default, arg_doc, async_):
217 """override this method to extend completion"""
218 pass
219
220 def core_completion_signal(self, completion, function, default, arg_doc, async_):
221 """override this method to extend completion"""
222 pass
223
224 def frontend_completion_method(self, completion, function, default, arg_doc, async_):
225 """override this method to extend completion"""
226 pass
227
228 def frontend_completion_signal(self, completion, function, default, arg_doc, async_):
229 """override this method to extend completion"""
230 pass
231
232 def generate(self, side):
233 """generate bridge
234
235 call generate_core_side or generateFrontendSide if they exists
236 else call generic self._generate method
237 """
238 try:
239 if side == "core":
240 method = self.generate_core_side
241 elif side == "frontend":
242 if not self.FRONTEND_ACTIVATE:
243 print("This constructor only handle core, please use core side")
244 sys.exit(1)
245 method = self.generateFrontendSide
246 except AttributeError:
247 self._generate(side)
248 else:
249 method()
250
251 def _generate(self, side):
252 """generate the backend
253
254 this is a generic method which will use formats found in self.CORE_SIGNAL_FORMAT
255 and self.CORE_METHOD_FORMAT (standard format method will be used)
256 @param side(str): core or frontend
257 """
258 side_vars = []
259 for var in ("FORMATS", "TEMPLATE", "DEST"):
260 attr = "{}_{}".format(side.upper(), var)
261 value = getattr(self, attr)
262 if value is None:
263 raise NotImplementedError
264 side_vars.append(value)
265
266 FORMATS, TEMPLATE, DEST = side_vars
267 del side_vars
268
269 parts = {part.upper(): [] for part in FORMATS}
270 sections = self.bridge_template.sections()
271 sections.sort()
272 for section in sections:
273 function = self.getValues(section)
274 print(("Adding %s %s" % (section, function["type"])))
275 default = self.get_default(section)
276 arg_doc = self.get_arguments_doc(section)
277 async_ = "async" in self.getFlags(section)
278 completion = {
279 "sig_in": function["sig_in"] or "",
280 "sig_out": function["sig_out"] or "",
281 "category": "plugin" if function["category"] == "plugin" else "core",
282 "name": section,
283 # arguments with default values
284 "args": self.get_arguments(
285 function["sig_in"], name=arg_doc, default=default
286 ),
287 "args_no_default": self.get_arguments(function["sig_in"], name=arg_doc),
288 }
289
290 extend_method = getattr(
291 self, "{}_completion_{}".format(side, function["type"])
292 )
293 extend_method(completion, function, default, arg_doc, async_)
294
295 for part, fmt in FORMATS.items():
296 if (part.startswith(function["type"])
297 or part.startswith(f"async_{function['type']}")):
298 parts[part.upper()].append(fmt.format(**completion))
299
300 # at this point, signals_part, methods_part and direct_calls should be filled,
301 # we just have to place them in the right part of the template
302 bridge = []
303 const_override = {
304 env[len(C.ENV_OVERRIDE) :]: v
305 for env, v in os.environ.items()
306 if env.startswith(C.ENV_OVERRIDE)
307 }
308 template_path = self.get_template_path(TEMPLATE)
309 try:
310 with open(template_path) as template:
311 for line in template:
312
313 for part, extend_list in parts.items():
314 if line.startswith("##{}_PART##".format(part)):
315 bridge.extend(extend_list)
316 break
317 else:
318 # the line is not a magic part replacement
319 if line.startswith("const_"):
320 const_name = line[len("const_") : line.find(" = ")].strip()
321 if const_name in const_override:
322 print(("const {} overriden".format(const_name)))
323 bridge.append(
324 "const_{} = {}".format(
325 const_name, const_override[const_name]
326 )
327 )
328 continue
329 bridge.append(line.replace("\n", ""))
330 except IOError:
331 print(("can't open template file [{}]".format(template_path)))
332 sys.exit(1)
333
334 # now we write to final file
335 self.final_write(DEST, bridge)
336
337 def final_write(self, filename, file_buf):
338 """Write the final generated file in [dest dir]/filename
339
340 @param filename: name of the file to generate
341 @param file_buf: list of lines (stings) of the file
342 """
343 if os.path.exists(self.args.dest_dir) and not os.path.isdir(self.args.dest_dir):
344 print(
345 "The destination dir [%s] can't be created: a file with this name already exists !"
346 )
347 sys.exit(1)
348 try:
349 if not os.path.exists(self.args.dest_dir):
350 os.mkdir(self.args.dest_dir)
351 full_path = os.path.join(self.args.dest_dir, filename)
352 if os.path.exists(full_path) and not self.args.force:
353 print((
354 "The destination file [%s] already exists ! Use --force to overwrite it"
355 % full_path
356 ))
357 try:
358 with open(full_path, "w") as dest_file:
359 dest_file.write("\n".join(file_buf))
360 except IOError:
361 print(("Can't open destination file [%s]" % full_path))
362 except OSError:
363 print("It's not possible to generate the file, check your permissions")
364 exit(1)