RocketMQ Source Code, Part 1: namesrv

Author: modou1618 | Published: 2019-01-22 07:54

1 The main function entry point

  • Parse and handle the command-line arguments
  • Initialize the configuration
  • Initialize logging
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
  • Initialize and start NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
if (!initResult) {
    controller.shutdown();
    System.exit(-3);
}
controller.start();
  • Register the shutdown hook
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

1.1 Command-line argument parsing

  • Build the common command-line options:
    h prints the help text,
    n gives the name server address list
Options options = ServerUtil.buildCommandlineOptions(new Options());

public static Options buildCommandlineOptions(final Options options) {
    Option opt = new Option("h", "help", false, "Print help");
    opt.setRequired(false);
    options.addOption(opt);

    opt =
        new Option("n", "namesrvAddr", true,
            "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}
  • Build the namesrv-specific command-line options:
    c specifies the path of the config file,
    p prints the namesrv configuration and exits.
public static Options buildCommandlineOptions(final Options options) {
    Option opt = new Option("c", "configFile", true, "Name server config properties file");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("p", "printConfigItem", false, "Print all config item");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}
  • Parse the command-line arguments.
    If option h is present, print the help text and return null.
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());

public static CommandLine parseCmdLine(final String appName, String[] args, Options options,
    CommandLineParser parser) {
    HelpFormatter hf = new HelpFormatter();
    hf.setWidth(110);
    CommandLine commandLine = null;
    try {
        commandLine = parser.parse(options, args);
        if (commandLine.hasOption('h')) {
            hf.printHelp(appName, options, true);
            return null;
        }
    } catch (ParseException e) {
        hf.printHelp(appName, options, true);
    }

    return commandLine;
}
  • Handle option c:
    get the config file path, read the file, and initialize both config objects from it.
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        namesrvConfig.setConfigStorePath(file);

        System.out.printf("load config properties file OK, " + file + "%n");
        in.close();
    }
}
  • Handle option p:
    print the configuration items, then exit.
if (commandLine.hasOption('p')) {
    MixAll.printObjectProperties(null, namesrvConfig);
    MixAll.printObjectProperties(null, nettyServerConfig);
    System.exit(0);
}
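
The handling of option c amounts to loading a Properties file and copying matching keys onto a config object through its setters. The sketch below illustrates that idea with plain reflection. DemoConfig and PropertiesBinder are hypothetical names made up for this example; they are not RocketMQ classes, and MixAll.properties2Object may differ in its details.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical config holder, loosely modeled on the fields discussed in this article.
class DemoConfig {
    private String kvConfigPath = "kvConfig.json";
    private boolean orderMessageEnable = false;
    private int listenPort = 9876;

    public String getKvConfigPath() { return kvConfigPath; }
    public void setKvConfigPath(String v) { kvConfigPath = v; }
    public boolean isOrderMessageEnable() { return orderMessageEnable; }
    public void setOrderMessageEnable(boolean v) { orderMessageEnable = v; }
    public int getListenPort() { return listenPort; }
    public void setListenPort(int v) { listenPort = v; }
}

// Hypothetical helper: copies properties onto an object through its public setters.
class PropertiesBinder {
    static void apply(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() < 4 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null || raw.isEmpty()) {
                continue; // property absent: keep the default value
            }
            Class<?> type = m.getParameterTypes()[0];
            Object value;
            if (type == String.class) {
                value = raw;
            } else if (type == int.class) {
                value = Integer.parseInt(raw);
            } else if (type == long.class) {
                value = Long.parseLong(raw);
            } else if (type == boolean.class) {
                value = Boolean.parseBoolean(raw);
            } else {
                continue; // unsupported type in this sketch
            }
            try {
                m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }
}
```

Because the same Properties object can be applied to several targets, one file can configure both the namesrv settings and the Netty settings, as the listing above does with namesrvConfig and nettyServerConfig.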

1.2 NamesrvConfig

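  • Class attributes (figure: NamesrvConfig.png)
  • Attribute descriptions
Attribute | Purpose
rocketmqHome | Installation (deployment) path
kvConfigPath | Path of the file that caches the KV config
configStorePath | Path of the startup config file given by option c
orderMessageEnable | Whether ordered messages are supported
  • When orderMessageEnable is true, the topic route info returned by getRouteInfoByTopic also carries the order topic config read from KVConfigManager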
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
...
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
...
    }
...;
}

1.3 NettyServerConfig

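  • Class attributes (figure: NettyServerConfig.png)
  • Attribute descriptions
Attribute | Purpose
listenPort | Netty listen port
serverWorkerThreads | Number of worker threads that process data read from and written to the socket
serverCallbackExecutorThreads | Size of the business thread pool. (1) After a response has been read, this pool runs the registered callback. (2) After a request has been read, it is the default pool for request-code processing; important request codes can register their own pool.
serverSelectorThreads | Number of threads that handle I/O events on the child (accepted) sockets
serverOnewaySemaphoreValue | Maximum number of concurrent oneway sends
serverAsyncSemaphoreValue | Maximum number of concurrent async sends
serverChannelMaxIdleTimeSeconds | Maximum idle time of a channel, in seconds
serverSocketSndBufSize | Socket send buffer size
serverSocketRcvBufSize | Socket receive buffer size
serverPooledByteBufAllocatorEnable | Whether child channels use the pooled ByteBuf allocator: childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
useEpollNativeSelector | Whether to use the native epoll selector, which decides the selector type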

2 NamesrvController

2.1 Instantiation

  • Store the configuration objects
  • Instantiate KVConfigManager
public KVConfigManager(NamesrvController namesrvController){
    this.namesrvController = namesrvController;
}
  • Instantiate RouteInfoManager
public RouteInfoManager() {
    this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
    this.brokerAddrTable = new HashMap<String, BrokerData>(128);
    this.clusterAddrTable = new HashMap<String, Set<String>>(32);
    this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
    this.filterServerTable = new HashMap<String, List<String>>(256);
}
  • Instantiate BrokerHousekeepingService, which serves as the ChannelEventListener callback.
public BrokerHousekeepingService(NamesrvController namesrvController) {
    this.namesrvController = namesrvController;
}

2.2 Initialization

  • Load the persisted KV config file into memory: this.kvConfigManager.load()
  • Initialize the Netty server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  • Initialize the thread pool and register it as the default pool for request-code processing
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
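  • Register a scheduled task (first run after 5 s, then every 10 s) that walks routeInfoManager's live broker table. A broker whose last heartbeat is more than 2 minutes old is considered offline: its Netty connection is closed and it is removed from the live broker table
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);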

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
  • Register a scheduled task that prints the KV config (first run after 1 minute, then every 10 minutes)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.kvConfigManager.printAllPeriodically();
    }
}, 1, 10, TimeUnit.MINUTES);
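
The expiry scan can be tried out in isolation. The following is a minimal sketch, assuming a simplified table that maps a broker address to its last heartbeat time; LiveTable and its method names are hypothetical, and the clock is passed in as a parameter so the logic can be tested without waiting. In the real code the scan would also close the channel and clean up the route tables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the live broker table.
class LiveTable {
    static final long EXPIRE_MS = 2 * 60 * 1000L; // 2 minutes, as described above

    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Record a heartbeat from the broker at the given address.
    void heartbeat(String brokerAddr, long nowMs) {
        lastHeartbeat.put(brokerAddr, nowMs);
    }

    boolean isLive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    // Remove every broker whose last heartbeat is older than EXPIRE_MS
    // and return the removed addresses.
    List<String> scan(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MS < nowMs) {
                expired.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return expired;
    }
}
```

Removing through the iterator matters here: calling remove on the map itself inside the loop would make the next iteration step fail with a ConcurrentModificationException.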

2.3 start

  • Start the Netty server: this.remotingServer.start();

3 KVConfigManager

  • private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>(); stores the configuration, keyed by namespace and then by key
  • private final ReadWriteLock lock = new ReentrantReadWriteLock(); is the read-write lock that guards concurrent reads and writes of configTable

3.1 Adding a config entry

  • Add the entry while holding the write lock
  • Call persist() to write the table to disk
public void putKVConfig(final String namespace, final String key, final String value) {
    try {
        this.lock.writeLock().lockInterruptibly();
        try {
            HashMap<String, String> kvTable = this.configTable.get(namespace);
            if (null == kvTable) {
                kvTable = new HashMap<String, String>();
                this.configTable.put(namespace, kvTable);
                log.info("putKVConfig create new Namespace {}", namespace);
            }

            final String prev = kvTable.put(key, value);
            if (null != prev) {
                log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
                    namespace, key, value);
            } else {
                log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
                    namespace, key, value);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("putKVConfig InterruptedException", e);
    }

    this.persist();
}

3.2 Deleting a config entry

  • Delete the entry while holding the write lock
  • Call persist() to write the table to disk
public void deleteKVConfig(final String namespace, final String key) {
    try {
        this.lock.writeLock().lockInterruptibly();
        try {
            HashMap<String, String> kvTable = this.configTable.get(namespace);
            if (null != kvTable) {
                String value = kvTable.remove(key);
                log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
                    namespace, key, value);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("deleteKVConfig InterruptedException", e);
    }

    this.persist();
}

3.3 Reading config entries

  • Read all entries of a namespace while holding the read lock
public byte[] getKVListByNamespace(final String namespace) {
    try {
        this.lock.readLock().lockInterruptibly();
        try {
            HashMap<String, String> kvTable = this.configTable.get(namespace);
            if (null != kvTable) {
                KVTable table = new KVTable();
                table.setTable(kvTable);
                return table.encode();
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("getKVListByNamespace InterruptedException", e);
    }

    return null;
}
  • Read a single entry by namespace and key while holding the read lock
public String getKVConfig(final String namespace, final String key) {
    try {
        this.lock.readLock().lockInterruptibly();
        try {
            HashMap<String, String> kvTable = this.configTable.get(namespace);
            if (null != kvTable) {
                return kvTable.get(key);
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("getKVConfig InterruptedException", e);
    }

    return null;
}
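
The add, delete, and read paths above share one pattern: a nested map guarded by a ReentrantReadWriteLock. Here is a minimal sketch of that pattern, assuming a hypothetical KvStore class; it uses lock() rather than lockInterruptibly() to keep the example short, and it leaves out logging and persistence.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical two-level key-value store: namespace -> (key -> value).
class KvStore {
    private final Map<String, Map<String, String>> table = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // Insert or update an entry; returns the previous value, or null if there was none.
    String put(String namespace, String key, String value) {
        lock.writeLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            if (kv == null) {
                kv = new HashMap<>();
                table.put(namespace, kv);
            }
            return kv.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Remove an entry; returns the removed value, or null if it did not exist.
    String delete(String namespace, String key) {
        lock.writeLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.remove(key);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Look up an entry; many readers may hold the read lock at the same time.
    String get(String namespace, String key) {
        lock.readLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The unlock calls sit in finally blocks so that an exception inside the critical section cannot leave the lock held.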

3.4 Persistence

  • Take the config table under the read lock and serialize it to JSON
  • Write the JSON to the file at this.namesrvController.getNamesrvConfig().getKvConfigPath()
public void persist() {
    try {
        this.lock.readLock().lockInterruptibly();
        try {
            KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
            kvConfigSerializeWrapper.setConfigTable(this.configTable);

            String content = kvConfigSerializeWrapper.toJson();

            if (null != content) {
                MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
            }
        } catch (IOException e) {
            log.error("persist kvconfig Exception, "
                + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("persist InterruptedException", e);
    }

}

3.5 Loading

public void load() {
    String content = null;
    try {
        content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
    } catch (IOException e) {
        log.warn("Load KV config table exception", e);
    }
    if (content != null) {
        KVConfigSerializeWrapper kvConfigSerializeWrapper =
            KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
        if (null != kvConfigSerializeWrapper) {
            this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
            log.info("load KV config table OK");
        }
    }
}
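
The persist and load steps form a simple round trip: write a string snapshot to a file, read it back on startup, and treat a missing file as "nothing to load". The sketch below shows that round trip using only java.nio. SnapshotFile is a hypothetical helper, and writing to a temporary sibling file before moving it into place is an assumption of this sketch rather than a description of MixAll.string2File. JSON encoding is left out.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for saving and loading a text snapshot.
class SnapshotFile {
    // Write the content to "<file>.tmp" first, then move it over the target,
    // so a crash in the middle of the write does not truncate the old snapshot.
    static void save(String content, Path file) throws IOException {
        Path parent = file.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    // Return the file content, or null when no snapshot exists yet.
    static String load(Path file) throws IOException {
        if (!Files.exists(file)) {
            return null;
        }
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }
}
```

Returning null for a missing file mirrors the content != null check in load(): a fresh name server simply starts with an empty table.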

4 RouteInfoManager

4.1 topicQueueTable

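  • HashMap<String/* topic */, List<QueueData>> topicQueueTable is keyed by topic and stores the topic's queue configuration on each broker: the read/write permission and the numbers of read and write queues
  • QueueData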
public class QueueData implements Comparable<QueueData> {
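    private String brokerName;// broker name
    private int readQueueNums;// number of read queues
    private int writeQueueNums;// number of write queues
    private int perm;// read/write permission bits
    private int topicSynFlag;// topic system flag, copied from the topic config's topicSysFlag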
}
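  • perm values
perm value | Meaning
1 | Inheritable: used by the default topic; a newly created topic inherits its queue configuration from the default topic
2 | Writable
4 | Readable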
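
Since the perm values are powers of two, they combine as bit flags: 6 means readable and writable, and clearing bit 2 removes the write permission while leaving the rest untouched. A small sketch follows; PermBits and its method names are hypothetical, and the three-letter rendering in describe is this example's own notation.

```java
// Hypothetical helper for the perm bit flags listed in the table above.
class PermBits {
    static final int INHERIT = 1;
    static final int WRITE = 2;
    static final int READ = 4;

    static boolean isReadable(int perm) {
        return (perm & READ) == READ;
    }

    static boolean isWritable(int perm) {
        return (perm & WRITE) == WRITE;
    }

    static boolean isInherited(int perm) {
        return (perm & INHERIT) == INHERIT;
    }

    // Clear only the write bit and keep every other bit as it was.
    static int wipeWrite(int perm) {
        return perm & ~WRITE;
    }

    // Render the flags as three characters: R, W, X (inherit), or '-' when unset.
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder();
        sb.append(isReadable(perm) ? 'R' : '-');
        sb.append(isWritable(perm) ? 'W' : '-');
        sb.append(isInherited(perm) ? 'X' : '-');
        return sb.toString();
    }
}
```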

4.2 brokerAddrTable

  • HashMap<String/* brokerName */, BrokerData> brokerAddrTable stores the broker information
  • BrokerData
public class BrokerData implements Comparable<BrokerData> {
    private String cluster;// cluster the broker belongs to
    private String brokerName;// broker name
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;// brokerId 0 is the master, ids greater than 0 are slaves; the value is the broker's ip:port
    private final Random random = new Random();
}
  • If the broker's master is offline, selectBrokerAddr uses random to pick one of the remaining addresses
public String selectBrokerAddr() {
    String addr = this.brokerAddrs.get(MixAll.MASTER_ID);

    if (addr == null) {
        List<String> addrs = new ArrayList<String>(brokerAddrs.values());
        return addrs.get(random.nextInt(addrs.size()));
    }

    return addr;
}
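
The selection rule, master first and otherwise any remaining address, can be sketched on a plain map. BrokerAddrPicker is a hypothetical class for illustration. One deliberate difference from the listing above: when the map is empty the sketch returns null, whereas calling Random.nextInt(0) would throw an IllegalArgumentException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical helper that picks an address from a brokerId -> address map.
class BrokerAddrPicker {
    static final long MASTER_ID = 0L; // brokerId 0 denotes the master

    private final Random random = new Random();

    String pick(Map<Long, String> brokerAddrs) {
        String master = brokerAddrs.get(MASTER_ID);
        if (master != null) {
            return master; // prefer the master whenever it is registered
        }
        if (brokerAddrs.isEmpty()) {
            return null; // nothing to choose from
        }
        // Master is absent: fall back to a uniformly random slave address.
        List<String> addrs = new ArrayList<>(brokerAddrs.values());
        return addrs.get(random.nextInt(addrs.size()));
    }
}
```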

4.3 clusterAddrTable

  • HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable stores the set of broker names in each cluster

4.4 brokerLiveTable

  • HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable is the table of live brokers
  • A task scheduled every 10 s calls scanNotActiveBroker(), checks the timestamps, and removes brokers whose heartbeat has timed out
  • BrokerLiveInfo
class BrokerLiveInfo {
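    private long lastUpdateTimestamp;// time of the broker's last heartbeat
    private DataVersion dataVersion;// version of the broker's data, updated when its configuration changes
    private Channel channel;// Netty channel connected to the broker
    private String haServerAddr;// address of the broker's master-slave replication (HA) service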
}
  • DataVersion records a version with a timestamp and an AtomicLong counter
public class DataVersion extends RemotingSerializable {
    private long timestamp = System.currentTimeMillis();
    private AtomicLong counter = new AtomicLong(0);
}
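
A version made of a timestamp plus a counter lets the name server detect a configuration change by comparing the version it stored with the one the broker reports. The sketch below shows how such a pair might be advanced and compared. Version is a hypothetical class, and the choice to refresh the timestamp and increment the counter together on each change is an assumption of this example.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical version stamp: a timestamp plus a monotonically increasing counter.
class Version {
    private long timestamp;
    private final AtomicLong counter;

    Version(long timestampMs) {
        this(timestampMs, 0L);
    }

    private Version(long timestampMs, long count) {
        this.timestamp = timestampMs;
        this.counter = new AtomicLong(count);
    }

    // Called whenever the configuration changes.
    void next(long nowMs) {
        this.timestamp = nowMs;
        this.counter.incrementAndGet();
    }

    // Two versions match only if both the timestamp and the counter agree.
    boolean sameAs(Version other) {
        return other != null
            && this.timestamp == other.timestamp
            && this.counter.get() == other.counter.get();
    }

    // Snapshot of the current state, e.g. the copy kept by the receiver.
    Version copy() {
        return new Version(this.timestamp, this.counter.get());
    }
}
```

The counter distinguishes two changes that happen within the same millisecond, which the timestamp alone could not do.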

4.5 filterServerTable

  • HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable maps each broker address to its filter server addresses

5 BrokerHousekeepingService

  • When the Netty server is initialized, it
    sets NettyRemotingServer.channelEventListener = BrokerHousekeepingService,
    creates the event dispatch thread protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
    and registers the channel connection event handler class NettyConnectManageHandler extends ChannelDuplexHandler
  • In its connection-state callbacks, NettyConnectManageHandler puts events into NettyEventExecutor
if (NettyRemotingServer.this.channelEventListener != null) {
    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}

public void putNettyEvent(final NettyEvent event) {
    this.nettyEventExecutor.putNettyEvent(event);
}
  • NettyEventExecutor calls the BrokerHousekeepingService callback that matches each channel event
public void run() {
    log.info(this.getServiceName() + " service started");

    final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();

    while (!this.isStopped()) {
        try {
            NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
            if (event != null && listener != null) {
                switch (event.getType()) {
                    case IDLE:
                        listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
                        break;
                    case CLOSE:
                        listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
                        break;
                    case CONNECT:
                        listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
                        break;
                    case EXCEPTION:
                        listener.onChannelException(event.getRemoteAddr(), event.getChannel());
                        break;
                    default:
                        break;

                }
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
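
The flow above is a producer-consumer hand-off: I/O handlers enqueue events, and a single loop polls the queue with a timeout and calls the listener. Below is a minimal sketch of one loop iteration, written so that it can be exercised without starting a thread. EventPump, ChannelEvent, ChannelListener, and ChannelEventType are hypothetical names, and the channel is reduced to a remote address string.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

enum ChannelEventType { CONNECT, CLOSE, IDLE, EXCEPTION }

// Hypothetical listener: one callback instead of four, to keep the sketch short.
interface ChannelListener {
    void onEvent(ChannelEventType type, String remoteAddr);
}

class ChannelEvent {
    final ChannelEventType type;
    final String remoteAddr;

    ChannelEvent(ChannelEventType type, String remoteAddr) {
        this.type = type;
        this.remoteAddr = remoteAddr;
    }
}

class EventPump {
    private final LinkedBlockingQueue<ChannelEvent> queue = new LinkedBlockingQueue<>();
    private final ChannelListener listener;

    EventPump(ChannelListener listener) {
        this.listener = listener;
    }

    // Producer side: called from the connection handlers, returns immediately.
    void publish(ChannelEventType type, String remoteAddr) {
        queue.offer(new ChannelEvent(type, remoteAddr));
    }

    // Consumer side: one iteration of the dispatch loop.
    // Returns true if an event was delivered to the listener.
    boolean pumpOnce(long timeoutMs) throws InterruptedException {
        ChannelEvent event = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (event == null || listener == null) {
            return false; // timed out, or nobody is listening
        }
        listener.onEvent(event.type, event.remoteAddr);
        return true;
    }
}
```

Polling with a timeout instead of blocking forever lets the surrounding while loop re-check its stop flag regularly, which is why the listing uses poll(3000, TimeUnit.MILLISECONDS).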

6 Request-code processors

6.1 DefaultRequestProcessor

6.1.1 Dispatch by request code

  • Request messages are processed on the thread pool ExecutorService remotingExecutor
  • Each request code is routed to its own handler method
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}
Request code | Handling
PUT_KV_CONFIG, GET_KV_CONFIG, DELETE_KV_CONFIG | Add or update, read, and delete KV config entries
GET_KVLIST_BY_NAMESPACE | Get all KV config entries of a namespace
GET_TOPICS_BY_CLUSTER, GET_SYSTEM_TOPIC_LIST_FROM_NS, GET_UNIT_TOPIC_LIST, GET_HAS_UNIT_SUB_TOPIC_LIST, GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, GET_ALL_TOPIC_LIST_FROM_NAMESERVER | Get topic lists
UPDATE_NAMESRV_CONFIG, GET_NAMESRV_CONFIG | Update and query the namesrv configuration
DELETE_TOPIC_IN_NAMESRV | Delete a topic's information
WIPE_WRITE_PERM_OF_BROKER | Remove the write permission of the topics on a broker
GET_BROKER_CLUSTER_INFO | Get the clusters and the brokers in each cluster
GET_ROUTEINTO_BY_TOPIC | Get the route info of a topic
REGISTER_BROKER, UNREGISTER_BROKER | Register / unregister a broker
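
The switch statement above can also be viewed as a lookup table from request code to handler, with a default handler for codes that have no dedicated entry. The following sketch shows that table-driven formulation. RequestRouter is a hypothetical class, requests and responses are reduced to strings, and the numeric codes used with it are arbitrary values chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical table-driven dispatcher: request code -> handler.
class RequestRouter {
    private final Map<Integer, Function<String, String>> handlers = new HashMap<>();
    private Function<String, String> defaultHandler;

    // Register a dedicated handler for one request code.
    void register(int code, Function<String, String> handler) {
        handlers.put(code, handler);
    }

    // Register the handler used for every code without a dedicated entry.
    void registerDefault(Function<String, String> handler) {
        this.defaultHandler = handler;
    }

    String dispatch(int code, String body) {
        Function<String, String> handler = handlers.get(code);
        if (handler == null) {
            handler = defaultHandler;
        }
        if (handler == null) {
            return "UNSUPPORTED:" + code; // no handler at all for this code
        }
        return handler.apply(body);
    }
}
```

A table like this makes it easy to attach a different executor to individual codes, which is the idea behind letting important request codes use their own thread pool.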

6.1.2 REGISTER_BROKER

  • Call the broker registration interface
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
    requestHeader.getClusterName(),
    requestHeader.getBrokerAddr(),
    requestHeader.getBrokerName(),
    requestHeader.getBrokerId(),
    requestHeader.getHaServerAddr(),
    topicConfigWrapper,
    null,
    ctx.channel()
);
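  • this.clusterAddrTable.put(clusterName, brokerNames); adds the broker to its cluster's entry
  • this.brokerAddrTable.put(brokerName, brokerData); adds the broker table entry
  • For a master broker, topicQueueTable is created or updated when the broker registers for the first time or its configuration has changed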
if (null != topicConfigWrapper
    && MixAll.MASTER_ID == brokerId) {
    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
        || registerFirst) {
        ConcurrentMap<String, TopicConfig> tcTable =
            topicConfigWrapper.getTopicConfigTable();
        if (tcTable != null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                this.createAndUpdateQueueData(brokerName, entry.getValue());
            }
        }
    }
}
  • Add the entry to brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
    new BrokerLiveInfo(
        System.currentTimeMillis(),
        topicConfigWrapper.getDataVersion(),
        channel,
        haServerAddr));
  • For a slave broker, return the master broker's address and HA server address
if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    if (masterAddr != null) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
        if (brokerLiveInfo != null) {
            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
            result.setMasterAddr(masterAddr);
        }
    }
}
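
Taken together, registration updates three tables and, for a slave, answers with the master's address. The sketch below compresses those steps into one method. MiniRouteTable is a hypothetical, heavily simplified model: the topic queue table, DataVersion, channels, and the read-write lock are all left out, and synchronized stands in for the locking.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the tables touched by broker registration.
class MiniRouteTable {
    static final long MASTER_ID = 0L;

    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> broker names
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // broker name -> (id -> address)
    final Map<String, Long> brokerLiveTable = new HashMap<>();              // address -> last heartbeat

    // Registers a broker and returns the master's address when the caller
    // is a slave and a live master is known; returns null otherwise.
    synchronized String register(String cluster, String brokerName, long brokerId,
                                 String brokerAddr, long nowMs) {
        // 1. Record the broker name under its cluster.
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);

        // 2. Record the address under the broker name and id.
        Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
        addrs.put(brokerId, brokerAddr);

        // 3. Refresh the heartbeat time.
        brokerLiveTable.put(brokerAddr, nowMs);

        // 4. A slave learns where its master is.
        if (brokerId != MASTER_ID) {
            String masterAddr = addrs.get(MASTER_ID);
            if (masterAddr != null && brokerLiveTable.containsKey(masterAddr)) {
                return masterAddr;
            }
        }
        return null;
    }
}
```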

6.1.3 GET_ROUTEINTO_BY_TOPIC

  • Get the topic route
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
  • The result holds the topic's queueData, the broker data, the ordered-message config, and the filterServer info
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
  • If ordered messages are enabled, add the order topic config
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
    String orderTopicConf =
        this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
            requestHeader.getTopic());
    topicRouteData.setOrderTopicConf(orderTopicConf);
}

6.2 ClusterTestRequestProcessor

  • Overrides the handling of GET_ROUTEINTO_BY_TOPIC.
  • Initializes DefaultMQAdminExt adminExt;
  • Looks up the topic route locally first; if that fails, uses adminExt to fetch the topic route from other name servers
    topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic());
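
The local-first, remote-second lookup is a generic fallback pattern. Here is a minimal sketch, assuming both lookups can be expressed as functions that return null when nothing is found; FallbackLookup is a hypothetical class, and swallowing a remote failure into null is a simplification made for this example.

```java
import java.util.function.Function;

// Hypothetical two-stage lookup: try the local source, then the remote one.
class FallbackLookup<K, V> {
    private final Function<K, V> local;
    private final Function<K, V> remote;

    FallbackLookup(Function<K, V> local, Function<K, V> remote) {
        this.local = local;
        this.remote = remote;
    }

    V find(K key) {
        V value = local.apply(key);
        if (value != null) {
            return value; // local hit: the remote source is never contacted
        }
        try {
            return remote.apply(key);
        } catch (RuntimeException e) {
            return null; // remote lookup failed: report "not found"
        }
    }
}
```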
