可靠消息
- confirm
-
当所有的QUEUE都受到消息后,如果是durable QUEUE消息会持久化到磁盘,回调发送方
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).
For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.
-
设置spring.rabbitmq.publisher-confirms=true
-
需要自定义RabbitTemplate,指定 ConfirmCallback逻辑:
new RabbitTemplate().setConfirmCallback((correlationData, ack, cause)->{ System.out.println(correlationData.getId()); });
-
可以参考AsyncRabbitTemplate#confirm实现
-
spring.rabbitmq.template.retry.* 重试只实现了发送到Broker的重试,幷不能保证收到confirm
-
如果不考虑发送方应用挂掉的情况,请使用AsyncRabbitTemplate实现可靠提交,AsyncRabbitTemplate实现了confirm超时功能
-
如果考虑发送方应用挂掉的情况,必须在流程中的某处保存状态到数据库。根据数据库状态,定时发起重试。
-
- return
-
如果消息无法正常路由,会回调发送方的RabbitTemplate.ReturnCallback
-
设置spring.rabbitmq.publisher-returns=true
-
设置spring.rabbitmq.template.mandatory=true
-
需要自定义RabbitTemplate,指定 ReturnCallback逻辑:
new RabbitTemplate().setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{ System.out.println(message.getMessageProperties().getCorrelationId()); });
-
可以参考AsyncRabbitTemplate#returnedMessage实现
-
- transaction
- 事务功能需要非常多的交互,性能比较差
-
不能与数据库事务同步,不具备分布式事务功能
-
ack
- spring 定义了三种AcknowledgeMode, 默认是AUTO
public enum AcknowledgeMode { /** * No acks - {@code autoAck=true} in {@code Channel.basicConsume()}. */ NONE, /** * Manual acks - user must ack/nack via a channel aware listener. */ MANUAL, /** * Auto - the container will issue the ack/nack based on whether * the listener returns normally, or throws an exception. * <p><em>Do not confuse with RabbitMQ {@code autoAck} which is * represented by {@link #NONE} here</em>. */ AUTO; }
- NONE 就是Rabbit自动确认。MQ发给消费者,不需要ACK。消息就在QUEUE删掉了
- MANUAL 就是手动确认,需要程序显示调用channel.BasicAck
- AUTO 不是Rabbit的功能,而是Spring自己实现的。如果Listener可以无错处理完,spring就调用channel.basicAck。 如果有异常, spring就调用channel.basicReject
- SimpleMessageListenerContainer#doReceiveAndExecute
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR ... for (int i = 0; i < this.txSize; i++) { try { executeListener(channel, message); }catch (Throwable ex) { consumer.rollbackOnExceptionIfNecessary(ex); } } return consumer.commitIfNecessary(...); }
- org.springframework.amqp.rabbit.listener.BlockingQueueConsumer
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException { ... if (ackRequired) { this.channel.basicAck(deliveryTag, true); } ... }
public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception { ... if (ackRequired) { this.channel.basicReject(deliveryTag, shouldRequeue); } ... }
吞吐
-
prefetch
- 限制队列发给consumer的未确认消息数量,限制消费者本地缓存消息的大小
- 默认为1, 设置spring.rabbitmq.listener.simple.prefetch
- org.springframework.amqp.rabbit.listener.BlockingQueueConsumer
this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount); this.channel.basicQos(this.prefetchCount);
-
txSize
-
设置批量大小,可以批量ACK,减少ACK次数
-
txSize必须小于等于prefetch,默认为1
-
设置: spring.rabbitmq.listener.simple.transactionSize
-
SimpleMessageListenerContainer#doReceiveAndExecute
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR ... for (int i = 0; i < this.txSize; i++) { try { executeListener(channel, message); }catch (Throwable ex) { consumer.rollbackOnExceptionIfNecessary(ex); } } return consumer.commitIfNecessary(...); }
-
-
多线程并行消费
- 设置: spring.rabbitmq.listener.simple.concurrency
- 设置: spring.rabbitmq.listener.simple.maxConcurrency
-
批量投递
- org.springframework.amqp.rabbit.core.BatchRabbitTemplate
- 其实是把几个消息压缩到一个消息里面,类似hystrix collapser请求合并
- 似乎是个半成品,消费端需要自己解析这个合成消息
-
AsyncRabbitTemplate
- 实现了ReturnCallback, ConfirmCallback接口
- 把发送结果转换成Future里的结果或异常
- 通过内部线程池来定时清除超时Future
-
consumer pull
- 消费者自己拉取数据,其实和prefetch差不多,可能想要完全自定义控制的情况比较有用
- 直接调用channel
channel.basicGet(queueName, false);
-
consistent hash exchange/RabbitMQ Sharding Plugin
- 都玩到一致性hash了,要不还是换Kafka吧-。-
高可用
-
cluster
- 局域网高可用。与之对比的是Federation and/or Shovel广域网(跨机房复制)
- 对比
Federation and/or Shovel Clustering Broker 逻辑上分离,分属不同所有者 统一的逻辑Broker Brokers 可以使用不同的版本,可以不兼容 必须版本兼容 可以在不可靠的广域网上连接,需要设置TLS,用户权限 在可靠的局域网中连接,共享秘要 可以以任意拓扑结构连接 所有节点互联 CAP中强调AP CAP中强调CP 可以一部分应用federated,一部分正常 要么cluster,要么普通,二选一 -
durable
- 队列设置成durable会保存在磁盘,confirm模式下,会在写入磁盘后confirm
-
mirror queue
- durable queue 在cluster多个节点复制数据,可能因为未及时同步而丢失消息
-
quorum queue
- 3.8 以后新功能,根据Raft协议同步数据
- 性能不比mirrorqueue差,建议磁盘换个快点的
Quorum queues are designed to trade latency for throughput and have been tested and compared against mirrored queues in 3, 5 and 7 node configurations at several message sizes. In scenarios using both consumer acks and publisher confirms quorum queues have been observed to be have equal or greater throughput to classic mirrored queues.
As quorum queues persist all data to disks before doing anything it is recommended to use the fastest disks possible. Quorum queues also benefit from consumers using higher prefetch values to ensure consumers aren't starved whilst acknowledgements are flowing through the system and allowing messages to be delivered in a timely fashion.
Feature | Classic Mirrored | Quorum |
---|---|---|
Non-durable queues | yes | no |
Exclusivity | yes | no |
Per message persistence | per message | always |
Membership changes | automatic | manual |
TTL | yes | no |
Queue length limits | yes | partial (drop-head strategy only) |
Lazy behaviour | yes | partial (see Memory Limit) |
Priority | yes | no |
Dead letter exchanges | yes | yes |
Adheres to policies | yes | partial (dlx, queue length limits) |
Reacts to memory alarms | yes | partial (truncates log) |
Poison message handling | no | yes |
Global QoS Prefetch | yes | no |
网友评论