谈到MQ,一定会涉及一个问题,如何进行可靠性传输,换句话说,就是当发生不可预见的情况时(broker重启,sender重启,receiver重启,网线被挖了等),保证消息的送达。我们先再回顾一波rabbitmq消息传输的过程:
image.png
- 消息从producer传输到exchange。
- 消息从exchange到queue。
- cusumer监听queue接收消息。
搞清楚把大象装进冰箱需要这3步以后,我们一步一步来谈论:
1 保证producer的消息发送到了exchange
- 方法1: rabbitmq提供了事物支持,通过channel.txSelect开启事务,用channel.txCommit提交,用channel.txRollback进行回滚。 但是这个办法会严重影响mq的效率,有统计说开始事务以后,效率会下降10倍。
- 方法2: 发送方确认机制(publisher confirm):producer发送消息以给exchange会带上一个id号,exchange接收成功以后,会给producer发送一个ack确认;exchange如果接收失败,会给producer发送一个nack命令,producer可以进行处理(重发或者其他),如果要重发的话,那么producer自己就需要对消息进行持久化。
springboot下方法2的使用:
1、配置spring.rabbitmq.publisher-confirms=true
开启 publisher confirm。
2、实现RabbitTemplate.ConfirmCallback接口并实现confirm方法。
public class HelloSender implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息已经送达 exchange,correlationData="+correlationData);
}else {
System.out.println("消息未送达exchange,原因是:"+cause+", correlationData="+correlationData);
}
}
3、send的时候需要setConfirmCallback:
public void send() {
User u = new User();
u.setName("周杰伦");
u.setAge(23);
rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.convertAndSend("hello", u);
System.out.println("Sender : " + u);
}
注意:这两个方法是互斥的,两个同时使用,rabbitmq会报错。
2 保证消息发送到了queue
1、添加配置:
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true
当这spring.rabbitmq.template.mandatory
配置为false时,exchange找不到匹配的queue的时候,消息就丢弃掉。当这两个属性为true时,exchange找不到匹配的队列,会给producer发送消息。
2、implement RabbitTemplate.ReturnCallback,并实现returnedMessage方法:
public class HelloSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息没有送达queue:message="+message+",replyCode="+replyCode+
" ,replyText="+replyText+",exchange="+exchange+",routingKey="+routingKey);
}
3、发送的时候需要setReturnCallback
public void send() {
User u = new User();
u.setName("周杰伦");
u.setAge(23);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
this.rabbitTemplate.convertAndSend("hello", u);
System.out.println("Sender : " + u);
}
3 持久化
当消息达到了exchange或者queue,但是还未处理 broker就重启了怎么办呢? 答案是持久化:
image.png
image.png
exchange和queue都是把通过Durability控制是否持久化,Durable表示持久化,Transient表示暂存。但是!这里的持久化是指exchange或者queue的自身元数据的持久化,并不是消息的持久化,我们还需要将消息持久化,通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。
public void sendPersistent() {
User u = new User();
u.setName("周杰伦");
u.setAge(23);
byte[] userByte=null;
try {
userByte=u.toString().getBytes("utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
MessageProperties msgProperties=MessagePropertiesBuilder.newInstance().setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
Message msg= MessageBuilder.withBody(userByte).andProperties(msgProperties).build();
this.rabbitTemplate.convertAndSend("persistentQueue", msg);
System.out.println("Sender : " + u);
}
4 保证消息被成功消费
springboot下 ack的参数:
#manual - 手动,需要自己调用代码去ack ,
#auto - 自动(默认),在cosumer方法执行完成以后自动进行ack ,
#none - 没有ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- manual - 手动,需要自己调用代码去ack ,
- auto - 自动(默认),在cosumer方法执行完成以后自动进行ack ,
- none - 没有ack
所以我们要么配置auto(默认)要么配置manual,可以保证我们消费者消费完成再删queue里的消息。配置成none则无法保证。
还有一点需要注意,rabbitmq等待ack没有超时机制,如果链接不断,那么它会一直等待ack或者nack的返回。
另外,如果配置了ack,由于方法跑了一半挂掉启动的时候消息会再次被收到,所以cosumer的方法逻辑必须做到幂等。
手动ack代码:
@RabbitListener(queues = "persistentQueue")
@RabbitHandler
public void process(Channel channel, Message message) {
try {
System.out.println("Receiver1 : " + new String(message.getBody(),"utf-8"));
//消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//ack返回false,并重新回到队列,api里面解释得很清楚
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
e.printStackTrace();
}
}
网友评论