comparison libervia/backend/core/log.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/core/log.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # Libervia: an XMPP client
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 """High level logging functions"""
21 # XXX: this module uses the standard logging module when possible, but as SàT can work in different cases where logging is not the best choice (twisted, pyjamas, etc), it is necessary to have a dedicated module. Additional features like environment variables and colors are also managed.
22 # TODO: change formatting from "%s" style to "{}" when moved to Python 3
23
24 from typing import TYPE_CHECKING, Any, Optional, Dict
25
26 if TYPE_CHECKING:
27 from logging import _ExcInfoType
28 else:
29 _ExcInfoType = Any
30
31 from libervia.backend.core.constants import Const as C
32 from libervia.backend.tools.common.ansi import ANSI as A
33 from libervia.backend.core import exceptions
34 import traceback
35
36 backend = None
37 _loggers: Dict[str, "Logger"] = {}
38 handlers = {}
39 COLOR_START = '%(color_start)s'
40 COLOR_END = '%(color_end)s'
41
42
class Filtered(Exception):
    """Raised while formatting to signal that a message must not be logged"""
45
46
class Logger:
    """High level logging class

    ``fmt``, ``filter_name`` and ``post_treat`` default to the class attributes below;
    ``copy`` stores the values of another logger on the instance.
    """
    fmt = None  # format option as given by user (e.g. SAT_LOG_LOGGER)
    filter_name = None  # filter to call
    post_treat = None  # optional callable(level, formatted) applied before output

    def __init__(self, name):
        """Create a logger

        @param name: name of the logger, or an existing Logger whose values are copied
        """
        if isinstance(name, Logger):
            self.copy(name)
        else:
            self._name = name

    def copy(self, other):
        """Copy values from other Logger

        @param other: Logger instance to take fmt, filter_name, post_treat and name from
        """
        self.fmt = other.fmt
        # the filter must be copied from the filter of the other logger (and under
        # the attribute name which is actually read by format())
        self.filter_name = other.filter_name
        self.post_treat = other.post_treat
        self._name = other._name

    def add_traceback(self, message):
        """Append the traceback of the exception being handled to a message

        @param message: message to complete (converted to text if needed)
        @return: message followed by a separator and the formatted traceback
        """
        tb = traceback.format_exc()
        # str.format is used instead of "+" so a non string message (e.g. an
        # exception instance) doesn't raise a TypeError
        return "{}\n==== traceback ====\n{}".format(message, tb)

    def out(
        self,
        message: object,
        level: Optional[str] = None,
        exc_info: _ExcInfoType = False,
        **kwargs
    ) -> None:
        """Actually log the message

        @param message: formatted message
        @param level: one of C.LOG_LEVELS (not used by this print based output)
        @param exc_info: if true, the current traceback is appended to the message
        """
        if exc_info:
            message = self.add_traceback(message)
        print(message)

    def log(
        self,
        level: str,
        message: object,
        exc_info: _ExcInfoType = False,
        **kwargs
    ) -> None:
        """Print message

        @param level: one of C.LOG_LEVELS
        @param message: message to format and print
        @param exc_info: if true, the current traceback is appended to the message
            before formatting
        @param kwargs: extra arguments given as is to ``out``
        """
        if exc_info:
            message = self.add_traceback(message)
        try:
            formatted = self.format(level, message)
            if self.post_treat is None:
                self.out(formatted, level, **kwargs)
            else:
                self.out(self.post_treat(level, formatted), level, **kwargs)
        except Filtered:
            # the message has been rejected by the filter, nothing to output
            pass

    def format(self, level: str, message: object) -> object:
        """Format message according to Logger.fmt

        @param level: one of C.LOG_LEVELS
        @param message: message to format
        @return: formatted message

        @raise: Filtered when the message must not be logged
        @raise ValueError: filter_name is set but can't be used as a filter
        """
        if self.fmt is None and self.filter_name is None:
            return message
        record = {
            'name': self._name,
            'message': message,
            'levelname': level,
        }
        try:
            if not self.filter_name.dict_filter(record):
                raise Filtered
        except (AttributeError, TypeError):
            # XXX: TypeError is here because of a pyjamas bug which need to be fixed
            #      (TypeError is raised instead of AttributeError)
            # AttributeError is expected when there is no filter at all
            if self.filter_name is not None:
                raise ValueError("Bad filter: filters must have a .filter method")
        try:
            return self.fmt % record
        except TypeError:
            # fmt is not usable for formatting (e.g. None when only a filter is
            # set): the message is returned as is
            return message
        except KeyError as e:
            if e.args[0] == 'profile':
                # XXX: %(profile)s use some magic with introspection, for debugging
                #      purpose only *DO NOT* use in production
                record['profile'] = configure_cls[backend].get_profile()
                return self.fmt % record
            else:
                raise

    def debug(self, msg: object, **kwargs) -> None:
        """Log msg with the debug level"""
        self.log(C.LOG_LVL_DEBUG, msg, **kwargs)

    def info(self, msg: object, **kwargs) -> None:
        """Log msg with the info level"""
        self.log(C.LOG_LVL_INFO, msg, **kwargs)

    def warning(self, msg: object, **kwargs) -> None:
        """Log msg with the warning level"""
        self.log(C.LOG_LVL_WARNING, msg, **kwargs)

    def error(self, msg: object, **kwargs) -> None:
        """Log msg with the error level"""
        self.log(C.LOG_LVL_ERROR, msg, **kwargs)

    def critical(self, msg: object, **kwargs) -> None:
        """Log msg with the critical level"""
        self.log(C.LOG_LVL_CRITICAL, msg, **kwargs)

    def exception(self, msg: object, exc_info=True, **kwargs) -> None:
        """Log msg with the error level, adding the current traceback by default"""
        self.log(C.LOG_LVL_ERROR, msg, exc_info=exc_info, **kwargs)
158
159
class FilterName(object):
    """Filter on logger name according to a regex"""

    def __init__(self, name_re):
        """Compile the regular expression used to select logger names

        @param name_re: regular expression used to filter names (using search and not match)
        """
        assert name_re
        import re
        pattern = re.compile(name_re)
        self.name_re = pattern

    def filter(self, record):
        """Tell if a record must be kept

        @param record: object with a "name" attribute holding the logger name
        @return: 1 if the name matches the regular expression, 0 otherwise
        """
        found = self.name_re.search(record.name)
        return 0 if found is None else 1

    def dict_filter(self, dict_record):
        """Filter using a dictionary record

        @param dict_record: dictionary with at least a key "name" with logger name
        @return: True if message should be logged
        """
        class LogRecord(object):
            """Minimal record exposing only the attribute read by filter()"""

        stub = LogRecord()
        stub.name = dict_record['name']
        verdict = self.filter(stub)
        return verdict == 1
188
189
class ConfigureBase:
    """Base configuration of the logging system

    Instantiating the class applies the configuration: each option is handled by a
    dedicated ``configure_*`` method, which can be overridden by backends.
    """
    LOGGER_CLASS = Logger
    # True if color location is specified in fmt (with COLOR_START)
    _color_location = False

    def __init__(self, level=None, fmt=None, output=None, logger=None, colors=False,
                 levels_taints_dict=None, force_colors=False, backend_data=None):
        """Configure a backend

        @param level: one of C.LOG_LEVELS
        @param fmt: format string, pretty much as in std logging.
            Accept the following keywords (maybe more depending on backend):
                - "message"
                - "levelname"
                - "name" (logger name)
        @param output: output option, see configure_output
        @param logger: if set, use it as a regular expression to filter on logger name.
            Use search to match expression, so ^ or $ can be necessary.
        @param colors: if True use ANSI colors to show log levels
        @param levels_taints_dict: map of level name to list of taints, used
            instead of the default taints when colors are used
        @param force_colors: if True ANSI colors are used even if stdout is not a tty
        @param backend_data: stored as is in self.backend_data
        """
        self.backend_data = backend_data
        self.pre_treatment()
        self.configure_level(level)
        self.configure_format(fmt)
        self.configure_output(output)
        self.configure_logger(logger)
        self.configure_colors(colors, force_colors, levels_taints_dict)
        self.post_treatment()
        self.update_current_logger()

    def update_current_logger(self):
        """update existing logger to the class needed for this backend"""
        if self.LOGGER_CLASS is None:
            return
        # we iterate on a copy as the registry is modified in the loop
        for name, logger in list(_loggers.items()):
            _loggers[name] = self.LOGGER_CLASS(logger)

    def pre_treatment(self):
        """Hook called before any option is handled (does nothing here)"""
        pass

    def configure_level(self, level):
        """Deactivate the Logger methods of the levels below the requested one

        @param level: one of C.LOG_LEVELS, or None to keep all levels active
        """
        if level is not None:
            # we deactivate methods below level
            level_idx = C.LOG_LEVELS.index(level)

            def dev_null(self, msg, **kwargs):
                # keyword arguments must be accepted (and ignored), as the methods
                # replaced here accept them (e.g. exc_info)
                pass

            for _level in C.LOG_LEVELS[:level_idx]:
                setattr(Logger, _level.lower(), dev_null)

    def configure_format(self, fmt):
        """Set the format used by Logger

        @param fmt: format string, or None to keep the current one
        """
        if fmt is not None:
            if fmt != '%(message)s':  # %(message)s is the same as None
                Logger.fmt = fmt
            if COLOR_START in fmt:
                ConfigureBase._color_location = True
                if fmt.find(COLOR_END, fmt.rfind(COLOR_START)) < 0:
                    # color_start not followed by an end, we add it
                    Logger.fmt += COLOR_END

    def configure_output(self, output):
        """Check the requested output

        @param output: output option, only None and the default output are accepted
        @raise NotImplementedError: an other output is requested
        """
        if output is not None:
            if output != C.LOG_OPT_OUTPUT_SEP + C.LOG_OPT_OUTPUT_DEFAULT:
                # TODO: manage other outputs
                raise NotImplementedError("Basic backend only manage default output yet")

    def configure_logger(self, logger):
        """Set the filter on logger names

        @param logger: regular expression to filter on logger name, a false value
            to keep the current filter
        """
        if logger:
            Logger.filter_name = FilterName(logger)

    def configure_colors(self, colors, force_colors, levels_taints_dict):
        """Build the map of level to ANSI escape codes in the ``taints`` class attribute

        @param colors: nothing is done if this is false
        @param force_colors: not used here
        @param levels_taints_dict: map of level name (case insensitive) to list of
            taints; None is handled as an empty map, and levels which are not set
            use the defaults from C.LOG_OPT_TAINTS_DICT
        """
        if colors:
            # if color are used, we need to handle levels_taints_dict
            # we work on a copy: the argument may be None and the dict of the caller
            # must not be modified
            levels_taints = dict(levels_taints_dict or {})
            for level in list(levels_taints.keys()):
                # we want levels in uppercase to correspond to constants
                levels_taints[level.upper()] = levels_taints[level]
            taints = self.__class__.taints = {}
            for level in C.LOG_LEVELS:
                # we want use values and use constant value as default
                taint_list = levels_taints.get(level, C.LOG_OPT_TAINTS_DICT[1][level])
                ansi_list = []
                for elt in taint_list:
                    elt = elt.upper()
                    try:
                        ansi = getattr(A, 'FG_{}'.format(elt))
                    except AttributeError:
                        try:
                            ansi = getattr(A, elt)
                        except AttributeError:
                            # we use raw string if element is unknown
                            ansi = elt
                    ansi_list.append(ansi)
                taints[level] = ''.join(ansi_list)

    def post_treatment(self):
        """Hook called once all options are handled (does nothing here)"""
        pass

    def manage_outputs(self, outputs_raw):
        """ Parse output option in a backend agnostic way, and fill handlers consequently

        A value without any separator is used as the path of a single file output.
        @param outputs_raw: output option as entered in environment variable or in configuration
        @raise ValueError: an output is unknown, a file output has no path, or
            an output got options which can't be parsed
        """
        if not outputs_raw:
            return
        outputs = outputs_raw.split(C.LOG_OPT_OUTPUT_SEP)
        if len(outputs) == 1:
            # no separator at all: the list is emptied, so the loop below is skipped
            handlers[C.LOG_OPT_OUTPUT_FILE] = [outputs.pop()]

        for output in outputs:
            if not output:
                continue
            if output[-1] == ')':
                # we have options
                opt_begin = output.rfind('(')
                options = output[opt_begin+1:-1]
                output = output[:opt_begin]
            else:
                options = None

            if output not in (C.LOG_OPT_OUTPUT_DEFAULT, C.LOG_OPT_OUTPUT_FILE, C.LOG_OPT_OUTPUT_MEMORY):
                raise ValueError("Invalid output [%s]" % output)

            if output == C.LOG_OPT_OUTPUT_DEFAULT:
                # no option for default handler
                handlers[output] = None
            elif output == C.LOG_OPT_OUTPUT_FILE:
                if not options:
                    raise ValueError("{handler} output need a path as option".format(handler=output))
                handlers.setdefault(output, []).append(options)
                options = None  # option are parsed, we can empty them
            elif output == C.LOG_OPT_OUTPUT_MEMORY:
                # we have memory handler, option can be the len limit or None
                try:
                    limit = int(options)
                    options = None  # option are parsed, we can empty them
                except (TypeError, ValueError):
                    limit = C.LOG_OPT_OUTPUT_MEMORY_LIMIT
                handlers[output] = limit

            if options:  # we should not have unparsed options
                raise ValueError("options [{options}] are not supported for {handler} output".format(options=options, handler=output))

    @staticmethod
    def memory_get(size=None):
        """Return buffered logs

        @param size: number of logs to return
        @raise NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    @classmethod
    def ansi_colors(cls, level, message):
        """Colorise message depending on level for terminals

        @param level: one of C.LOG_LEVELS
        @param message: formatted message to log
        @return: message with ANSI escape codes for coloration
        """
        try:
            start = cls.taints[level]
        except (AttributeError, KeyError):
            # no taint for this level, or colors have not been configured at all
            start = ''

        if cls._color_location:
            # the format specifies where the colors must be put
            return message % {'color_start': start,
                              'color_end': A.RESET}
        else:
            return '%s%s%s' % (start, message, A.RESET)

    @staticmethod
    def get_profile():
        """Try to find profile value using introspection

        @raise NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError
364
365
class ConfigureCustom(ConfigureBase):
    """Configuration used when the caller provides its own Logger class

    The constructor of ConfigureBase is not called: the logger class is only recorded.
    """
    LOGGER_CLASS = None

    def __init__(self, logger_class, *args, **kwargs):
        """Record the logger class to use

        @param logger_class: class stored in ConfigureCustom.LOGGER_CLASS
        @param args: ignored
        @param kwargs: ignored
        """
        # set on the class (not on the instance), where getLogger() reads it
        ConfigureCustom.LOGGER_CLASS = logger_class
371
372
# XXX: (key: backend, value: Configure subclass) must be filled when new backends are added
configure_cls = {
    None: ConfigureBase,
    C.LOG_BACKEND_CUSTOM: ConfigureCustom,
}
376
377
def configure(backend_, **options):
    """Configure logging behaviour

    @param backend_: can be:
        C.LOG_BACKEND_BASIC: use a basic print based logging
        C.LOG_BACKEND_CUSTOM: use a given Logger subclass
            (the "logger_class" option is then mandatory)
    @param options: arguments given to the Configure class of the backend
    @raise exceptions.InternalError: a backend has already been configured
    @raise ValueError: the backend is not registered in configure_cls
    """
    global backend
    if backend is not None:
        raise exceptions.InternalError("Logging can only be configured once")

    # the backend is validated before being stored, so a bad value doesn't leave
    # the module with an unusable backend which can't be configured again
    try:
        configure_class = configure_cls[backend_]
    except KeyError:
        raise ValueError("unknown backend [{}]".format(backend_))

    # the global is set before instantiation, as it was done before validation existed
    backend = backend_
    if backend_ == C.LOG_BACKEND_CUSTOM:
        logger_class = options.pop('logger_class')
        configure_class(logger_class, **options)
    else:
        configure_class(**options)
398
def memory_get(size=None):
    """Return the logs buffered by the memory output of the current backend

    @param size: number of logs to return, given as is to the backend
    @raise ValueError: the memory output is not registered in handlers
    """
    if C.LOG_OPT_OUTPUT_MEMORY not in handlers:
        raise ValueError('memory output is not used')
    configure_class = configure_cls[backend]
    return configure_class.memory_get(size)
403
def getLogger(name=C.LOG_BASE_LOGGER) -> Logger:
    """Return the logger registered for a name, creating it if necessary

    @param name: name of the logger
    @return: logger instance, the same one is returned for a given name until
        the registry entry is replaced
    @raise ValueError: the current backend is not registered in configure_cls
    """
    try:
        logger_class = configure_cls[backend].LOGGER_CLASS
    except KeyError:
        raise ValueError("This method should not be called with backend [{}]".format(backend))
    try:
        return _loggers[name]
    except KeyError:
        # the logger is only instantiated when it is actually missing, instead of
        # building a throwaway instance on each call
        logger = _loggers[name] = logger_class(name)
        return logger
410
# Logger used by the module level shortcuts below; retrieved once, at import time.
_root_logger = getLogger()


def debug(msg, **kwargs):
    """Log msg with the debug level, using the root logger"""
    _root_logger.debug(msg, **kwargs)


def info(msg, **kwargs):
    """Log msg with the info level, using the root logger"""
    _root_logger.info(msg, **kwargs)


def warning(msg, **kwargs):
    """Log msg with the warning level, using the root logger"""
    _root_logger.warning(msg, **kwargs)


def error(msg, **kwargs):
    """Log msg with the error level, using the root logger"""
    _root_logger.error(msg, **kwargs)


def critical(msg, **kwargs):
    """Log msg with the critical level, using the root logger"""
    _root_logger.critical(msg, **kwargs)