美文网首页
使用Docke配置Rabbitmq及Celery的使用

使用Docke配置Rabbitmq及Celery的使用

作者: 越大大雨天 | 来源:发表于2019-08-04 15:09 被阅读0次

消息队列选择

这里对之前celery异步任务的使用做个总结,在生产环境使用celery时,最好选择rabbitmq作为消息队列更为稳定,测试时也可以使用redis,简单快捷。

RabbitMQ架构

Rabbitmq架构示意图

基于AMQP协议,使用Channel进行连接和数据传输

  • client客户端
    1. 生产者Publisher:负责产出任务,写入指定的消息队列中;
    2. 消费者Consumer:负责消费任务,从关联的消息队列中获取任务并执行;
  • server服务端(broker)
    Rabbitmq,其内部为可以为不同用户划分不同的Virtual Host,每个Virtual Host内包含交换机和队列两部分。默认只有一个Virtual Host。
    1. 交换机:用于按规则分发任务给指定队列,生产者可方便的只与交换机进行关联;
    2. 队列:用于存放任务,一个rabbitmq中可以有多个消息队列。

使用docker安装rabbitmq

rabbitmq的正产安装比较麻烦,所以这里选择使用docker进行安装。

  • 拉取镜像
sudo docker pull rabbitmq:management

如果速度很慢,可以去阿里云:https://cr.console.aliyun.com/
免费注册并获取镜像加速器,配置方法很简单,官方有说明。

  • 生成容器并运行
sudo docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
  • RabbitMQ可视化管理
  1. 启动可视化管理界面:sudo rabbitmq-plugins enable rabbitmq_management,使用http://server-ip:15672访问界面
  2. 创建普通用户:sudo rabbitmqctl add_user [username] [password]
  3. 设置用户为管理员:sudo rabbitmqctl set_user_tags [username] administrator
  4. 用户添加管理权限:sudo rabbitmqctl set_permissions -p /[username] '.''.''.*'
  • Rabbitmq相关命令
  1. 查看用户:sudo rabbitmqctl list_users
  2. 查看队列:sudo rabbitmqctl list_queues
  3. 查看交换机:sudo rabbitmqctl list_exchanges
  4. 查看连接:sudo rabbitmqctl list_connections
  5. 查看消费者:sudo rabbitmqctl list_consumers
  • 管理界面测试
    在宿主机上访问:127.0.0.1:15672管理界面,默认可使用账号密码是guest,能访问到证明安装成功


    image.png

Celery工作流

Celery工作流
  • Task
    由我们自己编写的需异步执行的任务函数,也可以是定时任务
  • Broker
    中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务,但是Redis有丢任务的风险。
  • Worker
    消费者,执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
  • Beat
    定时任务调度器,根据配置定时将任务发送给Broler。
  • Backend
    用于存储任务的执行结果。

创建Celery实例

在tasks.py文件中创建Celery实例,broker为中间人,用来存放任务,可使用rabbitmqredis;backend用于储存任务执行的结果,这里也可不指定。

from celery import Celery
imort time
# 实例化Celery对象,指定rabbitmq
app = Celery("task1", broker="amqp://",backend="rpc://")
@app.task
def add(x, y):
    print("计算2个值的和: %s %s" % (x, y))
    return x+y

@app.task
def multi(x,y):
    #模拟阻塞任务
    time.sleep(3)
    print("发送短信成功")
    return x * y
  • Django中的处理
    在Django中使用的话需要使用Django的配置环境,在创建celery实例之前,先在其上方使用以下配置为celery程序设置默认的Django设置模块,后面是你的设置文件。
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dailyfresh.settings')

启动celery worker监听

若不需要创建日志文件时,按如下启动worker监听:
切换到task1所在目录,使用命令开启监听:task1为创建实例时指定的名字

celery -A task1 worker --loglevel=info 

配置日志文件

若需要独立的日志文件记录Celery运行状态,需要使用celery自带的日志记录器。

  • 设置celery日志记录器
from celery.utils.log import get_task_logger
logger = get_task_logger('mycelery')

logger.info('Refresh task start and refresh success')

并在启动celery worker的时候指定日志存放位置,不指定的情况将会默认生成在/var/lib/docker/目录。

  • 启动celery worker命令
celery -A task worker --loglevel=info -f [存放日志的绝对路径]

调用任务,异步执行

在其他地方导入celery实例,调用任务,该任务将以非阻塞的方式异步执行。

from tasks import add, multi
print('开始')
add.delay(1,2)
multi.delay(4,4)
print('结束')
  • 运行情况
    你会发现,在主程序窗口,程序没有任何阻塞的瞬间运行完成,在任务函数中的time.sleep(3)并没有使主程序阻塞。这里的代码只是将任务发布到了消息中间件中,是不能直接获取执行结果的,执行结果保存到了你指定的backend中。
    主程序运行情况
    再看下后台监听程序:
    worker监听

你可以发现,celery自动在后台完成了任务,原本会阻塞主程序的任务在后台以异步的方式完成了。

  • 获取执行结果
    当然,若你想在当前位置获取执行结果也是可以的,可以使用get()方法获取,不过这会阻塞主程序,没有意义。也可使用ready()非阻塞方法判断任务是否执行完成。

定时任务

在celery4版本中,定时任务可以像普通任务一样单独定义,这里需要用到@app.on_after_configure.connect装饰器,详细情况有兴趣可以单独查找,网上资料很多,个人使用不多,因为在某些场景,celery的定时任务不能动态添加。对于定时任务更倾向于使用APScheduler,在框架中使用更方便简单。

使用场景

你能想到在某些特殊场景下,这种任务队列的机制是很有用的。比如邮件、短信发送等任务,需要调用第三方进行,而这些任务的耗时情况使我们不能控制的,为了不阻塞主程序,可以直接使用Celery+Rabbitmq或者Redis,将这些任务直接扔到消息队列中,让他们以异步的方式自动调用执行。

最后

这里这是对celery的最基本实用做了介绍,需要了解更多功能可以查看更多其他资料,比如任务重试、异常处理、返回结果获取等。需要注意的是,如果修改了任务内容,需要重启worker监听和IDE再执行才能生效。

相关文章

网友评论

      本文标题:使用Docke配置Rabbitmq及Celery的使用

      本文链接:https://www.haomeiwen.com/subject/sxqydctx.html