美文网首页
django-celery -1 basic

django-celery -1 basic

作者: Busyasabee | 来源:发表于2021-03-17 16:52 被阅读0次

    django-celery

    https://blog.csdn.net/bbwangj/article/details/89312355

    计划任务功能

    • Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图
    • image.png

    Celery 主要包含以下几个模块:

    • 任务模块 Task,包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

    • Bear: 任务调度,通过celery beat可以周期性的将需要执行的任务发送给队列

    • 消息中间件 Broker,Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

    • 任务执行单元 Worker,Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

    • 任务结果存储 Backend,Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

    • 异步任务

    使用 Celery 实现异步任务主要包含三个步骤:

    1. 创建一个 Celery 实例
    2. 启动 Celery Worker
    3. 应用程序调用异步任务

    django-celery配置

    0. 最终项目修改文件如下:
    aiops-base[xy_m]  # 项目名
      | -- apps
              | -- ops_nginx  # 应用
                        | -- tasks.py
      | -- xm_m # 工程名
              | --  __init__.py
              | -- celery.py
              | -- settings.py
    
    1. requirements.txt 依赖

    django-celery 目前支持到 3.3.1, 相应python3.6.x, celery使用3.1.x,

    django-celery
    flower
    redis==2.10.6
    
    2. setting.py
    • 开头增加如上配置文件,根据实际情况配置redis的地址和端口,时区一定要设置为Asia/Shanghai。否则时间不准确回影响定时任务的运行。

    • 首先导出djcelery模块,并调用setup_loader方法加载有关配置;注意配置时区,不然默认使用UTC时间会比东八区慢8个小时。其中INSTALLED_APPS末尾添加两项,分别表示添加celery服务和自己定义的apps服务。

    
    INSTALLED_APPS = {
    ...
    djcelery,
    ...
    }
    
    # Celery
    import djcelery
    djcelery.setup_loader()
    # 设置时区
    CELERY_TIMEZONE = 'Asia/Shanghai'
    # 使用redis作为任务队列
    BROKER_URL = 'redis://127.0.0.1:6379/8'
    # 定时任务调度器
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
    # 使用redis存储任务执行结果,默认不使用
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/9'
    # CELERY_ACCEPT_CONTENT = ['application/json'] # 指定任务接收的内容序列化类型
    # 任务序列化方式
    CELERY_TASK_SERIALIZER = 'json'
    # 任务结果序列化方式
    CELERY_RESULT_SERIALIZER = 'json'
    # 任务执行结果的超时时间 s
    # CELERY_TASK_RESULT_EXPIRES = 900
    # 是否压缩
    CELERY_MESSAGE_COMPRESSION = 'zlib'
    # 并发数默认已CPU数量定
    CELERYD_CONCURRENCY = 4
    # celery worker 每次去redis取任务的数量
    CELERYD_PREFETCH_MULTIPLIER = 4
    # 每个worker最多执行N个任务就摧毁
    CELERYD_MAX_TASKS_PER_CHILD = 20
    # 可以防止死锁
    CELERYD_FORCE_EXECV = True
    # CELERY_ENABLE_UTC = False  # 关闭时区
    
    3. 编写celery.py # 项目目录下
    from __future__ import absolute_import
    import os
    from celery import Celery
    
    # 工程名
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xy_m.settings')
    from django.conf import settings
    # 工程名
    app = Celery('xy_m')
    
    app.config_from_object('django.conf:settings')
    # This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.
    # You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.
    
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    
    # With the line above Celery will automatically discover tasks in reusable apps if you define all tasks in a separate tasks.py module.
    # The tasks.py should be in dir which is added to INSTALLED_APP in settings.py.
    # So you do not have to manually add the individual modules to the CELERY_IMPORT in settings.py.
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))  # dumps its own request information
    
    4. 修改工程名下的 init.py
    #!/bin/python
    from __future__ import absolute_import
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    5. 添加apps中需要执行App, 添加到tasks.py中
    from __future__ import absolute_import
    from celery import task
    
    
    @task()
    def add(x, y):
        """
        task 测试
        :param x:
        :param y:
        :return:
        """
        print("%d + %d = %d" % (x, y, x + y))
        return x + y
    
    6. 同步数据库
    python manage.py makemigrations
    python manage.py migrate
    
    7. 启动进程
    # 用来监控任务变化的
    python manage.py celery beat
    # 任务执行进程,worker进程
    python manage.py  celery worker -c 6 -l debug  
    # flow启动
    celery flower --address=0.0.0.0 --port=9001 --broker=redis://127.0.0.1:6379/8
    [I 210111 19:37:08 command:137] Visit me at http://0.0.0.0:9001
    [I 210111 19:37:08 command:142] Broker: redis://127.0.0.1:6379/8
    [I 210111 19:37:08 command:145] Registered tasks: 
        ['celery.backend_cleanup',
         'celery.chain',
         'celery.chord',
         'celery.chord_unlock',
         'celery.chunks',
         'celery.group',
         'celery.map',
         'celery.starmap']
    [I 210111 19:37:08 mixins:231] Connected to redis://127.0.0.1:6379/8
    
    9. django-admin 添加任务
    • image.png
    • image.png
    • 效果


      image.png

    相关文章

      网友评论

          本文标题:django-celery -1 basic

          本文链接:https://www.haomeiwen.com/subject/cjzdaktx.html