在celery中单任务可以使用self.update_state方法来更新进度的,如下:
self.update_state(task_id = task_id, state = state, meta = meta)
最重要的一个属性就是taskid了,这里可以不写,不写的话默认就是self.request.id自动生成的是当前的id
那么如果是group,chord这样的批量任务产生的多个任务,就有多个任务id,这样就没办法更新了,也没有办法将task id 传到前端来更新进度条了
在搜索这样的解决方案后找到了一个方法.
class progress_chord(chord):
def __call__(self, body = None, **kwargs):
_chord = self.type
body = (body or self.kwargs['body']).clone()
kwargs = dict(self.kwargs, body=body, **kwargs)
if _chord.app.conf.CELERY_ALWAYS_EAGER:
return self.apply((), kwargs)
# 设置chord自定义类的跟踪id
callback_id = body.options.setdefault('task_id', uuid())
r = _chord(**kwargs)
return _chord.AsyncResult(callback_id), r
这里重新继承了chord,并在body中的options字典中,将task_id 放入了, 这样当我们使用这个类作为默认的celery.chord的功能时候就可以获取到这个task id 了
header = [task.s(url = item['href'], page = item['page'], total =self.total, filename =self.filename)for itemin items]
callback = templink.s(1)
task = progress_chord(group(header))(callback) # callback 是一个回调的celery task任务
在task类中,使用self.request.chord['options']['task_id']来得到id
并使用
self.update_state(task_id = task_id, state = state, meta = meta)
来更新
那么思考下,group的操作可能与chord类似
网友评论