过期时间TTL
TTL,Time to Live的简称,即过期时间。在RabbitMQ中,你可以对消息或者队列设置过期时间.
你可以通过两种方式来设置消息的TTL:
- 通过队列属性设置,即在队列初始化的时候设置一个消息的过期时间,那么进入这个队列的所有消息都会按照这个时间来失效。
原生Java代码实现为:
HashMap<String, Object> args = new HashMap<>();
// 这里为队列设置消息过期时间
args.put("x-message-ttl", 6000);
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
- 单独为每个消息设置过期时间。如果同时存在队列设置的消息过期时间和消息本身的过期时间,以更小的为准。
原生Java代码实现为:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 持久化消息
builder.deliveryMode(2);
// 设置过期时间为6S
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName,routingKey,mandatory,properties,message);
死信
- 被拒绝的消息可以看作死信.
- 过期的消息可以看作死信.
- 队列爆满无法投递的消息可以看作死信.
死信队列
DLX,全称为Deal-Letter-Exchange,死信交换机。当消息在一个队列中变成了死信后,它可以被重定向到另一个交换机,而这个用来接收死信的交换机就是DLX,与这个死信交换机绑定的队列我们通常叫死信队列。
Java原生代码实现:
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 死信交换机
channel.exchangeDeclare("dlx_exchange", "direct");
HashMap<String, Object> args = new HashMap<>();
// 帮当前队列设置参数,如果出现死信,转发到死信交换机
args.put("x-dead-letter-exchange", "dlx_exchange");
// 把刚才的参数配置到这个队列
channel.queueDeclare("myqueue", false, false, false, args);
死信队列的链路过程
image.pngSpringBoot中实现延迟消息
pom.xml 安装依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置文件(这里只是简单的配置,生产上根据自己的需求配置)
server:
port: 8053
spring:
application:
name: rabbitmq
#配置rabbitMq 服务器
rabbitmq:
host: 192.168.11.131
port: 5672
username: root
password: root
RabbitMQDelayConfig
package com.xjm.mid.compent.rabbitmq.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author jaymin
*/
@Configuration
@Slf4j
public class RabbitMQDelayConfig {
/**
* 延迟队列,把消息放在这里会自动过期
*/
public static final String DELAY_QUEUE = "jay.delay.queue";
/**
* 延迟队列交换机
*/
public static final String DELAY_EXCHANGE = "jay.delay.exchange";
/**
* 延迟队列路由
*/
public static final String DELAY_ROUTINGKEY = "jay.delay.routingKey";
/**
* 死信队列
*/
public static final String DEAD_LETTER_QUEUE = "jay.dlx.queue";
/**
* 死信交换机
*/
private static final String DEAD_LETTER_EXCHANGE = "jay.dlx.exchange";
/**
* 死信队列路由
*/
private final String DEAD_LETTER_ROUTINGKEY = "jay.dlx.routingKey";
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 当交换机无法正确投递消息的时候,RabbitMQ会调用Basic.Return命令将消息返回给生产者
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
/**
*
* @return FanoutExchange
*/
@Bean
public FanoutExchange delayExchange() {
return new FanoutExchange(DELAY_EXCHANGE);
}
/**
* 延迟队列配置
* <p>
* 1、params.put("x-message-ttl", 5 * 1000);
* 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
* 2、rabbitTemplate.convertAndSend(book, message -> {
* message.getMessageProperties().setExpiration(2 * 1000 + "");
* return message;
* });
* 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
**/
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTINGKEY);
return new Queue(DELAY_QUEUE, true, false, false, args);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange());
}
@Bean
public Queue dlxQueue() {
return new Queue(DEAD_LETTER_QUEUE, true);
}
/**
*
**/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DEAD_LETTER_ROUTINGKEY);
}
}
生产者
package com.xjm.mid.compent.rabbitmq.web;
import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayConfig;
import com.xjm.mid.compent.rabbitmq.model.Letter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
@RestController
@RequestMapping("/hello")
@Slf4j
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/delay")
public void sendMsg(){
Letter letter = new Letter();
letter.setRecipient("福尔摩斯");
letter.setContext("您有新的订单需要处理!");
String expiredTime = "5000";
rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE,RabbitMQDelayConfig.DELAY_ROUTINGKEY,letter,message->{
message.getMessageProperties().setExpiration(expiredTime);
return message;
});
log.info("[发送时间] - [{}]", LocalDateTime.now());
}
}
消息者
package com.xjm.mid.compent.rabbitmq.web;
import com.rabbitmq.client.Channel;
import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayConfig;
import com.xjm.mid.compent.rabbitmq.model.Letter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
@Component
@Slf4j
public class CustomerController {
@RabbitListener(queues = {RabbitMQDelayConfig.DEAD_LETTER_QUEUE})
public void listenerDelayQueue(Letter letter, Message message, Channel channel) {
log.info("[listenerDelayQueue 监听的消息] - [消费时间] - [{}] - [{}]", LocalDateTime.now(), letter.toString());
try {
// TODO 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
log.error("出了点小BUG,问题很大");
}
}
}
TTL无法解决的问题
死信队列确实可以用来支持延迟消息的发送,但是由于队列的原因,所有的消息都是FIFO的,因此,放到TTL队列中的消息最好是保持过期时间一致;如果读者想实现不同的过期时间的消息都放一个队列中,那么我建议安装rabbitmq_delayed_message_exchange来解决这个问题。
网友评论