美文网首页
celery使用group或者chord如何实时更新状态进度?

celery使用group或者chord如何实时更新状态进度?

作者: 雨夜剪魂 | 来源:发表于2019-05-02 18:16 被阅读0次

    在celery中单任务可以使用self.update_state方法来更新进度的,如下:

    self.update_state(task_id = task_id, state = state, meta = meta)

    最重要的一个属性就是taskid了,这里可以不写,不写的话默认就是self.request.id自动生成的是当前的id

    那么如果是group,chord这样的批量任务产生的多个任务,就有多个任务id,这样就没办法更新了,也没有办法将task id 传到前端来更新进度条了

    在搜索这样的解决方案后找到了一个方法.

    class progress_chord(chord):

        def __call__(self, body = None, **kwargs):

            _chord = self.type

            body = (body or self.kwargs['body']).clone()

            kwargs = dict(self.kwargs, body=body, **kwargs)

            if _chord.app.conf.CELERY_ALWAYS_EAGER:

                return self.apply((), kwargs)

            # 设置chord自定义类的跟踪id

            callback_id = body.options.setdefault('task_id', uuid())

            r = _chord(**kwargs)

            return _chord.AsyncResult(callback_id), r

    这里重新继承了chord,并在body中的options字典中,将task_id 放入了, 这样当我们使用这个类作为默认的celery.chord的功能时候就可以获取到这个task id 了

    header = [task.s(url = item['href'], page = item['page'], total =self.total, filename =self.filename)for itemin items]

    callback = templink.s(1)

    task = progress_chord(group(header))(callback) # callback 是一个回调的celery task任务

    在task类中,使用self.request.chord['options']['task_id']来得到id

    并使用

    self.update_state(task_id = task_id, state = state, meta = meta)

    来更新

    那么思考下,group的操作可能与chord类似

    相关文章

      网友评论

          本文标题:celery使用group或者chord如何实时更新状态进度?

          本文链接:https://www.haomeiwen.com/subject/agannqtx.html