pip3 install celery
Add a celery.py file in the project's root package (the directory containing settings.py).
celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "adminIE.settings")
app = Celery("adminIE")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
# (i.e., every Celery setting in settings.py must start with CELERY_)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))
Configure the project package's __init__.py (the same directory as celery.py):
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ("celery_app",)
Celery configuration in settings.py
# Celery configuration
# Broker (message middleware) address: host, port, and vhost
CELERY_BROKER_URL = 'amqp://kulong:11111111@localhost:5672/myvhost'
# The user and vhost in the URL above must first be created in RabbitMQ
# (adjust to your own credentials), e.g.:
# add a user:          rabbitmqctl add_user shark QFedu123
# add a vhost:         rabbitmqctl add_vhost qfvhost
# grant permissions:   rabbitmqctl set_permissions -p qfvhost shark ".*" ".*" ".*"
# CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
# Redis result backend configuration:
# store task results in Redis database 1
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
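# If RabbitMQ is unavailable, Redis can also act as the broker
# (an alternative to the AMQP URL above, not part of this setup):
# CELERY_BROKER_URL = 'redis://localhost:6379/0'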
# CELERY_TASK_SERIALIZER = 'json'
# Number of concurrent worker processes/threads/green threads executing tasks.
# If your tasks are mostly I/O-bound you can run more processes than cores,
# but for CPU-bound work keep it close to the number of CPUs on the machine.
# Defaults to the number of CPUs/cores on the host if unset.
CELERY_WORKER_CONCURRENCY = 6
# Late acknowledgment: task messages are acknowledged only after the task has been executed
CELERY_TASK_ACKS_LATE = True
# Recycle each worker child process after it has executed 60 tasks,
# to guard against memory leaks
CELERY_WORKER_MAX_TASKS_PER_CHILD = 60
# Hard time limit (in seconds) for a single task.
# When exceeded, the worker process handling the task is killed
# and replaced with a new one.
CELERY_TASK_TIME_LIMIT = 5 * 60
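
To confirm the CELERY_ namespace mapping took effect, you can inspect the loaded configuration from the Django shell (a quick check, assuming the adminIE project above):

python3 manage.py shell
>>> from adminIE.celery import app
>>> app.conf.broker_url           # should echo CELERY_BROKER_URL
>>> app.conf.worker_concurrency   # should echo 6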
Start a worker (from the project root, where manage.py lives):
celery -A adminIE worker --loglevel=info
-A specifies the app/project name (here the adminIE package, which contains celery.py)
RabbitMQ
RabbitMQ itself is a server, not a pip package; Celery already ships with the AMQP client it needs. Install the server through your OS package manager or run it in Docker (shown below).
pip3 install redis
pip3 install django-debug-toolbar

Set up the broker:
$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_user_tags myuser mytag
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
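To verify the account was created (an optional check):
$ sudo rabbitmqctl list_users
$ sudo rabbitmqctl list_vhosts
$ sudo rabbitmqctl list_permissions -p myvhost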
Celery's native configuration (lowercase form, e.g. in a standalone celeryconfig):
broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost'
In Django's settings.py, the same setting with the CELERY_ namespace prefix:
CELERY_BROKER_URL = 'amqp://shark:QFedu123@localhost:5672/qfvhost'
Start the containers and map the ports:
docker run -d -p 5672:5672 rabbitmq
docker run -d -p 6379:6379 redis:alpine
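A quick check that both containers are up and the ports are mapped:
docker ps
# the PORTS column should list 0.0.0.0:5672->5672/tcp and 0.0.0.0:6379->6379/tcp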
Create tasks.py inside the app's directory.
Wrap the asynchronous work in a function decorated with @shared_task,
then call it from a view function.
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from cmdb.models import InventoryPool
from .utils.handle_command import HandleCommand
@shared_task
def task_ansible(command):
    # run the command against every host in the inventory pool
    inventorys = InventoryPool.objects.all()
    handler = HandleCommand(command, inventorys)
    ret = handler.exec_command()
    return ret
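
Besides delay(), apply_async() gives finer control over execution; for example, scheduling the task to run 10 seconds from now (a sketch; the 'uptime' command is only an illustration):

task_ansible.apply_async(args=['uptime'], countdown=10)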
Call it in the view function using the task's delay() method:
from .tasks import task_ansible
task = task_ansible.delay(command)  # must call delay() to run asynchronously
print(task.task_id)

task_json = {
    "id": task.task_id,
    "status": task.status,
    "success": task.successful(),
    "result": task.result,
}
print(task_json)
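
Because results land in the Redis backend, a separate view can look a task up later by its id (a minimal sketch; the task_status view and its URL wiring are assumptions, not part of the original project):

from celery.result import AsyncResult
from django.http import JsonResponse

def task_status(request, task_id):
    task = AsyncResult(task_id)  # fetches state from the configured result backend
    return JsonResponse({
        "id": task.task_id,
        "status": task.status,
        "success": task.successful(),
        "result": task.result if task.ready() else None,
    })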
