1. 定时运行
# pip install schedule
import schedule
import time
import datetime
def job(name):
print("her name is : ", name)
print('当前的时间: ', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
name = "longsongpong"
'''1 表示一分钟'''
#schedule.every(1).minutes.do(job, name)
'''每10秒执行一次job函数# '''
#schedule.every(10).seconds.do(job, name)
'''当every()没参数时默认是1小时/分钟/秒执行一次job函数'''
# schedule.every().hour.do(job, name)
'''当every(2)表示每2小时执行一次job函数'''
# schedule.every(2).hours.do(job, name)
schedule.every().day.at("11:10").do(job, name)
schedule.every().day.at("11:09").do(job, name)
# schedule.every(5).to(10).days.do(job, name)
'''具体某一天某个时刻执行一次job函数'''
# schedule.every().monday.do(job, name)
# schedule.every().wednesday.at("13:15").do(job, name)
while True:
tt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(tt)
schedule.run_pending()
time.sleep(1) #通常是这个
2. 取消作业
import schedule
import datetime
from schedule import every, repeat, run_pending
import time
def some_task():
print('Hello world')
# 1.取消任务
def cancelOneJob():
job = schedule.every().seconds.do(some_task)
#schedule.cancel_job(job)
while True:
tt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(tt)
run_pending()
time.sleep(1)
# 2. 运行一次作业
def job_that_executes_once():
# Do some work that only needs to happen once...
print('运行任务')
return schedule.CancelJob
def cancelJob():
schedule.every().day.at('20:38').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
#3. 获取所有工作'''
def hello():
print('Hello world')
def getAllJob():
schedule.every().second.do(hello)
all_jobs = schedule.get_jobs()
print(all_jobs)
# [Every 1 second do hello() (last run: [never], next run: 2021-12-30 20:43:17)]
def canceAlllJob():
def greet(name):
print('Hello {}'.format(name))
schedule.every().second.do(greet)
schedule.clear()
# 要从调度程序中删除所有作业
# 5. 获取多个作业,按标签过滤
def getAllJob_tag():
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)
#6. 取消多个作业,按标签过滤'''
def canceMoreJob_tag():
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
schedule.clear('daily-tasks')
# 将阻止每个标记为的作业daily-tasks再次运行。
#7. 以随机间隔运行作业'''
def randomSpace_RunJob():
def my_job():
print('Foo')
# Run every 5 to 10 seconds.
schedule.every(5).to(10).seconds.do(my_job)
while True:
tt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(tt)
schedule.run_pending()
time.sleep(1)
# every(A).to(B).seconds 每 N 秒执行一次作业函数,使得 A <= N <= B。
#8. 运行作业直到特定时间'''
def runJob_untilTime():
def job():
print('Boo')
from datetime import datetime, timedelta, time
# 该until方法设置作业截止日期。作业将不会在截止日期之后运行。
# run job until a 18:30 today
schedule.every(1).hours.until("18:30").do(job)
# run job until a 2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# Schedule a job to run for the next 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# Run my_job until today 11:33:42
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# run job until a specific datetime
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
#9.距离下一次执行的时间
def nextExecution_TimeUntil():
def job():
print('Hello')
schedule.every(5).seconds.do(job)
while 1:
n = schedule.idle_seconds()
if n is None:
# no more jobs
break
elif n > 0:
# sleep exactly the right amount of time
print(n)
time.sleep(n)
schedule.run_pending()
# 10.立即运行所有作业,无论它们的调度如何'''
def runAllJob():
def job_1():
print('Foo2')
def job_2():
print('Bar3')
schedule.every().monday.at("21:12").do(job_1)
schedule.every().tuesday.at("21:13").do(job_2)
# 星期几没啥用
schedule.run_all()
# Add the delay_seconds argument to run the jobs with a number
# of seconds delay in between.
#schedule.run_all(delay_seconds=10)
if __name__ == '__main__':
'''1.取消任务'''
# cancelOneJob()
'''2.运行一次作业; 从作业返回以将其从调度程序中删除'''
# cancelJob()
'''3. 获取所有工作'''
# getAllJob()
'''4. 取消所有作业'''
# canceAlllJob()
'''5. 获取多个作业,按标签过滤'''
# getAllJob_tag()
'''6. 取消多个作业,按标签过滤'''
# canceMoreJob_tag()
'''7. 以随机间隔运行作业'''
# randomSpace_RunJob()
'''8. 运行作业直到特定时间'''
# runJob_untilTime()
'''9. 距离下一次执行的时间'''
#nextExecution_TimeUntil()
'''10.立即运行所有作业,无论它们的调度如何'''
runAllJob()
网友评论