美文网首页
celery分布式设计

celery分布式设计

作者: WithUs | 来源:发表于2019-09-25 22:48 被阅读0次

    Celery的架构

    • Celery包含如下组件:
    1. Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列(一般用于定时任务使用)。
    2. Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
    3. Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(本方案使用redis)。
    4. Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
    5. Result Backend:任务处理完后保存状态信息和结果,以供查询。(本方案使用redis来存储结果)

    Celery的架构图如图所示。

    celery架构图

    根据celery架构图将分布式设计方案如下。

    1. 任务发布者:产品在后台添加股票完成之后,点击回测,然后分发任务都不同的机器。
    2. 任务调度:按照设定的时间调用delay方法。
    3. 消息代理:使用redis来存储股票代码。每次执行的股票代码都是从redis读取。
    4. 任务消费者:在不同的机器上开启worker,执行调度的股票进行回测。
    5. 回测结果:存放在redis中,然后读取出渲染在后台上展示出来。

    需要用到

    from kombu import Queue
    from flask import Flask
    

    部分代码如下:

    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_QUEUES = (  # 定义任务队列
        Queue("default", routing_key="distributed.#"),
        Queue("tasks_A", routing_key="A.#"),
        Queue("tasks_B", routing_key="B.#"), 
    )
    
    CELERY_ROUTES = (
        [
            ("web_management.web.trade.distributed.add", {"queue": "default"}),  
            ("web_management.web.trade.distributed.taskA", {"queue": "tasks_A"}), 
            ("web_management.web.trade.distributed.taskB", {"queue": "tasks_B"}), 
        ],
    )
    
    CELERY_RESULT_SERIALIZER = "json"  
    
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 
    app = Flask(__name__)
    app.config['CELERY_BROKER_URL'] = 'redis://' + ppt.redis_ip + ':' + ppt.redis_port + '/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://' + ppt.redis_ip + ':' + ppt.redis_port + '/1'
    app.config['CELERY_QUEUES'] = CELERY_QUEUES
    app.config['CELERY_TIMEZONE'] = CELERY_TIMEZONE
    app.config['CELERY_ROUTES'] = CELERY_ROUTES
    app.config['CELERY_RESULT_SERIALIZER'] = CELERY_RESULT_SERIALIZER
    app.config['CELERY_TASK_RESULT_EXPIRES'] = CELERY_TASK_RESULT_EXPIRES
    celery = Celery('distributed', backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    

    开启work的方式

    celery worker -A web_management.web.trade.distributed.celery -Q tasks_A --concurrency=30 -l info -Q后面为方法对应的队列名称 --concurrency= 后面 为开启的worker数目,也可以使用 -c= 具体开启是数目根据你的电脑CPU个数确定,小于等于cpu个数即可

    备注

    • celery 使用4.1.1版本
    • kombu 使用4.2.0版本
      *先安装kombu,然后安装celery
    • 可以解决如图问题
    • 版本选择

    相关文章

      网友评论

          本文标题:celery分布式设计

          本文链接:https://www.haomeiwen.com/subject/apawuctx.html