1、引入rabbitMQ相关的jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置文件设置
spring.rabbitmq.host=xx.xx.xx.xx
spring.rabbitmq.port=xxxx
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3、RabbitMq配置类
@Configuration
public class RabbitConfig {
/**
* 方法rabbitAdmin的功能描述:动态声明queue、exchange、routing
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//声明死信队列(Fanout类型的exchange)
Queue deadQueue = new Queue("dead_queue");
// 死信队列交换机
FanoutExchange deadExchange = new FanoutExchange("exchange_dead_queue");
rabbitAdmin.declareQueue(deadQueue);
rabbitAdmin.declareExchange(deadExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(deadQueue).to(deadExchange));
// 正常队列Direct交换机
DirectExchange exchange = new DirectExchange("direct_exchange");
Queue couponQueue = queue("direc_queue");
//声明消息队列(Direct类型的exchange)
rabbitAdmin.declareQueue(couponQueue);
rabbitAdmin.declareExchange(exchange);
//绑定交换机路由
rabbitAdmin.declareBinding(BindingBuilder.bind(couponQueue).to(exchange).with("routing"));
return rabbitAdmin;
}
public Queue queue(String name) {
Map<String, Object> args = new HashMap<>();
// 设置死信队列
args.put("x-dead-letter-exchange", "exchange_dead_queue");
args.put("x-dead-letter-routing-key", "dead_routing");
// 设置消息的过期时间, 单位是毫秒
args.put("x-message-ttl", 5000);
// 是否持久化
boolean durable = true;
// 仅创建者可以使用的私有队列,断开后自动删除
boolean exclusive = false;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = false;
return new Queue(name, durable, exclusive, autoDelete, args);
}
}
4、发送端
@RestController
@RequestMapping("/mq")
public class RabbitmqController {
@Autowired
private RabbitSender rabbitSender;
@RequestMapping("/send")
public Object sendMsg(String name) {
SendMessage sendMessage = new SendMessage();
sendMessage.setId(1);
sendMessage.setAge(20);
sendMessage.setName(name);
JSONObject jsonObject = (JSONObject)JSONObject.toJSON(sendMessage);
String content = jsonObject.toJSONString();
//绑定交换机和路由,把对象通过json转成string传递
rabbitSender.sendMessage("direct_exchange", "routing", content);
return "发送成功";
}
}
5、接收端,监听死信队列实现延迟的效果
@RabbitListener(queues = "direc_queue")
public void process(String sendMessage, Channel channel, Message message) throws Exception {
try {
// 参数校验
Assert.notNull(sendMessage, "sendMessage 消息体不能为NULL");
// TODO 处理消息
// 确认消息已经消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
try {
// TODO 保存消息到数据库
// 确认消息已经消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception dbe) {
// 确认消息将消息放到死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
网友评论