美文网首页
celery 定时任务 ,异步邮箱任务,flower web监控

celery 定时任务 ,异步邮箱任务,flower web监控

作者: 你常不走的路 | 来源:发表于2018-08-15 10:35 被阅读475次

    celery安装:

    pip install celery
    pip install celery[redis] 
    

    flower 安装:

    docker pull placr/flower
    docker run -d –p 5555:5555 --name flower --link redis:redis placr/flower
    

    #######访问端口5555就可以看到web 界面

    Celery 定时任务:

        项目目录结构:
        demo/
            celery_app/
                __init__.py
                celeryconfig.py
                tasks.py
    

    init.py 内容

    from celery import Celery
    
    app = Celery('demo')                                # 创建 Celery 实例
    app.config_from_object('celery_app.celeryconfig')      # 添加配置文件
    
    

    celeryconfig.py 内容

    # -*- coding: utf-8 -*-
    
    from celery.schedules import timedelta
    from celery.schedules import crontab
    
    # Broker and Backend
    BROKER_URL = 'redis://192.168.5.151:6379' # 指定 Broker
    #BROKER_URL = 'redis://127.0.0.1:6379'
    CELERY_RESULT_BACKEND = 'redis://192.168.5.151:6379/0' # 指定 Backend
    #CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
    CELERYD_PREFETCH_MULTIPLIER = 10  # 并发量
    CELERY_TASK_RESULT_EXPIRES = 3600 # 结果过期
    CELERY_TASK_ALWAYS_EAGER = False # 如果是这样True,所有任务将通过阻塞在本地执行,直到任务返回
    CELERY_ENABLE_UTC = False
    # Timezone
    CELERY_TIMEZONE="Asia/Shanghai"    # 指定时区,不指定默认为 'UTC'
    # CELERY_TIMEZONE='UTC'
    
    # import # 指定导入的任务模块
    CELERY_IMPORTS = (
    'celery_app.tasks',
    )
    
    # schedules
    CELERYBEAT_SCHEDULE = {
        'add-every-30-seconds': {
             'task': 'celery_app.tasks.add',
             'schedule': crontab(minute="*"),       # 每 60 秒执行一次
             'args': (5, 8)                           # 任务函数参数
    },
    }
    
    

    Tasks.py 内容

    # coding:utf-8
    
    import time
    from celery_app import app
    import os
    
    os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
    @app.task
    def add(x, y):
        time.sleep(2)
    return x + y
    

    启动一个worker

    Celery –A celery_app worker –l info
    

    启动一个 beat # 随时检查配置变化

    Celery –A celery_app beat –l info
    

    Celery 配合flask 异步发送邮件任务:

    Flask-demo/
        Main.py
        Celeryconfig.py
        Celerytasks.py
        Celeryapp.py
    

    Main.py 内容:

    from flask import Flask, request, render_template, session, flash, redirect, \
        url_for
    from flask_mail import Mail, Message
    from celeryapp import make_celery
    app = Flask(__name__)
    app.config['SECRET_KEY'] = 'top-secret!'
    
    # Flask-Mail configuration
    app.config['MAIL_SERVER'] = "smtp.163.com"
    app.config['MAIL_PORT'] = 25
    app.config['MAIL_USE_TLS'] = True
    app.config['MAIL_USERNAME'] = "xxxx@163.com"  # 配置自己的
    app.config['MAIL_PASSWORD'] = "xxxx"            # 配置自己的邮箱授权码
    app.config['MAIL_DEFAULT_SENDER'] = 'xxxx@163.com'
    
    mail = Mail(app)
    celery = make_celery(app)
    from celerytasks import add, send_async_email  # 只有在 celery 配置后 才可以调task
    
    
    @app.route('/', methods=['GET', 'POST'])
    def index():
        
        title = 'Hello from Flask'
    email = “to_email@xx.com”
    body = 'This is a test email sent from a background Celery task.'
    send_async_email.delay(title, email, body)
        return redirect(url_for('index'))
    if __name__ == '__main__':
        app.run(debug=True)
    

    celeryapp.py 内容:

    from celery import Celery
    
    def make_celery(app):
        celery = Celery(app.import_name)
        celery.config_from_object("celerytest")
    
        class ContextTask(celery.Task):
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return self.run(*args, **kwargs)
    
        celery.Task = ContextTask
    return celery
    

    Celeryconfig.py 内容:

    # -*- coding: utf-8 -*-
    from celery.schedules import timedelta
    from celery.schedules import crontab
    
    # Broker and Backend
    # BROKER_URL = 'redis://192.168.5.151:6379' # 指定 Broker
    BROKER_URL = 'redis://127.0.0.1:6379'
    # CELERY_RESULT_BACKEND = 'redis://192.168.5.151:6379/0' # 指定 Backend
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
    CELERYD_PREFETCH_MULTIPLIER = 1  # 并发量
    CELERY_TASK_ALWAYS_EAGER = False  # 如果是这样True,所有任务将通过阻塞在本地执行,直到任务返回
    CELERY_ENABLE_UTC = False # 如果消息中的已启用日期和时间将转换为使用UTC时区。
    
    # 任务序列化和反序列化使用msgpack方案
    # CELERY_TASK_SERIALIZER = 'json'  # 可以是 json(默认),pickle,yaml,msgpack
    
    # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
    # CELERY_RESULT_SERIALIZER = 'json'
    
    # 任务过期时间,这样写更加明显
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 结果过期
    
    
    # Timezone
    CELERY_TIMEZONE = "Asia/Shanghai"  # 指定时区,不指定默认为 'UTC'
    
    # import # 指定导入的任务模块
    CELERY_IMPORTS = ['test3',"celerydemo"]
    

    Celerytasks.py 内容

    # coding:utf-8
    import os
    from main import celery,mail,app
    from flask_mail import Message
    os.environ.setdefault('FORKED_BY_MULTIPROCESSING','1')
    
    
    @celery.task
    def add(x, y):
        return x + y
    
    
    @celery.task
    def send_async_email(title,email,body):
        """Background task to send an email with Flask-Mail."""
    
        msg = Message(title,sender="xx@163.com",
                      recipients=[email])
        msg.body = body
        with app.app_context():
            return mail.send(msg)
    

    启动一个worker

    Celery –A main.celery worker –l info

    启动flask项目

    Python main.py

    5.celery 多队列、多路由

    5.0 多队列、多worker流程图

    如果要说celery的分布式应用的话,我觉得就要提到celery的消息路由机制,就要提一下AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去,

    image.png

    5.1 celeryconfig.py 配置

    #!/usr/bin/env python
    
    # -*- coding:utf-8 -*-
    
    from kombu import Exchange, Queue
    
    BROKER_URL = "redis://127.0.0.1:6379/1"
    
    CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/2"
    
    # 多队列
    
    CELERY_QUEUES = (
    
    Queue("default", Exchange("default"), routing_key="default"),
    
    Queue("for_task_A", Exchange("for_task_A"), routing_key="for_task_A"),
    
    Queue("for_task_B", Exchange("for_task_B"), routing_key="for_task_B")
    
    )
    
    # 路由 通过CELERY_ROUTES来为每一个task指定队列,如果有任务到达时,通过任务的名字来让指定的worker来处理。
    
    CELERY_ROUTES = {
    
    'tasks.taskA': {"queue": "for_task_A", "routing_key": "for_task_A"},
    
    'tasks.taskB': {"queue": "for_task_B", "routing_key": "for_task_B"}
    
    }
    

    5.2 tasks.py 内容

    #!/usr/bin/env python
    
    #-*- coding:utf-8 -*-
    
    from celery import Celery
    
    app = Celery()
    
    app.config_from_object("celeryconfig") # 指定配置文件
    
    @app.task
    
    def taskA(x,y):
    
     return x + y
    
    @app.task
    
    def taskB(x,y,z):
    
     return x + y + z
    
    @app.task
    
    def add(x,y):
    
    return x + y
    

    5.3 测试文件 run_redis_queue.py

    # coding:utf-8
    
    from redis_queue.tasks import *
    
    re1 = taskA.delay(100, 200)
    
    re2 = taskB.delay(1, 2, 3)
    
    re3 = add.delay(1, 2)
    

    5.4 启动

    # windows
    
    celery -A tasks worker -l info -n workerA.%h -Q for_task_A -P eventlet
    
    celery -A tasks worker -l info -n workerB.%h -Q for_task_B -P eventlet
    
    celery -A tasks worker -l info -n worker.%h -Q celery -P eventlet
    
    # linux
    
    celery -A tasks worker -l info -n workerA.%h -Q for_task_A
    
    celery -A tasks worker -l info -n workerB.%h -Q for_task_B
    
    celery -A tasks worker -l info -n worker.%h -Q celery
    
    linux 直接运行 python run_redis_queue.py
    
    windows 无法使用  运行后 无效果  只能直接在python命令窗口中调用
    
    各个任务 只在指定队列中的worker中运行
    

    6. celery multi

    ### **6.1** **后台启动worker**
    
    celery multi start w1 -A proj -l info  # 启动
    
    celery multi restart w1 -A proj -l info  # 重新启动
    
    celery multi stop w1 -A proj -l info # 异步关闭 立即返回
    
    celery multi stopwait w1 -A proj -l info # 等待关闭操作完成
    
     # 默认情况下,celery会在当前目录下创建pidfile和logfile.为了防止多个worker在启动时相互影响,你可以指定一个特定的目录。
    
    $ mkdir -p /var/run/celery
    
    $ mkdir -p /var/log/celery
    
    $ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
    
     --logfile=/var/log/celery/%n%I.log
    
    在linux下执行 windows有报错
    

    相关文章

      网友评论

          本文标题:celery 定时任务 ,异步邮箱任务,flower web监控

          本文链接:https://www.haomeiwen.com/subject/mepibftx.html