背景
RabbitMQ是用Erlang实现的高并发,高可靠AMQP消息队列服务器
AMQP定义了一套消息系统规范,这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。子系统之间不再相互依赖,子系统仅依赖于消息,不关心消息的发送者和接受者。
AMQP
AMQP是一个面向消息中间件的开放式标准应用层协议,它定义了以下特性
- 消息方向
- 消息队列
- 消息路由
- 可靠性
- 安全性
RabbitMQ基本原理
RabbitMQ中的基本概念:
- ConnectionFactory:与RabbitMQ服务器连接的管理器
- Connection:与RabbitMQ服务器的TCP连接
- Channel:一个Connection可以包含多个Channel。因为tcp连接的建立是释放都是昂贵的,为了多路复用,应该尽量共用Channel
- Exchange:接收生产者的消息,并按照一定的规则路由给服务器中的队列
- Message Queue:消息队列,用于存储还未被消费者消费的消息
- RoutingKey:指定当前消息被谁接受
Exchage类型
在上面提到了Exchange,RabbitMq提供了四种Exchange:fanout,direct,topic,header,我们只讨论前三种,性能:fanout>direct>topic
- Fanout Exchage:不处理路由键,任何发送到Fanout Exchage上的消息都会被转发到与该Exchange绑定的多个Queue上,不需要指定RoutingKey
- Direct Exchage:这种模式下不需要Exchage进行任何绑定操作,消息传递时只需要一个RoutingKey,即只需要知道消息被谁接收
- Topic Exchage:这种模式较为复杂,简单来说就是每个队列都有其关心的主题,所有消息都带有一个标题(RouteKey),Exchage会把消息转发到所有关注主题与RouteKey模糊匹配的队列
消息队列常见应用场景
- 异步处理
- 流量削峰
- 日志处理
- 应用解耦
分发策略
默认情况下,rabbitmq会按顺序分发消息给下一个消费者,平均而言,每个消费者会得到相同数量的消息,这种发消息的策略称为轮询分发策略
消息应答
如果一个消费者执行了一个很长的任务只执行了一部分就死掉了,我们不想丢失掉那些还没有处理完成的消息;rabbitmq支持了消息应答,当消息被消费者接受并处理完成之后,消费者会向rabbitmq发送应答,rabbitmq收到应答后才会删除掉这条消息。如果一个消费者死掉了,这样就没有应答,rabbitmq会理解为消息没有被处理完毕,它会重新将消息扔回到队列中。
消息持久化
上面讲解了如何确保一个消费者在死掉之后任务不会丢失。但是我们的任务在rabbitmq停掉之后仍会丢失,当rabbitmq退出或挂掉之后,它会遗忘掉队列和队列中的消息。我们要将Exchange和消息队列都设为持久化的
- 设置Exchange和MessageQueue的durable属性为true进行持久化
- 消息什么时候需要持久化:1.消息本身在publish的时候就要求写入磁盘 2.内存紧张,需要将部分内存中的消息转移到磁盘
将消息标记为持久化并不能完全保证消息不会丢失。这里仍然会有一个短时间的错失窗口:rabbitmq已经接受消息但是还没对他进行保存。而且rabbitmq可能仅仅把消息保存到缓存中并不是真正的写入到磁盘中。
公平调度
有的消息很重量级,有的消息是轻量级的。负责重量级的会一直忙碌不停,负责轻量级的基本不工作。这种情况发生的原因在于rabbitmq在一个消息进入队列之后仅仅调度消息,仅仅盲目的每n个消息轮询给每n个消费者。为了解决这种问题,我们可以在消费者处理并应答上条消息之前不要为其分发新的消息
事务
Rabbitmq提供了txRollback()命令用于回滚某个事务
生产者消息投递可靠性-Confirm机制
rabbitmq提供了一个轻量级的的机制来保证生产者可以感知消息已经被路由到正确的队列中-Confirm
如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,一旦该消息被正确路由到队列中后就返回给生产者一个confirm,该confirm包含了这个ID,这样就知道消息被正确分发。对于持久化消息,只有在消息被持久化后才返回confirm
confirm最大的优点在于异步,生产者在发送消息后就可以执行其他任务,而服务器返回confirm之后会触发生产者的回调函数去处理confirm消息。如果消息服务器发生异常导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以重发消息。
网友评论