美文网首页
Python celery 学习笔记

Python celery 学习笔记

作者: 逐风细雨 | 来源:发表于2021-10-19 17:08 被阅读0次

    概述

       celery是一个python实现的分布式任务执行框架,本文为学习笔记:

    • python 3.8.6
    • celery 5.1.2
    • 操作系统 win10
    • redis 3.0.504

    安装

    • pip install celery
    • pip install flower (任务监控平台)
    • pip install eventlet (win10 环境需要)

    官方文档地址
    参考的文章
    框架架构流程图:

    image.png
    参考官网上面的例子,将celery 当成一个独立工程来维护,方便将已有的业务按统一规范写成任务入口函数
    目录结构如下:
    image.png
    celery.py 文件代码:
    from celery import Celery
    
    app = Celery('proj',
                 broker='redis://:123456@10.2.13.167:6379/1',
                 backend='redis://:123456@10.2.13.167:6379/2',
                 include=['proj.tasks'])
    
    # Optional configuration, see the application user guide.
    app.conf.update(
        result_expires=3600,
    )
    
    if __name__ == '__main__':
        app.start()
    
    

    tasks.py 文件代码如下:

    from .celery import app
    
    
    @app.task
    def add(x, y):
        print("call add")
        return x + y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    

    启动服务

    • 启动 celery celery -A proj worker -l INFO -P eventlet
    • 启动 flower celery -A proj flower --address=127.0.0.1 --port=5566 # web监控页面打开方式 http://127.0.0.1:5566
      直接ctrl+c 退出程序
      在liunx上面 支持以 守护进程方式启动 celery multi start w1 -A proj -l INFO multi 其它参数:重启是 restart 停止是stop,还支持启动多个worker 这里是只启动了一个,具体参数参考官方文档

    集成到flask框架中

       celery集成到flask框架中不需要安装任何扩展,网上有其它的方式进行集成,本人采用的是,直接调用 tasks.py 下面的任务函数的方式实现
    代码如下:

    from flask import Flask, request
    from proj.tasks import app, add, mul
    from celery.result import AsyncResult
    
    flask_app = Flask(__name__)
    
    
    @flask_app.route('/add', methods=["POST"])
    def celery_add():
        """
        执行add任务
        POST http://127.0.0.1:5000/add
        Body 为表单 参数为 args1,args2
        :return:
        """
        try:
            args1, args2 = request.form.values()
            print(f"获取到的参数:{args1},{args2}")
        except Exception as e:
            return str(e)
        result = add.delay(int(args1), int(args2))
        result_id = result.id
        print(f"任务id:{result_id}")
        return result_id
    
    
    @flask_app.route('/get_result', methods=["GET"])
    def get_result_id():
        """
        根据id获取任务结果
        GET http://127.0.0.1:5000/get_result?id=fd7cb7dc-5b1b-4e07-8199-dd89a0c08a2a
        :return:
        """
        result_id = request.args.get('id')
        async_result = AsyncResult(id=result_id, app=app)
        result = ""
        if async_result.successful():
            result = async_result.get()
            print(result)
            # result.forget() # 将结果删除
        elif async_result.status == 'PENDING':
            print('任务等待被执行')
        elif async_result.status == 'RETRY':
            print('任务异常后重试')
        elif async_result.status == 'STARTED':
            print('任务执行中')
        elif async_result.failed():
            print('任务执行失败')
    
        return str(result)
    
    
    if __name__ == "__main__":
        flask_app.run("127.0.0.1", port=5000)
    

    启动flask程序
    通过postman进行调用测试:

    1. 调用add函数,获取任务id:


      image.png
    2. 获取对应id的结果:


      image.png
    3. 打开flower 查看调用过程 如下图:


      image.png

    相关文章

      网友评论

          本文标题:Python celery 学习笔记

          本文链接:https://www.haomeiwen.com/subject/typaoltx.html