Mercurial > libervia-backend
view sat/tools/utils.py @ 3891:989df1047c3c
tests (AP gateway): reactions tests:
2 new tests check XMPP <=> AP reactions conversion
rel 371
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 31 Aug 2022 17:07:03 +0200 |
parents | 46930301f0c1 |
children | 022ae35a9d82 |
line wrap: on
line source
#!/usr/bin/env python3 # SaT: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ various useful methods """ from typing import Optional, Union import unicodedata import os.path import datetime import subprocess import time import sys import random import inspect import textwrap import functools import asyncio from twisted.python import procutils, failure from twisted.internet import defer from sat.core.constants import Const as C from sat.core.log import getLogger from sat.tools import xmpp_datetime log = getLogger(__name__) NO_REPOS_DATA = "repository data unknown" repos_cache_dict = None repos_cache = None def clean_ustr(ustr): """Clean unicode string remove special characters from unicode string """ def valid_chars(unicode_source): for char in unicode_source: if unicodedata.category(char) == "Cc" and char != "\n": continue yield char return "".join(valid_chars(ustr)) def logError(failure_): """Genertic errback which log the error as a warning, and re-raise it""" log.warning(failure_.value) raise failure_ def partial(func, *fixed_args, **fixed_kwargs): # FIXME: temporary hack to workaround the fact that inspect.getargspec is not working with functools.partial # making partial unusable with current D-bus module (in addMethod). # Should not be needed anywore once moved to Python 3 ori_args = inspect.getargspec(func).args func = functools.partial(func, *fixed_args, **fixed_kwargs) if ori_args[0] == "self": del ori_args[0] ori_args = ori_args[len(fixed_args) :] for kw in fixed_kwargs: ori_args.remove(kw) exec( textwrap.dedent( """\ def method({args}): return func({kw_args}) """ ).format( args=", ".join(ori_args), kw_args=", ".join([a + "=" + a for a in ori_args]) ), locals(), ) return method def asDeferred(func, *args, **kwargs): """Call a method and return a Deferred the method can be a simple callable, a Deferred or a coroutine. It is similar to defer.maybeDeferred, but also handles coroutines """ try: ret = func(*args, **kwargs) except Exception as e: return defer.fail(failure.Failure(e)) else: if asyncio.iscoroutine(ret): return defer.ensureDeferred(ret) elif isinstance(ret, defer.Deferred): return ret elif isinstance(ret, failure.Failure): return defer.fail(ret) else: return defer.succeed(ret) def aio(func): """Decorator to return a Deferred from asyncio coroutine Functions with this decorator are run in asyncio context """ def wrapper(*args, **kwargs): return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs))) return wrapper def as_future(d): return d.asFuture(asyncio.get_event_loop()) def ensure_deferred(func): """Decorator to apply ensureDeferred to a function to be used when the function is called by third party library (e.g. wokkel) Otherwise, it's better to use ensureDeferred as early as possible. """ def wrapper(*args, **kwargs): return defer.ensureDeferred(func(*args, **kwargs)) return wrapper def xmpp_date( timestamp: Optional[Union[float, int]] = None, with_time: bool = True ) -> str: """Return date according to XEP-0082 specification to avoid reveling the timezone, we always return UTC dates the string returned by this method is valid with RFC 3339 this function redirects to the functions in the :mod:`sat.tools.datetime` module @param timestamp(None, float): posix timestamp. If None current time will be used @param with_time(bool): if True include the time @return(unicode): XEP-0082 formatted date and time """ dtime = datetime.datetime.fromtimestamp( time.time() if timestamp is None else timestamp, datetime.timezone.utc ) return ( xmpp_datetime.format_datetime(dtime) if with_time else xmpp_datetime.format_date(dtime.date()) ) def generatePassword(vocabulary=None, size=20): """Generate a password with random characters. @param vocabulary(iterable): characters to use to create password @param size(int): number of characters in the password to generate @return (unicode): generated password """ random.seed() if vocabulary is None: vocabulary = [ chr(i) for i in list(range(0x30, 0x3A)) + list(range(0x41, 0x5B)) + list(range(0x61, 0x7B)) ] return "".join([random.choice(vocabulary) for i in range(15)]) def getRepositoryData(module, as_string=True, is_path=False): """Retrieve info on current mecurial repository Data is gotten by using the following methods, in order: - using "hg" executable - looking for a .hg/dirstate in parent directory of module (or in module/.hg if is_path is True), and parse dirstate file to get revision - checking package version, which should have repository data when we are on a dev version @param module(unicode): module to look for (e.g. sat, libervia) module can be a path if is_path is True (see below) @param as_string(bool): if True return a string, else return a dictionary @param is_path(bool): if True "module" is not handled as a module name, but as an absolute path to the parent of a ".hg" directory @return (unicode, dictionary): retrieved info in a nice string, or a dictionary with retrieved data (key is not present if data is not found), key can be: - node: full revision number (40 bits) - branch: branch name - date: ISO 8601 format date - tag: latest tag used in hierarchie - distance: number of commits since the last tag """ global repos_cache_dict if as_string: global repos_cache if repos_cache is not None: return repos_cache else: if repos_cache_dict is not None: return repos_cache_dict if sys.platform == "android": # FIXME: workaround to avoid trouble on android, need to be fixed properly repos_cache = "Cagou android build" return repos_cache KEYS = ("node", "node_short", "branch", "date", "tag", "distance") ori_cwd = os.getcwd() if is_path: repos_root = os.path.abspath(module) else: repos_root = os.path.abspath(os.path.dirname(module.__file__)) try: hg_path = procutils.which("hg")[0] except IndexError: log.warning("Can't find hg executable") hg_path = None hg_data = {} if hg_path is not None: os.chdir(repos_root) try: hg_data_raw = subprocess.check_output( [ "python3", hg_path, "log", "-r", "-1", "--template", "{node}\n" "{node|short}\n" "{branch}\n" "{date|isodate}\n" "{latesttag}\n" "{latesttagdistance}", ], text=True ) except subprocess.CalledProcessError as e: log.error(f"Can't get repository data: {e}") hg_data = {} except Exception as e: log.error(f"Unexpected error, can't get repository data : [{type(e)}] {e}") hg_data = {} else: hg_data = dict(list(zip(KEYS, hg_data_raw.split("\n")))) try: hg_data["modified"] = "+" in subprocess.check_output(["python3", hg_path, "id", "-i"], text=True) except subprocess.CalledProcessError: pass else: hg_data = {} if not hg_data: # .hg/dirstate method log.debug("trying dirstate method") if is_path: os.chdir(repos_root) else: os.chdir(os.path.abspath(os.path.dirname(repos_root))) try: with open(".hg/dirstate", 'rb') as hg_dirstate: hg_data["node"] = hg_dirstate.read(20).hex() hg_data["node_short"] = hg_data["node"][:12] except IOError: log.debug("Can't access repository data") # we restore original working dir os.chdir(ori_cwd) if not hg_data: log.debug("Mercurial not available or working, trying package version") try: import pkg_resources except ImportError: log.warning("pkg_resources not available, can't get package data") else: try: pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version version, local_id = pkg_version.split("+", 1) except pkg_resources.DistributionNotFound: log.warning("can't retrieve package data") except ValueError: log.info( "no local version id in package: {pkg_version}".format( pkg_version=pkg_version ) ) else: version = version.replace(".dev0", "D") if version != C.APP_VERSION: log.warning( "Incompatible version ({version}) and pkg_version ({pkg_version})" .format( version=C.APP_VERSION, pkg_version=pkg_version ) ) else: try: hg_node, hg_distance = local_id.split(".") except ValueError: log.warning("Version doesn't specify repository data") hg_data = {"node_short": hg_node, "distance": hg_distance} repos_cache_dict = hg_data if as_string: if not hg_data: repos_cache = NO_REPOS_DATA else: strings = ["rev", hg_data["node_short"]] try: if hg_data["modified"]: strings.append("[M]") except KeyError: pass try: strings.extend(["({branch} {date})".format(**hg_data)]) except KeyError: pass try: strings.extend(["+{distance}".format(**hg_data)]) except KeyError: pass repos_cache = " ".join(strings) return repos_cache else: return hg_data