美文网首页
RabbitMQ学习(四)可靠性投递和confirm机制

RabbitMQ学习(四)可靠性投递和confirm机制

作者: kobe0429 | 来源:发表于2018-11-22 15:16 被阅读0次

RMQ可靠性投递

一.什么是RMQ的可靠性投递

1.保障消息的成功发出
2.保障MQ节点的成功接收
3.发送端收到MQ节点(broker)确认应答
4.完善的消息补偿机制
在实际生产中,很难保障前三点的完全可靠,比如在极端的环境中,生产者发送消息失败了,发送端在接受确认应答时突然发生网络闪断等等情况,很难保障可靠性投递,所以就需要有第四点完善的消息补偿机制。

二、大厂的解决方案

第一种:消息落库

消息落库流程图.JPG
流程的示意图如上所示:
step1:比如我下单成功了,对我的业务数据进行入库,同时对消息数据入库;
setp2:发送消息到MQ服务上;
step3: 按照正常的流程就是消费者监听到该消息,就根据唯一id修改该消息的状态为已消费,并给一个确认应答ack到Listener。
step4: 修改消息的投递状态;
step5: 如果出现意外情况,消费者未接收到或者Listener接收确认时发生网络闪断,接收不到,这时候就需要用到我们的分布式定时任务来从msg数据库抓取那些超时了还未被消费的消息;
step6: 对超时任务重新发送一遍;
step7: 重试机制里面要设置重试次数限制,因为一些外部的原因导致一直发送失败的,不能重试太多次,要不然会拖垮整个服务。
例如重试三次还是失败的,就把消息的status设置成2,然后通过补偿机制,人工去处理。实际生产中,这种情况还是比较少的,但是你不能没有这个补偿机制,要不然就做不到可靠性了。
优点
架构实现清晰,流程简单;
缺点
一次业务操作2次入库,高并发场景下影响性能。
有人可能就想到采用分布式事务来保证数据的一致性,但是在大型互联网中,基本很少采用事务,都是采用补偿机制。
第二种:延迟投递,做二次确认,回调检查。
既然第一种方案在高并发场景下不适合,这时候需要我们的第二种方案,流程图如下。
延迟投递实现流程.JPG
step1:完成业务操作落库后,发送消息出去;
step2: 业务消息发送后,延迟3分钟或5分钟再发送消息数据;
step3: 消费者订阅消息进行消费;
step4: 完成后重新组装一个确认消费的消息;
step5: 补偿服务监听消费者发送到特定队列的确认消息,修改消息数据库的信息;
step6: 补偿服务延迟检查生产者发出的延迟检查的消息,去消息数据库判断,如果已经消费成功,检查通过;
step7: 如果检查没通过,则向生产者发送重新发送消息的命令。

虽然第二种方案也是无法做到100%的可靠传递,在特别极端的情况,还是需要定时任务和补偿机制进行辅助的。但是第二种方案的核心是减少数据库操作,在高并发场景下,我们考虑的不是百分百的可靠性了,而是考虑可用性,性能能否扛得住这个流量,所以我能减少一次数据库操作就减少一次。我上游服务减少了一次数据库操作,我的服务性能相对而言就提高了一些,而且又能把异步callback Server补偿服务解耦出来。

confirm机制

消息确认机制.JPG
  1. 消息的确认,是指生产投递消息后,如果broker收到消息,会给生产者一个应答
  2. 生产者接受应答,用来确认这条消息是否正常发送到broker
    如何实现消息的确认机制?
    第一步在channe开启确认模式,channel.confirmSelect();
    第二步在channel添加监听,addConfirmListener,根据监听的结果对消息进行重新发送或记录日志等处理。
    代码如下:
    Producer类
package com.bfxy.rabbitmq.api.confirm;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer { 
    public static void main(String[] args) throws Exception {   
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.198.52");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");    
        //2 获取C onnection
        Connection connection = connectionFactory.newConnection();  
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();       
        //4 指定我们的消息投递模式: 消息的确认模式 
        channel.confirmSelect();        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save"; 
        //5 发送一条消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());       
        //6 添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------no ack!-----------");
            }           
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });     
    }
}

Consumer类

package com.bfxy.rabbitmq.api.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer { 
    public static void main(String[] args) throws Exception {               
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.198.52");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");        
        //2 获取C onnection
        Connection connection = connectionFactory.newConnection();      
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();       
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#";
        String queueName = "test_confirm_queue";        
        //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);      channel.queueBind(queueName,exchangeName,routingKey);
        //5 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6 设置信道    channel.basicConsume(queueName,true,queueingConsumer);
        //7 消费消息内容
        while (true){
        Delivery delivery = queueingConsumer.nextDelivery();
        String msg = new String(delivery.getBody());
            System.out.println("消费端:"+ msg);
        }       
    }
}

先启动消费者,再启动生产者: 消费者打印出消费成功的信息,然后重新发送消息,生产者接收到确认消费成功的消息后,打印出确认信息。
首先消费者控制台如下

comsumer.JPG
然后生产者控制台打印信息如下:
producer.JPG

相关文章

网友评论

      本文标题:RabbitMQ学习(四)可靠性投递和confirm机制

      本文链接:https://www.haomeiwen.com/subject/bkslqqtx.html