RocketMQ broker startup flow

Author: 晴天哥_王志 | Published: 2020-05-04 20:48

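    Introduction

    • This series explains how the RocketMQ broker works and how to use it. It covers the broker configuration file, the broker startup flow, delayed messages, and message storage.

    • This article covers the broker startup flow: which services are started during startup and what each one does.

    Startup flow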

    BrokerStartup

    public class BrokerStartup {
    
        public static Properties properties = null;
        public static CommandLine commandLine = null;
        public static String configFile = null;
        public static InternalLogger log;
    
        public static void main(String[] args) {
            // createBrokerController builds and initializes the BrokerController;
            // start() then starts it
            start(createBrokerController(args));
        }
    
        public static BrokerController start(BrokerController controller) {
            try {
                // Start the controller
                controller.start();
    
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    
        public static BrokerController createBrokerController(String[] args) {
           
            try {
                final BrokerConfig brokerConfig = new BrokerConfig();
                final NettyServerConfig nettyServerConfig = new NettyServerConfig();
                final NettyClientConfig nettyClientConfig = new NettyClientConfig();
    
                nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                    String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
                // Set the listen port to 10911
                nettyServerConfig.setListenPort(10911);
                final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    
                String namesrvAddr = brokerConfig.getNamesrvAddr();
                if (null != namesrvAddr) {
                    try {
                        String[] addrArray = namesrvAddr.split(";");
                        for (String addr : addrArray) {
                            RemotingUtil.string2SocketAddress(addr);
                        }
                    } catch (Exception e) {
                        System.out.printf(
                            "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                            namesrvAddr);
                        System.exit(-3);
                    }
                }
    
                switch (messageStoreConfig.getBrokerRole()) {
                    case ASYNC_MASTER:
                    case SYNC_MASTER:
                        brokerConfig.setBrokerId(MixAll.MASTER_ID);
                        break;
                    case SLAVE:
                        if (brokerConfig.getBrokerId() <= 0) {
                            System.out.printf("Slave's brokerId must be > 0");
                            System.exit(-3);
                        }
    
                        break;
                    default:
                        break;
                }
    
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    brokerConfig.setBrokerId(-1);
                }
    
                // Set the HA listen port to 10912 (listen port + 1)
                messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
                // 1. Create the BrokerController
                final BrokerController controller = new BrokerController(
                    brokerConfig,
                    nettyServerConfig,
                    nettyClientConfig,
                    messageStoreConfig);
                // remember all configs to prevent discard
                controller.getConfiguration().registerConfig(properties);
                // 2. Initialize the BrokerController
                boolean initResult = controller.initialize();
                if (!initResult) {
                    controller.shutdown();
                    System.exit(-3);
                }
    
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    }
    
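    • BrokerStartup creates the BrokerController and then starts it through start().
    • createBrokerController creates the BrokerController and initializes it.
    • BrokerStartup#start starts the BrokerController.
    • Also worth noting: by default the broker listens on port 10911 for service requests, and the HA listen port is 10912 = 10911 + 1.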
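
    The port arithmetic can be sketched as follows. BrokerPorts is a hypothetical helper written for this article, not a RocketMQ class; the "+ 1" rule comes from createBrokerController above and the "- 2" rule from the fastRemotingServer setup in BrokerController#initialize shown later.

```java
// Sketch only: BrokerPorts is a hypothetical helper, not part of RocketMQ.
final class BrokerPorts {
    final int listenPort;     // main remoting port (10911 by default)
    final int haListenPort;   // listenPort + 1, as in createBrokerController
    final int fastListenPort; // listenPort - 2, as in BrokerController#initialize

    BrokerPorts(int listenPort) {
        this.listenPort = listenPort;
        this.haListenPort = listenPort + 1;
        this.fastListenPort = listenPort - 2;
    }

    public static void main(String[] args) {
        BrokerPorts ports = new BrokerPorts(10911);
        System.out.println("listen=" + ports.listenPort
            + " ha=" + ports.haListenPort
            + " fast=" + ports.fastListenPort);
    }
}
```

    With the default listen port, a broker therefore occupies three ports (10909, 10911, 10912), which is worth keeping in mind when several brokers share a host.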

    Creating the BrokerController

    public class BrokerController {
    
        public BrokerController(
            final BrokerConfig brokerConfig,
            final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig,
            final MessageStoreConfig messageStoreConfig
        ) {
            // Configuration objects and config managers
            this.brokerConfig = brokerConfig;
            this.nettyServerConfig = nettyServerConfig;
            this.nettyClientConfig = nettyClientConfig;
            this.messageStoreConfig = messageStoreConfig;
            this.consumerOffsetManager = new ConsumerOffsetManager(this);
            this.topicConfigManager = new TopicConfigManager(this);
    
            // Services and managers
            this.pullMessageProcessor = new PullMessageProcessor(this);
            this.pullRequestHoldService = new PullRequestHoldService(this);
            this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
            this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
            this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
            this.consumerFilterManager = new ConsumerFilterManager(this);
            this.producerManager = new ProducerManager();
            this.clientHousekeepingService = new ClientHousekeepingService(this);
            this.broker2Client = new Broker2Client(this);
            this.subscriptionGroupManager = new SubscriptionGroupManager(this);
            this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
            this.filterServerManager = new FilterServerManager(this);
            this.slaveSynchronize = new SlaveSynchronize(this);
    
            // Bounded work queues for the thread pools
            this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
            this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
            this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
            this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
            this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
            this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
            this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
            this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
    
            this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
            this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
    
            this.brokerFastFailure = new BrokerFastFailure(this);
            this.configuration = new Configuration(
                log,
                BrokerPathConfigHelper.getBrokerConfigPath(),
                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
            );
        }
    }
    
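    • The BrokerController constructor mainly stores the configuration objects and creates the managers, services, and bounded thread pool queues that are used later during initialization.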
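
    To illustrate why the queues are created with an explicit capacity, here is a minimal sketch using only the JDK. It assumes the default rejection policy of ThreadPoolExecutor (AbortPolicy); BoundedPoolSketch and its method names are hypothetical, and the broker's own BrokerFixedThreadPoolExecutor is not used here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch only: a fixed-size pool backed by a bounded queue, JDK classes only.
final class BoundedPoolSketch {

    /** Submits `tasks` blocking tasks and returns how many were rejected. */
    static int countRejected(int threads, int queueCapacity, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            threads, threads, 60_000, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Every task blocks until the latch opens, so nothing drains the queue.
                executor.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                rejected++; // all threads busy and the queue is full
            }
        }
        release.countDown(); // let the accepted tasks finish
        executor.shutdown();
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected=" + countRejected(2, 3, 10));
    }
}
```

    A bounded queue turns overload into an immediate, visible rejection instead of unbounded memory growth, which is presumably why each queue capacity is exposed as a BrokerConfig setting.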

    Initializing the BrokerController

    public class BrokerController {
    
        public boolean initialize() throws CloneNotSupportedException {
            // 1. Load the topicConfig, consumerOffset, subscriptionGroup, and consumerFilter configuration
            boolean result = this.topicConfigManager.load();
            result = result && this.consumerOffsetManager.load();
            result = result && this.subscriptionGroupManager.load();
            result = result && this.consumerFilterManager.load();
    
            if (result) {
                try {
                    // 2. Create the messageStore
                    this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                            this.brokerConfig);
    
                    if (messageStoreConfig.isEnableDLegerCommitLog()) {
                        DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                        ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                    }
                    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                    //load plugin
                    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                } catch (IOException e) {
                    result = false;
                    log.error("Failed to initialize", e);
                }
            }
            // 3. Load the messageStore
            result = result && this.messageStore.load();
    
            if (result) {
                // 4. Create remotingServer and fastRemotingServer (the latter listens on listenPort - 2)
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
                NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
                fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
                this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
                // 5. Create sendMessageExecutor for message sending
                this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.sendThreadPoolQueue,
                    new ThreadFactoryImpl("SendMessageThread_"));
    
                // 6. Create pullMessageExecutor for message pulling
                this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.pullThreadPoolQueue,
                    new ThreadFactoryImpl("PullMessageThread_"));
    
                // 7. Create replyMessageExecutor for reply messages
                this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.replyThreadPoolQueue,
                    new ThreadFactoryImpl("ProcessReplyMessageThread_"));
    
                // 8. Create queryMessageExecutor for message queries
                this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.queryThreadPoolQueue,
                    new ThreadFactoryImpl("QueryMessageThread_"));
    
                // 9. Create adminBrokerExecutor for broker administration
                this.adminBrokerExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                        "AdminBrokerThread_"));
    
                // 10. Create clientManageExecutor for client management
                this.clientManageExecutor = new ThreadPoolExecutor(
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.clientManagerThreadPoolQueue,
                    new ThreadFactoryImpl("ClientManageThread_"));
    
                // 11. Create heartbeatExecutor for heartbeat handling
                this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getHeartbeatThreadPoolNums(),
                    this.brokerConfig.getHeartbeatThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.heartbeatThreadPoolQueue,
                    new ThreadFactoryImpl("HeartbeatThread_", true));
    
                // 12. Create endTransactionExecutor for transaction messages
                this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getEndTransactionThreadPoolNums(),
                    this.brokerConfig.getEndTransactionThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.endTransactionThreadPoolQueue,
                    new ThreadFactoryImpl("EndTransactionThread_"));
    
                // 13. Create consumerManageExecutor for consumer management
                this.consumerManageExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                        "ConsumerManageThread_"));
    
                // 14. Register the request Processors
                this.registerProcessor();
                
                final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
                final long period = 1000 * 60 * 60 * 24;
    
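                // 15. Schedule the broker stats recording task (first run at the time returned by
                // computeNextMorningTimeMillis(), then every 24 hours)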
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.getBrokerStats().record();
                        } catch (Throwable e) {
                            log.error("schedule record error.", e);
                        }
                    }
                }, initialDelay, period, TimeUnit.MILLISECONDS);
    
                // 16. Schedule periodic persistence of consumerOffset
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerOffsetManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumerOffset error.", e);
                        }
                    }
                }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
                // 17. Schedule periodic persistence of consumerFilter (every 10 seconds)
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerFilterManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumer filter error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
    
                // 18. Schedule the protectBroker task (every 3 minutes)
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.protectBroker();
                        } catch (Throwable e) {
                            log.error("protectBroker error.", e);
                        }
                    }
                }, 3, 3, TimeUnit.MINUTES);
    
                // 19. Schedule the printWaterMark task (every second)
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printWaterMark();
                        } catch (Throwable e) {
                            log.error("printWaterMark error.", e);
                        }
                    }
                }, 10, 1, TimeUnit.SECONDS);
    
                // 20. Schedule the dispatchBehindBytes logging task (every minute)
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                        } catch (Throwable e) {
                            log.error("schedule dispatchBehindBytes error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    
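                // 21. Set the name server addresses: use the configured namesrvAddr if present;
                // otherwise, if enabled, fetch them from the address server every 2 minutes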
                if (this.brokerConfig.getNamesrvAddr() != null) {
                    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
                } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                            } catch (Throwable e) {
                                log.error("ScheduledTask fetchNameServerAddr exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
                }
    
                // (intermediate code omitted)
    
                // 22. Initialize transaction message support
                initialTransaction();
                // 23. Initialize ACL support and the RPC hooks
                initialAcl();
                initialRpcHooks();
            }
            return result;
        }
    }
    
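    • 1. Load the topicConfig, consumerOffset, subscriptionGroup, and consumerFilter configuration.
    • 2. Create the messageStore.
    • 3. Load the messageStore.
    • 4. Create remotingServer and fastRemotingServer.
    • 5. Create sendMessageExecutor for message sending.
    • 6. Create pullMessageExecutor for message pulling.
    • 7. Create replyMessageExecutor for reply messages.
    • 8. Create queryMessageExecutor for message queries.
    • 9. Create adminBrokerExecutor for broker administration.
    • 10. Create clientManageExecutor for client management.
    • 11. Create heartbeatExecutor for heartbeat handling.
    • 12. Create endTransactionExecutor for transaction messages.
    • 13. Create consumerManageExecutor for consumer management.
    • 14. Register the request Processors.
    • 15. Schedule the broker stats recording task.
    • 16. Schedule periodic persistence of consumerOffset.
    • 17. Schedule periodic persistence of consumerFilter.
    • 18. Schedule the protectBroker task.
    • 19. Schedule the printWaterMark task.
    • 20. Schedule the dispatchBehindBytes logging task.
    • 21. Set the name server addresses, scheduling fetchNameServerAddr when no address is configured.
    • 22. Initialize transaction message support.
    • 23. Initialize ACL support.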
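
    Every scheduled task in steps 15 to 21 wraps its body in try/catch (Throwable). The reason is documented JDK behaviour: if one execution of a task passed to scheduleAtFixedRate throws, later executions are suppressed. The sketch below shows the same guard as a reusable wrapper; GuardedScheduleSketch and guarded() are hypothetical names, not RocketMQ code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: the try/catch guard used by the broker's scheduled tasks, as a wrapper.
final class GuardedScheduleSketch {

    /** Wraps a task so that a failure is reported instead of cancelling the schedule. */
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable e) {
                System.err.println("scheduled task failed: " + e);
            }
        };
    }

    /** Schedules an always-failing task; returns true if it still ran `runs` times. */
    static boolean survivesFailures(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(guarded(() -> {
                latch.countDown();
                throw new IllegalStateException("simulated failure");
            }), 0, 10, TimeUnit.MILLISECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("kept running: " + survivesFailures(3));
    }
}
```

    Without the guard, the first exception in, say, consumerOffsetManager.persist() would silently stop all later persistence runs.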

    registerProcessor

    public class BrokerController {
    
        public void registerProcessor() {
            /**
             * SendMessageProcessor
             */
            SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
            sendProcessor.registerSendMessageHook(sendMessageHookList);
            sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            /**
             * PullMessageProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
            this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            /**
             * ReplyMessageProcessor
             */
            ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
            replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
    
            this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
    
            /**
             * QueryMessageProcessor
             */
            NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
            /**
             * ClientManageProcessor
             */
            ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
            this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
            /**
             * ConsumerManageProcessor
             */
            ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
            /**
             * EndTransactionProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    
            /**
             * Default
             */
            AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
            this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
            this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        }
    }
    
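    • registerProcessor binds each RequestCode to a Processor and to the executor that runs it, on both remotingServer and fastRemotingServer (PULL_MESSAGE is registered on remotingServer only). AdminBrokerProcessor is registered as the default processor. The name of each Processor indicates what it handles.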
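
    The registration pattern can be sketched as a lookup table from request code to a (processor, executor) pair with a default fallback. This is a simplified illustration, not the NettyRemotingServer implementation: ProcessorTableSketch, Processor, and Binding are hypothetical names, requests are plain strings, and the numeric codes are placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Sketch only: request code -> (processor, executor) with a default fallback.
final class ProcessorTableSketch {

    interface Processor {
        String process(String request);
    }

    private static final class Binding {
        final Processor processor;
        final Executor executor;

        Binding(Processor processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Binding> table = new HashMap<>();
    private Binding defaultBinding;

    void registerProcessor(int requestCode, Processor processor, Executor executor) {
        table.put(requestCode, new Binding(processor, executor));
    }

    void registerDefaultProcessor(Processor processor, Executor executor) {
        defaultBinding = new Binding(processor, executor);
    }

    /** Runs the matching processor (or the default one) on its own executor. */
    CompletableFuture<String> dispatch(int requestCode, String request) {
        Binding binding = table.getOrDefault(requestCode, defaultBinding);
        if (binding == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("no processor for code " + requestCode));
            return failed;
        }
        return CompletableFuture.supplyAsync(
            () -> binding.processor.process(request), binding.executor);
    }

    public static void main(String[] args) {
        ProcessorTableSketch server = new ProcessorTableSketch();
        Executor direct = Runnable::run; // runs the task on the calling thread
        server.registerProcessor(1, r -> "send:" + r, direct);
        server.registerDefaultProcessor(r -> "admin:" + r, direct);
        System.out.println(server.dispatch(1, "hello").join());
        System.out.println(server.dispatch(42, "stats").join());
    }
}
```

    Pairing each processor with its own executor is what lets the broker isolate workloads: a backlog of send requests does not consume the threads reserved for heartbeats or queries.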

    initialAcl

    public class BrokerController {
        private void initialAcl() {
            if (!this.brokerConfig.isAclEnable()) {
                log.info("The broker dose not enable acl");
                return;
            }
    
            List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
            if (accessValidators == null || accessValidators.isEmpty()) {
                log.info("The broker dose not load the AccessValidator");
                return;
            }
    
            for (AccessValidator accessValidator: accessValidators) {
                final AccessValidator validator = accessValidator;
                accessValidatorMap.put(validator.getClass(),validator);
                this.registerServerRPCHook(new RPCHook() {
    
                    @Override
                    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                        //Do not catch the exception
                        validator.validate(validator.parse(request, remoteAddr));
                    }
    
                    @Override
                    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                    }
                });
            }
        }
    
        public void registerServerRPCHook(RPCHook rpcHook) {
            getRemotingServer().registerRPCHook(rpcHook);
            this.fastRemotingServer.registerRPCHook(rpcHook);
        }
    }
    
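    • The core of initialAcl is registering an rpcHook on both remotingServer and fastRemotingServer; the hook's doBeforeRequest parses and validates each request with an AccessValidator.
    • The remoting server runs the registered rpcHooks before it hands a request to the Processor.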
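
    The hook-before-processor ordering can be sketched as below. This is an illustration under simplifying assumptions: RpcHookSketch and Hook are hypothetical, requests are plain strings, and the address check stands in for whatever an AccessValidator would actually verify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch only: hooks run before the processor; a throwing hook blocks the request.
final class RpcHookSketch {

    interface Hook {
        void doBeforeRequest(String remoteAddr, String request);
    }

    private final List<Hook> hooks = new ArrayList<>();

    void registerHook(Hook hook) {
        hooks.add(hook);
    }

    /** Runs every hook first; the processor is reached only if none of them throws. */
    String handle(String remoteAddr, String request, Function<String, String> processor) {
        for (Hook hook : hooks) {
            hook.doBeforeRequest(remoteAddr, request); // exception is deliberately not caught
        }
        return processor.apply(request);
    }

    public static void main(String[] args) {
        RpcHookSketch server = new RpcHookSketch();
        server.registerHook((addr, req) -> {
            if (!addr.startsWith("10.")) {
                throw new SecurityException("access denied for " + addr);
            }
        });
        System.out.println(server.handle("10.0.0.1", "ping", r -> "pong"));
        try {
            server.handle("192.168.0.9", "ping", r -> "pong");
        } catch (SecurityException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    As in the "Do not catch the exception" comment in initialAcl, the validation failure is allowed to propagate so that the request never reaches its Processor.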

    Starting the BrokerController

    public class BrokerController {
    
        public void start() throws Exception {
            if (this.messageStore != null) {
                this.messageStore.start();
            }
    
            if (this.remotingServer != null) {
                this.remotingServer.start();
            }
    
            if (this.fastRemotingServer != null) {
                this.fastRemotingServer.start();
            }
    
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
    
            if (this.brokerOuterAPI != null) {
                this.brokerOuterAPI.start();
            }
    
            if (this.pullRequestHoldService != null) {
                this.pullRequestHoldService.start();
            }
    
            if (this.clientHousekeepingService != null) {
                this.clientHousekeepingService.start();
            }
    
            if (this.filterServerManager != null) {
                this.filterServerManager.start();
            }
    
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                startProcessorByHa(messageStoreConfig.getBrokerRole());
                handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
                this.registerBrokerAll(true, false, true);
            }
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
            if (this.brokerStatsManager != null) {
                this.brokerStatsManager.start();
            }
    
            if (this.brokerFastFailure != null) {
                this.brokerFastFailure.start();
            }
        }
    }
    
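    • messageStore.start() starts the message store, which is responsible for storing messages.
    • remotingServer.start() starts the server-side listener.
    • registerBrokerAll is scheduled to report broker information to the namesrv periodically: the first run happens after 10 seconds, and the period is getRegisterNameServerPeriod() clamped to the range 10 to 60 seconds.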
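
    The period expression Math.max(10000, Math.min(period, 60000)) is a clamp. A small sketch makes the effect explicit; RegisterPeriodSketch is a hypothetical name, and only the arithmetic is taken from start().

```java
// Sketch only: the clamp applied to the registration period in BrokerController#start.
final class RegisterPeriodSketch {

    /** Clamps the configured period to the range [10000, 60000] milliseconds. */
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000, Math.min(configuredMillis, 60_000));
    }

    public static void main(String[] args) {
        for (long configured : new long[] {5_000, 30_000, 120_000}) {
            System.out.println(configured + " -> " + effectivePeriodMillis(configured));
        }
    }
}
```

    Whatever value is configured, the broker therefore re-registers with the namesrv at least once a minute and at most once every 10 seconds.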

    messageStore

    public class DefaultMessageStore implements MessageStore {
    
        public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
            final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
            this.messageArrivingListener = messageArrivingListener;
            this.brokerConfig = brokerConfig;
            this.messageStoreConfig = messageStoreConfig;
            this.brokerStatsManager = brokerStatsManager;
            this.allocateMappedFileService = new AllocateMappedFileService(this);
            // Core object: CommitLog
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                this.commitLog = new DLedgerCommitLog(this);
            } else {
                this.commitLog = new CommitLog(this);
            }
            // Core object: the consumeQueue table
            this.consumeQueueTable = new ConcurrentHashMap<>(32);
    
            this.flushConsumeQueueService = new FlushConsumeQueueService();
            this.cleanCommitLogService = new CleanCommitLogService();
            this.cleanConsumeQueueService = new CleanConsumeQueueService();
            this.storeStatsService = new StoreStatsService();
            // Core service: IndexService
            this.indexService = new IndexService(this);
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                this.haService = new HAService(this);
            } else {
                this.haService = null;
            }
            // Core service: ReputMessageService, which keeps the consume queues in sync with the CommitLog
            this.reputMessageService = new ReputMessageService();
    
            // Core service: ScheduleMessageService, which handles delayed messages
            this.scheduleMessageService = new ScheduleMessageService(this);
    
            this.transientStorePool = new TransientStorePool(messageStoreConfig);
            if (messageStoreConfig.isTransientStorePoolEnable()) {
                this.transientStorePool.init();
            }
    
            this.allocateMappedFileService.start();
    
            this.indexService.start();
    
            this.dispatcherList = new LinkedList<>();
            this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
            this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
    
            File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
            MappedFile.ensureDirOK(file.getParent());
            lockFile = new RandomAccessFile(file, "rw");
        }
    }
    
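    • CommitLog is the object that stores the messages.
    • consumeQueueTable holds the consumeQueues.
    • indexService builds the message index.
    • ReputMessageService takes entries written to the CommitLog and dispatches them so that the consumeQueues are built.
    • ScheduleMessageService provides the delayed-message service.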
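
    The constructor ends by filling dispatcherList with a consume-queue builder and an index builder. Assuming each CommitLog entry is passed through that list in order, the idea can be sketched as follows. DispatchSketch, Entry, and Dispatcher are hypothetical, heavily simplified stand-ins: real consume queues and index files are on-disk structures, not in-memory maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Sketch only: one stored entry fans out to several derived structures.
final class DispatchSketch {

    /** Simplified stand-in for the data describing one CommitLog entry. */
    static final class Entry {
        final String topic;
        final int queueId;
        final long commitLogOffset;
        final String key;

        Entry(String topic, int queueId, long commitLogOffset, String key) {
            this.topic = topic;
            this.queueId = queueId;
            this.commitLogOffset = commitLogOffset;
            this.key = key;
        }
    }

    interface Dispatcher {
        void dispatch(Entry entry);
    }

    final Map<String, List<Long>> consumeQueues = new HashMap<>(); // "topic-queueId" -> offsets
    final Map<String, Long> index = new HashMap<>();               // message key -> offset
    final LinkedList<Dispatcher> dispatcherList = new LinkedList<>();

    DispatchSketch() {
        // Same registration order as in the constructor above: consume queue, then index.
        dispatcherList.addLast(e -> consumeQueues
            .computeIfAbsent(e.topic + "-" + e.queueId, k -> new ArrayList<>())
            .add(e.commitLogOffset));
        dispatcherList.addLast(e -> index.put(e.key, e.commitLogOffset));
    }

    /** Passes one entry to every dispatcher in list order. */
    void reput(Entry entry) {
        for (Dispatcher dispatcher : dispatcherList) {
            dispatcher.dispatch(entry);
        }
    }

    public static void main(String[] args) {
        DispatchSketch store = new DispatchSketch();
        store.reput(new Entry("TopicA", 0, 100L, "order-1"));
        store.reput(new Entry("TopicA", 0, 250L, "order-2"));
        System.out.println(store.consumeQueues);
        System.out.println(store.index);
    }
}
```

    Because the list is ordered, BrokerController#initialize can prepend its own dispatcher with addFirst (CommitLogDispatcherCalcBitMap) so that it runs before the two built-in ones.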

    registerBrokerAll

    public class BrokerController {
    
        public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
            TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
                for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                    TopicConfig tmp =
                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                            this.brokerConfig.getBrokerPermission());
                    topicConfigTable.put(topicConfig.getTopicName(), tmp);
                }
                topicConfigWrapper.setTopicConfigTable(topicConfigTable);
            }
    
            if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
            }
        }
    }
    
    
    public class TopicConfigManager extends ConfigManager {
    
        private final ConcurrentMap<String, TopicConfig> topicConfigTable =
            new ConcurrentHashMap<String, TopicConfig>(1024);
    
        public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
            TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
            topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
            topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
            return topicConfigSerializeWrapper;
        }
    }
    
    
    public class TopicConfigSerializeWrapper extends RemotingSerializable {
        private ConcurrentMap<String, TopicConfig> topicConfigTable =
            new ConcurrentHashMap<String, TopicConfig>();
        private DataVersion dataVersion = new DataVersion();
    }
    
    
    public class TopicConfig {
        private static final String SEPARATOR = " ";
        public static int defaultReadQueueNums = 16;
        public static int defaultWriteQueueNums = 16;
        private String topicName;
        private int readQueueNums = defaultReadQueueNums;
        private int writeQueueNums = defaultWriteQueueNums;
        private int perm = PermName.PERM_READ | PermName.PERM_WRITE;
        private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
        private int topicSysFlag = 0;
        private boolean order = false;
    }
    
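    • registerBrokerAll periodically reports the broker's TopicConfig information to the namesrv. If the broker is not both readable and writable, every TopicConfig is reported with the broker's permission instead of its own.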
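
    The permission check at the top of registerBrokerAll can be sketched as below. The bit values are an assumption about PermName's layout (read = 1 << 2, write = 1 << 1) and should be checked against the RocketMQ version in use; PermSketch and reportedPerm are hypothetical names.

```java
// Sketch only: which permission is reported to the namesrv for a topic.
final class PermSketch {

    // Assumed to mirror the bit layout of PermName; verify against your RocketMQ version.
    static final int PERM_READ = 0x1 << 2;
    static final int PERM_WRITE = 0x1 << 1;

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    /** A restricted broker overrides the topic's permission with its own. */
    static int reportedPerm(int brokerPerm, int topicPerm) {
        if (!isWriteable(brokerPerm) || !isReadable(brokerPerm)) {
            return brokerPerm;
        }
        return topicPerm;
    }

    public static void main(String[] args) {
        int readWrite = PERM_READ | PERM_WRITE;
        System.out.println(reportedPerm(PERM_READ, readWrite)); // read-only broker
        System.out.println(reportedPerm(readWrite, PERM_WRITE)); // unrestricted broker
    }
}
```

    This is how switching a broker to read-only takes effect cluster-wide: on the next registration every topic on that broker is advertised with the broker's restricted permission.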
