import logging
import logging.handlers
import queue
import time
import threading
import schedule
import os
import sys
import datetime
import hashlib
from pathlib import Path
import subprocess
import filecmp
import git
from git import Repo
import shutil
import os
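# Design: on the first run, read the current config file and the .py files under
# tasks/ into memory. A scheduled job then runs a git pull and compares the
# downloaded config file and tasks/*.py files against that in-memory snapshot.
# For every .py script that changed, clear its schedule and add a new one; all
# other schedules keep running.
# The comparison checks whether each config row changed, and whether the number
# of files under tasks/, any .py file's content, or any file name changed. When
# something changed, only the schedule of the .py file on that row is stopped;
# the other schedules are left alone.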
def _modify_list_tasks(tasks_dir):
    """Return ``(names, paths)`` for the files directly inside *tasks_dir*.

    Only the top level is inspected because commands are always built as
    ``python ./tasks/<name>``. A missing directory yields empty containers
    instead of leaving the caller with unbound variables.
    """
    root, _dirs, names = next(os.walk(tasks_dir), (tasks_dir, [], []))
    return names, {name: os.path.join(root, name) for name in names}


def _modify_script_names(rows):
    """Return the first CSV field of every row whose first field ends in ``.py``."""
    firsts = (row.rstrip('\n').split(',')[0] for row in rows)
    return [name for name in firsts if name.endswith('.py')]


def _modify_parse_row(row):
    """Turn one config row into a job dict, or return None when the row is invalid.

    Expected columns: name, at, seconds, minutes, hours, day.
    """
    args = row.rstrip('\n').split(',')
    if len(args) < 6:
        # A short/blank row used to raise IndexError and abort every later row.
        logger.warning('%s配置列数不足6列,不会定时跑,其他继续遍历' % args[0])
        return None
    if not (args[2].isdigit() and args[3].isdigit() and args[4].isdigit()):
        logger.warning('%s时/分/秒格式不正确,不会定时跑,其他继续遍历' % args[0])
        return None
    try:
        day_ok = args[5].isdigit() and int(args[5]) in range(0, 8)
    except ValueError:
        # str.isdigit() accepts some characters int() rejects (e.g. superscripts).
        day_ok = False
    if not day_ok:
        logger.warning('%s星期%s格式不正确,不会定时跑,其他继续遍历' % (args[0], args[5]))
        return None
    return {
        'command': 'python ./tasks/' + args[0],
        'at': args[1],
        'seconds': args[2],
        'minutes': args[3],
        'hours': args[4],
        'day': args[5],
    }


def _modify_schedule_expr(config):
    """Build the ``schedule`` expression string for one job dict.

    Fields equal to ``'0'`` count as "not set". Exactly one frequency field
    must remain next to ``command``; otherwise None is returned.
    """
    content = {key: value for key, value in config.items() if value != '0'}
    command = content['command']
    if len(content) != 2:
        print('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % command)
        logger.warning('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % command)
        return None
    if 'at' in content:
        expr = 'schedule.every().day.at("%s").do(thread,sem,job,"%s")' % (content['at'], command)
        print('每天几点跑', expr)
    elif 'day' in content:
        # day is 1-7 for Monday-Sunday; weekday is the module-level name table.
        day_name = weekday[int(content['day']) - 1]
        expr = 'schedule.every().%s.do(thread,sem,job,"%s")' % (day_name, command)
        print('每星期几跑', expr)
    else:
        print('每几秒、分、时跑', list(content))
        unit = next((u for u in ('seconds', 'minutes', 'hours') if u in content), None)
        if unit is None:
            return None
        expr = 'schedule.every(%s).%s.do(thread,sem,job,"%s")' % (content[unit], unit, command)
    # The script file name doubles as the schedule tag so it can be cleared later.
    return '%s.tag("%s")' % (expr, command.split('/')[2])


def modify(pull_modi):
    """Refresh the work tree from git and hot-swap the schedules that changed.

    Steps:
      1. ``git fetch --all``, delete ``./tasks`` and ``./config.csv``, then
         ``git reset --hard origin``.
      2. Diff the fresh config rows and the files under ``./tasks`` against the
         module-level snapshot (``files`` / ``name_path``).
      3. Clear the schedule tag of every affected script and schedule the new
         rows; untouched scripts keep running.
      4. Store the fresh state in ``files``, ``name_path`` and ``run_job`` so the
         next invocation diffs against it, and rebind ``sem``.

    Args:
        pull_modi: label printed in the progress message.
    """
    global files
    global name_path
    global sem
    global run_job

    repo_git = Repo("./").git
    # Fetch before deleting anything: if the fetch fails, the tasks on disk
    # stay intact and the running schedules keep working.
    print(repo_git.fetch('--all'))
    # Remove the local copies without shelling out; a plain-string
    # subprocess.run('rm ...') fails on POSIX and needs an rm binary on Windows.
    shutil.rmtree('./tasks', ignore_errors=True)
    try:
        os.remove('./config.csv')
    except FileNotFoundError:
        pass  # nothing to delete
    repo_git.reset('--hard', 'origin')
    print("git pull is running", datetime.datetime.now())
    logger.info("git pull is running")
    if not Path('./tasks').exists():
        logger.error("git拉取最新文件./tasks还没完成,不能开启新schedule,这个缓冲时间内旧的schedule也会找不到文件")
    print("%s is running" % pull_modi, datetime.datetime.now())

    # Read the freshly checked-out config; the first CSV line is a header.
    with open('config.csv', 'r') as f2:
        files2 = f2.readlines()[1:]
    file1 = [row.rstrip() for row in files]
    file2 = [row.rstrip() for row in files2]

    # Task files known before the pull (name -> path).
    print('name_path', name_path)
    logger.info("old_py_path:%s", name_path)
    old_files = list(name_path)

    # Task files present after the pull.
    py_new, new_path = _modify_list_tasks('./tasks')
    print(len(py_new))
    print('new_path', new_path)
    logger.info("new_py_path:%s", new_path)

    # Script names mentioned by the old and the new config (diagnostics only).
    con_py = _modify_script_names(files)
    print('old_py', con_py)
    logger.info("old_py:%s", con_py)
    con_new_py = _modify_script_names(files2)
    print('new_py', con_new_py)
    logger.info("new_py:%s", con_new_py)
    same_py = list(set(con_new_py).intersection(set(con_py)))
    print('same_py', same_py)
    logger.info("same_py:%s", same_py)
    print('old_config', file1)
    logger.info("old_config:%s", file1)
    print('new_config', file2)
    logger.info("new_config:%s", file2)

    # Rows that are byte-identical in both configs, in new-config order.
    old_rows = set(file1)
    new_rows = set(file2)
    same_py_con = [row for row in dict.fromkeys(file2) if row in old_rows]
    print('same_py_con', same_py_con)
    print('new_tasks', py_new)
    print('old_tasks', old_files)
    logger.info("new_tasks:%s", py_new)
    logger.info("old_tasks:%s", old_files)

    # Rows whose name or frequency changed.
    add = [row.split(',')[0] for row in file2
           if row not in old_rows and row.split(',')[0] in py_new]
    clear = [row.split(',')[0] for row in file1 if row not in new_rows]

    # Unchanged rows whose script file appeared or disappeared.
    for row in same_py_con:
        sspy = row.split(',')[0]
        in_new = sspy in py_new
        in_old = sspy in old_files
        if in_new and in_old:
            print('新旧tasks文件夹都有这个py文件%s,git pull后文件夹下是新的,python xx.py运行的是最新的' % sspy)
            logger.info('新旧tasks文件夹都有这个py文件%s,git pull后文件夹下是新的,python xx.py运行的是最新的' % sspy)
        elif in_new:
            add.append(sspy)
            logger.info('新开启%s' % sspy)
        elif in_old:
            clear.append(sspy)
            logger.info('清除旧的%s' % sspy)
        else:
            print('新旧tasks文件夹都无py文件%s,不清除旧的,也不加新的' % sspy)
            logger.info('新旧tasks文件夹都无py文件%s,不清除旧的,也不加新的' % sspy)

    # A name listed twice would otherwise be scheduled twice.
    add = list(dict.fromkeys(add))
    clear = list(dict.fromkeys(clear))
    print('con_py', con_py)
    print('con_new_py', con_new_py)
    print('add', add)
    print('clear', clear)
    logger.info('新开启的任务一共有%s' % add)
    logger.info('清除的任务一共有%s' % clear)

    # Build job dicts for every new-config row of every script to (re)start.
    # Rows are validated one by one so a bad row only skips itself.
    jobs = []
    for name in add:
        if not name.endswith('.py'):
            logger.warning('%s格式不正确,不会定时跑,其他继续遍历' % name)
            continue
        for row in file2:
            if row.split(',')[0] != name:
                continue
            print('会跑', name)
            logger.info('新跑%s' % name)
            parsed = _modify_parse_row(row)
            if parsed is not None:
                jobs.append(parsed)
    logger.info('配置文件和定时参数如下')
    logger.info(jobs)

    com_job = [expr for expr in map(_modify_schedule_expr, jobs) if expr is not None]
    print('配置的定时任务是:', com_job)
    logger.info('新配置的定时任务是:%s' % com_job)

    # Clear the tags of removed scripts and of scripts about to be re-added,
    # so a script is never scheduled twice. Direct call: no eval needed.
    changed = list(dict.fromkeys(clear + add))
    for tag in changed:
        print('schedule.clear("%s")' % tag)
        logger.info('schedule.clear("%s")' % tag)
        schedule.clear(tag)

    # Expressions of scripts that were not touched stay in the bookkeeping list.
    stale_suffixes = tuple('.tag("%s")' % tag for tag in changed)
    kept = [expr for expr in run_job if not expr.endswith(stale_suffixes)]

    # One slot per scheduled script plus one for this pull job. The semaphore
    # is rebound before scheduling because the expressions capture ``sem``.
    limit = len(kept) + len(com_job) + 1
    sem = threading.Semaphore(limit)
    logger.info('新设置最大子线程数量%d' % limit)
    print('新设置最大子线程数量%d' % limit)

    scheduled = []
    for expr in com_job:
        print('new schedule', expr)
        logger.info('new schedule%s' % expr)
        try:
            # NOTE(review): expr embeds text read from config.csv, so a crafted
            # row could inject code through eval. Kept for compatibility with
            # the string-based run_job list; consider calling the schedule API
            # directly instead.
            eval(expr, globals())
        except Exception:
            # One bad expression must not block the remaining ones.
            print('new schedule运行异常')
            logger.exception('new schedule运行异常')
        else:
            scheduled.append(expr)

    # Persist the new state; without this every later pull would diff against
    # the start-up snapshot and re-add the same jobs again.
    files = files2
    name_path = new_path
    run_job = kept + scheduled
# Logging setup: the log directory must exist before the file handler is created.
log_dir = Path('logs')
# exist_ok avoids the check-then-create race of exists() followed by makedirs().
os.makedirs('logs', exist_ok=True)
# Root logger; DEBUG lets every record through to the handlers.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# File handler: rotates every day and keeps seven old files. The file name
# carries the start-up date.
fileHandler = logging.handlers.TimedRotatingFileHandler(
    'logs/run_tasks%s.log' % datetime.datetime.now().strftime('%y%m%d'),
    encoding='UTF-8', when='D', interval=1, backupCount=7)
# NOTSET means the handler does no filtering of its own, so everything the
# logger accepts (DEBUG and above) is written to the file.
fileHandler.setLevel(logging.NOTSET)
# Record layout: timestamp, line number, file name, level, message.
formatter = logging.Formatter('%(asctime)s-line %(lineno)d in %(filename)s - %(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
# Attach the handler to the root logger.
logger.addHandler(fileHandler)
# Module-level state shared by config_read() and modify().
# files: config rows (header excluded) from the last read; '' until the first read.
files = ''
# py_files / files2: placeholders; the functions visible in this file use local
# variables of the same names instead.
py_files = ''
# Weekday attribute names of the schedule API, indexed by (config day - 1).
weekday = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
# Task file name -> path, filled by config_read().
name_path = {}
files2 = ''
# Config reading: config.csv rows -> schedule expression strings.
def _config_parse_row(row, task_names):
    """Turn one config row into a job dict, or return None when the row is skipped.

    Expected columns: name, at, seconds, minutes, hours, day. ``'0'`` means
    "not set"; day is 1-7 for Monday-Sunday.
    """
    args = row.rstrip('\n').split(',')
    if len(args) < 6:
        # A short/blank row used to raise IndexError and abort every later row.
        logger.warning('%s配置列数不足6列,不会定时跑,其他继续遍历' % args[0])
        return None
    if all(field == '0' for field in args[1:6]):
        print('%s频率不能全为空' % args[0])
        return None
    if args[0] not in task_names:
        print('%s文件不在tasks文件夹' % args[0])
        return None
    if not args[0].endswith('.py'):
        logger.warning('%s格式不正确,不会定时跑,其他继续遍历' % args[0])
        return None
    if not (args[2].isdigit() and args[3].isdigit() and args[4].isdigit()):
        logger.warning('%s时/分/秒格式不正确,不会定时跑,其他继续遍历' % args[0])
        return None
    try:
        day_ok = args[5].isdigit() and int(args[5]) in range(0, 8)
    except ValueError:
        # str.isdigit() accepts some characters int() rejects (e.g. superscripts).
        day_ok = False
    if not day_ok:
        logger.warning('%s星期%s格式不正确,不会定时跑,其他继续遍历' % (args[0], args[5]))
        return None
    return {
        'command': 'python ./tasks/' + args[0],
        'at': args[1],
        'seconds': args[2],
        'minutes': args[3],
        'hours': args[4],
        'day': args[5],
    }


def _config_schedule_expr(config):
    """Build the ``schedule`` expression string for one job dict.

    Exactly one frequency field may differ from ``'0'``; otherwise the job is
    reported as unsupported and None is returned.
    """
    content = {key: value for key, value in config.items() if value != '0'}
    command = content['command']
    if len(content) != 2:
        print('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % command)
        logger.warning('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % command)
        return None
    if 'at' in content:
        expr = 'schedule.every().day.at("%s").do(thread,sem,job,"%s")' % (content['at'], command)
        print('每天几点跑', expr)
    elif 'day' in content:
        day_name = weekday[int(content['day']) - 1]
        expr = 'schedule.every().%s.do(thread,sem,job,"%s")' % (day_name, command)
        print('每星期几跑', expr)
    else:
        print('每几秒、分、时跑', list(content))
        unit = next((u for u in ('seconds', 'minutes', 'hours') if u in content), None)
        if unit is None:
            return None
        expr = 'schedule.every(%s).%s.do(thread,sem,job,"%s")' % (content[unit], unit, command)
    # The script file name doubles as the schedule tag.
    return '%s.tag("%s")' % (expr, command.split('/')[2])


def config_read():
    """Read ``config.csv`` and ``./tasks`` and return schedule expression strings.

    Side effects: stores the config rows (header excluded) in the global
    ``files`` and records every file found under ``./tasks`` in the global
    ``name_path`` (name -> path).

    Returns:
        list[str]: one expression per valid row, e.g.
        ``schedule.every(1).seconds.do(thread,sem,job,"python ./tasks/1.py").tag("1.py")``.

    Raises:
        SystemExit: when ``config.csv`` is missing or has no rows after the header.
    """
    global files
    global weekday
    global name_path
    try:
        # The first CSV line is a format comment and is skipped.
        with open('config.csv', 'r') as f:
            files = f.readlines()[1:]
    except FileNotFoundError:
        logger.error("无配置文件,请填好配置文件再重新运行")
        sys.exit()
    except OSError:
        logger.error('配置文件打开失败')
    except Exception:
        logger.exception('配置文件读取异常')
    else:
        # Checked outside the try so the SystemExit is not swallowed by a
        # catch-all handler, which is what the bare except used to do.
        if not files:
            logger.error("配置文件无内容,请填好配置文件再重新运行")
            sys.exit()
        print('配置的脚本文件个数', len(files))
        logger.info('配置的脚本文件个数%d' % len(files))

    # Files directly under ./tasks; commands are built as python ./tasks/<name>,
    # so only the top level can be executed.
    task_names = []
    try:
        for depth, (root, _dirs, names) in enumerate(os.walk('./tasks')):
            print(len(names))
            if depth == 0:
                task_names = names
            for name in names:
                name_path[name] = os.path.join(root, name)
        if len(files) != len(task_names):
            logger.warning('config.csv里的文件个数和要跑的Py文件数量不一致,请检查')
    except Exception:
        logger.exception('脚本文件夹读取异常')
    print(task_names)

    # Supported frequency columns: at, seconds, minutes, hours, day. Only one
    # may be set per row; a daily run must use ``at``.
    jobs = []
    for index, row in enumerate(files):
        print('配置文件第%d行内容是%s' % (index + 1, row))
        parsed = _config_parse_row(row, task_names)
        if parsed is not None:
            jobs.append(parsed)
    logger.info('配置文件和定时参数如下')
    logger.info(jobs)

    com_job = [expr for expr in map(_config_schedule_expr, jobs) if expr is not None]
    print('配置的定时任务是:', com_job)
    return com_job
# Generic runner: execute one task script through its command line.
def job(command):
    """Run *command* as a child process and wait for it to finish.

    Args:
        command: command line such as ``'python ./tasks/1.py'``, or an argv
            sequence.

    A plain string is only accepted by ``subprocess.run`` without a shell on
    Windows. On POSIX the whole string would be taken as the program name and
    raise ``FileNotFoundError``, so it is split into argv there.
    """
    import shlex  # local import: keeps the block self-contained

    if isinstance(command, str) and os.name != 'nt':
        command = shlex.split(command)
    subprocess.run(command)
# Worker launch: start a new thread only when a semaphore slot is free right
# now. Exceptions inside the worker are logged by MyThread.
def thread(sem_ar, job_func, command):
    """Run ``job_func(command)`` in a MyThread if *sem_ar* has a free slot.

    The acquire is non-blocking: when no slot is available the call is skipped
    and a warning is logged.
    """
    slot_taken = sem_ar.acquire(False)
    if not slot_taken:
        logger.warning('%s子线程等待中' % command)
        return
    worker = MyThread(sem_ar, job_func, command)
    worker.start()
class MyThread(threading.Thread):
    """Worker thread that runs ``func(command)`` and then frees its semaphore slot."""

    def __init__(self, my_sem, func, command):
        """Store the semaphore to release, the callable and its single argument."""
        super().__init__()
        self.my_sem = my_sem
        self.func = func
        self.command = command

    def run(self):
        """Call ``func(command)``; log any failure; always release the slot."""
        try:
            self.func(self.command)
        except Exception:
            # logger.exception keeps the traceback, which a bare
            # logger.error('...') dropped.
            logger.exception('子线程异常')
        finally:
            # Released in finally so the slot is returned on every exit path,
            # including SystemExit or a failure inside the handler above.
            self.my_sem.release()
# Cap the number of concurrent worker threads: one slot per configured job
# plus one for the daily pull job.
run_job = config_read()
sem = threading.Semaphore(len(run_job) + 1)
logger.info('设置最大子线程数量%d' % (len(run_job) + 1))
print('设置最大子线程数量%d' % (len(run_job) + 1))
# Register every configured job. Each entry is an expression string such as
# 'schedule.every(1).seconds.do(thread,sem,job,"python ./tasks/1.py").tag("1.py")'.
for expr in run_job:
    logger.info('运行定时任务%s' % expr)
    # NOTE(review): the expression embeds text read from config.csv, so a
    # crafted row could inject code through eval. Consider calling the
    # schedule API directly instead.
    eval(expr)
# Daily git pull and schedule refresh.
schedule.every().day.at("20:06").do(thread, sem, modify, "pull_modify")
# # Report the current number of threads and their identities
# def get_thread():
# print('当前线程', len(threading.enumerate()),threading.enumerate())
# # Poll every five seconds
# schedule.every(5).seconds.do(get_thread)
try:
    while 1:
        schedule.run_pending()
        # Sleep between polls; without it the loop spins a CPU core at 100%.
        time.sleep(1)
except TypeError:
    logger.exception('schedule传参异常')
except Exception:
    # Exception (not a bare except) lets Ctrl+C / SystemExit end the process
    # normally instead of being logged as a schedule failure.
    logger.exception('schedule运行异常')
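# (Web-page residue: the heading "网友评论" -- "reader comments" -- followed the script; it is not part of the program.)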