消息丢失的场景
- 消息发送时消息丢失
- 路由消息时消息丢失
- 消息未持久化消息丢失
- 消费消息时消息丢失
消息发送可靠性
AMQP协议提供的一个事务机制
一般不使用,影响吞吐量
发送方确认机制(publisher confirm)
首先生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一deliveryTag和multiple参数),这就使得生产者知晓消息已经正确到达了目的地了。
Confirm模式有三种方式实现
- 串行confirm模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传。
- 批量confirm模式:producer每发送一批消息后,调用waitForConfirms()方法,等待broker端confirm。
- 异步confirm模式:提供一个回调方法,broker confirm了一条或者多条消息后producer端会回调这个方法。 我们分别来看看这三种confirm模式
异步Confirm模式
package com.rabbitmq.client;
import java.io.IOException;
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker. Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
*/
public interface ConfirmListener {
void handleAck(long deliveryTag, boolean multiple) throws IOException;
void handleNack(long deliveryTag, boolean multiple) throws IOException;
}
这里就需要在发送消息之前将消息存储起来,以便于在ConfirmListener中处理消息发送成功和失败的情况,可以存储到数据库或者Redis中。
如果发送失败则需要进行消息重发,重试超过一定次数后仍然失败则需要记录日志,告警,人工处理。
消息端如何保证消息可靠性
手动确认机制
消费者消费完毕后手动地向 Broker 发送确认通知,Broker 收到确认通知后再从队列中删除对应的消息。
重试
在消息消费处理逻辑中加入重试机制,以处理一些被调用服务网络抖动等情况导致的消息消费失败的情况。
如何重试超过一定次数后仍然失败则将消息发送到死信队列。
死信队列
1、消息被否定确认使用 channel.basicNack
或 channel.basicReject
,并且此时requeue
属性被设置为false
。
2、消息在队列中的时间超过了设置的TTL(time to live)时间。
3、消息数量超过了队列的容量限制。
当一个队列中的消息满足上述三种情况任一个时,该消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。
死信队列和普通的业务队列没有什么差别,只不过是业务上创建用来存储处理失败的消息的队列。所以其工作方式也和业务队列相同,死信仍然需要交换机的转发到达死信队列。
根据实际的业务情况,我们可以创建专门的死信消费者对死信进行处理,或者进行人工补偿。
如何保证消息100%被消费
举个例子,用户注册赠送积分,这里赠送积分是通过消息队列进行解耦。
解决方案一、消息落库 + 定时任务 + 幂等 + 重试 + 人工补偿
用户表 + 消息表,在同一个事务中存储用户注册数据和赠送积分数据。
在事务之外执行消息发送,通过发送端confirm机制保证消息发送成功。
消费端消费消息,消费完成后进行手动ack, 这里也会出现ack时消息队列server突然宕机的情况,这时就需要保证消费端消费消息需要实现幂等(因为消息会被重发)。消息消费成功后将消息表中的消息状态设置为完成。
定时任务,定时扫描未处理的消息,进行消息重发,重发超过一定次数后标记为失败,转人工处理。
解决方案二、延迟投递 + 回调检查
上游服务完成业务处理后,发送两条消息,一条给下游服务进行业务处理,如赠送积分业务,另一条给callback服务。
下游服务接收到业务消息并处理完成之后就直接发送一条消息给callback服务,callback服务接收到消息后就知道刚才有一条消息被成功处理了,callback服务把这条消息持久到数据库中,当上游服务之前发送的延迟消息到达callback服务时进行数据库检查,如果存在则说明消息被成功消费了,如果不存在则通过PRC调用通知上游服务有消息没有处理,上游服务重新发送业务消息和延迟确定消息进行重试。
Step 1: 上游服务业务处理
上游服务 --- 【a.业务消息】 ----> 下游服务
上游服务 --- 【b.延迟确认消息】 ----> callback服务
Step 2: 下游服务业务处理
下游服务 --- 消费a消息 ---- 【c.消费确认消息】 ---> callback服务
Step 3: 消息处理情况持久化
callback服务 --- 消费【c. 消费确认消息】 ---持久化到DB(a消息已被成功消费)
Step 4: 上游服务检查消息处理情况
callback服务 --- 消费【b.延迟确认消息】 --- 检查DB
Step 5: 重试
callback服务 --- 检查DB 通过 --- 完成
callback服务 --- 检查DB 不通过 --- RPC通知上游服务
虽然这种方案也是无法做到 100% 的可靠传递,在特别极端的情况,还是需要定时任务和补偿机制进行辅助。但是该方案的核心是减少数据库操作,这个点很重要,因为这是在高并发的场景下,主要考虑性能。当然我们还是要补偿机制,即可以做到最终一致性。
网友评论