RocketMQ路由注册是通过Broker与NameServer的心跳功能实现。Broker启动时向集群中所有的NameServer发送心跳包,每隔30秒向集群中的所有NameServer发送心跳,NamerServer收到心跳包会更新brokerLiveTable缓存中的BrokerLiveInfo的lastUpdateTimestamp,然后NameServer每隔十秒扫描brokerLiveTable,如果连续120秒没有收到心跳包,NameServer将移除该broker的路由信息,同时关闭Socket连接。
这里也是为什么不建议线上开启autoCreateTopic的原因,因为默认缓存在broker中,所以开启后,多个broker同时默认创建topic,将会导致重复创建broker,最终同步到NameServer后的数据重复。
Broker发送心跳包
org.apache.rocketmq.broker.BrokerController#start
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
跟踪代码,真正执行注册
this.brokerOuterAPI.registerBrokerAll
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
//获取 NameServer 地址列表
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
//填充requestHeader属性
RegisterBrokerBody requestBody = new RegisterBrokerBody();
//填充body属性
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
//真正注册
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
}
});
}
}
return registerBrokerResultList;
}
注册逻辑在registerBroker中执行,逻辑较为简单,发送http请求处理返回数据即可。
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
以上完成后,返回BrokerController#doRegisterBrokerAll 方法中
//注册并获取结果
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(xxxx);
//更新心跳包返回数据
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
获取返回的结果,更新对应的masterAddr 以及 topicConfigTable
NameServer处理心跳包
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 负责RocketMQ中处理所有相关的网络请求,对应的心跳包请求解析器为RequestCode.REGISTER_BROKER
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
跟踪对应的代码,我们可以看到请求的处理和返回代码如下:
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
namesrvController.getRouteInfoManager().registerBroker 负责处理注册请求,处理后 返回体包装对应的 ServerAddr以及路由信息返回。
RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
//写加锁
this.lock.writeLock().lockInterruptibly();
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//获取brokerName,如果为空,则注册
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
路由删除
如上所示,broker每隔30秒向NameServer注册路由信息,那么如果一个broker宕机后,NameServer如何剔除Broker呢?同样的NameServer每隔10秒扫描broker列表,管理broker存活信息。
RocketMQ有两个出发点来完成路由删除:
- NameServer定时扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间差大于120s,则需要剔除该broker信息
- Broker正常关闭的情况下,执行unregisterBroker命令
RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
onChannelDestroy 中申请写锁,然后遍历移除。
路由发现
RocketMQ的路由发现不是实时的,当topic路由发生变化时,NameServer不主动推送给客户端,而是客户端定时拉取最新的主题信息。根据主题名拉取路由信息的命令编码命名为RequestCode.GET_ROUTEINTO_BY_TOPIC。
请求依然由DefaultRequestProcessor处理。
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
核心实现
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;//顺序消息配置
private List<QueueData> queueDatas;//队列元数据
private List<BrokerData> brokerDatas;//topic分布的broker原数据
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;//broker上过滤服务器列表
核心实现则由 namesrvController.getRouteInfoManager().pickupTopicRouteData
网友评论