<!-- Maven dependency on the RocketMQ Spring Boot starter, pinned to version 2.2.0. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
# RocketMQ client settings; the keys must be nested so that they resolve to
# rocketmq.name-server and rocketmq.producer.group.
rocketmq:
  # Address (host:port) of the RocketMQ name server.
  name-server: 127.0.0.1:9876
  producer:
    # Producer group used by the application's producer.
    group: test-group
消息订阅
/**
 * RocketMQ listener bound to topic {@code delay-topic} in consumer group {@code delay-group}.
 *
 * <p>For every message it logs the local time at which the message was handled,
 * followed by the message payload.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "delay-group")
public class DelayConsumer implements RocketMQListener<String> {

    /**
     * Timestamp format for log output. {@link DateTimeFormatter} is immutable and
     * thread-safe, so it is built once instead of on every received message.
     */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Logs the current local time and the received payload.
     *
     * @param s the message payload
     */
    @Override
    public void onMessage(String s) {
        log.info("received is {}", TIMESTAMP_FORMAT.format(LocalDateTime.now()));
        log.info("received Result {}", s);
    }
}
消息生产
/**
 * Publishes delayed messages through the injected {@link RocketMQTemplate}
 * and logs the send time and the broker's {@link SendResult}.
 */
@Component
@Slf4j
public class DelayProduce {

    /**
     * Timestamp format for log output. {@link DateTimeFormatter} is immutable and
     * thread-safe, so it is built once instead of on every send.
     */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Timeout handed to {@code syncSend}, in milliseconds. */
    private static final long SEND_TIMEOUT_MILLIS = 2000L;

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Synchronously sends {@code message} to {@code topic} with the given delay level.
     *
     * @param topic      destination topic
     * @param message    payload wrapped into a Spring message before sending
     * @param delayLebel delay level forwarded unchanged to {@code syncSend}; RocketMQ treats
     *                   it as a level index rather than a duration (confirm the mapping
     *                   against the broker configuration). The misspelled name is kept
     *                   so the method signature stays exactly as callers know it.
     */
    public void send(String topic, String message, int delayLebel) {
        SendResult sendResult = rocketMQTemplate.syncSend(
                topic,
                MessageBuilder.withPayload(message).build(),
                SEND_TIMEOUT_MILLIS,
                delayLebel);
        // The timestamp is taken after syncSend returns, i.e. once the send has completed.
        log.info("send is {}", TIMESTAMP_FORMAT.format(LocalDateTime.now()));
        log.info("sendResult {}", sendResult);
    }
}
测试
/**
 * Spring Boot test that publishes ten messages to {@code delay-topic}
 * through {@link DelayProduce}, each with delay level 1.
 */
@SpringBootTest
class DelayProduceTest {

    @Resource
    private DelayProduce delayProduce;

    /** Sends messages numbered 0 through 9, one call per message. */
    @Test
    void send() {
        final String topic = "delay-topic";
        final int delayLevel = 1;
        int index = 0;
        while (index < 10) {
            String payload = "秒杀订单延迟消息===" + index;
            delayProduce.send(topic, payload, delayLevel);
            index++;
        }
    }
}
/**
 * Spring Boot test whose only job is to keep the test JVM running, presumably so
 * that the application's message listener has time to receive messages.
 */
@SpringBootTest
class DelayConsumerTest {

    /**
     * Blocks the test thread indefinitely; stop the run manually when done.
     *
     * <p>Joining the current thread never returns on its own, so this parks the
     * thread without burning a CPU core the way an empty {@code while (true)}
     * loop does, and it still reacts to interruption.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    void onMessage() throws InterruptedException {
        Thread.currentThread().join();
    }
}
网友评论