RocketMQ 作为一种纯java,分布式,队列模型的开源消息中间件,支持事务消息,顺序消息,批量消息,定时消息,消息回溯等。
ROcketMQ优点:
1,RocketMQ 去除zk 的依赖;
2,RocketMQ 支持同步和异步2种方式刷盘
3,RocketMQ 单机支持的队列或者toipic数量是5w
4,Rocket 支持消息重试;
5,RocketMQ支持严格按照一定顺序发送消息
6,RcoketMQ支持定时发送消息
,7,RocketMQ支持根据消息ID来进行查询;
8,RocketMQ 支持根据某个时间点进行消息的回溯
9,RocketMQ支持消息服务端的过滤
10,RocketMQ 消费并行度
RocketMQ架构原理
[图片上传失败...(image-81e25d-1621778488985)]
RocketMQ专业名词:
Producer 生产者角色--投递消息给mq
Producer Group生产组---
Consumer 消费者 采用拉取/mq推送方式 获取消息offset
Consumer Group 消费者组---在同一个组中,是不允许多个不同的消费者消费同一个消息
多个消费者消费同一条消息呢? 两个分组 多个不同的分组中可以允许有不同分组中消费者消费同一条消息的。----
以组的名义关联该组消费的offset位置---
Topic
业务队列存放消息
异步发送短信
异步发送邮件
邮件topic
短信topic
业务区分不同的队列消息
Queue
会将一个topic主题中的消息存放在多个不同的queue 与kafka 分区模型是一样的
MEssage:
生产者投递消息会自动对给消息生成一个全局消息id,后期的可以根据该消息全局id实现业务的防止重复执行----幂等性概念。
tag
区分 过滤
Broker
MQ服务器端
Name server
与zk相同思想,作为rocketmq注册中心 存放生产者消费者topic主题信息;
Producer:
消息生产者,位于用户的进程内,Producer 通过NAMEsERVER获取所有的Broker的路由信息,根据负载均衡选择将消息发到哪个Broker,然后调用Broker接口提交消息;
Producer Group
生产者组,简单的说就是多个发送同一类消息的生产者称之为一个生产者组
ConSumer
消息消费者,位于用户进程内。Consumer通过NameServer 获取所有broker的路由信息后,向Broker发送pull请求来获取消息数据。Consumer可以以2种模式启动,广播(Broadcast)和集群(Cluser),
RocketMQ环境搭建
注意:一定要配置rocketmq 环境变量 不然启动 mqnamesrv.cmd
报错: Please set the ROCKETMQ_HOME variable in your environment!
启动mqnamesrv
-
下载rocketmq安装包
-
解压rocketmq安装包
-
配置rocketmq环境变量
系统环境变量配置
变量名:ROCKETMQ_HOME
变量值:MQ解压路径\MQ文件夹名
eg、ROCKETMQ_HOME=D:\rocketmq-all-4.3.0-bin-release
4.启动 mqnamesrv.cmd
[图片上传失败...(image-d05b9c-1622358860602)]
启动mqBroker
Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,启动BROKER。成功后会弹出提示框,此框勿关闭。
启动Rocketmq-console
1.下载rocketmq-externals-master
,进入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。
-
新增:rocketmq.config.namesrvAddr=127.0.0.1:9876
-
执行
用CMD进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。
编译成功之后,Cmd进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.0.jar’,启动‘rocketmq-console-ng-1.0.0.jar’。
4.浏览器中输入‘127.0.0.1:配置端口’,成功后即可查看。
eg:http://127.0.0.1:8088
[图片上传失败...(image-3f7598-1622358860602)]
Springboot整合方式
注意springboot整合rocketmq server端 版本一定要与rocketmq 不然可能启动报错
Maven依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!-- SpringBoot整合Web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
生产者
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 普通消息投递 单向发送
*/
@GetMapping("/sendMsg")
public String sendMsg() {
MsgEntity msg = new MsgEntity("mayikt" + UUID.randomUUID().toString(), 1234);
rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
return "投递消息 => " + msg.toString() + " => 成功";
}
消费者
/**
* @ClassName RocketMQConsumer
* @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com
* @Version V1.0
**/
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "mayikt-group5", topic = "topic_meite")
public class RocketMQConsumer implements RocketMQListener<MsgEntity> {
@Override
public void onMessage(MsgEntity msgEntity) {
log.info("消费者监听到消息:<msg:{}>", msgEntity);
}
}
配置文件
spring:
application:
name: mayikt-rocketmq
server:
port: 8000
rocketmq:
# rocketmq地址
name-server: 127.0.0.1:9876
producer:
# 必须填写 group
group: mayikt-group
Rocketmq配置文件详解
所属集群名字
brokerClusterName=rocketmq-cluster
此处需手动更改
broker名字,注意此处不同的配置文件填写的不一样
附加:按配置文件文件名来匹配
brokerName=broker-a
0 表示Master, > 0 表示slave
brokerId=0
此处许手动更改
(此处nameserver跟host配置相匹配,9876为默认rk服务默认端口)nameServer 地址,分号分割
附加:broker启动时会跟nameserver建一个长连接,broker通过长连接才会向nameserver发新建的topic主题,然后java的客户端才能跟nameserver端发起长连接,向nameserver索取topic,找到topic主题之后,判断其所属的broker,建立长连接进行通讯,这是一个至关重要的路由的概念,重点,也是区别于其它版本的一个重要特性
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
Broker 对外服务的监听端口
listenPort=10911
删除文件时间点,默认是凌晨4点
deleteWhen=04
文件保留时间,默认48小时
fileReservedTime=120
commitLog每个文件的大小默认1G
附加:消息实际存储位置,和ConsumeQueue是mq的核心存储概念,之前搭建2m环境的时候创建在store下面,用于数据存储,consumequeue是一个逻辑的概念,消息过来之后,consumequeue并不是把消息所有保存起来,而是记录一个数据的位置,记录好之后再把消息存到commitlog文件里
mapedFileSizeCommitLog=1073741824
ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
存储路径
storePathRootDir=/usr/local/rocketmq/store
commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
Broker 的角色
• ASYNC_MASTER 异步复制Master
• SYNC_MASTER 同步双写Master
• SLAVE
brokerRote=ASYNC_MASTER
刷盘方式
• ASYNC_FLUSH 异步刷盘
• SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
checkTransactionMessageEnable=false
发消息线程池数量
sendMessageTreadPoolNums=128
拉消息线程池数量
pullMessageTreadPoolNums=128
Rocketmq 队列分区模型:
Rocketmq 底层存储结构中 将一个主题分成n多个不同的队列实现存放消息
创建了一个主题: 需要指定队列个数 默认是4 和16
写队列数量: 对我们生产者写投递shul 16
读队列数量: 对我们消费者获取消息队列数量 16
在rocketmq中,如果一个topic只有一个队列的情况下支持的并行能力比较弱,所以会将一个topic分成分成n多个不同的队列queue来实现存放, 类似于Kafka的分区模型概念。
RocketMQ 解决方案:
如果消费者获取消息,处理器都是同一个线程的情况下有可能会影响我们的效率
消费者获取消息10毫秒
Rocketmq 中消费者消费的消息,采用多线程的形式,需要注意消息顺序一致性的问题:
与kafka思想一样,消费者不管消费成功还是失败,最终消息不会立即删除,后期通过日志删除策略,定时删除消息;
生产者发送消息有三种形式:同步,异步,和单向
三种形式:
1,单向: 生产者投递消息到mq中,不需要返回结果
优点: 延迟概率比较低
缺点: 丢失消息数据
2,异步:生产者投递消息到mq中,使用回调形式返回;
- 同步
生产者投递消息到mq中,采用同步的形式获取到返回消息是否有
投递成功的结果,导致接口延迟概率比较大。
投递消息过程比较耗时时间10毫秒
发送请求 基于请求与响应
1.同步发送:发送请求模式属于同步的,发送该条消息不需等待该条消息发送成功之后,才可以继续发送下一条。
2.异步发送:采用异步的发送模式,不需要同步阻塞等待,通过回调的形式监听生产者消息投递结果
3单向发送:只负责发送消息给mq,不管是否有发送成功。
同步发送
/**
* 同步发送
*
* @throws Exception
*/
@GetMapping("/sync")
public void sync() {
MsgEntity msg = new MsgEntity("mayikt" + UUID.randomUUID().toString(), 1234);
SendResult sendResult = rocketMQTemplate.syncSend(RocketMQConfig.TOPIC_NAME, msg);
log.info("同步发送字符串{}, 发送结果{}", msg.toString(), sendResult);
}
异步发送
/**
* 异步发送
*
* @throws Exception
*/
@GetMapping("async")
public void async() {
MsgEntity msg = new MsgEntity("mayikt" + UUID.randomUUID().toString(), 1234);
log.info(">msg:<<" + msg);
rocketMQTemplate.asyncSend(RocketMQConfig.TOPIC_NAME, msg.toString(), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
log.info("异步发送成功{}", var1);
}
@Override
public void onException(Throwable var1) {
log.info("异步发送失败{}", var1);
}
});
}
单向发送
/**
* 普通消息投递 单向发送
*/
@GetMapping("/sendMsg")
public String sendMsg() {
MsgEntity msg = new MsgEntity("mayikt" + UUID.randomUUID().toString(), 1234);
rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
return "投递消息 => " + msg.toString() + " => 成功";
}
顺序消息
Rocketmq中,消费者消处理业务逻辑的时候是采用多线程。
如何解决消息顺序一=一致性的问题
1,生产者投递消息根据key 投递到同一个队列中存放
2,消费者应该订阅到同一个队列中实现消费
3,最终在同一个线程去消费消息(不能够实现多想想消费)
生产者
String uuid = UUID.randomUUID().toString();
SendResult result1 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, "insert", uuid);
log.info("insert:" + result1.toString());
SendResult result2 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, "update", uuid);
log.info("update:" + result2.toString());
SendResult result3 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, "delete", uuid);
log.info("delete:" + result3.toString());
消费者
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "mayikt-group20", topic = "topic_seq", consumeMode = ConsumeMode.ORDERLY
)
public class RocketMQConsumer01 implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
try {
Random r = new Random(100);
int i = r.nextInt(500);
Thread.sleep(i);
} catch (Exception e) {
}
log.info("消费者监听到消息:<msg:{}>", msg);
}
}
网友评论