Package versions:
django: 2.0.6
celery: 4.2.1
flower: 0.9.2
redis: 2.10.6
eventlet: 0.24.1
celery_django_demo
├─celery_demo
│  ├─...
│  ├─tasks.py
│  └─views.py
└─celery_django_demo
   ├─...
   └─celery.py
celery.py
# Celery configuration file
from __future__ import absolute_import, unicode_literals
import os

from celery import Celery, platforms

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django_demo.settings')

app = Celery('celery_django_demo',
             broker='redis://127.0.0.1:6379/2',
             backend='redis://127.0.0.1:6379/3',
             include=['celery_demo.tasks'])

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
    CELERY_ACKS_LATE=True,
    CELERY_ACCEPT_CONTENT=['pickle', 'json'],
    CELERYD_FORCE_EXECV=True,
    CELERYD_MAX_TASKS_PER_CHILD=500,
    BROKER_HEARTBEAT=0,
)

# Optional configuration, see the application user guide.
app.conf.update(
    # How long task results are kept in the backend, in seconds.
    CELERY_TASK_RESULT_EXPIRES=3600,
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# Allow running the worker as root (use with care).
platforms.C_FORCE_ROOT = True
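Because config_from_object is called with namespace='CELERY', any setting in Django's settings.py whose name starts with CELERY_ is also picked up by the app. A minimal sketch of what that could look like (these keys and values are illustrative, not from the original project):

```python
# settings.py (fragment) -- illustrative values, not from the original project.
# With app.config_from_object('django.conf:settings', namespace='CELERY'),
# each key below is handed to Celery with the CELERY_ prefix stripped.
CELERY_TASK_SERIALIZER = 'json'      # read as task_serializer
CELERY_RESULT_SERIALIZER = 'json'    # read as result_serializer
CELERY_TIMEZONE = 'Asia/Shanghai'    # read as timezone
```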
tasks.py
from __future__ import absolute_import, unicode_literals
import logging
import time

from celery import shared_task

logger = logging.getLogger(__name__)


@shared_task
def celery_demo_task(param_dict):
    logger.info('task start! param_dict:%s' % param_dict)
    time.sleep(5)
    return 'finished'


@shared_task
def celery_demo_task2(param_dict):
    logger.info('task start! param_dict:%s' % param_dict)
    time.sleep(5)
    return 'finished'


@shared_task
def celery_demo_task3(param_dict):
    logger.info('task start! param_dict:%s' % param_dict)
    time.sleep(5)
    return 'finished'
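dispatch() below sends the parameters as a JSON string, so a task body would normally decode them before reading any field. A minimal round-trip sketch (plain Python, no broker required; handle() is a hypothetical stand-in for a task body):

```python
import json

def handle(param_json):
    # What a task body could do with the serialized parameters:
    # decode the JSON string back into a dict, then read fields.
    param_dict = json.loads(param_json)
    return param_dict.get('x')

# The producer side, mirroring dispatch(): dict -> JSON string.
param_json = json.dumps({'x': 'alice'})
print(handle(param_json))  # → alice
```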
import json
import traceback


def do_work(user):
    dispatch(celery_demo_task, {'x': user})
    dispatch(celery_demo_task3, {'z': user})
    sub_work(user)


def sub_work(user):
    dispatch(celery_demo_task2, {'y': user})


# Dispatch a task to the broker.
def dispatch(task, param_dict):
    param_json = json.dumps(param_dict)
    try:
        task.apply_async(
            [param_json],
            retry=True,
            retry_policy={
                'max_retries': 1,
                'interval_start': 0,
                'interval_step': 0.2,
                'interval_max': 0.2,
            },
        )
    except Exception:
        logger.error(traceback.format_exc())
        raise
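The retry_policy passed to apply_async above governs republish attempts when the broker is briefly unavailable: the wait starts at interval_start, grows by interval_step per attempt, and is capped at interval_max. A small illustrative helper (my own sketch, not a Celery API) showing how those waits combine:

```python
def retry_intervals(max_retries, interval_start, interval_step, interval_max):
    """Illustrative sketch of how the retry_policy options combine:
    the n-th retry waits interval_start + n * interval_step seconds,
    capped at interval_max."""
    return [min(interval_start + n * interval_step, interval_max)
            for n in range(max_retries)]

# The policy used in dispatch(): one retry, after a 0-second wait.
print(retry_intervals(1, 0, 0.2, 0.2))   # → [0.0]

# A longer policy for comparison: waits ramp up, then plateau.
print(retry_intervals(4, 0, 0.2, 0.2))   # → [0.0, 0.2, 0.2, 0.2]
```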
views.py
import traceback

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

from .tasks import do_work


@csrf_exempt
def hello(request):
    if request.method == 'GET':
        try:
            user = request.GET.get('username')
            do_work(user)
            return JsonResponse({'code': 0, 'msg': 'success'})
        except Exception:
            return JsonResponse({'code': -1, 'msg': traceback.format_exc()})
    return JsonResponse({'code': -1, 'msg': 'only GET is supported'})
Prerequisite: settings.py and urls.py are already configured.
Start Django: python manage.py runserver
Start the Celery worker: celery -A celery_django_demo worker -l info -P eventlet
Start the web monitor: celery flower --broker=redis://127.0.0.1:6379/2
Open flower in a browser at http://localhost:5555/, then send a GET request with Postman (or a browser) to http://127.0.0.1:8000/hello?username=xxx, and you can watch Celery execute the tasks in flower.
Note:
After adding, removing, or modifying a task, restart the Celery worker.
----------------------------------------------------Corrections welcome----------------------------------------------------