在 RocketMQ 中,提供定时消息的功能。
定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
不过,RocketMQ 暂时不支持任意的时间精度的延迟,而是固化了 18 个延迟级别。如下表格:
延迟级别 | 时间 | 延迟级别 | 时间 | 延迟级别 | 时间 |
---|---|---|---|---|---|
1 | 1s | 7 | 3m | 13 | 9m |
2 | 5s | 8 | 4m | 14 | 10m |
3 | 10s | 9 | 5m | 15 | 20m |
4 | 30s | 10 | 6m | 16 | 30m |
5 | 1m | 11 | 7m | 17 | 1h |
6 | 2m | 12 | 8m | 18 | 2h |
如果想要任一时刻的定时消息,可以考虑借助 MySQL + Job 来实现。又或者考虑使用 DDMQ(滴滴打车基于 RocketMQ 和 Kafka 改造的开源消息队列) 。
下面,我们开始本小节的示例。
一、快速开始
1.1 Demo03Message
创建 [Demo03Message]消息类,提供给当前示例使用。代码如下:
package com.ebadagang.springboot.rocketmq.message;
/**
* 示例 03 的 Message 消息
*/
public class Demo03Message {
public static final String TOPIC = "DEMO_03";
/**
* 编号
*/
private Integer id;
public Demo03Message setId(Integer id) {
this.id = id;
return this;
}
public Integer getId() {
return id;
}
@Override
public String toString() {
return "Demo03Message{" +
"id=" + id +
'}';
}
}
1.2 Demo03Producer
它会使用 RocketMQTemplate 实现发送定时消息。代码如下:
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.message.Demo03Message;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Demo03Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSendDelay(Integer id, int delayLevel) {
// 创建 Demo03Message 消息
Message message = MessageBuilder.withPayload(new Demo03Message().setId(id))
.build();
// 同步发送消息
return rocketMQTemplate.syncSend(Demo03Message.TOPIC, message, 30 * 1000,
delayLevel);
}
public void asyncSendDelay(Integer id, int delayLevel, SendCallback callback) {
// 创建 Demo03Message 消息
Message message = MessageBuilder.withPayload(new Demo03Message().setId(id))
.build();
// 异步发送消息
rocketMQTemplate.asyncSend(Demo03Message.TOPIC, message, callback, 30 * 1000,
delayLevel);
}
}
1.3 Demo03Consumer
package com.ebadagang.springboot.rocketmq.consumer;
import com.ebadagang.springboot.rocketmq.message.Demo03Message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = Demo03Message.TOPIC,
consumerGroup = "demo03-consumer-group-" + Demo03Message.TOPIC
)
public class Demo03Consumer implements RocketMQListener<Demo03Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo03Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
1.4 简单测试
编写一个单元测试方法,调用 Demo03Producer 发送定时消息。代码如下:
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo03ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo03Producer producer;
@Test
public void testSyncSendDelay() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSendDelay(id, 3); // 延迟级别 3 ,即 10 秒后消费
logger.info("[testSyncSendDelay][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
1.5 执行 #testSyncSendDelay() 方法
测试发送定时消息。控制台输出如下:
2020-08-04 17:24:59.387 INFO 15940 --- [ main] c.e.s.r.producer.Demo03ProducerTest : [testSyncSendDelay][发送编号:[1596533099] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A3E4418B4AAC2132FCAFE0000, offsetMsgId=6585E30D00002A9F000000000003698B, messageQueue=MessageQueue [topic=DEMO_03, brokerName=broker-a, queueId=1], queueOffset=0]]]
2020-08-04 17:25:09.442 INFO 15940 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo03Consumer : [onMessage][线程编号:172 消息内容:Demo03Message{id=1596533099}]
发送的消息,延迟 10 秒被 Demo03Consumer 消费。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。
网友评论