Mercurial > libervia-backend
diff libervia/backend/tools/utils.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/tools/utils.py@524856bd7b19 |
children | 10b6ad569157 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/tools/utils.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 + +# SaT: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" various useful methods """ + +from typing import Optional, Union +import unicodedata +import os.path +import datetime +import subprocess +import time +import sys +import random +import inspect +import textwrap +import functools +import asyncio +from twisted.python import procutils, failure +from twisted.internet import defer +from libervia.backend.core.constants import Const as C +from libervia.backend.core.log import getLogger +from libervia.backend.tools import xmpp_datetime + +log = getLogger(__name__) + + +NO_REPOS_DATA = "repository data unknown" +repos_cache_dict = None +repos_cache = None + + +def clean_ustr(ustr): + """Clean unicode string + + remove special characters from unicode string + """ + + def valid_chars(unicode_source): + for char in unicode_source: + if unicodedata.category(char) == "Cc" and char != "\n": + continue + yield char + + return "".join(valid_chars(ustr)) + + +def logError(failure_): + """Genertic errback which log the error as a warning, and re-raise it""" + log.warning(failure_.value) + raise failure_ + + +def partial(func, *fixed_args, **fixed_kwargs): + # FIXME: temporary hack to workaround the fact that inspect.getargspec is not working with functools.partial + # making partial unusable with current D-bus module (in add_method). + # Should not be needed anywore once moved to Python 3 + + ori_args = inspect.getargspec(func).args + func = functools.partial(func, *fixed_args, **fixed_kwargs) + if ori_args[0] == "self": + del ori_args[0] + ori_args = ori_args[len(fixed_args) :] + for kw in fixed_kwargs: + ori_args.remove(kw) + + exec( + textwrap.dedent( + """\ + def method({args}): + return func({kw_args}) + """ + ).format( + args=", ".join(ori_args), kw_args=", ".join([a + "=" + a for a in ori_args]) + ), + locals(), + ) + + return method + + +def as_deferred(func, *args, **kwargs): + """Call a method and return a Deferred + + the method can be a simple callable, a Deferred or a coroutine. + It is similar to defer.maybeDeferred, but also handles coroutines + """ + try: + ret = func(*args, **kwargs) + except Exception as e: + return defer.fail(failure.Failure(e)) + else: + if asyncio.iscoroutine(ret): + return defer.ensureDeferred(ret) + elif isinstance(ret, defer.Deferred): + return ret + elif isinstance(ret, failure.Failure): + return defer.fail(ret) + else: + return defer.succeed(ret) + + +def aio(func): + """Decorator to return a Deferred from asyncio coroutine + + Functions with this decorator are run in asyncio context + """ + def wrapper(*args, **kwargs): + return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs))) + return wrapper + + +def as_future(d): + return d.asFuture(asyncio.get_event_loop()) + + +def ensure_deferred(func): + """Decorator to apply ensureDeferred to a function + + to be used when the function is called by third party library (e.g. wokkel) + Otherwise, it's better to use ensureDeferred as early as possible. + """ + def wrapper(*args, **kwargs): + return defer.ensureDeferred(func(*args, **kwargs)) + return wrapper + + +def xmpp_date( + timestamp: Optional[Union[float, int]] = None, + with_time: bool = True +) -> str: + """Return date according to XEP-0082 specification + + to avoid reveling the timezone, we always return UTC dates + the string returned by this method is valid with RFC 3339 + this function redirects to the functions in the :mod:`sat.tools.datetime` module + @param timestamp(None, float): posix timestamp. If None current time will be used + @param with_time(bool): if True include the time + @return(unicode): XEP-0082 formatted date and time + """ + dtime = datetime.datetime.fromtimestamp( + time.time() if timestamp is None else timestamp, + datetime.timezone.utc + ) + + return ( + xmpp_datetime.format_datetime(dtime) if with_time + else xmpp_datetime.format_date(dtime.date()) + ) + + +def parse_xmpp_date( + xmpp_date_str: str, + with_time: bool = True +) -> float: + """Get timestamp from XEP-0082 datetime + + @param xmpp_date_str: XEP-0082 formatted datetime or time + @param with_time: if True, ``xmpp_date_str`` must be a datetime, otherwise if must be + a time profile. + @return: datetime converted to unix time + @raise ValueError: the format is invalid + """ + if with_time: + dt = xmpp_datetime.parse_datetime(xmpp_date_str) + else: + d = xmpp_datetime.parse_date(xmpp_date_str) + dt = datetime.datetime.combine(d, datetime.datetime.min.time()) + + return dt.timestamp() + + +def generate_password(vocabulary=None, size=20): + """Generate a password with random characters. + + @param vocabulary(iterable): characters to use to create password + @param size(int): number of characters in the password to generate + @return (unicode): generated password + """ + random.seed() + if vocabulary is None: + vocabulary = [ + chr(i) for i in list(range(0x30, 0x3A)) + list(range(0x41, 0x5B)) + list(range(0x61, 0x7B)) + ] + return "".join([random.choice(vocabulary) for i in range(15)]) + + +def get_repository_data(module, as_string=True, is_path=False): + """Retrieve info on current mecurial repository + + Data is gotten by using the following methods, in order: + - using "hg" executable + - looking for a .hg/dirstate in parent directory of module (or in module/.hg if + is_path is True), and parse dirstate file to get revision + - checking package version, which should have repository data when we are on a dev version + @param module(unicode): module to look for (e.g. sat, libervia) + module can be a path if is_path is True (see below) + @param as_string(bool): if True return a string, else return a dictionary + @param is_path(bool): if True "module" is not handled as a module name, but as an + absolute path to the parent of a ".hg" directory + @return (unicode, dictionary): retrieved info in a nice string, + or a dictionary with retrieved data (key is not present if data is not found), + key can be: + - node: full revision number (40 bits) + - branch: branch name + - date: ISO 8601 format date + - tag: latest tag used in hierarchie + - distance: number of commits since the last tag + """ + global repos_cache_dict + if as_string: + global repos_cache + if repos_cache is not None: + return repos_cache + else: + if repos_cache_dict is not None: + return repos_cache_dict + + if sys.platform == "android": + # FIXME: workaround to avoid trouble on android, need to be fixed properly + repos_cache = "Cagou android build" + return repos_cache + + KEYS = ("node", "node_short", "branch", "date", "tag", "distance") + ori_cwd = os.getcwd() + + if is_path: + repos_root = os.path.abspath(module) + else: + repos_root = os.path.abspath(os.path.dirname(module.__file__)) + + try: + hg_path = procutils.which("hg")[0] + except IndexError: + log.warning("Can't find hg executable") + hg_path = None + hg_data = {} + + if hg_path is not None: + os.chdir(repos_root) + try: + hg_data_raw = subprocess.check_output( + [ + "python3", + hg_path, + "log", + "-r", + "-1", + "--template", + "{node}\n" + "{node|short}\n" + "{branch}\n" + "{date|isodate}\n" + "{latesttag}\n" + "{latesttagdistance}", + ], + text=True + ) + except subprocess.CalledProcessError as e: + log.error(f"Can't get repository data: {e}") + hg_data = {} + except Exception as e: + log.error(f"Unexpected error, can't get repository data : [{type(e)}] {e}") + hg_data = {} + else: + hg_data = dict(list(zip(KEYS, hg_data_raw.split("\n")))) + try: + hg_data["modified"] = "+" in subprocess.check_output(["python3", hg_path, "id", "-i"], text=True) + except subprocess.CalledProcessError: + pass + else: + hg_data = {} + + if not hg_data: + # .hg/dirstate method + log.debug("trying dirstate method") + if is_path: + os.chdir(repos_root) + else: + os.chdir(os.path.abspath(os.path.dirname(repos_root))) + try: + with open(".hg/dirstate", 'rb') as hg_dirstate: + hg_data["node"] = hg_dirstate.read(20).hex() + hg_data["node_short"] = hg_data["node"][:12] + except IOError: + log.debug("Can't access repository data") + + # we restore original working dir + os.chdir(ori_cwd) + + if not hg_data: + log.debug("Mercurial not available or working, trying package version") + try: + import pkg_resources + except ImportError: + log.warning("pkg_resources not available, can't get package data") + else: + try: + pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version + version, local_id = pkg_version.split("+", 1) + except pkg_resources.DistributionNotFound: + log.warning("can't retrieve package data") + except ValueError: + log.info( + "no local version id in package: {pkg_version}".format( + pkg_version=pkg_version + ) + ) + else: + version = version.replace(".dev0", "D") + if version != C.APP_VERSION: + log.warning( + "Incompatible version ({version}) and pkg_version ({pkg_version})" + .format( + version=C.APP_VERSION, pkg_version=pkg_version + ) + ) + else: + try: + hg_node, hg_distance = local_id.split(".") + except ValueError: + log.warning("Version doesn't specify repository data") + hg_data = {"node_short": hg_node, "distance": hg_distance} + + repos_cache_dict = hg_data + + if as_string: + if not hg_data: + repos_cache = NO_REPOS_DATA + else: + strings = ["rev", hg_data["node_short"]] + try: + if hg_data["modified"]: + strings.append("[M]") + except KeyError: + pass + try: + strings.extend(["({branch} {date})".format(**hg_data)]) + except KeyError: + pass + try: + strings.extend(["+{distance}".format(**hg_data)]) + except KeyError: + pass + repos_cache = " ".join(strings) + return repos_cache + else: + return hg_data