Python日志最佳实践

作者: 大蟒传奇 | 来源:发表于2017-05-02 08:01 被阅读891次

    本文翻译自logging-cookbook

    本文主要讲述一些Python记录日志的最佳实践。

    多模块记录日志

    多次调用logging.getLogger('someLogger')会返回对同一个记录器对象的引用。只要是在同一个Python解释器进程中,不管是单个模块,还是多个模块,都是这样。对同一个对象来说也是这样;此外,可以在一个模块中定义和配置父记录器,然后在另外一个模块中创建子记录器,对子记录器的调用将传递给父记录器。比如下面的例子:

    # encoding:utf8
    import logging
    import auxiliary_module
    
    # 创建名为'spam_application'的记录器
    logger = logging.getLogger('spam_application')
    logger.setLevel(logging.DEBUG)
    
    # 创建级别为DEBUG的日志处理器
    fh = logging.FileHandler('spam.log')
    fh.setLevel(logging.DEBUG)
    
    # 创建级别为ERROR的控制台日志处理器
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)
    
    # 创建格式器,加到日志处理器中
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    
    logger.addHandler(fh)
    logger.addHandler(ch)
    
    logger.info('creating an instance of auxiliary_module.Auxiliary')
    a = auxiliary_module.Auxiliary()
    logger.info('created an instance of auxiliary_module.Auxiliary')
    logger.info('calling auxiliary_module.Auxiliary.do_something')
    a.do_something()
    logger.info('finished auxiliary_module.Auxiliary.do_something')
    logger.info('calling auxiliary_module.some_function()')
    auxiliary_module.some_function()
    logger.info('done with auxiliary_module.some_function()')
    
    

    下面是auxiliary模块:

    import logging
    module_logger = logging.getLogger('spam_application.auxiliary')
    
    
    class Auxiliary:
        def __init__(self):
            self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
            self.logger.info("creating an instance of Auxiliary")
    
        def do_something(self):
            self.logger.info('doing something')
            a = 1+1
            self.logger.info('done doing something')
    
    
    def some_function():
        module_logger.info('received a call to "some_function"')
        
    

    输出如下:

    2017-04-28 16:12:40,488 - spam_application - INFO - creating an instance of auxiliary_module.Auxiliary
    2017-04-28 16:12:40,488 - spam_application.auxiliary.Auxiliary - INFO - creating an instance of Auxiliary
    2017-04-28 16:12:40,488 - spam_application - INFO - created an instance of auxiliary_module.Auxiliary
    2017-04-28 16:12:40,488 - spam_application - INFO - calling auxiliary_module.Auxiliary.do_something
    2017-04-28 16:12:40,488 - spam_application.auxiliary.Auxiliary - INFO - doing something
    2017-04-28 16:12:40,488 - spam_application.auxiliary.Auxiliary - INFO - done doing something
    2017-04-28 16:12:40,488 - spam_application - INFO - finished auxiliary_module.Auxiliary.do_something
    2017-04-28 16:12:40,488 - spam_application - INFO - calling auxiliary_module.some_function()
    2017-04-28 16:12:40,488 - spam_application.auxiliary - INFO - received a call to "some_function"
    2017-04-28 16:12:40,488 - spam_application - INFO - done with auxiliary_module.some_function()
    
    

    多线程记录日志

    logging模块支持多线程。下面的代码展示了从主线程和子线程中记录日志的例子:

    import logging
    import threading
    import time
    
    
    def worker(arg):
        while not arg['stop']:
            logging.debug('Hi from myfunc')
            time.sleep(0.5)
    
    
    def main():
        logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
        info = {'stop': False}
        thread = threading.Thread(target=worker, args=(info,))
        thread.start()
        while True:
            try:
                logging.debug('Hello from main')
                time.sleep(0.75)
            except KeyboardInterrupt:
                info['stop'] = True
                break
        thread.join()
    
    if __name__ == '__main__':
        main()
    
    

    输出如下:

         0 Thread-1 Hi from myfunc
         0 MainThread Hello from main
       505 Thread-1 Hi from myfunc
       751 MainThread Hello from main
      1007 Thread-1 Hi from myfunc
      1506 MainThread Hello from main
      1512 Thread-1 Hi from myfunc
      2017 Thread-1 Hi from myfunc
      2261 MainThread Hello from main
      2522 Thread-1 Hi from myfunc
      3015 MainThread Hello from main
      3027 Thread-1 Hi from myfunc
      3532 Thread-1 Hi from myfunc
      3770 MainThread Hello from main
      4037 Thread-1 Hi from myfunc
      4524 MainThread Hello from main
      4542 Thread-1 Hi from myfunc
      5047 Thread-1 Hi from myfunc
      5279 MainThread Hello from main
      5552 Thread-1 Hi from myfunc
    
    

    上面的例子展示了期望的输出。当然,对于更多的子线程,该方法也是适用的。

    多个日志处理器和格式器

    记录器都是Python对象。调用addHandler()方法添加日志处理器没有数量限制。有时候应用程序需要将所有的日志记录到文件,同时将错误消息输出到控制台。要达到这种效果,配置多个日志处理器即可。应用程序中调用记录器接口的代码不需要做出改变。下面的代码基于之前的例子做了一些小的改动:

    # encoding:utf8
    import logging
    
    logger = logging.getLogger('simple_example')
    logger.setLevel(logging.DEBUG)
    
    # 创建名为'spam_application'的记录器,记录所有的日志
    fh = logging.FileHandler('spam.log')
    fh.setLevel(logging.DEBUG)
    
    # 创建级别为DEBUG的日志处理器
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)
    
    # 创建格式器,加到日志处理器中
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)
    
    logger.addHandler(ch)
    logger.addHandler(fh)
    
    logger.debug('debug message')
    logger.info('info message')
    logger.warn('warn message')
    logger.error('error message')
    logger.critical('critical message')
    
    

    注意到,应用程序中的代码并不关心日志处理器。

    创建较高或较低级别的日志处理器,在测试的时候是很有用的。可以使用logger.debug取代print来调试程序;打印语句在使用后必须删除或者注释掉,但是logger.debug可以留在代码中,下次再调试,只需要修改记录器或日志处理器的日志级别就行了。

    多目的地记录

    假设要根据不同的情况将不同的日志记录到不同的地方。比如,将DEBUG级别或以上的日志记录到文件,INFO级别或以上的日志输出到控制台。还假设文件中的日志应该包含时间戳,而输出到控制台的日志不包含时间戳。下面的代码实现了这一点:

    # encoding:utf8
    import logging
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        datefmt='%m-%d %H:%M',
        filename='/tmp/myapp.log',
        filemode='w'
    )
    
    # 定义日志处理器将INFO或者以上级别的日志发送到 sys.stderr
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    
    # 设置控制台日志的格式
    formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
    console.setFormatter(formatter)
    
    logging.getLogger('').addHandler(console)
    
    logging.info('Jackdaws love my big sphinx of quartz.')
    
    logger1 = logging.getLogger('myapp.area1')
    logger2 = logging.getLogger('myapp.area2')
    
    logger1.debug('Quick zephyrs blow, vexing daft Jim.')
    logger1.info('How quickly daft jumping zebras vex.')
    logger2.warning('Jail zesty vixen who grabbed pay from quack.')
    logger2.error('The five boxing wizards jump quickly.')
    
    

    执行上面的脚本,在控制台上会看到:

    root        : INFO     Jackdaws love my big sphinx of quartz.
    myapp.area1 : INFO     How quickly daft jumping zebras vex.
    myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
    myapp.area2 : ERROR    The five boxing wizards jump quickly.
    
    

    在文件中会看到:

    04-28 16:41 root         INFO     Jackdaws love my big sphinx of quartz.
    04-28 16:41 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
    04-28 16:41 myapp.area1  INFO     How quickly daft jumping zebras vex.
    04-28 16:41 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
    04-28 16:41 myapp.area2  ERROR    The five boxing wizards jump quickly.
    
    

    可以看到,DEBUG级别的日志只在文件中出现,其他的日志在控制台和文件中都有。

    配置服务器例子

    下面是一个日志配置服务器的例子:

    # encoding:utf8
    import logging
    import logging.config
    import time
    import os
    
    # 读取初始配置文件
    logging.config.fileConfig('logging.conf')
    
    # 创建服务,监听9999端口
    t = logging.config.listen(9999)
    t.start()
    
    logger = logging.getLogger('simpleExample')
    
    try:
        while True:
            logger.debug('debug message')
            logger.info('info message')
            logger.warn('warn message')
            logger.error('error message')
            logger.critical('critical message')
            time.sleep(5)
    
    except KeyboardInterrupt:
        logging.config.stopListening()
        t.join()
    
    

    下面的脚本传入文件名作为参数,并将文件内容发送到服务器作为新的日志配置,发送内容前,还会发送文件内容的二进制长度:

    #!/usr/bin/env python
    import socket, sys, struct
    
    with open(sys.argv[1], 'rb') as f:
        data_to_send = f.read()
        
    HOST = 'localhost'
    PORT = 9999
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print('connecting...')
    s.connect((HOST, PORT))
    print('sending config...')
    s.send(struct.pack('>L', len(data_to_send)))
    s.send(data_to_send)
    s.close()
    print('complete')
    
    

    通过网络发送和接收日志事件

    假设要通过网络发送日志事件,并在接收端处理这些事件。一个简单的方法是将SocketHandler实例附加到发送端的根记录器:

    import logging
    import logging.handlers
    
    rootLogger = logging.getLogger('')
    rootLogger.setLevel(logging.DEBUG)
    socketHandler = logging.handlers.SocketHandler(
        'localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT
    )
    
    rootLogger.addHandler(socketHandler)
    rootLogger.info('Jackdaws love my big sphix of quartz.')
    
    logger1 = logging.getLogger('myapp.area1')
    logger2 = logging.getLogger('myapp.area2')
    
    logger1.debug('Quich zephyrs blow, vexing daft Jim.')
    logger1.info('How quickly daft jumping zebras vex.')
    logger2.warning('Jail zesty vixen who grabbed pay from quack.')
    logger2.error('The fix boxing wizards jump quickly.')
    
    

    在接收端可以使用SocketServer模块,下面是一个例子:

    import pickle
    import logging
    import logging.handlers
    import SocketServer
    import struct
    
    
    class LogRecordStreamHandler(SocketServer.StreamRequestHandler):
        def handle(self):
            while True:
                chunk = self.connection.recv(4)
                if len(chunk) < 4:
                    break
                slen = struct.unpack('>L', chunk)[0]
                chunk = self.connection.recv(slen)
                while len(chunk) < slen:
                    chunk = chunk + self.connection.recv(slen - len(chunk))
                obj = self.unPickle(chunk)
                record = logging.makeLogRecord(obj)
                self.handleLogRecord(record)
    
        def unPickle(self, data):
            return pickle.loads(data)
    
        def handleLogRecord(self, record):
            if self.server.logname is not None:
                name = self.server.logname
            else:
                name = record.name
            logger = logging.getLogger(name)
            logger.handle(record)
    
    
    class LogRecordSocketReceiver(SocketServer.ThreadingTCPServer):
        allow_reuse_address = 1
    
        def __init__(self, host='localhost',
                     port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                     handler=LogRecordStreamHandler):
            SocketServer.ThreadingTCPServer.__init__(self, (host, port), handler)
            self.abort = 0
            self.timeout = 1
            self.logname = None
    
        def serve_until_stopped(self):
            import select
            abort = 0
            while not abort:
                rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout)
                if rd:
                    self.handle_request()
                abort = self.abort
    
    
    def main():
        logging.basicConfig(format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    
        tcpserver = LogRecordSocketReceiver()
        print('About to start TCP server...')
        tcpserver.serve_until_stopped()
    
    
    if __name__ == '__main__':
        main()
    
    

    启动服务端后,启动客户端。客户端控制台没有输出;在服务端,可以看到:

    About to start TCP server...
        6 root            INFO     Jackdaws love my big sphix of quartz.
        9 myapp.area1     DEBUG    Quich zephyrs blow, vexing daft Jim.
        9 myapp.area1     INFO     How quickly daft jumping zebras vex.
        9 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
        9 myapp.area2     ERROR    The fix boxing wizards jump quickly.
    

    请注意,在某些情况下使用pickle会带来一些安全问题。如果对应用有影响的话,可以通过覆盖makePickle来采用别的序列化方案,同时调整上面的脚本来使用新的序列化方法。

    日志输出中添加上下文信息

    有时候开发者会希望在输出的日志中包含上下文信息以及传递给日志调用的参数。比如,在网络应用程序中,可能希望在日志中记录客户端的信息(比如远程客户端的用户名或IP地址)。虽然可以通过额外的参数来实现这一点,但不是很方便。虽然可以对每个连接都创建一个Logger实例,但是这样并不好,因为这些实例不会被垃圾回收掉。这在实践中可能不是问题,但是当Logger实例的数量取决于应用程序中调用次数从而没有限制时,则可能难以管理。

    使用适配器传递上下文信息

    一个简单的将上下文信息传递给日志输出的方式是使用LoggerAdapter类。在设计上,这个类有点像Logger,提供了debug(), info(), warning(), error(), exception(), critical()log()这些接口。这些方法和Logger中的同名方法提供同样的功能。

    创建一个LoggerAdapter实例时,传入一个Logger实例和一个包含上下文的类字典对象。当调用LoggerAdapter实例的方法时,它会将调用委托给其底层的Logger实例,并在委托调用中传递上下文信息。比如下面的例子:

    def debug(self, msg, *args, **kwargs):
        msg, kwargs = self.process(msg, kwargs)
        self.logger.debug(msg, *args, **kwargs)
    

    其中process()方法用于在日志输出中添加上下文信息。这个方法修改传入的消息和参数,然后调用底层的记录器来处理修改后的消息和参数。该方法的默认实现是保留消息,然后在参数中加入类字典的名为'extra'的对象,这个对象在初始化的时候传入。当然也可以在调用适配器方法的时候传入,这样会覆盖初始化时传入的值。

    使用'extra'的好处在于,类字典对象中的值会合并到LogRecord实例中,这样就能够用Formatter实例来自定义输出的字符串。如果需要别的方法,比如将上下文的信息加入消息的头部或者尾部,则只需要在LoggerAdapter的子类中覆盖process()方法即可。下面是一个例子:

    class CustomAdapter(logging.LoggerAdapter):
        def process(self, msg, kwargs):
            return '[%s] %s' % (self.extra['connid'], msg), kwargs
    

    在代码中可以这样调用:

    logger = logging.getLogger(__name__)
    adapter = CustomAdapter(logger, {'connid': some_conn_id})
    

    这样,任何使用适配器记录的日志首部都会加上some_conn_id这个值。

    传入对象而不是字典

    在初始化LoggerAdapter实例时,不必传入字典--只需要传入的对象有__getitem____iter__方法即可。如果想要动态生成内容的话,这样会很有用(字典中的内容是固定的)。

    使用过滤器来传递上下文信息

    也可以使用自定义的Filter来添加上下文信息。Filter实例可以改变传入的LogRecords,比如添加附加属性后使用合适的格式字符串输出,如果需要的话可以使用自定义的Formatter

    比如在一个web应用中,正在处理的请求可以存放在一个threading.local变量中,然后通过一个Filter添加远程的IP地址和远程的用户名到LogRecord中,新加的属性名可以使用上面LoggerAdapter例子中的'ip'和'user'。下面是示例脚本:

    import logging
    from random import choice
    
    
    class ContextFilter(logging.Filter):
        USERS = ['jim', 'fred', 'sheila']
        IPS = ['123.123.123.123', '127.0.0.1', '192.168.0.1']
    
        def filter(self, record):
            record.ip = choice(ContextFilter.IPS)
            record.user = choice(ContextFilter.USERS)
            return True
    
    
    if __name__ == '__main__':
        levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
        logging.basicConfig(level=logging.DEBUG,
                            format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    
        a1 = logging.getLogger('a.b.c')
        a2 = logging.getLogger('d.e.f')
    
        f = ContextFilter()
        a1.addFilter(f)
        a2.addFilter(f)
        a1.debug('a debug message')
        a1.info('An info message with %s', 'some parameters')
        for x in range(10):
            lvl = choice(levels)
            lvlname = logging.getLevelName(lvl)
            a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
    
    

    运行脚本的输出如下:

    2017-05-01 21:06:43,900 a.b.c DEBUG    IP: 127.0.0.1       User: fred     a debug message
    2017-05-01 21:06:43,900 a.b.c INFO     IP: 127.0.0.1       User: jim      An info message with some parameters
    2017-05-01 21:06:43,901 d.e.f ERROR    IP: 192.168.0.1     User: fred     A message at ERROR level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f CRITICAL IP: 127.0.0.1       User: fred     A message at CRITICAL level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f ERROR    IP: 192.168.0.1     User: sheila   A message at ERROR level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f CRITICAL IP: 192.168.0.1     User: fred     A message at CRITICAL level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f INFO     IP: 127.0.0.1       User: sheila   A message at INFO level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f DEBUG    IP: 192.168.0.1     User: fred     A message at DEBUG level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f WARNING  IP: 192.168.0.1     User: fred     A message at WARNING level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f CRITICAL IP: 123.123.123.123 User: sheila   A message at CRITICAL level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f ERROR    IP: 123.123.123.123 User: sheila   A message at ERROR level with 2 parameters
    2017-05-01 21:06:43,901 d.e.f INFO     IP: 192.168.0.1     User: jim      A message at INFO level with 2 parameters
    
    

    多进程日志写入同一文件

    虽然logging是线程安全的,并且支持单进程中的多线程写入同一日志文件,但是却并不支持多进程写入同一日志文件,因为在Python没有实现多进程对单个文件的序列化访问的标准方式。如果需要多进程将日志写入同一文件,实现的一种方式是让所有进程将日志写入SocketHandler,然后起一个Socket服务器读取所有日志并写入文件。这部分讲述了更多的细节,并且包含一个可用的例子作为参考。

    如果使用的Python版本包括multiprocess模块,可以使用Lock类来将进程对文件的访问序列化。

    日志轮转

    有时候需要在日志文件增长到一定大小后,将日志写入新的文件中。有时候你希望将这个文件的大小和数量限制在一定范围内。对于这样的需求,logging模块提供了RotatingFileHandler:

    import glob
    import logging
    import logging.handlers
    
    LOG_FILENAME = 'logging_rotatingfile_example.out'
    
    my_logger = logging.getLogger('MyLogger')
    my_logger.setLevel(logging.DEBUG)
    
    handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=20, backupCount=5)
    
    my_logger.addHandler(handler)
    
    for i in range(20):
        my_logger.debug('i = %d' % i)
    
    logfiles = glob.glob('%s*' % LOG_FILENAME)
    
    for filename in logfiles:
        print(filename)
    
    

    运行上面的脚本会生成6个文件:

    logging_rotatingfile_example.out
    logging_rotatingfile_example.out.1
    logging_rotatingfile_example.out.2
    logging_rotatingfile_example.out.3
    logging_rotatingfile_example.out.4
    logging_rotatingfile_example.out.5
    
    

    当前的写入的日志文件名总是logging_rotatingfile_example.out,这个文件大小到达限制时会被加上.1后缀来重命名,其他文件的后缀会加一(.1变为.2,等等),后缀为.6的文件将被删除。

    这个例子显然将长度设置得太小了。实际应用中应该将maxBytes设置为合适的值。

    一个基于字典的配置示例

    下面是一个基于字典的配置示例,这个例子来自Django文档。在使用中,这个字典会传入dictConfig()来生效。

    LOGGING = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'verbose': {
                'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
            },
            'simple': {
                'format': '%(levelname)s %(message)s'
            },
        },
        'filters': {
            'special': {
                '()': 'project.logging.SpecialFilter',
                'foo': 'bar',
            }
        },
        'handlers': {
            'null': {
                'level':'DEBUG',
                'class':'django.utils.log.NullHandler',
            },
            'console':{
                'level':'DEBUG',
                'class':'logging.StreamHandler',
                'formatter': 'simple'
            },
            'mail_admins': {
                'level': 'ERROR',
                'class': 'django.utils.log.AdminEmailHandler',
                'filters': ['special']
            }
        },
        'loggers': {
            'django': {
                'handlers':['null'],
                'propagate': True,
                'level':'INFO',
            },
            'django.request': {
                'handlers': ['mail_admins'],
                'level': 'ERROR',
                'propagate': False,
            },
            'myproject.custom': {
                'handlers': ['console', 'mail_admins'],
                'level': 'INFO',
                'filters': ['special']
            }
        }
    }
    

    要想了解这个配置的更多信息,请参见相关章节

    SyslogHandler消息中插入BOM

    RFC 5424要求将Unicode消息作为一组具有以下结构的字节发送到syslog守护程序:可选的纯ASCII组件,后跟UTF-8字节顺序标记(BOM),后跟使用UTF-8编码的Unicode。(见相关要求)

    在Python2.6和2.7中,SysLogHandler将BOM插入了消息并放在开头,但是不幸的是,这个实现是错误,导致了在BOM之前任何纯ASCII组件都不允许出现。

    这个错误的实现在Python2.7.4及其之后的版本中被删除。但是这个方法并没有被代替,如果要生成与RFC 5424兼容的消息,必须:

    1. SysLogHandler中加入Formatter实例,格式如u'ASCII section\ufeffUnicode section'。当使用UTF-8编码时,Unicode代码点u'\ufeff'将被编码为UTF-8 BOM-字节串\xef\xbb\xbf
    2. 将ASCII部分替换为你喜欢的任何占位符,但要确保替换后出现的数据始终是ASCII。
    3. 将Unicode部分替换为你喜欢的任何占位符;替换后的数据包含ASCII范围以外的编码也不要紧--它会使用UTF-8进行编码。

    如果格式化的消息是Unicode,SysLogHandler将使用UTF-8进行编码。遵从上面的规则就可以生成RFC 5424兼容的消息。如果生成的消息与RFC 5424不兼容,logging模块不会报错,但是syslog守护程序会出现报错信息。

    实现格式化日志

    虽然大多数的日志都是为了人类阅读而不是易于解析的,但是为了便于程序解析(不用复杂的正则表达式),可能会出现一些想要以格式化形式输出日志的情况。logging模块提供了这样的功能。有许多方法可以实现这一点,这里介绍一种简单的方法,它使用JSON以机器可分解的方式对事件进行序列化:

    import json
    import logging
    
    
    class StructureMessage(object):
        def __init__(self, message, **kwargs):
            self.message = message
            self.kwargs = kwargs
    
        def __str__(self):
            return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
    
    
    _ = StructureMessage
    
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
    
    

    如果运行上面的脚本,会输出:

    message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
    
    

    不同的Python版本输出的顺序可能不同。

    如果需要更专门的处理,可以使用自定义的JSON编码器,如下:

    from __future__ import unicode_literals
    
    import json
    import logging
    
    try:
        unicode
    except NameError:
        unicode = str
    
    
    class Encoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o, set):
                return tuple(o)
            elif isinstance(o, unicode):
                return o.encode('unicode_escape'.decode('ascii'))
            return super(Encoder, self).default(o)
    
    
    class StructuredMessage(object):
        def __init__(self, message, **kwargs):
            self.message = message
            self.kwargs = kwargs
    
        def __str__(self):
            s = Encoder().encode(self.kwargs)
            return '%s >>> %s' % (self.message, s)
    
    
    _ = StructuredMessage
    
    
    def main():
        logging.basicConfig(level=logging.INFO, format='%(message)s')
        logging.info(_('message 1', set_value=set([1, 2, 3]), snowman='\u2603'))
    
    if __name__ == '__main__':
        main()
    
    

    运行上面的脚本,会输出

    message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
    
    

    使用dictConfig()自定义日志处理器

    有时候,你会希望以特定的方式自定义日志处理器,如果使用dictConfig(),则可以在不使用子类的情况下做到这一点。例如,可以设置日志文件的所有权。在POSIX上,这个操作可以通过os.chown()轻松完成,但是stdlib中的文件日志处理器不提供内置支持。可以通过简单的函数来定制日志处理器的创建,比如:

    def owned_file_handler(filename, mode='a', encoding=None, owner=None):
        if owner:
            import os, pwd, grp
            uid = pwd.getpwnam(owner[0]).pw_uid
            gid = grp.getgrnam(owner[1]).gr_gid
            owner = (uid, gid)
            if not os.path.exists(filename):
                open(filename, 'a').close()
            os.chown(filename, *owner)
        return logging.FileHandler(filename, mode, encoding)
    
    

    然后,你可以在传递给dictConfig()的日志配置中指定通过调用此函数创建日志处理器:

    LOGGING = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'default': {
                'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
            },
        },
        'handlers': {
            'file': {
                '()': owned_file_handler,
                'level': 'DEBUG',
                'formatter': 'default',
                'owner': ['pulse', 'pulse'],
                'filename': 'chowntest.log',
                'mode': 'w',
                'encoding': 'utf-8',
            },
        },
        'root': {
            'handlers': ['file'],
            'level': 'DEBUG'
        }
    }
    

    这个例子使用pulse用户和组来展示。完整代码如下:

    # chowntest.py
    import logging, logging.config, os, shutil
    
    def owned_file_handler(filename, mode='a', encoding=None, owner=None):
        if owner:
            if not os.path.exists(filename):
                open(filename, 'a').close()
            shutil.chown(filename, *owner)
        return logging.FileHandler(filename, mode, encoding)
    
    LOGGING = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'default': {
                'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
            },
        },
        'handlers': {
            'file':{
                # The values below are popped from this dictionary and
                # used to create the handler, set the handler's level and
                # its formatter.
                '()': owned_file_handler,
                'level':'DEBUG',
                'formatter': 'default',
                # The values below are passed to the handler creator callable
                # as keyword arguments.
                'owner': ['pulse', 'pulse'],
                'filename': 'chowntest.log',
                'mode': 'w',
                'encoding': 'utf-8',
            },
        },
        'root': {
            'handlers': ['file'],
            'level': 'DEBUG',
        },
    }
    
    logging.config.dictConfig(LOGGING)
    logger = logging.getLogger('mylogger')
    logger.debug('A debug message')
    

    使用root账号运行上面的脚本:

    $ python3 test.py
    $ cat chowntest.log
    2017-05-01 10:36:53,570 DEBUG mylogger A debug message
    $ ls -l chowntest.log
    -rw-r--r-- 1 pulse pulse 55 May  1 10:36 chowntest.log
    

    注意,因为使用了shutil.chown()方法,所以要采用Python3。在Python3.3之前的版本中,需要使用像os.chown()来改变文件的所有权。

    在实践中,创建日志处理器的函数可能位于项目某个模块中。应该将配置中的

    '()': owned_file_handler,
    

    修改为,比如这样:

    '()': 'ext://project.util.owned_file_handler',
    

    上面的project.util可以被实际的包名所取代。上面的脚本中,使用ext://__main__.owned_file_handler

    使用dictConfig()配置过滤器

    可以使用dictConfig()配置过滤器,但是乍看之下可能难以明白。因为标准库中只包含了Filter这一个过滤器,所以可能不太能满足所有的需求,通常需要在子类中重写filter()方法。实现这一点,要配置字典中过滤器相关的()配置,指定能够生成一个过滤器的可调用对象。下面是一个完整例子:

    import logging
    import logging.config
    import sys
    
    
    class MyFilter(logging.Filter):
        def __init__(self, param=None):
            self.param = param
    
        def filter(self, record):
            if self.param is None:
                allow = True
            else:
                allow = self.param not in record.msg
            if allow:
                record.msg = 'changed: ' + record.msg
            return allow
    
    
    LOGGING = {
        'version': 1,
        'filters': {
            'myfilter': {
                '()': MyFilter,
                'param': 'noshow',
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'filters': ['myfilter']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console']
        },
    }
    
    if __name__ == '__main__':
        logging.config.dictConfig(LOGGING)
        logging.debug('hello')
        logging.debug(('hello - noshow'))
    
    

    这个例子展示了如何使用关键字形式的参数作为配置传入可调用对象,然后生成过滤器实例的过程。执行上面的脚本,会看到如下输出:

    changed: hello
    
    

    还有几点需要注意:

    • 如果不能直接在配置中引用该可调用对象(比如配置字典所在的模块不能导入另外一个模块中的内容),可以使用ext://...这样的形式,见Access to external objects。比如在上面的例子中,可以用ext://__main__.MyFilter来代替MyFilter
    • 除了过滤器,此技术还可用于配置自定义日志处理器和格式器。见User-defined objects

    自定义异常格式

    有时候可能需要自定义异常格式,为了说明问题,假定只能用一行记录每个事件,即使出现异常信息也一样。可以自定义格式器来解决这个问题,如下:

    import logging
    
    
    class OneLineExceptionFormatter(logging.Formatter):
        def formatException(self, ei):
            result = super(OneLineExceptionFormatter, self).formatException(ei)
            return repr(result)
    
        def format(self, record):
            s = super(OneLineExceptionFormatter, self).format(record)
            if record.exc_text:
                s = s.replace('\n', '') + '|'
            return s
    
    
    def config_logging():
        fh = logging.FileHandler('output.txt', 'w')
        f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                      '%d/%m/%Y %H:%M:%S')
    
        fh.setFormatter(f)
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        root.addHandler(fh)
    
    
    def main():
        config_logging()
        logging.info('Sample message')
        try:
            x = 1 / 0
        except ZeroDivisionError as e:
            logging.exception('ZeroDivisionError: %s', e)
    
    
    if __name__ == '__main__':
        main()
    
    

    执行上面的脚本,会在日志文件中看到两行:

    02/05/2017 06:42:44|INFO|Sample message|
    02/05/2017 06:42:44|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n  File "custom_exception_format.py", line 31, in main\n    x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|
    
    

    念出日志信息

    可能在有些情况下,需要将日志消息呈现为可听而不是可见的格式。可以利用系统中'文本到语音'(TTS)功能,这个功能与Python无关。大多数的TTS系统都会有一个命令行工具,可以使用subprocess在日志处理器中调用它。这里假设TTS命令行程序不需要与用户交互,完成命令的时间不长,出现这种日志的频率不会太高,并且一次发送一个消息是可以接受的,下面的示例实现了一个将日志念出的功能,这可能会导致其他的日志处理器被阻塞。下面的例子中使用了espeakTTS包。

    import logging
    import subprocess
    import sys
    
    
    class TTSHandler(logging.Handler):
        def emit(self, record):
            msg = self.format(record)
            cmd = ['espeak', '-s150', '-ven+f3', msg]
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            p.communicate()
    
    
    def configure_logging():
        h = TTSHandler()
        root = logging.getLogger()
        root.addHandler(h)
        root.setLevel(logging.DEBUG)
    
    
    def main():
        logging.info('Hello')
        logging.debug('Goodbye')
    
    
    if __name__ == '__main__':
        configure_logging()
        sys.exit(main())
    
    

    执行上面的脚本,会听到一个女性的声音读出'Hello'和'Goodbye'。

    缓冲日志信息并有条件地输出

    可能有些情况下,需要将日志记录在临时区域中,只有在某种条件下才输出。比如记录某个函数中的debug事件,如果该函数执行成功,那么不收集这样的调试信息;但是如果该函数执行失败,需要将调试信息和错误一起输出。

    下面的例子使用装饰器来完成这样的功能。这个例子中使用了logging.handlers.MemoryHandler,这个日志处理器允许缓存记录事件,直到满足一些条件时刷新缓存,将事件传递给另外的日志处理器target来做进一步处理。默认情况下,MemoryHandler日志处理器在其缓冲区被填满,或者其级别大于或等于指定阙值的事件时被刷新。可以在自定义的MemoryHandler子类中自定义刷新行为。

    示例脚本有一个简单函数foo,它负责循环遍历所有级别的日志,把将入记录的日志级别写入sys.stderr,然后在记录该级别日志。foo接收一个可选参数,如果参数为true,会在ERROR和CRITICAL上记录日志;否则只记录DEBUG,INFO和WARNING级别日志。

    脚本中有一个装饰器来装饰foo,该装饰器将执行所需要的条件记录。装饰器接收记录器作为参数,并在调用持续时间内附加一个内存处理器到装饰函数。该装饰器有三个可选参数,分别作为目标日志处理器,刷新发生的级别和缓冲区的默认容量。默认值为写入sys.stderrStreamHandlerlogging.ERROR和100。

    脚本如下:

    import logging
    from logging.handlers import MemoryHandler
    import sys
    
    logger = logging.getLogger(__name__)
    logger.addHandler(logging.NullHandler())
    
    
    def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
        if target_handler is None:
            target_handler = logging.StreamHandler()
        if flush_level is None:
            flush_level = logging.ERROR
        if capacity is None:
            capacity = 100
        handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
    
        def decorator(fn):
            def wrapper(*args, **kwargs):
                logger.addHandler(handler)
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    logger.exception('call failed')
                    raise
                finally:
                    super(MemoryHandler, handler).flush()
                    logger.removeHandler(handler)
            return wrapper
    
        return decorator
    
    
    def write_line(s):
        sys.stderr.write('%s\n' % s)
    
    
    def foo(fail=False):
        write_line('about to log at DEBUG ...')
        logger.debug('Actually logged at DEBUG')
        write_line('about to log at INFO')
        logger.info('Actually logged at INFO')
        write_line('about ot log at WARNING')
        logger.warning('Actually logged at WARNING')
        if fail:
            write_line('about to log at ERROR')
            logger.error('Actually logged at ERROR')
            write_line('about to log at CRITICAL')
            logger.critical('Actually logged at CRITICAL')
        return fail
    
    
    decorated_foo = log_if_errors(logger)(foo)
    
    
    if __name__ == '__main__':
        logger.setLevel(logging.DEBUG)
        write_line('Calling undecorated foo with False')
        assert not foo(False)
        write_line('Calling undecorated foo with True')
        assert foo(True)
        write_line('Calling decorated foo with False')
        assert not decorated_foo(False)
        write_line('Calling decorated foo with True')
        assert decorated_foo(True)
    
    

    执行上面的脚本,可以看到如下输出:

    Calling undecorated foo with False
    about to log at DEBUG ...
    about to log at INFO
    about ot log at WARNING
    Calling undecorated foo with True
    about to log at DEBUG ...
    about to log at INFO
    about ot log at WARNING
    about to log at ERROR
    about to log at CRITICAL
    Calling decorated foo with False
    about to log at DEBUG ...
    about to log at INFO
    about ot log at WARNING
    Calling decorated foo with True
    about to log at DEBUG ...
    about to log at INFO
    about ot log at WARNING
    about to log at ERROR
    Actually logged at DEBUG
    Actually logged at INFO
    Actually logged at WARNING
    Actually logged at ERROR
    about to log at CRITICAL
    Actually logged at CRITICAL
    
    

    可以看到,只有记录ERROR或以上级别的日志时,才能看到实际输出,但是在这种情况下,先前较低级别的日志也会记录。

    可以使用装饰器的简便写法:

    @log_if_errors(logger):
    def foo(fail=False):
        ...
    

    配置输出的时间格式

    如果想使用UTC格式化输出时间,可以使用如下UTCFormatter

    import logging
    import time
    
    class UTCFormatter(logging.Formatter):
        converter = time.gmtime
    

    然后在代码中使用UTCFormatter。可以如下方式在dictConfig()中添加配置,如下:

    import logging
    import logging.config
    import time
    
    
    class UTCFormatter(logging.Formatter):
        converter = time.gmtime
    
    
    LOGGING = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'utc': {
                '()': UTCFormatter,
                'format': '%(asctime)s %(message)s',
            },
            'local': {
                'format': '%(asctime)s %(message)s'
            }
        },
        'handlers': {
            'console1': {
                'class': 'logging.StreamHandler',
                'formatter': 'utc',
            },
            'console2': {
                'class': 'logging.StreamHandler',
                'formatter': 'local',
            },
        },
        'root': {
            'handlers': ['console1', 'console2'],
        }
    }
    
    
    if __name__ == '__main__':
        logging.config.dictConfig(LOGGING)
        logging.warning('The local time is %s', time.asctime())
    
    

    执行上面的脚本,输出类似下面:

    2017-05-01 23:43:11,958 The local time is Tue May  2 07:43:11 2017
    2017-05-02 07:43:11,958 The local time is Tue May  2 07:43:11 2017
    
    

    使用上下文管理器进行选择性记录

    有时候,临时更改日志配置会很有用。使用上下文管理器来做到这一点是最好的方法。下面是一个上下文管理器的简单示例,它允许可选地更改日志记录级别,并将日志处理器添加到上下文管理器的范围内:

    import logging
    import sys
    
    
    class LoggingContext(object):
        def __init__(self, logger, level=None, handler=None, close=True):
            self.logger = logger
            self.level = level
            self.handler = handler
            self.close = close
            
        def __enter__(self):
            if self.level is not None:
                self.old_level = self.logger.level
                self.logger.setLevel(self.level)
            if self.handler:
                self.logger.addHandler(self.handler)
                
        def __exit__(self, exc_type, exc_val, exc_tb):
            if self.level is not None:
                self.logger.setLevel(self.old_level)
            if self.handler:
                self.logger.removeHandler(self.handler)
            if self.handler and self.close:
                self.handler.close()
                
    

    如果指定级别,则在上下文管理器覆盖的范围内,记录器的级别将设置为该级别。如果指定日志处理器,则会在入口被加入,并在出口被移除。还可以要求管理器在出口关闭日志处理程序--如果不再需要该日志处理器,可以执行这样的操作。

    使用方法如下:

    if __name__ == '__main__':
        logger = logging.getLogger('foo')
        logger.addHandler(logging.StreamHandler())
        logger.setLevel(logging.INFO)
        logger.info('1. This should appear just once on stderr.')
        logger.debug('2. This should not appear')
        with LoggingContext(logger, level=logging.DEBUG):
            logger.debug('3. This should appear once on stderr.')
        logger.debug('4. This should not appear')
        h = logging.StreamHandler(sys.stdout)
        with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
            logger.debug('5. This should appear twice - once on stderr and onceon stdout.')
        logger.info('6. This should appear just once on stderr.')
        logger.debug('7. This should not appear.')
    

    执行上面的脚本,可以看到如下输出:

    1. This should appear just once on stderr.
    3. This should appear once on stderr.
    5. This should appear twice - once on stderr and onceon stdout.
    5. This should appear twice - once on stderr and onceon stdout.
    6. This should appear just once on stderr.
    
    

    如果将stderr重定向到/dev/null,将只会看到输出到stdout的信息:

    $ python logctx.py 2>/dev/null
    5. This should appear twice - once on stderr and onceon stdout.
    
    

    同样的,将stdout重定向到/dev/null,将只会看到输出到stderr的信息:

    $ python logctx.py >/dev/null
    1. This should appear just once on stderr.
    3. This should appear once on stderr.
    5. This should appear twice - once on stderr and onceon stdout.
    6. This should appear just once on stderr.
    

    相关文章

      网友评论

        本文标题:Python日志最佳实践

        本文链接:https://www.haomeiwen.com/subject/yztazttx.html