Mercurial > libervia-backend
view src/core/log.py @ 1006:325fd230c15d
core (log): added advanced feature to basic backend (colors/formatting/level and logger filtering)
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 05 May 2014 18:58:34 +0200 |
parents | b4af31a8a4f2 |
children | a7d33c7a8277 |
line wrap: on
line source
#!/usr/bin/python
# -*- coding: utf-8 -*-

# SàT: a XMPP client
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""High level logging functions"""
# XXX: this module uses the standard logging module when possible, but as SàT
#      can work in different cases where logging is not the best choice
#      (twisted, pyjamas, etc), it is necessary to have a dedicated module.
#      Additional features like environment variables and colors are also
#      managed.
# NOTE: standard library modules (re, sys, os, logging, ConfigParser) are
#       imported lazily inside the functions which need them, as in the
#       original code; presumably so the basic backend stays usable where
#       those modules are not available -- TODO confirm before hoisting them.

from sat.core.constants import Const as C
from sat.core import exceptions

_backend = None  # backend given to configure(), None while not configured
_loggers = {}  # basic backend: logger name => Logger instance
_handlers = {}  # output type => parsed option (see _manageOutputs)


class Filtered(Exception):
    """Raised by Logger.format when a message must not be logged"""
    pass


class Logger(object):
    """High level logging class"""
    # The 3 following attributes are set on the class by _configureBasic,
    # so they are shared by every Logger instance
    fmt = None  # "%" format string applied to a dict record, None to print the raw message
    filter_name = None  # object with a dictFilter(record) method, None for no filtering
    post_treat = None  # callable(level, formatted) returning the final string, None to print as is

    def __init__(self, name):
        """
        @param name: logger name, available as "name" in format and filter
        """
        self._name = name

    def log(self, level, message):
        """Print message

        @param level: one of C.LOG_LEVELS
        @param message: message to format and print
        """
        try:
            formatted = self.format(level, message)
            if self.post_treat is not None:
                formatted = self.post_treat(level, formatted)
        except Filtered:
            # the message has been rejected by the filter, nothing to print
            return
        # print with a single parenthesised argument behaves identically
        # as a Python 2 statement and as a Python 3 function
        print(formatted)

    def format(self, level, message):
        """Format message according to Logger.fmt

        @param level: one of C.LOG_LEVELS
        @param message: message to format
        @return: formatted message
        @raise Filtered: the message must not be logged
        @raise ValueError: Logger.filter_name has no dictFilter method
        """
        if self.fmt is None and self.filter_name is None:
            return message

        record = {'name': self._name,
                  'message': message,
                  'levelname': level,
                  }

        if self.filter_name is not None:
            # only the attribute lookup is guarded, so an AttributeError
            # raised inside the filter itself is not hidden
            try:
                dict_filter = self.filter_name.dictFilter
            except AttributeError:
                raise ValueError("Bad filter: filters must have a .dictFilter method")
            if not dict_filter(record):
                raise Filtered

        if self.fmt is None:
            # a filter is set but no format: the message is printed unchanged
            return message
        return self.fmt % record

    def debug(self, msg):
        """Log msg with C.LOG_LVL_DEBUG level"""
        self.log(C.LOG_LVL_DEBUG, msg)

    def info(self, msg):
        """Log msg with C.LOG_LVL_INFO level"""
        self.log(C.LOG_LVL_INFO, msg)

    def warning(self, msg):
        """Log msg with C.LOG_LVL_WARNING level"""
        self.log(C.LOG_LVL_WARNING, msg)

    def error(self, msg):
        """Log msg with C.LOG_LVL_ERROR level"""
        self.log(C.LOG_LVL_ERROR, msg)

    def critical(self, msg):
        """Log msg with C.LOG_LVL_CRITICAL level"""
        self.log(C.LOG_LVL_CRITICAL, msg)


class _NameRecord(object):
    """Minimal record with only a "name" attribute, used by FilterName.dictFilter"""

    def __init__(self, name):
        self.name = name


class FilterName(object):
    """Filter on logger name according to a regex"""

    def __init__(self, name_re):
        """Initialise name filter

        @param name_re: regular expression used to filter names (using search and not match)
        """
        assert name_re
        import re
        self.name_re = re.compile(name_re)

    def filter(self, record):
        """Filter a record on its name

        @param record: object with a "name" attribute (logger name)
        @return: 1 if the name matches the regular expression, else 0
        """
        if self.name_re.search(record.name) is not None:
            return 1
        return 0

    def dictFilter(self, dict_record):
        """Filter using a dictionary record

        @param dict_record: dictionary with at least a key "name" with logger name
        @return: True if message should be logged
        """
        return self.filter(_NameRecord(dict_record['name'])) == 1


def _ansiColors(level, message):
    """Colorise message depending on level for terminals

    @param level: one of C.LOG_LEVELS
    @param message: formatted message to log
    @return: message with ANSI escape codes for coloration
        (message is returned unchanged for levels without coloration)
    """
    if level == C.LOG_LVL_DEBUG:
        out = (C.ANSI_FG_CYAN, message, C.ANSI_RESET)
    elif level == C.LOG_LVL_WARNING:
        out = (C.ANSI_FG_YELLOW, message, C.ANSI_RESET)
    elif level == C.LOG_LVL_ERROR:
        out = (C.ANSI_FG_RED,
               C.ANSI_BLINK,
               r'/!\ ',
               C.ANSI_BLINK_OFF,
               message,
               C.ANSI_RESET)
    elif level == C.LOG_LVL_CRITICAL:
        out = (C.ANSI_BOLD,
               C.ANSI_FG_RED,
               'Guru Meditation ',
               C.ANSI_NORMAL_WEIGHT,
               message,
               C.ANSI_RESET)
    else:
        # a 1-tuple, so the join below does not iterate on the characters
        out = (message,)
    return ''.join(out)


def _manageOutputs(outputs_raw):
    """Parse output option in a backend agnostic way, and fill _handlers consequently

    @param outputs_raw: output option as entered in environment variable or in configuration
    @raise ValueError: an output is unknown, malformed, lacks a mandatory
        option or has an unsupported option
    """
    if not outputs_raw:
        return
    outputs = outputs_raw.split(C.LOG_OPT_OUTPUT_SEP)
    if len(outputs) == 1:
        # no separator at all: the whole value is used as a file path
        _handlers[C.LOG_OPT_OUTPUT_FILE] = outputs.pop()

    for output in outputs:
        if not output:
            continue
        if output[-1] == ')':
            # we have options
            opt_begin = output.rfind('(')
            if opt_begin == -1:
                # closing parenthesis without an opening one
                raise ValueError(u"Invalid output [%s]" % output)
            options = output[opt_begin+1:-1]
            output = output[:opt_begin]
        else:
            options = None

        if output not in (C.LOG_OPT_OUTPUT_DEFAULT, C.LOG_OPT_OUTPUT_FILE, C.LOG_OPT_OUTPUT_MEMORY):
            raise ValueError(u"Invalid output [%s]" % output)

        if output == C.LOG_OPT_OUTPUT_DEFAULT:
            # no option for default handler
            _handlers[output] = None
        elif output == C.LOG_OPT_OUTPUT_FILE:
            if not options:
                raise ValueError("%(handler)s output need a path as option" % {'handler': output})
            _handlers[output] = options
            options = None  # options are parsed, we can empty them
        elif output == C.LOG_OPT_OUTPUT_MEMORY:
            # we have memory handler, option can be the len limit or None
            try:
                limit = int(options)
            except (TypeError, ValueError):
                # no option (None) or not a number: a non numeric option is
                # kept in "options" and rejected by the check below
                limit = C.LOG_OPT_OUTPUT_MEMORY_LIMIT
            else:
                options = None  # options are parsed, we can empty them
            _handlers[output] = limit

        if options:
            # we should not have unparsed options
            raise ValueError(u"options [%(options)s] are not supported for %(handler)s output" % {'options': options, 'handler': output})


def _configureStdLogging(logging, level=None, fmt=C.LOG_OPT_FORMAT[1], output=C.LOG_OPT_OUTPUT[1], logger=None, colors=False, force_colors=False):
    """Configure standard logging module

    @param logging: standard logging module
    @param level: one of C.LOG_LEVELS (C.LOG_LVL_DEBUG is used if None)
    @param fmt: format string, pretty much as in std logging. Accept the following keywords (maybe more depending on backend):
        - "message"
        - "levelname"
        - "name" (logger name)
    @param output: output option, parsed by _manageOutputs
    @param logger: if set, use it as a regular expression to filter on logger name.
        Use search to match expression, so ^ or $ can be necessary.
    @param colors: if True use ANSI colors to show log levels
    @param force_colors: if True ANSI colors are used even if stdout is not a tty
    @raise ValueError: force_colors is set without colors, or an output is invalid
    """
    if force_colors and not colors:
        raise ValueError("force_colors can't be used if colors is False")
    if level is None:
        level = C.LOG_LVL_DEBUG

    import sys
    with_color = bool(colors) and (force_colors or sys.stdout.isatty())

    class SatFormatter(logging.Formatter):
        """Formatter which manage SàT specificities"""

        def format(self, record):
            s = super(SatFormatter, self).format(record)
            if with_color:
                s = _ansiColors(record.levelname, s)
            return s

    root_logger = logging.getLogger()
    if root_logger.handlers:
        # the root logger is left untouched if somebody already set it up
        root_logger.warning(u"Handlers already set on root logger")
        return

    _manageOutputs(output)
    formatter = SatFormatter(fmt)
    name_filter = FilterName(logger) if logger else None
    root_logger.setLevel(level)

    # list() because values of _handlers are replaced while looping
    for handler, options in list(_handlers.items()):
        if handler == C.LOG_OPT_OUTPUT_DEFAULT:
            hdlr = logging.StreamHandler()
        elif handler == C.LOG_OPT_OUTPUT_MEMORY:
            import logging.handlers
            hdlr = logging.handlers.BufferingHandler(options)
            _handlers[handler] = hdlr  # we keep a reference to the handler to read the buffer later
        elif handler == C.LOG_OPT_OUTPUT_FILE:
            import os.path
            hdlr = logging.FileHandler(os.path.expanduser(options))
        else:
            raise ValueError("Unknown handler type")
        hdlr.setFormatter(formatter)
        if name_filter is not None:
            hdlr.addFilter(name_filter)
        root_logger.addHandler(hdlr)


def _configureBasic(level=None, fmt=None, output=None, logger=None, colors=False, force_colors=False):
    """Configure basic backend

    Settings are stored on the Logger class itself, so they apply to every
    Logger instance.

    @param level: same as _configureStdLogging.level
    @param fmt: same as _configureStdLogging.fmt
    @param output: not implemented yet TODO
    @param logger: same as _configureStdLogging.logger
    @param colors: same as _configureStdLogging.colors
    @param force_colors: same as _configureStdLogging.force_colors
    @raise NotImplementedError: output is not the default one
    @raise ValueError: force_colors is set without colors, or level is not in C.LOG_LEVELS
    """
    if level is not None:
        # we deactivate methods below level
        level_idx = C.LOG_LEVELS.index(level)

        def dev_null(self, msg):
            pass

        for _level in C.LOG_LEVELS[:level_idx]:
            setattr(Logger, _level.lower(), dev_null)

    # %(message)s is the same as None
    if fmt is not None and fmt != '%(message)s':
        Logger.fmt = fmt

    if output is not None and output != C.LOG_OPT_OUTPUT_SEP + C.LOG_OPT_OUTPUT_DEFAULT:
        # TODO: manage other outputs
        raise NotImplementedError("Basic backend only manage default output yet")

    if logger:
        Logger.filter_name = FilterName(logger)

    if colors:
        import sys
        if force_colors or sys.stdout.isatty():
            # we need colors
            # stored on the class, hence the "self" argument
            Logger.post_treat = lambda self, level, message: _ansiColors(level, message)
    elif force_colors:
        raise ValueError("force_colors can't be used if colors is False")


def configure(backend=C.LOG_BACKEND_STANDARD, **options):
    """Configure logging behaviour

    @param backend: can be:
        C.LOG_BACKEND_STANDARD: use standard logging module
        C.LOG_BACKEND_TWISTED: use twisted logging module (with standard logging observer)
        C.LOG_BACKEND_BASIC: use a basic print based logging
    @param **options: keyword arguments given to the backend configuration
        function (_configureStdLogging or _configureBasic)
    @raise exceptions.InternalError: logging has already been configured
    @raise ValueError: backend is unknown
    """
    global _backend
    if _backend is not None:
        raise exceptions.InternalError("Logging can only be configured once")
    # validated before _backend is set, so a wrong name doesn't prevent
    # a later valid configuration
    if backend not in (C.LOG_BACKEND_TWISTED, C.LOG_BACKEND_STANDARD, C.LOG_BACKEND_BASIC):
        raise ValueError("unknown backend")
    _backend = backend

    if backend == C.LOG_BACKEND_BASIC:
        _configureBasic(**options)
        return

    if backend == C.LOG_BACKEND_TWISTED:
        from twisted.python import log
        observer = log.PythonLoggingObserver()
        observer.start()

    # module level functions are replaced by the standard logging ones
    global getLogger
    global debug
    global info
    global warning
    global error
    global critical
    import logging
    _configureStdLogging(logging, **options)
    getLogger = logging.getLogger
    debug = logging.debug
    info = logging.info
    warning = logging.warning
    error = logging.error
    critical = logging.critical


def _parseOptions(options):
    """Parse string options as given in conf or environment variable, and return expected python value

    The dictionary is modified in place.

    @param options (dict): options with (key: name, value: string value)
    """
    COLORS = C.LOG_OPT_COLORS[0]
    LEVEL = C.LOG_OPT_LEVEL[0]

    if COLORS in options:
        # lowered once, so every accepted value is case insensitive
        colors = options[COLORS].lower()
        if colors in ('1', 'true'):
            options[COLORS] = True
        elif colors == 'force':
            options[COLORS] = True
            options['force_colors'] = True
        else:
            options[COLORS] = False

    if LEVEL in options:
        level = options[LEVEL].upper()
        if level not in C.LOG_LEVELS:
            # unknown level: C.LOG_LVL_INFO is used instead
            level = C.LOG_LVL_INFO
        options[LEVEL] = level


def satConfigure(backend=C.LOG_BACKEND_TWISTED):
    """Configure logging system for SàT, can be used by frontends

    For each option of C.LOG_OPTIONS, the value is looked for in environment
    variables first, then in the [DEFAULT] section of SàT conf, and the
    option default value is used as a last resort.
    It must be done before Memory init

    @param backend: same as configure.backend
    """
    import ConfigParser
    import os
    log_conf = {}
    config = ConfigParser.SafeConfigParser()
    config.read(C.CONFIG_FILES)
    for opt_name, opt_default in C.LOG_OPTIONS:
        env_name = ''.join((C.ENV_PREFIX, C.LOG_OPT_PREFIX.upper(), opt_name.upper()))
        try:
            log_conf[opt_name] = os.environ[env_name]
        except KeyError:
            try:
                log_conf[opt_name] = config.get('DEFAULT', C.LOG_OPT_PREFIX + opt_name)
            except ConfigParser.NoOptionError:
                log_conf[opt_name] = opt_default

    _parseOptions(log_conf)
    configure(backend, **log_conf)


def getLogger(name=C.LOG_BASE_LOGGER):
    """Return the basic backend Logger with this name, creating it if needed

    This function is replaced by logging.getLogger when configure is called
    with the standard or twisted backend.

    @param name: logger name
    @return: Logger instance (the same one for a given name)
    """
    try:
        return _loggers[name]
    except KeyError:
        # created only when missing (setdefault would build a Logger each time)
        new_logger = _loggers[name] = Logger(name)
        return new_logger


_root_logger = getLogger()


# Shortcuts logging on the base logger; they are replaced by the standard
# logging functions when configure is called with the standard or twisted backend

def debug(msg):
    _root_logger.debug(msg)


def info(msg):
    _root_logger.info(msg)


def warning(msg):
    _root_logger.warning(msg)


def error(msg):
    _root_logger.error(msg)


def critical(msg):
    _root_logger.critical(msg)