美文网首页
批量定时任务框架,支持动态修改文件和频率

批量定时任务框架,支持动态修改文件和频率

作者: Mikasa___ | 来源:发表于2020-05-18 11:18 被阅读0次
    import logging
    import logging.handlers
    import queue
    import time
    import threading
    import schedule
    import os
    import sys
    import datetime
    import hashlib
    from pathlib import Path
    import subprocess
    import filecmp
    import git
    from git import Repo
    import shutil
    import os
    
    
    # On first start the current config.csv and the files under ./tasks are kept
    # in memory. The scheduled pull job then compares the freshly pulled
    # config.csv and ./tasks against that snapshot: schedules whose config row or
    # script presence changed are cleared and re-created, all others keep running.
    def modify(pull_modi):
        """Refresh the working tree from git and reschedule the jobs that changed.

        The function deletes the local ``./tasks`` folder and ``config.csv``,
        fetches and hard-resets the repository in the current directory, then
        diffs the new ``config.csv`` rows and task files against the in-memory
        snapshot held in the globals ``files`` and ``name_path``.

        Side effects on module globals:
            * ``sem`` is replaced by a semaphore sized for the new job count.
            * ``run_job`` is rewritten to the statements that are now scheduled.
            * ``files`` and ``name_path`` are replaced by the new snapshot so the
              next run diffs against the current state, not the start-up state.

        Args:
            pull_modi: label of the invoking job; only used in a progress print.

        Raises:
            OSError: if ``config.csv`` cannot be opened after the reset.
            Exceptions raised by GitPython propagate to the caller.
        """
        global files
        global name_path
        global sem
        global run_job

        def parse_row(row):
            """Turn one stripped config row into a job dict, or None if invalid."""
            args = row.split(',')
            if len(args) < 6:
                logger.warning('%s列数不足,不会定时跑,其他继续遍历' % row)
                return None
            if not (args[2].isdigit() and args[3].isdigit() and args[4].isdigit()):
                logger.warning('%s时/分/秒格式不正确,不会定时跑,其他继续遍历' % args[0])
                return None
            if not (args[5].isdigit() and int(args[5]) in range(0, 8)):
                logger.warning('%s星期%s格式不正确,不会定时跑,其他继续遍历' % (args[0], args[5]))
                return None
            return {
                'command': 'python ./tasks/' + args[0],
                'at': args[1],
                'seconds': args[2],
                'minutes': args[3],
                'hours': args[4],
                'day': args[5],
            }

        def build_statement(conf):
            """Return the schedule statement for one job dict, or None if unsupported."""
            # Fields equal to '0' mean "not set"; exactly one frequency field
            # may remain next to 'command'.
            active = {k: v for k, v in conf.items() if v != '0'}
            command = conf['command']
            if len(active) != 2:
                print('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % command)
                logger.warning('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % command)
                return None
            if 'at' in active:
                s = 'schedule.every().day.at("%s").do(thread,sem,job,"%s")' % (active['at'], command)
                print('每天几点跑', s)
            elif 'day' in active:
                # day is 1-7; weekday is a 0-based list of schedule unit names.
                s = 'schedule.every().%s.do(thread,sem,job,"%s")' % (weekday[int(active['day']) - 1], command)
                print('每星期几跑', s)
            else:
                print('每几秒、分、时跑', list(active))
                unit = next(u for u in ('seconds', 'minutes', 'hours') if u in active)
                s = 'schedule.every(%s).%s.do(thread,sem,job,"%s")' % (active[unit], unit, command)
            # The script file name doubles as the schedule tag.
            return '%s.tag("%s")' % (s, command.split('/')[2])

        # Remove the local copies; the hard reset below restores them from git.
        # shutil/os are used instead of an external `rm`, which a string
        # command without a shell cannot start on POSIX.
        shutil.rmtree('./tasks', ignore_errors=True)
        try:
            os.remove('./config.csv')
        except FileNotFoundError:
            pass
        repo = Repo("./")
        git_cmd = repo.git
        print(git_cmd.fetch('--all'))
        git_cmd.reset('--hard', 'origin')

        print("git pull is running", datetime.datetime.now())
        logger.info("git pull is running")
        if not Path('./tasks').exists():
            logger.error("git拉取最新文件./tasks还没完成,不能开启新schedule,这个缓冲时间内旧的schedule也会找不到文件")

        print("%s is running" % pull_modi, datetime.datetime.now())

        # Read the freshly pulled config; the first CSV line is a format header.
        with open('config.csv', 'r') as f2:
            files2 = f2.readlines()[1:]

        # Stripped rows (file name + frequency) of the old and new config;
        # blank rows carry no job and are ignored.
        file1 = [x.rstrip() for x in files if x.strip()]
        file2 = [y.rstrip() for y in files2 if y.strip()]

        # Script files known from the previous snapshot.
        print('name_path', name_path)
        logger.info("old_py_path:%s" % name_path)
        old_files = list(name_path)

        # Script files now present under ./tasks (every directory visited by
        # os.walk, matching how name_path is populated).
        new_path = {}
        for root_new, dirs_new, names in os.walk('./tasks'):
            print(len(names))
            for file in names:
                new_path[file] = os.path.join(root_new, file)
        py_new = list(new_path)
        print('new_path', new_path)
        logger.info("new_py_path:%s" % new_path)

        # Script names listed in the old / new config (informational logging).
        con_py = [row.split(',')[0] for row in file1 if row.split(',')[0].endswith('.py')]
        print('old_py', con_py)
        logger.info("old_py:%s" % con_py)
        con_new_py = [row.split(',')[0] for row in file2 if row.split(',')[0].endswith('.py')]
        print('new_py', con_new_py)
        logger.info("new_py:%s" % con_new_py)
        same_py = list(set(con_new_py).intersection(set(con_py)))
        print('same_py', same_py)
        logger.info("same_py:%s" % same_py)
        print('old_config', file1)
        logger.info("old_config:%s" % file1)
        print('new_config', file2)
        logger.info("new_config:%s" % file2)

        # Rows whose file name and frequency are both unchanged.
        same_py_con = list(set(file1).intersection(set(file2)))
        print('same_py_con', same_py_con)
        print('new_tasks', py_new)
        print('old_tasks', old_files)
        logger.info("new_tasks:%s" % py_new)
        logger.info("old_tasks:%s" % old_files)

        add = []
        clear = []
        # Rows that are new or changed: schedule them if the script exists.
        for new in file2:
            if new not in file1 and new.split(',')[0] in py_new:
                add.append(new.split(',')[0])
        # Rows that disappeared or changed: their schedules must go.
        for old in file1:
            if old not in file2:
                clear.append(old.split(',')[0])
        # Unchanged rows: only the presence of the script file can differ.
        for ss in same_py_con:
            sspy = ss.split(',')[0]
            if sspy in py_new and sspy in old_files:
                print('新旧tasks文件夹都有这个py文件%s,git pull后文件夹下是新的,python xx.py运行的是最新的' % sspy)
                logger.info('新旧tasks文件夹都有这个py文件%s,git pull后文件夹下是新的,python xx.py运行的是最新的' % sspy)
            elif sspy not in py_new and sspy not in old_files:
                print('新旧tasks文件夹都无py文件%s,不清除旧的,也不加新的' % sspy)
                logger.info('新旧tasks文件夹都无py文件%s,不清除旧的,也不加新的' % sspy)
            elif sspy in py_new and sspy not in old_files:
                add.append(sspy)
                logger.info('新开启%s' % sspy)
            elif sspy not in py_new and sspy in old_files:
                clear.append(sspy)
                logger.info('清除旧的%s' % sspy)

        # A name may be collected more than once (several rows per script);
        # de-duplicate so each script is processed exactly once.
        add = list(dict.fromkeys(add))
        clear = list(dict.fromkeys(clear))
        print('add', add)
        print('clear', clear)
        logger.info('新开启的任务一共有%s' % add)
        logger.info('清除的任务一共有%s' % clear)

        # Build a job dict for every new-config row that belongs to an added
        # script; an invalid row is skipped without affecting the others.
        jobs = []
        for ad in add:
            if not ad.endswith('.py'):
                logger.warning('%s格式不正确,不会定时跑,其他继续遍历' % ad)
                continue
            for row in file2:
                if row.split(',')[0] != ad:
                    continue
                print('会跑', ad)
                logger.info('新跑%s' % ad)
                parsed = parse_row(row)
                if parsed is not None:
                    jobs.append(parsed)
        logger.info('配置文件和定时参数如下')
        logger.info(jobs)

        com_job = []
        for conf in jobs:
            statement = build_statement(conf)
            if statement is not None:
                com_job.append(statement)
        print('配置的定时任务是:', com_job)
        logger.info('新配置的定时任务是:%s' % com_job)

        # Clear the schedules of removed scripts and of scripts about to be
        # re-added: every row of an added script is registered again below, so
        # leaving its old jobs in place would duplicate them.
        retired = list(dict.fromkeys(clear + add))
        for cl in retired:
            print('schedule.clear("%s")' % cl)
            logger.info('schedule.clear("%s")' % cl)
            schedule.clear(cl)

        # Keep run_job in step with what is actually scheduled so the thread
        # limit stays correct across repeated runs. Statements end with
        # `.tag("<file name>")`, which identifies the script they belong to.
        retired_suffixes = tuple('.tag("%s")' % name for name in retired)
        run_job = [s for s in run_job if not s.endswith(retired_suffixes)] + com_job

        # Same sizing rule as at start-up: one slot per job plus one spare.
        slots = len(run_job) + 1
        sem = threading.Semaphore(slots)
        logger.info('新设置最大子线程数量%d' % slots)
        print('新设置最大子线程数量%d' % slots)

        # NOTE(review): the statements are built from config.csv content and
        # executed with eval; a crafted file name or time string could inject
        # code. Only use with a trusted repository.
        for new in com_job:
            print('new schedule', new)
            logger.info('new schedule%s' % new)
            try:
                eval(new)
            except Exception:
                # One bad statement must not prevent the remaining ones.
                logger.exception('new schedule运行异常')
                print('new schedule运行异常')

        # The pulled state becomes the baseline for the next comparison.
        files = files2
        name_path = new_path
    
    
    
    # Logging setup: the log directory has to exist before the file handler
    # opens its file.
    log_dir = Path('logs')
    if not log_dir.exists():
        log_dir.mkdir(parents=True)

    # Root logger, passing every record of level DEBUG and above to its handlers.
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # File handler that rotates once per day and keeps seven backups; the file
    # name carries the start date as yymmdd.
    fileHandler = logging.handlers.TimedRotatingFileHandler(
        'logs/run_tasks%s.log' % datetime.datetime.now().strftime('%y%m%d'),
        when='D',
        interval=1,
        backupCount=7,
        encoding='UTF-8',
    )
    # NOTSET on the handler: it does not filter by level itself.
    fileHandler.setLevel(logging.NOTSET)

    # Record layout: timestamp, source line and file, level, message.
    formatter = logging.Formatter('%(asctime)s-line %(lineno)d in %(filename)s - %(levelname)s - %(message)s')
    fileHandler.setFormatter(formatter)
    logger.addHandler(fileHandler)

    # Module-level state shared by config_read() and modify().
    weekday = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
    name_path = {}  # script file name -> path under ./tasks
    files = ''      # rows of config.csv (replaced by a list once read)
    py_files = ''
    files2= ''
    
    # Validate one data row of config.csv and turn it into a job dict.
    def _parse_config_line(line, available):
        """Return the job dict for *line*, or None when the row must be skipped.

        Args:
            line: one raw row of config.csv in the form
                ``file.py,at,seconds,minutes,hours,day``.
            available: collection of file names found under ./tasks.
        """
        args = line.rstrip('\n').split(',')
        if len(args) < 6:
            logger.warning('%s列数不足,不会定时跑,其他继续遍历' % line.rstrip('\n'))
            return None
        name = args[0]
        # '0' means "not set"; a row without any frequency cannot be scheduled.
        if all(field == '0' for field in args[1:6]):
            print('%s频率不能全为空' % name)
            return None
        if name not in available:
            print('%s文件不在tasks文件夹' % name)
            return None
        if not name.endswith('.py'):
            logger.warning('%s格式不正确,不会定时跑,其他继续遍历' % name)
            return None
        if not (args[2].isdigit() and args[3].isdigit() and args[4].isdigit()):
            logger.warning('%s时/分/秒格式不正确,不会定时跑,其他继续遍历' % name)
            return None
        if not (args[5].isdigit() and int(args[5]) in range(0, 8)):
            logger.warning('%s星期%s格式不正确,不会定时跑,其他继续遍历' % (name, args[5]))
            return None
        return {
            'command': 'python ./tasks/' + name,
            'at': args[1],
            'seconds': args[2],
            'minutes': args[3],
            'hours': args[4],
            'day': args[5],
        }


    # Build the schedule statement (a string evaluated by the caller) for one job.
    def _build_schedule_statement(job_conf):
        """Return the statement for *job_conf*, or None if its frequency is unsupported.

        Only one of ``at``, ``seconds``, ``minutes``, ``hours``, ``day`` may be
        set (i.e. differ from ``'0'``). ``day`` 1-7 selects an entry of the
        module-level ``weekday`` list.
        """
        active = {k: v for k, v in job_conf.items() if v != '0'}
        command = job_conf['command']
        # 'command' plus exactly one frequency field.
        if len(active) != 2:
            print('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % command)
            logger.warning('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % command)
            return None
        if 'at' in active:
            s = 'schedule.every().day.at("%s").do(thread,sem,job,"%s")' % (active['at'], command)
            print('每天几点跑', s)
        elif 'day' in active:
            s = 'schedule.every().%s.do(thread,sem,job,"%s")' % (weekday[int(active['day']) - 1], command)
            print('每星期几跑', s)
        else:
            print('每几秒、分、时跑', list(active))
            unit = next(u for u in ('seconds', 'minutes', 'hours') if u in active)
            s = 'schedule.every(%s).%s.do(thread,sem,job,"%s")' % (active[unit], unit, command)
        # The script file name doubles as the schedule tag.
        return '%s.tag("%s")' % (s, command.split('/')[2])


    # Read config.csv and the ./tasks folder and build the schedule statements.
    def config_read():
        """Parse ``config.csv`` and return the schedule statements to evaluate.

        The rows of config.csv (header line skipped) are stored in the global
        ``files``; every file found under ./tasks is recorded in the global
        ``name_path`` (file name -> path). Each valid row becomes one statement
        such as ``schedule.every(1).seconds.do(thread,sem,job,"python
        ./tasks/1.py").tag("1.py")``. Invalid rows are reported and skipped
        without affecting the remaining rows.

        Returns:
            list[str]: schedule statements, one per accepted row.

        Raises:
            SystemExit: if config.csv does not exist or has no data rows.
        """
        global files
        global name_path

        try:
            # The first CSV line is a format header and is skipped.
            with open('config.csv', 'r') as f:
                files = f.readlines()[1:]
        except FileNotFoundError:
            logger.error("无配置文件,请填好配置文件再重新运行")
            sys.exit()
        except OSError:
            logger.error('配置文件打开失败')
        except Exception:
            logger.exception('配置文件读取异常')
        else:
            # Checked outside the try body so that SystemExit is not swallowed
            # by the handlers above.
            if len(files) == 0:
                logger.error("配置文件无内容,请填好配置文件再重新运行")
                sys.exit()
            print('配置的脚本文件个数', len(files))
            logger.info('配置的脚本文件个数%d' % len(files))

        # Collect the files of every directory visited by os.walk, not just the
        # last one; a missing ./tasks folder simply yields an empty list.
        task_files = []
        try:
            for root, dirs, names in os.walk('./tasks'):
                print(len(names))
                print(names)
                for file in names:
                    name_path[file] = os.path.join(root, file)
                task_files.extend(names)
        except Exception:
            logger.exception('脚本文件夹读取异常')
        if len(files) != len(task_files):
            logger.warning('config.csv里的文件个数和要跑的Py文件数量不一致,请检查')

        # Row format: filename.py,at,seconds,minutes,hours,day. Only one
        # frequency value is supported per row; a daily run must use `at`;
        # day 1-7 maps to Monday-Sunday.
        jobs = []
        for i, line in enumerate(files):
            print('配置文件第%d行内容是%s' % (i + 1, line))
            if not line.strip():
                continue
            parsed = _parse_config_line(line, task_files)
            if parsed is not None:
                jobs.append(parsed)
        logger.info('配置文件和定时参数如下')
        logger.info(jobs)

        # NOTE(review): the statements embed config.csv content and are later
        # executed with eval by the caller; only use a trusted config file.
        com_job = []
        for conf in jobs:
            statement = _build_schedule_statement(conf)
            if statement is not None:
                com_job.append(statement)
        print('配置的定时任务是:', com_job)

        return com_job
    
    # Generic runner: execute one task script as a child process.
    def job(command):
        """Run *command* as a child process and wait for it to finish.

        Args:
            command: command line such as ``'python ./tasks/1.py'``.

        On POSIX a plain string handed to ``subprocess.run`` without a shell is
        taken as the program name, arguments included, so the string is split
        into an argument list first. On Windows the string is passed unchanged.
        """
        import shlex

        if isinstance(command, str) and os.name != 'nt':
            command = shlex.split(command)
        subprocess.run(command)
    
    
    # Start the job in a worker thread if a semaphore slot is free; otherwise
    # skip this run and log a warning. Errors raised inside the worker are
    # logged by MyThread.
    def thread(sem_ar,job_func, command):
        """Run ``job_func(command)`` in a new MyThread when ``sem_ar`` has a free slot.

        The semaphore is acquired without blocking; when no slot is available
        the run is skipped and a warning is logged instead.
        """
        slot_taken = sem_ar.acquire(False)
        if not slot_taken:
            logger.warning('%s子线程等待中' % command)
            return
        MyThread(sem_ar, job_func, command).start()
    
    class MyThread(threading.Thread):
        """Worker thread that runs one job and then frees its semaphore slot.

        Args:
            my_sem: semaphore whose slot was acquired for this worker; it is
                released when the job ends, whether it succeeded or failed.
            func: callable invoked with ``command``.
            command: argument passed to ``func``.
        """

        def __init__(self,my_sem,func,command):
            super().__init__()
            self.my_sem = my_sem
            self.func = func
            self.command = command

        def run(self):
            """Call ``func(command)``, log any failure, always release the slot."""
            try:
                self.func(self.command)
            except Exception:
                # Keep the traceback in the log so the failure can be diagnosed.
                logger.exception('子线程异常')
            finally:
                # Released in `finally` so the slot is not leaked even when
                # the handler above fails itself.
                self.my_sem.release()
    
    # Cap the number of concurrently running worker threads: one slot per
    # configured job plus one extra (presumably for the pull_modify job below).
    run_job = config_read()
    sem = threading.Semaphore(len(run_job)+1)
    logger.info('设置最大子线程数量%d' % (len(run_job)+1))
    print(('设置最大子线程数量%d' % (len(run_job)+1)))

    # Register every configured job; each statement looks like
    # 'schedule.every(1).seconds.do(thread,sem,job,"python ./tasks/1.py").tag("1.py")'.
    # NOTE(review): the statements are built from config.csv content and run
    # through eval; only use a trusted config file.
    for statement in run_job:
        logger.info('运行定时任务%s' % statement)
        eval(statement)
    # Daily git pull followed by rescheduling of whatever changed.
    schedule.every().day.at("20:06").do(thread,sem, modify,"pull_modify")



    # # Report the current number of threads and their details
    # def get_thread():
    #     print('当前线程', len(threading.enumerate()),threading.enumerate())
    # # every five seconds
    # schedule.every(5).seconds.do(get_thread)
    try:
        while 1:
            schedule.run_pending()
            # Yield the CPU between polls instead of spinning in a tight loop.
            time.sleep(1)
    except TypeError:
        logger.exception('schedule传参异常')
    except KeyboardInterrupt:
        logger.info('schedule收到中断信号,停止运行')
    except Exception:
        logger.exception('schedule运行异常')
    
    

    相关文章

      网友评论

          本文标题:批量定时任务框架,支持动态修改文件和频率

          本文链接:https://www.haomeiwen.com/subject/nyquohtx.html