美文网首页Python Web开发学习
使用Celery+Redis实现异步任务,supervisor守

使用Celery+Redis实现异步任务,supervisor守

作者: 吾星喵 | 来源:发表于2018-11-30 17:10 被阅读1次

    个人博客,欢迎查看:https://blog.starmeow.cn/

    使用Celery+Redis实现异步任务,supervisor守护进程运行

    http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django

    Django2.1.3+Celery4.2.1+Redis2.10.6(如果用3的redis会报错:AttributeError: 'float' object has no attribute 'items')+Redis-Server

    settings.py中配置

    # Celery+Redis实现异步任务,首先需要pip install celery
    # 配置Broker
    # BROKER_URL = 'redis://localhost:63079/3'  # 无密码情况
    BROKER_URL = 'redis://:password@127.0.0.1:63079/3'
    BROKER_TRANSPORT = 'redis'
    CELERY_ACCEPT_CONTENT = ['pickle', 'json']  # 启用实际使用的序列化程序,不加似乎会出现警告
    CELERYD_MAX_TASKS_PER_CHILD = 40  # 设置每个worker执行了多少任务就会死掉,长时间运行Celery有可能发生内存泄露
    
    

    创建celery.py模块

    项目中和settings.py同级目录下创建celery.py模块,用于定义Celery实例

    然后编辑

    # StarMeow/StarMeow/celery.py
    
    from __future__ import absolute_import, unicode_literals
    
    import os
    import django
    
    from celery import Celery, platforms
    from django.conf import settings
    
    # 为“celery”程序设置默认的Django settings模块。
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'StarMeow.settings')
    django.setup()  # 使用settings.py中的相关配置,不加这个,就会提示app参数不够
    # 如果不加django.setup(),下面就需要使用app = Celery('StarMeow', broker='...broker配置地址...')
    app = Celery('StarMeow')
    
    # 在这里使用字符串意味着worker不必序列化子进程的配置对象。
    # namespace ='CELERY'表示所有与celery相关的配置键应该有一个`CELERY_`前缀,
    # 例如settings.py中的CELERY_BROKER_URL,这个就需要加CELERY_前缀,不定义namespace,则不加前缀。
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # 从所有注册的Django app configs加载任务模块。
    # app.autodiscover_tasks()
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # 也可以使用这行,只添加INSTALLED_APPS中的app
    
    platforms.C_FORCE_ROOT = True  # Linux在root用户下不能启动,需要添加这一行
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    
    

    分析:

    • from __future__ import absolute_import:从绝对路径中导入的celery.py模块不会与库冲突
    • os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'StarMeow.settings'):为celery命令行程序设置默认的DJANGO_SETTINGS_MODULE环境变量
    • app.autodiscover_tasks():可重用应用程序的常见实践是在单独的任务中定义所有tasks.py模块,Celery有一种自动发现这些模块的方法。

    导入Celery应用程序

    # StarMeow/StarMeow/__init__.py
    
    from __future__ import absolute_import, unicode_literals
    
    # 然后需要在StarMeow/StarMeow/__init__.py模块中导入这个应用程序。
    # 这样可以确保Django启动时加载应用程序,以便@shared_task装饰器使用它
    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
    

    应用下创建tasks.py

    使用@shared_task装饰器

    编写的任务可能存在于可重用的应用程序中,而可重用的应用程序不能依赖于项目本身,因此也不能直接导入应用程序实例。

    @shared_task装饰器允许您创建任务,而不需要任何具体的应用程序实例

    在blog应用下创建tasks.py文件,写入

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    
    @shared_task
    def print_info():
        print('正在进入blog应用的tasks.py')
        return None  # 可以不返回
    

    另一种运行方式

    from StarMeow.celery import app
    
    
    @app.task
    def print_info():
        print('正在进入blog应用的tasks.py')
        return None  # 可以不返回
    

    settings.py增加这个参数,Celery就会忽略全部调度机制,立即调用你的代码。

    CELERY_ALWAYS_EAGER = True
    

    这样print_info()print_info.delay()效果一样

    启动celery

    测试方法: https://github.com/celery/celery/tree/master/examples/django/

    进入项目所在目录,启动虚拟环境,启动worker

    Windows下启动

    C:\Users\LR>cd E:\Sync\OneDrive\PycharmProjects\StarMeow
    
    C:\Users\LR>E:
    
    E:\Sync\OneDrive\PycharmProjects\StarMeow>workon StarMeow
    (StarMeow) E:\Sync\OneDrive\PycharmProjects\StarMeow>celery -A StarMeow worker -l info
    
    image.png

    视图里面添加运行

    class IndexView(View):
        def get(self, request, **kwargs):
            print_info.delay()
    
            # 省略
    

    然后访问

    image.png

    这个问题网上都有解决方案,使用一个最简单的方案,更改运行方式,访问就正常了

    celery -A StarMeow worker --pool=solo -l info
    
    image.png

    Linux下启动celery

    celery不能用root用户启动问题 C_FORCE_ROOT environment

    (StarMeow) root@StarMeow-Svr:~/django-web/StarMeow# celery -A StarMeow worker -l info
    /root/django-web/StarMeow/media/schedule/exportfiles/
    Running a worker with superuser privileges when the
    worker accepts messages serialized with pickle is a very bad idea!
    
    If you really want to continue then you have to set the C_FORCE_ROOT
    environment variable (but please think about this before you do).
    
    User information: uid=0 euid=0 gid=0 egid=0
    

    解决办法,在settings.py同级下的celery.py文件中增加

    from celery import Celery, platforms
    
    platforms.C_FORCE_ROOT = True  #加上这一行
    
    image.png

    使用flower检测队列运行情况

    Windows下进行的

    pip install flower
    
    (StarMeow) E:\Sync\OneDrive\PycharmProjects\StarMeow>celery flower -A StarMeow
    

    进入 http://localhost:5555 即可查看

    celery+supervisor(后台进程)

    配置python2虚拟环境

    每个人虚拟环境不同,需要自行处理。

    pip install supervisor  # python3不能使用
    

    使用python2完成

    root@StarMeow-Svr:~/django-web# pyenv install --list
    Available versions:
        # ...
      2.7.15
        # ...
    
    root@StarMeow-Svr:~/django-web# pyenv install 2.7.15
    
    root@StarMeow-Svr:~/django-web# pyenv versions
      system
      2.7.15
    * 3.6.6 (set by /root/.pyenv/version)
      3.6.6/envs/StarMeow
      StarMeow
    
    root@StarMeow-Svr:~/django-web# pyenv virtualenv 2.7.15 Supervisor
    
    Collecting virtualenv
      Downloading http://mirrors.tencentyun.com/pypi/packages/7c/17/9b7b6cddfd255388b58c61e25b091047f6814183e1d63741c8df8dcd65a2/virtualenv-16.1.0-py2.py3-none-any.whl (1.9MB)
        100% |████████████████████████████████| 1.9MB 55.8MB/s 
    Installing collected packages: virtualenv
    Successfully installed virtualenv-16.1.0
    You are using pip version 9.0.3, however version 18.1 is available.
    You should consider upgrading via the 'pip install --upgrade pip' command.
    New python executable in /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/python2.7
    Also creating executable in /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/python
    Please make sure you remove any previous custom paths from your /root/.pydistutils.cfg file.
    Installing setuptools, pip, wheel...
    done.
    Requirement already satisfied: setuptools in /root/.pyenv/versions/2.7.15/envs/Supervisor/lib/python2.7/site-packages
    Requirement already satisfied: pip in /root/.pyenv/versions/2.7.15/envs/Supervisor/lib/python2.7/site-packages
    
    root@StarMeow-Svr:~/django-web# pyenv virtualenvs
      2.7.15/envs/Supervisor (created from /root/.pyenv/versions/2.7.15)
      3.6.6/envs/StarMeow (created from /root/.pyenv/versions/3.6.6)
      StarMeow (created from /root/.pyenv/versions/3.6.6)
      Supervisor (created from /root/.pyenv/versions/2.7.15)
    
    root@StarMeow-Svr:~/django-web# pyenv versions
      system
      2.7.15
      2.7.15/envs/Supervisor
    * 3.6.6 (set by /root/.pyenv/version)
      3.6.6/envs/StarMeow
      StarMeow
      Supervisor
    
    # 进入虚拟环境,激活
    root@StarMeow-Svr:~/django-web# mkdir Supervisor
    root@StarMeow-Svr:~/django-web# cd Supervisor/
    root@StarMeow-Svr:~/django-web/Supervisor# pyenv activate Supervisor
    
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# pip list
    Package    Version
    ---------- -------
    pip        18.1   
    setuptools 40.6.2 
    wheel      0.32.3 
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# pip install supervisor
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# pip list
    Package    Version
    ---------- -------
    meld3      1.0.2  
    pip        18.1   
    setuptools 40.6.2 
    supervisor 3.3.4  
    wheel      0.32.3
    

    设置当前目录的python虚拟环境,pyenv local Supervisor中的Supervisor为虚拟环境名称,这样每次进入该目录就是python2虚拟环境

    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# pyenv local Supervisor
    

    生成supervisor配置文件

    # 生成supervisor配置文件,可以直接在当前目录生成,也可以指定位置
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# echo_supervisord_conf > supervisord.conf
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# ls
    supervisord.conf
    

    生成celery.ini配置文件

    我的项目的虚拟环境路径为/root/.pyenv/versions/StarMeow/

    可以通过root@StarMeow-Svr:~# /root/.pyenv/versions/StarMeow/bin/celery -A StarMeow worker -l info测试(虽然启不动)

    项目的路径为/root/django-web/StarMeow

    # 创建celery.ini文件
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# vim celery.ini
    
    # 配置内容
    [program:celery]
    # celery命令的绝对路径
    command=/root/.pyenv/versions/StarMeow/bin/celery -A StarMeow worker -l info
    # 项目路径
    directory=/root/django-web/StarMeow
    # 日志文件路径
    stdout_logfile=/var/log/myweb/celery.log
    # 自动重启
    autorestart=true
    # 如果设置为true,进程则会把标准错误输出到supervisord后台的标准输出文件描述符
    redirect_stderr=true
    

    添加celery配置到supervisord中

    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# vim supervisord.conf
    

    修改supervisord.conf文件,在最后增加

    [include]
    files = celery.ini
    

    虚拟环境中启动

    # 以守护进程的形式运行一组应用程序,指定配置文件
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisord -c supervisord.conf
    
    # 查看进程
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# ps -ef | grep supervisord
    root     32133     1  0 15:54 ?        00:00:00 /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/python2.7 /root/.pyenv/versions/Supervisor/bin/supervisord -c supervisord.conf
    root     32212 14833  0 15:54 pts/0    00:00:00 grep supervisord
    
    # 载入最新的配置文件,停止原有进程并按新的配置启动、管理所有进程
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl reload
    
    # 更新新的配置到supervisord,启动新配置或有改动的进程,配置没有改动的进程不会受影响而重启
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl update
    
    # 查看正在守护的进程
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl status
    celery                           RUNNING   pid 32171, uptime 0:15:08
    
    
    # 子进程启停
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl stop celery
    celery: stopped
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl start celery
    celery: started
    
    
    # 所有子进程启停
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl stop all
    celery: stopped
    (Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl start all
    celery: started
    
    

    不进虚拟环境启动

    由于之前是进入虚拟环境中运行supervisord的,那么不进入虚拟环境怎么运行?

    root@StarMeow-Svr:~# /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/supervisord -c /root/django-web/Supervisor/supervisord.conf 
    root@StarMeow-Svr:~# ps -ef | grep supervisord
    root      4140     1  0 16:16 ?        00:00:00 /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/python2.7 /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/supervisord -c /root/django-web/Supervisor/supervisord.conf
    root      4371  2184  0 16:17 pts/1    00:00:00 grep supervisord
    
    

    创建自动重新加载脚本

    root@StarMeow-Svr:~/django-web# vim SvUpdate.sh
    
    echo '正在更新Supervisor配置···'
    
    #! /bin/bash
    cd /root/django-web/Supervisor
    
    supervisorctl update
    

    完成后保存授权

    root@StarMeow-Svr:~/django-web# chmod +x SvUpdate.sh
    
    root@StarMeow-Svr:~/django-web# ./SvUpdate.sh
    正在更新Supervisor配置···
    
    

    业务运用

    访问某个页面获取访问者IP,查询IP归属地,推送QQ消息

    增加tasks.py函数

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    from StarMeow.celery import app
    
    from utils.ip_attribution import get_ip_data_taobao, get_ip_data_ip138  # 获取IP地址归属地信息
    from app_studio.qqbot.api import qqbot_api_data, qqbot_host_port  # 用于qqbot接口
    
    
    # @app.task
    @ shared_task
    def print_info(info):
        print('正在访问:{}'.format(info))
        return None  # 可以不返回
    
    
    @app.task
    def send_qq_message(ip, article_title, absolute_uri):
        """
        发送QQ消息
        :param ip:
        :param article_title:
        :param absolute_uri:
        :return:
        """
        ip_info1 = get_ip_data_taobao(ip)
        ip_info2 = get_ip_data_ip138(ip)
        message = '又有人访问啦!\nIP:{}\n文章:《{}》({})\n地址参考1:{}\n地址参考2:{}'.format(ip, article_title, absolute_uri, ip_info1, ip_info2)
        qqbot_api_data(qqbot_host_port, '/send_group_msg', group_id=531809487, message=message)
        return '启动异步任务,发送访问信息到QQ'
    

    视图函数中调用

    # 访问写入日志
    if get_request_ip(request) != '127.0.0.1':
        ip = get_request_ip(request)
        user_agent = get_request_agent(request)
        absolute_uri = get_request_uri(request)
        BlogRequestLog.objects.create(ip=ip, user_agent=user_agent, absolute_uri=absolute_uri)
    
        # 异步发送QQ消息
        send_qq_message.delay(ip, article.title, absolute_uri)  # 异步调用
        # send_qq_message(ip, article.title, absolute_uri)  # 相当于直接调用该函数
    
    image.png

    相关文章

      网友评论

        本文标题:使用Celery+Redis实现异步任务,supervisor守

        本文链接:https://www.haomeiwen.com/subject/atnhcqtx.html