Confirm
消息发送到 Broker 后的回调,用于确认 Broker 是否已成功接收该消息(与消费端是否完成消费无关)
Return
当消息路由不可达时,触发回调
配置
#--------------------------------------------rabbitmq--------------------------------------------
# Connection factory: broker addresses and credentials
spring.rabbitmq.addresses=xxxx:5677,xxx:5678
spring.rabbitmq.username=guest
spring.rabbitmq.password=qwg-rabbitmq@guest
# Virtual host
spring.rabbitmq.virtual-host=qwg-app-dev
#------------ Producer side
# Enable publisher confirms
spring.rabbitmq.publisherConfirms= true
# Enable returning of unroutable messages
spring.rabbitmq.publisher-returns= true
# When true, the return listener receives unroutable messages so they can be handled;
# when false, the broker silently discards such messages
spring.rabbitmq.template.mandatory= true
#------------ Consumer side
# Manual acknowledgement
spring.rabbitmq.listener.simple.acknowledge-mode=manual
生产端
package com.finlay.scaffold.boot;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Demo producer: publishes a fixed text message to {@code boot.exchange} and prints
 * the outcome reported through the publisher-confirm and return callbacks.
 */
@Component
public class RabbitSender {
    private static final String EXCHANGE_NAME = "boot.exchange";
    private static final String ROUTING_KEY = "springboot.hello";
    // To exercise returnCallback, publish with a routing key that no queue is bound to, e.g.
    //     private static final String ROUTING_KEY = "springboot.hexllo";
    // NOTE(review): a non-existent exchange is normally reported as a nack through
    // confirmCallback rather than through returnCallback - confirm against the broker in use.

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publisher-confirm callback. Prints whether the publish was acked or nacked,
     * followed by the raw ack flag and the cause text supplied with the confirm.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // Only announce success when the publish was actually acked; the original
            // printed the success banner for nacks as well.
            if (ack) {
                System.out.println("*************消息发送成功**************");
            } else {
                System.out.println("*************消息发送失败**************");
            }
            System.out.println("ack签收结果------------>:" + ack);
            System.out.println("若发生异常,异常信息------------>:" + cause);
        }
    };

    /**
     * Return callback. Prints the reply code, reply text, exchange and routing key
     * of a message that was returned to the publisher.
     */
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            System.out.println("**************Return********************");
            System.out.println("replyCode:" + replyCode);
            System.out.println("replyText:" + replyText);
            System.out.println("exchange:" + exchange);
            System.out.println("routingKey:" + routingKey);
        }
    };

    /**
     * Registers both callbacks on the injected template and publishes the demo
     * message to {@code EXCHANGE_NAME} with {@code ROUTING_KEY}.
     */
    public void sendString() {
        String msg = "rabbitmq--------->springboot--------->hello";
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg);
    }
}
消费端
package com.finlay.scaffold.boot;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
Component
public class RabbitReceiver {
private static final String EXCHANGE_NAME = "boot.exchange";
// private static final String ROUTING_KEY = "springboot.#";
private static final String ROUTING_KEY = "springboot.hello";
private static final String QUEUE_NAME = "boot.queue";
//直接通过@RabbitListener,完成QUEUE,EXCHANGE 的【声明、绑定】
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME,
durable = "true"),
exchange = @Exchange(value = EXCHANGE_NAME,
type = ExchangeTypes.TOPIC,
durable = "false"),
key = ROUTING_KEY
))
@RabbitHandler
public void rec(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
channel.basicQos(1);
System.out.println("receiver: " + msg);
channel.basicAck(tag, false);
} catch (Exception e) {
//不建议再放回队列,可以采用ConfirmCallback 机制进行处理
throw new RuntimeException(e);
}
}
}
网友评论