20220317_rocketMQ入门学习笔记
1概述
1.1涉及内容
基于SpringBoot,主要是对mq进行基本操作的学习,并进行模块化划分,便于业务系统的拓展,涉及如下内容:
- 公共模块的抽取(mycommon-rocketmq),提供mq消息的生产、消费
- 业务模块的调用(myunreadserver)
1.2依赖的mq版本
<!--2.rocketmq-->
<!--v2.0.4对应 v4.5.2-->
<!--v2.2.1对应 v4.9-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
2代码示例
2.1公共模块
[图片上传失败...(image-106e3f-1647519560464)]

2.1.1ConsumerUtils
package com.kikop.utils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author kikop
* @version 1.0
* @project mycommon-rocketmq
* @file ConsumerUtils
* @desc
* @date 2022/3/16
* @time 9:30
* @by IDE IntelliJ IDEA
*/
@Component
public class ConsumerUtils {
// rocketMQTemplate 底层会自动去取
// 容器中的 producer、consumer
@Autowired
public RocketMQTemplate rocketMQTemplate;
/**
* 接收同步可靠消息
*
* @param rocketMQListener
* @param pollTimeoutMills 如果有数据的化,在范围内,会一直poll的,注意,好设计,空闲时,每隔多长时间从队列里面去消费数据
* @throws MQClientException
*/
public void getMessage(RocketMQListener<List<MessageExt>> rocketMQListener, long pollTimeoutMills) throws MQClientException {
// 启动线程进行消费,否则会阻塞其他 Bean的创建
new Thread(new Runnable() {
@Override
public void run() {
try {
DefaultLitePullConsumer defaultLitePullConsumer = rocketMQTemplate.getConsumer();
while (defaultLitePullConsumer.isRunning()) {
// poll:不阻塞
// poll by timeout:可中断,释放cpu时间片,到指定的超时时间会被唤醒
// 此方法从此LinkedBlockingQueue的头部检索并删除元素,如果在元素可用之前经过了指定的等待时间,则为null。
List<MessageExt> messageExts = defaultLitePullConsumer.poll(pollTimeoutMills);
// 1.业务回调内容
rocketMQListener.onMessage(messageExts);
// 2.消费完手动提交(默认就是)
// defaultLitePullConsumer.commitSync();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}).start();
}
}
2.1.2ProducerUtils
package com.kikop.utils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author kikop
* @version 1.0
* @project mycommon-rocketmq
* @file ProducerUtils
* @desc
* @date 2022/3/16
* @time 9:30
* @by IDE IntelliJ IDEA
*/
@Component
public class ProducerUtils {
// rocketMQTemplate 底层会自动去取
// 容器中的 producer、consumer
@Autowired
public RocketMQTemplate rocketMQTemplate;
/**
* 发送同步可靠消息
*
* @param topic
* @param strMsg
* @return
*/
public SendResult syncSend(String topic, String strMsg) {
/**
* 发送可靠同步消息 ,可以拿到SendResult 返回数据
* 同步发送是指消息发送出去后,会在收到mq发出响应之后才会发送下一个数据包的通讯方式。
* 这种方式应用场景非常广泛,例如重要的右键通知、报名短信通知、营销短信等。
*
* 参数1: topic:tag
* 参数2: 消息体 可以为一个对象
* 参数3: 超时时间 毫秒
*/
SendResult result = rocketMQTemplate.syncSend(topic, strMsg, 10000);
return result;
}
}
2.1.3.spring.factories
# 扩展点:外部业务系统为 SpringBoot工程时,实现 Bean 自动注入
# 特别是命名空间不一致时
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.kikop.config.MyRocketMQConfig
2.2业务模块
[图片上传失败...(image-c90504-1647519560465)]

2.2.1MyConsumerConfig
package com.kikop.config;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
/**
* @author kikop
* @version 1.0
* @project myunreadserver
* @file MyConsumerConfig
* @desc
* @date 2022/3/16
* @time 9:30
* @by IDE IntelliJ IDEA
*/
@Configuration
public class MyConsumerConfig {
/**
* RocketMQ配置文件
*/
@Autowired
public RocketMQProperties rocketMQProperties;
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties) throws MQClientException {
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
// 指定消费的 topic 名称
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
// String转换成需要的枚举,设计思路
SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
String selectorExpression = consumerConfig.getSelectorExpression();
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
int pullBatchSize = consumerConfig.getPullBatchSize();
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel, groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
// 指定消费位置
// 选择从 queue 头部开始 pull 还是从尾部开始 pull
// litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
return litePullConsumer;
}
}
2.2.2MyProducerConfig
package com.kikop.config;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author kikop
* @version 1.0
* @project myunreadserver
* @file MyProducerConfig
* @desc
* @date 2022/3/16
* @time 9:30
* @by IDE IntelliJ IDEA
*/
@Configuration
public class MyProducerConfig {
// @Value("${myunread.topic}")
// public String topic;
//
// @Value("${myunread.producer.group}")
// public String strProducerGroup;
/**
* RocketMQ配置文件
*/
@Autowired
public RocketMQProperties rocketMQProperties;
@Bean
DefaultMQProducer defaultMQProducer() {
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = producerConfig.getGroup();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();
DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
// 默认的topic: TBW102,自动创建的topci会继承该衣钵, 否则 No route info of this topic: ur-unread-topic
// producer.setCreateTopicKey(rocketMQProperties.getConsumer().getTopic()
// + ":" + rocketMQProperties.getConsumer().getSelectorExpression());
// 官方生产端无法预先指定topic todo
return producer;
}
}
2.2.3MyRocketMQListener
package com.kikop.mycomponent.custom;
import com.kikop.utils.DateUtils;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import java.util.Date;
import java.util.List;
/**
* @author kikop
* @version 1.0
* @project myunreadserver
* @file MyRocketMQListener
* @desc 自定义业务处理消费端监听接口
* @date 2022/3/16
* @time 9:30
* @by IDE IntelliJ IDEA
*/
public class MyRocketMQListener implements RocketMQListener<List<MessageExt>> {
/**
* 接收mq消息
*
* @param messageExts
*/
@Override
public void onMessage(List<MessageExt> messageExts) { // 默认周期:5秒一次
if (messageExts.size() >= 1) {
System.out.println(String.format("%s 收到数据,数据量: %d", DateUtils.getTime(), messageExts.size()));
} else {
System.out.println(String.format("%s 没有可消费的数据", DateUtils.getTime()));
}
}
}
2.2.4ConsumerUnReadServiceByCommon
package com.kikop.mycomponent.custom;
import com.kikop.utils.ConsumerUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author kikop
* @version 1.0
* @project myunreadserver
* @file ConsumerUnReadServiceByCommon
* @desc
* @date 2022/3/16
* @time 9:30
* @by IDE IntelliJ IDEA
*/
@Component
public class ConsumerUnReadServiceByCommon {
/**
* 如果有数据的化,在范围内,会一直poll的
* 注意,好设计,空闲时,每隔多长时间从队列里面去消费数据
*/
@Value("${myunread.consumer.polltimeoutmills}")
private long pollTimeOutMills;
@Autowired
public ConsumerUtils consumerUtils;
@PostConstruct
public void regisConsumerData() throws MQClientException {
consumerUtils.getMessage(new MyRocketMQListener(),pollTimeOutMills);
}
}
2.2.5UnReadService
package com.kikop.service;
/**
* @author kikop
* @version 1.0
* @project myunreadserver
* @file UnReadService
* @desc
* @date 2022/3/16
* @time 9:30
* @by IDE IntelliJ IDEA
*/
public interface UnReadService {
void syncSendMsg(String strMsg);
}
2.2.6UnReadServiceImpl
package com.kikop.service.impl;
import com.kikop.service.UnReadService;
import com.kikop.utils.DateUtils;
import com.kikop.utils.ProducerUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author kikop
* @version 1.0
* @project myunreadserver
* @file UnReadServiceImpl
* @desc
* @date 2022/3/16
* @time 9:30
* @by IDE IntelliJ IDEA
*/
@Component
public class UnReadServiceImpl implements UnReadService {
// @Value("${myunread.topic}")
// public String topic;
//
// @Value("${myunread.selectorExpression}")
// public String tag;
@Autowired
public RocketMQProperties rocketMQProperties;
@Autowired
public ProducerUtils producerUtil;
/**
* 指定需要些的topic(或+tag)
*
* @param strMsg
*/
@Override
public void syncSendMsg(String strMsg) {
System.out.println(String.format("%s start syncSendMsg", DateUtils.getTime()));
SendResult sendResult = producerUtil.syncSend(rocketMQProperties.getConsumer().getTopic()
+ ":" + rocketMQProperties.getConsumer().getSelectorExpression(), strMsg);
System.out.println(String.format("%s getMsgId:%s", DateUtils.getTime(), sendResult.getMsgId()));
System.out.println(String.format("%s end syncSendMsg", DateUtils.getTime()));
}
}
2.2.7配置
2.2.7.1application.yml
spring:
# 1.指定环境:开发v
profiles:
active: dev
2.2.7.2application-dev.properties
server.port=9999
server.servlet.context-path=/myunreadserver
rocketmq.name-server=127.0.0.1:9876
# 默认生产端 config by RocketTemplate
rocketmq.producer.group=ur-producer-group4
rocketmq.producer.send-message-timeout=3000
# 默认消费端 config by RocketTemplate
rocketmq.consumer.group=ur-consumer-group4
rocketmq.consumer.topic=ur-unread-topic4
rocketmq.consumer.selector-type=TAG
rocketmq.consumer.selector-expression=order4
## 自定义主题
#myunread.topic=ur-unread-topic
## 自定义参数生产
#myunread.producer.group=ur-producer-group
## 自定义消费端
#myunread.consumer.group=ur-consumer-group
#myunread.consumer.group2=ur-consumer-group2
#myunread.selectorType=SelectorType.TAG
#myunread.selectorExpression=ur
# 如果有数据的化,在范围内,会一直poll的
# 注意,好设计,空闲时,每隔多长时间从队列里面去消费数据
myunread.consumer.polltimeoutmills=5000
2.3测试
package com.kikop;
import com.kikop.service.UnReadService;
import com.kikop.utils.spring.SpringUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Controller;
import java.util.concurrent.TimeUnit;
/**
* @author kikop
* @version 1.0
* @project myunreadserver
* @file MyUnreadServerApplication
* @desc
* @date 2022/3/17
* @time 16:30
* @by IDE IntelliJ IDEA
*/
@SpringBootApplication
@Controller
// 注解扫描多个包下示例,内嵌包中有@Component注解,需开启如下内容
//@ComponentScan({"com.kikopxxx", "com.kikop"})
public class MyUnreadServerApplication implements CommandLineRunner {
public static void main(String[] args) {
ConfigurableApplicationContext configurableApplicationContext =
SpringApplication.run(MyUnreadServerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// 1.发送带tag的同步消息
UnReadService unReadService = SpringUtils.getBean(UnReadService.class);
for (int i = 0; i < 3; i++) {
unReadService.syncSendMsg(String.format("你好,我的名字叫大海:%d!", i));
TimeUnit.SECONDS.sleep(3);
}
}
}
2022-03-17 20:16:13 start syncSendMsg
2022-03-17 20:16:13 getMsgId:7F00000106B414DAD5DC56BF3D620000
2022-03-17 20:16:13 end syncSendMsg
2022-03-17 20:16:13 收到数据,数据量: 1
3其他
3.1Producer配置createTopicKey功能
[图片上传失败...(image-92ee4c-1647519560465)]

[图片上传失败...(image-185324-1647519560465)]

作为发送消息所在的topic的路由。
参数名:createTopicKey
默认值:TBW102
作用:在发送消息时,自动创建Broker服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
3.2DefaultMQProducerImpl代码分析
E:\workdirectory\mavenRep\LocalMavenRep\org\apache\rocketmq\rocketmq-client\4.9.1\rocketmq-client-4.9.1-sources.jar!\org\apache\rocketmq\client\impl\producer\DefaultMQProducerImpl.java
private SendResult sendDefaultImpl(
// 获取topic的默认路由,key:默认TBW102
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
public boolean ok() {
return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}
网友评论