comparison libervia/backend/tools/utils.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/tools/utils.py@524856bd7b19
children 10b6ad569157
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # SaT: an XMPP client
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 """ various useful methods """
20
21 from typing import Optional, Union
22 import unicodedata
23 import os.path
24 import datetime
25 import subprocess
26 import time
27 import sys
28 import random
29 import inspect
30 import textwrap
31 import functools
32 import asyncio
33 from twisted.python import procutils, failure
34 from twisted.internet import defer
35 from libervia.backend.core.constants import Const as C
36 from libervia.backend.core.log import getLogger
37 from libervia.backend.tools import xmpp_datetime
38
# Module-level logger, used by the helpers below to report problems.
log = getLogger(__name__)


# Value cached and returned by get_repository_data (string form) when no
# repository information could be retrieved.
NO_REPOS_DATA = "repository data unknown"
# Caches filled by get_repository_data: the dictionary form and the string form
# of the repository data. None means "not computed yet".
repos_cache_dict = None
repos_cache = None
45
46
def clean_ustr(ustr):
    """Strip control characters from a unicode string

    Every character whose unicode category is "Cc" (control) is dropped, with
    the exception of the line feed which is kept as is.
    @param ustr(unicode): string to clean
    @return (unicode): cleaned string
    """
    return "".join(
        char
        for char in ustr
        if char == "\n" or unicodedata.category(char) != "Cc"
    )
60
61
def logError(failure_):
    """Generic errback which logs the error as a warning, then re-raises it

    @param failure_: failure received by the errback, its ``value`` attribute is
        what gets logged
    @raise failure_: the received failure is always raised again, so the error
        keeps propagating
    """
    error = failure_.value
    log.warning(error)
    raise failure_
66
67
def partial(func, *fixed_args, **fixed_kwargs):
    """Bind arguments to a callable, returning a real function

    Contrary to ``functools.partial``, the returned object is a plain function
    whose signature lists the arguments still to be provided, so it can be
    introspected with the ``inspect`` module.

    @param func(callable): callable to wrap
    @param fixed_args: positional arguments to bind
    @param fixed_kwargs: keyword arguments to bind
    @return (function): function accepting the remaining named arguments of
        ``func`` and forwarding them as keyword arguments
    """
    # FIXME: temporary hack to workaround the fact that argument introspection is
    #   not working with functools.partial, making it unusable with current D-bus
    #   module (in add_method).

    # inspect.getargspec has been removed in Python 3.11, getfullargspec is its
    # replacement; we work on a copy as the list is modified below
    ori_args = list(inspect.getfullargspec(func).args)
    func = functools.partial(func, *fixed_args, **fixed_kwargs)
    # a callable may have no named argument at all
    if ori_args and ori_args[0] == "self":
        del ori_args[0]
    ori_args = ori_args[len(fixed_args):]
    for kw in fixed_kwargs:
        # a fixed keyword may be absorbed by **kwargs of the wrapped callable,
        # in which case it is not part of the named arguments
        if kw in ori_args:
            ori_args.remove(kw)

    # The generated function is retrieved from an explicit namespace: with
    # Python 3, a name created by exec() is not visible as a local variable of
    # the calling function.
    namespace = {"func": func}
    exec(
        textwrap.dedent(
            """\
            def method({args}):
                return func({kw_args})
            """
        ).format(
            args=", ".join(ori_args),
            kw_args=", ".join(f"{arg}={arg}" for arg in ori_args),
        ),
        namespace,
    )

    return namespace["method"]
94
95
def as_deferred(func, *args, **kwargs):
    """Call ``func`` and always give back a Deferred

    It works like defer.maybeDeferred, with additional support for coroutines:
        - an exception raised by the call results in a failed Deferred
        - a coroutine is wrapped with defer.ensureDeferred
        - a Deferred is returned untouched
        - a Failure results in a failed Deferred
        - any other value results in an already fired Deferred

    @param func(callable): callable to run with ``args`` and ``kwargs``
    @return (defer.Deferred): Deferred holding the outcome of the call
    """
    try:
        result = func(*args, **kwargs)
    except Exception as exc:
        return defer.fail(failure.Failure(exc))

    if asyncio.iscoroutine(result):
        return defer.ensureDeferred(result)
    if isinstance(result, defer.Deferred):
        return result
    if isinstance(result, failure.Failure):
        return defer.fail(result)
    return defer.succeed(result)
115
116
def aio(func):
    """Decorator converting an asyncio coroutine function to a Deferred returning one

    The coroutine created by the decorated function is scheduled with
    asyncio.ensure_future, and the resulting future is exposed as a Deferred.
    """
    def wrapper(*args, **kwargs):
        coroutine = func(*args, **kwargs)
        future = asyncio.ensure_future(coroutine)
        return defer.Deferred.fromFuture(future)

    return wrapper
125
126
def as_future(d):
    """Convert a Deferred to a future bound to the current asyncio event loop

    @param d: object exposing an ``asFuture(loop)`` method (e.g. a Deferred)
    @return: whatever ``d.asFuture`` returns for the current event loop
    """
    loop = asyncio.get_event_loop()
    return d.asFuture(loop)
129
130
def ensure_deferred(func):
    """Decorator wrapping the result of a function with defer.ensureDeferred

    Useful when the function is called by a third party library (e.g. wokkel).
    In other cases, it is better to call ensureDeferred as early as possible.
    """
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        return defer.ensureDeferred(result)

    return wrapper
140
141
def xmpp_date(
    timestamp: Optional[Union[float, int]] = None,
    with_time: bool = True
) -> str:
    """Format a timestamp following XEP-0082

    The timestamp is always converted to UTC, so the local timezone is not
    revealed. The formatting itself is delegated to the ``xmpp_datetime`` module.
    @param timestamp: posix timestamp. If None, current time is used
    @param with_time: if True the time is included, otherwise only the date is
        formatted
    @return: formatted date, with or without time
    """
    if timestamp is None:
        timestamp = time.time()
    utc_datetime = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)

    if with_time:
        return xmpp_datetime.format_datetime(utc_datetime)
    return xmpp_datetime.format_date(utc_datetime.date())
164
165
def parse_xmpp_date(
    xmpp_date_str: str,
    with_time: bool = True
) -> float:
    """Convert a XEP-0082 date or datetime to a unix timestamp

    @param xmpp_date_str: XEP-0082 formatted datetime or date
    @param with_time: if True, ``xmpp_date_str`` must be a datetime, otherwise it
        must be a date (midnight is then used as time)
    @return: unix time corresponding to the parsed value
    @raise ValueError: the format is invalid
    """
    if not with_time:
        parsed_date = xmpp_datetime.parse_date(xmpp_date_str)
        midnight = datetime.datetime.min.time()
        # NOTE(review): the combined datetime has no tzinfo, so timestamp()
        #   presumably interprets it in local time while xmpp_date emits UTC;
        #   confirm that this is intended
        return datetime.datetime.combine(parsed_date, midnight).timestamp()

    return xmpp_datetime.parse_datetime(xmpp_date_str).timestamp()
185
186
def generate_password(vocabulary=None, size=20):
    """Generate a password with random characters.

    Characters are picked with the operating system's cryptographically secure
    random source, the global ``random`` generator is left untouched.

    @param vocabulary(iterable): characters to use to create password
        if None, ASCII digits and letters (lower and upper case) are used
    @param size(int): number of characters in the password to generate
    @return (unicode): generated password
    @raise IndexError: vocabulary is empty while size is greater than 0
    """
    if vocabulary is None:
        # 0-9, A-Z and a-z
        vocabulary = [
            chr(code)
            for code in (*range(0x30, 0x3A), *range(0x41, 0x5B), *range(0x61, 0x7B))
        ]
    else:
        # any iterable is accepted, but choice() needs a sequence
        vocabulary = list(vocabulary)
    # passwords are security sensitive: the default Mersenne Twister generator
    # must not be used for them
    secure_random = random.SystemRandom()
    return "".join(secure_random.choice(vocabulary) for _ in range(size))
200
201
def _hg_data_from_executable(repos_root):
    """Retrieve repository data by running the "hg" executable in ``repos_root``

    @param repos_root(unicode): directory where the commands are run
    @return (dict): retrieved data, empty if "hg" is missing or failing
    """
    try:
        hg_path = procutils.which("hg")[0]
    except IndexError:
        log.warning("Can't find hg executable")
        return {}

    keys = ("node", "node_short", "branch", "date", "tag", "distance")
    try:
        # "cwd" is used instead of os.chdir, so the working directory of the
        # whole process is never modified
        hg_data_raw = subprocess.check_output(
            [
                "python3",
                hg_path,
                "log",
                "-r",
                "-1",
                "--template",
                "{node}\n"
                "{node|short}\n"
                "{branch}\n"
                "{date|isodate}\n"
                "{latesttag}\n"
                "{latesttagdistance}",
            ],
            cwd=repos_root,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        log.error(f"Can't get repository data: {e}")
        return {}
    except Exception as e:
        log.error(f"Unexpected error, can't get repository data : [{type(e)}] {e}")
        return {}

    hg_data = dict(zip(keys, hg_data_raw.split("\n")))
    try:
        hg_data["modified"] = "+" in subprocess.check_output(
            ["python3", hg_path, "id", "-i"], cwd=repos_root, text=True
        )
    except (subprocess.CalledProcessError, OSError):
        # the "modified" flag is optional, we do without it on failure
        pass
    return hg_data


def _hg_data_from_dirstate(hg_parent_dir):
    """Retrieve current revision by reading the ".hg/dirstate" file

    @param hg_parent_dir(unicode): directory expected to hold the ".hg" directory
    @return (dict): "node" and "node_short" keys, or an empty dict if the file
        can't be read
    """
    dirstate_path = os.path.join(hg_parent_dir, ".hg", "dirstate")
    try:
        with open(dirstate_path, "rb") as hg_dirstate:
            # the first 20 bytes of the file are used as the revision node
            node = hg_dirstate.read(20).hex()
    except IOError:
        log.debug("Can't access repository data")
        return {}
    return {"node": node, "node_short": node[:12]}


def _hg_data_from_package():
    """Retrieve repository data from the local part of the package version

    @return (dict): "node_short" and "distance" keys, or an empty dict if the
        package version doesn't give this information
    """
    try:
        import pkg_resources
    except ImportError:
        log.warning("pkg_resources not available, can't get package data")
        return {}

    try:
        pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version
    except pkg_resources.DistributionNotFound:
        log.warning("can't retrieve package data")
        return {}

    try:
        version, local_id = pkg_version.split("+", 1)
    except ValueError:
        log.info(
            "no local version id in package: {pkg_version}".format(
                pkg_version=pkg_version
            )
        )
        return {}

    version = version.replace(".dev0", "D")
    if version != C.APP_VERSION:
        log.warning(
            "Incompatible version ({version}) and pkg_version ({pkg_version})"
            .format(
                version=C.APP_VERSION, pkg_version=pkg_version
            )
        )
        return {}

    try:
        hg_node, hg_distance = local_id.split(".")
    except ValueError:
        # nothing usable: hg_node and hg_distance are not set, we must not go
        # further
        log.warning("Version doesn't specify repository data")
        return {}
    return {"node_short": hg_node, "distance": hg_distance}


def _format_repository_data(hg_data):
    """Build the human readable string describing repository data

    @param hg_data(dict): data as returned by get_repository_data
    @return (unicode): NO_REPOS_DATA if ``hg_data`` is empty, else a string with
        the short revision followed by the available optional data
    """
    if not hg_data:
        return NO_REPOS_DATA

    strings = ["rev", hg_data["node_short"]]
    if hg_data.get("modified"):
        strings.append("[M]")
    # branch/date and distance are only added when the keys are available
    try:
        strings.append("({branch} {date})".format(**hg_data))
    except KeyError:
        pass
    try:
        strings.append("+{distance}".format(**hg_data))
    except KeyError:
        pass
    return " ".join(strings)


def get_repository_data(module, as_string=True, is_path=False):
    """Retrieve info on current Mercurial repository

    Data is gotten by using the following methods, in order:
        - using "hg" executable
        - looking for a .hg/dirstate in parent directory of module (or in module/.hg if
            is_path is True), and parse dirstate file to get revision
        - checking package version, which should have repository data when we are on a
            dev version

    The result is cached in module globals: once data has been computed for a
    given output format, it is returned for all following calls, whatever the
    value of ``module``.
    On Android, the fixed string "Cagou android build" is returned whatever the
    value of ``as_string``.

    @param module: module to look for, its ``__file__`` attribute is used to
        locate the repository
        module can be a path if is_path is True (see below)
    @param as_string(bool): if True return a string, else return a dictionary
    @param is_path(bool): if True "module" is not handled as a module, but as an
        absolute path to the parent of a ".hg" directory
    @return (unicode, dictionary): retrieved info in a nice string,
        or a dictionary with retrieved data (key is not present if data is not found),
        key can be:
            - node: full revision number (40 hexadecimal characters)
            - node_short: abbreviated revision number
            - branch: branch name
            - date: ISO 8601 format date
            - tag: latest tag used in hierarchy
            - distance: number of commits since the last tag
            - modified: True if "hg id" reports local modifications
    """
    global repos_cache, repos_cache_dict
    if as_string:
        if repos_cache is not None:
            return repos_cache
    elif repos_cache_dict is not None:
        return repos_cache_dict

    if sys.platform == "android":
        #  FIXME: workaround to avoid trouble on android, need to be fixed properly
        repos_cache = "Cagou android build"
        return repos_cache

    if is_path:
        repos_root = os.path.abspath(module)
    else:
        repos_root = os.path.abspath(os.path.dirname(module.__file__))

    hg_data = _hg_data_from_executable(repos_root)

    if not hg_data:
        # .hg/dirstate method
        log.debug("trying dirstate method")
        # for a module, the ".hg" directory is looked for in the parent directory
        hg_parent_dir = repos_root if is_path else os.path.dirname(repos_root)
        hg_data = _hg_data_from_dirstate(hg_parent_dir)

    if not hg_data:
        log.debug("Mercurial not available or working, trying package version")
        hg_data = _hg_data_from_package()

    repos_cache_dict = hg_data

    if as_string:
        repos_cache = _format_repository_data(hg_data)
        return repos_cache
    return hg_data