python的logging模块提供了灵活的标准模块,使得任何Python程序都可以使用这个第三方模块来实现日志记录。
但是 python 中logging 并不支持多进程,所以会遇到不少麻烦。
以 TimedRotatingFileHandler 这个类的问题作为例子。这个Handler本来的作用是:按天切割日志文件。(当天的文件是xxxx.log 昨天的文件是xxxx.log.2016-06-01)。这样的好处是,一来可以按天来查找日志,二来可以让日志文件不至于非常大, 过期日志也可以按天删除。
但是问题来了,如果是用多进程来输出日志,则只有一个进程会切换,其他进程会在原来的文件中继续打,还有可能某些进程切换的时候早就有别的进程在新的日志文件里打入东西了,那么他会无情删掉之,再建立新的日志文件。反正将会很乱很乱,完全没法开心的玩耍。还会有一些其他莫名其妙的麻烦比如: os.rename(self.baseFilename, dfn)WindowsError: [Error 32] 错误 (进程无法访问文件,因为另一个程序正在使用此文件 是文件已经打开的错误,改名前没有关闭文件。就是一个进程在使用此文件,另一个进程想要修改文件名)
so 我们需要改写一个 logging中的 handler 以使logging支持多进程
重写FileHandler类(这个类是所有写入文件的Handler都需要继承的TimedRotatingFileHandler 就是继承的这个类;我们增加一些简单的判断和操作就可以。
我们的逻辑是这样的:
1. 判断当前时间戳是否与指向的文件名是同一个时间
2. 如果不是,则切换 指向的文件即可
3. 结束,是不是很简单的逻辑。
以下代码参考messud4312的博客 感谢这位大哥
#multiprocessloghandler.py
import os
import re
import datetime
import logging
try:
import codecs
except ImportError:
codecs = None
class MultiprocessHandler(logging.FileHandler):
"""支持多进程的TimedRotatingFileHandler"""
def __init__(self,filename,when='D',backupCount=0,encoding=None,delay=False):
"""filename 日志文件名,when 时间间隔的单位,backupCount 保留文件个数
delay 是否开启 OutSteam缓存
True 表示开启缓存,OutStream输出到缓存,待缓存区满后,刷新缓存区,并输出缓存数据到文件。
False表示不缓存,OutStrea直接输出到文件"""
self.prefix = filename
self.backupCount = backupCount
self.when = when.upper()
#正则匹配 年-月-日
#正则写到这里就对了
self.extMath = r"^\d{4}-\d{2}-\d{2}"
# S 每秒建立一个新文件
# M 每分钟建立一个新文件
# H 每天建立一个新文件
# D 每天建立一个新文件
self.when_dict = {
'S':"%Y-%m-%d-%H-%M-%S",
'M':"%Y-%m-%d-%H-%M",
'H':"%Y-%m-%d-%H",
'D':"%Y-%m-%d"
}
#日志文件日期后缀
self.suffix = self.when_dict.get(when)
#源码中self.extMath写在这里
#这个正则匹配不应该写到这里,不然非D模式下 会造成 self.extMath属性不存在的问题
#不管是什么模式都是按照这个正则来搜索日志文件的。
# if self.when == 'D':
# 正则匹配 年-月-日
# self.extMath = r"^\d{4}-\d{2}-\d{2}"
if not self.suffix:
raise ValueError(u"指定的日期间隔单位无效: %s" % self.when)
#拼接文件路径 格式化字符串
self.filefmt = os.path.join("logs","%s.%s" % (self.prefix,self.suffix))
#使用当前时间,格式化文件格式化字符串
self.filePath = datetime.datetime.now().strftime(self.filefmt)
#获得文件夹路径
_dir = os.path.dirname(self.filefmt)
try:
#如果日志文件夹不存在,则创建文件夹
if not os.path.exists(_dir):
os.makedirs(_dir)
except Exception:
print u"创建文件夹失败"
print u"文件夹路径:" + self.filePath
pass
if codecs is None:
encoding = None
#调用FileHandler
logging.FileHandler.__init__(self,self.filePath,'a+',encoding,delay)
def shouldChangeFileToWrite(self):
"""更改日志写入目的写入文件
return True 表示已更改,False 表示未更改"""
#以当前时间获得新日志文件路径
_filePath = datetime.datetime.now().strftime(self.filefmt)
#新日志文件日期 不等于 旧日志文件日期,则表示 已经到了日志切分的时候
# 更换日志写入目的为新日志文件。
#例如 按 天 (D)来切分日志
# 当前新日志日期等于旧日志日期,则表示在同一天内,还不到日志切分的时候
# 当前新日志日期不等于旧日志日期,则表示不在
#同一天内,进行日志切分,将日志内容写入新日志内。
if _filePath != self.filePath:
self.filePath = _filePath
return True
return False
def doChangeFile(self):
"""输出信息到日志文件,并删除多于保留个数的所有日志文件"""
#日志文件的绝对路径
self.baseFilename = os.path.abspath(self.filePath)
#stream == OutStream
#stream is not None 表示 OutStream中还有未输出完的缓存数据
if self.stream:
self.stream.flush()
self.stream.close()
#delay 为False 表示 不OutStream不缓存数据 直接输出
# 所有,只需要关闭OutStream即可
if not self.delay:
self.stream.close()
#删除多于保留个数的所有日志文件
if self.backupCount > 0:
for s in self.getFilesToDelete():
#print s
os.remove(s)
def getFilesToDelete(self):
"""获得过期需要删除的日志文件"""
#分离出日志文件夹绝对路径
#split返回一个元组(absFilePath,fileName)
#例如:split('I:\ScripPython\char4\mybook\util\logs\mylog.2017-03-19)
#返回(I:\ScripPython\char4\mybook\util\logs, mylog.2017-03-19)
# _ 表示占位符,没什么实际意义,
dirName,_ = os.path.split(self.baseFilename)
fileNames = os.listdir(dirName)
result = []
#self.prefix 为日志文件名 列如:mylog.2017-03-19 中的 mylog
#加上 点号 . 方便获取点号后面的日期
prefix = self.prefix + '.'
plen = len(prefix)
for fileName in fileNames:
if fileName[:plen] == prefix:
#日期后缀 mylog.2017-03-19 中的 2017-03-19
suffix = fileName[plen:]
#匹配符合规则的日志文件,添加到result列表中
if re.compile(self.extMath).match(suffix):
result.append(os.path.join(dirName,fileName))
result.sort()
#返回 待删除的日志文件
# 多于 保留文件个数 backupCount的所有前面的日志文件。
if len(result) < self.backupCount:
result = []
else:
result = result[:len(result) - self.backupCount]
return result
def emit(self, record):
"""发送一个日志记录
覆盖FileHandler中的emit方法,logging会自动调用此方法"""
try:
if self.shouldChangeFileToWrite():
self.doChangeFile()
logging.FileHandler.emit(self,record)
except (KeyboardInterrupt,SystemExit):
raise
except:
self.handleError(record)
messud4312的博客 大哥的源代码是这个样子的,但是 经过我测试发现在使用中会造成一些I/O错误
下面我们来测试一下:
import sys
import time
import multiprocessing
from multiprocessloghandler import MultiprocessHandler
# 定义日志输出格式
formattler = '%(levelname)s - %(name)s - %(asctime)s - %(message)s'
fmt = logging.Formatter(formattler)
# 获得logger,默认获得root logger对象
# 设置logger级别 debug
# root logger默认的级别是warning级别。
# 不设置的话 只能发送 >= warning级别的日志
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# 设置handleer日志处理器,日志具体怎么处理都在日志处理器里面定义
# SteamHandler 流处理器,输出到控制台,输出方式为stdout
# StreamHandler默认输出到sys.stderr
# 设置handler所处理的日志级别。
# 只能处理 >= 所设置handler级别的日志
# 设置日志输出格式
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(fmt)
# 使用我们写的多进程版Handler理器,定义日志输出到mylog.log文件内
# 文件打开方式默认为 a
# 按分钟进行日志切割
file_handler = MultiprocessHandler('mylog', when='M')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 对logger增加handler日志处理器
logger.addHandler(stream_handler)
logger.addHandler(file_handler)
# 发送debug级别日志消息
def test(num):
time.sleep(3)
logger.debug('日志测试' + str(num))
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=10)
for i in range(10):
pool.apply_async(func=test, args=(i,))
pool.close()
pool.join()
print '完毕'
测试结果如下:
image.png
image.png
这样则能正常的使用
下面重点来了:
def test(num):
time.sleep(10)
logger.debug('日志测试' + str(num))
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
for i in range(10):
pool.apply_async(func=test, args=(i,))
pool.close()
pool.join()
print '完毕'
运行结果如下:
image.png
在休眠时间过长的情况下 会造成 对已关闭文件进行I/0操作的错误,也不是每次都出现。导致日志无法正确写入日志文件内。
为什么会造成这个原因呢?
在方法 doChangeFile中,我们每次输出完self.stream中的信息后,都把stream关闭了 self.stream.close():
def doChangeFile(self):
"""输出信息到日志文件,并删除多于保留个数的所有日志文件"""
#日志文件的绝对路径
self.baseFilename = os.path.abspath(self.filePath)
#stream == OutStream
#stream is not None 表示 OutStream中还有未输出完的缓存数据
if self.stream:
self.stream.flush()
self.stream.close()
#delay 为False 表示 不OutStream不缓存数据 直接输出
# 所有,只需要关闭OutStream即可
if not self.delay:
self.stream.close()
logging调用我们覆盖的emit方法
doChangeFile关闭了stream,
当 logging.FileHandler.emit(self,record)
的时候 stream其实已经关闭了。
def emit(self, record):
"""发送一个日志记录
覆盖FileHandler中的emit方法,logging会自动调用此方法"""
try:
if self.shouldChangeFileToWrite():
self.doChangeFile()
#此时 stram已经关闭
logging.FileHandler.emit(self,record)
except (KeyboardInterrupt,SystemExit):
raise
except:
self.handleError(record)
我们看一下 logging.FileHandler.emit的源码:
def emit(self, record):
"""
Emit a record.
If the stream was not opened because 'delay' was specified in the
constructor, open it before calling the superclass's emit.
"""
if self.stream is None:
#打开stream
self.stream = self._open()
StreamHandler.emit(self, record)
logging.FileHandler.emit中 检查 当stream为 None的情况下 重新打开 steam
然而我们在doChangeFile中仅仅关闭了stream stram.close()但是并没有设置stream为 None。关闭的stream仍然还是 标准流对象,并不会成为None
#coding=utf-8
import sys
#stream 就是标准输出流,或者标准错误流,logging源码中默认的是标准错误流
#我们来看一下stream是什么东西
stream = sys.stdout
#可以看到是一个file对象
print type(stream)
#写入文件,刷新缓冲区(如果没有设置缓冲区,则可以不刷新)关闭流
stream.write('abc\n')
stream.flush()
stream.close()
#流关闭后,还会是file对象么
#是的 关闭后仍然是file对象
print type(stream)
#可以看到 报错信息为 对已经关闭的文件对象file进行io操作,说明sream关闭后仍然是file对象。
#所以说我们需要 将已经关闭的stream设置为None,srteam = None
# 避免对已关闭的文件对象进行i0操作。
```
![image.png](https://img.haomeiwen.com/i4131789/0ff3830bc4ce6dba.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
找到问题所在 那么久好办咯:
在doChangeFile中将关闭后的stream 重新设置为 None即可
```
if self.stream:
#flush close 都会刷新缓冲区,flush不会关闭stream,close则关闭stream
#self.stream.flush()
self.stream.close()
#关闭stream后必须重新设置stream为None,否则会造成对已关闭文件进行IO操作。
self.stream = None
#delay 为False 表示 不OutStream不缓存数据 直接输出
# 所有,只需要关闭OutStream即可
if not self.delay:
#这个地方如果关闭colse那么就会造成进程往已关闭的文件中写数据,从而造成IO错误
#delay == False 表示的就是 不缓存直接写入磁盘
#我们需要重新在打开一次stream
#self.stream.close()
self.stream = self._open()
```
if not self.delay中为甚要打开stream内
![image.png](https://img.haomeiwen.com/i4131789/5033b4310e35461b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
在这里我们可以看到
delay为False的时候 需要打开stream
FileHandler_init我们在 我们写的多进程版Handler_init中已经提前初始化了。多进程后面使用中可能会造成stream关闭。所以再打开一次。
这样就好了
改正后完整的代码如下:
```
#coding=utf-8
import os
import re
import datetime
import logging
try:
import codecs
except ImportError:
codecs = None
class MultiprocessHandler(logging.FileHandler):
"""支持多进程的TimedRotatingFileHandler"""
def __init__(self,filename,when='D',backupCount=0,encoding=None,delay=False):
"""filename 日志文件名,when 时间间隔的单位,backupCount 保留文件个数
delay 是否开启 OutSteam缓存
True 表示开启缓存,OutStream输出到缓存,待缓存区满后,刷新缓存区,并输出缓存数据到文件。
False表示不缓存,OutStrea直接输出到文件"""
self.prefix = filename
self.backupCount = backupCount
self.when = when.upper()
# 正则匹配 年-月-日
self.extMath = r"^\d{4}-\d{2}-\d{2}"
# S 每秒建立一个新文件
# M 每分钟建立一个新文件
# H 每天建立一个新文件
# D 每天建立一个新文件
self.when_dict = {
'S':"%Y-%m-%d-%H-%M-%S",
'M':"%Y-%m-%d-%H-%M",
'H':"%Y-%m-%d-%H",
'D':"%Y-%m-%d"
}
#日志文件日期后缀
self.suffix = self.when_dict.get(when)
if not self.suffix:
raise ValueError(u"指定的日期间隔单位无效: %s" % self.when)
#拼接文件路径 格式化字符串
self.filefmt = os.path.join("logs","%s.%s" % (self.prefix,self.suffix))
#使用当前时间,格式化文件格式化字符串
self.filePath = datetime.datetime.now().strftime(self.filefmt)
#获得文件夹路径
_dir = os.path.dirname(self.filefmt)
try:
#如果日志文件夹不存在,则创建文件夹
if not os.path.exists(_dir):
os.makedirs(_dir)
except Exception:
print u"创建文件夹失败"
print u"文件夹路径:" + self.filePath
pass
if codecs is None:
encoding = None
logging.FileHandler.__init__(self,self.filePath,'a+',encoding,delay)
def shouldChangeFileToWrite(self):
"""更改日志写入目的写入文件
:return True 表示已更改,False 表示未更改"""
#以当前时间获得新日志文件路径
_filePath = datetime.datetime.now().strftime(self.filefmt)
#新日志文件日期 不等于 旧日志文件日期,则表示 已经到了日志切分的时候
# 更换日志写入目的为新日志文件。
#例如 按 天 (D)来切分日志
# 当前新日志日期等于旧日志日期,则表示在同一天内,还不到日志切分的时候
# 当前新日志日期不等于旧日志日期,则表示不在
#同一天内,进行日志切分,将日志内容写入新日志内。
if _filePath != self.filePath:
self.filePath = _filePath
return True
return False
def doChangeFile(self):
"""输出信息到日志文件,并删除多于保留个数的所有日志文件"""
#日志文件的绝对路径
self.baseFilename = os.path.abspath(self.filePath)
#stream == OutStream
#stream is not None 表示 OutStream中还有未输出完的缓存数据
if self.stream:
#flush close 都会刷新缓冲区,flush不会关闭stream,close则关闭stream
#self.stream.flush()
self.stream.close()
#关闭stream后必须重新设置stream为None,否则会造成对已关闭文件进行IO操作。
self.stream = None
#delay 为False 表示 不OutStream不缓存数据 直接输出
# 所有,只需要关闭OutStream即可
if not self.delay:
#这个地方如果关闭colse那么就会造成进程往已关闭的文件中写数据,从而造成IO错误
#delay == False 表示的就是 不缓存直接写入磁盘
#我们需要重新在打开一次stream
#self.stream.close()
self.stream = self._open()
#删除多于保留个数的所有日志文件
if self.backupCount > 0:
print '删除日志'
for s in self.getFilesToDelete():
print s
os.remove(s)
def getFilesToDelete(self):
"""获得过期需要删除的日志文件"""
#分离出日志文件夹绝对路径
#split返回一个元组(absFilePath,fileName)
#例如:split('I:\ScripPython\char4\mybook\util\logs\mylog.2017-03-19)
#返回(I:\ScripPython\char4\mybook\util\logs, mylog.2017-03-19)
# _ 表示占位符,没什么实际意义,
dirName,_ = os.path.split(self.baseFilename)
fileNames = os.listdir(dirName)
result = []
#self.prefix 为日志文件名 列如:mylog.2017-03-19 中的 mylog
#加上 点号 . 方便获取点号后面的日期
prefix = self.prefix + '.'
plen = len(prefix)
for fileName in fileNames:
if fileName[:plen] == prefix:
#日期后缀 mylog.2017-03-19 中的 2017-03-19
suffix = fileName[plen:]
#匹配符合规则的日志文件,添加到result列表中
if re.compile(self.extMath).match(suffix):
result.append(os.path.join(dirName,fileName))
result.sort()
#返回 待删除的日志文件
# 多于 保留文件个数 backupCount的所有前面的日志文件。
if len(result) < self.backupCount:
result = []
else:
result = result[:len(result) - self.backupCount]
return result
def emit(self, record):
"""发送一个日志记录
覆盖FileHandler中的emit方法,logging会自动调用此方法"""
try:
if self.shouldChangeFileToWrite():
self.doChangeFile()
logging.FileHandler.emit(self,record)
except (KeyboardInterrupt,SystemExit):
raise
except:
self.handleError(record)
```
网友评论