Mercurial > libervia-backend
view libervia/backend/bridge/bridge_constructor/base_constructor.py @ 4306:94e0968987cd
plugin XEP-0033: code modernisation, improve delivery, data validation:
- Code has been rewritten using Pydantic models and `async` coroutines for data validation
and cleaner element parsing/generation.
- Delivery has been completely rewritten. It now works even if the server doesn't support
multicast, and sends to the local multicast service first. Delivering to local multicast
service first is due to bad support of XEP-0033 in server (notably Prosody which has an
incomplete implementation), and the current impossibility to detect if a sub-domain
service handles fully multicast or only for local domains. This is a workaround to have
a good balance between backward compatibility and use of bandwidth, and to make it work
with the incoming email gateway implementation (the gateway will only deliver to
entities of its own domain).
- disco feature checking now uses `async` coroutines. `host` implementation still uses
Deferred return values for compatibility with legacy code.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 4cf98f506269 |
children |
line wrap: on
line source
#!/usr/bin/env python3


# Libervia: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""base constructor class"""

from configparser import NoOptionError
from importlib import import_module
import os
import os.path
import re
import sys

from libervia.backend.bridge.bridge_constructor.constants import Const as C


class ParseError(Exception):
    """Raised when the signature parsing is going wrong (invalid signature?)"""


class Constructor(object):
    """Base class of bridge constructors

    A constructor reads the bridge template (a ``configparser``-like object with
    one section per bridge method/signal) and generates the bridge source files.
    """

    NAME = None  # used in arguments parsing, filename will be used if not set
    # following attributes are used by default generation method
    # they can be set to dict of strings using python formatting syntax
    # dict keys will be used to select part to replace (e.g. "signals" key will
    # replace ##SIGNALS_PART## in template), while the value is the format
    # keys starting with "signal" will be used for signals, while ones starting with
    # "method" will be used for methods
    # check D-Bus constructor for an example
    CORE_FORMATS = None
    CORE_TEMPLATE = None
    CORE_DEST = None
    FRONTEND_FORMATS = None
    FRONTEND_TEMPLATE = None
    FRONTEND_DEST = None

    # set to False if your bridge needs only core
    FRONTEND_ACTIVATE = True

    # types accepted as first character of an argument signature
    _SIGNATURE_TYPES = frozenset("bynixqutdsa")

    def __init__(self, bridge_template, options):
        """
        @param bridge_template: parsed bridge template (must offer ``get``,
            ``options``, ``has_option`` and ``sections``)
        @param options: parsed command line arguments (``dest_dir`` and ``force``
            attributes are read by final_write)
        """
        self.bridge_template = bridge_template
        self.args = options

    @property
    def constructor_dir(self):
        """Directory of the module where the concrete constructor is defined"""
        constructor_mod = import_module(self.__module__)
        return os.path.dirname(constructor_mod.__file__)

    def getValues(self, name):
        """Return values of a function in a dict

        @param name: Name of the function to get
        @return: dict, each key has the config value or None if the value is not set
        """
        function = {}
        for option in ("type", "category", "sig_in", "sig_out", "doc"):
            try:
                value = self.bridge_template.get(name, option)
            except NoOptionError:
                value = None
            function[option] = value
        return function

    def get_default(self, name):
        """Return default values of a function in a dict

        @param name: Name of the function to get
        @return: dict, each key is the integer param number (no key if no default
            value)
        @raise ParseError: the parameter number can't be converted to an integer
        """
        default_dict = {}
        def_re = re.compile(r"param_(\d+)_default")

        for option in self.bridge_template.options(name):
            match = def_re.match(option)
            if match:
                try:
                    idx = int(match.group(1))
                except ValueError:
                    raise ParseError(
                        "Invalid value [%s] for parameter number" % match.group(1)
                    )
                default_dict[idx] = self.bridge_template.get(name, option)

        return default_dict

    def getFlags(self, name):
        """Return list of flags set for this function

        @param name: Name of the function to get
        @return: List of flags (string)
        """
        return [
            option
            for option in self.bridge_template.options(name)
            if option in C.DECLARATION_FLAGS
        ]

    def get_arguments_doc(self, name):
        """Return documentation of arguments

        @param name: Name of the function to get
        @return: dict, each key is the integer param number (no key if no argument
            doc), value is a tuple (name, doc).
            The "return" key is set to the raw "doc_return" value when this option
            exists.
        @raise ParseError: a parameter doc doesn't follow the "name: doc" format
        """
        doc_dict = {}
        option_re = re.compile(r"doc_param_(\d+)")
        value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL)
        for option in self.bridge_template.options(name):
            if option == "doc_return":
                doc_dict["return"] = self.bridge_template.get(name, option)
                continue
            match = option_re.match(option)
            if match:
                try:
                    idx = int(match.group(1))
                except ValueError:
                    raise ParseError(
                        "Invalid value [%s] for parameter number" % match.group(1)
                    )
                value_match = value_re.match(self.bridge_template.get(name, option))
                if not value_match:
                    raise ParseError("Invalid value for parameter doc [%i]" % idx)
                doc_dict[idx] = (value_match.group(1), value_match.group(2))
        return doc_dict

    def get_doc(self, name):
        """Return documentation of the method

        @param name: Name of the function to get
        @return: string documentation, or None
        """
        if self.bridge_template.has_option(name, "doc"):
            return self.bridge_template.get(name, "doc")
        return None

    def arguments_parser(self, signature):
        """Generator which return individual arguments signatures from a global signature

        @param signature: signature in the short form (using s,a,i,b etc)
        @return: generator of str, one item per argument (e.g. "sa{ss}i" yields "s",
            "a{ss}" then "i")
        @raise ParseError: unknown type, array without element type, or unbalanced
            "{"/"(" in the signature
        """
        start = 0
        i = 0
        sig_len = len(signature)

        while i < sig_len:
            if signature[i] not in self._SIGNATURE_TYPES:
                raise ParseError("Unmanaged attribute type [%c]" % signature[i])

            if signature[i] == "a":
                i += 1
                if i >= sig_len:
                    # a trailing "a" would otherwise end in an IndexError
                    raise ParseError("missing array element type")
                opening_car = signature[i]
                if opening_car not in ("{", "("):
                    # we have a simple type for the array
                    # FIXME: must manage tuples out of arrays
                    i += 1
                    yield signature[start:i]
                    start = i
                    continue
                # we have a dict or a list of tuples: we look for the closing
                # character matching the opening one, nested containers included
                closing_car = "}" if opening_car == "{" else ")"
                opening_count = 1
                while True:
                    i += 1
                    if i >= sig_len:
                        raise ParseError("missing %s" % closing_car)
                    if signature[i] == opening_car:
                        opening_count += 1
                    if signature[i] == closing_car:
                        opening_count -= 1
                        if opening_count == 0:
                            break
            i += 1
            yield signature[start:i]
            start = i

    def get_arguments(self, signature, name=None, default=None, unicode_protect=False):
        """Return arguments to user given a signature

        @param signature: signature in the short form (using s,a,i,b etc)
            None is handled as an empty signature
        @param name: dictionary of arguments name like given by get_arguments_doc
        @param default: dictionary of default values, like given by get_default
        @param unicode_protect: activate protection on strings (string arguments
            are returned as "str(arg)")
        @return (str): arguments that correspond to a signature (e.g.: "sss" return
            "arg_0, arg_1, arg_2")
        """
        attr_string = []

        # getValues returns None when the signature is not set in the template
        for idx, arg in enumerate(self.arguments_parser(signature or "")):
            if name and idx in name:
                arg_name = name[idx][0]
            else:
                arg_name = "arg_%i" % idx
            if default and idx in default:
                arg_default = "=" + default[idx]
            else:
                arg_default = ""
            # give arg_0, arg_1, etc or name1, name2=default, etc.
            # give str(arg_0), str(arg_1), etc. if unicode_protect is set and arg is
            # a string
            if unicode_protect and arg == "s":
                fmt = "str(%(name)s)%(default)s"
            else:
                fmt = "%(name)s%(default)s"
            attr_string.append(fmt % {"name": arg_name, "default": arg_default})

        return ", ".join(attr_string)

    def get_template_path(self, template_file):
        """return template path corresponding to file name

        @param template_file(str): name of template file
        """
        return os.path.join(self.constructor_dir, template_file)

    def core_completion_method(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""
        pass

    def core_completion_signal(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""
        pass

    def frontend_completion_method(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""
        pass

    def frontend_completion_signal(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""
        pass

    def generate(self, side):
        """generate bridge

        call generate_core_side or generateFrontendSide if they exists
        else call generic self._generate method
        @param side(str): "core" or "frontend"
        @raise ValueError: side is neither "core" nor "frontend"
        """
        if side not in ("core", "frontend"):
            raise ValueError("Invalid side [%s], must be core or frontend" % side)
        try:
            if side == "core":
                method = self.generate_core_side
            else:
                if not self.FRONTEND_ACTIVATE:
                    print("This constructor only handle core, please use core side")
                    sys.exit(1)
                # NOTE(review): this hook keeps its historical camelCase name,
                #   confirm which name the concrete constructors implement
                method = self.generateFrontendSide
        except AttributeError:
            # no specialised method, we use the generic one
            self._generate(side)
        else:
            method()

    def _generate(self, side):
        """generate the backend

        this is a generic method which will use formats found in
        self.[SIDE]_FORMATS, the template named in self.[SIDE]_TEMPLATE and will
        write the result in self.[SIDE]_DEST (standard format method will be used)
        @param side(str): core or frontend
        @raise NotImplementedError: one of the attributes above is not set
        """
        side_vars = []
        for var in ("FORMATS", "TEMPLATE", "DEST"):
            attr = "{}_{}".format(side.upper(), var)
            value = getattr(self, attr)
            if value is None:
                raise NotImplementedError
            side_vars.append(value)

        FORMATS, TEMPLATE, DEST = side_vars
        del side_vars

        parts = {part.upper(): [] for part in FORMATS}
        for section in sorted(self.bridge_template.sections()):
            function = self.getValues(section)
            print("Adding %s %s" % (section, function["type"]))
            default = self.get_default(section)
            arg_doc = self.get_arguments_doc(section)
            async_ = "async" in self.getFlags(section)
            # the signature may be missing in the template
            sig_in = function["sig_in"] or ""
            completion = {
                "sig_in": sig_in,
                "sig_out": function["sig_out"] or "",
                "category": "plugin" if function["category"] == "plugin" else "core",
                "name": section,
                # arguments with default values
                "args": self.get_arguments(sig_in, name=arg_doc, default=default),
                "args_no_default": self.get_arguments(sig_in, name=arg_doc),
            }

            # hook used by constructors to add their own completion keys
            extend_method = getattr(
                self, "{}_completion_{}".format(side, function["type"])
            )
            extend_method(completion, function, default, arg_doc, async_)

            for part, fmt in FORMATS.items():
                if part.startswith(function["type"]) or part.startswith(
                    f"async_{function['type']}"
                ):
                    parts[part.upper()].append(fmt.format(**completion))

        # at this point, signals_part, methods_part and direct_calls should be filled,
        # we just have to place them in the right part of the template
        bridge = []
        # constants can be overridden with environment variables using
        # C.ENV_OVERRIDE as prefix
        const_override = {
            env[len(C.ENV_OVERRIDE) :]: v
            for env, v in os.environ.items()
            if env.startswith(C.ENV_OVERRIDE)
        }
        template_path = self.get_template_path(TEMPLATE)
        try:
            with open(template_path) as template:
                for line in template:
                    for part, extend_list in parts.items():
                        if line.lstrip().startswith("##{}_PART##".format(part)):
                            bridge.extend(extend_list)
                            break
                    else:
                        # the line is not a magic part replacement
                        if line.lstrip().startswith("const_"):
                            const_name = line[len("const_") : line.find(" = ")].strip()
                            if const_name in const_override:
                                print("const {} overriden".format(const_name))
                                bridge.append(
                                    "const_{} = {}".format(
                                        const_name, const_override[const_name]
                                    )
                                )
                                continue
                        bridge.append(line.replace("\n", ""))
        except IOError:
            print("can't open template file [{}]".format(template_path))
            sys.exit(1)

        # now we write to final file
        self.final_write(DEST, bridge)

    def final_write(self, filename, file_buf):
        """Write the final generated file in [dest dir]/filename

        The process is exited with status 1 if the destination can't be used, or if
        the destination file already exists while the "force" option is not set.
        @param filename: name of the file to generate
        @param file_buf: list of lines (strings) of the file
        """
        dest_dir = self.args.dest_dir
        if os.path.exists(dest_dir) and not os.path.isdir(dest_dir):
            print(
                "The destination dir [%s] can't be created: a file with this name "
                "already exists !" % dest_dir
            )
            sys.exit(1)
        try:
            if not os.path.exists(dest_dir):
                os.mkdir(dest_dir)
            full_path = os.path.join(dest_dir, filename)
            if os.path.exists(full_path) and not self.args.force:
                print(
                    "The destination file [%s] already exists ! Use --force to "
                    "overwrite it" % full_path
                )
                # we must not overwrite an existing file without --force
                sys.exit(1)
            try:
                with open(full_path, "w") as dest_file:
                    dest_file.write("\n".join(file_buf))
            except IOError:
                print("Can't open destination file [%s]" % full_path)
        except OSError:
            print("It's not possible to generate the file, check your permissions")
            sys.exit(1)