一个完整的RocketMQ集群中,有如下几个角色:
Broker:暂存和传输消息;举例:邮局
NameServer:管理Broker;举例:各个邮局的管理机构
一个RocketMQ的Jar包里面
有一个 Broker启动器(单独启动) 有一个NameServer(单独启动)
一个Broker 可以注册到 多个NameServer中(NameServer集群)
多个Broker(Broker集群) 可以注册到 一个NameServer中
Broker由NameServer管理
Topic:区分消息的种类
给Topic发消息 的消息 可以等同于 主题的信件
从Topic收消息 的消息 可以等同于 主题的信件
Topic的消息:主题的消息 (主题的信件) 存放在Borker(邮局)中
Broker里面有多个Topic (邮局里面有 多种主题的信件)
默认一个topic有4个Message队列(可配置)
一个queueId就代表了一个MessageQueue。
Message队列 = queueId = MessageQueue = 该主题的信件的处理通道
用于并行发送和接收消息
Producer:消息的发送者;举例:发信者
Consumer:消息接收者;举例:收信者
一个发送者可以发送消息给一个或者多个Topic消息;
一个消息的接收者可以订阅一个或者多个Topic消息
某个Producer存在于某个Producer_Group中 认为这个Producer_Group是一伙的
ProducerA ProducerB 存在于My_Producer_Group中
那么ProducerA ProducerB 就是一伙的
Consumer 与 Consumer_Group 以此类推
1:生产者 消费者 生产者组 消费者组 主题
生产者组 的 生产者 给 节点 发送某个主题 的消息
消费者组 的 消费者 从 节点 消费某个主题 的消息
首先 对于消息的订阅发布模型
发布端 也就是生产端 基本是一个服务 也就是有一个生产者组
订阅端 也就是消费端 可能有多个服务 一个服务一个消费者组 也就是有多个消费者组
通常 一个服务里面的 生产者 是一伙的 My_Producer_Group
通常 一个服务里面的 消费者 是一伙的 My_Consumer_Group
当一条 主题为My_Topic 的消息发送到 节点
消费组与消费组之间:
消费者组 A 有消费者订阅了 主题为 “My_Topic” (消费)
消费者组 B 有消费者订阅了 主题为“My_Topic”(消费)
对于消费组 每一个消费者组(消费组A 和消费组B)
也就是每个服务会去理 这个My_Topic的这条消息
在消费组A(消费组B也是一样)逻辑里面:
对于消费组A内的消费者:里面有消费者1 消费者2
集群消费模式:消费者1消费 或者 消费者2消费
广播消费模式: 消费者1消费 和消费者2消费
//集群,消费组中的每个消费者都可以消费消息中的一部分消息
// 广播,消费组中的每个消费者都可以消费所有消息
注意:
在消息的消费者设置 消费的模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
//推荐使用集群消费的方式(默认也是集群消费)
2:单节点
只有一个master节点
消息的发送与接收 都是对master节点
当master节点宕机 RocketMQ的读写功能不可用
3:主从节点
有一个master节点 有一个slave节点
消息的发送与接收 都是对master节点
slave只是不断的同步master的数据 作为master节点宕机后使用
当master节点宕机
RocketMQ的写功能不可用
RocketMQ的读功能切换到slave节点
后续:可以使用脚本直接切换slave节点为master节点
IP1 master节点 IP2是其slave节点
生成者:
//如果是主从模式 同时namesrvAddr形成了集群 以 ; 分开
//"IP1:9876;IP2:9876;"
defaultMQProducer.setNamesrvAddr(nameSrvAddr);
消费者:
//如果是主从模式 同时namesrvAddr形成了集群 以 ; 分开
//"IP1:9876;IP2:9876;"
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
4:集群(两对 主从节点)
1:按照配置把集群配置好
2:Java代码中修改配置:
生产者:
//如果是namesrvAddr集群模式 以 ; 分开
//"IP1:9876;IP2:9876;IP3:9876;IP4:9876;"
defaultMQProducer.setNamesrvAddr(nameSrvAddr);
消费者:
//如果是namesrvAddr集群模式 以 ; 分开
//"IP1:9876;IP2:9876;IP3:9876;IP4:9876;"
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
3: Java代码会找到 namesrvAddr或者namesrvAddr集群
实际上:
IP1 IP2 IP3 IP4 都是namesrvAddr
他们形成 namesrvAddr集群 去一起管理注册到集群里面的所有Broker
它能够自动区分出 哪个几个是master节点 哪几个是对应的slave节点
4:在进行 发送消息 或者 消费消息 的时候 会按照某个策略 进行集群节点的选择
达到一个负载均衡的效果(一个master的压力 分摊到两个master节点)
5:Topic 消息的产生消费 核心理解
Topic是理解整个RocketMQ集群最关键的一点。
Broker 与 NameServer 都是在服务器上的
一个服务器 一个Broker 一个NameServer 也就是他们的关系是一对一的
Topic 也就是主题 是存放在Broker上的
一个Topic 默认有4条队列(读4条 写4条) 也可以设置更多
一个Borker可以有多个Topic
同一个Topic名称也可以存放在多个Borker上
设置 主题“Topic”名称为 test 在boker-a 主节点创建
默认为创建4条队列(4条读写队列 读写队列通常是一致)
设置 主题“Topic”名称为 test1 在boker-a 主节点 与 boker-b主节创建
各自创建4条读写队列
实际上会创建4乘以2节点 也就是8条(8条读写队列)
该 test1 名称的Topic总数就是8条了
RocketMQ集群的发消息(负载均衡) 单条消息
如何在发送消息的时候实现负载均衡
生产者给 指定主题 存在的集群中某个broker主节点(负载均衡) 的 该指定主题的某个队列(负载均衡)发消息
关键点:
1:主题是必须指定的 先判断的是有没有这个主题
2:拥有该主题的集群某个broker主节点发送 已经实现了第一次 负载均衡
3:该指定主题 的某个队列 已经实现了第二次 负载均衡
4:最小维度是队列 那么实际上是给队列发的消息
RocketMQ集群的收消息(消费速度) 单条消息
RokcetMQ是如何收消息的
多个消费组:
每个消费组都需要处理该消息 集群消费模式或者广播消费模式
每个消费组的其中 一个消费组里面
集群消费模式:
消费组其中多个Consumer订阅的该主题的消息 如果ConsumerA消费 其他Consumer(B、C、D、E)不能重复消费
广播消费模式:
消费组其中多个Consumer订阅的该主题的消息 如果ConsumerA消费 其他Consumer(B、C、D、E)需要重复消费
注意:Consumer的处理也是以主题的队列为单位的 (核心要点 消费速度)
一个Consumer可以设置同时处理多个队列(Consumer轮训处理队列A 队列B)
一个队列里面的消息最多同时给一个Consumer进行真正意义上消费(消费速度)
集群消费模式:被一个Consumer消费完 该消息则消费完了
广播消费模式:被一个Consumer消费完 该消息还没被消费完
需要被其他Consumer再消费一遍
消费的速度还是可以看做是 被一个Consumer消费的速度
因此为保证消费速度:
队列的数量建议和Consumer的数量一致
网友评论