美文网首页
rabbitMQ - 5 可靠性传输

rabbitMQ - 5 可靠性传输

作者: cf6bfeab5260 | 来源:发表于2019-04-28 12:18 被阅读0次

谈到MQ,一定会涉及一个问题,如何进行可靠性传输,换句话说,就是当发生不可预见的情况时(broker重启,sender重启,receiver重启,网线被挖了等),保证消息的送达。我们先再回顾一波rabbitmq消息传输的过程:


image.png
  1. 消息从producer传输到exchange。
  2. 消息从exchange到queue。
  3. cusumer监听queue接收消息。
    搞清楚把大象装进冰箱需要这3步以后,我们一步一步来谈论:

1 保证producer的消息发送到了exchange

  • 方法1: rabbitmq提供了事物支持,通过channel.txSelect开启事务,用channel.txCommit提交,用channel.txRollback进行回滚。 但是这个办法会严重影响mq的效率,有统计说开始事务以后,效率会下降10倍。
  • 方法2: 发送方确认机制(publisher confirm):producer发送消息以给exchange会带上一个id号,exchange接收成功以后,会给producer发送一个ack确认;exchange如果接收失败,会给producer发送一个nack命令,producer可以进行处理(重发或者其他),如果要重发的话,那么producer自己就需要对消息进行持久化。
    springboot下方法2的使用:
    1、配置spring.rabbitmq.publisher-confirms=true 开启 publisher confirm。
    2、实现RabbitTemplate.ConfirmCallback接口并实现confirm方法。
public class HelloSender implements RabbitTemplate.ConfirmCallback{
 @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
       if(ack){
           System.out.println("消息已经送达 exchange,correlationData="+correlationData);
       }else {
           System.out.println("消息未送达exchange,原因是:"+cause+", correlationData="+correlationData);
       }
    }

3、send的时候需要setConfirmCallback:

public void send() {
        User u = new User();
        u.setName("周杰伦");
        u.setAge(23);
        rabbitTemplate.setConfirmCallback(this); 
        this.rabbitTemplate.convertAndSend("hello", u);
        System.out.println("Sender : " + u);
    }

注意:这两个方法是互斥的,两个同时使用,rabbitmq会报错。

2 保证消息发送到了queue

1、添加配置:

spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true

当这spring.rabbitmq.template.mandatory配置为false时,exchange找不到匹配的queue的时候,消息就丢弃掉。当这两个属性为true时,exchange找不到匹配的队列,会给producer发送消息。
2、implement RabbitTemplate.ReturnCallback,并实现returnedMessage方法:

public class HelloSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
 @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息没有送达queue:message="+message+",replyCode="+replyCode+
                " ,replyText="+replyText+",exchange="+exchange+",routingKey="+routingKey);
    }

3、发送的时候需要setReturnCallback

public void send() {
        User u = new User();
        u.setName("周杰伦");
        u.setAge(23);
        
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.convertAndSend("hello", u);
        System.out.println("Sender : " + u);
    }

3 持久化

当消息达到了exchange或者queue,但是还未处理 broker就重启了怎么办呢? 答案是持久化:


image.png
image.png

exchange和queue都是把通过Durability控制是否持久化,Durable表示持久化,Transient表示暂存。但是!这里的持久化是指exchange或者queue的自身元数据的持久化,并不是消息的持久化,我们还需要将消息持久化,通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。

public void sendPersistent() {
        User u = new User();
        u.setName("周杰伦");
        u.setAge(23);
        byte[] userByte=null;
        try {
            userByte=u.toString().getBytes("utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        MessageProperties msgProperties=MessagePropertiesBuilder.newInstance().setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        Message msg= MessageBuilder.withBody(userByte).andProperties(msgProperties).build();

        this.rabbitTemplate.convertAndSend("persistentQueue", msg);
        System.out.println("Sender : " + u);
    }

4 保证消息被成功消费

springboot下 ack的参数:

#manual - 手动,需要自己调用代码去ack ,
#auto - 自动(默认),在cosumer方法执行完成以后自动进行ack ,
#none - 没有ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • manual - 手动,需要自己调用代码去ack ,
  • auto - 自动(默认),在cosumer方法执行完成以后自动进行ack ,
  • none - 没有ack

所以我们要么配置auto(默认)要么配置manual,可以保证我们消费者消费完成再删queue里的消息。配置成none则无法保证。
还有一点需要注意,rabbitmq等待ack没有超时机制,如果链接不断,那么它会一直等待ack或者nack的返回。
另外,如果配置了ack,由于方法跑了一半挂掉启动的时候消息会再次被收到,所以cosumer的方法逻辑必须做到幂等
手动ack代码:

@RabbitListener(queues = "persistentQueue")
    @RabbitHandler
    public void process(Channel channel, Message message) {
        try {
            System.out.println("Receiver1  : " + new String(message.getBody(),"utf-8"));
            //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
             channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            //ack返回false,并重新回到队列,api里面解释得很清楚
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //拒绝消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

相关文章

网友评论

      本文标题:rabbitMQ - 5 可靠性传输

      本文链接:https://www.haomeiwen.com/subject/crvqnqtx.html