美文网首页RocketMQ
二、RocketMQ-Broker启动流程

二、RocketMQ-Broker启动流程

作者: ASD_92f7 | 来源:发表于2019-04-23 11:29 被阅读61次

    一、简述

    本文简述一下Broker的启动流程,主要涉及的步骤及简要配置,不做过多深入。

    二、BrokerStartup、BrokerController

    启动类,类结构如下:

    BrokerStartup.java
    首先会调用createBrokerController()实例化一个BrokerController(它才是核心的启动类),然后调用start()方法,这个套路和Namesrv的思路一致。

    1、createBrokerController()

    与Namesrv类似,通过一顿骚操作,将cmd命令行的参数进行解析,并产出4个配置(这四个配置本身有默认配置):

    • BrokerConfig
      broker自身的一些配置,例如namesrvAddr(namesrv地址),brokerIP1brokerIP2brokerNamedefaultTopicQueueNums(默认8个队列),autoCreateTopicEnable(默认居然是true)等等
    • NettyServerConfig
      nettyserver的一些配置,最重要的 listenPort10911)及serverWorkerThreads(worker的线程数量)、serverOnewaySemaphoreValue(one-way模式发送的最大线程数)、serverAsyncSemaphoreValue(异步模式最大的线程数),serverSocketSndBufSize(发送消息最大长度)等等
    • NettyClientConfig
      netty客户端(producerconsumer)的一些配置,connectTimeoutMillis(超时时间默认3秒),channelNotActiveInterval(通道异常检查时间,默认60秒)等等
    • MessageStoreConfig
      消息commitLog固化配置,storePathCommitLog(commitLog目录),mapedFileSizeCommitLog(commitLog大小,默认1G),
      messageDelayLevel(消息延迟投递级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)

    与Namesrv类似,如果cmd中含有:

    • -c path,则会加载path指定的配置文件中的配置
    • -p 会打印所有的配置,并退出
    • -m 只打印 @ImportantField 注解标注的配置,并退出

    待上面的配置都加载完毕后,调用initialize进行初始化:

    boolean initResult = controller.initialize();
    

    2、controller.initialize()

    这个方法很重要,做了一堆的事情,在罗列功能之前,先看这个图

    log地址
    RocketMQ默认会在/home(我的是windows,所以home=C:\User\asd)下新建一个store文件夹,里面存放了一堆一堆很重要的配置,commitlog(固化的消息),configtopicsubscriptionGroupconsumerFilterconsumerOffsetdelayOffset配置),consumequeue(消费者队列配置)等等,都很重要,而controller.initialize()方法主要就是在broker启动的时候对这些配置进行初始化
    • topicConfigManager.load()
      加载C:\Users\asd\store\config\topics.json
    • consumerOffsetManager.load()
      加载C:\Users\asd\store\config\consumerOffset.json
    • subscriptionGroupManager.load()
      加载C:\Users\asd\store\config\subscriptionGroup.json
    • consumerFilterManager.load()
      加载C:\Users\asd\store\config\consumerFilter.json
    • messageStore.load()
      加载C:\Users\asd\store\commitLog\ 下所有消息日志文件

    该加载的都加载完了,然后启动NettyServer以及一堆的 Executor 线程池

    • remotingServer
      默认监听端口 10911
    • fastRemotingServer
      默认监听端口 10911 - 2 = 10909,莫非和VIP通道有关?后续补充
    • sendMessageExecutor
      发送消息的线程池
    • pullMessageExecutor
      拉取消息的线程池
    • queryMessageExecutor
      查询消息的线程池
    • adminBrokerExecutor
      不知道干啥的线程池
    • clientManageExecutor
      客户端连接管理线程池
    • heartbeatExecutor
      心跳线程池
    • endTransactionExecutor
      事务结束线程池
    • consumerManageExecutor
      消费者连接线程池

    线程池初始化呢完毕后,开启一些定时任务

    • BrokerController.this.getBrokerStats().record();
      固化broker的状态,默认1天执行一次
    • BrokerController.this.consumerOffsetManager.persist();
      固化offset,延迟10秒,每5秒执行一次
    • BrokerController.this.consumerFilterManager.persist();
      Filter固化,延迟10秒,每10秒执行一次
    • BrokerController.this.protectBroker();
      保护broker?后续完善,延迟3分,每3分执行一次
    • BrokerController.this.printWaterMark();
      打印流水信息,发送、拉取、查询、结束事务消息等日志
      延迟10秒,每1秒一次
    • log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
      打印日志:获取已经固化到commitlog,但是还没有被消费的日志的byte大小
      延迟10秒,每60秒执行一次
    • BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
      如果fetchNamesrvAddrByAddressServer=true,则会执行
      主要是通过httpclient从给定的URL动态获取NameSrv的地址
      延迟10秒,每120秒执行一次
    • BrokerController.this.slaveSynchronize.syncAll();
      如果broker的角色是SLAVE,则会执行,主要是从主Broker定期同步topicoffsetdelayOffsetgroup信息
      延迟10秒,每60秒执行一次
    • BrokerController.this.printMasterAndSlaveDiff();
      如果broker的角色是不是SLAVE,则会执行,主要打印SLAVEMaster之间差异的byte大小

    如果启用了TLS安全传输配置,则会启动 fileWatchService?后续补充

    最后调用了三个方法:

    • initialTransaction()
      初始化事务消息的一些服务及Listener
    • initialAcl();
      初始化ACL的一些服务,进行访问控制
    • initialRpcHooks();
      暂时不知道干啥的

    到这里,controller.initialize()算是完成了,回顾一下,其实思路很清晰:

    • 1、加载配置文件
    • 2、启动Netty服务
    • 3、初始化线程池
    • 4、启动定时任务
    • 5、其他的一些配置

    3、start()

    启动就比较直接了,首先启动一些服务:

    • messageStore.start()
      消息固化服务启动
    • remotingServer.start()
      nettyServer启动 10911
    • fastRemotingServer.start()
      vip nettyServer启动,端口10909
    • fileWatchService.start();
      启用TLS加密的服务启动
    • brokerOuterAPI.start();
      启用动态获取NameSrv的服务启动
    • pullRequestHoldService.start();
      获取请求的服务启动
    • clientHousekeepingService.start();
      启动
    • filterServerManager.start();
      过滤器启动
    • BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
      定时任务,从Namesrv获取所有broker注册的信息,延迟10秒,每10秒或者registerNameServerPeriod指定的时间,但是不能少于10秒执行一次
    • brokerStatsManager.start();
      broker状态检查启动,貌似啥也没干
    • this.brokerFastFailure.start();
    • transactionalMessageCheckService.start();
      如果不是SLAVE,则会启动事务

    相关文章

      网友评论

        本文标题:二、RocketMQ-Broker启动流程

        本文链接:https://www.haomeiwen.com/subject/rocugqtx.html