diff libervia/backend/tools/utils.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/tools/utils.py@524856bd7b19
children 10b6ad569157
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/tools/utils.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,362 @@
+#!/usr/bin/env python3
+
+# SaT: an XMPP client
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+""" various useful methods """
+
+from typing import Optional, Union
+import unicodedata
+import os.path
+import datetime
+import subprocess
+import time
+import sys
+import random
+import inspect
+import textwrap
+import functools
+import asyncio
+from twisted.python import procutils, failure
+from twisted.internet import defer
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.log import getLogger
+from libervia.backend.tools import xmpp_datetime
+
+log = getLogger(__name__)
+
+
+NO_REPOS_DATA = "repository data unknown"
+repos_cache_dict = None
+repos_cache = None
+
+
+def clean_ustr(ustr):
+    """Clean unicode string
+
+    remove special characters from unicode string
+    """
+
+    def valid_chars(unicode_source):
+        for char in unicode_source:
+            if unicodedata.category(char) == "Cc" and char != "\n":
+                continue
+            yield char
+
+    return "".join(valid_chars(ustr))
+
+
+def logError(failure_):
+    """Genertic errback which log the error as a warning, and re-raise it"""
+    log.warning(failure_.value)
+    raise failure_
+
+
+def partial(func, *fixed_args, **fixed_kwargs):
+    # FIXME: temporary hack to workaround the fact that inspect.getargspec is not working with functools.partial
+    #        making partial unusable with current D-bus module (in add_method).
+    #        Should not be needed anywore once moved to Python 3
+
+    ori_args = inspect.getargspec(func).args
+    func = functools.partial(func, *fixed_args, **fixed_kwargs)
+    if ori_args[0] == "self":
+        del ori_args[0]
+    ori_args = ori_args[len(fixed_args) :]
+    for kw in fixed_kwargs:
+        ori_args.remove(kw)
+
+    exec(
+        textwrap.dedent(
+            """\
+    def method({args}):
+        return func({kw_args})
+    """
+        ).format(
+            args=", ".join(ori_args), kw_args=", ".join([a + "=" + a for a in ori_args])
+        ),
+        locals(),
+    )
+
+    return method
+
+
+def as_deferred(func, *args, **kwargs):
+    """Call a method and return a Deferred
+
+    the method can be a simple callable, a Deferred or a coroutine.
+    It is similar to defer.maybeDeferred, but also handles coroutines
+    """
+    try:
+        ret = func(*args, **kwargs)
+    except Exception as e:
+        return defer.fail(failure.Failure(e))
+    else:
+        if asyncio.iscoroutine(ret):
+            return defer.ensureDeferred(ret)
+        elif isinstance(ret, defer.Deferred):
+            return ret
+        elif isinstance(ret, failure.Failure):
+            return defer.fail(ret)
+        else:
+            return defer.succeed(ret)
+
+
+def aio(func):
+    """Decorator to return a Deferred from asyncio coroutine
+
+    Functions with this decorator are run in asyncio context
+    """
+    def wrapper(*args, **kwargs):
+        return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs)))
+    return wrapper
+
+
+def as_future(d):
+    return d.asFuture(asyncio.get_event_loop())
+
+
+def ensure_deferred(func):
+    """Decorator to apply ensureDeferred to a function
+
+    to be used when the function is called by third party library (e.g. wokkel)
+    Otherwise, it's better to use ensureDeferred as early as possible.
+    """
+    def wrapper(*args, **kwargs):
+        return defer.ensureDeferred(func(*args, **kwargs))
+    return wrapper
+
+
+def xmpp_date(
+    timestamp: Optional[Union[float, int]] = None,
+    with_time: bool = True
+) -> str:
+    """Return date according to XEP-0082 specification
+
+    to avoid reveling the timezone, we always return UTC dates
+    the string returned by this method is valid with RFC 3339
+    this function redirects to the functions in the :mod:`sat.tools.datetime` module
+    @param timestamp(None, float): posix timestamp. If None current time will be used
+    @param with_time(bool): if True include the time
+    @return(unicode): XEP-0082 formatted date and time
+    """
+    dtime = datetime.datetime.fromtimestamp(
+        time.time() if timestamp is None else timestamp,
+        datetime.timezone.utc
+    )
+
+    return (
+        xmpp_datetime.format_datetime(dtime) if with_time
+        else xmpp_datetime.format_date(dtime.date())
+    )
+
+
+def parse_xmpp_date(
+    xmpp_date_str: str,
+    with_time: bool = True
+) -> float:
+    """Get timestamp from XEP-0082 datetime
+
+    @param xmpp_date_str: XEP-0082 formatted datetime or time
+    @param with_time: if True, ``xmpp_date_str`` must be a datetime, otherwise if must be
+    a time profile.
+    @return: datetime converted to unix time
+    @raise ValueError: the format is invalid
+    """
+    if with_time:
+        dt = xmpp_datetime.parse_datetime(xmpp_date_str)
+    else:
+        d = xmpp_datetime.parse_date(xmpp_date_str)
+        dt = datetime.datetime.combine(d, datetime.datetime.min.time())
+
+    return dt.timestamp()
+
+
+def generate_password(vocabulary=None, size=20):
+    """Generate a password with random characters.
+
+    @param vocabulary(iterable): characters to use to create password
+    @param size(int): number of characters in the password to generate
+    @return (unicode): generated password
+    """
+    random.seed()
+    if vocabulary is None:
+        vocabulary = [
+            chr(i) for i in list(range(0x30, 0x3A)) + list(range(0x41, 0x5B)) + list(range(0x61, 0x7B))
+        ]
+    return "".join([random.choice(vocabulary) for i in range(15)])
+
+
+def get_repository_data(module, as_string=True, is_path=False):
+    """Retrieve info on current mecurial repository
+
+    Data is gotten by using the following methods, in order:
+        - using "hg" executable
+        - looking for a .hg/dirstate in parent directory of module (or in module/.hg if
+            is_path is True), and parse dirstate file to get revision
+        - checking package version, which should have repository data when we are on a dev version
+    @param module(unicode): module to look for (e.g. sat, libervia)
+        module can be a path if is_path is True (see below)
+    @param as_string(bool): if True return a string, else return a dictionary
+    @param is_path(bool): if True "module" is not handled as a module name, but as an
+        absolute path to the parent of a ".hg" directory
+    @return (unicode, dictionary): retrieved info in a nice string,
+        or a dictionary with retrieved data (key is not present if data is not found),
+        key can be:
+            - node: full revision number (40 bits)
+            - branch: branch name
+            - date: ISO 8601 format date
+            - tag: latest tag used in hierarchie
+            - distance: number of commits since the last tag
+    """
+    global repos_cache_dict
+    if as_string:
+        global repos_cache
+        if repos_cache is not None:
+            return repos_cache
+    else:
+        if repos_cache_dict is not None:
+            return repos_cache_dict
+
+    if sys.platform == "android":
+        #  FIXME: workaround to avoid trouble on android, need to be fixed properly
+        repos_cache = "Cagou android build"
+        return repos_cache
+
+    KEYS = ("node", "node_short", "branch", "date", "tag", "distance")
+    ori_cwd = os.getcwd()
+
+    if is_path:
+        repos_root = os.path.abspath(module)
+    else:
+        repos_root = os.path.abspath(os.path.dirname(module.__file__))
+
+    try:
+        hg_path = procutils.which("hg")[0]
+    except IndexError:
+        log.warning("Can't find hg executable")
+        hg_path = None
+        hg_data = {}
+
+    if hg_path is not None:
+        os.chdir(repos_root)
+        try:
+            hg_data_raw = subprocess.check_output(
+                [
+                    "python3",
+                    hg_path,
+                    "log",
+                    "-r",
+                    "-1",
+                    "--template",
+                    "{node}\n"
+                    "{node|short}\n"
+                    "{branch}\n"
+                    "{date|isodate}\n"
+                    "{latesttag}\n"
+                    "{latesttagdistance}",
+                ],
+                text=True
+            )
+        except subprocess.CalledProcessError as e:
+            log.error(f"Can't get repository data: {e}")
+            hg_data = {}
+        except Exception as e:
+            log.error(f"Unexpected error, can't get repository data : [{type(e)}] {e}")
+            hg_data = {}
+        else:
+            hg_data = dict(list(zip(KEYS, hg_data_raw.split("\n"))))
+            try:
+                hg_data["modified"] = "+" in subprocess.check_output(["python3", hg_path, "id", "-i"], text=True)
+            except subprocess.CalledProcessError:
+                pass
+    else:
+        hg_data = {}
+
+    if not hg_data:
+        # .hg/dirstate method
+        log.debug("trying dirstate method")
+        if is_path:
+            os.chdir(repos_root)
+        else:
+            os.chdir(os.path.abspath(os.path.dirname(repos_root)))
+        try:
+            with open(".hg/dirstate", 'rb') as hg_dirstate:
+                hg_data["node"] = hg_dirstate.read(20).hex()
+                hg_data["node_short"] = hg_data["node"][:12]
+        except IOError:
+            log.debug("Can't access repository data")
+
+    # we restore original working dir
+    os.chdir(ori_cwd)
+
+    if not hg_data:
+        log.debug("Mercurial not available or working, trying package version")
+        try:
+            import pkg_resources
+        except ImportError:
+            log.warning("pkg_resources not available, can't get package data")
+        else:
+            try:
+                pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version
+                version, local_id = pkg_version.split("+", 1)
+            except pkg_resources.DistributionNotFound:
+                log.warning("can't retrieve package data")
+            except ValueError:
+                log.info(
+                    "no local version id in package: {pkg_version}".format(
+                        pkg_version=pkg_version
+                    )
+                )
+            else:
+                version = version.replace(".dev0", "D")
+                if version != C.APP_VERSION:
+                    log.warning(
+                        "Incompatible version ({version}) and pkg_version ({pkg_version})"
+                        .format(
+                            version=C.APP_VERSION, pkg_version=pkg_version
+                        )
+                    )
+                else:
+                    try:
+                        hg_node, hg_distance = local_id.split(".")
+                    except ValueError:
+                        log.warning("Version doesn't specify repository data")
+                    hg_data = {"node_short": hg_node, "distance": hg_distance}
+
+    repos_cache_dict = hg_data
+
+    if as_string:
+        if not hg_data:
+            repos_cache = NO_REPOS_DATA
+        else:
+            strings = ["rev", hg_data["node_short"]]
+            try:
+                if hg_data["modified"]:
+                    strings.append("[M]")
+            except KeyError:
+                pass
+            try:
+                strings.extend(["({branch} {date})".format(**hg_data)])
+            except KeyError:
+                pass
+            try:
+                strings.extend(["+{distance}".format(**hg_data)])
+            except KeyError:
+                pass
+            repos_cache = " ".join(strings)
+        return repos_cache
+    else:
+        return hg_data