消息队列选择
这里对之前celery异步任务的使用做个总结,在生产环境使用celery时,最好选择rabbitmq作为消息队列更为稳定,测试时也可以使用redis,简单快捷。
RabbitMQ架构
Rabbitmq架构示意图基于AMQP协议,使用Channel进行连接和数据传输
- client客户端
- 生产者Publisher:负责产出任务,写入指定的消息队列中;
- 消费者Consumer:负责消费任务,从关联的消息队列中获取任务并执行;
- server服务端(broker)
Rabbitmq,其内部为可以为不同用户划分不同的Virtual Host,每个Virtual Host内包含交换机和队列两部分。默认只有一个Virtual Host。- 交换机:用于按规则分发任务给指定队列,生产者可方便的只与交换机进行关联;
- 队列:用于存放任务,一个rabbitmq中可以有多个消息队列。
使用docker安装rabbitmq
rabbitmq的正产安装比较麻烦,所以这里选择使用docker进行安装。
- 拉取镜像
sudo docker pull rabbitmq:management
如果速度很慢,可以去阿里云:https://cr.console.aliyun.com/
免费注册并获取镜像加速器,配置方法很简单,官方有说明。
- 生成容器并运行
sudo docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
- RabbitMQ可视化管理
- 启动可视化管理界面:
sudo rabbitmq-plugins enable rabbitmq_management
,使用http://server-ip:15672
访问界面 - 创建普通用户:
sudo rabbitmqctl add_user [username] [password]
- 设置用户为管理员:
sudo rabbitmqctl set_user_tags [username] administrator
- 用户添加管理权限:
sudo rabbitmqctl set_permissions -p /[username] '.''.''.*'
- Rabbitmq相关命令
- 查看用户:
sudo rabbitmqctl list_users
- 查看队列:
sudo rabbitmqctl list_queues
- 查看交换机:
sudo rabbitmqctl list_exchanges
- 查看连接:
sudo rabbitmqctl list_connections
- 查看消费者:
sudo rabbitmqctl list_consumers
-
管理界面测试
在宿主机上访问:127.0.0.1:15672管理界面,默认可使用账号密码是guest,能访问到证明安装成功
image.png
Celery工作流
Celery工作流- Task
由我们自己编写的需异步执行的任务函数,也可以是定时任务 - Broker
中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务,但是Redis有丢任务的风险。 - Worker
消费者,执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。 - Beat
定时任务调度器,根据配置定时将任务发送给Broler。 - Backend
用于存储任务的执行结果。
创建Celery实例
在tasks.py文件中创建Celery实例,broker为中间人,用来存放任务,可使用rabbitmq
和redis
;backend用于储存任务执行的结果,这里也可不指定。
from celery import Celery
imort time
# 实例化Celery对象,指定rabbitmq
app = Celery("task1", broker="amqp://",backend="rpc://")
@app.task
def add(x, y):
print("计算2个值的和: %s %s" % (x, y))
return x+y
@app.task
def multi(x,y):
#模拟阻塞任务
time.sleep(3)
print("发送短信成功")
return x * y
-
Django中的处理
在Django中使用的话需要使用Django的配置环境,在创建celery实例之前,先在其上方使用以下配置为celery程序设置默认的Django设置模块,后面是你的设置文件。
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dailyfresh.settings')
启动celery worker监听
若不需要创建日志文件时,按如下启动worker监听:
切换到task1所在目录,使用命令开启监听:task1为创建实例时指定的名字
celery -A task1 worker --loglevel=info
配置日志文件
若需要独立的日志文件记录Celery运行状态,需要使用celery自带的日志记录器。
- 设置celery日志记录器
from celery.utils.log import get_task_logger
logger = get_task_logger('mycelery')
logger.info('Refresh task start and refresh success')
并在启动celery worker的时候指定日志存放位置,不指定的情况将会默认生成在/var/lib/docker/目录。
- 启动celery worker命令
celery -A task worker --loglevel=info -f [存放日志的绝对路径]
调用任务,异步执行
在其他地方导入celery实例,调用任务,该任务将以非阻塞的方式异步执行。
from tasks import add, multi
print('开始')
add.delay(1,2)
multi.delay(4,4)
print('结束')
-
运行情况
你会发现,在主程序窗口,程序没有任何阻塞的瞬间运行完成,在任务函数中的time.sleep(3)并没有使主程序阻塞。这里的代码只是将任务发布到了消息中间件中,是不能直接获取执行结果的,执行结果保存到了你指定的backend中。
主程序运行情况
再看下后台监听程序:
worker监听
你可以发现,celery自动在后台完成了任务,原本会阻塞主程序的任务在后台以异步的方式完成了。
-
获取执行结果
当然,若你想在当前位置获取执行结果也是可以的,可以使用get()方法获取,不过这会阻塞主程序,没有意义。也可使用ready()非阻塞方法判断任务是否执行完成。
定时任务
在celery4版本中,定时任务可以像普通任务一样单独定义,这里需要用到@app.on_after_configure.connect
装饰器,详细情况有兴趣可以单独查找,网上资料很多,个人使用不多,因为在某些场景,celery的定时任务不能动态添加。对于定时任务更倾向于使用APScheduler,在框架中使用更方便简单。
使用场景
你能想到在某些特殊场景下,这种任务队列的机制是很有用的。比如邮件、短信发送等任务,需要调用第三方进行,而这些任务的耗时情况使我们不能控制的,为了不阻塞主程序,可以直接使用Celery+Rabbitmq或者Redis,将这些任务直接扔到消息队列中,让他们以异步的方式自动调用执行。
最后
这里这是对celery的最基本实用做了介绍,需要了解更多功能可以查看更多其他资料,比如任务重试、异常处理、返回结果获取等。需要注意的是,如果修改了任务内容,需要重启worker监听和IDE再执行才能生效。
网友评论