view sat/bridge/bridge_constructor/base_constructor.py @ 4001:32d714a8ea51

plugin XEP-0045: dot not wait for MAM retrieval to be completed: in `_join_MAM`, `room.fully_joined` is called before retrieving the MAM archive, as the process can be very long, and is not necessary to have the room working (message can be received after being in the room, and added out of order). This avoid blocking the `join` workflow for an extended time. Some renaming and coroutine integrations.
author Goffi <goffi@goffi.org>
date Fri, 10 Mar 2023 17:22:41 +0100
parents c605a0d6506f
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3


# Libervia: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""base constructor class"""

from sat.bridge.bridge_constructor.constants import Const as C
from configparser import NoOptionError
import sys
import os
import os.path
import re
from importlib import import_module


class ParseError(Exception):
    # Used when the signature parsing is going wrong (invalid signature ?)
    pass


class Constructor(object):
    NAME = None  # used in arguments parsing, filename will be used if not set
    # following attribute are used by default generation method
    # they can be set to dict of strings using python formatting syntax
    # dict keys will be used to select part to replace (e.g. "signals" key will
    # replace ##SIGNALS_PART## in template), while the value is the format
    # keys starting with "signal" will be used for signals, while ones starting with
    # "method" will be used for methods
    #  check D-Bus constructor for an example
    CORE_FORMATS = None
    CORE_TEMPLATE = None
    CORE_DEST = None
    FRONTEND_FORMATS = None
    FRONTEND_TEMPLATE = None
    FRONTEND_DEST = None

    # set to False if your bridge needs only core
    FRONTEND_ACTIVATE = True

    def __init__(self, bridge_template, options):
        self.bridge_template = bridge_template
        self.args = options

    @property
    def constructor_dir(self):
        constructor_mod = import_module(self.__module__)
        return os.path.dirname(constructor_mod.__file__)

    def getValues(self, name):
        """Return values of a function in a dict
        @param name: Name of the function to get
        @return: dict, each key has the config value or None if the value is not set"""
        function = {}
        for option in ["type", "category", "sig_in", "sig_out", "doc"]:
            try:
                value = self.bridge_template.get(name, option)
            except NoOptionError:
                value = None
            function[option] = value
        return function

    def getDefault(self, name):
        """Return default values of a function in a dict
        @param name: Name of the function to get
        @return: dict, each key is the integer param number (no key if no default value)"""
        default_dict = {}
        def_re = re.compile(r"param_(\d+)_default")

        for option in self.bridge_template.options(name):
            match = def_re.match(option)
            if match:
                try:
                    idx = int(match.group(1))
                except ValueError:
                    raise ParseError(
                        "Invalid value [%s] for parameter number" % match.group(1)
                    )
                default_dict[idx] = self.bridge_template.get(name, option)

        return default_dict

    def getFlags(self, name):
        """Return list of flags set for this function

        @param name: Name of the function to get
        @return: List of flags (string)
        """
        flags = []
        for option in self.bridge_template.options(name):
            if option in C.DECLARATION_FLAGS:
                flags.append(option)
        return flags

    def getArgumentsDoc(self, name):
        """Return documentation of arguments
        @param name: Name of the function to get
        @return: dict, each key is the integer param number (no key if no argument doc), value is a tuple (name, doc)"""
        doc_dict = {}
        option_re = re.compile(r"doc_param_(\d+)")
        value_re = re.compile(r"^(\w+): (.*)$", re.MULTILINE | re.DOTALL)
        for option in self.bridge_template.options(name):
            if option == "doc_return":
                doc_dict["return"] = self.bridge_template.get(name, option)
                continue
            match = option_re.match(option)
            if match:
                try:
                    idx = int(match.group(1))
                except ValueError:
                    raise ParseError(
                        "Invalid value [%s] for parameter number" % match.group(1)
                    )
                value_match = value_re.match(self.bridge_template.get(name, option))
                if not value_match:
                    raise ParseError("Invalid value for parameter doc [%i]" % idx)
                doc_dict[idx] = (value_match.group(1), value_match.group(2))
        return doc_dict

    def getDoc(self, name):
        """Return documentation of the method
        @param name: Name of the function to get
        @return: string documentation, or None"""
        if self.bridge_template.has_option(name, "doc"):
            return self.bridge_template.get(name, "doc")
        return None

    def argumentsParser(self, signature):
        """Generator which return individual arguments signatures from a global signature"""
        start = 0
        i = 0

        while i < len(signature):
            if signature[i] not in ["b", "y", "n", "i", "x", "q", "u", "t", "d", "s",
                                    "a"]:
                raise ParseError("Unmanaged attribute type [%c]" % signature[i])

            if signature[i] == "a":
                i += 1
                if (
                    signature[i] != "{" and signature[i] != "("
                ):  # FIXME: must manage tuples out of arrays
                    i += 1
                    yield signature[start:i]
                    start = i
                    continue  # we have a simple type for the array
                opening_car = signature[i]
                assert opening_car in ["{", "("]
                closing_car = "}" if opening_car == "{" else ")"
                opening_count = 1
                while True:  # we have a dict or a list of tuples
                    i += 1
                    if i >= len(signature):
                        raise ParseError("missing }")
                    if signature[i] == opening_car:
                        opening_count += 1
                    if signature[i] == closing_car:
                        opening_count -= 1
                        if opening_count == 0:
                            break
            i += 1
            yield signature[start:i]
            start = i

    def getArguments(self, signature, name=None, default=None, unicode_protect=False):
        """Return arguments to user given a signature

        @param signature: signature in the short form (using s,a,i,b etc)
        @param name: dictionary of arguments name like given by getArgumentsDoc
        @param default: dictionary of default values, like given by getDefault
        @param unicode_protect: activate unicode protection on strings (return strings as unicode(str))
        @return (str): arguments that correspond to a signature (e.g.: "sss" return "arg1, arg2, arg3")
        """
        idx = 0
        attr_string = []

        for arg in self.argumentsParser(signature):
            attr_string.append(
                (
                    "str(%(name)s)%(default)s"
                    if (unicode_protect and arg == "s")
                    else "%(name)s%(default)s"
                )
                % {
                    "name": name[idx][0] if (name and idx in name) else "arg_%i" % idx,
                    "default": "=" + default[idx] if (default and idx in default) else "",
                }
            )
            # give arg_1, arg2, etc or name1, name2=default, etc.
            # give unicode(arg_1), unicode(arg_2), etc. if unicode_protect is set and arg is a string
            idx += 1

        return ", ".join(attr_string)

    def getTemplatePath(self, template_file):
        """return template path corresponding to file name

        @param template_file(str): name of template file
        """
        return os.path.join(self.constructor_dir, template_file)

    def core_completion_method(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""
        pass

    def core_completion_signal(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""
        pass

    def frontend_completion_method(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""
        pass

    def frontend_completion_signal(self, completion, function, default, arg_doc, async_):
        """override this method to extend completion"""
        pass

    def generate(self, side):
        """generate bridge

        call generateCoreSide or generateFrontendSide if they exists
        else call generic self._generate method
        """
        try:
            if side == "core":
                method = self.generateCoreSide
            elif side == "frontend":
                if not self.FRONTEND_ACTIVATE:
                    print("This constructor only handle core, please use core side")
                    sys.exit(1)
                method = self.generateFrontendSide
        except AttributeError:
            self._generate(side)
        else:
            method()

    def _generate(self, side):
        """generate the backend

        this is a generic method which will use formats found in self.CORE_SIGNAL_FORMAT
        and self.CORE_METHOD_FORMAT (standard format method will be used)
        @param side(str): core or frontend
        """
        side_vars = []
        for var in ("FORMATS", "TEMPLATE", "DEST"):
            attr = "{}_{}".format(side.upper(), var)
            value = getattr(self, attr)
            if value is None:
                raise NotImplementedError
            side_vars.append(value)

        FORMATS, TEMPLATE, DEST = side_vars
        del side_vars

        parts = {part.upper(): [] for part in FORMATS}
        sections = self.bridge_template.sections()
        sections.sort()
        for section in sections:
            function = self.getValues(section)
            print(("Adding %s %s" % (section, function["type"])))
            default = self.getDefault(section)
            arg_doc = self.getArgumentsDoc(section)
            async_ = "async" in self.getFlags(section)
            completion = {
                "sig_in": function["sig_in"] or "",
                "sig_out": function["sig_out"] or "",
                "category": "plugin" if function["category"] == "plugin" else "core",
                "name": section,
                # arguments with default values
                "args": self.getArguments(
                    function["sig_in"], name=arg_doc, default=default
                ),
                "args_no_default": self.getArguments(function["sig_in"], name=arg_doc),
            }

            extend_method = getattr(
                self, "{}_completion_{}".format(side, function["type"])
            )
            extend_method(completion, function, default, arg_doc, async_)

            for part, fmt in FORMATS.items():
                if (part.startswith(function["type"])
                    or part.startswith(f"async_{function['type']}")):
                    parts[part.upper()].append(fmt.format(**completion))

        # at this point, signals_part, methods_part and direct_calls should be filled,
        # we just have to place them in the right part of the template
        bridge = []
        const_override = {
            env[len(C.ENV_OVERRIDE) :]: v
            for env, v in os.environ.items()
            if env.startswith(C.ENV_OVERRIDE)
        }
        template_path = self.getTemplatePath(TEMPLATE)
        try:
            with open(template_path) as template:
                for line in template:

                    for part, extend_list in parts.items():
                        if line.startswith("##{}_PART##".format(part)):
                            bridge.extend(extend_list)
                            break
                    else:
                        # the line is not a magic part replacement
                        if line.startswith("const_"):
                            const_name = line[len("const_") : line.find(" = ")].strip()
                            if const_name in const_override:
                                print(("const {} overriden".format(const_name)))
                                bridge.append(
                                    "const_{} = {}".format(
                                        const_name, const_override[const_name]
                                    )
                                )
                                continue
                        bridge.append(line.replace("\n", ""))
        except IOError:
            print(("can't open template file [{}]".format(template_path)))
            sys.exit(1)

        # now we write to final file
        self.finalWrite(DEST, bridge)

    def finalWrite(self, filename, file_buf):
        """Write the final generated file in [dest dir]/filename

        @param filename: name of the file to generate
        @param file_buf: list of lines (stings) of the file
        """
        if os.path.exists(self.args.dest_dir) and not os.path.isdir(self.args.dest_dir):
            print(
                "The destination dir [%s] can't be created: a file with this name already exists !"
            )
            sys.exit(1)
        try:
            if not os.path.exists(self.args.dest_dir):
                os.mkdir(self.args.dest_dir)
            full_path = os.path.join(self.args.dest_dir, filename)
            if os.path.exists(full_path) and not self.args.force:
                print((
                    "The destination file [%s] already exists ! Use --force to overwrite it"
                    % full_path
                ))
            try:
                with open(full_path, "w") as dest_file:
                    dest_file.write("\n".join(file_buf))
            except IOError:
                print(("Can't open destination file [%s]" % full_path))
        except OSError:
            print("It's not possible to generate the file, check your permissions")
            exit(1)