美文网首页
Celery进阶

Celery进阶

作者: ZplD | 来源:发表于2019-03-15 12:26 被阅读0次

    Celery中Task类方法重写

    应用场景:下面四中方法在各自场景下执行,但源码却没有做出任何操作,所有如果有需要可重写一个类并继承Task类,重写下面的方法

    • after_return:在任务执行返回后交给 worker 执行
    • on_failure:在任务执行失败后交给 worker 执行
    • on_retry:在任务进行重试是交给 worker 执行
    • on_success:在任务执行成功后交给 worker 执行
    from celery import Celery
    import celery
    
    class MyTask(celery.Task):
        def on_success(self, retval, task_id, args, kwargs):
            print('task done: {0}'.format(retval))
            return super(MyTask, self).on_success(retval, task_id, args, kwargs)
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            print('task fail, reason: {0}'.format(exc))
            # return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
    
    app = Celery('tasks', backend='redis://localhost:6379/0',
                     broker='redis://localhost:6379/0')  # 配置好celery的backend和broker
    @app.task(base=MyTask,name='Demo.tasks.add')
    def add(x, y):
        raise KeyError
        return x + y
    
    

    将任务绑定为实例方法

    from celery import Celery
    from celery.utils.log import get_task_logger
    
    logger = get_task_logger(__name__)
    
    app = Celery('tasks', backend='redis://localhost:6379/0',
                     broker='redis://localhost:6379/0')  # 配置好celery的backend和broker
    @app.task(name='Demo.tasks.add',bind=True)
    def add(self,x, y):
        logger.info(self.request.__dict__)
        return x + y
    
    

    注意add方法里的self和类方法里的self相似都代表是自己本身,都是可不传的参数,可获取到与自身有关的所有参数

    具体参数参考这里

    任务状态回调

    任务状态为以下几种:


    image.png

    如果有一个耗时较长的任务进行的时候,如果我们想知道他的进度的话,我们可以定义个任务状态,用来说明任务的进度

    #tasks.py
    @app.task(name='Demo.tasks.test_mes',bind=True)
    def test_mes(self):
        for i in range(1,11):
            time.sleep(0.1)
            self.update_state(state='PROGRESS',meta={"p": i*10})
        return 'finish'
    
    ------------------------------------------------------------------------
    #trigger.py
    from Demo.tasks import test_mes
    import sys
    def pm(body):
        res = body.get('result')
        if body.get('status') == 'PROGRESS':
            print('res:'+str(res))
            sys.stdout.write('\r任务进度: {0}%'.format(res.get('p')))
            sys.stdout.flush()
        else:
            print('\r')
            print('over:'+str(res))
    
    r = test_mes.delay()
    
    r.get(on_message=pm, propagate=False)
    
    

    使用self.update_state可以状态发送给Woker,其中state为我们自己定义的状态,而meta为传给worker的参数。在woker中可以用r.get(on_message=pm, propagate=False)将这些参数与状态传给某个函数,从而可以知道该任务的进度,其中on_message为将传来的参数放于哪个函数中,progagate为是否要传递错误信息,如果为False即不传递错误信息

    定时任务处理

    有些任务我们需要定时间的去启动,那么这时候我们就需要配置celery,创建celery的配置文件,如celery_config,里面放入

    from datetime import timedelta
    from celery.schedules import crontab
    CELERYBEAT_SCHEDULE = {
        'tasks': {
            'task': 'tasks.period_task',
            'schedule': crontab(minute="*/1"),
        },
    
    }
    
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    

    CELERYBEAT_SCHEDULE为定时启动的任务,其中task对应的value就是你要定期启动的函数,schedule就是定时时间,分别可以使用contab,timedelta都可以配置时间,如果函数需要传入参数,使用"args":参数
    "schedule": crontab(minute="/10", # 每十分钟执行
    "schedule": crontab(minute="
    /1"), # 每分钟执行
    "schedule": crontab(minute=0, hour="*/1"), # 每小时执行
    "schedule": timedelta(seconds=5) # 没五秒执行一次
    另外的一些参数:
    BROKER_URL: Broker配置,使用redis作为消息中间件
    CELERY_RESULT_BACKEND : BACKEND,使用redis存放结果
    CELERY_RESULT_SERIALIZER : 结果序列化方案
    CELERY_TASK_RESULT_EXPIRES : 任务过期时间

    在任务文件中tasks.py输入一下代码

    from celery import Celery
    
    app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
    app.config_from_object('celery_config')
    @app.task(bind=True,name='tasks.period_task')
    def period_task(self):
        print('period task done: {0}'.format(self.request.id))
    

    开启一个终端执行celery -A tasks beat(需注意路劲问题,在与任务文件的同级目录下执行,tasks为文件名)
    开启另外一个终端执行celery -A tasks worker --pool=solo -l info


    image.png

    需要注意的是如何时间方面有涉及到中国地区的话,需在配置中加入时区信息CELERY_TIMEZONE = 'Asia/Shanghai',默认为以utc为标准。

    任务编排

    链式任务chain

    有时候我们一个任务需要等待上个任务执行完才能执行的话,我们就需要用到链式任务

    @app.task(name="tasks.fetch_page")
    def fetch_page(url):
        print('进入第一层'+url)
        return '我是第一层'
    
    @app.task(name="tasks.parse_page")
    
    def parse_page(page):
        print('进入第二层'+page)
        return '我是第二层'
    
    @app.task(name="tasks.store_page_info")
    
    def store_page_info(info, url):
    
        print('进入第三层'+info+url)
        return '执行结束'
    

    假定函数有三层每层都需要上一层的返回才能继续往下执行

    Woker文件中写入以下代码

    from celery import group, chain
    
    from Demo.tasks import *
    def update_page_info(url):
    
        # fetch_page -> parse_page -> store_page
        # 第一种写法
        # chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
        #
        # chain()
        # 第二种写法,res.get()为最后返回的结果
        res = chain(fetch_page.s(url),parse_page.s(),store_page_info.s(url))()
        while True:
            if res.ready():
                print("res"+str(res.get()))
                break
    update_page_info('www.baidu.com')
    

    组任务group

    并行执行组内的每个任务

    group(fetch_page.s(url),parse_page.s('test1'),store_page_info.s('test2',url))()
    

    任务分割chord

    分为header和body两个部分,会先执行header在将header的结果传给body执行

    chord(header=[fetch_page.s(url)],body=parse_page.s())()
    

    任务分组chunks

    按照任务个数分组,并不是并发执行

    fetch_page.chunks(['1','2','3'],2)() #2代表每组的任务个数,需要注意的是如果第一个参数传入的是字符串的话,那么字符串会被分割成每个字符当作参数传入
    

    相关文章

      网友评论

          本文标题:Celery进阶

          本文链接:https://www.haomeiwen.com/subject/kqslmqtx.html