美文网首页
django+celery+rabbitmq实践

django+celery+rabbitmq实践

作者: 金融测试民工 | 来源:发表于2020-11-27 10:33 被阅读0次

    看到标题,相信大家就知道这个帖子要讲啥了……如果你希望在django中使用celery执行异步任务,用MQ(rabbimq,下同)做消息中间件,那么此贴完全可以满足你,包括celeryp、MQ配置,以及MQ路由配置等,相信你会喜欢。    

简单说明一下:

    1,django:web框架

   2,celery: 用于创建执行异步任务

   3,RabbitMQ:消息队列,主要用于消息存储

   对于celery,rabbimq安装没啥好说的, pip直接装就好了,配置与启动也不赘述。关键环节简要说明如下:

   一,在django中配置和使用celery

   配置大致如下:

  (1),工程目录下,创建celery.py, 内容编辑如下:

#!/usr/bin/python#!coding=utf-8

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

app = Celery('project')

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

'''

autodiscover_tasks instructs celery to auto-discover all asynchronous tasks for all the applications listed under `INSTALLED_APPS`,

Celery will look for definitions of asynchronous tasks within a file named `tasks.py` file in each of the application directory.

'''

@app.task(bind=True)

def debug_task(self):

    print('Request: {0!r}'.format(self.request))    另外。django settings文件下配置:

BROKER_URL= “”CELERY_RESULT_BACKEND =“”CELERYD_MAX_TASKS_PER_CHILD = 200  #每个worker执行了多少任务就会死掉

CELERYD_CONCURRENCY = 20 # celery worker的并发数.不是worker也越多越好,保证任务不堆积,加上一定新增任务的预留就可以

CELERYD_PREFETCH_MULTIPLIER = 3 # celery worker 每次去rabbitmq取任务的数量

CELERY_CREATE_MISSING_QUEUES = True

CELERY_TASK_RESULT_EXPIRES = 3600 # 任务执行结果的超时时间

……    

(2), 各app根目录下创建tasks.py

       app相关的task均放置在tasks.py中,并用装饰器shared_task装饰,示例如下:

@shared_task(

    # ignore_result=True, #忽略返回值

    name="*****",  #任务名称

    exchange="***",    #推送到MQ 的exchange分组

    routing_key="***"  # key

)

def get_jid_result(*args, **kwargs):    pass    任务调度方法:

    1)后台定时执行task,可安装djcelery模块,在django后台调度已注册的任务,关于djcelery的使用方法,在此不详细介绍。  

    2)触发调度,在需要调用task的d地方,直接调用各task的delay()方法即可(当然要先import),如:

    get_jid_result.delay(*args, **kwargs)

    二, celery woker

    工程目录下执行sudo -u apache bash -c 'celery -A patools worker -l info  (注意,此处使用apache用户执行,celery建议不要使用superuser 启动),启动celery  worker。

    建议:为确保celery  worker进程平稳运行, 可以使用superivsor守护该进程的执行, 配置示例如下:

    [*****]

    directory=*****(工程根目录)

    command=sudo -u apache bash -c 'celery -A prj worker -l info'  (其中prj为工程名)

    autostart=true

    autorestart=true

    stopwaitsecs=10

    startretries=5

    stderr_logfile=/var/log/***_in.log

    stdout_logfile=/var/log/***_out.log

    进入supervisorctl, 启动相应进程,OK!

    三,中间件rabbitmq

    对于中间件的选择,redis,MQ都可以,差异不作详细说明,视应用情况而选择。实践中采用rabbimq作为中间件,在此,对rabbimq路由做简单说明。

    django  settings配置,如celery配置部分, 另外,配置消息路由:

CELERY_DEFAULT_QUEUE = 'default'CELERY_DEFAULT_EXCHANGE = 'default'

CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'

CELERY_DEFAULT_ROUTING_KEY = 'default'# routing task

CELERY_IMPORTS = (

    # 注册各应用tasks文件在此,注意,必须确保注册模块存在,否则将导致worker进程报错

    "app1.tasks",

    "app2.tasks",

    # other apps

)

#交换器定义,示例如下:

default_exchange = Exchange('default', type='direct')

# bulkops related exchange

bulkops_exchange = Exchange("bulkops_exchange", type='direct')

# alphaops related exchange

Alphaops_exchange = Exchange("Alphaops_exchange", type='direct')# queue绑定# queue绑定,示例如下

CELERY_QUEUES = {

    "default": { # 默认队列

        "exchange": "default",

        "exchange_type": "direct",

        "routing_key": "/"

    },

    "bulkops": { # 默认队列

        "exchange": 'bulkops_exchange',

        "exchange_type": "direct",

        "routing_key": "bulkops.task"

    },

    # register other app-queue here

}

# 路由定义, 示例如下:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None): #为任务指定route

        # 示例1

        if task.startswith('bulkops.tasks'):

            return {

                'exchange':'bulkops_exchange',

                'exchange_type': 'direct',

                'routing_key':'bulkops.task',

            }

        # 示例2

        elif task.startswith('Alphaops.tasks'):

            return {

                'exchange': 'Alphaops_exchange',

                'exchange_type': 'direct',

                'routing_key': 'Alphaops.task'

            }

        else: # 默认被放到默认队列

            return None

CELERY_ROUTES = (MyRouter(),)

    以上是我在搭建基于Django框架的过程中对celery及MQ配置应用实践的小结,希望对有相关需求的同志们有些许的帮助,不足之处多多批评指正。

相关文章

  • django+celery+rabbitmq实践

    看到标题,相信大家就知道这个帖子要讲啥了……如果你希望在django中使用celery执行异步任务,用MQ(r...

  • Day37 流程

    一个原因/没有原因些许想法定下目标翻阅资料以史为鉴制定计划模拟演练实践实践实践实践实践实践……实践实践实践实践实践...

  • 实践实践实践

    发现自己就是想的太多,做的太少。9月份考察的一个项目,不管从哪个方面来讲,都觉得这个项目是非常好,值得一生去做的事...

  • 白玉花果的一般老化特征和老光特征,跟仿品比较

    实践出真知,但不正确的实践必不出真知。正确的实践是个人的实践和社会的实践相结合,脱离社会实践的个人实践是错误的实践...

  • 暑假工作 ‖ 社会实践报告

    实践时间:2018.07.09—2018.08.25实践人:欧阳广梅实践事件:1.社会实践(爱心传递)2.工作实践...

  • 实践实践再实践

    生命是一种体验。

  • 实践实践再实践

    这是我参加11月MJ老师的超级数字力课程的最大体验。 我作为一名复训生参加此次课程,可能因为现在是熊市,大家都不太...

  • 实践实践再实践

    猫叔的分享总是言辞恳切,每一条经验都是他实践过的,我们只要认真跟着做就行。 一、手打字录视频 有这一决定之前,我问...

  • 实践实践再实践

    记录下近来的想法,当有些想法的时候,人一下子可能会规划的很多,说要做个还有那个,然后还可以做那个,睡了一觉之后,发...

  • 实践,实践,还是实践

    王阳明说,做不到就不是真知道。 嗯嗯,我深以为然。 如果说,你知道孝敬老人是传统美德,那你...

网友评论

      本文标题:django+celery+rabbitmq实践

      本文链接:https://www.haomeiwen.com/subject/kjztwktx.html