最近一段时间由于公司做一些LDC(阿里称为逻辑数据中心,)架构升级,所以有幸参与中间件RocketMQ改造,所以整这个机会会把RocketMQ整个流程梳理一份。
image先简单概述一下RocketMQ中成员
-
Producer :消息生产者,可集群部署。它会先和 NameServer 集群中的随机一台建立⻓连接,得 知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立⻓连接,支持多种负载平衡模式 发送消息。
-
Consumer :消息消费者,可集群部署。它会先和 NameServer 集群中的随机一台建立⻓连接,得 知当前要消费的 Topic 存在哪台 Broker Master、Slave上,然后它们建立⻓连接,支持集群消费和广 播消费消息。
-
Broker :主要负责消息的存储、查询消费,支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。
-
NameServer : Topic 路由注册中心,支持 Broker 的动态注册和发现,保存 Topic 和 Borker之间的关系。通常是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的 路由信息。
以下则是本系列开篇,首先会梳理一下各个成员之间的路由信息通信
一、Broker上报元数据流程
启动过程
1. BrokerController创建
主要是加载类成员,包括配置类、管理类、服务类、任务类、策略类等。后续在分析这些。
2. 启动过程
主要包括消息存储服务、远程通信服务、心跳服务等启动过程,我们重点看一下定时线程池,启动后10s执行。时间间隔则是在10-60s期间,可以在conf文件中配置。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
BrokerController.this.registerBrokerAll(true, false,brokerConfig.isForceRegister());
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
3. BrokerController#registerBrokerAll
-
判断Broker是否有读写权限进行
-
broker侧 needRegister 发起一次远程请求确认数据版本是否有变化
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
4. 真正进行Broker元信息上报
BrokerController#doRegisterBrokerAll
-
首先获取nameserver地址信息,开始是通过配置读取,如果其中一个挂掉则会更新nameserver列表信息。
-
接着Broker会将请求参数拼接,包括BrokerID、Name、集群名称、主节点地址等元数据,并转换为二进制进行远程通信将BrokerData上报,这里采用CountDownLatch+线程池并发执行上报,采用remotingClient进行Netty通信,最终将结果添加到List中
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
二、Nameserver接受Broker请求处理
DefaultRequestProcessor#processRequest
该方法采用策略分配请求处理,对于Broker上报请求Code则是Register_Broker,最终到DefaultRequestProcessor#registerBroker ,该方法主要更新Nameserver元数据基本都是Map对象结构。
- 加锁控制,避免更新时客户端读到的数据不一致
- 更新clusterAddrTable
- 更新brokerAddrTable
- 更新 brokerAddrTable 中的 brokerData
- 如果是新注册的 Master Broker,或者 Broker 中的路由信息变了,需要更新 topicQueueTable
- 更新 brokerLiveTable
- 更新 filterServerTable
- 如果是 Slave Broker,需要在返回的信息中带上 master 的相关信息
- 返回结果RegisterBrokerResult
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
三、客户端拉取Namesrv元数据
1. 生产端启动
MQClientInstance#start
- 启动 scheduledExecutorService进行和NameServer建立连接,每隔2*60s
- 定时线程池更新topic路由信updateTopicRouteInfoFromNameServer,每个topic对应的TopicRouteData。如果Topic路由信息变化则进行更新,发送端、消费端同时更新topicPublishInfoTable、topicSubscribeInfoTable最终缓存topicRouteTable
// 元数据
public class TopicRouteData extends RemotingSerializable {
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
}
2. 发送消息
- tryToFindTopicPublishInfo
当producer调用send()会判断是否有路由信息,没有则去更新路由信息,最终还是调用updateTopicRouteInfoFromNameServer.
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
-
selectOneMessageQueue
选择messageQueue传入topic发送端topicPublishInfo。queueDatas存放主题中所有队列信息,brokerDatas保存主题相关所有Broker信息。客户端选定队列后,在对应的QueueData找到BrokerName同时可以得到BrokerData,最终确定好MessageQueue(BrokerId和Address)。
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
四、NameServer处理客户端请求
DefaultRequestProcessor#processRequest
客户端请求获取路由信息,请求Code是Get_RouteInfo_By_Topic。接着处理客户端获取的请求,调用pickupTopicRouteData,核心流程主要根据topic获取QueueData获取BrokerNameSet,遍历Set后得到BrokerData最终返回最新的topicRouteData。
try {
this.lock.readLock().lockInterruptibly();
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
网友评论