基本库 sched
sched主要方法
- enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
- cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。
- run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
使用案例:
import time
import sched
def thread():
# TODO do something
print("hello")
def time_printer():
thread()
loop_monitor()
def loop_monitor():
s = sched.scheduler(time.time, time.sleep) # 生成调度器
s.enter(5, 1, time_printer, ())
s.run()
if __name__ == "__main__":
loop_monitor()
第三方库 schedule
Scheduler相关API:
- run_pending(): 运行所有可以运行的任务
- run_all(delay_seconds=0): 运行所有任务,不管是否应该运行
- clear(): 删除所有调度的任务
- cancel_job(job): 删除一个任务
- everyevery(interval=1): 创建一个调度任务, 返回的是一个job
- _run_job(job): 运行一个job
- next_run(): 获取下一个要运行任务的时间, 这里使用的是min去得到最近将执行的job, 之所以这样使用,是Job重载了_lt方法,这样写起来确实很简洁.
- idle_seconds(): 还有多少秒即将开始运行任务.
- every(interval=1): 设置调度间隔
Job相关API:
- second/minute/hour/day/week/monday: 每秒/分钟/小时/天/周/月运行一次(interval ==1)
- seconds/minutes/hours/days/weeks/mondays: 每隔n秒/分钟/小时/天/周/月运行一次(interval ==n), 与every配合使用
- at: 表示某天的某个时间点
- do: 设置job对应的函数以及参数
使用案例:
import schedule
def thread():
# TODO do something
print("hello")
if __name__ == '__main__':
# 每隔10分钟执行一次任务
schedule.every(10).minutes.do(job)
# 每隔一小时执行一次任务
schedule.every().hour.do(job)
# 每天10:29清除所有任务
schedule.every().day.at("10:29").do(schedule.clear)
# 每天10:30执行一次任务
schedule.every().day.at("10:30").do(job)
# 每周一的这个时候执行一次任务
schedule.every().monday.do(job)
# 每周一的这个时候执行一次任务
schedule.every().wednesday.at("13:15").do(job)
# 执行任务
while True:
schedule.run_pending()
注:执行任务时 需要手动while True,所以他本身是阻塞的,如果想要非阻塞,需要自己创建线程
定时任务时间限制
如果想要某条任务运行时间不超过n秒
可以使用如下代码
import time
import eventlet
if __name__ == '__main__':
eventlet.monkey_patch() # 这句最好写在main或函数中,不然会对项目中其他的DNS造成影响
with eventlet.Timeout(2, False): #设置超时时间为2秒
print('这条语句正常执行') # 这行输出
time.sleep(4)
print('没有跳过这条输出') # 这行不输出跳过
print('跳过了输出')
网友评论