美文网首页MQ
RocketMQ 与 Spring Boot整合(三、定时消息)

RocketMQ 与 Spring Boot整合(三、定时消息)

作者: 梅西爱骑车 | 来源:发表于2020-08-16 08:16 被阅读0次

在 RocketMQ 中,提供定时消息的功能。

定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

不过,RocketMQ 暂时不支持任意的时间精度的延迟,而是固化了 18 个延迟级别。如下表格:

延迟级别 时间 延迟级别 时间 延迟级别 时间
1 1s 7 3m 13 9m
2 5s 8 4m 14 10m
3 10s 9 5m 15 20m
4 30s 10 6m 16 30m
5 1m 11 7m 17 1h
6 2m 12 8m 18 2h

如果想要任一时刻的定时消息,可以考虑借助 MySQL + Job 来实现。又或者考虑使用 DDMQ(滴滴打车基于 RocketMQ 和 Kafka 改造的开源消息队列) 。

下面,我们开始本小节的示例。

一、快速开始

1.1 Demo03Message

创建 [Demo03Message]消息类,提供给当前示例使用。代码如下:

package com.ebadagang.springboot.rocketmq.message;

/**
 * 示例 03 的 Message 消息
 */
public class Demo03Message {

    public static final String TOPIC = "DEMO_03";

    /**
     * 编号
     */
    private Integer id;

    public Demo03Message setId(Integer id) {
        this.id = id;
        return this;
    }

    public Integer getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Demo03Message{" +
                "id=" + id +
                '}';
    }

}

1.2 Demo03Producer

它会使用 RocketMQTemplate 实现发送定时消息。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.message.Demo03Message;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class Demo03Producer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult syncSendDelay(Integer id, int delayLevel) {
        // 创建 Demo03Message 消息
        Message message = MessageBuilder.withPayload(new Demo03Message().setId(id))
                .build();
        // 同步发送消息
        return rocketMQTemplate.syncSend(Demo03Message.TOPIC, message, 30 * 1000,
                delayLevel);
    }

    public void asyncSendDelay(Integer id, int delayLevel, SendCallback callback) {
        // 创建 Demo03Message 消息
        Message message = MessageBuilder.withPayload(new Demo03Message().setId(id))
                .build();
        // 异步发送消息
        rocketMQTemplate.asyncSend(Demo03Message.TOPIC, message, callback, 30 * 1000,
                delayLevel);
    }

}

1.3 Demo03Consumer

package com.ebadagang.springboot.rocketmq.consumer;

import com.ebadagang.springboot.rocketmq.message.Demo03Message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        topic = Demo03Message.TOPIC,
        consumerGroup = "demo03-consumer-group-" + Demo03Message.TOPIC
)
public class Demo03Consumer implements RocketMQListener<Demo03Message> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(Demo03Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

}

1.4 简单测试

编写一个单元测试方法,调用 Demo03Producer 发送定时消息。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo03ProducerTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private Demo03Producer producer;

    @Test
    public void testSyncSendDelay() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        SendResult result = producer.syncSendDelay(id, 3); // 延迟级别 3 ,即 10 秒后消费
        logger.info("[testSyncSendDelay][发送编号:[{}] 发送结果:[{}]]", id, result);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

}

1.5 执行 #testSyncSendDelay() 方法

测试发送定时消息。控制台输出如下:

2020-08-04 17:24:59.387  INFO 15940 --- [           main] c.e.s.r.producer.Demo03ProducerTest      : [testSyncSendDelay][发送编号:[1596533099] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A3E4418B4AAC2132FCAFE0000, offsetMsgId=6585E30D00002A9F000000000003698B, messageQueue=MessageQueue [topic=DEMO_03, brokerName=broker-a, queueId=1], queueOffset=0]]]
2020-08-04 17:25:09.442  INFO 15940 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo03Consumer   : [onMessage][线程编号:172 消息内容:Demo03Message{id=1596533099}]

发送的消息,延迟 10 秒被 Demo03Consumer 消费。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

相关文章

网友评论

    本文标题:RocketMQ 与 Spring Boot整合(三、定时消息)

    本文链接:https://www.haomeiwen.com/subject/oeiwrktx.html