美文网首页
RocketMq源码解析

RocketMq源码解析

作者: 枫叶红花 | 来源:发表于2023-02-27 22:06 被阅读0次

    一、NameServer启动

    源码入口:NamesrvStartup#main

    1.NamesrvController controller = createNamesrvController(args);
    • 检测命令行参数
    • 创建核心配置对象,NamesrvConfig、NettyServerConfig
    • 解析 -c 、-p参数
    • 检查RocketMQ_HOME环境变量
    • final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);创建controller
    • controller.getConfiguration().registerConfig(properties); 注册所有配置信息
    2.start(controller);
    • controller.initialize(); 执行初始化
      ○ this.kvConfigManager.load(); 加载KV配置
      ○ this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);创建NettyServer网络处理对象
      ○ this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); 创建Netty服务器工作线程池
      ○ this.registerProcessor(); 注册NameServer的Processor 注册到RemotingServer中
      ○ NamesrvController.this.routeInfoManager.scanNotActiveBroker() 启动定时任务,移除不活跃的Broker
      ○ NamesrvController.this.kvConfigManager.printAllPeriodically() 定时打印KV配置信息
    • Runtime.getRuntime().addShutdownHook 注册关闭钩子,在关闭服务时释放资源
    • controller.start(); 启动controller

    NameServer的作用主要有两个:
    1.维护broker的服务地址信息,并进行更新
    2.给Producer、consumer提供Broker的服务列表


    image.png

    二、Broker启动

    源码入口:Brokerstartup#main

    1.createBrokerController(args)
    • 构建四个核心配置对象:BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig
    • BrokerConfig只解析 -c参数
    • RocketMq_HOME环境变量检查
    • RemotingUtil.string2SocketAddress(addr) 将namesrvAddr地址进行拆分
    • messageStoreConfig.getBrokerRole() 通过BrokerId判断主从:masterId=0,Deldger集群的所有Broker节点ID都是-1
    • 解析 -p、-m参数,并将解析的参数添加到四个核心配置对象中
    • BrokerController controller = new BrokerController 创建brokerController,将四个核心配置类传入
    • controller.getConfiguration().registerConfig(properties); 重新注册(更新)配置
    • controller.initialize(); 初始化controller
      ○ 加载磁盘上的配置文件:topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager
      ○ this.messageStore =new DefaultMessageStore() 构建消息存储组件
      ○ this.messageStore.load() 加载磁盘文件
      ○ this.remotingServer = new NettyRemotingServer 构建Netty网络组件
      ○ this.fastRemotingServer = new NettyRemotingServer 这个fastRemotingServer与RemotingServer功能基本差不多,处理VIP端口请求
      ○ 后面就是初始化一些线程池
      ○ this.registerProcessor(); broker注册一些Processor处理方法
    • Runtime.getRuntime().addShutdownHook 注册关闭钩子
    2.start(BrokerController controller)
    • this.messageStore.start(); 这里启动服务主要是为了将CommitLog的写入事件分发给ComsumeQueue和IndexFile
    • 启动两个Netty服务:remotingServer、fastRemotingServer
    • this.fileWatchService.start(); 文件监听服务
    • this.brokerOuterAPI.start(); brokerOuterAPI可以理解为一个Netty客户端,往外发请求的组件,例如发送心跳
    • this.pullRequestHoldService.start(); 长轮询请求暂停服务
    • this.filterServerManager.start(); 使用filter进行过滤
    • BrokerController.this.registerBrokerAll() Broker核心的心跳注册任务,主要作用就是将broker注册到Namesrv中

    broker的核心作用:
    1.作为client时,向nameServer发送心跳信息、发起事务的状态检查
    2.作为服务端时,用于存储消息、响应consumer端的请求

    三、Netty服务注册框架

    四、Broker心跳注册过程

    源码入口:BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister())

    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                                    this.brokerConfig.getBrokerPermission());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }
        //这里才是比较关键的地方。先判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册。
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                                          this.getBrokerAddr(),
                                          this.brokerConfig.getBrokerName(),
                                          this.brokerConfig.getBrokerId(),
                                          this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }
    
    // Broker注册最核心的部分
    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
                                     TopicConfigSerializeWrapper topicConfigWrapper) {
        // 注册broker方法
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());
    
        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                //注册完保存主从节点的地址
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }
    
                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
    
                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }
    
    public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {
        //使用CopyOnWriteArrayList提升并发安全性
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        // 获取所有nameServer的地址信息
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
    
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            //通过CountDownLatch,保证在所有NameServer上完成注册后再一起结束。
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }
    
                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }
    
            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }
    
        return registerBrokerResultList;
    }
    

    NameServer处理请求:

    //NameServer处理请求的核心代码
    @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                              RemotingCommand request) throws RemotingCommandException {
    
        if (ctx != null) {
            log.debug("receive request, {} {} {}",
                      request.getCode(),
                      RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                      request);
        }
    
    
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER: //Broker注册请求处理。版本默认是当前框架版本
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request); //当前版本
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINFO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request);
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
    

    实际就是将broker信息注册到routeInfo中:

    public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader =
            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    
        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }
    
        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
    
        if (request.getBody() != null) {
            try {
                registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
            } catch (Exception e) {
                throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
            }
        } else {
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
        }
        //routeInfoManager就是管理路由信息的核心组件。
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            registerBrokerBody.getTopicConfigSerializeWrapper(),
            registerBrokerBody.getFilterServerList(),
            ctx.channel());
    
        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());
    
        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);
    
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    

    五、Producer发送消息

    源码入口:DefaultMQProducer#start
    1.this.defaultMQProducerImpl.start(); 生产端启动

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // 默认就是CREATE_JUST
                this.serviceState = ServiceState.START_FAILED;
    
                this.checkConfig();
                //修改当前的instanceName为当前进程ID
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                //客户端核心的MQ客户端工厂 对于事务消息发送者,在这里面会完成事务消息的发送者的服务注册
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //注册MQ客户端工厂示例
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                                + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                                null);
                }
    
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                //启动示例 --所有客户端组件都交由mQClientFactory启动
                if (startFactory) {
                    mQClientFactory.start();
                }
    
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                         this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                                            + this.serviceState
                                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                            null);
            default:
                break;
        }
        // 向所有的broker发送心跳
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    
        this.startScheduledTask();
    
    }
    

    六、Consumer消费消息

    消费端入口:DefaultMQPushConsumer#start
    this.defaultMQPushConsumerImpl.start();

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                         this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
    
                this.checkConfig();
    
                this.copySubscription();
    
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                //客户端示例工厂,生产者也是交由这个工厂启动的。
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                //负载均衡策略
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    //从这里可以看出,广播模式与集群模式的最本质区别就是offset存储的地方不一样。
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                            //广播模式是在消费者本地存储offset
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                            //集群模式是在Broker远端存储offset
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                //顺序消费监听创建ConsumeMessageOrderlyService
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    //并发消费监听创建ConsumeMessageConcurrentlyService
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                   this.consumeOrderly = false;
                   this.consumeMessageService =
                   new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
    
               this.consumeMessageService.start();
               //注册消费者。与生产者类似,客户端只要按要求注册即可,后续会随mQClientFactory一起启动。
               boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
               if (!registerOK) {
                   this.serviceState = ServiceState.CREATE_JUST;
                   this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                   throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                   + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                   null);
               }
    
               mQClientFactory.start();
               log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
               this.serviceState = ServiceState.RUNNING;
               break;
               case RUNNING:
               case START_FAILED:
               case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);
               default:
                    break;
               }
    
                   this.updateTopicSubscribeInfoWhenSubscriptionChanged();
                   this.mQClientFactory.checkClientInBroker();
                   this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                   this.mQClientFactory.rebalanceImmediately();
               }
    

    1、consumer端的消费模式:
    ● 集群模式:集群模式下每个consumer都会分配不同的消息
    ● 广播模式:广播模式下每个消息都推送给所有consumer
    2、关于offset存储:
    ● 广播模式:this.offsetStore = new LocalFileOffsetStore(); 存储在每个consumer中
    ● 集群模式:this.offsetStore = new RemoteBrokerOffsetStore(); 存储在broker端

    相关文章

      网友评论

          本文标题:RocketMq源码解析

          本文链接:https://www.haomeiwen.com/subject/ygskldtx.html