view sat/tools/utils.py @ 3725:9b45f0f168cf

tools (common): new `async_utils` module with an async version of `lru_cache`
author Goffi <goffi@goffi.org>
date Tue, 25 Jan 2022 17:25:10 +0100
parents aa58451b77ba
children 0a87cae85746
line wrap: on
line source

#!/usr/bin/env python3

# SaT: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

""" various useful methods """

import unicodedata
import os.path
import datetime
import subprocess
import time
import sys
import random
import inspect
import textwrap
import functools
import asyncio
from twisted.python import procutils, failure
from twisted.internet import defer
from sat.core.constants import Const as C
from sat.core.log import getLogger

log = getLogger(__name__)


NO_REPOS_DATA = "repository data unknown"
repos_cache_dict = None
repos_cache = None


def clean_ustr(ustr):
    """Clean unicode string

    remove special characters from unicode string
    """

    def valid_chars(unicode_source):
        for char in unicode_source:
            if unicodedata.category(char) == "Cc" and char != "\n":
                continue
            yield char

    return "".join(valid_chars(ustr))


def logError(failure_):
    """Genertic errback which log the error as a warning, and re-raise it"""
    log.warning(failure_.value)
    raise failure_


def partial(func, *fixed_args, **fixed_kwargs):
    # FIXME: temporary hack to workaround the fact that inspect.getargspec is not working with functools.partial
    #        making partial unusable with current D-bus module (in addMethod).
    #        Should not be needed anywore once moved to Python 3

    ori_args = inspect.getargspec(func).args
    func = functools.partial(func, *fixed_args, **fixed_kwargs)
    if ori_args[0] == "self":
        del ori_args[0]
    ori_args = ori_args[len(fixed_args) :]
    for kw in fixed_kwargs:
        ori_args.remove(kw)

    exec(
        textwrap.dedent(
            """\
    def method({args}):
        return func({kw_args})
    """
        ).format(
            args=", ".join(ori_args), kw_args=", ".join([a + "=" + a for a in ori_args])
        ),
        locals(),
    )

    return method


def asDeferred(func, *args, **kwargs):
    """Call a method and return a Deferred

    the method can be a simple callable, a Deferred or a coroutine.
    It is similar to defer.maybeDeferred, but also handles coroutines
    """
    try:
        ret = func(*args, **kwargs)
    except Exception as e:
        return defer.fail(failure.Failure(e))
    else:
        if asyncio.iscoroutine(ret):
            return defer.ensureDeferred(ret)
        elif isinstance(ret, defer.Deferred):
            return ret
        elif isinstance(ret, failure.Failure):
            return defer.fail(ret)
        else:
            return defer.succeed(ret)


def aio(func):
    """Decorator to return a Deferred from asyncio coroutine

    Functions with this decorator are run in asyncio context
    """
    def wrapper(*args, **kwargs):
        return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs)))
    return wrapper


def as_future(d):
    return d.asFuture(asyncio.get_event_loop())


def ensure_deferred(func):
    """Decorator to apply ensureDeferred to a function

    to be used when the function is called by third party library (e.g. wokkel)
    Otherwise, it's better to use ensureDeferred as early as possible.
    """
    def wrapper(*args, **kwargs):
        return defer.ensureDeferred(func(*args, **kwargs))
    return wrapper


def xmpp_date(timestamp=None, with_time=True):
    """Return date according to XEP-0082 specification

    to avoid reveling the timezone, we always return UTC dates
    the string returned by this method is valid with RFC 3339
    @param timestamp(None, float): posix timestamp. If None current time will be used
    @param with_time(bool): if True include the time
    @return(unicode): XEP-0082 formatted date and time
    """
    template_date = "%Y-%m-%d"
    template_time = "%H:%M:%SZ"
    template = (
        "{}T{}".format(template_date, template_time) if with_time else template_date
    )
    return datetime.datetime.utcfromtimestamp(
        time.time() if timestamp is None else timestamp
    ).strftime(template)


def generatePassword(vocabulary=None, size=20):
    """Generate a password with random characters.

    @param vocabulary(iterable): characters to use to create password
    @param size(int): number of characters in the password to generate
    @return (unicode): generated password
    """
    random.seed()
    if vocabulary is None:
        vocabulary = [
            chr(i) for i in list(range(0x30, 0x3A)) + list(range(0x41, 0x5B)) + list(range(0x61, 0x7B))
        ]
    return "".join([random.choice(vocabulary) for i in range(15)])


def getRepositoryData(module, as_string=True, is_path=False):
    """Retrieve info on current mecurial repository

    Data is gotten by using the following methods, in order:
        - using "hg" executable
        - looking for a .hg/dirstate in parent directory of module (or in module/.hg if
            is_path is True), and parse dirstate file to get revision
        - checking package version, which should have repository data when we are on a dev version
    @param module(unicode): module to look for (e.g. sat, libervia)
        module can be a path if is_path is True (see below)
    @param as_string(bool): if True return a string, else return a dictionary
    @param is_path(bool): if True "module" is not handled as a module name, but as an
        absolute path to the parent of a ".hg" directory
    @return (unicode, dictionary): retrieved info in a nice string,
        or a dictionary with retrieved data (key is not present if data is not found),
        key can be:
            - node: full revision number (40 bits)
            - branch: branch name
            - date: ISO 8601 format date
            - tag: latest tag used in hierarchie
            - distance: number of commits since the last tag
    """
    global repos_cache_dict
    if as_string:
        global repos_cache
        if repos_cache is not None:
            return repos_cache
    else:
        if repos_cache_dict is not None:
            return repos_cache_dict

    if sys.platform == "android":
        #  FIXME: workaround to avoid trouble on android, need to be fixed properly
        repos_cache = "Cagou android build"
        return repos_cache

    KEYS = ("node", "node_short", "branch", "date", "tag", "distance")
    ori_cwd = os.getcwd()

    if is_path:
        repos_root = os.path.abspath(module)
    else:
        repos_root = os.path.abspath(os.path.dirname(module.__file__))

    try:
        hg_path = procutils.which("hg")[0]
    except IndexError:
        log.warning("Can't find hg executable")
        hg_path = None
        hg_data = {}

    if hg_path is not None:
        os.chdir(repos_root)
        try:
            hg_data_raw = subprocess.check_output(
                [
                    "hg",
                    "log",
                    "-r",
                    "-1",
                    "--template",
                    "{node}\n"
                    "{node|short}\n"
                    "{branch}\n"
                    "{date|isodate}\n"
                    "{latesttag}\n"
                    "{latesttagdistance}",
                ],
                text=True
            )
        except subprocess.CalledProcessError:
            hg_data = {}
        else:
            hg_data = dict(list(zip(KEYS, hg_data_raw.split("\n"))))
            try:
                hg_data["modified"] = "+" in subprocess.check_output(["hg", "id", "-i"], text=True)
            except subprocess.CalledProcessError:
                pass
    else:
        hg_data = {}

    if not hg_data:
        # .hg/dirstate method
        log.debug("trying dirstate method")
        if is_path:
            os.chdir(repos_root)
        else:
            os.chdir(os.path.abspath(os.path.dirname(repos_root)))
        try:
            with open(".hg/dirstate", 'rb') as hg_dirstate:
                hg_data["node"] = hg_dirstate.read(20).hex()
                hg_data["node_short"] = hg_data["node"][:12]
        except IOError:
            log.debug("Can't access repository data")

    # we restore original working dir
    os.chdir(ori_cwd)

    if not hg_data:
        log.debug("Mercurial not available or working, trying package version")
        try:
            import pkg_resources
        except ImportError:
            log.warning("pkg_resources not available, can't get package data")
        else:
            try:
                pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version
                version, local_id = pkg_version.split("+", 1)
            except pkg_resources.DistributionNotFound:
                log.warning("can't retrieve package data")
            except ValueError:
                log.info(
                    "no local version id in package: {pkg_version}".format(
                        pkg_version=pkg_version
                    )
                )
            else:
                version = version.replace(".dev0", "D")
                if version != C.APP_VERSION:
                    log.warning(
                        "Incompatible version ({version}) and pkg_version ({pkg_version})"
                        .format(
                            version=C.APP_VERSION, pkg_version=pkg_version
                        )
                    )
                else:
                    try:
                        hg_node, hg_distance = local_id.split(".")
                    except ValueError:
                        log.warning("Version doesn't specify repository data")
                    hg_data = {"node_short": hg_node, "distance": hg_distance}

    repos_cache_dict = hg_data

    if as_string:
        if not hg_data:
            repos_cache = NO_REPOS_DATA
        else:
            strings = ["rev", hg_data["node_short"]]
            try:
                if hg_data["modified"]:
                    strings.append("[M]")
            except KeyError:
                pass
            try:
                strings.extend(["({branch} {date})".format(**hg_data)])
            except KeyError:
                pass
            try:
                strings.extend(["+{distance}".format(**hg_data)])
            except KeyError:
                pass
            repos_cache = " ".join(strings)
        return repos_cache
    else:
        return hg_data