python定时任务有以下常见方案
- python-crontab 系列
如python-crontab、django-crontab
封装了Linux提供的crontab命令
在Linux上需开启crontab,不支持windows,适用于中小型项目 - apscheduler 系列
如apscheduler、django-apscheduler、flask-apscheduler
支持windows和linux,适用于中小型项目 - Celery 系列
如celery、django-celery、flask-celery
支持windows和linux,支持分布式,配置较复杂,适用于大型项目 - 自建轮子
import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict
def confdict_handle():
while True:
try:
loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('本地时间:'+str(loca_time))
time.sleep(10)
except Exception as e:
print('发生错误,错误信息为:', e)
continue
def main():
'''
主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
'''
try:
# 启动定时任务,多个任务时,使用多线程
task1 = threading.Thread(target=confdict_handle)
task1.start()
except Exception as e:
print('发生异常:%s' % str(e))
if __name__ == '__main__':
main()
Celery
Celery是一个任务队列管理工具,可用于实现异步接口、定期删除/缓存Redis数据、定期发送消息等。Celery本身不提供消息存储。
- Producer
生产者,调用Celery的API产生任务并交给任务队列 - Celery Beat
任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。 - Brokers
中间人,指任务队列,仅支持Redis、RabbitMQ。 - Celery Workers
消费者,从队列中取出任务并执行。可在多台服务器运行多个消费者提高效率 -
Result Stores / backend
任务处理完后保存状态信息和结果,可以使用Redis、RabbitMQ或DjangoORM等。
celery工作流程
消息队列
使用场景
- 解耦
如在订单与库存系统中加入消息队列,使两个系统解耦 - 异步任务
如发送短信、邮件、刷新缓存等 - 流量削峰
如秒杀活动等高并发场景
broker选择
- Redis
其list适用于做轻量级的MQ存储,但功能和逻辑需上层应用自行实现 - RabbitMQ
使用生产-消费者模式,并引入了Echange(交换器)概念,根据调度策略将生产者的消息转发给符合的Queue,实现解耦
相比于redis的优势:
- 发布确认:生产者发布消息给Broker后,会收到Broker的反馈,保证发布成功
- 消费确认:消息提交给消费者后,如未成功消费确认,会返回到消息队列
- 高可用性:自带集群和负载均衡
- 持久化:redis只能将整个数据库持久化,而RabbitMQ可以对每条队列或消息分别设置持久化
![](https://img.haomeiwen.com/i6177832/b3e8da55f72dd99a.png)
基本使用DEMO
- 安装redis和celery
如果celery>=4.0,需要确保redis>=2.10.4
apt-get install redis-server
pip install redis
pip install celery
- 建立task
#tasks.py
from celery import Celery
app = Celery('tasks', backend='redis://:yourpassword@localhost:6379/0', broker='redis://:yourpassword@localhost:6379/0') #配置好celery的backend和broker
@app.task #普通函数装饰为 celery task
def add(x, y):
return x + y
也可以通过app.config_from_object()
加载配置模块:
app.config_from_object('celeryconfig')
# celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
- 启动worker(开始从队列中读取任务并执行)
celery -A tasks worker --loglevel=info
可通过celery worker --help
查看命令列表,常用包括以下内容:
- -A, --app
指定使用的 Celery 实例,必须为module.path:attribute
格式,如-A swallow.celery
。 - -l, --loglevel [DEBUG(默认)|INFO|WARNING|ERROR|CRITICAL|FATAL]
日志级别 - -P, --pool [prefork(默认)|eventlet|gevent|solo|processes|threads]
并发模式,其默认值prefork在windows上不支持,会报错not enough values to unpack (expected 3, got 0)
可使用--pool=solo
(单进程)或-P solo
代替
也可以使用gevent
或eventlet
,但需先用pip下载 - -c, --concurrency
同时处理任务的工作进程数量,超出的任务需等待其他任务完成后执行。默认为CPU数
- 触发任务
- 通过
delay
或apply_async
直接触发
当任务完成时result.ready()
为 True,然后用result.get()
取结果即可。
确保调用了result.get()
或result.forget()
,否则资源不会释放
#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():
time.sleep(1)
print 'task done: {0}'.format(result.get())
- 通过
beat
定时触发
celery beat 是一个调度程序,会定期触发任务
通过celery -A 【beat实例】 beat
启动,如celery -A swallow.celery beat -l info
此处的crontab
模块并未调用系统的crontab
,只是同名罢了
#settings.py
from celery.schedules import timedelta, crontab
# 默认使用UTC时区,建议改为`'Asia/Shanghai'`
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
"task_every_30_minutes": {# 该key随便起,不需要调用。
"task": "task_every_30_minutes",# 实际调用的是这个task
"schedule": timedelta(minutes=30)
},
"task_every_day_start": {
"task": "task_every_day_start",
"schedule": crontab(minute=0, hour=0)
},
"task_every_even_hour": {
"task": "task_every_even_hour",
'schedule': crontab(minute=0, hour='0,2,4,6,8,10,12,14,16,18,20,22'),
},
}
在django中使用celery
- 独立使用
celery
需进行相关配置,使celery可以调用django中的内容,如django.setup()
等。 - 使用
celery
+djcelery
配置更方便
网友评论