Mercurial > libervia-backend
comparison libervia/backend/tools/utils.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/tools/utils.py@524856bd7b19 |
children | 10b6ad569157 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # SaT: an XMPP client | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 """ various useful methods """ | |
20 | |
21 from typing import Optional, Union | |
22 import unicodedata | |
23 import os.path | |
24 import datetime | |
25 import subprocess | |
26 import time | |
27 import sys | |
28 import random | |
29 import inspect | |
30 import textwrap | |
31 import functools | |
32 import asyncio | |
33 from twisted.python import procutils, failure | |
34 from twisted.internet import defer | |
35 from libervia.backend.core.constants import Const as C | |
36 from libervia.backend.core.log import getLogger | |
37 from libervia.backend.tools import xmpp_datetime | |
38 | |
39 log = getLogger(__name__) | |
40 | |
41 | |
42 NO_REPOS_DATA = "repository data unknown" | |
43 repos_cache_dict = None | |
44 repos_cache = None | |
45 | |
46 | |
47 def clean_ustr(ustr): | |
48 """Clean unicode string | |
49 | |
50 remove special characters from unicode string | |
51 """ | |
52 | |
53 def valid_chars(unicode_source): | |
54 for char in unicode_source: | |
55 if unicodedata.category(char) == "Cc" and char != "\n": | |
56 continue | |
57 yield char | |
58 | |
59 return "".join(valid_chars(ustr)) | |
60 | |
61 | |
62 def logError(failure_): | |
63 """Genertic errback which log the error as a warning, and re-raise it""" | |
64 log.warning(failure_.value) | |
65 raise failure_ | |
66 | |
67 | |
68 def partial(func, *fixed_args, **fixed_kwargs): | |
69 # FIXME: temporary hack to workaround the fact that inspect.getargspec is not working with functools.partial | |
70 # making partial unusable with current D-bus module (in add_method). | |
71 # Should not be needed anywore once moved to Python 3 | |
72 | |
73 ori_args = inspect.getargspec(func).args | |
74 func = functools.partial(func, *fixed_args, **fixed_kwargs) | |
75 if ori_args[0] == "self": | |
76 del ori_args[0] | |
77 ori_args = ori_args[len(fixed_args) :] | |
78 for kw in fixed_kwargs: | |
79 ori_args.remove(kw) | |
80 | |
81 exec( | |
82 textwrap.dedent( | |
83 """\ | |
84 def method({args}): | |
85 return func({kw_args}) | |
86 """ | |
87 ).format( | |
88 args=", ".join(ori_args), kw_args=", ".join([a + "=" + a for a in ori_args]) | |
89 ), | |
90 locals(), | |
91 ) | |
92 | |
93 return method | |
94 | |
95 | |
96 def as_deferred(func, *args, **kwargs): | |
97 """Call a method and return a Deferred | |
98 | |
99 the method can be a simple callable, a Deferred or a coroutine. | |
100 It is similar to defer.maybeDeferred, but also handles coroutines | |
101 """ | |
102 try: | |
103 ret = func(*args, **kwargs) | |
104 except Exception as e: | |
105 return defer.fail(failure.Failure(e)) | |
106 else: | |
107 if asyncio.iscoroutine(ret): | |
108 return defer.ensureDeferred(ret) | |
109 elif isinstance(ret, defer.Deferred): | |
110 return ret | |
111 elif isinstance(ret, failure.Failure): | |
112 return defer.fail(ret) | |
113 else: | |
114 return defer.succeed(ret) | |
115 | |
116 | |
117 def aio(func): | |
118 """Decorator to return a Deferred from asyncio coroutine | |
119 | |
120 Functions with this decorator are run in asyncio context | |
121 """ | |
122 def wrapper(*args, **kwargs): | |
123 return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs))) | |
124 return wrapper | |
125 | |
126 | |
127 def as_future(d): | |
128 return d.asFuture(asyncio.get_event_loop()) | |
129 | |
130 | |
131 def ensure_deferred(func): | |
132 """Decorator to apply ensureDeferred to a function | |
133 | |
134 to be used when the function is called by third party library (e.g. wokkel) | |
135 Otherwise, it's better to use ensureDeferred as early as possible. | |
136 """ | |
137 def wrapper(*args, **kwargs): | |
138 return defer.ensureDeferred(func(*args, **kwargs)) | |
139 return wrapper | |
140 | |
141 | |
142 def xmpp_date( | |
143 timestamp: Optional[Union[float, int]] = None, | |
144 with_time: bool = True | |
145 ) -> str: | |
146 """Return date according to XEP-0082 specification | |
147 | |
148 to avoid reveling the timezone, we always return UTC dates | |
149 the string returned by this method is valid with RFC 3339 | |
150 this function redirects to the functions in the :mod:`sat.tools.datetime` module | |
151 @param timestamp(None, float): posix timestamp. If None current time will be used | |
152 @param with_time(bool): if True include the time | |
153 @return(unicode): XEP-0082 formatted date and time | |
154 """ | |
155 dtime = datetime.datetime.fromtimestamp( | |
156 time.time() if timestamp is None else timestamp, | |
157 datetime.timezone.utc | |
158 ) | |
159 | |
160 return ( | |
161 xmpp_datetime.format_datetime(dtime) if with_time | |
162 else xmpp_datetime.format_date(dtime.date()) | |
163 ) | |
164 | |
165 | |
166 def parse_xmpp_date( | |
167 xmpp_date_str: str, | |
168 with_time: bool = True | |
169 ) -> float: | |
170 """Get timestamp from XEP-0082 datetime | |
171 | |
172 @param xmpp_date_str: XEP-0082 formatted datetime or time | |
173 @param with_time: if True, ``xmpp_date_str`` must be a datetime, otherwise if must be | |
174 a time profile. | |
175 @return: datetime converted to unix time | |
176 @raise ValueError: the format is invalid | |
177 """ | |
178 if with_time: | |
179 dt = xmpp_datetime.parse_datetime(xmpp_date_str) | |
180 else: | |
181 d = xmpp_datetime.parse_date(xmpp_date_str) | |
182 dt = datetime.datetime.combine(d, datetime.datetime.min.time()) | |
183 | |
184 return dt.timestamp() | |
185 | |
186 | |
187 def generate_password(vocabulary=None, size=20): | |
188 """Generate a password with random characters. | |
189 | |
190 @param vocabulary(iterable): characters to use to create password | |
191 @param size(int): number of characters in the password to generate | |
192 @return (unicode): generated password | |
193 """ | |
194 random.seed() | |
195 if vocabulary is None: | |
196 vocabulary = [ | |
197 chr(i) for i in list(range(0x30, 0x3A)) + list(range(0x41, 0x5B)) + list(range(0x61, 0x7B)) | |
198 ] | |
199 return "".join([random.choice(vocabulary) for i in range(15)]) | |
200 | |
201 | |
202 def get_repository_data(module, as_string=True, is_path=False): | |
203 """Retrieve info on current mecurial repository | |
204 | |
205 Data is gotten by using the following methods, in order: | |
206 - using "hg" executable | |
207 - looking for a .hg/dirstate in parent directory of module (or in module/.hg if | |
208 is_path is True), and parse dirstate file to get revision | |
209 - checking package version, which should have repository data when we are on a dev version | |
210 @param module(unicode): module to look for (e.g. sat, libervia) | |
211 module can be a path if is_path is True (see below) | |
212 @param as_string(bool): if True return a string, else return a dictionary | |
213 @param is_path(bool): if True "module" is not handled as a module name, but as an | |
214 absolute path to the parent of a ".hg" directory | |
215 @return (unicode, dictionary): retrieved info in a nice string, | |
216 or a dictionary with retrieved data (key is not present if data is not found), | |
217 key can be: | |
218 - node: full revision number (40 bits) | |
219 - branch: branch name | |
220 - date: ISO 8601 format date | |
221 - tag: latest tag used in hierarchie | |
222 - distance: number of commits since the last tag | |
223 """ | |
224 global repos_cache_dict | |
225 if as_string: | |
226 global repos_cache | |
227 if repos_cache is not None: | |
228 return repos_cache | |
229 else: | |
230 if repos_cache_dict is not None: | |
231 return repos_cache_dict | |
232 | |
233 if sys.platform == "android": | |
234 # FIXME: workaround to avoid trouble on android, need to be fixed properly | |
235 repos_cache = "Cagou android build" | |
236 return repos_cache | |
237 | |
238 KEYS = ("node", "node_short", "branch", "date", "tag", "distance") | |
239 ori_cwd = os.getcwd() | |
240 | |
241 if is_path: | |
242 repos_root = os.path.abspath(module) | |
243 else: | |
244 repos_root = os.path.abspath(os.path.dirname(module.__file__)) | |
245 | |
246 try: | |
247 hg_path = procutils.which("hg")[0] | |
248 except IndexError: | |
249 log.warning("Can't find hg executable") | |
250 hg_path = None | |
251 hg_data = {} | |
252 | |
253 if hg_path is not None: | |
254 os.chdir(repos_root) | |
255 try: | |
256 hg_data_raw = subprocess.check_output( | |
257 [ | |
258 "python3", | |
259 hg_path, | |
260 "log", | |
261 "-r", | |
262 "-1", | |
263 "--template", | |
264 "{node}\n" | |
265 "{node|short}\n" | |
266 "{branch}\n" | |
267 "{date|isodate}\n" | |
268 "{latesttag}\n" | |
269 "{latesttagdistance}", | |
270 ], | |
271 text=True | |
272 ) | |
273 except subprocess.CalledProcessError as e: | |
274 log.error(f"Can't get repository data: {e}") | |
275 hg_data = {} | |
276 except Exception as e: | |
277 log.error(f"Unexpected error, can't get repository data : [{type(e)}] {e}") | |
278 hg_data = {} | |
279 else: | |
280 hg_data = dict(list(zip(KEYS, hg_data_raw.split("\n")))) | |
281 try: | |
282 hg_data["modified"] = "+" in subprocess.check_output(["python3", hg_path, "id", "-i"], text=True) | |
283 except subprocess.CalledProcessError: | |
284 pass | |
285 else: | |
286 hg_data = {} | |
287 | |
288 if not hg_data: | |
289 # .hg/dirstate method | |
290 log.debug("trying dirstate method") | |
291 if is_path: | |
292 os.chdir(repos_root) | |
293 else: | |
294 os.chdir(os.path.abspath(os.path.dirname(repos_root))) | |
295 try: | |
296 with open(".hg/dirstate", 'rb') as hg_dirstate: | |
297 hg_data["node"] = hg_dirstate.read(20).hex() | |
298 hg_data["node_short"] = hg_data["node"][:12] | |
299 except IOError: | |
300 log.debug("Can't access repository data") | |
301 | |
302 # we restore original working dir | |
303 os.chdir(ori_cwd) | |
304 | |
305 if not hg_data: | |
306 log.debug("Mercurial not available or working, trying package version") | |
307 try: | |
308 import pkg_resources | |
309 except ImportError: | |
310 log.warning("pkg_resources not available, can't get package data") | |
311 else: | |
312 try: | |
313 pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version | |
314 version, local_id = pkg_version.split("+", 1) | |
315 except pkg_resources.DistributionNotFound: | |
316 log.warning("can't retrieve package data") | |
317 except ValueError: | |
318 log.info( | |
319 "no local version id in package: {pkg_version}".format( | |
320 pkg_version=pkg_version | |
321 ) | |
322 ) | |
323 else: | |
324 version = version.replace(".dev0", "D") | |
325 if version != C.APP_VERSION: | |
326 log.warning( | |
327 "Incompatible version ({version}) and pkg_version ({pkg_version})" | |
328 .format( | |
329 version=C.APP_VERSION, pkg_version=pkg_version | |
330 ) | |
331 ) | |
332 else: | |
333 try: | |
334 hg_node, hg_distance = local_id.split(".") | |
335 except ValueError: | |
336 log.warning("Version doesn't specify repository data") | |
337 hg_data = {"node_short": hg_node, "distance": hg_distance} | |
338 | |
339 repos_cache_dict = hg_data | |
340 | |
341 if as_string: | |
342 if not hg_data: | |
343 repos_cache = NO_REPOS_DATA | |
344 else: | |
345 strings = ["rev", hg_data["node_short"]] | |
346 try: | |
347 if hg_data["modified"]: | |
348 strings.append("[M]") | |
349 except KeyError: | |
350 pass | |
351 try: | |
352 strings.extend(["({branch} {date})".format(**hg_data)]) | |
353 except KeyError: | |
354 pass | |
355 try: | |
356 strings.extend(["+{distance}".format(**hg_data)]) | |
357 except KeyError: | |
358 pass | |
359 repos_cache = " ".join(strings) | |
360 return repos_cache | |
361 else: | |
362 return hg_data |