view sat/tools/utils.py @ 3983:31c3d6652115

component AP gateway: ignore actor deletion notifications: When a `Delete` activity was received and the object was the emitting actor itself, the signature checking was failing if the actor was unknown (due to the impossibility of retrieving the actor's public key, as it is no longer accessible). To avoid that, those notifications are ignored for now. In the future they should clean the cache linked to this actor.
author Goffi <goffi@goffi.org>
date Tue, 15 Nov 2022 18:15:16 +0100
parents 8179cff7ef5c
children 5a42c7842556
line wrap: on
line source

#!/usr/bin/env python3

# SaT: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

""" various useful methods """

from typing import Optional, Union
import unicodedata
import os.path
import datetime
import subprocess
import time
import sys
import random
import inspect
import textwrap
import functools
import asyncio
from twisted.python import procutils, failure
from twisted.internet import defer
from sat.core.constants import Const as C
from sat.core.log import getLogger
from sat.tools import xmpp_datetime

# module level logger
log = getLogger(__name__)


# string returned by getRepositoryData (with as_string=True) when no repository data
# could be retrieved
NO_REPOS_DATA = "repository data unknown"
# module level caches filled by getRepositoryData, both are None until computed:
# - repository data as a dictionary
repos_cache_dict = None
# - repository data as a human readable string
repos_cache = None


def clean_ustr(ustr):
    """Strip control characters from a unicode string

    Every character whose Unicode category is "Cc" (control character) is dropped,
    except the newline character which is always kept.

    @param ustr(unicode): string to clean
    @return (unicode): cleaned string
    """
    return "".join(
        char
        for char in ustr
        if char == "\n" or unicodedata.category(char) != "Cc"
    )


def logError(failure_):
    """Generic errback which logs the error as a warning, then re-raises it

    @param failure_: failure received by the errback, its ``value`` attribute is what
        gets logged
    @raise: the received ``failure_`` itself, once it has been logged
    """
    error = failure_.value
    log.warning(error)
    raise failure_


def partial(func, *fixed_args, **fixed_kwargs):
    """Build a real function equivalent to ``functools.partial(func, ...)``

    Unlike a ``functools.partial`` object, the returned callable is a plain function
    whose signature explicitly names every remaining argument, so it can be
    introspected.

    FIXME: temporary hack to work around the fact that argument introspection was not
        working with functools.partial, making partial unusable with current D-Bus
        module (in addMethod). Should not be needed anymore with Python 3.

    @param func(callable): callable to wrap
        if its first named argument is "self", it is ignored
    @param fixed_args: positional arguments to freeze
    @param fixed_kwargs: keyword arguments to freeze
    @return (function): function accepting the remaining named arguments of ``func``
        (default values of these arguments are not carried over)
    """
    # getargspec has been removed in Python 3.11, getfullargspec is its replacement
    remaining = list(inspect.getfullargspec(func).args)
    bound = functools.partial(func, *fixed_args, **fixed_kwargs)
    # "remaining" may be empty if func takes no named argument
    if remaining and remaining[0] == "self":
        del remaining[0]
    remaining = remaining[len(fixed_args):]
    for kw in fixed_kwargs:
        # a keyword may be absent from named arguments (e.g. func accepting **kwargs)
        if kw in remaining:
            remaining.remove(kw)

    source = "def method({args}):\n    return func({kw_args})\n".format(
        args=", ".join(remaining),
        kw_args=", ".join(f"{name}={name}" for name in remaining),
    )
    # with Python 3, exec can't create a local variable in the calling function, so the
    # generated function must be retrieved from an explicit namespace
    namespace = {"func": bound}
    exec(source, namespace)
    return namespace["method"]


def asDeferred(func, *args, **kwargs):
    """Call a method and always get a Deferred back

    It is similar to defer.maybeDeferred, but coroutines are handled too.

    @param func(callable): method to call, it may return a simple value, a Deferred, a
        Failure or a coroutine
    @param args: positional arguments used to call ``func``
    @param kwargs: keyword arguments used to call ``func``
    @return (Deferred): Deferred built from the result of ``func``
        if ``func`` raises an exception, a failed Deferred is returned
    """
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        return defer.fail(failure.Failure(e))

    if asyncio.iscoroutine(result):
        return defer.ensureDeferred(result)
    if isinstance(result, defer.Deferred):
        # already a Deferred, nothing to convert
        return result
    if isinstance(result, failure.Failure):
        return defer.fail(result)
    return defer.succeed(result)


def aio(func):
    """Decorator to return a Deferred from asyncio coroutine

    Functions with this decorator are run in asyncio context

    @param func(callable): function returning an asyncio coroutine
    @return (function): wrapper scheduling the coroutine with asyncio.ensure_future and
        returning a Deferred built from the resulting future
    """
    # functools.wraps keeps name, docstring and other metadata of the decorated function
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs)))
    return wrapper


def as_future(d):
    """Get a future from ``d``, using the current asyncio event loop

    @param d: object with an ``asFuture`` method (presumably a twisted Deferred)
    @return: result of ``d.asFuture`` called with the current asyncio event loop
    """
    to_future = d.asFuture
    return to_future(asyncio.get_event_loop())


def ensure_deferred(func):
    """Decorator to apply ensureDeferred to a function

    to be used when the function is called by third party library (e.g. wokkel)
    Otherwise, it's better to use ensureDeferred as early as possible.

    @param func(callable): function whose result is given to defer.ensureDeferred
    @return (function): wrapper returning the result of defer.ensureDeferred
    """
    # functools.wraps keeps name, docstring and other metadata of the decorated function
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return defer.ensureDeferred(func(*args, **kwargs))
    return wrapper


def xmpp_date(
    timestamp: Optional[Union[float, int]] = None,
    with_time: bool = True
) -> str:
    """Return date according to XEP-0082 specification

    To avoid revealing the timezone, the date is always computed in UTC.
    The formatting itself is delegated to the :mod:`sat.tools.xmpp_datetime` module.

    @param timestamp: posix timestamp. If None current time will be used
    @param with_time: if True include the time, otherwise only the date is formatted
    @return: XEP-0082 formatted date (and time if ``with_time`` is set)
    """
    if timestamp is None:
        timestamp = time.time()
    utc_dt = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)

    if with_time:
        return xmpp_datetime.format_datetime(utc_dt)
    return xmpp_datetime.format_date(utc_dt.date())


def parse_xmpp_date(
    xmpp_date_str: str,
    with_time: bool = True
) -> float:
    """Get timestamp from XEP-0082 datetime or date

    @param xmpp_date_str: XEP-0082 formatted datetime or date
    @param with_time: if True, ``xmpp_date_str`` must be a datetime, otherwise it must
        be a date profile.
    @return: parsed value converted to unix time
    """
    if not with_time:
        day = xmpp_datetime.parse_date(xmpp_date_str)
        # a date has no time part, midnight is used
        # NOTE(review): no tzinfo is attached here, so timestamp() presumably handles
        #   the value as local time while xmpp_date works with UTC — to be confirmed
        return datetime.datetime.combine(day, datetime.time.min).timestamp()

    return xmpp_datetime.parse_datetime(xmpp_date_str).timestamp()


def generatePassword(vocabulary=None, size=20):
    """Generate a password with random characters.

    @param vocabulary(iterable): characters to use to create password
        if None, ASCII digits and letters (0-9, A-Z, a-z) are used
    @param size(int): number of characters in the password to generate
    @return (unicode): generated password
    """
    if vocabulary is None:
        vocabulary = [
            chr(code)
            for codes in (range(0x30, 0x3A), range(0x41, 0x5B), range(0x61, 0x7B))
            for code in codes
        ]
    else:
        # any iterable is accepted, but choice() needs a sequence
        vocabulary = list(vocabulary)
    # a password is a secret: characters are picked using the entropy source of the
    # operating system instead of the default pseudo-random generator (this also avoids
    # reseeding the global generator as a side effect)
    rng = random.SystemRandom()
    return "".join(rng.choice(vocabulary) for __ in range(size))


def getRepositoryData(module, as_string=True, is_path=False):
    """Retrieve info on current mercurial repository

    Data is gotten by using the following methods, in order:
        - using "hg" executable
        - looking for a .hg/dirstate in parent directory of module (or in module/.hg if
            is_path is True), and parse dirstate file to get revision
        - checking package version, which should have repository data when we are on a
            dev version

    The result is cached at module level: once computed, the cached value is returned
    whatever the ``module`` argument is.

    @param module(unicode): module to look for (e.g. sat, libervia)
        module can be a path if is_path is True (see below)
    @param as_string(bool): if True return a string, else return a dictionary
    @param is_path(bool): if True "module" is not handled as a module name, but as an
        absolute path to the parent of a ".hg" directory
    @return (unicode, dictionary): retrieved info in a nice string,
        or a dictionary with retrieved data (key is not present if data is not found),
        key can be:
            - node: full revision number (40 hexadecimal characters)
            - node_short: short revision number
            - branch: branch name
            - date: ISO 8601 format date
            - tag: latest tag used in hierarchy
            - distance: number of commits since the last tag
            - modified: True if "hg id -i" reports uncommitted changes
    """
    # a global statement applies to the whole function, wherever it is located
    global repos_cache, repos_cache_dict
    if as_string:
        if repos_cache is not None:
            return repos_cache
    else:
        if repos_cache_dict is not None:
            return repos_cache_dict

    if sys.platform == "android":
        #  FIXME: workaround to avoid trouble on android, need to be fixed properly
        # NOTE: a string is returned here even if as_string is False
        repos_cache = "Cagou android build"
        return repos_cache

    KEYS = ("node", "node_short", "branch", "date", "tag", "distance")

    if is_path:
        repos_root = os.path.abspath(module)
    else:
        repos_root = os.path.abspath(os.path.dirname(module.__file__))

    hg_data = {}

    try:
        hg_path = procutils.which("hg")[0]
    except IndexError:
        log.warning("Can't find hg executable")
        hg_path = None

    if hg_path is not None:
        # "cwd" is used instead of os.chdir, so the working directory of the whole
        # process is never modified (and doesn't have to be restored)
        try:
            hg_data_raw = subprocess.check_output(
                [
                    "python3",
                    hg_path,
                    "log",
                    "-r",
                    "-1",
                    "--template",
                    "{node}\n"
                    "{node|short}\n"
                    "{branch}\n"
                    "{date|isodate}\n"
                    "{latesttag}\n"
                    "{latesttagdistance}",
                ],
                text=True,
                cwd=repos_root,
            )
        except subprocess.CalledProcessError as e:
            log.error(f"Can't get repository data: {e}")
        except Exception as e:
            log.error(f"Unexpected error, can't get repository data : [{type(e)}] {e}")
        else:
            # one value per line, in the same order as KEYS
            hg_data = dict(zip(KEYS, hg_data_raw.split("\n")))
            try:
                # a "+" in the identifier is used to detect uncommitted changes
                hg_data["modified"] = "+" in subprocess.check_output(
                    ["python3", hg_path, "id", "-i"], text=True, cwd=repos_root
                )
            except subprocess.CalledProcessError:
                # best effort: "modified" key is simply not set
                pass

    if not hg_data:
        # .hg/dirstate method
        log.debug("trying dirstate method")
        dirstate_root = repos_root if is_path else os.path.dirname(repos_root)
        try:
            with open(os.path.join(dirstate_root, ".hg", "dirstate"), 'rb') as hg_dirstate:
                # the first 20 bytes of the file are used as the node
                hg_data["node"] = hg_dirstate.read(20).hex()
                hg_data["node_short"] = hg_data["node"][:12]
        except IOError:
            log.debug("Can't access repository data")

    if not hg_data:
        log.debug("Mercurial not available or working, trying package version")
        try:
            import pkg_resources
        except ImportError:
            log.warning("pkg_resources not available, can't get package data")
        else:
            try:
                pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version
                version, local_id = pkg_version.split("+", 1)
            except pkg_resources.DistributionNotFound:
                log.warning("can't retrieve package data")
            except ValueError:
                # no "+" in version: pkg_version is set, only the unpacking has failed
                log.info(
                    "no local version id in package: {pkg_version}".format(
                        pkg_version=pkg_version
                    )
                )
            else:
                version = version.replace(".dev0", "D")
                if version != C.APP_VERSION:
                    log.warning(
                        "Incompatible version ({version}) and pkg_version ({pkg_version})"
                        .format(
                            version=C.APP_VERSION, pkg_version=pkg_version
                        )
                    )
                else:
                    try:
                        hg_node, hg_distance = local_id.split(".")
                    except ValueError:
                        log.warning("Version doesn't specify repository data")
                    else:
                        # only set when the local id has been parsed: hg_node and
                        # hg_distance don't exist otherwise
                        hg_data = {"node_short": hg_node, "distance": hg_distance}

    repos_cache_dict = hg_data

    if as_string:
        if not hg_data:
            repos_cache = NO_REPOS_DATA
        else:
            strings = ["rev", hg_data["node_short"]]
            # following parts are optional, they are skipped if a key is missing
            try:
                if hg_data["modified"]:
                    strings.append("[M]")
            except KeyError:
                pass
            try:
                strings.extend(["({branch} {date})".format(**hg_data)])
            except KeyError:
                pass
            try:
                strings.extend(["+{distance}".format(**hg_data)])
            except KeyError:
                pass
            repos_cache = " ".join(strings)
        return repos_cache
    else:
        return hg_data