Mercurial > libervia-backend
comparison sat/tools/utils.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 3e4e78de9cca |
children | 003b8b4b56a7 |
comparison
equal
deleted
inserted
replaced
2623:49533de4540b | 2624:56f94936df1e |
---|---|
21 | 21 |
22 import unicodedata | 22 import unicodedata |
23 import os.path | 23 import os.path |
24 from sat.core.constants import Const as C | 24 from sat.core.constants import Const as C |
25 from sat.core.log import getLogger | 25 from sat.core.log import getLogger |
26 | |
26 log = getLogger(__name__) | 27 log = getLogger(__name__) |
27 import datetime | 28 import datetime |
28 from twisted.python import procutils | 29 from twisted.python import procutils |
29 import subprocess | 30 import subprocess |
30 import time | 31 import time |
33 import inspect | 34 import inspect |
34 import textwrap | 35 import textwrap |
35 import functools | 36 import functools |
36 | 37 |
37 | 38 |
38 NO_REPOS_DATA = u'repository data unknown' | 39 NO_REPOS_DATA = u"repository data unknown" |
39 repos_cache_dict = None | 40 repos_cache_dict = None |
40 repos_cache = None | 41 repos_cache = None |
41 | 42 |
42 | 43 |
43 def clean_ustr(ustr): | 44 def clean_ustr(ustr): |
44 """Clean unicode string | 45 """Clean unicode string |
45 | 46 |
46 remove special characters from unicode string | 47 remove special characters from unicode string |
47 """ | 48 """ |
49 | |
48 def valid_chars(unicode_source): | 50 def valid_chars(unicode_source): |
49 for char in unicode_source: | 51 for char in unicode_source: |
50 if unicodedata.category(char) == 'Cc' and char!='\n': | 52 if unicodedata.category(char) == "Cc" and char != "\n": |
51 continue | 53 continue |
52 yield char | 54 yield char |
53 return ''.join(valid_chars(ustr)) | 55 |
56 return "".join(valid_chars(ustr)) | |
57 | |
54 | 58 |
55 def partial(func, *fixed_args, **fixed_kwargs): | 59 def partial(func, *fixed_args, **fixed_kwargs): |
56 # FIXME: temporary hack to workaround the fact that inspect.getargspec is not working with functools.partial | 60 # FIXME: temporary hack to workaround the fact that inspect.getargspec is not working with functools.partial |
57 # making partial unusable with current D-bus module (in addMethod). | 61 # making partial unusable with current D-bus module (in addMethod). |
58 # Should not be needed anymore once moved to Python 3 | 62 # Should not be needed anymore once moved to Python 3 |
59 | 63 |
60 ori_args = inspect.getargspec(func).args | 64 ori_args = inspect.getargspec(func).args |
61 func = functools.partial(func, *fixed_args, **fixed_kwargs) | 65 func = functools.partial(func, *fixed_args, **fixed_kwargs) |
62 if ori_args[0] == 'self': | 66 if ori_args[0] == "self": |
63 del ori_args[0] | 67 del ori_args[0] |
64 ori_args = ori_args[len(fixed_args):] | 68 ori_args = ori_args[len(fixed_args) :] |
65 for kw in fixed_kwargs: | 69 for kw in fixed_kwargs: |
66 ori_args.remove(kw) | 70 ori_args.remove(kw) |
67 | 71 |
68 exec(textwrap.dedent('''\ | 72 exec( |
73 textwrap.dedent( | |
74 """\ | |
69 def method({args}): | 75 def method({args}): |
70 return func({kw_args}) | 76 return func({kw_args}) |
71 ''').format( | 77 """ |
72 args = ', '.join(ori_args), | 78 ).format( |
73 kw_args = ', '.join([a+'='+a for a in ori_args])) | 79 args=", ".join(ori_args), kw_args=", ".join([a + "=" + a for a in ori_args]) |
74 , locals()) | 80 ), |
81 locals(), | |
82 ) | |
75 | 83 |
76 return method | 84 return method |
85 | |
77 | 86 |
78 def xmpp_date(timestamp=None, with_time=True): | 87 def xmpp_date(timestamp=None, with_time=True): |
79 """Return date according to XEP-0082 specification | 88 """Return date according to XEP-0082 specification |
80 | 89 |
81 to avoid revealing the timezone, we always return UTC dates | 90 to avoid revealing the timezone, we always return UTC dates |
84 @param with_time(bool): if True include the time | 93 @param with_time(bool): if True include the time |
85 @return(unicode): XEP-0082 formatted date and time | 94 @return(unicode): XEP-0082 formatted date and time |
86 """ | 95 """ |
87 template_date = u"%Y-%m-%d" | 96 template_date = u"%Y-%m-%d" |
88 template_time = u"%H:%M:%SZ" | 97 template_time = u"%H:%M:%SZ" |
89 template = u"{}T{}".format(template_date, template_time) if with_time else template_date | 98 template = ( |
90 return datetime.datetime.utcfromtimestamp(time.time() if timestamp is None else timestamp).strftime(template) | 99 u"{}T{}".format(template_date, template_time) if with_time else template_date |
100 ) | |
101 return datetime.datetime.utcfromtimestamp( | |
102 time.time() if timestamp is None else timestamp | |
103 ).strftime(template) | |
104 | |
91 | 105 |
92 def generatePassword(vocabulary=None, size=20): | 106 def generatePassword(vocabulary=None, size=20): |
93 """Generate a password with random characters. | 107 """Generate a password with random characters. |
94 | 108 |
95 @param vocabulary(iterable): characters to use to create password | 109 @param vocabulary(iterable): characters to use to create password |
96 @param size(int): number of characters in the password to generate | 110 @param size(int): number of characters in the password to generate |
97 @return (unicode): generated password | 111 @return (unicode): generated password |
98 """ | 112 """ |
99 random.seed() | 113 random.seed() |
100 if vocabulary is None: | 114 if vocabulary is None: |
101 vocabulary = [chr(i) for i in range(0x30,0x3A) + range(0x41,0x5B) + range (0x61,0x7B)] | 115 vocabulary = [ |
102 return u''.join([random.choice(vocabulary) for i in range(15)]) | 116 chr(i) for i in range(0x30, 0x3A) + range(0x41, 0x5B) + range(0x61, 0x7B) |
117 ] | |
118 return u"".join([random.choice(vocabulary) for i in range(15)]) | |
119 | |
103 | 120 |
104 def getRepositoryData(module, as_string=True, is_path=False): | 121 def getRepositoryData(module, as_string=True, is_path=False): |
105 """Retrieve info on current Mercurial repository | 122 """Retrieve info on current Mercurial repository |
106 | 123 |
107 Data is gotten by using the following methods, in order: | 124 Data is gotten by using the following methods, in order: |
131 else: | 148 else: |
132 if repos_cache_dict is not None: | 149 if repos_cache_dict is not None: |
133 return repos_cache_dict | 150 return repos_cache_dict |
134 | 151 |
135 if sys.platform == "android": | 152 if sys.platform == "android": |
136 # FIXME: workaround to avoid trouble on android, need to be fixed properly | 153 # FIXME: workaround to avoid trouble on android, need to be fixed properly |
137 repos_cache = u"Cagou android build" | 154 repos_cache = u"Cagou android build" |
138 return repos_cache | 155 return repos_cache |
139 | 156 |
140 KEYS=("node", "node_short", "branch", "date", "tag", "distance") | 157 KEYS = ("node", "node_short", "branch", "date", "tag", "distance") |
141 ori_cwd = os.getcwd() | 158 ori_cwd = os.getcwd() |
142 | 159 |
143 if is_path: | 160 if is_path: |
144 repos_root = os.path.abspath(module) | 161 repos_root = os.path.abspath(module) |
145 else: | 162 else: |
146 repos_root = os.path.abspath(os.path.dirname(module.__file__)) | 163 repos_root = os.path.abspath(os.path.dirname(module.__file__)) |
147 | 164 |
148 try: | 165 try: |
149 hg_path = procutils.which('hg')[0] | 166 hg_path = procutils.which("hg")[0] |
150 except IndexError: | 167 except IndexError: |
151 log.warning(u"Can't find hg executable") | 168 log.warning(u"Can't find hg executable") |
152 hg_path = None | 169 hg_path = None |
153 hg_data = {} | 170 hg_data = {} |
154 | 171 |
155 if hg_path is not None: | 172 if hg_path is not None: |
156 os.chdir(repos_root) | 173 os.chdir(repos_root) |
157 try: | 174 try: |
158 hg_data_raw = subprocess.check_output(["hg","log", "-r", "-1", "--template","{node}\n" | 175 hg_data_raw = subprocess.check_output( |
159 "{node|short}\n" | 176 [ |
160 "{branch}\n" | 177 "hg", |
161 "{date|isodate}\n" | 178 "log", |
162 "{latesttag}\n" | 179 "-r", |
163 "{latesttagdistance}"]) | 180 "-1", |
181 "--template", | |
182 "{node}\n" | |
183 "{node|short}\n" | |
184 "{branch}\n" | |
185 "{date|isodate}\n" | |
186 "{latesttag}\n" | |
187 "{latesttagdistance}", | |
188 ] | |
189 ) | |
164 except subprocess.CalledProcessError: | 190 except subprocess.CalledProcessError: |
165 hg_data = {} | 191 hg_data = {} |
166 else: | 192 else: |
167 hg_data = dict(zip(KEYS, hg_data_raw.split('\n'))) | 193 hg_data = dict(zip(KEYS, hg_data_raw.split("\n"))) |
168 try: | 194 try: |
169 hg_data['modified'] = '+' in subprocess.check_output(["hg","id","-i"]) | 195 hg_data["modified"] = "+" in subprocess.check_output(["hg", "id", "-i"]) |
170 except subprocess.CalledProcessError: | 196 except subprocess.CalledProcessError: |
171 pass | 197 pass |
172 else: | 198 else: |
173 hg_data = {} | 199 hg_data = {} |
174 | 200 |
178 if is_path: | 204 if is_path: |
179 os.chdir(repos_root) | 205 os.chdir(repos_root) |
180 else: | 206 else: |
181 os.chdir(os.path.abspath(os.path.dirname(repos_root))) | 207 os.chdir(os.path.abspath(os.path.dirname(repos_root))) |
182 try: | 208 try: |
183 with open('.hg/dirstate') as hg_dirstate: | 209 with open(".hg/dirstate") as hg_dirstate: |
184 hg_data['node'] = hg_dirstate.read(20).encode('hex') | 210 hg_data["node"] = hg_dirstate.read(20).encode("hex") |
185 hg_data['node_short'] = hg_data['node'][:12] | 211 hg_data["node_short"] = hg_data["node"][:12] |
186 except IOError: | 212 except IOError: |
187 log.debug(u"Can't access repository data") | 213 log.debug(u"Can't access repository data") |
188 | 214 |
189 # we restore original working dir | 215 # we restore original working dir |
190 os.chdir(ori_cwd) | 216 os.chdir(ori_cwd) |
191 | 217 |
192 if not hg_data: | 218 if not hg_data: |
193 log.debug(u"Mercurial not available or working, trying package version") | 219 log.debug(u"Mercurial not available or working, trying package version") |
194 try: | 220 try: |
195 import pkg_resources | 221 import pkg_resources |
222 | |
196 pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version | 223 pkg_version = pkg_resources.get_distribution(C.APP_NAME_FILE).version |
197 version, local_id = pkg_version.split('+', 1) | 224 version, local_id = pkg_version.split("+", 1) |
198 except ImportError: | 225 except ImportError: |
199 log.warning("pkg_resources not available, can't get package data") | 226 log.warning("pkg_resources not available, can't get package data") |
200 except pkg_resources.DistributionNotFound: | 227 except pkg_resources.DistributionNotFound: |
201 log.warning("can't retrieve package data") | 228 log.warning("can't retrieve package data") |
202 except ValueError: | 229 except ValueError: |
203 log.info(u"no local version id in package: {pkg_version}".format(pkg_version=pkg_version)) | 230 log.info( |
204 else: | 231 u"no local version id in package: {pkg_version}".format( |
205 version = version.replace('.dev0', 'D') | 232 pkg_version=pkg_version |
233 ) | |
234 ) | |
235 else: | |
236 version = version.replace(".dev0", "D") | |
206 if version != C.APP_VERSION: | 237 if version != C.APP_VERSION: |
207 log.warning("Incompatible version ({version}) and pkg_version ({pkg_version})".format( | 238 log.warning( |
208 version=C.APP_VERSION, pkg_version=pkg_version)) | 239 "Incompatible version ({version}) and pkg_version ({pkg_version})".format( |
240 version=C.APP_VERSION, pkg_version=pkg_version | |
241 ) | |
242 ) | |
209 else: | 243 else: |
210 try: | 244 try: |
211 hg_node, hg_distance = local_id.split('.') | 245 hg_node, hg_distance = local_id.split(".") |
212 except ValueError: | 246 except ValueError: |
213 log.warning("Version doesn't specify repository data") | 247 log.warning("Version doesn't specify repository data") |
214 hg_data = {'node_short': hg_node, 'distance': hg_distance} | 248 hg_data = {"node_short": hg_node, "distance": hg_distance} |
215 | 249 |
216 repos_cache_dict = hg_data | 250 repos_cache_dict = hg_data |
217 | 251 |
218 if as_string: | 252 if as_string: |
219 if not hg_data: | 253 if not hg_data: |
220 repos_cache = NO_REPOS_DATA | 254 repos_cache = NO_REPOS_DATA |
221 else: | 255 else: |
222 strings = [u'rev', hg_data['node_short']] | 256 strings = [u"rev", hg_data["node_short"]] |
223 try: | 257 try: |
224 if hg_data['modified']: | 258 if hg_data["modified"]: |
225 strings.append(u"[M]") | 259 strings.append(u"[M]") |
226 except KeyError: | 260 except KeyError: |
227 pass | 261 pass |
228 try: | 262 try: |
229 strings.extend([u'({branch} {date})'.format(**hg_data)]) | 263 strings.extend([u"({branch} {date})".format(**hg_data)]) |
230 except KeyError: | 264 except KeyError: |
231 pass | 265 pass |
232 try: | 266 try: |
233 strings.extend([u'+{distance}'.format(**hg_data)]) | 267 strings.extend([u"+{distance}".format(**hg_data)]) |
234 except KeyError: | 268 except KeyError: |
235 pass | 269 pass |
236 repos_cache = u' '.join(strings) | 270 repos_cache = u" ".join(strings) |
237 return repos_cache | 271 return repos_cache |
238 else: | 272 else: |
239 return hg_data | 273 return hg_data |