Celery中Task类方法重写
应用场景:下面四中方法在各自场景下执行,但源码却没有做出任何操作,所有如果有需要可重写一个类并继承Task类,重写下面的方法
- after_return:在任务执行返回后交给 worker 执行
- on_failure:在任务执行失败后交给 worker 执行
- on_retry:在任务进行重试是交给 worker 执行
- on_success:在任务执行成功后交给 worker 执行
from celery import Celery
import celery
class MyTask(celery.Task):
def on_success(self, retval, task_id, args, kwargs):
print('task done: {0}'.format(retval))
return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('task fail, reason: {0}'.format(exc))
# return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
app = Celery('tasks', backend='redis://localhost:6379/0',
broker='redis://localhost:6379/0') # 配置好celery的backend和broker
@app.task(base=MyTask,name='Demo.tasks.add')
def add(x, y):
raise KeyError
return x + y
将任务绑定为实例方法
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', backend='redis://localhost:6379/0',
broker='redis://localhost:6379/0') # 配置好celery的backend和broker
@app.task(name='Demo.tasks.add',bind=True)
def add(self,x, y):
logger.info(self.request.__dict__)
return x + y
注意add方法里的self和类方法里的self相似都代表是自己本身,都是可不传的参数,可获取到与自身有关的所有参数
具体参数参考这里
任务状态回调
任务状态为以下几种:
image.png
如果有一个耗时较长的任务进行的时候,如果我们想知道他的进度的话,我们可以定义个任务状态,用来说明任务的进度
#tasks.py
@app.task(name='Demo.tasks.test_mes',bind=True)
def test_mes(self):
for i in range(1,11):
time.sleep(0.1)
self.update_state(state='PROGRESS',meta={"p": i*10})
return 'finish'
------------------------------------------------------------------------
#trigger.py
from Demo.tasks import test_mes
import sys
def pm(body):
res = body.get('result')
if body.get('status') == 'PROGRESS':
print('res:'+str(res))
sys.stdout.write('\r任务进度: {0}%'.format(res.get('p')))
sys.stdout.flush()
else:
print('\r')
print('over:'+str(res))
r = test_mes.delay()
r.get(on_message=pm, propagate=False)
使用self.update_state可以状态发送给Woker,其中state为我们自己定义的状态,而meta为传给worker的参数。在woker中可以用r.get(on_message=pm, propagate=False)将这些参数与状态传给某个函数,从而可以知道该任务的进度,其中on_message为将传来的参数放于哪个函数中,progagate为是否要传递错误信息,如果为False即不传递错误信息
定时任务处理
有些任务我们需要定时间的去启动,那么这时候我们就需要配置celery,创建celery的配置文件,如celery_config,里面放入
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'tasks': {
'task': 'tasks.period_task',
'schedule': crontab(minute="*/1"),
},
}
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERYBEAT_SCHEDULE为定时启动的任务,其中task对应的value就是你要定期启动的函数,schedule就是定时时间,分别可以使用contab,timedelta都可以配置时间,如果函数需要传入参数,使用"args":参数
"schedule": crontab(minute="/10", # 每十分钟执行
"schedule": crontab(minute="/1"), # 每分钟执行
"schedule": crontab(minute=0, hour="*/1"), # 每小时执行
"schedule": timedelta(seconds=5) # 没五秒执行一次
另外的一些参数:
BROKER_URL: Broker配置,使用redis作为消息中间件
CELERY_RESULT_BACKEND : BACKEND,使用redis存放结果
CELERY_RESULT_SERIALIZER : 结果序列化方案
CELERY_TASK_RESULT_EXPIRES : 任务过期时间
在任务文件中tasks.py输入一下代码
from celery import Celery
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')
@app.task(bind=True,name='tasks.period_task')
def period_task(self):
print('period task done: {0}'.format(self.request.id))
开启一个终端执行celery -A tasks beat(需注意路劲问题,在与任务文件的同级目录下执行,tasks为文件名)
开启另外一个终端执行celery -A tasks worker --pool=solo -l info
image.png
需要注意的是如何时间方面有涉及到中国地区的话,需在配置中加入时区信息CELERY_TIMEZONE = 'Asia/Shanghai',默认为以utc为标准。
任务编排
链式任务chain
有时候我们一个任务需要等待上个任务执行完才能执行的话,我们就需要用到链式任务
@app.task(name="tasks.fetch_page")
def fetch_page(url):
print('进入第一层'+url)
return '我是第一层'
@app.task(name="tasks.parse_page")
def parse_page(page):
print('进入第二层'+page)
return '我是第二层'
@app.task(name="tasks.store_page_info")
def store_page_info(info, url):
print('进入第三层'+info+url)
return '执行结束'
假定函数有三层每层都需要上一层的返回才能继续往下执行
Woker文件中写入以下代码
from celery import group, chain
from Demo.tasks import *
def update_page_info(url):
# fetch_page -> parse_page -> store_page
# 第一种写法
# chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
#
# chain()
# 第二种写法,res.get()为最后返回的结果
res = chain(fetch_page.s(url),parse_page.s(),store_page_info.s(url))()
while True:
if res.ready():
print("res"+str(res.get()))
break
update_page_info('www.baidu.com')
组任务group
并行执行组内的每个任务
group(fetch_page.s(url),parse_page.s('test1'),store_page_info.s('test2',url))()
任务分割chord
分为header和body两个部分,会先执行header在将header的结果传给body执行
chord(header=[fetch_page.s(url)],body=parse_page.s())()
任务分组chunks
按照任务个数分组,并不是并发执行
fetch_page.chunks(['1','2','3'],2)() #2代表每组的任务个数,需要注意的是如果第一个参数传入的是字符串的话,那么字符串会被分割成每个字符当作参数传入
网友评论