RocketMQ Source Code: Analysis of Broker Startup

Author: 奔跑地蜗牛 | Published 2020-01-14 20:09

    Introduction

    This article walks through the RocketMQ broker startup logic to show what the broker does when it starts.

    The startup class: BrokerStartup

    The broker is assembled in createBrokerController, which proceeds as follows:

    1. Parse the startup arguments into a commandLine object:
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
    
    2. Load the properties file given with -c and apply it to BrokerConfig, NettyServerConfig, NettyClientConfig and MessageStoreConfig:
        MixAll.properties2Object(properties, brokerConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        MixAll.properties2Object(properties, nettyClientConfig);
        MixAll.properties2Object(properties, messageStoreConfig);
    
    3. Split the namesrv address list on semicolons:
        String[] addrArray = namesrvAddr.split(";");
    
    4. Set haListenPort to the Netty server's listen port plus one:
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
    
    5. Print every configuration value to the broker log:
        // BROKER_LOGGER_NAME is the logger name configured in logback_broker.xml
        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);
    
    6. Create the BrokerController object:
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
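
    Steps 3 and 4 are small enough to try in isolation. The sketch below is not RocketMQ code: the class StartupConfigSketch, its method names and the sample addresses are made up for illustration, and the trimming and empty-entry filtering are additions of this sketch. It only mirrors the two operations quoted above, splitting the namesrv list on ";" and deriving the HA listen port as the listen port plus one.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: StartupConfigSketch is a hypothetical helper, not a RocketMQ class.
final class StartupConfigSketch {

    // Mirrors step 3: split a semicolon-separated namesrv list into addresses.
    // Trimming and skipping empty entries are extras added by this sketch.
    static List<String> splitNamesrvAddr(String namesrvAddr) {
        List<String> result = new ArrayList<>();
        if (namesrvAddr == null) {
            return result;
        }
        for (String addr : namesrvAddr.split(";")) {
            String trimmed = addr.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // Mirrors step 4: the HA listen port is the Netty listen port plus one.
    static int haListenPort(int listenPort) {
        return listenPort + 1;
    }

    public static void main(String[] args) {
        // Sample values only; any host:port pairs would do.
        System.out.println(splitNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876"));
        System.out.println(haListenPort(10911));
    }
}
```

    One consequence of step 4 worth keeping in mind: a broker needs two adjacent free ports, because the HA port is derived from the listen port rather than configured independently at this point.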
    

    BrokerController

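    Fields

    BrokerController is the wrapper class that manages the broker's services. It holds a large number of them, covering topics, consumer offsets, pull-message requests, the related configuration and more: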

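    • NettyServerConfig: configuration for the broker acting as a Netty server. It sets the server-side properties that clients rely on to connect and pull messages.

    • NettyClientConfig: configuration for the broker acting as a Netty client. It sets the connection properties used to connect to the namesrv and register the broker.

    • MessageStoreConfig: configuration for message storage, such as the storage path, haListenPort, haMasterAddress and diskMaxUsedSpaceRatio (the maximum disk usage ratio).

    • BrokerConfig: the broker's own settings, such as brokerIP1 (used by clients to connect and consume messages), brokerIP2 (used by the HA service) and autoCreateTopicEnable (whether topics may be created automatically).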

    • ConsumerOffsetManager: manages consumer offsets. Its offsetTable is a ConcurrentHashMap keyed by topic@group; each value maps a queue to its consume offset.

    • ConsumerManager: manages consumer groups. Its consumerTable is a ConcurrentHashMap from group name to ConsumerGroupInfo.

    • ConsumerFilterManager: manages the consumer groups that filter messages with an expression.

    • ProducerManager: manages producer groups and their channels. Its groupChannelTable is a HashMap.

    • ClientHousekeepingService: a ChannelEventListener that reacts to Netty channel events. It also starts a scheduled task that scans for inactive channels.

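    • PullMessageProcessor: a NettyRequestProcessor that handles pull-message requests.

    • PullRequestHoldService: a service thread that holds pending pull requests per topic@queueId in its pullRequestTable (a ConcurrentHashMap). For example, it compares a queue's maxOffset with each held request to decide whether that request can now be served.

    • MessageArrivingListener: an interface that works together with PullRequestHoldService so that held pull requests are notified when a message arrives.

    • Broker2Client: lets the broker act as a client so that it can make remote calls of its own.

    • SubscriptionGroupManager: manages subscription groups. It keeps a ConcurrentHashMap from subscription group name to that group's configuration.

    • ConsumerIdsChangeListener: triggered when a consumer group changes so that the change can be handled. Events fall into three types: CHANGE, UNREGISTER and REGISTER.

    • RebalanceLockManager: manages the LockEntry objects for the queues consumed by each consumer group. Every MessageQueue has a LockEntry, which is used to coordinate rebalancing for that queue.

    • BrokerOuterAPI: provides the methods used to talk to the namesrv and between slave and master, such as registerBroker, unregisterBroker and getAllDelayOffset.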

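    • SlaveSynchronize: used by a slave to synchronize data from the master.

    • FilterServerManager: manages filter servers.

    • BrokerStatsManager: manages broker statistics.

    • MessageStore: manages message storage; the default implementation is DefaultMessageStore.

    • TopicConfigManager: manages topic configuration.

    • TransactionalMessageCheckService: a service thread that checks the state of transactional messages.

    • TransactionalMessageService: the transactional message service, with operations on transactional messages such as prepare and deletePrepareMessage.

    • FileWatchService: watches files for changes.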
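
    The two-level shape of offsetTable described in the ConsumerOffsetManager entry is easier to see in code. The following is a minimal sketch of that shape only, not the real class: OffsetTableSketch, its method names and the choice to return -1 for an unknown queue are assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: a hypothetical stand-in for the offsetTable layout,
// not RocketMQ's ConsumerOffsetManager.
final class OffsetTableSketch {
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    // Outer key: topic@group. Inner map: queueId -> consume offset.
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>();

    void commitOffset(String topic, String group, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        offsetTable
            .computeIfAbsent(key, k -> new ConcurrentHashMap<>())
            .put(queueId, offset);
    }

    // Returns -1 when nothing is recorded for the queue (a convention of this sketch).
    long queryOffset(String topic, String group, int queueId) {
        ConcurrentMap<Integer, Long> queues =
            offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + group);
        if (queues == null) {
            return -1L;
        }
        Long offset = queues.get(queueId);
        return offset == null ? -1L : offset;
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        table.commitOffset("TopicA", "GroupA", 0, 42L);
        System.out.println(table.queryOffset("TopicA", "GroupA", 0));
        System.out.println(table.queryOffset("TopicA", "GroupB", 0));
    }
}
```

    Keying the outer map by topic@group keeps the offsets of different consumer groups on the same topic fully independent, which is what lets each group consume the topic at its own pace.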

    The initialize() method

    initialize() does the following:

    1. Load the topic, consumerOffset, subscriptionGroup and consumerFilter data, mostly from the files in the config directory:
        public boolean initialize() throws CloneNotSupportedException {
            boolean result = this.topicConfigManager.load();
    
            result = result && this.consumerOffsetManager.load();
            result = result && this.subscriptionGroupManager.load();
            result = result && this.consumerFilterManager.load();
    
    2. Create and load the MessageStore to recover the previously stored messages:
            if (result) {
                try {
                    this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                            this.brokerConfig);
                    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                    //load plugin
                    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                } catch (IOException e) {
                    result = false;
                    log.error("Failed to initialize", e);
                }
            }
    
            result = result && this.messageStore.load();
    
    3. Create remotingServer and fastRemotingServer, create the thread pools sendMessageExecutor, pullMessageExecutor, queryMessageExecutor, adminBrokerExecutor, clientManageExecutor, heartbeatExecutor, endTransactionExecutor and consumerManageExecutor, then call registerProcessor, which binds each request code to a processor and to the thread pool that runs it:
        public void registerProcessor() {
            /**
             * SendMessageProcessor
             */
            SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
            sendProcessor.registerSendMessageHook(sendMessageHookList);
            sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            /**
             * PullMessageProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
            this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            /**
             * QueryMessageProcessor
             */
            NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
            /**
             * ClientManageProcessor
             */
            ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
            this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
            /**
             * ConsumerManageProcessor
             */
            ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
            /**
             * EndTransactionProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    
            /**
             * Default
             */
            AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
            this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
            this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        }
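
    registerProcessor is in effect filling a lookup table: request code to (processor, thread pool), plus a default entry for any code that has no explicit registration. The sketch below shows that idea in plain Java. It is not the NettyRemotingServer implementation: ProcessorTableSketch, its string-based processors and the numeric codes are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Illustrative only: a toy dispatch table, not RocketMQ's remoting server.
final class ProcessorTableSketch {

    // A processor together with the pool its requests run on.
    private static final class Entry {
        final Function<String, String> processor;
        final ExecutorService executor;

        Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>();
    private Entry defaultEntry;

    void registerProcessor(int requestCode, Function<String, String> processor,
                           ExecutorService executor) {
        table.put(requestCode, new Entry(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor,
                                  ExecutorService executor) {
        defaultEntry = new Entry(processor, executor);
    }

    // Finds the entry for the code (falling back to the default), runs the
    // processor on that entry's pool and blocks until the result is ready.
    String dispatch(int requestCode, String body) {
        Entry entry = table.getOrDefault(requestCode, defaultEntry);
        if (entry == null) {
            throw new IllegalStateException("no processor for request code " + requestCode);
        }
        Callable<String> task = () -> entry.processor.apply(body);
        try {
            return entry.executor.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService sendPool = Executors.newSingleThreadExecutor();
        ExecutorService adminPool = Executors.newSingleThreadExecutor();
        try {
            ProcessorTableSketch server = new ProcessorTableSketch();
            server.registerProcessor(1, body -> "send:" + body, sendPool); // 1 is a placeholder code
            server.registerDefaultProcessor(body -> "admin:" + body, adminPool);
            System.out.println(server.dispatch(1, "hello"));
            System.out.println(server.dispatch(99, "status")); // unregistered code, default entry
        } finally {
            sendPool.shutdown();
            adminPool.shutdown();
        }
    }
}
```

    Giving each kind of request its own pool means a backlog in one area, for example slow message queries, does not starve the threads that handle sends or heartbeats.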
    
    
    4. Schedule the periodic tasks.
    4.1 Record broker statistics periodically:
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.getBrokerStats().record();
                        } catch (Throwable e) {
                            log.error("schedule record error.", e);
                        }
                    }
                }, initialDelay, period, TimeUnit.MILLISECONDS);
    
    
    4.2 Persist consumer offsets periodically:
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerOffsetManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumerOffset error.", e);
                        }
                    }
                }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
    4.3 Persist consumer filter data periodically:
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerFilterManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumer filter error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
    
    4.4 Protect the broker periodically: if a consumer group consumes too slowly, consumption is disabled for that group:
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.protectBroker();
                        } catch (Throwable e) {
                            log.error("protectBroker error.", e);
                        }
                    }
                }, 3, 3, TimeUnit.MINUTES);
    
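    4.5 Set the namesrv addresses. A configured namesrvAddr is used directly; otherwise, if fetchNamesrvAddrByAddressServer is enabled, the addresses are fetched every two minutes:
                if (this.brokerConfig.getNamesrvAddr() != null) {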
                    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
                } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                            } catch (Throwable e) {
                                log.error("ScheduledTask fetchNameServerAddr exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
                }
    
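    4.6 A slave periodically synchronizes producer- and consumer-related data from the master, while a master periodically logs the difference between master and slave. Both tasks run every 60 seconds after a 10-second delay:
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {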
                    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
    
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.slaveSynchronize.syncAll();
                            } catch (Throwable e) {
                                log.error("ScheduledTask syncAll slave exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                } else {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
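
    Every scheduled task in step 4 has the same shape: the body of run() is wrapped in try/catch (Throwable). That matters because, according to the ScheduledExecutorService documentation, an exception that escapes a periodic task suppresses all of its later executions. The sketch below demonstrates the effect of the guard with a task that fails on its first run. GuardedScheduleSketch and its method names are hypothetical, and the 10 ms period is chosen only to keep the demo short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: shows why the broker's periodic tasks catch Throwable.
final class GuardedScheduleSketch {

    // Wraps a task like the snippets above do, so a failure is logged
    // instead of cancelling the periodic schedule.
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable e) {
                System.err.println("scheduled task error: " + e);
            }
        };
    }

    // Schedules a task that throws on its first execution and reports whether
    // it still reached expectedRuns executions before the timeout.
    static boolean survivesFailure(int expectedRuns, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        try {
            scheduler.scheduleAtFixedRate(guarded(() -> {
                int n = runs.incrementAndGet();
                latch.countDown();
                if (n == 1) {
                    throw new IllegalStateException("first run fails");
                }
            }), 0, 10, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("kept running after a failure: " + survivesFailure(3, 2000));
    }
}
```

    Without the guarded wrapper the same task would run exactly once, which for the broker would mean, for instance, that consumer offsets silently stop being persisted after a single failed attempt.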
    
    
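    5. Handle tlsMode: when TLS is not disabled, register a FileWatchService that reloads the SslContext when the certificate files change. Then call initialTransaction, initialAcl and initialRpcHooks:
                if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {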
                    // Register a listener to reload SslContext
                    try {
                        fileWatchService = new FileWatchService(
                            new String[] {
                                TlsSystemConfig.tlsServerCertPath,
                                TlsSystemConfig.tlsServerKeyPath,
                                TlsSystemConfig.tlsServerTrustCertPath
                            },
                            new FileWatchService.Listener() {
                                boolean certChanged, keyChanged = false;
    
                                @Override
                                public void onChanged(String path) {
                                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                        log.info("The trust certificate changed, reload the ssl context");
                                        reloadServerSslContext();
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                        certChanged = true;
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                        keyChanged = true;
                                    }
                                    if (certChanged && keyChanged) {
                                        log.info("The certificate and private key changed, reload the ssl context");
                                        certChanged = keyChanged = false;
                                        reloadServerSslContext();
                                    }
                                }
    
                                private void reloadServerSslContext() {
                                    ((NettyRemotingServer) remotingServer).loadSslContext();
                                    ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                                }
                            });
                    } catch (Exception e) {
                        log.warn("FileWatchService created error, can't load the certificate dynamically");
                    }
                }
                initialTransaction();
                initialAcl();
                initialRpcHooks();
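
    The listener in step 5 carries a small piece of state: a trust certificate change reloads the context immediately, while the server certificate and private key must both have changed before a reload happens, presumably because a certificate and its key only make sense as a matching pair. The sketch below isolates that decision logic so it can be exercised without any file watching. TlsReloadSketch, the three path constants and the reload counter are assumptions made for illustration; the counter stands in for the loadSslContext() calls.

```java
// Illustrative only: mirrors the listener's decision logic, with no real
// file watching and no SslContext.
final class TlsReloadSketch {
    // Placeholder paths standing in for the three TlsSystemConfig.tlsServer*Path values.
    static final String CERT_PATH = "/tmp/server.pem";
    static final String KEY_PATH = "/tmp/server.key";
    static final String TRUST_CERT_PATH = "/tmp/ca.pem";

    private boolean certChanged;
    private boolean keyChanged;
    private int reloadCount;

    void onChanged(String path) {
        if (path.equals(TRUST_CERT_PATH)) {
            reload(); // a trust certificate change reloads right away
        }
        if (path.equals(CERT_PATH)) {
            certChanged = true;
        }
        if (path.equals(KEY_PATH)) {
            keyChanged = true;
        }
        if (certChanged && keyChanged) {
            // Both halves of the pair are new: reset the flags and reload once.
            certChanged = keyChanged = false;
            reload();
        }
    }

    // Stands in for reloading the SslContext on both remoting servers.
    private void reload() {
        reloadCount++;
    }

    int reloadCount() {
        return reloadCount;
    }

    public static void main(String[] args) {
        TlsReloadSketch listener = new TlsReloadSketch();
        listener.onChanged(CERT_PATH);
        System.out.println("after cert only: " + listener.reloadCount());
        listener.onChanged(KEY_PATH);
        System.out.println("after cert and key: " + listener.reloadCount());
    }
}
```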
    


    Article link: https://www.haomeiwen.com/subject/tsbgyctx.html