美文网首页RocketMQ
二、RocketMQ-Broker启动流程

二、RocketMQ-Broker启动流程

作者: ASD_92f7 | 来源:发表于2019-04-23 11:29 被阅读61次

一、简述

本文简述一下Broker的启动流程,主要涉及的步骤及简要配置,不做过多深入。

二、BrokerStartup、BrokerController

启动类,类结构如下:

BrokerStartup.java
首先会调用createBrokerController()实例化一个BrokerController(它才是核心的启动类),然后调用start()方法,这个套路和Namesrv的思路一致。

1、createBrokerController()

与Namesrv类似,通过一顿骚操作,将cmd命令行的参数进行解析,并产出4个配置(这四个配置本身有默认配置):

  • BrokerConfig
    broker自身的一些配置,例如namesrvAddr(namesrv地址),brokerIP1brokerIP2brokerNamedefaultTopicQueueNums(默认8个队列),autoCreateTopicEnable(默认居然是true)等等
  • NettyServerConfig
    nettyserver的一些配置,最重要的 listenPort10911)及serverWorkerThreads(worker的线程数量)、serverOnewaySemaphoreValue(one-way模式发送的最大线程数)、serverAsyncSemaphoreValue(异步模式最大的线程数),serverSocketSndBufSize(发送消息最大长度)等等
  • NettyClientConfig
    netty客户端(producerconsumer)的一些配置,connectTimeoutMillis(超时时间默认3秒),channelNotActiveInterval(通道异常检查时间,默认60秒)等等
  • MessageStoreConfig
    消息commitLog固化配置,storePathCommitLog(commitLog目录),mapedFileSizeCommitLog(commitLog大小,默认1G),
    messageDelayLevel(消息延迟投递级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)

与Namesrv类似,如果cmd中含有:

  • -c path,则会加载path指定的配置文件中的配置
  • -p 会打印所有的配置,并退出
  • -m 只打印 @ImportantField 注解标注的配置,并退出

待上面的配置都加载完毕后,调用initialize进行初始化:

boolean initResult = controller.initialize();

2、controller.initialize()

这个方法很重要,做了一堆的事情,在罗列功能之前,先看这个图

log地址
RocketMQ默认会在/home(我的是windows,所以home=C:\User\asd)下新建一个store文件夹,里面存放了一堆一堆很重要的配置,commitlog(固化的消息),configtopicsubscriptionGroupconsumerFilterconsumerOffsetdelayOffset配置),consumequeue(消费者队列配置)等等,都很重要,而controller.initialize()方法主要就是在broker启动的时候对这些配置进行初始化
  • topicConfigManager.load()
    加载C:\Users\asd\store\config\topics.json
  • consumerOffsetManager.load()
    加载C:\Users\asd\store\config\consumerOffset.json
  • subscriptionGroupManager.load()
    加载C:\Users\asd\store\config\subscriptionGroup.json
  • consumerFilterManager.load()
    加载C:\Users\asd\store\config\consumerFilter.json
  • messageStore.load()
    加载C:\Users\asd\store\commitLog\ 下所有消息日志文件

该加载的都加载完了,然后启动NettyServer以及一堆的 Executor 线程池

  • remotingServer
    默认监听端口 10911
  • fastRemotingServer
    默认监听端口 10911 - 2 = 10909,莫非和VIP通道有关?后续补充
  • sendMessageExecutor
    发送消息的线程池
  • pullMessageExecutor
    拉取消息的线程池
  • queryMessageExecutor
    查询消息的线程池
  • adminBrokerExecutor
    不知道干啥的线程池
  • clientManageExecutor
    客户端连接管理线程池
  • heartbeatExecutor
    心跳线程池
  • endTransactionExecutor
    事务结束线程池
  • consumerManageExecutor
    消费者连接线程池

线程池初始化呢完毕后,开启一些定时任务

  • BrokerController.this.getBrokerStats().record();
    固化broker的状态,默认1天执行一次
  • BrokerController.this.consumerOffsetManager.persist();
    固化offset,延迟10秒,每5秒执行一次
  • BrokerController.this.consumerFilterManager.persist();
    Filter固化,延迟10秒,每10秒执行一次
  • BrokerController.this.protectBroker();
    保护broker?后续完善,延迟3分,每3分执行一次
  • BrokerController.this.printWaterMark();
    打印流水信息,发送、拉取、查询、结束事务消息等日志
    延迟10秒,每1秒一次
  • log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
    打印日志:获取已经固化到commitlog,但是还没有被消费的日志的byte大小
    延迟10秒,每60秒执行一次
  • BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
    如果fetchNamesrvAddrByAddressServer=true,则会执行
    主要是通过httpclient从给定的URL动态获取NameSrv的地址
    延迟10秒,每120秒执行一次
  • BrokerController.this.slaveSynchronize.syncAll();
    如果broker的角色是SLAVE,则会执行,主要是从主Broker定期同步topicoffsetdelayOffsetgroup信息
    延迟10秒,每60秒执行一次
  • BrokerController.this.printMasterAndSlaveDiff();
    如果broker的角色是不是SLAVE,则会执行,主要打印SLAVEMaster之间差异的byte大小

如果启用了TLS安全传输配置,则会启动 fileWatchService?后续补充

最后调用了三个方法:

  • initialTransaction()
    初始化事务消息的一些服务及Listener
  • initialAcl();
    初始化ACL的一些服务,进行访问控制
  • initialRpcHooks();
    暂时不知道干啥的

到这里,controller.initialize()算是完成了,回顾一下,其实思路很清晰:

  • 1、加载配置文件
  • 2、启动Netty服务
  • 3、初始化线程池
  • 4、启动定时任务
  • 5、其他的一些配置

3、start()

启动就比较直接了,首先启动一些服务:

  • messageStore.start()
    消息固化服务启动
  • remotingServer.start()
    nettyServer启动 10911
  • fastRemotingServer.start()
    vip nettyServer启动,端口10909
  • fileWatchService.start();
    启用TLS加密的服务启动
  • brokerOuterAPI.start();
    启用动态获取NameSrv的服务启动
  • pullRequestHoldService.start();
    获取请求的服务启动
  • clientHousekeepingService.start();
    启动
  • filterServerManager.start();
    过滤器启动
  • BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
    定时任务,从Namesrv获取所有broker注册的信息,延迟10秒,每10秒或者registerNameServerPeriod指定的时间,但是不能少于10秒执行一次
  • brokerStatsManager.start();
    broker状态检查启动,貌似啥也没干
  • this.brokerFastFailure.start();
  • transactionalMessageCheckService.start();
    如果不是SLAVE,则会启动事务

相关文章

网友评论

    本文标题:二、RocketMQ-Broker启动流程

    本文链接:https://www.haomeiwen.com/subject/rocugqtx.html