本帖子学习一下rabbitMQ在消息的投递中的几个问题:
- 消息如何保障100%的投递成功?
- 幂等性概念详解
- 在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
- Confirm确认消息、Return返回消息
1.消息如何保障100%的投递成功
1.1什么是生产端的可靠性投递
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker)确认应答
- 完善的消息进行补偿机制
1.1.1消息落库,对消息状态进行打标
下面我以一个订单为例对上图进行一个流程性的讲解:
- 第一步我们接收到一个订单的消息,首先需要把这个订单给持久化到数据库中,然后再把消息给存储到消息表中
- 第二步我们把消息费发送到rabbitmq的队列中
- 第三部broker接收到消息之后会发一个消息的相应给生产端的确认监听器
- 第四步生产端接收到了确认的消息之后就会在消息数据库中把该条消息进行打标,证明这条消息已经成功的发送到了MQ中
- 第五步就是一个分布式的定时任务(确认同一个时间点只有一个定时任务查询数据库),来查找在消息发出之后的一定时间段之后该条消息的状态仍然未被打标的消息,然后进行重新的发送
- 在第五步的基础上,每次一条消息发送之后都会进行打标,如果没有打标过一段时间之后就会进行再次的消息发送,这样的次数也是有限制的,比如是3次,因为不能无休止的一直的发对吧,如果超过三次就证明这条消息无法成功发送了。
1.1.2保障MQ我们思考如果第一种可靠性投递,在高并发的场景下是否适合?
这个时候我们就换一种思路:消息采用延迟投递,做二次确认,回调检查
这里的步骤我详细的解释一下:
-
1.就还是先落库,一定要落库成功之后再发送消息,而且需要强调一点互联网大厂一般不加任何的分布式事务,因为事务会造成很大的性能问题。
-
2.在第一步把订单信息给持久化之后upstream service也就是上游服务就会发送消息给MQ Broker也就是队列,然后同时也发送一个延迟消息就是图中的step2,因为是延迟消息所以会过一段时间再发送,我们继续讲第一步发送的消息进入到了Broker之后,Downstream service也就是下游服务就会去监听消息然后消费,如果接收到消息之后就会发送一个confirm信息到broker中。 然后callback service就会监听这个confirm消息,如果接收到这个消息就会把消息会持久化到数据库中
- 过了一段时间之后,延迟消息开始发送到broker,而下游服务是不会监听这个消息的,callback service会监听这个消息,拿这个消息与之前第一次发送的落库的消息去对比,如果数据库中有这个消息那么就证明第一次发送的消息成功了,那么就到此结束,如果数据库中没有这条消息的话,那么这个服务就会发送一个rpc请求到上游服务,然后就可以根据消息中的id去找相应的订单信息组成消息开始第二次的消息发送;
上面就是两种方式来保障消息的可靠性,我们可以对比一下第二种相对于第一种的消息发送架构,相对来说减少了数据库持久化或者说操作的次数:1.第一种方式每次消息的发送都首先要持久化两次数据库,持久化订单信息然后持久化消息的记录,如果消息失败第二次依然还是要持久化数据库 2.第二种方式的同一条消息仅仅持久化一次,订单也是,而且没有修改状态的数据库操作,相对于第一种减少了一些数据库的操作,而这些操作如果随着并发数的提升,对于数据库性能会相对的提升很多,虽然一两次的消息发送不明显,可是当量级的请求上去,这种微小的资源节约将会对服务器来说,提升很多性能。
2. 幂等性
2.1概念
上图中就是用了乐观锁的机制,没有加version的控制的话,如果并发情况下两条SQL在执行这个语句的时候,如果count是商品的数量,那么如果一个商品仅剩1件的时候,两条SQL如果同时执行,那么数量很可能就会成为负数,这里就加上version的控制,每次执行完,version加1,这样一条SQL执行完,再次查询version=1的数据就没有了,这就用到了乐观锁机制。
而幂等性就好比我们举的例子,一件事情我们执行一次,两次,甚至几百次,最后的结果总是相同的,这就是幂等性。
2.2 消费端-幂等性保障
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息。
2.3 业界主流的幂等性操作
- 唯一ID+指纹码机制,利用数据库主键去重
- 利用redis的原子性去重
在利用redis原子特性实现的时候,我们要注意这两个问题,就是因为redis和数据库是两个独立的存在,各自有各自的事务,所以两个一起使用的话如何实现原子性这是跟问题;
第二个问题就是如果不落库,存储在redis上的话,那么定时同步的策略就需要保障他的可靠性,同步失败怎么办,有没有什么措施,这些都需要考虑的问题。
3.Confirm确认消息
3.1代码实现
producer
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel channel = connection.createChannel();
channel.confirmSelect();
String msg = "this is a rabbit mq,I need a feedback";
channel.basicPublish("test_confirm_exchange","confirm.save",null,msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
//消息确认成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("---消息确认成功!-----");
}
//消息确认失败
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("---消息确认失败!-----");
}
});
}
consumer
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//设置mq连接的自动恢复
connectionFactory.setAutomaticRecoveryEnabled(true);
//设置mq连接自动回复的时间间隔
connectionFactory.setNetworkRecoveryInterval(3000);
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel channel = connection.createChannel();
//4.声明(创建)一个队列
String queueName = "test_confirm_queue";
String exchangeName = "test_confirm_exchange";
String exchangeType = "topic";
String routingKey = "confirm.#";
channel.exchangeDeclare(exchangeName,exchangeType,true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
//5.创建一个消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端接受的消息:"+msg);
}
}
4.Return消息机制
return消息其实就是去获取路由不成功的消息,然后做后续的处理的机制而已:
Return消息机制流程
4.1代码实现
consumer
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel channel = connection.createChannel();
//4.声明(创建)一个队列
String queueName = "return_queue";
String exchangeName = "return_exchange";
String exchangeType = "topic";
String routingKey = "return.#";
String errRoutingKey = "abc.#";
channel.exchangeDeclare(exchangeName,exchangeType,true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,errRoutingKey);
//5.创建一个消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName,true,queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端接受的消息:"+msg);
}
}
producer
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----handle return ----");
System.out.println("replayCode: "+replyCode);
System.out.println("replyText: "+replyText);
System.out.println("exchange: "+exchange);
System.out.println("routingKey: "+routingKey);
System.out.println("properties: "+properties);
System.out.println("body: "+new String(body));
}
});
String msg = "this is a rabbit mq,I need a feedback";
channel.basicPublish("return_exchange","return.save",true,null,msg.getBytes());
}
在上面的consumer我们把exchange绑定一个abc.#的路由,然后这样生产端发送消息,肯定就无法路由到序列了,这样就会出现路由失败的情况,那么在生产端的return方法,我们就可以看到路由失败的信息:
网友评论