一、延迟队列
延迟队列就是比普通队列多了一个延迟属性。
单从字面意思,可以理解为这个队列是延迟的,但我们普遍默认的,是我们要实现的功能是延迟,而和队列本身是延迟队列或不是延迟队列无关。看完本篇文章就会明白,即使队列是普通队列,单给消息设置一个延迟时间,依然可以实现延迟功能。
注:不给队列加延迟属性,单给消息设置一个延迟时间,需要延迟插件才能实现
二、RabbitMQ中的 TTL 属性
TTL(Time To Live):表示消息的存活时间(毫秒)
如果一个队列或者一条消息设置了TTL,那么如果消息没有被及时处理,则会变为死信。
有两种设置TTL方式付下:
- 方式1:给队列设置TTL,那么进入该队列的消息,统一有一个存活时间
@Bean("delayQueueA")
public Queue delayQueueA() {
return QueueBuilder.durable(DELAY_QUEUEA_NAME)
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY)
.ttl(6000)//设置队列TTL 6秒(底层就是设置 " x-message-ttl ")
.build();
- 方式2:给单个消息设置TTL
@GetMapping("/delayMsg")
public void sendMsg2(String msg, Integer delayTime) {
log.info(delayTime/1000 + "s");
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEC_ROUTING_KEY,
msg+delayTime, message -> {
//单个消息设置TTL
message.getMessageProperties().setExpiration(delayTime.toString());
return message;
});
}
注意:如果两种方式都设置了,那么选TTL最短时间
有两种方式可实现延迟功能:
三、第一种延迟功能实现方式代码示例,给队列设置延迟
先看看消息流向图:
image.png
这种方式,就是利用死信机制,先让消息流向设置了延迟时间的 延迟队列,待消息到期后成为死信,便自动流向 死信队列,最后我们监听 死信队列 的消息,然后消费。(延迟队列、死信队列和正常队列是一样的,正常监听消费即可)
- 创建交换机、队列、绑定关系代码如下:
package com.zilong.mqpractice.delayqueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMQConfig {
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
public static final String DELAY_QUEUEA_NAME = "delay.queuea";
public static final String DELAY_QUEUEB_NAME = "delay.queueb";
public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queuea.routingkey";
public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queueb.routingkey";
public static final String DEAD_LETTER_EXCHANGE = "deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_NAME = "deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "deadletter.queueb";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "deadletter.delay_6s.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "deadletter.delay_30s.routingkey";
@Bean("delayExchange")
public DirectExchange delayExchange() {
log.info("create----> delayExchange");
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
@Bean("delayQueueA")
public Queue delayQueueA() {
return QueueBuilder.durable(DELAY_QUEUEA_NAME)
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY)
.ttl(6000)
.build();
}
@Bean("delayQueueB")
public Queue delayQueueB() {
return QueueBuilder.durable(DELAY_QUEUEB_NAME)
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey(DEAD_LETTER_QUEUEB_ROUTING_KEY)
.ttl(30000)
.build();
}
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
}
@Bean
public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
}
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA() {
return QueueBuilder.durable(DEAD_LETTER_QUEUEA_NAME)
.build();
}
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB() {
return QueueBuilder.durable(DEAD_LETTER_QUEUEB_NAME)
.build();
}
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
- 创建消息生产者代码如下:
package com.zilong.mqpractice.delayqueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/delayMsg")
public class Cotroller {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/6s/send")
public void sendMsg(String msg) {
log.info("6s");
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEA_ROUTING_KEY, msg);
}
@GetMapping("/30s/send")
public void sendMsg2(String msg) {
log.info("30s");
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEB_ROUTING_KEY, msg);
}
}
- 创建消费者代码如下:
package com.zilong.mqpractice.delayqueue;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Consumer {
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
log.info("收到6s,死信A: " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
log.info("收到30s,死信B: " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
- 运行项目调用接口发送消息
http://localhost:8006/delayMsg/6s/send?msg=短时间
http://localhost:8006/delayMsg/30s/send?msg=长时间 - 日志如下:
2021-03-08 23:02:11.711 INFO 83451 --- [nio-8006-exec-3] c.z.mqpractice.delayqueue.Cotroller : 6s
2021-03-08 23:02:13.378 INFO 83451 --- [nio-8006-exec-4] c.z.mqpractice.delayqueue.Cotroller : 30s
2021-03-08 23:02:17.845 INFO 83451 --- [ntContainer#3-1] c.zilong.mqpractice.delayqueue.Consumer : 收到6s,死信A: 短时间
2021-03-08 23:02:43.548 INFO 83451 --- [ntContainer#2-1] c.zilong.mqpractice.delayqueue.Consumer : 收到30s,死信B: 长时间
四、第二种延迟功能实现方式代码示例,给消息设置延迟
这种方式是最简便的方式,不需要第一种那么复杂的消息流向
先看消息就想图:
image.png
利用rabbitmq延迟插件,创建一个延迟交换机,然后消息生产时设置上延迟属性(setDelay()方法,而不是setExpiration()方法),队列也用普通队列即可,不需要设置延迟属性,消费者只需要监听该队列即可。
- 创建交换机、队列、绑定关系代码如下:
package com.zilong.mqpractice.delayqueue2;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMQConfig3 {
public static final String DELAY_EXCHANGE_NAME = "delay.exchange2";
public static final String DELAY_QUEUEC_NAME = "delay.queuec";
public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queuec.routingkey";
@Bean("delayExchange2")
public DirectExchange delayExchange2() {
log.info("create----> delayExchange2");
DirectExchange directExchange = new DirectExchange(DELAY_EXCHANGE_NAME);
//给交换机设置延迟属性
directExchange.setDelayed(true);
return directExchange;
}
@Bean("delayQueueC")
public Queue delayQueueC() {
//普通的队列
return QueueBuilder.durable(DELAY_QUEUEC_NAME)
.build();
}
@Bean
public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
@Qualifier("delayExchange2") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
}
}
- 创建消息生产者代码如下:
package com.zilong.mqpractice.delayqueue2;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/delayMsg2")
public class Cotroller3 {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void sendMsg(String msg, Integer delayTime) {
log.info(delayTime / 1000 + "s");
rabbitTemplate.convertAndSend(RabbitMQConfig3.DELAY_EXCHANGE_NAME, RabbitMQConfig3.DELAY_QUEUEC_ROUTING_KEY,
msg + delayTime, message -> {
//给消息设置延迟属性,不同于setExpiration()
message.getMessageProperties().setDelay(delayTime);
return message;
});
}
}
- 创建消费者代码如下:
package com.zilong.mqpractice.delayqueue2;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Consumer3 {
@RabbitListener(queues = RabbitMQConfig3.DELAY_QUEUEC_NAME)
public void receivec(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
log.info("收到死信C: " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
- 运行项目调用接口发送消息
http://localhost:8006/delayMsg2/send?msg=插件延迟消息&delayTime=6000
http://localhost:8006/delayMsg2/send?msg=插件延迟消息&delayTime=30000
-日志如下:
2021-03-08 23:30:12.225 INFO 96631 --- [nio-8006-exec-1] c.z.mqpractice.delayqueue2.Cotroller3 : 6s
2021-03-08 23:30:16.317 INFO 96631 --- [nio-8006-exec-2] c.z.mqpractice.delayqueue2.Cotroller3 : 30s
2021-03-08 23:30:18.368 INFO 96631 --- [ntContainer#4-1] c.z.mqpractice.delayqueue2.Consumer3 : 收到死信C: 插件延迟消息6000
2021-03-08 23:30:46.417 INFO 96631 --- [ntContainer#4-1] c.z.mqpractice.delayqueue2.Consumer3 : 收到死信C: 插件延迟消息30000
五、小结
实现延迟功能,就两种方式。
1.将队列设置为延迟队列。缺点是所有消息都固定的延迟时间,想要改变,就要重建一个延迟队列
2.利用延迟插件给消息设置延迟时间。缺点是插件会影响效率(没试过,官网说的)
疑问:用延迟队列,然后用setExpiration()给消息设置延迟时间可以么?
不可以,队列特点是先进先出,这样做,虽然有延迟时间,但是rabbitmq会按顺序检测信息是否死亡。例如:
先发送messageA:延迟60s,然后发送messageB:延迟6s,那么rabbitmq会先检测messageA是否变为死信,如果messageA变为死信,rabbitmq会将其丢到死信队列,然后rabbitmq才去检测messageB是否是死信.导致的结果就是,messageB会等待messageA60s被消费后,自己才能被检测到,然后被消费(messageB自己的延迟时间已经倒计时完毕,不会等messageA 60s过后才开始倒计时,也就是messageB不会等到66s,他已经是死信,只是rabbitmq没检测它)。所以,给消息设置延迟时间,还是需要按照官网方式,用延迟插件实现。
网友评论