个人博客,欢迎查看:https://blog.starmeow.cn/
使用Celery+Redis实现异步任务,supervisor守护进程运行
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django
Django2.1.3+Celery4.2.1+Redis2.10.6(如果用3的redis会报错:AttributeError: 'float' object has no attribute 'items')+Redis-Server
settings.py中配置
# Celery+Redis实现异步任务,首先需要pip install celery
# 配置Broker
# BROKER_URL = 'redis://localhost:63079/3' # 无密码情况
BROKER_URL = 'redis://:password@127.0.0.1:63079/3'
BROKER_TRANSPORT = 'redis'
CELERY_ACCEPT_CONTENT = ['pickle', 'json'] # 启用实际使用的序列化程序,不加似乎会出现警告
CELERYD_MAX_TASKS_PER_CHILD = 40 # 设置每个worker执行了多少任务就会死掉,长时间运行Celery有可能发生内存泄露
创建celery.py模块
项目中和settings.py同级目录下创建celery.py模块,用于定义Celery实例
然后编辑
# StarMeow/StarMeow/celery.py
from __future__ import absolute_import, unicode_literals
import os
import django
from celery import Celery, platforms
from django.conf import settings
# 为“celery”程序设置默认的Django settings模块。
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'StarMeow.settings')
django.setup() # 使用settings.py中的相关配置,不加这个,就会提示app参数不够
# 如果不加django.setup(),下面就需要使用app = Celery('StarMeow', broker='...broker配置地址...')
app = Celery('StarMeow')
# 在这里使用字符串意味着worker不必序列化子进程的配置对象。
# namespace ='CELERY'表示所有与celery相关的配置键应该有一个`CELERY_`前缀,
# 例如settings.py中的CELERY_BROKER_URL,这个就需要加CELERY_前缀,不定义namespace,则不加前缀。
app.config_from_object('django.conf:settings', namespace='CELERY')
# 从所有注册的Django app configs加载任务模块。
# app.autodiscover_tasks()
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # 也可以使用这行,只添加INSTALLED_APPS中的app
platforms.C_FORCE_ROOT = True # Linux在root用户下不能启动,需要添加这一行
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
分析:
-
from __future__ import absolute_import
:从绝对路径中导入的celery.py模块不会与库冲突 -
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'StarMeow.settings')
:为celery命令行程序设置默认的DJANGO_SETTINGS_MODULE
环境变量 -
app.autodiscover_tasks()
:可重用应用程序的常见实践是在单独的任务中定义所有tasks.py模块,Celery有一种自动发现这些模块的方法。
导入Celery应用程序
# StarMeow/StarMeow/__init__.py
from __future__ import absolute_import, unicode_literals
# 然后需要在StarMeow/StarMeow/__init__.py模块中导入这个应用程序。
# 这样可以确保Django启动时加载应用程序,以便@shared_task装饰器使用它
from .celery import app as celery_app
__all__ = ('celery_app',)
应用下创建tasks.py
使用@shared_task
装饰器
编写的任务可能存在于可重用的应用程序中,而可重用的应用程序不能依赖于项目本身,因此也不能直接导入应用程序实例。
@shared_task
装饰器允许您创建任务,而不需要任何具体的应用程序实例
在blog应用下创建tasks.py文件,写入
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def print_info():
print('正在进入blog应用的tasks.py')
return None # 可以不返回
另一种运行方式
from StarMeow.celery import app
@app.task
def print_info():
print('正在进入blog应用的tasks.py')
return None # 可以不返回
settings.py增加这个参数,Celery就会忽略全部调度机制,立即调用你的代码。
CELERY_ALWAYS_EAGER = True
这样print_info()
和print_info.delay()
效果一样
启动celery
测试方法: https://github.com/celery/celery/tree/master/examples/django/
进入项目所在目录,启动虚拟环境,启动worker
Windows下启动
C:\Users\LR>cd E:\Sync\OneDrive\PycharmProjects\StarMeow
C:\Users\LR>E:
E:\Sync\OneDrive\PycharmProjects\StarMeow>workon StarMeow
(StarMeow) E:\Sync\OneDrive\PycharmProjects\StarMeow>celery -A StarMeow worker -l info
image.png
视图里面添加运行
class IndexView(View):
def get(self, request, **kwargs):
print_info.delay()
# 省略
然后访问
image.png这个问题网上都有解决方案,使用一个最简单的方案,更改运行方式,访问就正常了
celery -A StarMeow worker --pool=solo -l info
image.png
Linux下启动celery
celery不能用root用户启动问题 C_FORCE_ROOT environment
(StarMeow) root@StarMeow-Svr:~/django-web/StarMeow# celery -A StarMeow worker -l info
/root/django-web/StarMeow/media/schedule/exportfiles/
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
User information: uid=0 euid=0 gid=0 egid=0
解决办法,在settings.py同级下的celery.py文件中增加
from celery import Celery, platforms
platforms.C_FORCE_ROOT = True #加上这一行
image.png
使用flower检测队列运行情况
Windows下进行的
pip install flower
(StarMeow) E:\Sync\OneDrive\PycharmProjects\StarMeow>celery flower -A StarMeow
进入 http://localhost:5555 即可查看
celery+supervisor(后台进程)
配置python2虚拟环境
每个人虚拟环境不同,需要自行处理。
pip install supervisor # python3不能使用
使用python2完成
root@StarMeow-Svr:~/django-web# pyenv install --list
Available versions:
# ...
2.7.15
# ...
root@StarMeow-Svr:~/django-web# pyenv install 2.7.15
root@StarMeow-Svr:~/django-web# pyenv versions
system
2.7.15
* 3.6.6 (set by /root/.pyenv/version)
3.6.6/envs/StarMeow
StarMeow
root@StarMeow-Svr:~/django-web# pyenv virtualenv 2.7.15 Supervisor
Collecting virtualenv
Downloading http://mirrors.tencentyun.com/pypi/packages/7c/17/9b7b6cddfd255388b58c61e25b091047f6814183e1d63741c8df8dcd65a2/virtualenv-16.1.0-py2.py3-none-any.whl (1.9MB)
100% |████████████████████████████████| 1.9MB 55.8MB/s
Installing collected packages: virtualenv
Successfully installed virtualenv-16.1.0
You are using pip version 9.0.3, however version 18.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
New python executable in /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/python2.7
Also creating executable in /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/python
Please make sure you remove any previous custom paths from your /root/.pydistutils.cfg file.
Installing setuptools, pip, wheel...
done.
Requirement already satisfied: setuptools in /root/.pyenv/versions/2.7.15/envs/Supervisor/lib/python2.7/site-packages
Requirement already satisfied: pip in /root/.pyenv/versions/2.7.15/envs/Supervisor/lib/python2.7/site-packages
root@StarMeow-Svr:~/django-web# pyenv virtualenvs
2.7.15/envs/Supervisor (created from /root/.pyenv/versions/2.7.15)
3.6.6/envs/StarMeow (created from /root/.pyenv/versions/3.6.6)
StarMeow (created from /root/.pyenv/versions/3.6.6)
Supervisor (created from /root/.pyenv/versions/2.7.15)
root@StarMeow-Svr:~/django-web# pyenv versions
system
2.7.15
2.7.15/envs/Supervisor
* 3.6.6 (set by /root/.pyenv/version)
3.6.6/envs/StarMeow
StarMeow
Supervisor
# 进入虚拟环境,激活
root@StarMeow-Svr:~/django-web# mkdir Supervisor
root@StarMeow-Svr:~/django-web# cd Supervisor/
root@StarMeow-Svr:~/django-web/Supervisor# pyenv activate Supervisor
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# pip list
Package Version
---------- -------
pip 18.1
setuptools 40.6.2
wheel 0.32.3
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# pip install supervisor
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# pip list
Package Version
---------- -------
meld3 1.0.2
pip 18.1
setuptools 40.6.2
supervisor 3.3.4
wheel 0.32.3
设置当前目录的python虚拟环境,pyenv local Supervisor
中的Supervisor为虚拟环境名称,这样每次进入该目录就是python2虚拟环境
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# pyenv local Supervisor
生成supervisor配置文件
# 生成supervisor配置文件,可以直接在当前目录生成,也可以指定位置
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# echo_supervisord_conf > supervisord.conf
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# ls
supervisord.conf
生成celery.ini配置文件
我的项目的虚拟环境路径为/root/.pyenv/versions/StarMeow/
可以通过root@StarMeow-Svr:~# /root/.pyenv/versions/StarMeow/bin/celery -A StarMeow worker -l info
测试(虽然启不动)
项目的路径为/root/django-web/StarMeow
# 创建celery.ini文件
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# vim celery.ini
# 配置内容
[program:celery]
# celery命令的绝对路径
command=/root/.pyenv/versions/StarMeow/bin/celery -A StarMeow worker -l info
# 项目路径
directory=/root/django-web/StarMeow
# 日志文件路径
stdout_logfile=/var/log/myweb/celery.log
# 自动重启
autorestart=true
# 如果设置为true,进程则会把标准错误输出到supervisord后台的标准输出文件描述符
redirect_stderr=true
添加celery配置到supervisord中
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# vim supervisord.conf
修改supervisord.conf文件,在最后增加
[include]
files = celery.ini
虚拟环境中启动
# 以守护进程的形式运行一组应用程序,指定配置文件
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisord -c supervisord.conf
# 查看进程
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# ps -ef | grep supervisord
root 32133 1 0 15:54 ? 00:00:00 /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/python2.7 /root/.pyenv/versions/Supervisor/bin/supervisord -c supervisord.conf
root 32212 14833 0 15:54 pts/0 00:00:00 grep supervisord
# 载入最新的配置文件,停止原有进程并按新的配置启动、管理所有进程
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl reload
# 更新新的配置到supervisord,启动新配置或有改动的进程,配置没有改动的进程不会受影响而重启
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl update
# 查看正在守护的进程
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl status
celery RUNNING pid 32171, uptime 0:15:08
# 子进程启停
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl stop celery
celery: stopped
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl start celery
celery: started
# 所有子进程启停
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl stop all
celery: stopped
(Supervisor) root@StarMeow-Svr:~/django-web/Supervisor# supervisorctl start all
celery: started
不进虚拟环境启动
由于之前是进入虚拟环境中运行supervisord
的,那么不进入虚拟环境怎么运行?
root@StarMeow-Svr:~# /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/supervisord -c /root/django-web/Supervisor/supervisord.conf
root@StarMeow-Svr:~# ps -ef | grep supervisord
root 4140 1 0 16:16 ? 00:00:00 /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/python2.7 /root/.pyenv/versions/2.7.15/envs/Supervisor/bin/supervisord -c /root/django-web/Supervisor/supervisord.conf
root 4371 2184 0 16:17 pts/1 00:00:00 grep supervisord
创建自动重新加载脚本
root@StarMeow-Svr:~/django-web# vim SvUpdate.sh
echo '正在更新Supervisor配置···'
#! /bin/bash
cd /root/django-web/Supervisor
supervisorctl update
完成后保存授权
root@StarMeow-Svr:~/django-web# chmod +x SvUpdate.sh
root@StarMeow-Svr:~/django-web# ./SvUpdate.sh
正在更新Supervisor配置···
业务运用
访问某个页面获取访问者IP,查询IP归属地,推送QQ消息
增加tasks.py函数
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from StarMeow.celery import app
from utils.ip_attribution import get_ip_data_taobao, get_ip_data_ip138 # 获取IP地址归属地信息
from app_studio.qqbot.api import qqbot_api_data, qqbot_host_port # 用于qqbot接口
# @app.task
@ shared_task
def print_info(info):
print('正在访问:{}'.format(info))
return None # 可以不返回
@app.task
def send_qq_message(ip, article_title, absolute_uri):
"""
发送QQ消息
:param ip:
:param article_title:
:param absolute_uri:
:return:
"""
ip_info1 = get_ip_data_taobao(ip)
ip_info2 = get_ip_data_ip138(ip)
message = '又有人访问啦!\nIP:{}\n文章:《{}》({})\n地址参考1:{}\n地址参考2:{}'.format(ip, article_title, absolute_uri, ip_info1, ip_info2)
qqbot_api_data(qqbot_host_port, '/send_group_msg', group_id=531809487, message=message)
return '启动异步任务,发送访问信息到QQ'
视图函数中调用
# 访问写入日志
if get_request_ip(request) != '127.0.0.1':
ip = get_request_ip(request)
user_agent = get_request_agent(request)
absolute_uri = get_request_uri(request)
BlogRequestLog.objects.create(ip=ip, user_agent=user_agent, absolute_uri=absolute_uri)
# 异步发送QQ消息
send_qq_message.delay(ip, article.title, absolute_uri) # 异步调用
# send_qq_message(ip, article.title, absolute_uri) # 相当于直接调用该函数
image.png
网友评论