美文网首页
python watchdog 实现文件目录的监控

python watchdog 实现文件目录的监控

作者: 风轻云淡lq | 来源:发表于2019-08-13 10:04 被阅读0次

    使用: 如果想监控目录,做相应逻辑处理(不想大费周章),相信这会是你很好的选择

    **需求: 实现linux 目录监控,将新增的文件放入处理引擎中执行, python 3以上 **

    注意事项:

    由于文件上传到指定目录时,会触发多重监控事件,需要做逻辑处理

    watchdog 简单demo 使用

    # -*- coding: utf-8 -*-
    # 作用: 用于监控目录的变化,调用对应的处理逻辑
    
    from watchdog.observers import Observer
    from watchdog.events import *
    import time
    import sys
    import os
    import logging
    import zipfile
    
    # NOTE(fix): the original called reload(sys) / sys.setdefaultencoding('utf8')
    # here. Both are Python 2-only (`reload` is a NameError on Python 3 and
    # `setdefaultencoding` no longer exists). The article targets Python 3,
    # whose default encoding is already UTF-8, so those calls are removed.

    # Create the logger used by the directory monitor.
    logger = logging.getLogger('MonitorDir')
    logger.setLevel(logging.DEBUG)

    # File handler: persists every record to a log file.
    fh = logging.FileHandler('E:/testLog.log')
    fh.setLevel(logging.DEBUG)

    # Stream handler: mirrors the same records to the console.
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    # One shared record format for both handlers.
    formatter = logging.Formatter(
        '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    # Attach both handlers to the logger.
    logger.addHandler(fh)
    logger.addHandler(ch)
    
    class FileEventHandler(FileSystemEventHandler):
        """Log every filesystem event observed under the watched tree."""

        def __init__(self):
            super().__init__()

        def on_moved(self, event):
            # Choose the message template by target kind, then log once.
            template = ("directory moved from {0} to {1}" if event.is_directory
                        else "file moved from {0} to {1}")
            logger.info(template.format(event.src_path, event.dest_path))

        def on_created(self, event):
            template = ("directory created:{0}" if event.is_directory
                        else "file created:{0}")
            logger.info(template.format(event.src_path))

        def on_deleted(self, event):
            template = ("directory deleted:{0}" if event.is_directory
                        else "file deleted:{0}")
            logger.info(template.format(event.src_path))

        def on_modified(self, event):
            # Fires for modifications anywhere beneath the watch root.
            template = ("directory modified:{0}" if event.is_directory
                        else "file modified:{0}")
            logger.info(template.format(event.src_path))
    
    if __name__ == "__main__":
        observer = Observer()
        event_handler = FileEventHandler()
        # Watch the directory recursively (third argument True).
        # NOTE(fix): the path is now a raw string — in the original
        # "E:\TestMonitor" the "\T" is an invalid escape sequence
        # (SyntaxWarning on modern Python) and would silently corrupt
        # paths containing real escapes such as "\t".
        observer.schedule(event_handler, r"E:\TestMonitor", True)
        observer.start()
        try:
            # Keep the main thread alive; the observer runs on its own thread.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            # Ctrl-C: stop the observer, then wait for its thread to exit.
            observer.stop()
        observer.join()
    

    通过上面的学习,想必已经对watchdog 有个简单的了解,下面进入正题,直接上实际使用中的代码,也会讲解使用中用到的一些问题,多的不说,正式上代码开始

    # -*- coding: utf-8 -*-
    # 作用: 用于监控目录的变化,调用对应的处理逻辑
    
    from watchdog.observers import Observer
    from watchdog.events import *
    import time
    import sys
    import os
    import logging
    import zipfile
    import DiffPlatDeal
    import json
    import requests
    
    # Logger for the directory monitor.
    logger = logging.getLogger('MonitorDir')
    logger.setLevel(logging.DEBUG)

    # File handler: log file named by the date at startup (not rotated).
    logTime = time.strftime('%Y%m%d', time.localtime(time.time()))
    fh = logging.FileHandler('/home/liuqing/logData/MonitorDirLog_' + logTime + '.log')

    # Console handler mirrors the same records.
    ch = logging.StreamHandler()

    # One shared record format for both handlers.
    formatter = logging.Formatter(
        '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')

    # Apply level and format, then attach each handler to the logger.
    for handler in (fh, ch):
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    
    
    class FileEventHandler(FileSystemEventHandler):
        """Watchdog handler that batches uploaded files and feeds them to the
        processing engine, posting DingTalk notifications on success/failure.

        Relies on module-level globals assigned in the __main__ block:
        - listFile: set of file paths pending in the current batch
        - n: seconds elapsed since the last processed batch (reset here)
        - lastFile: most recently accepted path (duplicate-event guard)
        """

        def __init__(self):
            FileSystemEventHandler.__init__(self)

        def on_moved(self, event):
            # Log moves for directories and files alike.
            if event.is_directory:
                logger.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
            else:
                logger.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))

        def on_created(self, event):
            if event.is_directory:
                logger.info("directory created:{0}".format(event.src_path))
            else:
                logger.info("file created:{0}".format(event.src_path))

        def on_deleted(self, event):
            if event.is_directory:
                logger.info("directory deleted:{0}".format(event.src_path))
            else:
                logger.info("file deleted:{0}".format(event.src_path))

        # Main hook: a file upload into the watched directory surfaces here.
        # One upload can fire several modified events, hence the dedup and
        # batching logic below.
        def on_modified(self, event):
            sourcePath = event.src_path
            global lastFile
            global n
            # A subdirectory of the watched directory changed: just log it.
            if event.is_directory:
                logger.info("directory modified:{0}".format(sourcePath))
            else:
                size = os.path.getsize(sourcePath)
                if size == 0:
                    # Empty file: skip (log message says "file is empty").
                    logger.info('文件是空的')
                else:
                    # DingTalk webhook endpoint for notifications (redacted).
                    url = "xxxxxxxx"
                    header = {
                        "Content-Type": "application/json",
                        "charset": "utf-8"
                    }
                    try:
                        # NOTE(review): `type` shadows the builtin; it holds the
                        # file extension (including the leading dot).
                        filename, type = os.path.splitext(sourcePath)
                        # Temp files and archives get special treatment.
                        if 'tmp' in type:
                            # Editor/upload temp file: ignore.
                            logger.info('文件是临时文件,不做处理')
                        elif 'zip' in type:
                            # Zip archive: extract next to it before processing.
                            # NOTE(review): the ZipFile handle is never closed —
                            # consider a `with` block.
                            logger.info('需要先解压文件')
                            f = zipfile.ZipFile(sourcePath, 'r')
                            f.extractall(filename + '/zipData')
                        elif 'swp' in type:
                            # vim swap file: ignore.
                            logger.info('文件是swp临时文件,不做处理')
                        elif lastFile == sourcePath:
                            # Same path as the previous event: duplicate trigger.
                            logger.info('处理文件重复的问题:{0}'.format(sourcePath))
                        else:
                            # Normal case: record the file into the batch.
                            nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
                            # .txt is the heartbeat marker written by __main__;
                            # everything else joins the batch.
                            if type != ".txt":
                                lastFile = sourcePath
                                logger.info("file modified:{0}".format(sourcePath))
                                listFile.add(sourcePath)
                                logger.info("list:{0}".format(listFile))
                            # Timing: flush roughly every 20s, or immediately
                            # once the batch holds 4+ files.
                            if int(n) > 20 and len(listFile) == 0:
                                n = 0
                            if int(n) > 20 and len(listFile) > 0 and len(listFile) < 4:
                                logger.info("n:{0};listFile:{1}".format(n, listFile))
                                # Hand the small batch to the processing engine.
                                DiffPlatDeal.xxxx(logger, listFile)
                                sendList = []
                                for file in listFile:
                                    sendList.append(os.path.basename(file))

                                dataSucee1 = {
                                    "msgtype": "text",
                                    "text": {
                                        "content": "【Success: 处理成功】:【"+nowTime+"】\n"+ ";  ####\n".join(sendList)+"\n 【总条数:】"+str(len(sendList))
                                    }
                                }
                                sendData1 = json.dumps(dataSucee1).encode("utf-8")
                                requests.post(url=url, data=sendData1, headers=header)
                                n = 0
                                listFile.clear()
                            # Batch reached 4 or more files: process at once.
                            if len(listFile) >= 4:
                                # Run the processing logic, then clear the batch.
                                DiffPlatDeal.xxxxxx(logger, listFile)
                                # Build the notification list of base names.
                                sendList = []
                                for file in listFile:
                                    sendList.append(os.path.basename(file))
                                dataSucee = {
                                    "msgtype": "text",
                                    "text": {
                                        "content": "【Success: 处理成功】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n 【总条数:】"+str(len(sendList))
                                    }
                                }
                                sendData = json.dumps(dataSucee).encode("utf-8")
                                requests.post(url=url, data=sendData, headers=header)
                                listFile.clear()

                    except Exception as e:
                        # NOTE(review): `nowTime` may be unbound here if the
                        # exception was raised before it was assigned.
                        logger.error(e)
                        time.sleep(5)
                        # Read the progress file written by the engine to learn
                        # which files of the batch failed, then notify.
                        with open("/xxxxx/py/readExcelList/list.txt", 'r') as f:  # open progress file
                            lines = f.readlines()  # read all lines
                            last_line = lines[-1].strip()  # take the last line
                            # Only act on a non-empty "<tag>::<files>" line.
                            if len(last_line) != 0:
                                if len(last_line) != 0 and len(last_line.split("::")) == 2:
                                    if "," not in last_line.split("::")[1]:
                                        # No comma => nothing was processed:
                                        # the whole batch failed.
                                        logger.error('当前批次文件全部有问题')
                                        sendList = []
                                        for file in listFile:
                                            sendList.append(os.path.basename(file))
                                        data = {
                                            "msgtype": "text",
                                            "text": {
                                                "content": "【Fail: 文件没有处理】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n【错误原因】:\n"+str(e)+"\n 【总条数:】"+str(len(sendList))
                                            }
                                        }
                                        sendData = json.dumps(data).encode("utf-8")
                                        requests.post(url=url, data=sendData, headers=header)
                                    else:
                                        # Comma-separated list of processed files;
                                        # the unprocessed remainder is notified.
                                        # NOTE(review): listFile holds full paths —
                                        # confirm spline holds the same form, or
                                        # the set difference removes nothing.
                                        spline = last_line.split("::")[1].split(",")
                                        logger.info('Except当前批次处理的文件:{0}'.format(spline))
                                        tmpset = listFile - set(spline)
                                        sendList = []
                                        for file in tmpset:
                                            sendList.append(os.path.basename(file))
                                        data = {
                                            "msgtype": "text",
                                            "text": {
                                                "content": "【Fail: 文件没有处理】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n【错误原因】:\n"+str(e)+"\n 【总条数:】"+str(len(sendList))
                                            }
                                        }
                                        sendData = json.dumps(data).encode("utf-8")
                                        requests.post(url=url, data=sendData, headers=header)
                                # Append a newline so the next batch starts a
                                # fresh line in the progress file.
                                with open('/xxxxx/readExcelList/list.txt', 'a') as f:
                                    f.write("\n")
                        listFile.clear()
    
    
    if __name__ == "__main__":
        # Shared state read/written by FileEventHandler.on_modified via
        # `global` — set of paths collected during the current batch window.
        listFile = set()
        # Seconds elapsed since the last processed batch (the handler resets it).
        n = 0
        # Path of the most recently accepted file (duplicate-event guard).
        lastFile = ""
        observer = Observer()
        event_handler = FileEventHandler()
        observer.schedule(event_handler, "/xxxxxx/importData", True)
        observer.start()
        try:
            while True:
                time.sleep(1)
                n += 1
                # Heartbeat: after ~25s write a marker file into the watched
                # directory to force an on_modified event, so batches smaller
                # than 4 files still get flushed by the handler's 20s rule.
                if n == 25:
                    with open('/xxxxxxx/importData/test.txt', 'w+') as f:
                        f.write("test")
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
    
    

    代码讲解:

    1. 作用: 用于监控目录 observer.schedule(event_handler, "/xxxxxx/importData", True) , 程序会将上传文件写入listFile = set() 集合当中,触发逻辑处理条件是: if int(n) > 20 and len(listFile) > 0 and len(listFile) < 4【大于20秒,并且set当中文件数量需要小于4,为什么要用set,因为文件写入目录的过程当中会多次触发def on_modified(self, event),避免文件重复】 或者 len(listFile) >= 4【文件数量大于等于4个】; 处理执行逻辑DiffPlatDeal.xxxxxx,根据自己业务需求做变化
    1. 文件处理的过程中【如果上次传入文件和这次相同,不做任何操作,也是为了避免文件误传,多次触发】 elif lastFile == sourcePath:
      logger.info('处理文件重复的问题:{0}'.format(sourcePath))
    1. 如果文件处理失败,会记录下处理到哪个文件了,然后做出对应钉钉告警信息
      异常处理逻辑: except Exception as e: xxxxxx

    相关文章

      网友评论

          本文标题:python watchdog 实现文件目录的监控

          本文链接:https://www.haomeiwen.com/subject/etomjctx.html