美文网首页
在django中使用celery和RabbitMQ

在django中使用celery和RabbitMQ

作者: 王峰芝 | 来源:发表于2018-01-17 17:40 被阅读0次

    为什么要使用celery?

    Web应用程序适用于请求和响应周期。当用户访问您的应用程序的某个URL时,Web浏览器向您的服务器发送一个请求。Django收到这个请求,并用它做一些事情。通常它涉及在数据库中执行查询,处理数据。当Django做他的事情并处理请求时,用户必须等待。当Django完成处理请求的工作时,它会发送一个响应给最终会看到某些东西的用户。

    理想情况下,这个请求和响应周期应该很快,否则我们会让用户等待太久。更糟的是,我们的Web服务器一次只能服务一定数量的用户。所以,如果这个过程很慢,它可以限制您的应用程序一次可以提供的页面数量。

    大多数情况下,我们可以使用缓存,优化数据库查询等来解决此问题。但是有些情况下,没有其他的选择:需要做大量的工作。一个报告页面,大量的数据输出,视频/图像处理是你可能想要使用celery的几个例子。

    我们在不是在整个项目中使用celery,而只是用于耗费时间的特定任务。这里的想法是尽可能快地响应用户,并将耗时的任务传递给队列,以便在后台执行,并始终保持服务器准备好响应新的请求。

    安装

    安装celery最简单的方法是使用点:

    pip install Celery

    现在我们必须安装RabbitMQ。

    在Ubuntu 16.04上安装RabbitMQ

    要将其安装在较新的Ubuntu版本上非常简单:

    apt-get install -y erlangapt-get install rabbitmq-server

    然后启用并启动RabbitMQ服务:

    systemctlenable rabbitmq-serversystemctl start rabbitmq-server

    检查状态以确保一切运行平稳:

    systemctl status rabbitmq-server

    在Mac上安装RabbitMQ

        brew install rabbitmq

    RabbitMQ脚本安装到/usr/local/sbin。你可以把它添加到您的.bash_profile或.profile。

        vim ~/.bash_profile

    然后将其添加到文件的底部:

        export PATH=$PATH:/usr/local/sbin

    重新启动终端,确保更改已生效。

    现在您可以使用以下命令启动RabbitMQ服务器:

        rabbitmq-server

    celery基本设置

    首先,考虑名为core的应用程序名为mysite的以下Django项目:

    将CELERY_BROKER_URL配置添加到settings.py文件中:

    settings.py

       CELERY_BROKER_URL='amqp://localhost'

    除了settings.pyurls.py文件之外,我们还要创建一个名为celery.py的新文件。

    celery.py

    import  os

     from  celery import    Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE','mysite.settings')

    app=Celery('mysite')app.config_from_object('django.conf:settings',namespace='CELERY')

    app.autodiscover_tasks()

    现在编辑项目根目录下的__init__.py文件:

    __init__.py

    from.celeryimportappascelery_app__all__=['celery_app']

    这将确保每次Django启动,我们的celery应用程序是重要的。

    创建我们的第一个芹菜任务。

    我们可以在Django应用程序中创建一个名为tasks.py的文件,并将所有的Celery任务放到这个文件中。我们在项目根目录中创建的Celery应用程序将收集在INSTALLED_APPS 配置中列出的所有Django应用程序中定义的所有任务。

    为了测试目的,我们创建一个Celery任务,生成一些随机的用户帐户。

    app/ tasks.py

    from __future__import absolute_import, unicode_literals

    import string

    from celeryimport shared_task

    from django.contrib.auth.modelsimport User

    from django.utils.cryptoimport get_random_string

    @shared_task

    def create_random_user_accounts(total):

        for i in range(total):

            username = 'user_{}'.format(get_random_string(10, string.ascii_letters))

            email = '{}@example.com'.format(username)

            password = get_random_string(50)

            User.objects.create_user(username=username, email=email, password=password)

        return '{} random users created with success!'.format(total)

    这里的重要部分是:

    from celery import shared_task

    @shared_task

    def name_of_your_function(optional_param):

       pass# do something heavy

    然后我定义了一个表单和一个视图来处理我的Celery任务:

    forms.py

    from djangoimport forms

    from django.core.validatorsimport MinValueValidator, MaxValueValidator

    class GenerateRandomUserForm(forms.Form):

        total = forms.IntegerField(

            validators=[

            MinValueValidator(50),

            MaxValueValidator(500)

            ]

    )

    这个是需要一个50到500之间的正整数字段。如图:

    然后在views中:

    from django.contrib.auth.modelsimport User

    from django.contribimport messages

    from django.views.generic.editimport FormView

    from django.shortcutsimport redirect

    from .formsimport GenerateRandomUserForm

    from .tasksimport create_random_user_accounts

    class GenerateRandomUserView(FormView):

    template_name ='core/generate_random_users.html'

        form_class = GenerateRandomUserForm

    def form_valid(self, form):

        total = form.cleaned_data.get('total')

        create_random_user_accounts.delay(total)  # 重点 就是便是这个任务在cerely后台执行。然后Django继续处理我的视图,GenerateRandomUserView并顺利返回给用户。

        messages.success(self.request,'We are generating your random users! Wait a moment and     refresh this page.')

        return redirect('users_list')

    启动工作进程

    打开一个新的终端选项卡,然后运行以下命令:

    celery -A mysite worker -l info

    mysite更改为您的项目名称。结果是这样的:

    现在我们可以测试它。我提交了500个表单,创建了500个随机用户。

    同时,检查celery worker过程:

    [2017-08-20 19:11:17,485: INFO/MainProcess] Received task:mysite.core.tasks.create_random_user_accounts[8799cfbd-deae-41aa-afac-95ed4cc859b0]

    然后几秒钟后,如果我们刷新页面,用户在那里:

    如果我们再次查看芹菜工作进程,我们可以看到它完成了执行:

    [2017-08-20 19:11:45,721: INFO/ForkPoolWorker-2] Taskmysite.core.tasks.create_random_user_accounts[8799cfbd-deae-41aa-afac-95ed4cc859b0] succeededin28.225658523035236s:'500 random users created with success!'

    用Supervisord管理生产中的工人流程

    如果您将应用程序部署到DigitalOcean等 VPS ,则需要在后台运行辅助进程。在我的教程中,我喜欢使用Supervisord来管理Gunicorn的工作人员,所以它通常与Celery很合适。

    首先安装它(在Ubuntu上):

    sudo apt-get install supervisor

    然后创建一个文件名为mysite的-celery.conf的文件夹中:/etc/supervisor/conf.d/mysite-celery.conf

    [program:mysite-celery]

    command=/home/mysite/bin/celery worker -A mysite --loglevel=INFO

    directory=/home/mysite/mysite

    user=nobody

    numprocs=1

    stdout_logfile=/home/mysite/logs/celery.log

    stderr_logfile=/home/mysite/logs/celery.log

    autostart=true

    autorestart=true

    startsecs=10

    ; Need to waitfor currently executing tasks to finish at shutdown.

    ; Increase thisif you have verylong running tasks.

    stopwaitsecs =600

    stopasgroup=true

    ; Set Celery priority higher than default (999)

    ; so,if rabbitmqis supervised, it will start first.

    priority=1000

    在下面的例子中,Django项目是在虚拟环境中。路径为/ home / mysite /

    现在重新读取配置并添加新的过程:

    sudo supervisorctl reread

    sudo supervisorctl update

    如果您不熟悉将Django部署到生产服务器并使用Supervisord,可以参考:如何将Django应用程序部署到supervisor。

    相关文章

      网友评论

          本文标题:在django中使用celery和RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/jkpuoxtx.html