Mercurial > libervia-backend
comparison libervia/backend/core/log.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/core/log.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # Libervia: an XMPP client | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 """High level logging functions""" | |
21 # XXX: this module uses the standard logging module when possible, but as SàT can work in different cases where logging is not the best choice (twisted, pyjamas, etc.), it is necessary to have a dedicated module. Additional features like environment variables and colors are also managed. | |
22 # TODO: change formatting from "%s" style to "{}" style now that the code is Python 3 only | |
23 | |
24 from typing import TYPE_CHECKING, Any, Optional, Dict | |
25 | |
26 if TYPE_CHECKING: | |
27 from logging import _ExcInfoType | |
28 else: | |
29 _ExcInfoType = Any | |
30 | |
31 from libervia.backend.core.constants import Const as C | |
32 from libervia.backend.tools.common.ansi import ANSI as A | |
33 from libervia.backend.core import exceptions | |
34 import traceback | |
35 | |
# Identifier of the configured backend; stays None until configure() is called.
backend = None
# Loggers already created by getLogger(), indexed by logger name.
_loggers: Dict[str, "Logger"] = {}
# Requested outputs, filled by ConfigureBase.manage_outputs (key: output name).
handlers = {}
# Placeholders usable in a format string to mark where coloration starts and ends.
COLOR_START = '%(color_start)s'
COLOR_END = '%(color_end)s'
41 | |
42 | |
class Filtered(Exception):
    """Raised by Logger.format when the name filter rejects a record.

    Logger.log catches it and silently drops the message.
    """
45 | |
46 | |
class Logger:
    """High level logging class

    ``fmt``, ``filter_name`` and ``post_treat`` are class attributes: the
    Configure classes of this module set them on ``Logger`` itself, so they
    apply to every instance which does not override them.
    """
    fmt = None  # format option as given by user (e.g. SAT_LOG_LOGGER)
    filter_name = None  # filter to call
    post_treat = None

    def __init__(self, name):
        """Create a logger

        @param name: name of the logger, or an existing Logger instance whose
            settings and name are copied
        """
        if isinstance(name, Logger):
            self.copy(name)
        else:
            self._name = name

    def copy(self, other):
        """Copy values from other Logger

        @param other: Logger instance to copy format, filter, post treatment
            and name from
        """
        self.fmt = other.fmt
        # the filter must be copied from the filter itself (it used to be
        # written to a mistyped "Filter_name" attribute, with the format string)
        self.filter_name = other.filter_name
        self.post_treat = other.post_treat
        self._name = other._name

    def add_traceback(self, message):
        """Append the traceback of the exception being handled to the message

        @param message: message to complete, converted to text if needed
        @return: message followed by the formatted traceback
        """
        tb = traceback.format_exc()
        # an f-string is used so that non-string messages (e.g. an exception
        # instance) don't raise a TypeError on concatenation
        return f"{message}\n==== traceback ====\n{tb}"

    def out(
        self,
        message: object,
        level: Optional[str] = None,
        exc_info: _ExcInfoType = False,
        **kwargs
    ) -> None:
        """Actually log the message

        @param message: formatted message
        @param level: level of the message, unused by this implementation
        @param exc_info: if true, the current traceback is appended to the message
        """
        if exc_info:
            message = self.add_traceback(message)
        print(message)

    def log(
        self,
        level: str,
        message: object,
        exc_info: _ExcInfoType = False,
        **kwargs
    ) -> None:
        """Print message

        @param level: one of C.LOG_LEVELS
        @param message: message to format and print
        @param exc_info: if true, the current traceback is appended to the message
            before formatting (it is not forwarded to ``out``)
        """
        if exc_info:
            message = self.add_traceback(message)
        try:
            formatted = self.format(level, message)
            if self.post_treat is None:
                self.out(formatted, level, **kwargs)
            else:
                self.out(self.post_treat(level, formatted), level, **kwargs)
        except Filtered:
            # the message has been rejected by the name filter
            pass

    def format(self, level: str, message: object) -> object:
        """Format message according to Logger.fmt

        @param level: one of C.LOG_LEVELS
        @param message: message to format
        @return: formatted message

        @raise: Filtered when the message must not be logged
        @raise ValueError: a filter is set but can't be used
        """
        if self.fmt is None and self.filter_name is None:
            return message
        record = {
            'name': self._name,
            'message': message,
            'levelname': level,
        }
        try:
            if not self.filter_name.dict_filter(record):
                raise Filtered
        except (AttributeError, TypeError):
            # XXX: TypeError is here because of a pyjamas bug which need to be
            #      fixed (TypeError is raised instead of AttributeError)
            # when no filter is set, the AttributeError is expected and ignored
            if self.filter_name is not None:
                raise ValueError("Bad filter: filters must have a .filter method")
        try:
            return self.fmt % record
        except TypeError:
            # no usable format (e.g. fmt is None while a filter is set)
            return message
        except KeyError as e:
            if e.args[0] == 'profile':
                # XXX: %(profile)s use some magic with introspection, for
                #      debugging purpose only *DO NOT* use in production
                record['profile'] = configure_cls[backend].get_profile()
                return self.fmt % record
            else:
                raise e

    def debug(self, msg: object, **kwargs) -> None:
        """Log msg with C.LOG_LVL_DEBUG level"""
        self.log(C.LOG_LVL_DEBUG, msg, **kwargs)

    def info(self, msg: object, **kwargs) -> None:
        """Log msg with C.LOG_LVL_INFO level"""
        self.log(C.LOG_LVL_INFO, msg, **kwargs)

    def warning(self, msg: object, **kwargs) -> None:
        """Log msg with C.LOG_LVL_WARNING level"""
        self.log(C.LOG_LVL_WARNING, msg, **kwargs)

    def error(self, msg: object, **kwargs) -> None:
        """Log msg with C.LOG_LVL_ERROR level"""
        self.log(C.LOG_LVL_ERROR, msg, **kwargs)

    def critical(self, msg: object, **kwargs) -> None:
        """Log msg with C.LOG_LVL_CRITICAL level"""
        self.log(C.LOG_LVL_CRITICAL, msg, **kwargs)

    def exception(self, msg: object, exc_info=True, **kwargs) -> None:
        """Log msg with C.LOG_LVL_ERROR level, with the traceback by default"""
        self.log(C.LOG_LVL_ERROR, msg, exc_info=exc_info, **kwargs)
158 | |
159 | |
class FilterName(object):
    """Keep only the records whose logger name matches a regular expression"""

    def __init__(self, name_re):
        """Compile the expression used to select logger names

        @param name_re: regular expression tested against logger names
            (``search`` is used, not ``match``); must not be empty
        """
        assert name_re
        import re
        self.name_re = re.compile(name_re)

    def filter(self, record):
        """Check a record exposing a ``name`` attribute

        @param record: object with the logger name in its ``name`` attribute
        @return: 1 if the name matches the expression, 0 otherwise
        """
        found = self.name_re.search(record.name)
        return 0 if found is None else 1

    def dict_filter(self, dict_record):
        """Filter using a dictionary record

        @param dict_record: dictionary with at least a key "name" with logger name
        @return: True if message should be logged
        """
        # minimal stand-in exposing the "name" attribute expected by filter()
        class LogRecord(object):
            pass

        stub = LogRecord()
        stub.name = dict_record['name']
        verdict = self.filter(stub)
        return verdict == 1
188 | |
189 | |
class ConfigureBase:
    """Base class used to configure a logging backend

    The whole configuration is applied when the class is instantiated: each
    ``configure_*`` method handles one option and can be overridden by the
    backend specific subclasses.
    """
    LOGGER_CLASS = Logger
    # True if color location is specified in fmt (with COLOR_START)
    _color_location = False

    def __init__(self, level=None, fmt=None, output=None, logger=None, colors=False,
                 levels_taints_dict=None, force_colors=False, backend_data=None):
        """Configure a backend

        @param level: one of C.LOG_LEVELS
        @param fmt: format string, pretty much as in std logging.
            Accept the following keywords (maybe more depending on backend):
                - "message"
                - "levelname"
                - "name" (logger name)
        @param output: output option, see configure_output
        @param logger: if set, use it as a regular expression to filter on logger name.
            Use search to match expression, so ^ or $ can be necessary.
        @param colors: if True use ANSI colors to show log levels
        @param levels_taints_dict: mapping of level name to a list of taint names,
            only used when colors is True
        @param force_colors: if True ANSI colors are used even if stdout is not a tty
        @param backend_data: stored as ``self.backend_data``, not used by this class
        """
        self.backend_data = backend_data
        self.pre_treatment()
        self.configure_level(level)
        self.configure_format(fmt)
        self.configure_output(output)
        self.configure_logger(logger)
        self.configure_colors(colors, force_colors, levels_taints_dict)
        self.post_treatment()
        self.update_current_logger()

    def update_current_logger(self):
        """update existing logger to the class needed for this backend"""
        if self.LOGGER_CLASS is None:
            return
        # a copy of the items is used as the dictionary is modified in the loop
        for name, logger in list(_loggers.items()):
            _loggers[name] = self.LOGGER_CLASS(logger)

    def pre_treatment(self):
        """Hook called before any option is handled, does nothing by default"""
        pass

    def configure_level(self, level):
        """Deactivate the Logger methods of the levels below the given one

        @param level: one of C.LOG_LEVELS, or None to keep all levels
        """
        if level is not None:
            # we deactivate methods below level
            level_idx = C.LOG_LEVELS.index(level)

            def dev_null(self, msg, **kwargs):
                # keyword arguments are accepted so a deactivated method can be
                # called like the real one (e.g. with exc_info=True)
                pass

            for _level in C.LOG_LEVELS[:level_idx]:
                setattr(Logger, _level.lower(), dev_null)

    def configure_format(self, fmt):
        """Set the format used by all loggers

        @param fmt: format string, or None to keep the message unchanged
        """
        if fmt is not None:
            if fmt != '%(message)s':  # %(message)s is the same as None
                Logger.fmt = fmt
            if COLOR_START in fmt:
                ConfigureBase._color_location = True
                if fmt.find(COLOR_END, fmt.rfind(COLOR_START)) < 0:
                    # color_start not followed by an end, we add it
                    Logger.fmt += COLOR_END

    def configure_output(self, output):
        """Check the output option

        @param output: output option, only None or the default output are accepted
        @raise NotImplementedError: an other output is requested
        """
        if output is not None:
            if output != C.LOG_OPT_OUTPUT_SEP + C.LOG_OPT_OUTPUT_DEFAULT:
                # TODO: manage other outputs
                raise NotImplementedError("Basic backend only manage default output yet")

    def configure_logger(self, logger):
        """Install a filter on logger names

        @param logger: regular expression used to filter names, ignored if empty
        """
        if logger:
            Logger.filter_name = FilterName(logger)

    def configure_colors(self, colors, force_colors, levels_taints_dict):
        """Compute the ANSI sequences to use for each level

        The result is stored in the ``taints`` attribute of the class of this
        instance (key: level, value: string of ANSI sequences).

        @param colors: nothing is done if False
        @param force_colors: not used by this implementation
        @param levels_taints_dict: mapping of level name to a list of taint names,
            None is handled as an empty mapping.
            Upper case versions of its keys are added to this dictionary.
        """
        if colors:
            if levels_taints_dict is None:
                # no taint specified, default ones are used for every level
                levels_taints_dict = {}
            # if color are used, we need to handle levels_taints_dict
            for level in list(levels_taints_dict.keys()):
                # we want levels in uppercase to correspond to constants
                levels_taints_dict[level.upper()] = levels_taints_dict[level]
            taints = self.__class__.taints = {}
            for level in C.LOG_LEVELS:
                # we want to use user values, with constant value as default
                taint_list = levels_taints_dict.get(level, C.LOG_OPT_TAINTS_DICT[1][level])
                ansi_list = []
                for elt in taint_list:
                    elt = elt.upper()
                    try:
                        ansi = getattr(A, 'FG_{}'.format(elt))
                    except AttributeError:
                        try:
                            ansi = getattr(A, elt)
                        except AttributeError:
                            # we use raw string if element is unknown
                            ansi = elt
                    ansi_list.append(ansi)
                taints[level] = ''.join(ansi_list)

    def post_treatment(self):
        """Hook called once all options are handled, does nothing by default"""
        pass

    def manage_outputs(self, outputs_raw):
        """ Parse output option in a backend agnostic way, and fill handlers consequently

        Outputs are separated with C.LOG_OPT_OUTPUT_SEP, and each one can be
        followed by options between parentheses. A value without any separator
        is used as the path of a file output.

        @param outputs_raw: output option as entered in environment variable or in configuration
        @raise ValueError: an output is unknown, a file output has no path, or
            options are given to an output which doesn't handle them
        """
        if not outputs_raw:
            return
        outputs = outputs_raw.split(C.LOG_OPT_OUTPUT_SEP)
        if len(outputs) == 1:
            # no separator at all: the whole value is a file path
            handlers[C.LOG_OPT_OUTPUT_FILE] = [outputs.pop()]

        for output in outputs:
            if not output:
                continue
            if output.endswith(')'):
                # we have options
                opt_begin = output.rfind('(')
                options = output[opt_begin+1:-1]
                output = output[:opt_begin]
            else:
                options = None

            if output not in (C.LOG_OPT_OUTPUT_DEFAULT, C.LOG_OPT_OUTPUT_FILE, C.LOG_OPT_OUTPUT_MEMORY):
                raise ValueError("Invalid output [%s]" % output)

            if output == C.LOG_OPT_OUTPUT_DEFAULT:
                # no option for default handler
                handlers[output] = None
            elif output == C.LOG_OPT_OUTPUT_FILE:
                if not options:
                    # the exception must actually be raised, and the keyword
                    # must match the "handler" replacement field
                    raise ValueError(
                        "{handler} output need a path as option".format(handler=output)
                    )
                handlers.setdefault(output, []).append(options)
                options = None  # option are parsed, we can empty them
            elif output == C.LOG_OPT_OUTPUT_MEMORY:
                # we have memory handler, option can be the len limit or None
                try:
                    limit = int(options)
                    options = None  # option are parsed, we can empty them
                except (TypeError, ValueError):
                    limit = C.LOG_OPT_OUTPUT_MEMORY_LIMIT
                handlers[output] = limit

            if options:  # we should not have unparsed options
                raise ValueError("options [{options}] are not supported for {handler} output".format(options=options, handler=output))

    @staticmethod
    def memory_get(size=None):
        """Return buffered logs

        @param size: number of logs to return
        @raise NotImplementedError: always, this method must be overridden
        """
        raise NotImplementedError

    @classmethod
    def ansi_colors(cls, level, message):
        """Colorise message depending on level for terminals

        @param level: one of C.LOG_LEVELS
        @param message: formatted message to log
        @return: message with ANSI escape codes for coloration
        """
        try:
            start = cls.taints[level]
        except (KeyError, AttributeError):
            # unknown level, or taints have never been computed
            # (configure_colors has not been called with colors=True)
            start = ''

        if cls._color_location:
            # the format specifies where the coloration must be put
            return message % {'color_start': start,
                              'color_end': A.RESET}
        else:
            return '%s%s%s' % (start, message, A.RESET)

    @staticmethod
    def get_profile():
        """Try to find profile value using introspection

        @raise NotImplementedError: always, this method must be overridden
        """
        raise NotImplementedError
364 | |
365 | |
class ConfigureCustom(ConfigureBase):
    """Configuration of the custom backend, which only registers a Logger class"""
    LOGGER_CLASS = None

    def __init__(self, logger_class, *args, **kwargs):
        """Register the Logger class to use

        ConfigureBase.__init__ is not called, so none of the ``configure_*``
        methods is run, and the other arguments are ignored.

        @param logger_class: class to instantiate when a logger is requested
        """
        setattr(ConfigureCustom, "LOGGER_CLASS", logger_class)
371 | |
372 | |
# Registry of the Configure classes (key: backend, value: Configure subclass).
# None is the key used as long as configure() has not been called.
configure_cls = { None: ConfigureBase,
                  C.LOG_BACKEND_CUSTOM: ConfigureCustom
                }  # XXX: must be filled when new backends are added
376 | |
377 | |
def configure(backend_, **options):
    """Configure logging behaviour

    @param backend_: key of configure_cls selecting the backend, can be:
        C.LOG_BACKEND_BASIC: use a basic print based logging
        C.LOG_BACKEND_CUSTOM: use a given Logger subclass
            (the class must then be given with the "logger_class" option)
    @param options: keyword arguments given to the Configure class of the backend
    @raise exceptions.InternalError: logging has already been configured
    @raise ValueError: the backend is not registered in configure_cls
    """
    global backend
    if backend is not None:
        raise exceptions.InternalError("Logging can only be configured once")

    # the backend is validated before the global is set, so that a failed
    # call doesn't leave the module configured with an unknown backend
    try:
        configure_class = configure_cls[backend_]
    except KeyError:
        raise ValueError("unknown backend [{}]".format(backend_))

    args = ()
    if backend_ == C.LOG_BACKEND_CUSTOM:
        args = (options.pop('logger_class'),)

    # the global must be set before the instantiation, as the configuration
    # may need to retrieve the Configure class of the current backend
    backend = backend_
    configure_class(*args, **options)
398 | |
def memory_get(size=None):
    """Return the logs buffered by the memory output of the current backend

    @param size: number of logs to return
    @raise ValueError: the memory output has not been requested
    """
    if C.LOG_OPT_OUTPUT_MEMORY in handlers:
        return configure_cls[backend].memory_get(size)
    raise ValueError('memory output is not used')
403 | |
def getLogger(name=C.LOG_BASE_LOGGER) -> Logger:
    """Return the logger registered with the given name, creating it if needed

    @param name: name of the logger
    @return: logger instance, shared by all calls using the same name
    @raise ValueError: the current backend has no registered Configure class
    """
    try:
        logger_class = configure_cls[backend].LOGGER_CLASS
    except KeyError:
        raise ValueError("This method should not be called with backend [{}]".format(backend))
    try:
        return _loggers[name]
    except KeyError:
        # the logger is only instantiated when it is not registered yet,
        # instead of building an instance which is thrown away on each call
        logger = _loggers[name] = logger_class(name)
        return logger
410 | |
# Logger registered with the default name (C.LOG_BASE_LOGGER), created at import
# time and used by the module level shortcut functions.
_root_logger = getLogger()
412 | |
def debug(msg: object, **kwargs) -> None:
    """Call the ``debug`` method of the module level ``_root_logger``

    @param msg: message to log
    @param kwargs: keyword arguments forwarded unchanged
    """
    _root_logger.debug(msg, **kwargs)
415 | |
def info(msg: object, **kwargs) -> None:
    """Call the ``info`` method of the module level ``_root_logger``

    @param msg: message to log
    @param kwargs: keyword arguments forwarded unchanged
    """
    _root_logger.info(msg, **kwargs)
418 | |
def warning(msg: object, **kwargs) -> None:
    """Call the ``warning`` method of the module level ``_root_logger``

    @param msg: message to log
    @param kwargs: keyword arguments forwarded unchanged
    """
    _root_logger.warning(msg, **kwargs)
421 | |
def error(msg: object, **kwargs) -> None:
    """Call the ``error`` method of the module level ``_root_logger``

    @param msg: message to log
    @param kwargs: keyword arguments forwarded unchanged
    """
    _root_logger.error(msg, **kwargs)
424 | |
def critical(msg: object, **kwargs) -> None:
    """Call the ``critical`` method of the module level ``_root_logger``

    @param msg: message to log
    @param kwargs: keyword arguments forwarded unchanged
    """
    _root_logger.critical(msg, **kwargs)