Using native Celery with Django

Author: bigyuang | Published: 2019-02-22 17:09

    Package versions:
    django: 2.0.6
    celery: 4.2.1
    flower: 0.9.2
    redis: 2.10.6
    eventlet: 0.24.1

    References:
    celery: 4.2.1
    flower: 0.9.2
    Django and Celery, the road to distributed async (1): getting started
    Fetching results asynchronously with the Celery backend
    Fixing Celery ValueError: not enough values to unpack (expected 3, got 0)
    Using the asynchronous task queue Celery in Django

    Project layout:

    celery_django_demo
        ├─celery_demo
        │   ├─...
        │   ├─tasks.py
        │   └─views.py
        └─celery_django_demo
            ├─...
            └─celery.py
    

    celery.py

    # Celery configuration file
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery, platforms
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django_demo.settings')
    
    app = Celery('celery_django_demo',
                 broker='redis://127.0.0.1:6379/2',
                 backend='redis://127.0.0.1:6379/3',
                 include=['celery_demo.tasks']
          )
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    app.conf.update(
        CELERY_ACKS_LATE=True,
        CELERY_ACCEPT_CONTENT=['pickle', 'json'],
        CELERYD_FORCE_EXECV=True,
        CELERYD_MAX_TASKS_PER_CHILD=500,
        BROKER_HEARTBEAT=0,
    )
    
    # Optional configuration, see the application user guide.
    app.conf.update(
        CELERY_TASK_RESULT_EXPIRES=3600,   # how long task results are kept in the backend, in seconds
    )
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    platforms.C_FORCE_ROOT = True
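
To make the namespace='CELERY' comment in celery.py concrete: with that argument, Celery reads only the Django settings whose names start with CELERY_ and uses the rest of the name, lowercased, as the setting key (so CELERY_BROKER_URL supplies broker_url). The sketch below is a simplified illustration of that naming rule, not Celery's actual implementation; strip_namespace and the sample settings dictionary are hypothetical.

```python
def strip_namespace(settings, namespace='CELERY'):
    """Return the settings under `namespace`, keyed by lowercase names."""
    prefix = namespace + '_'
    return {
        name[len(prefix):].lower(): value
        for name, value in settings.items()
        if name.startswith(prefix)
    }


# Hypothetical excerpt of a Django settings module
django_settings = {
    'DEBUG': True,
    'CELERY_BROKER_URL': 'redis://127.0.0.1:6379/2',
    'CELERY_RESULT_EXPIRES': 3600,
}

# Only the CELERY_-prefixed entries survive; DEBUG is ignored
celery_conf = strip_namespace(django_settings)
print(celery_conf)
```

In practice this means Celery options placed in settings.py need the CELERY_ prefix to be picked up by config_from_object.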
    

    tasks.py

    from __future__ import absolute_import, unicode_literals
    import json
    import logging
    import traceback
    import time
    from celery import shared_task
    
    logger = logging.getLogger(__name__)
    
    @shared_task
    def celery_demo_task(param_dict):
        logger.info('foot task start! param_dict:%s' % param_dict)
        time.sleep(5)
        return 'finished'
    
    @shared_task
    def celery_demo_task2(param_dict):
        logger.info('foot task start! param_dict:%s' % param_dict)
        time.sleep(5)
        return 'finished'
    
    @shared_task
    def celery_demo_task3(param_dict):
        logger.info('foot task start! param_dict:%s' % param_dict)
        time.sleep(5)
        return 'finished'
    
    def do_work(user):
        dispatch(celery_demo_task, {'x': user})
        dispatch(celery_demo_task3, {'z': user})
        sub_work(user)
    
    def sub_work(user):
        dispatch(celery_demo_task2, {'y': user})
    
    
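    # Dispatch a task to the broker
    def dispatch(task, param_dict):
        # The task receives this JSON string, not the original dict
        param_json = json.dumps(param_dict)
        try:
            task.apply_async(
                [param_json],
                # Retry publishing the message if the broker connection fails
                retry=True,
                retry_policy={
                    'max_retries': 1,
                    'interval_start': 0,
                    'interval_step': 0.2,
                    'interval_max': 0.2,
                },
            )
        except Exception:
            # Log the full traceback, then let the caller handle the error
            logger.error(traceback.format_exc())
            raise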
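
The retry_policy passed to apply_async governs retries of publishing the message when the broker connection fails; it does not re-run a task that raised an exception. As a rough sketch of how the four values interact, assuming the semantics described in the Celery calling guide (the first retry waits interval_start seconds, each later retry adds interval_step, and the wait is capped at interval_max), the hypothetical helper retry_delays lists the waits between attempts:

```python
def retry_delays(max_retries, interval_start, interval_step, interval_max):
    """Seconds to wait before each publish retry (simplified model)."""
    return [
        min(interval_start + attempt * interval_step, interval_max)
        for attempt in range(max_retries)
    ]


# Policy used by dispatch(): a single retry with no initial wait
policy = {'max_retries': 1, 'interval_start': 0,
          'interval_step': 0.2, 'interval_max': 0.2}
delays = retry_delays(**policy)
print(delays)

# Same intervals with three retries, for comparison
three = retry_delays(3, 0, 0.2, 0.2)
print(three, sum(three))
```

With max_retries set to 1, a broker outage surfaces to the caller almost immediately, which suits a request handler that should not block for long.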
    

    views.py

    import traceback
    from django.http import JsonResponse, HttpResponse
    from django.views.decorators.csrf import csrf_exempt
    from .tasks import do_work
    
    
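    @csrf_exempt
    def hello(request):
        # Only GET is supported; reject other methods instead of returning None
        if request.method != 'GET':
            return JsonResponse({'code': -1, 'msg': 'method not allowed'}, status=405)
        try:
            user = request.GET.get('username')
            do_work(user)
            return JsonResponse({'code': 0, 'msg': 'success'})
        except Exception:
            return JsonResponse({'code': -1, 'msg': traceback.format_exc()})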
    

    Prerequisite: configure settings.py and urls.py.
    Start Django: python manage.py runserver
    Start the Celery worker: celery -A celery_django_demo worker -l info -P eventlet
    Web-based monitoring: celery flower --broker=redis://127.0.0.1:6379/2

    Open Flower in a browser at http://localhost:5555/, then send a GET request with Postman (or directly from the browser) to http://127.0.0.1:8000/hello?username=xxx. The task executions then show up in Flower.
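
The same GET request can also be sent from a short Python script instead of Postman. This is a minimal sketch that assumes the Django development server is listening on 127.0.0.1:8000 and that urls.py routes /hello to the hello view; build_hello_url and trigger are hypothetical helper names.

```python
from urllib.parse import urlencode
from urllib.request import urlopen


def build_hello_url(username, base='http://127.0.0.1:8000/hello'):
    """Build the request URL for the hello view with an encoded username."""
    return base + '?' + urlencode({'username': username})


def trigger(username):
    """Send the GET request and return the response body as text."""
    with urlopen(build_hello_url(username), timeout=5) as response:
        return response.read().decode('utf-8')


# Building the URL needs no running server
print(build_hello_url('xxx'))

# Call trigger('xxx') once the Django server and the Celery worker are running.
```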

    Note:
    After adding, removing, or modifying a task, restart the Celery worker so that it picks up the change.

    ----------------------------------------------------Corrections welcome----------------------------------------------------
