CELERY Cluster Management Implementation


Author: MoonMonsterss | Published 2018-10-21 19:05

    This article should be read together with [CELERY 测试多服务器] (Celery multi-server testing).

    Main goal: when creating a new task, you can specify which virtual environment (server) executes it.

    1. Code

    1.1 __init__.py

    from celery import Celery
    
    app = Celery('celery_app')
    # load the configuration
    app.config_from_object('celery_app.celeryconfig')
    
    

    1.2 celeryconfig.py

    from datetime import timedelta
    from kombu import Queue, Exchange
    
    result_serializer = 'json'
    
    # message broker
    broker_url = 'redis://127.0.0.1:6379/7'
    # result backend
    result_backend = 'redis://127.0.0.1:6379/8'
    # timezone
    timezone = 'Asia/Shanghai'
    
    # import the task modules
    imports = (
        'celery_app.task1',
        'celery_app.task2'
    )
    # periodic (beat) tasks
    beat_schedule = {
        'add-every-20-seconds': {
            'task': 'celery_app.task1.multiply',
            'schedule': timedelta(seconds=20),
            'args': (5, 7)
        },
        'add-every-10-seconds': {
            'task': 'celery_app.task2.add',
            'schedule': timedelta(seconds=10),
            'args': (100, 200)
        }
    }
    # task queues
    # keep the three values (queue name, exchange, routing_key) consistent
    # each exchange maps to a queue via message routing, and each queue is consumed by its own worker
    task_queues = (
        Queue('default', exchange=Exchange('default'), routing_key='default'),
        Queue('priority_high', exchange=Exchange('priority_high'), routing_key='priority_high'),
        Queue('priority_low', exchange=Exchange('priority_low'), routing_key='priority_low')
    )
    
    task_routes = {
        'celery_app.task1.multiply': {'queue': 'priority_high', 'routing_key': 'priority_high'},
        'celery_app.task2.add': {'queue': 'priority_low', 'routing_key': 'priority_low'}
    }
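
The effect of task_routes can be sketched as a plain dictionary lookup. The helper below is hypothetical (not Celery internals): it mirrors how a task name resolves to a queue/routing_key pair, falling back to the default queue when no route matches.

```python
# Sketch of task_routes resolution (hypothetical helper, not Celery code).
TASK_ROUTES = {
    'celery_app.task1.multiply': {'queue': 'priority_high', 'routing_key': 'priority_high'},
    'celery_app.task2.add': {'queue': 'priority_low', 'routing_key': 'priority_low'},
}

def resolve_route(task_name, routes=TASK_ROUTES):
    """Return the queue/routing_key for a task, or the default queue if unrouted."""
    default = {'queue': 'default', 'routing_key': 'default'}
    return routes.get(task_name, default)

print(resolve_route('celery_app.task1.multiply'))  # routed to priority_high
print(resolve_route('celery_app.some.other_task'))  # falls back to default
```

Keeping queue name, exchange, and routing_key identical (as in task_queues above) makes this lookup unambiguous: one name identifies the whole route.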
    

    1.3 task1.py

    import time
    from . import app
    
    
    @app.task
    def multiply(x, y):
        print('multiply')
        time.sleep(4)
        return x * y
    
    

    1.4 task2.py

    import time
    from . import app
    
    
    @app.task
    def add(x, y):
        print('add')
        time.sleep(2)
        return x + y
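
Both task bodies are ordinary functions, so they can be sanity-checked locally before involving a broker. The stand-ins below reproduce the task logic (with the sleeps shortened, an adjustment for a quick check):

```python
import time

# Local stand-ins for the two tasks; the @app.task decorator wraps
# a plain function, so the arithmetic can be verified directly.
def multiply(x, y):
    time.sleep(0.01)  # shortened from 4 s for a quick local check
    return x * y

def add(x, y):
    time.sleep(0.01)  # shortened from 2 s
    return x + y

print(multiply(20, 20))  # 400
print(add(20, 20))       # 40
```

These values (400 and 40) are exactly what the worker logs report as task results later on.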
    

    2. Running it

    2.1 Create the virtual environments

    Use pipenv ( pipenv install ) to create two virtual environments, then copy the code files above into each of them.

    2.2 Start Celery

    Open each virtual environment with pipenv shell, change to the directory above celery_app, and run the following commands.
    In virtual environment env1:
    celery -A celery_app worker -l info -Q priority_high -P eventlet
    In virtual environment env2:
    celery -A celery_app worker -l info -Q priority_low -P eventlet
    As a result, tasks with queue=priority_high, routing_key=priority_high all run in env1,
    and tasks with queue=priority_low, routing_key=priority_low all run in env2.
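
The -Q flag can be pictured as a filter: a worker only consumes the queues it subscribes to. A minimal sketch (not Celery internals; queue contents are illustrative) of that partitioning:

```python
# Sketch: each worker drains only the queues named in its -Q option.
queues = {
    'default': [],
    'priority_high': ['task1.multiply(5, 7)'],
    'priority_low': ['task2.add(100, 200)'],
}

def consume(subscribed, queues):
    """Return the tasks a worker started with these -Q queues would receive."""
    return [task for name in subscribed for task in queues[name]]

env1_tasks = consume(['priority_high'], queues)  # env1 sees only multiply
env2_tasks = consume(['priority_low'], queues)   # env2 sees only add
print(env1_tasks, env2_tasks)
```

A worker started without -Q would subscribe to the default queue and receive neither routed task, which is why each environment's command names its queue explicitly.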

    2.3 Create tasks

    Create tasks with apply_async: args passes the task arguments, while queue together with routing_key determines which virtual environment (server) runs the task.

    >>> from celery_app import task1, task2
    >>> for _ in range(30):
    ...     re = task1.multiply.apply_async(args=[20, 20], queue='priority_high', routing_key='priority_high')
    ...     re2 = task2.add.apply_async(args=[20, 20], queue='priority_low', routing_key='priority_low')
    ...     print(re.get())
    ...     print(re2.get())
    

    Virtual environment 1 executed only the tasks with queue=priority_high, routing_key=priority_high:

    [2018-09-06 16:48:01,572: INFO/MainProcess] Received task: celery_app.task1.multiply[3c058487-eee8-4505-9238-f549726680fb]
    [2018-09-06 16:48:01,574: WARNING/MainProcess] multiply
    [2018-09-06 16:48:01,575: INFO/MainProcess] Received task: celery_app.task1.multiply[ead3df3d-a23c-4a57-825a-8e85d5033fd1]
    [2018-09-06 16:48:05,581: INFO/MainProcess] Task celery_app.task1.multiply[ead3df3d-a23c-4a57-825a-8e85d5033fd1] succeeded in 4.0090000000018335s: 400
    [2018-09-06 16:48:05,583: INFO/MainProcess] Received task: celery_app.task1.multiply[954575ef-fe0a-4752-907b-ded4e4a4173b]
    [2018-09-06 16:48:05,584: WARNING/MainProcess] multiply
    [2018-09-06 16:48:05,586: INFO/MainProcess] Received task: celery_app.task1.multiply[864f666f-8a5c-490b-950f-5ac5b37a83b5]
    [2018-09-06 16:48:09,591: INFO/MainProcess] Task celery_app.task1.multiply[864f666f-8a5c-490b-950f-5ac5b37a83b5] succeeded in 4.008999999998196s: 400
    

    Virtual environment 2 executed the tasks with queue=priority_low, routing_key=priority_low:

    [2018-09-06 16:47:49,519: WARNING/MainProcess] add
    [2018-09-06 16:47:51,520: INFO/MainProcess] Task celery_app.task2.add[9ab04f96-d1c5-4d3c-87c4-6e5707ef90c1] succeeded in 2.0119999999988067s: 40
    [2018-09-06 16:47:53,542: INFO/MainProcess] Received task: celery_app.task2.add[dcf76311-e251-4f8e-bf48-b3c422f63eec]
    [2018-09-06 16:47:53,543: WARNING/MainProcess] add
    [2018-09-06 16:47:55,545: INFO/MainProcess] Task celery_app.task2.add[dcf76311-e251-4f8e-bf48-b3c422f63eec] succeeded in 2.0120000000024447s: 40
    [2018-09-06 16:47:57,553: INFO/MainProcess] Received task: celery_app.task2.add[60c201fa-bad4-4af4-9d02-bb68e718fb51]
    [2018-09-06 16:47:57,554: WARNING/MainProcess] add
    [2018-09-06 16:47:59,556: INFO/MainProcess] Task celery_app.task2.add[60c201fa-bad4-4af4-9d02-bb68e718fb51] succeeded in 2.0119999999988067s: 40
    [2018-09-06 16:48:01,575: INFO/MainProcess] Received task: celery_app.task2.add[1640912d-536e-43e2-9960-ebd58af5cb3f]
    [2018-09-06 16:48:01,576: WARNING/MainProcess] add
    [2018-09-06 16:48:03,580: INFO/MainProcess] Task celery_app.task2.add[1640912d-536e-43e2-9960-ebd58af5cb3f] succeeded in 2.0120000000024447s: 40
    

    3. References

    https://my.oschina.net/hochikong/blog/518587
    https://www.213.name/archives/1105
