一 main函数入口
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
controller.start();
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
1.1 命令行参数解析
- 构建公共的命令行解析,
h 帮助提示,
n 名字服务器地址
Options options = ServerUtil.buildCommandlineOptions(new Options());
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("h", "help", false, "Print help");
opt.setRequired(false);
options.addOption(opt);
opt =
new Option("n", "namesrvAddr", true,
"Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
opt.setRequired(false);
options.addOption(opt);
return options;
}
- 构建namesrv独有的命令行解析
c 指定配置文件路径
p 打印namesrv配置信息并退出。
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Name server config properties file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
return options;
}
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
public static CommandLine parseCmdLine(final String appName, String[] args, Options options,
CommandLineParser parser) {
HelpFormatter hf = new HelpFormatter();
hf.setWidth(110);
CommandLine commandLine = null;
try {
commandLine = parser.parse(options, args);
if (commandLine.hasOption('h')) {
hf.printHelp(appName, options, true);
return null;
}
} catch (ParseException e) {
hf.printHelp(appName, options, true);
}
return commandLine;
}
- 命令行参数c处理
获取配置文件路径,读取文件内容,初始化配置。
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, " + file + "%n");
in.close();
}
}
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
System.exit(0);
}
1.2 NamesrvConfig
-
类属性
NamesrvConfig.png
- 属性说明
属性 |
作用 |
rocketmqHome |
部署路径 |
kvConfigPath |
kv配置缓存文件路径 |
configStorePath |
命令行参数c指定的启动配置文件路径 |
orderMessageEnable |
是否支持顺序消费 |
- orderMessageEnable为true时,topic路由信息中
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
...
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
...
}
...;
}
1.3 NettyServerConfig
-
类属性
NettyServerConfig.png
- 属性说明
属性 |
作用 |
listenPort |
netty监听端口 |
serverWorkerThreads |
通信完成事件处理线程数,从socket读写数据 |
serverCallbackExecutorThreads |
业务处理线程池线程数。1 读响应数据完成后,执行注册回调函数的线程池。2 读请求数据完成后,默认的命令码请求处理的线程池。重要的命令码则可以自定义处理线程池。 |
serverSelectorThreads |
子通信socket的请求事件处理线程数 |
serverOnewaySemaphoreValue |
oneway报文最大发送并发量 |
serverAsyncSemphoreValue |
async报文最大发送并发量 |
serverChannelMaxIdleTimeSeconds |
channel空闲时间 |
serverSocketSndBufSize |
发送缓存长度 |
serverSocketRcvBufSize |
收包缓存长度 |
serverPooledByteBufAllocatorEnable |
是否设置收包缓存为堆内存childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
useEpollNativeSelector |
当前selector类型 |
二 NamesrvController
2.1 实例化
public KVConfigManager(NamesrvController namesrvController){
this.namesrvController = namesrvController;
}
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
- 实例化BrokerHousekeepingService。ChannelEventListener监听回调。
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
2.2 初始化
- kv配置持久化文件中加载到内存
this.kvConfigManager.load()
- 初始化netty server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
- 初始化线程池,并注册为默认命令码处理线程池
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
- 注册定时任务,遍历routeInfoManager的活跃broke表,心跳时间超过2分钟的则认为broker已下线,则从活跃broke表中删除并关闭netty连接
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
2.3 start
- 启动netty server
this.remotingServer.start();
三 KVConfigManager
-
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();
存储配置
-
private final ReadWriteLock lock = new ReentrantReadWriteLock();
读写锁控制configTable的并发读写
3.1 添加配置
public void putKVConfig(final String namespace, final String key, final String value) {
try {
this.lock.writeLock().lockInterruptibly();
try {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null == kvTable) {
kvTable = new HashMap<String, String>();
this.configTable.put(namespace, kvTable);
log.info("putKVConfig create new Namespace {}", namespace);
}
final String prev = kvTable.put(key, value);
if (null != prev) {
log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
} else {
log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
}
} finally {
this.lock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putKVConfig InterruptedException", e);
}
this.persist();
}
3.2 删除配置
public void deleteKVConfig(final String namespace, final String key) {
try {
this.lock.writeLock().lockInterruptibly();
try {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null != kvTable) {
String value = kvTable.remove(key);
log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
}
} finally {
this.lock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("deleteKVConfig InterruptedException", e);
}
this.persist();
}
3.3 获取配置
public byte[] getKVListByNamespace(final String namespace) {
try {
this.lock.readLock().lockInterruptibly();
try {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null != kvTable) {
KVTable table = new KVTable();
table.setTable(kvTable);
return table.encode();
}
} finally {
this.lock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getKVListByNamespace InterruptedException", e);
}
return null;
}
- 读锁控制并发,根据namespace+key获取配置
public String getKVConfig(final String namespace, final String key) {
try {
this.lock.readLock().lockInterruptibly();
try {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null != kvTable) {
return kvTable.get(key);
}
} finally {
this.lock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getKVConfig InterruptedException", e);
}
return null;
}
3.4 持久化
- 获取配置信息,转换成json格式
- 写入文件中
this.namesrvController.getNamesrvConfig().getKvConfigPath(),
public void persist() {
try {
this.lock.readLock().lockInterruptibly();
try {
KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
kvConfigSerializeWrapper.setConfigTable(this.configTable);
String content = kvConfigSerializeWrapper.toJson();
if (null != content) {
MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
}
} catch (IOException e) {
log.error("persist kvconfig Exception, "
+ this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
} finally {
this.lock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("persist InterruptedException", e);
}
}
3.5 加载
public void load() {
String content = null;
try {
content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
} catch (IOException e) {
log.warn("Load KV config table exception", e);
}
if (content != null) {
KVConfigSerializeWrapper kvConfigSerializeWrapper =
KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
if (null != kvConfigSerializeWrapper) {
this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
log.info("load KV config table OK");
}
}
}
四 RouteInfoManager
4.1 topicQueueTable
-
HashMap<String/* topic */, List<QueueData>> topicQueueTable
以topic为key,存储topic在各broker上的队列配置,读写状态,读写队列数
- QueueData
public class QueueData implements Comparable<QueueData> {
private String brokerName;//名称
private int readQueueNums;//读队列数
private int writeQueueNums;//写队列数
private int perm;//读写配置
private int topicSynFlag;//同步复制还是异步复制
}
perm值 |
作用 |
1 |
可继承队列配置,用于默认topic,创建topic时从默认topic继承队列配置 |
2 |
可写 |
4 |
可读 |
4.2 brokerAddrTable
-
HashMap<String/* brokerName */, BrokerData> brokerAddrTable
存储broker信息
- BrokerData
public class BrokerData implements Comparable<BrokerData> {
private String cluster;// broker所属集群
private String brokerName;//名称
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;//brokerId 0 master,大于0 slave,地址是对应的ip:port
private final Random random = new Random();
}
- 若broker的master下线后,获取broker地址时使用random随机选择一个
public String selectBrokerAddr() {
String addr = this.brokerAddrs.get(MixAll.MASTER_ID);
if (addr == null) {
List<String> addrs = new ArrayList<String>(brokerAddrs.values());
return addrs.get(random.nextInt(addrs.size()));
}
return addr;
}
4.3 clusterAddrTable
-
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable
存储集群下的broker结合
4.4 brokerLiveTable
-
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable
有效的broker列表
- 10s定时任务,调用
scanNotActiveBroker()
,检查时间戳,删除心跳超时的broker
- BrokerLiveInfo
class BrokerLiveInfo {
private long lastUpdateTimestamp;//broker上次心跳报文时间
private DataVersion dataVersion;//broker报文版本,变更配置时更新
private Channel channel;//和broker建立的netty连接chnnel
private String haServerAddr;//主从同步的broker服务地址
}
- DataVersion,使用时间戳和AtomicLong纪录版本
public class DataVersion extends RemotingSerializable {
private long timestamp = System.currentTimeMillis();
private AtomicLong counter = new AtomicLong(0);
}
4.4 filterServerTable
-
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable
过滤服务地址表
五 BrokerHousekeepingService
- 初始化netty server时,
初始化为NettyRemotingServer.channelEventListener = BrokerHousekeepingService,
初始化事件分发线程protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
注册channel连接事件处理函数class NettyConnectManageHandler extends ChannelDuplexHandler
- NettyConnectManageHandler在连接状态变更回调处理函数中,添加事件到NettyEventExecutor中
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
public void putNettyEvent(final NettyEvent event) {
this.nettyEventExecutor.putNettyEvent(event);
}
- NettyEventExecutor根据channel事件调用BrokerHousekeepingService回调函数
public void run() {
log.info(this.getServiceName() + " service started");
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
while (!this.isStopped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
六 命令码处理函数
6.1 DefaultRequestProcessor
6.1.1 请求码分类处理
- 使用线程池
ExecutorService remotingExecutor
处理请求消息
- 根据请求码进行不同处理
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
请求码 |
处理 |
PUT_KV_CONFIG,GET_KV_CONFIG,DELETE_KV_CONFIG |
kv配置的增删改查 |
GET_KVLIST_BY_NAMESPACE |
获取namespace的所有kv配置 |
GET_TOPICS_BY_CLUSTER,GET_SYSTEM_TOPIC_LIST_FROM_NS,GET_UNIT_TOPIC_LIST,GET_HAS_UNIT_SUB_TOPIC_LIST,GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST,GET_ALL_TOPIC_LIST_FROM_NAMESERVER |
获取topic列表信息 |
UPDATE_NAMESRV_CONFIG,GET_NAMESRV_CONFIG |
namesrv配置的修改和查询 |
DELETE_TOPIC_IN_NAMESRV |
删除topic信息 |
WIPE_WRITE_PERM_OF_BROKER |
删除topic在broker上的写权限 |
GET_BROKER_CLUSTER_INFO |
获取集群和集群的broker信息 |
GET_ROUTEINTO_BY_TOPIC |
获取topic路由信息 |
REGISTER_BROKER,UNREGISTER_BROKER |
注册/注销broker |
6.1.2 REGISTER_BROKER
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
-
this.clusterAddrTable.put(clusterName, brokerNames);
添加cluster的broker信息
-
this.brokerAddrTable.put(brokerName, brokerData);
添加broker表信息
- 对master broker,首次添加或配置有变更,则添加或修改
topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
- 对于slave broker,则返回master broker地址
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
6.1.3 GET_ROUTEINTO_BY_TOPIC
- 获取topic路由
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
- 获取topic的queueData信息,broker信息,顺序消费配置信息,filterServer信息
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
6.2 ClusterTestRequestProcessor
- 重写了GET_ROUTEINTO_BY_TOPIC的实现。
- 初始化
DefaultMQAdminExt adminExt;
- 先从本地获取topic路由信息,若获取失败则使用adminExt从其他namesrv获取tpoic路由信息
topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic());
网友评论