view src/core/log.py @ 1006:325fd230c15d

core (log): added advanced feature to basic backend (colors/formatting/level and logger filtering)
author Goffi <goffi@goffi.org>
date Mon, 05 May 2014 18:58:34 +0200
parents b4af31a8a4f2
children a7d33c7a8277
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

# SàT: a XMPP client
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""High level logging functions"""
# XXX: this module use standard logging module when possible, but as SàT can work in different cases where logging is not the best choice (twisted, pyjamas, etc), it is necessary to have a dedicated module. Additional feature like environment variables and colors are also managed.

from sat.core.constants import Const as C
from sat.core import exceptions

_backend = None
_loggers = {}
_handlers = {}

class Filtered(Exception):
    pass

class Logger(object):
    """High level logging class"""
    fmt = None
    filter_name = None
    post_treat = None

    def __init__(self, name):
        self._name = name

    def log(self, level, message):
        """Print message

        @param level: one of C.LOG_LEVELS
        @param message: message to format and print
        """
        try:
            formatted = self.format(level, message)
            if self.post_treat is None:
                print formatted
            else:
                print self.post_treat(level, formatted)
        except Filtered:
            pass

    def format(self, level, message):
        """Format message according to Logger.fmt

        @param level: one of C.LOG_LEVELS
        @param message: message to format
        @return: formatted message

        @raise: Filtered the message must not be logged
        """
        if self.fmt is None and self.filter_name is None:
            return message
        record = {'name': self._name,
                  'message': message,
                  'levelname': level,
                 }
        try:
            if not self.filter_name.dictFilter(record):
                raise Filtered
        except AttributeError:
            if self.filter_name is not None:
                raise ValueError("Bad filter: filters must have a .filter method")
        return self.fmt % record

    def debug(self, msg):
        self.log(C.LOG_LVL_DEBUG, msg)

    def info(self, msg):
        self.log(C.LOG_LVL_INFO, msg)

    def warning(self, msg):
        self.log(C.LOG_LVL_WARNING, msg)

    def error(self, msg):
        self.log(C.LOG_LVL_ERROR, msg)

    def critical(self, msg):
        self.log(C.LOG_LVL_CRITICAL, msg)


class FilterName(object):
    """Filter on logger name according to a regex"""

    def __init__(self, name_re):
        """Initialise name filter

        @param name_re: regular expression used to filter names (using search and not match)
        """
        assert name_re
        import re
        self.name_re = re.compile(name_re)

    def filter(self, record):
        if self.name_re.search(record.name) is not None:
            return 1
        return 0

    def dictFilter(self, dict_record):
        """Filter using a dictionary record

        @param dict_record: dictionary with at list a key "name" with logger name
        @return: True if message should be logged
        """
        class LogRecord(object):
            pass
        log_record = LogRecord()
        log_record.name = dict_record['name']
        return self.filter(log_record) == 1


def _ansiColors(level, message):
    """Colorise message depending on level for terminals

    @param level: one of C.LOG_LEVELS
    @param message: formatted message to log
    @return: message with ANSI escape codes for coloration
    """
    if level == C.LOG_LVL_DEBUG:
        out = (C.ANSI_FG_CYAN, message, C.ANSI_RESET)
    elif level == C.LOG_LVL_WARNING:
        out = (C.ANSI_FG_YELLOW, message, C.ANSI_RESET)
    elif level == C.LOG_LVL_ERROR:
        out = (C.ANSI_FG_RED,
               C.ANSI_BLINK,
               r'/!\ ',
               C.ANSI_BLINK_OFF,
               message,
               C.ANSI_RESET)
    elif level == C.LOG_LVL_CRITICAL:
        out = (C.ANSI_BOLD,
               C.ANSI_FG_RED,
               'Guru Meditation ',
               C.ANSI_NORMAL_WEIGHT,
               message,
               C.ANSI_RESET)
    else:
        out = message
    return ''.join(out)

def _manageOutputs(outputs_raw):
    """ Parse output option in a backend agnostic way, and fill _handlers consequently

    @param outputs_raw: output option as enterred in environment variable or in configuration
    """
    if not outputs_raw:
        return
    outputs = outputs_raw.split(C.LOG_OPT_OUTPUT_SEP)
    global _handlers
    if len(outputs) == 1:
        _handlers[C.LOG_OPT_OUTPUT_FILE] = outputs.pop()

    for output in outputs:
        if not output:
            continue
        if output[-1] == ')':
            # we have options
            opt_begin = output.rfind('(')
            options = output[opt_begin+1:-1]
            output = output[:opt_begin]
        else:
            options = None

        if output not in (C.LOG_OPT_OUTPUT_DEFAULT, C.LOG_OPT_OUTPUT_FILE, C.LOG_OPT_OUTPUT_MEMORY):
            raise ValueError(u"Invalid output [%s]" % output)

        if output == C.LOG_OPT_OUTPUT_DEFAULT:
            # no option for defaut handler
            _handlers[output] = None
        elif output == C.LOG_OPT_OUTPUT_FILE:
            if not options:
                ValueError("%(handler)s output need a path as option" % {'handler': output})
            _handlers[output] = options
            options = None # option are parsed, we can empty them
        elif output == C.LOG_OPT_OUTPUT_MEMORY:
            # we have memory handler, option can be the len limit or None
            try:
                limit = int(options)
                options = None # option are parsed, we can empty them
            except (TypeError, ValueError):
                limit = C.LOG_OPT_OUTPUT_MEMORY_LIMIT
            _handlers[output] = limit

        if options: # we should not have unparsed options
            raise ValueError(u"options [%(options)s] are not supported for %(handler)s output" % {'options': options, 'handler': output})

def _configureStdLogging(logging, level=None, fmt=C.LOG_OPT_FORMAT[1], output=C.LOG_OPT_OUTPUT[1], logger=None, colors=False, force_colors=False):
    """Configure standard logging module

    @param logging: standard logging module
    @param level: one of C.LOG_LEVELS
    @param fmt: format string, pretty much as in std logging. Accept the following keywords (maybe more depending on backend):
        - "message"
        - "levelname"
        - "name" (logger name)
    @param logger: if set, use it as a regular expression to filter on logger name.
        Use search to match expression, so ^ or $ can be necessary.
    @param colors: if True use ANSI colors to show log levels
    @param force_colors: if True ANSI colors are used even if stdout is not a tty
    """
    format_ = fmt
    if level is None:
        level = C.LOG_LVL_DEBUG
    import sys
    with_color = colors & (sys.stdout.isatty() or force_colors)
    if not colors and force_colors:
        raise ValueError("force_colors can't be used if colors is False")

    class SatFormatter(logging.Formatter):
        u"""Formatter which manage SàT specificities"""

        def __init__(self, fmt=None, datefmt=None):
            super(SatFormatter, self).__init__(fmt, datefmt)

        def format(self, record):
            s = super(SatFormatter, self).format(record)
            if with_color:
                s = _ansiColors(record.levelname, s)
            return s

    root_logger = logging.getLogger()
    if len(root_logger.handlers) == 0:
        _manageOutputs(output)
        formatter = SatFormatter(format_)
        name_filter = FilterName(logger) if logger else None
        for handler, options in _handlers.items():
            if handler == C.LOG_OPT_OUTPUT_DEFAULT:
                hdlr = logging.StreamHandler()
            elif handler == C.LOG_OPT_OUTPUT_MEMORY:
                import logging.handlers
                hdlr = logging.handlers.BufferingHandler(options)
                _handlers[handler] = hdlr # we keep a reference to the handler to read the buffer later
            elif handler == C.LOG_OPT_OUTPUT_FILE:
                import os.path
                hdlr = logging.FileHandler(os.path.expanduser(options))
            else:
                raise ValueError("Unknown handler type")
            hdlr.setFormatter(formatter)
            root_logger.addHandler(hdlr)
            root_logger.setLevel(level)
            if name_filter is not None:
                hdlr.addFilter(name_filter)
    else:
        root_logger.warning(u"Handlers already set on root logger")

def _configureBasic(level=None, fmt=None, output=None, logger=None, colors=False, force_colors=False):
    """Configure basic backend
    @param level: same as _configureStdLogging.level
    @param fmt: same as _configureStdLogging.fmt
    @param output: not implemented yet TODO
    @param logger: same as _configureStdLogging.logger
    @param colors: same as _configureStdLogging.colors
    @param force_colors: same as _configureStdLogging.force_colors
    """
    if level is not None:
        # we deactivate methods below level
        level_idx = C.LOG_LEVELS.index(level)
        def dev_null(self, msg):
            pass
        for _level in C.LOG_LEVELS[:level_idx]:
            setattr(Logger, _level.lower(), dev_null)
    if fmt is not None:
        if fmt != '%(message)s': # %(message)s is the same as None
            Logger.fmt = fmt
    if output is not None:
        if output != C.LOG_OPT_OUTPUT_SEP + C.LOG_OPT_OUTPUT_DEFAULT:
            # TODO: manage other outputs
            raise NotImplementedError("Basic backend only manage default output yet")

    if logger:
        Logger.filter_name = FilterName(logger)

    if colors:
        import sys
        if force_colors or sys.stdout.isatty():
            # we need colors
            Logger.post_treat = lambda self, level, message: _ansiColors(level, message)
    elif force_colors:
        raise ValueError("force_colors can't be used if colors is False")

def configure(backend=C.LOG_BACKEND_STANDARD, **options):
    """Configure logging behaviour
    @param backend: can be:
        C.LOG_BACKEND_STANDARD: use standard logging module
        C.LOG_BACKEND_TWISTED: use twisted logging module (with standard logging observer)
        C.LOG_BACKEND_BASIC: use a basic print based logging
    """
    global _backend
    if _backend is not None:
        raise exceptions.InternalError("Logging can only be configured once")
    _backend = backend

    if backend in (C.LOG_BACKEND_TWISTED, C.LOG_BACKEND_STANDARD):
        if backend ==  C.LOG_BACKEND_TWISTED:
            from twisted.python import log
            observer = log.PythonLoggingObserver()
            observer.start()
        global getLogger
        global debug
        global info
        global warning
        global error
        global critical
        import logging
        _configureStdLogging(logging, **options)
        getLogger = logging.getLogger
        debug = logging.debug
        info = logging.info
        warning = logging.warning
        error = logging.error
        critical = logging.critical

    elif backend == C.LOG_BACKEND_BASIC:
        _configureBasic(**options)

    else:
        raise ValueError("unknown backend")

def _parseOptions(options):
    """Parse string options as given in conf or environment variable, and return expected python value

    @param options (dict): options with (key: name, value: string value)
    """
    COLORS = C.LOG_OPT_COLORS[0]
    LEVEL = C.LOG_OPT_LEVEL[0]

    if COLORS in options:
        if options[COLORS].lower() in ('1', 'true'):
            options[COLORS] = True
        elif options[COLORS] == 'force':
            options[COLORS] = True
            options['force_colors'] = True
        else:
            options[COLORS] = False
    if LEVEL in options:
        level = options[LEVEL].upper()
        if level not in C.LOG_LEVELS:
            level = C.LOG_LVL_INFO
        options[LEVEL] = level

def satConfigure(backend=C.LOG_BACKEND_TWISTED):
    """Configure logging system for SàT, can be used by frontends

    logs conf is read in SàT conf, then in environment variables. It must be done before Memory init
    """
    import ConfigParser
    import os
    log_conf = {}
    config = ConfigParser.SafeConfigParser()
    config.read(C.CONFIG_FILES)
    for opt_name, opt_default in C.LOG_OPTIONS:
        try:
            log_conf[opt_name] = os.environ[''.join((C.ENV_PREFIX, C.LOG_OPT_PREFIX.upper(), opt_name.upper()))]
        except KeyError:
            try:
                log_conf[opt_name] = config.get('DEFAULT', C.LOG_OPT_PREFIX + opt_name)
            except ConfigParser.NoOptionError:
                log_conf[opt_name] = opt_default

    _parseOptions(log_conf)
    configure(backend, **log_conf)

def getLogger(name=C.LOG_BASE_LOGGER):
    return _loggers.setdefault(name, Logger(name))

_root_logger = getLogger()

def debug(msg):
    _root_logger.debug(msg)

def info(msg):
    _root_logger.info(msg)

def warning(msg):
    _root_logger.warning(msg)

def error(msg):
    _root_logger.error(msg)

def critical(msg):
    _root_logger.critical(msg)