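The namesrv trilogy
Introduction
- This article is based on rocketmq-all-4.6.1 and analyzes RocketMQ's namesrv. The core of namesrv covers three areas: the startup flow, metadata storage, and the interaction flow.
- This installment focuses on namesrv's metadata storage.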
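The role of namesrv
- namesrv acts as the registry: it stores the routing information of broker nodes plus some simple k/v configuration.
- namesrv can be deployed as a cluster, but the namesrv nodes are independent and never communicate with each other. Fault tolerance comes from the producer/consumer side, which goes through the namesrv list in turn when fetching information (if the current node fails, it moves on to the next).
- As the registry, namesrv receives periodic registrations from brokers and keeps them in memory. Routing information is never persisted; only the k/v configuration managed by KVConfigManager is written to a file. A broker registers by looping over all namesrv nodes and registering with each one.
- namesrv exposes an interface through which producers and consumers fetch broker routing information; the transport is implemented on netty.
- namesrv detects broker liveness in two ways. Heartbeats: namesrv is the server side for brokers and receives their periodic heartbeats; a broker whose heartbeat times out (BROKER_CHANNEL_EXPIRED_TIME, 2 minutes in the code below) is removed. Connection monitoring: netty channel events (close, exception, idle), which are backed by epoll when the epoll transport is in use, signal that a connection has dropped.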
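The client-side failover described in the list above can be sketched as follows. This is a minimal illustration, not RocketMQ's client code: the class NamesrvFailoverSketch, its query method, and the way a failure is signalled (a RuntimeException) are all assumptions made for the example.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical client-side helper: tries name servers in turn until one answers. */
final class NamesrvFailoverSketch {
    private final List<String> namesrvAddrs;
    private int index = 0; // position of the name server currently in use

    NamesrvFailoverSketch(List<String> namesrvAddrs) {
        this.namesrvAddrs = namesrvAddrs;
    }

    /**
     * Runs the call against the current name server and moves to the next one on failure.
     * Brokers register with every name server, so any reachable node is expected to answer.
     */
    <T> T query(Function<String, T> call) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < namesrvAddrs.size(); attempt++) {
            String addr = namesrvAddrs.get(index);
            try {
                return call.apply(addr);
            } catch (RuntimeException e) {
                last = e;
                index = (index + 1) % namesrvAddrs.size(); // fail over to the next node
            }
        }
        throw new IllegalStateException("no name server reachable", last);
    }
}
```

Because the nodes do not replicate to each other, the client needs no leader election or consistency protocol: a plain retry loop over the address list is enough.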
Metadata storage in namesrv
public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig;
    private final NettyServerConfig nettyServerConfig;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    // Metadata stores: KV configuration and broker routing information
    private final KVConfigManager kvConfigManager;
    private final RouteInfoManager routeInfoManager;

    private RemotingServer remotingServer;
    private BrokerHousekeepingService brokerHousekeepingService;
    private ExecutorService remotingExecutor;
    private Configuration configuration;
    private FileWatchService fileWatchService;

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
}
- KVConfigManager kvConfigManager stores the KV configuration.
- RouteInfoManager routeInfoManager stores the routing information.
KVConfigManager

public class KVConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvController namesrvController;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // The outer key is a namespace (a configuration category); the inner map holds the key/value pairs
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

    public KVConfigManager(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    // Loads the persisted table from the file at kvConfigPath, if it exists
    public void load() {
        String content = null;
        try {
            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        } catch (IOException e) {
            log.warn("Load KV config table exception", e);
        }
        if (content != null) {
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            if (null != kvConfigSerializeWrapper) {
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }

    public void putKVConfig(final String namespace, final String key, final String value) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null == kvTable) {
                    kvTable = new HashMap<String, String>();
                    this.configTable.put(namespace, kvTable);
                    log.info("putKVConfig create new Namespace {}", namespace);
                }

                final String prev = kvTable.put(key, value);
                if (null != prev) {
                    log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                } else {
                    log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putKVConfig InterruptedException", e);
        }

        // Write the whole table back to disk after every change
        this.persist();
    }

    public void persist() {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
                kvConfigSerializeWrapper.setConfigTable(this.configTable);

                String content = kvConfigSerializeWrapper.toJson();

                if (null != content) {
                    MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
                }
            } catch (IOException e) {
                log.error("persist kvconfig Exception, "
                    + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("persist InterruptedException", e);
        }
    }

    public void deleteKVConfig(final String namespace, final String key) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    String value = kvTable.remove(key);
                    log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("deleteKVConfig InterruptedException", e);
        }

        this.persist();
    }
}
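- KVConfigManager stores configuration as key/value pairs grouped by namespace (a configuration category rather than a topic).
- KVConfigManager provides create, read, update, and delete operations, guarded by a read-write lock for thread safety.
- configTable holds the configuration; the ReadWriteLock separates readers from writers.
- putKVConfig saves a KV pair, deleteKVConfig removes one, and persist writes the table to the file at kvConfigPath.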
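The listing above shows only the write paths. The sketch below adds the read path under the read lock so the reader/writer split is visible in one place. It is a simplified stand-in, not the RocketMQ class: KvTableSketch and its method names are hypothetical, persistence is left out, and it uses lock() where the original uses lockInterruptibly().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical namespace -> key -> value table guarded by a read-write lock. */
final class KvTableSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Map<String, String>> table = new HashMap<>();

    /** Writers take the exclusive write lock. */
    void put(String namespace, String key, String value) {
        lock.writeLock().lock();
        try {
            table.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Readers share the read lock, so concurrent lookups do not block each other. */
    String get(String namespace, String key) {
        lock.readLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    void delete(String namespace, String key) {
        lock.writeLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            if (kv != null) {
                kv.remove(key);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

A call such as put("ORDER_TOPIC_CONFIG", "TopicA", "broker-a:4") followed by get("ORDER_TOPIC_CONFIG", "TopicA") shows the two-level lookup; the namespace name and the value format here are illustrative only.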
RouteInfoManager

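- topicQueueTable is keyed by topic and holds the queue information, one QueueData entry per broker.
- brokerAddrTable is keyed by brokerName and holds a BrokerData instance; inside BrokerData the broker addresses are keyed by brokerId.
- clusterAddrTable is keyed by clusterName and holds the set of brokerNames in that cluster.
- brokerLiveTable is keyed by broker address (ip:port) and holds the liveness information of each active broker.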
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;
}

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

    private final Random random = new Random();

    public BrokerData() {
    }

    public BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs) {
        this.cluster = cluster;
        this.brokerName = brokerName;
        this.brokerAddrs = brokerAddrs;
    }
}

class BrokerLiveInfo {
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;

    public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel,
        String haServerAddr) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
        this.dataVersion = dataVersion;
        this.channel = channel;
        this.haServerAddr = haServerAddr;
    }
}
- The code above shows the data structures of RouteInfoManager's core fields.
- The core fields are topicQueueTable, brokerAddrTable, clusterAddrTable, and brokerLiveTable.
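To see how these tables fit together, the sketch below answers a route query for one topic by joining topicQueueTable with brokerAddrTable. It is a simplified illustration under stated assumptions: RouteLookupSketch and lookup are hypothetical names, QueueData is reduced to its brokerName, BrokerData is reduced to its address map, and locking is omitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, lock-free miniature of the two tables used to answer a route query. */
final class RouteLookupSketch {
    // topic -> names of the brokers hosting the topic (QueueData reduced to brokerName)
    final Map<String, List<String>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> address), i.e. BrokerData reduced to brokerAddrs
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    /** Returns brokerName -> addresses for one topic, or an empty map if the topic is unknown. */
    Map<String, Map<Long, String>> lookup(String topic) {
        Map<String, Map<Long, String>> result = new LinkedHashMap<>();
        for (String brokerName : topicQueueTable.getOrDefault(topic, Collections.emptyList())) {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs != null) {
                // Copy so that callers cannot mutate the table itself
                result.put(brokerName, new HashMap<>(addrs));
            }
        }
        return result;
    }
}
```

The join goes through brokerName, which is why topicQueueTable stores broker names rather than addresses: an address change only touches brokerAddrTable.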
topicQueueTable

{
    "RMQ_SYS_TRANS_HALF_TOPIC": [{
        "brokerName": "broker-a",
        "perm": 6,
        "readQueueNums": 1,
        "topicSynFlag": 0,
        "writeQueueNums": 1
    }],
    "SELF_TEST_TOPIC": [{
        "brokerName": "broker-a",
        "perm": 6,
        "readQueueNums": 1,
        "topicSynFlag": 0,
        "writeQueueNums": 1
    }],
    "TBW102": [{
        "brokerName": "broker-a",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }],
    "BenchmarkTest": [{
        "brokerName": "broker-a",
        "perm": 6,
        "readQueueNums": 1024,
        "topicSynFlag": 0,
        "writeQueueNums": 1024
    }],
    "DefaultCluster": [{
        "brokerName": "broker-a",
        "perm": 7,
        "readQueueNums": 16,
        "topicSynFlag": 0,
        "writeQueueNums": 16
    }],
    "DefaultCluster_REPLY_TOPIC": [{
        "brokerName": "broker-a",
        "perm": 6,
        "readQueueNums": 1,
        "topicSynFlag": 0,
        "writeQueueNums": 1
    }],
    "OFFSET_MOVED_EVENT": [{
        "brokerName": "broker-a",
        "perm": 6,
        "readQueueNums": 1,
        "topicSynFlag": 0,
        "writeQueueNums": 1
    }]
}
- topicQueueTable stores per-topic information: the brokers hosting the topic, the read and write queue counts, and the permission.
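The perm values 6 and 7 in the dump are bit masks. The sketch below decodes them, assuming the bit layout used by RocketMQ's PermName (read = 4, write = 2, inherit = 1); that layout is stated from memory rather than from this article, and PermSketch and describe are hypothetical names.

```java
/** Hypothetical decoder for the perm bit mask stored in QueueData. */
final class PermSketch {
    // Assumed bit layout, mirroring RocketMQ's PermName constants
    static final int PERM_READ = 1 << 2;    // 4
    static final int PERM_WRITE = 1 << 1;   // 2
    static final int PERM_INHERIT = 1;      // 1

    /** Renders the mask as three characters: R, W, and X (inherit), or '-' when a bit is unset. */
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder();
        sb.append((perm & PERM_READ) != 0 ? 'R' : '-');
        sb.append((perm & PERM_WRITE) != 0 ? 'W' : '-');
        sb.append((perm & PERM_INHERIT) != 0 ? 'X' : '-');
        return sb.toString();
    }
}
```

Under this assumption, perm 6 reads as "RW-" (readable and writable) and perm 7 as "RWX", which matches TBW102 and the cluster topic carrying the extra inherit bit in the dump.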
brokerAddrTable

{
    "broker-a": {
        "brokerAddrs": {
            "0": "192.168.0.8:10911"
        },
        "brokerName": "broker-a",
        "cluster": "DefaultCluster"
    }
}
- brokerAddrTable stores broker information keyed by brokerName (e.g. broker-a); each value is a BrokerData holding the cluster name, the brokerName, and the <brokerId : brokerAddr> map.
clusterAddrTable

{
    "DefaultCluster": ["broker-a"]
}
- clusterAddrTable stores cluster information keyed by cluster name (e.g. DefaultCluster); each value is the set of brokerNames in that cluster.
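Since clusterAddrTable holds only broker names, resolving a cluster to addresses takes a second lookup in brokerAddrTable. A minimal sketch of that join, listing the master address of every broker in a cluster, is shown below. ClusterMastersSketch and masterAddrs are hypothetical names, and BrokerData is again reduced to its address map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper that resolves a cluster name to the addresses of its masters. */
final class ClusterMastersSketch {
    static final long MASTER_ID = 0L; // brokerId 0 marks the master

    static List<String> masterAddrs(String cluster,
                                    Map<String, Set<String>> clusterAddrTable,
                                    Map<String, Map<Long, String>> brokerAddrTable) {
        List<String> result = new ArrayList<>();
        Set<String> names = clusterAddrTable.get(cluster);
        if (names == null) {
            return result; // unknown cluster
        }
        for (String name : new TreeSet<>(names)) { // sorted copy for a stable order
            Map<Long, String> addrs = brokerAddrTable.get(name);
            String master = addrs == null ? null : addrs.get(MASTER_ID);
            if (master != null) {
                result.add(master);
            }
        }
        return result;
    }
}
```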
brokerLiveTable

{
    "192.168.0.8:10911": {
        "channel": {
            "active": false,
            "inputShutdown": false,
            "open": false,
            "outputShutdown": true,
            "registered": false,
            "writable": false
        },
        "dataVersion": {
            "counter": 1,
            "timestamp": 1588318700534
        },
        "haServerAddr": "192.168.0.8:10912",
        "lastUpdateTimestamp": 1588323761374
    }
}
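- brokerLiveTable stores the brokers currently considered alive; the key is the broker address (e.g. 192.168.0.8:10911) and the value is that broker's liveness information (last update time, data version, channel, and HA address).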
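The lastUpdateTimestamp field is what the heartbeat timeout is checked against. The sketch below shows one way such a scan can work, using the 2-minute value of BROKER_CHANNEL_EXPIRED_TIME from the RouteInfoManager listing. LiveTableSketch, heartbeat, and scanNotActive are hypothetical names; BrokerLiveInfo is reduced to its timestamp, and the channel close and route cleanup that a real eviction would trigger are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical miniature of brokerLiveTable with a timeout scan. */
final class LiveTableSketch {
    static final long EXPIRED_MILLIS = 1000L * 60 * 2; // same value as BROKER_CHANNEL_EXPIRED_TIME

    // brokerAddr -> time of the last registration/heartbeat, in milliseconds
    final Map<String, Long> lastUpdate = new HashMap<>();

    /** Called whenever a broker registers; refreshes its timestamp. */
    void heartbeat(String brokerAddr, long nowMillis) {
        lastUpdate.put(brokerAddr, nowMillis);
    }

    /** Removes every broker whose last heartbeat is older than the timeout; returns their addresses. */
    List<String> scanNotActive(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MILLIS < nowMillis) {
                evicted.add(entry.getKey()); // read the key before removing the entry
                it.remove();
            }
        }
        return evicted;
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a scheduled task would call scanNotActive(System.currentTimeMillis()) at a fixed interval.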
Core operations of RouteInfoManager

registerBroker
public class RouteInfoManager {
    // Fields are the same as in the RouteInfoManager listing above.

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();

                // 1. clusterAddrTable: add the brokerName to its cluster
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;

                // 2. brokerAddrTable: create the BrokerData on first registration
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                // 3. topicQueueTable: only the master (brokerId == 0) of a brokerName updates topic queue data
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                // 4. brokerLiveTable: refresh the liveness record (this is the heartbeat)
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                // 5. filterServerTable
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                // 6. A slave gets its master's address and HA address back in the result
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            boolean addNewOne = true;

            // Replace this broker's entry if it changed; keep it if it is identical
            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                            queueData);
                        it.remove();
                    }
                }
            }

            if (addNewOne) {
                queueDataList.add(queueData);
            }
        }
    }
}
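- registerBroker runs entirely under the write lock of the ReadWriteLock, which keeps it thread safe.
- It walks clusterAddrTable, brokerAddrTable, BrokerData, and topicQueueTable in that order, creating each missing entry and updating existing ones, and then refreshes brokerLiveTable and filterServerTable.
- topicQueueTable is only updated when the registering broker is the master (brokerId=0), and then only if its topic config version changed or this is its first registration.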
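The slave-to-master switch inside registerBroker is easy to miss in the long listing, so here is that step on its own. It is a sketch of the same idea rather than the original code: BrokerAddrsSketch and register are hypothetical names, and the iterator loop is replaced by removeIf.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical extract of the "one record per IP:PORT" rule applied during registration. */
final class BrokerAddrsSketch {
    /**
     * Registers addr under brokerId inside one BrokerData address map.
     * Returns true if this brokerId had no address before, i.e. a first registration.
     */
    static boolean register(Map<Long, String> brokerAddrs, long brokerId, String addr) {
        Objects.requireNonNull(addr, "addr");
        // Drop any entry that maps a different brokerId to the same address,
        // e.g. <1, IP:PORT> when a former slave comes back as <0, IP:PORT>.
        brokerAddrs.entrySet().removeIf(e -> addr.equals(e.getValue()) && e.getKey() != brokerId);
        return brokerAddrs.put(brokerId, addr) == null;
    }
}
```

Starting from {1=192.168.0.9:10911}, registering the same address with brokerId 0 leaves {0=192.168.0.9:10911} and reports a first registration, which in registerBroker is what triggers the topic queue update for the new master.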
unregisterBroker
public class RouteInfoManager {
    // Fields and constructor are the same as in the registerBroker listing above.

    public void unregisterBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();

                // 1. brokerLiveTable and filterServerTable are keyed by address
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
                log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                    brokerLiveInfo != null ? "OK" : "Failed",
                    brokerAddr
                );

                this.filterServerTable.remove(brokerAddr);

                // 2. brokerAddrTable: remove this brokerId, then the brokerName if nothing is left
                boolean removeBrokerName = false;
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null != brokerData) {
                    String addr = brokerData.getBrokerAddrs().remove(brokerId);
                    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                        addr != null ? "OK" : "Failed",
                        brokerAddr
                    );

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        this.brokerAddrTable.remove(brokerName);
                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                            brokerName
                        );

                        removeBrokerName = true;
                    }
                }

                // 3. clusterAddrTable and topicQueueTable: only when the brokerName itself is gone
                if (removeBrokerName) {
                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                    if (nameSet != null) {
                        boolean removed = nameSet.remove(brokerName);
                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                            removed ? "OK" : "Failed",
                            brokerName);

                        if (nameSet.isEmpty()) {
                            this.clusterAddrTable.remove(clusterName);
                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                                clusterName
                            );
                        }
                    }
                    this.removeTopicByBrokerName(brokerName);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("unregisterBroker Exception", e);
        }
    }

    private void removeTopicByBrokerName(final String brokerName) {
        Iterator<Entry<String, List<QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
        while (itMap.hasNext()) {
            Entry<String, List<QueueData>> entry = itMap.next();

            String topic = entry.getKey();
            List<QueueData> queueDataList = entry.getValue();
            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, qd);
                    it.remove();
                }
            }

            // A topic that no broker hosts any more is removed entirely
            if (queueDataList.isEmpty()) {
                log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
                itMap.remove();
            }
        }
    }
}
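- unregisterBroker also runs under the write lock of the ReadWriteLock, which keeps it thread safe.
- It works in the reverse order of registration: brokerLiveTable, filterServerTable, brokerAddrTable, then clusterAddrTable.
- Removing a broker can cascade: if removing the brokerId leaves the brokerName with no addresses, the brokerName is removed too; once the brokerName is gone, it is taken out of clusterAddrTable and every topic entry that points at it is removed.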
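The cascade described above can be reproduced on plain maps. The sketch below is a reduced model, assuming QueueData and BrokerData are collapsed to a broker name and an address map; UnregisterSketch and unregister are hypothetical names, and logging, locking, and the address-keyed tables are omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the cascading cleanup performed when a broker unregisters. */
final class UnregisterSketch {
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> id -> addr
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    final Map<String, List<String>> topicQueueTable = new HashMap<>();      // topic -> brokerNames

    void unregister(String cluster, String brokerName, long brokerId) {
        Map<Long, String> addrs = brokerAddrTable.get(brokerName);
        if (addrs == null) {
            return;
        }
        addrs.remove(brokerId);
        if (!addrs.isEmpty()) {
            return; // other instances of this brokerName remain, so the cascade stops here
        }
        brokerAddrTable.remove(brokerName);

        Set<String> names = clusterAddrTable.get(cluster);
        if (names != null) {
            names.remove(brokerName);
            if (names.isEmpty()) {
                clusterAddrTable.remove(cluster);
            }
        }

        // Drop the brokerName from every topic, then drop topics that no broker hosts any more
        topicQueueTable.values().forEach(list -> list.removeIf(brokerName::equals));
        topicQueueTable.values().removeIf(List::isEmpty);
    }
}
```

Unregistering only the slave of a master/slave pair leaves the topic routes untouched; the routes disappear once the last instance under that brokerName is gone.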
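RocketMQ broker configuration
2m-2s configuration (two masters, two slaves)
- All brokers in one RocketMQ cluster must use the same brokerClusterName.
- A master and its slaves share the same brokerName; brokerId=0 is the master and slaves take 1, 2, 3, and so on.
- Knowing how a cluster is configured makes the contents of RouteInfoManager easier to understand.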
##broker-a.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
##broker-a-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
##broker-b.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
##broker-b-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
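To connect these files back to RouteInfoManager, the sketch below feeds the three identity properties of each file into miniature versions of clusterAddrTable and brokerAddrTable. It is an illustration only: TwoMasterTwoSlaveSketch and register are hypothetical names, and the broker addresses passed in are made up, since the configuration files above do not contain them.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical mapping from broker properties to the two name-based route tables. */
final class TwoMasterTwoSlaveSketch {
    final Map<String, Set<String>> clusterAddrTable = new TreeMap<>();      // cluster -> brokerNames
    final Map<String, Map<Long, String>> brokerAddrTable = new TreeMap<>(); // brokerName -> id -> addr

    /** Parses one properties file body and records the broker under the given (assumed) address. */
    void register(String propertiesText, String brokerAddr) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String cluster = p.getProperty("brokerClusterName");
        String name = p.getProperty("brokerName");
        long id = Long.parseLong(p.getProperty("brokerId"));

        clusterAddrTable.computeIfAbsent(cluster, k -> new TreeSet<>()).add(name);
        brokerAddrTable.computeIfAbsent(name, k -> new TreeMap<>()).put(id, brokerAddr);
    }
}
```

Registering the four files yields one cluster entry, DefaultCluster, containing broker-a and broker-b, and two brokerAddrTable entries that each hold ids 0 and 1. That is the 2m-2s layout expressed in the table structures discussed earlier.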