消息如何保证100%的投递成功
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker) 确认应答
- 完善的消息补偿机制




幂等性概念
幂等性是什么?
image.png
消费端-幂等性保障
在海量订单产生的业务高峰期。如果避免消息的重复消费问题
消费端实现幂等性,就意味着 我们的消息永远不会消费多次,即使我们收到了多条一样的消息。
业界主流的幂等性操作
-
唯一ID+指纹码 机制。 利用数据库主键去重
- 唯一ID +指纹码机制。利用数据库主键去重
select count(1) from t_order where id ='唯一ID+指纹码'
- 好处: 实现简单
- 坏处: 高并发下有数据库写入的瓶颈问题
- 解决方案: 跟进ID 进行分库分表进行算法路由
-
利用Redis的原子性去实现。
- 使用Redis进行幂等,需要考虑的问题
- 第一: 我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
- 第二: 如果不进行落库,那么都存到缓存中,如何设置定时同步的策略?
Confirm 确认消息
理解Confirm消息确认机制:
- 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
- 生产者记性接收应答,用来确认这条消息是否正常的发送到Brokeer。 这种方式也是消息的可靠性投递的核心保障!
如何实现Confirm确认消息
- 第一步: 在channel上开启确认模式:
channel.confirmSelect()
- 第二步: 在channel上添加监听:
addConfirmListener
.监听成功和失败的返回结果。根据具体的结果对消息进行重新发送,或记录日志等后续处理!

/**
* 消息投递模式
* confirm 模式
* 消息确认模式
*
* @author yangHX
* createTime 2019/3/20 23:09
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
Connection connection = connectionFactory.newConnection();
//通过collection获取一个新的channel
Channel channel = connection.createChannel();
//指定我们消息的投递模式 : 消息的确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//发送一条消息
String msg = "Hello RabbitMQ Send message";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------------ACK------------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("-------------NO ACK------------------");
}
});
}
}
/**
* 消息投递模式
* confirm 模式
* 消息确认模式
*
* @author yangHX
* createTime 2019/3/20 23:09
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
Connection connection = connectionFactory.newConnection();
//通过collection获取一个新的channel
Channel channel = connection.createChannel();
//指定我们消息的投递模式 : 消息的确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//发送一条消息
String msg = "Hello RabbitMQ Send message";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------------ACK------------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("-------------NO ACK------------------");
}
});
}
}
网友评论