今天想分享一个运维需求,使用 Python 来解决,希望对从事运维的同学有所帮助。
在一些 ETL 系统中,如数据仓库,需要先将源文件传输至 ETL 服务器,再开始 ETL 的跑批加工。有时候,源系统由于各种各样的问题导致文件未能及时传输到 ETL 服务器,这势必会影响 ETL 的加工进度,这时作为 ETL 服务器的运维人员就有必要对源系统的文件进行监控,该到达的时间未到达,大概率就是源系统出了问题,需要报警提示。
具体来说是这样的:
假如有 s1, s2, s3... s50 等共 50 个源系统供给 ETL 服务器,正常情况下,每个系统最后一个文件 finish.flag 达到时间分别为 t1, t2, t3...t50 ,假如 ETL 服务器在 t1+30 分钟仍未收到 s1 的 finish.flag,那么就需要报警 「s1 文件未到」,每天都要执行这样的检查,假如不借助调度工具,如何使用一个 Python 脚本来高效地实现呢?
思路:
1、50 个左右的源系统并不太多,我们可以系统名称和对应的时间存储在 Python 的列表中(对应其他语言中的数组),记代表为 A
2、 对 A 中的数据按时间先后顺序进行排序。并将相同时间系统的合并在一个列表中。
3、循环遍历数组 A:
取出第一个系统的检查时间(eg t1 + 30 分钟)记为 check_time,当前时间 now,计算时间差 diff = check_time - now。
如果 diff 为负值 ,则忽略,如果为正值 ,跳出当前循环。
接着循环,同样计算时间差,diff,如果为负,则加上一天的秒数。然后程序睡 diff 的时间再检查,未检查到相应 finish.flag,则报警。
然后取下一个系统。
如果循环结束,再从头开始循环。
程序持久运行在内存中。
4、这个问题,看起来简单,实际编码的过程中会遇到很多坑,给出代码如下,实际应用中可加以修改来适用。
#encoding=utf-8
import time
import datetime
import random
def main():
#模拟系统列表,这里用 6 个,实际为 50 个
#sys_list = [('s1',"23:03"),('s2',"23:04"),('s3',"23:06"),('s4',"00:29"),('s5',"02:25"),('s6',"23:08"),]
sys_list = [k for k in zip(['s1','s2','s3','s4','s5','s6'],[(datetime.datetime.now() + datetime.timedelta(seconds = random.randint(-5,6))) for _ in range(6)])]
#排序
sys_list = sorted(sys_list,key = lambda x:x[1])
tmp_dict = {}
for s in sys_list:
check_time = s[1].strftime("%H:%M:%S")
if check_time in tmp_dict:
tmp_dict[check_time].append(s[0])
else:
tmp_dict[check_time] = [s[0],]
sys_tupple = [(k,v) for k, v in tmp_dict.items()]
#系统个数,无限循环使用
sys_count = len(sys_tupple)
print(sys_tupple)
#循环检查 先将下标定位到 时间之差为 正值 的系统
#第二次循环如果时间差为负,说明需要第二天再检查,需要加上一天的秒数
i = 0
while i < sys_count:
now = datetime.datetime.now()
check_time_str = sys_tupple[i][0] #"22:25:00"
check_time_str_all = f'{now.strftime("%Y-%m-%d")} {check_time_str}' #"2019-03-03 22:25:00"
check_time = datetime.datetime.strptime(check_time_str_all,"%Y-%m-%d %H:%M:%S")# + datetime.timedelta(minutes = 30)
diff = (check_time - now).total_seconds()
if diff >= 0:
break
i += 1
while i < sys_count:
now = datetime.datetime.now()
check_time_str = sys_tupple[i][0] #"22:25:00"
check_time_str_all = f'{now.strftime("%Y-%m-%d")} {check_time_str}' #"2019-03-03 22:25:00"
check_time = datetime.datetime.strptime(check_time_str_all,"%Y-%m-%d %H:%M:%S")# + datetime.timedelta(minutes = 30)
diff = (check_time - now).total_seconds()
if diff < 0:
diff += 24*60*60
time.sleep(diff)
now = datetime.datetime.now()
print(f'now :{now.strftime("%H:%M:%S")}, check_time: {check_time_str}, sysid : {sys_tupple[i][0]}, sleep {diff}')
print(f'at {check_time.strftime("%H:%M:%S")} check {sys_tupple[i][1]}')
##在这里加入检查文件是否到达,未到达并报警的代码
#time.sleep(0.5)
i +=1
if i == sys_count:
i = 0
pass
if __name__ == "__main__":
main()
运行结果如下
[('16:35:34', ['s3', 's5']), ('16:35:38', ['s2']), ('16:35:39', ['s4']), ('16:35:44', ['s1', 's6'])]
now :16:35:39, check_time: 16:35:39, sysid : 16:35:39, sleep 0.913853
at 16:35:39 check ['s4']
now :16:35:44, check_time: 16:35:44, sysid : 16:35:44, sleep 4.999334
at 16:35:44 check ['s1', 's6']
网友评论