view sat/bridge/bridge_constructor/ @ 3244:b10d207f95f9

core (xmpp): properly clean profile data in case of startConnection failure: - profile is now always removed from profiles_connecting - an exception is raised if client already exists only if client is connected. Otherwise, it's deleted (this allow to keep session open to e.g. modify settings in case of failure during connection)
author Goffi <>
date Wed, 01 Apr 2020 16:17:09 +0200
parents 559a625a236b
children be6d91572633
line wrap: on
line source

#!/usr/bin/env python3

# SàT: a XMPP client
# Copyright (C) 2009-2020 Jérôme Poisson (

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <>.

"""base constructor class"""

from sat.bridge.bridge_constructor.constants import Const as C
from configparser import NoOptionError
import sys
import os
import os.path
import re
from importlib import import_module

class ParseError(Exception):
    # Used when the signature parsing is going wrong (invalid signature ?)

class Constructor(object):
    NAME = None  # used in arguments parsing, filename will be used if not set
    # following attribute are used by default generation method
    # they can be set to dict of strings using python formatting syntax
    # dict keys will be used to select part to replace (e.g. "signals" key will
    # replace ##SIGNALS_PART## in template), while the value is the format
    # keys starting with "signal" will be used for signals, while ones starting with
    # "method" will be used for methods
    #  check D-Bus constructor for an example
    CORE_DEST = None

    # set to False if your bridge need only core

    def __init__(self, bridge_template, options):
        self.bridge_template = bridge_template
        self.args = options

    def constructor_dir(self):
        constructor_mod = import_module(self.__module__)
        return os.path.dirname(constructor_mod.__file__)

    def getValues(self, name):
        """Return values of a function in a dict
        @param name: Name of the function to get
        @return: dict, each key has the config value or None if the value is not set"""
        function = {}
        for option in ["type", "category", "sig_in", "sig_out", "doc"]:
                value = self.bridge_template.get(name, option)
            except NoOptionError:
                value = None
            function[option] = value
        return function

    def getDefault(self, name):
        """Return default values of a function in a dict
        @param name: Name of the function to get
        @return: dict, each key is the integer param number (no key if no default value)"""
        default_dict = {}
        def_re = re.compile(r"param_(\d+)_default")

        for option in self.bridge_template.options(name):
            match = def_re.match(option)
            if match:
                    idx = int(
                except ValueError:
                    raise ParseError(
                        "Invalid value [%s] for parameter number" %
                default_dict[idx] = self.bridge_template.get(name, option)

        return default_dict

    def getFlags(self, name):
        """Return list of flags set for this function

        @param name: Name of the function to get
        @return: List of flags (string)
        flags = []
        for option in self.bridge_template.options(name):
            if option in C.DECLARATION_FLAGS:
        return flags

    def getArgumentsDoc(self, name):
        """Return documentation of arguments
        @param name: Name of the function to get
        @return: dict, each key is the integer param number (no key if no argument doc), value is a tuple (name, doc)"""
        doc_dict = {}
        option_re = re.compile(r"doc_param_(\d+)")
        value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL)
        for option in self.bridge_template.options(name):
            if option == "doc_return":
                doc_dict["return"] = self.bridge_template.get(name, option)
            match = option_re.match(option)
            if match:
                    idx = int(
                except ValueError:
                    raise ParseError(
                        "Invalid value [%s] for parameter number" %
                value_match = value_re.match(self.bridge_template.get(name, option))
                if not value_match:
                    raise ParseError("Invalid value for parameter doc [%i]" % idx)
                doc_dict[idx] = (,
        return doc_dict

    def getDoc(self, name):
        """Return documentation of the method
        @param name: Name of the function to get
        @return: string documentation, or None"""
        if self.bridge_template.has_option(name, "doc"):
            return self.bridge_template.get(name, "doc")
        return None

    def argumentsParser(self, signature):
        """Generator which return individual arguments signatures from a global signature"""
        start = 0
        i = 0

        while i < len(signature):
            if signature[i] not in ["b", "y", "n", "i", "x", "q", "u", "t", "d", "s",
                raise ParseError("Unmanaged attribute type [%c]" % signature[i])

            if signature[i] == "a":
                i += 1
                if (
                    signature[i] != "{" and signature[i] != "("
                ):  # FIXME: must manage tuples out of arrays
                    i += 1
                    yield signature[start:i]
                    start = i
                    continue  # we have a simple type for the array
                opening_car = signature[i]
                assert opening_car in ["{", "("]
                closing_car = "}" if opening_car == "{" else ")"
                opening_count = 1
                while True:  # we have a dict or a list of tuples
                    i += 1
                    if i >= len(signature):
                        raise ParseError("missing }")
                    if signature[i] == opening_car:
                        opening_count += 1
                    if signature[i] == closing_car:
                        opening_count -= 1
                        if opening_count == 0:
            i += 1
            yield signature[start:i]
            start = i

    def getArguments(self, signature, name=None, default=None, unicode_protect=False):
        """Return arguments to user given a signature

        @param signature: signature in the short form (using s,a,i,b etc)
        @param name: dictionary of arguments name like given by getArgumentsDoc
        @param default: dictionary of default values, like given by getDefault
        @param unicode_protect: activate unicode protection on strings (return strings as unicode(str))
        @return (str): arguments that correspond to a signature (e.g.: "sss" return "arg1, arg2, arg3")
        idx = 0
        attr_string = []

        for arg in self.argumentsParser(signature):
                    if (unicode_protect and arg == "s")
                    else "%(name)s%(default)s"
                % {
                    "name": name[idx][0] if (name and idx in name) else "arg_%i" % idx,
                    "default": "=" + default[idx] if (default and idx in default) else "",
            # give arg_1, arg2, etc or name1, name2=default, etc.
            # give unicode(arg_1), unicode(arg_2), etc. if unicode_protect is set and arg is a string
            idx += 1

        return ", ".join(attr_string)

    def getTemplatePath(self, template_file):
        """return template path corresponding to file name

        @param template_file(str): name of template file
        return os.path.join(self.constructor_dir, template_file)

    def core_completion_method(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""

    def core_completion_signal(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""

    def frontend_completion_method(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""

    def frontend_completion_signal(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""

    def generate(self, side):
        """generate bridge

        call generateCoreSide or generateFrontendSide if they exists
        else call generic self._generate method
            if side == "core":
                method = self.generateCoreSide
            elif side == "frontend":
                if not self.FRONTEND_ACTIVATE:
                    print("This constructor only handle core, please use core side")
                method = self.generateFrontendSide
        except AttributeError:

    def _generate(self, side):
        """generate the backend

        this is a generic method which will use formats found in self.CORE_SIGNAL_FORMAT
        and self.CORE_METHOD_FORMAT (standard format method will be used)
        @param side(str): core or frontend
        side_vars = []
        for var in ("FORMATS", "TEMPLATE", "DEST"):
            attr = "{}_{}".format(side.upper(), var)
            value = getattr(self, attr)
            if value is None:
                raise NotImplementedError

        FORMATS, TEMPLATE, DEST = side_vars
        del side_vars

        parts = {part.upper(): [] for part in FORMATS}
        sections = self.bridge_template.sections()
        for section in sections:
            function = self.getValues(section)
            print(("Adding %s %s" % (section, function["type"])))
            default = self.getDefault(section)
            arg_doc = self.getArgumentsDoc(section)
            async_ = "async" in self.getFlags(section)
            completion = {
                "sig_in": function["sig_in"] or "",
                "sig_out": function["sig_out"] or "",
                "category": "plugin" if function["category"] == "plugin" else "core",
                "name": section,
                # arguments with default values
                "args": self.getArguments(
                    function["sig_in"], name=arg_doc, default=default

            extend_method = getattr(
                self, "{}_completion_{}".format(side, function["type"])
            extend_method(completion, function, default, arg_doc, async_)

            for part, fmt in FORMATS.items():
                if (part.startswith(function["type"])
                    or part.startswith(f"async_{function['type']}")):

        # at this point, signals_part, methods_part and direct_calls should be filled,
        # we just have to place them in the right part of the template
        bridge = []
        const_override = {
            env[len(C.ENV_OVERRIDE) :]: v
            for env, v in os.environ.items()
            if env.startswith(C.ENV_OVERRIDE)
        template_path = self.getTemplatePath(TEMPLATE)
            with open(template_path) as template:
                for line in template:

                    for part, extend_list in parts.items():
                        if line.startswith("##{}_PART##".format(part)):
                        # the line is not a magic part replacement
                        if line.startswith("const_"):
                            const_name = line[len("const_") : line.find(" = ")].strip()
                            if const_name in const_override:
                                print(("const {} overriden".format(const_name)))
                                    "const_{} = {}".format(
                                        const_name, const_override[const_name]
                        bridge.append(line.replace("\n", ""))
        except IOError:
            print(("can't open template file [{}]".format(template_path)))

        # now we write to final file
        self.finalWrite(DEST, bridge)

    def finalWrite(self, filename, file_buf):
        """Write the final generated file in [dest dir]/filename

        @param filename: name of the file to generate
        @param file_buf: list of lines (stings) of the file
        if os.path.exists(self.args.dest_dir) and not os.path.isdir(self.args.dest_dir):
                "The destination dir [%s] can't be created: a file with this name already exists !"
            if not os.path.exists(self.args.dest_dir):
            full_path = os.path.join(self.args.dest_dir, filename)
            if os.path.exists(full_path) and not self.args.force:
                    "The destination file [%s] already exists ! Use --force to overwrite it"
                    % full_path
                with open(full_path, "w") as dest_file:
            except IOError:
                print(("Can't open destination file [%s]" % full_path))
        except OSError:
            print("It's not possible to generate the file, check your permissions")