发送端
![](https://img.haomeiwen.com/i3824541/ddcbe78f06203e4d.png)
从消息发送的链路可以看出,消息最终到达Queue才算真正的流转成功。
所以消息流转出错可能有2个地方一个是从生产者到路由,还有一个就是从路由到队列。
从生产者到路由的确认监听ConfirmCallback
消息是否成功发送到路由的确认,这个监听有2个参数一个是是否到达一个是未到达的原因
![](https://img.haomeiwen.com/i3824541/85924ebe6fc342ce.png)
从路由到队列的监听returnedMessage
如果消息未从路由成功发送到队列那么会走这个回调,这里会把消息的整个明细返回
![](https://img.haomeiwen.com/i3824541/f7e96c404dab6b44.png)
实现
配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
监听设置
@Component
public class ComfirmListener implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
//消息发送到路由 true表明发送到路由 flase表明失败
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("-------"+b);
System.out.println("-------"+s);
System.out.println("-------"+correlationData);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("-------returnedMessage"+returnedMessage.toString());
}
}
测试
//测试消息未到路由,ethan.exchange_topics_0005不存在这个路由
//rabbitTemplate.convertAndSend("ethan.exchange_topics_0005","123","123");
//测试消息未到队列,123不存在
rabbitTemplate.convertAndSend("ethan.exchange_topics","123","123");
confirm 这个回调无论成功还是失败都会回调
消费端Ack
默认情况下是自动确认消息的,如果消费者在根据消息处理业务逻辑的时候发生异常,这个时候相关业务无法完成,设置成自动确认就会有问题。下面我们来看如何设置消息确认机制为手动确认。
配置
@Configuration
public class RabbitConsumerConfig {
@Bean
public SimpleRabbitListenerContainerFactory messageListenerContainer() {
SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
container.setConnectionFactory(rabbitConnectionFactory());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//限流表示每次消费端拉取一条消息进行消费直到收到确认完成后在拉取下一条
container.setPrefetchCount(1);
container.setMessageConverter(new MessageConverter() {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@SneakyThrows
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return new String(message.getBody(),"utf-8");
}
});
return container;
}
// @RabbitHandler
// public void process(String hello,Channel channel, Message message) throws IOException {
// System.out.println("HelloReceiver收到 : " + hello +"收到时间"+new Date());
// try {
// //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
// System.out.println("receiver success");
// } catch (IOException e) {
// e.printStackTrace();
// //丢弃这条消息
// //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
// System.out.println("receiver fail");
// }
//
// }
// @RabbitListener(queues = "queue_topic_ack", containerFactory = "messageListenerContainer")
// public void processMessage(String content,Channel channel,Message message) throws IOException {
// System.out.println("content===>"+content);
// if (channel!=null){
// System.out.println("nulll----");
// channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
// }
//
// }
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
}
设置全局监听
@Component
public class MsgHander {
//这里要主要哦一定要自定义containerFactory不然会抛异常unknown delivery tag 1
@RabbitListener(queues = "queue_topic_ack",containerFactory = "messageListenerContainer")
public void processMessage(Channel channel,Message message) throws IOException {
try{
//签收消息
System.out.println("---签收消息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("---签收消息异常");
//处理抛出异常,如果重新把消息放回队列则requeue设置为true否则设置为false
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
这里解释下2个参数的含义
deliveryTag(唯一标识 ID):它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数。
multiple:批处理标志,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
注意这里要自定义containerFactory不要写在.yml里面
代码
发送端comfirm:
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-cofirms-producer
消费ack:
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-ack-consumer
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-ack-produce
网友评论