消息分发
消费者客户端可以通过推模式和拉模式来进行消息消费。
当rabbitmq队列有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
很多时候轮询的分发机制也不是那么优雅。默认情况下,如果有n个消费者,那么RabbitMQ会将第m条消息分发给第m%n(取余的方式)个消费者,RabbitMQ不管消费者是否消费并已经确认(Basic.Ack)了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。
可以使用channel.basicQos()方法允许限制信道上的消费者所能保持的最大未确认消息的数量。
比如在订阅消费队列之前,消费端程序调用了channel.basicQos(5),之后订阅了某个队列进行消费。RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1 ,之后消费者可以继续接收消息,直到再次到达计数上限。
Basic.Qos 的使用对于拉模式的消费方式无效.
对于channel.basicQos还有要特别注意的一点是它的重载方法
void basicQos(int prefetchCount, boolean global) throws IOException;
global为true的时候,是该信道上所有的消费者未确认的消息数上限总和
为false的时候是针对单个消费者
比如:一个信道设置了
channel.basicQos(3, false);
channel.basicQos(5, true);
那么这里每个消费者最多只能收到3个未确认的消息,两个消费者能收到的未确认的消息个数之和的上限为5。
如无特殊需要,最好只使用global=false 设置,这也是默认的设置。
消息顺序性
消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。
如果生产者发布的消息分别为 msg1 msg2 msg3 ,那么消费者必然也是按照 msg1 msg2 msg3 的顺序进行消费的。
rabbitmq不会严格保证消息的顺序性
在以下情况下消息的顺序性会被打破:
1.如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。
2.同样,如果启用 publisher confirm时,在发生超时、中断,又或者是收到RabbitMQ的 Basic.Nack命令时,那么同样需要补偿发送,结果与事务机制一样会错序。或者这种说法有些牵强,我们可以固执地认为消息的顺序性保障是从存入队列之后开始的,而不是在发送的时候。
3.考虑另一种情形,如果生产者发送的消息设置了不同的超时时间,并且也设置了死信队列,整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。
4.如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的
5.使用Basic.Nack/.Reject将消息拒绝,比如:
如果一个队列按照前后顺序分有msg1、msg2、msg3、msg4这4个消息,同时有ConsumerA和 ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中, ConsumerA 中的消息为 msg1和msg3,ConsumerB中的消息为msg2、msg4。ConsumerA收到消息 msg1 之后并不想处理而调用了Basic.Nack/.Reject 将消息拒绝,与此同时将 requeue 设置为 true,这样这条消息就可以重新存入队列中。消息 msg1之后被发送到了 ConsumerB中,此时 ConsumerB已经消费了msg2、msg4,之后再消费 msg1,这样消息顺序性也就错乱了。或者消息 msg1 又重新发往 ConsumerA中,此时 ConsumerA已经消费了msg3,那么再消费 msg1,消息顺序性也无法得到保障。
如果要保证消息的顺序性,需要业务方使用 RabbitMQ之后做进一步的处理,比如在消息体内添加全局有序标识(类似 SequenceID )来实现
放弃QueueingConsumer
QueueingConsumer容易造成内存溢出问题,在客户端调用channel.basicConsume方法订阅队列的时候,RabbitMQ 会持续地将消息发往QueueingConsumer 中, QueueingConsumer内部使用 LinkedBlockingQueue来缓存这些消息。
QueueingConsumer 还包含(但不仅限于)以下一些缺陷∶
- QueueingConsumer会拖累同一个Connection下的所有信道,使其性能降低;
- 同步递归调用 QueueingConsumer 会产生死锁;
- RabbitMQ 的自动连接恢复机制(automatic conection recovery)不支持 QueueingConsumer的这种形式;
- QueueingConsumer不是事件驱动的。
尽量使用继承DefaultConsumer的方式消费
消息传输保证
一般消息中间件的消息传输保障分为三个层级。
At most once :最多一次。消息可能会丢失,但绝不会重复传输。
At least once :最少一次。消息绝不会丢失,但可能会重复传输。
Exactly once :恰好一次。每条消息肯定会被传输一次且仅传输一次。
rabbitmq支持最少一次和最多一次。
最少一次:
(1)消息生产者需要开启事务机制或者 publisher confim 机制,以确保消息可以可靠地传输到RabbitMQ中。
(2)消息生产者需要配合使用 mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
(3)消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失
(4)消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。
最多一次:
就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失
恰好一次:
恰好一次是RabbitMQ目前无法保障的。
网友评论