Mercurial > libervia-backend
comparison libervia/backend/tools/utils.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 10b6ad569157 |
children |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
117 def aio(func): | 117 def aio(func): |
118 """Decorator to return a Deferred from asyncio coroutine | 118 """Decorator to return a Deferred from asyncio coroutine |
119 | 119 |
120 Functions with this decorator are run in asyncio context | 120 Functions with this decorator are run in asyncio context |
121 """ | 121 """ |
122 | |
122 def wrapper(*args, **kwargs): | 123 def wrapper(*args, **kwargs): |
123 return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs))) | 124 return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs))) |
125 | |
124 return wrapper | 126 return wrapper |
125 | 127 |
126 | 128 |
127 def as_future(d): | 129 def as_future(d): |
128 return d.asFuture(asyncio.get_event_loop()) | 130 return d.asFuture(asyncio.get_event_loop()) |
132 """Decorator to apply ensureDeferred to a function | 134 """Decorator to apply ensureDeferred to a function |
133 | 135 |
134 to be used when the function is called by third party library (e.g. wokkel) | 136 to be used when the function is called by third party library (e.g. wokkel) |
135 Otherwise, it's better to use ensureDeferred as early as possible. | 137 Otherwise, it's better to use ensureDeferred as early as possible. |
136 """ | 138 """ |
139 | |
137 def wrapper(*args, **kwargs): | 140 def wrapper(*args, **kwargs): |
138 return defer.ensureDeferred(func(*args, **kwargs)) | 141 return defer.ensureDeferred(func(*args, **kwargs)) |
142 | |
139 return wrapper | 143 return wrapper |
140 | 144 |
141 | 145 |
142 def xmpp_date( | 146 def xmpp_date( |
143 timestamp: Optional[Union[float, int]] = None, | 147 timestamp: Optional[Union[float, int]] = None, with_time: bool = True |
144 with_time: bool = True | |
145 ) -> str: | 148 ) -> str: |
146 """Return date according to XEP-0082 specification | 149 """Return date according to XEP-0082 specification |
147 | 150 |
148 to avoid revealing the timezone, we always return UTC dates | 151 to avoid revealing the timezone, we always return UTC dates |
149 the string returned by this method is valid with RFC 3339 | 152 the string returned by this method is valid with RFC 3339 |
151 @param timestamp(None, float): posix timestamp. If None current time will be used | 154 @param timestamp(None, float): posix timestamp. If None current time will be used |
152 @param with_time(bool): if True include the time | 155 @param with_time(bool): if True include the time |
153 @return(unicode): XEP-0082 formatted date and time | 156 @return(unicode): XEP-0082 formatted date and time |
154 """ | 157 """ |
155 dtime = datetime.datetime.fromtimestamp( | 158 dtime = datetime.datetime.fromtimestamp( |
156 time.time() if timestamp is None else timestamp, | 159 time.time() if timestamp is None else timestamp, datetime.timezone.utc |
157 datetime.timezone.utc | |
158 ) | 160 ) |
159 | 161 |
160 return ( | 162 return ( |
161 xmpp_datetime.format_datetime(dtime) if with_time | 163 xmpp_datetime.format_datetime(dtime) |
164 if with_time | |
162 else xmpp_datetime.format_date(dtime.date()) | 165 else xmpp_datetime.format_date(dtime.date()) |
163 ) | 166 ) |
164 | 167 |
165 | 168 |
166 def parse_xmpp_date( | 169 def parse_xmpp_date(xmpp_date_str: str, with_time: bool = True) -> float: |
167 xmpp_date_str: str, | |
168 with_time: bool = True | |
169 ) -> float: | |
170 """Get timestamp from XEP-0082 datetime | 170 """Get timestamp from XEP-0082 datetime |
171 | 171 |
172 @param xmpp_date_str: XEP-0082 formatted datetime or time | 172 @param xmpp_date_str: XEP-0082 formatted datetime or time |
173 @param with_time: if True, ``xmpp_date_str`` must be a datetime, otherwise it must be | 173 @param with_time: if True, ``xmpp_date_str`` must be a datetime, otherwise it must be |
174 a time profile. | 174 a time profile. |
192 @return (unicode): generated password | 192 @return (unicode): generated password |
193 """ | 193 """ |
194 random.seed() | 194 random.seed() |
195 if vocabulary is None: | 195 if vocabulary is None: |
196 vocabulary = [ | 196 vocabulary = [ |
197 chr(i) for i in list(range(0x30, 0x3A)) + list(range(0x41, 0x5B)) + list(range(0x61, 0x7B)) | 197 chr(i) |
198 for i in list(range(0x30, 0x3A)) | |
199 + list(range(0x41, 0x5B)) | |
200 + list(range(0x61, 0x7B)) | |
198 ] | 201 ] |
199 return "".join([random.choice(vocabulary) for i in range(15)]) | 202 return "".join([random.choice(vocabulary) for i in range(15)]) |
200 | 203 |
201 | 204 |
202 def get_repository_data(module, as_string=True, is_path=False): | 205 def get_repository_data(module, as_string=True, is_path=False): |
266 "{branch}\n" | 269 "{branch}\n" |
267 "{date|isodate}\n" | 270 "{date|isodate}\n" |
268 "{latesttag}\n" | 271 "{latesttag}\n" |
269 "{latesttagdistance}", | 272 "{latesttagdistance}", |
270 ], | 273 ], |
271 text=True | 274 text=True, |
272 ) | 275 ) |
273 except subprocess.CalledProcessError as e: | 276 except subprocess.CalledProcessError as e: |
274 log.error(f"Can't get repository data: {e}") | 277 log.error(f"Can't get repository data: {e}") |
275 hg_data = {} | 278 hg_data = {} |
276 except Exception as e: | 279 except Exception as e: |
277 log.error(f"Unexpected error, can't get repository data : [{type(e)}] {e}") | 280 log.error(f"Unexpected error, can't get repository data : [{type(e)}] {e}") |
278 hg_data = {} | 281 hg_data = {} |
279 else: | 282 else: |
280 hg_data = dict(list(zip(KEYS, hg_data_raw.split("\n")))) | 283 hg_data = dict(list(zip(KEYS, hg_data_raw.split("\n")))) |
281 try: | 284 try: |
282 hg_data["modified"] = "+" in subprocess.check_output(["python3", hg_path, "id", "-i"], text=True) | 285 hg_data["modified"] = "+" in subprocess.check_output( |
286 ["python3", hg_path, "id", "-i"], text=True | |
287 ) | |
283 except subprocess.CalledProcessError: | 288 except subprocess.CalledProcessError: |
284 pass | 289 pass |
285 else: | 290 else: |
286 hg_data = {} | 291 hg_data = {} |
287 | 292 |
291 if is_path: | 296 if is_path: |
292 os.chdir(repos_root) | 297 os.chdir(repos_root) |
293 else: | 298 else: |
294 os.chdir(os.path.abspath(os.path.dirname(repos_root))) | 299 os.chdir(os.path.abspath(os.path.dirname(repos_root))) |
295 try: | 300 try: |
296 with open(".hg/dirstate", 'rb') as hg_dirstate: | 301 with open(".hg/dirstate", "rb") as hg_dirstate: |
297 hg_data["node"] = hg_dirstate.read(20).hex() | 302 hg_data["node"] = hg_dirstate.read(20).hex() |
298 hg_data["node_short"] = hg_data["node"][:12] | 303 hg_data["node_short"] = hg_data["node"][:12] |
299 except IOError: | 304 except IOError: |
300 log.debug("Can't access repository data") | 305 log.debug("Can't access repository data") |
301 | 306 |
321 ) | 326 ) |
322 ) | 327 ) |
323 else: | 328 else: |
324 if version != C.APP_VERSION: | 329 if version != C.APP_VERSION: |
325 log.warning( | 330 log.warning( |
326 "Incompatible version ({version}) and pkg_version ({pkg_version})" | 331 "Incompatible version ({version}) and pkg_version ({pkg_version})".format( |
327 .format( | |
328 version=C.APP_VERSION, pkg_version=pkg_version | 332 version=C.APP_VERSION, pkg_version=pkg_version |
329 ) | 333 ) |
330 ) | 334 ) |
331 else: | 335 else: |
332 try: | 336 try: |