美文网首页
RocketMQ源码剖析

RocketMQ源码剖析

作者: 王侦 | 来源:发表于2023-02-15 19:11 被阅读0次

    1.各个组件启动源码、框架结构

    1.1 NameServer启动

    NamesrvStartup#main

    • 1)NamesrvController controller = createNamesrvController(args);创建controller
      1-1)检测命令行参数
      1-2)创建两个核心配置:NamesrvConfig和NettyServerConfig
      1-3)解析-c 和 -p 参数,赋值到上面两个配置中
      1-4)ROCKETMQ_HOME环境变量检测
      1-5)controller = new NamesrvController(namesrvConfig, nettyServerConfig);创建controller
      1-6) controller.getConfiguration().registerConfig(properties);注册一下所有的配置
    • 2)start(controller);启动controller
      2-1)controller.initialize();初始化
       A)this.kvConfigManager.load();加载KV配置
       B)this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);创建NettyServer网络处理对象
       C) this.remotingExecutor = Executors.newFixedThreadPool()Netty服务器的工作线程池
       D)this.registerProcessor();注册NameServer的Processor 注册到RemotingServer中。
       E)NamesrvController.this.routeInfoManager.scanNotActiveBroker();定时任务:每间隔10S扫描一次Broker,移除不活跃的Broker
       F)NamesrvController.this.kvConfigManager.printAllPeriodically();定时任务:每间隔10min打印一次KV配置
      2-2)Runtime.getRuntime().addShutdownHook()服务关闭钩子,在服务正常关闭时执行。
      2-3) controller.start();启动服务
       A)this.remotingServer.start();启动remotingServer。
       B)不为null也启动this.fileWatchService.start();

    NameServer的核心作用:

    • 一是维护Broker的服务地址并进行及时的更新。
    • 二是给Producer和Consumer提供服务获取Broker列表。

    1.2 Broker启动

    BrokerStartup#main

    • 1)createBrokerController(args)创建controller
      1-1)检测命令行参数
      1-2)创建四个核心配置:BrokerConfig、NettyServerConfig、NettyClientConfig和MessageStoreConfig
      1-3)解析-c 参数,赋值到上面四个配置中
      1-4)ROCKETMQ_HOME环境变量检测
      1-5)处理NamesrcAddr
      1-6)通过brokerId判断主从:ASYNC_MASTER、SYNC_MASTER、SLAVE;Dledger集群的所有Broker节点ID都是-1
      1-7)解析-p和-m参数,赋值到上面四个配置中
      1-8) controller = new BrokerController(),创建核心的BrokerController
      1-9)controller.getConfiguration().registerConfig(properties);注册配置
      1-10)controller.initialize();初始化BrokerController。
       A)加载磁盘上的配置信息(json)。topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager
       B)构建消息存储管理组件DefaultMessageStore,外层还会包装插件AbstractPluginMessageStore
       C)加载磁盘文件this.messageStore.load();
       D)this.remotingServer = new NettyRemotingServer()
       E)this.fastRemotingServer = new NettyRemotingServer()这个fastRemotingServer与RemotingServer功能基本差不多,处理VIP端口请求
       F) this.sendMessageExecutor发送消息的线程池
       G) this.pullMessageExecutor处理consumer的pull请求的线程池
       H) this.replyMessageExecutor回复消息的线程池
       I)this.queryMessageExecutor查询消息的线程池
       J)this.adminBrokerExecutor
       K)this.clientManageExecutor管理客户端的线程池
       L) this.heartbeatExecutor心跳请求线程池
       M)this.endTransactionExecutor
       N) this.consumerManageExecutor
       O)this.registerProcessor();Broker注册Processor
       P)BrokerController.this.getBrokerStats().record();定时进行broker统计的任务
       Q)BrokerController.this.consumerOffsetManager.persist();定时进行consumer消费Offset持久化到磁盘的任务
       R)BrokerController.this.consumerFilterManager.persist();对consumer的filter过滤器进行持久化的任务。这里可以看到,消费者的filter是被下推到了Broker来执行的。
       S)BrokerController.this.protectBroker();定时进行broker保护
       T)BrokerController.this.printWaterMark();定时打印水位线 U)BrokerController.this.getMessageStore().dispatchBehindByte
      s()定时进行落后commitlog分发的任务
       V)this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());或者BrokerController.this.brokerOuterAPI.fetchNameServerAddr();设置NameServer的地址列表。可以从配置加载,也可以发远程请求加载
       W)initialTransaction();
       X)initialAcl(); 权限控制
       Y)initialRpcHooks();
      1-11)Runtime.getRuntime().addShutdownHook()服务关闭钩子,在服务正常关闭时执行。
    • 2)start()启动controller
       A)this.messageStore.start();存储组件,这里启动服务主要是为了将CommitLog的写入事件分发给ComsumeQueue和IndexFile
       B)this.remotingServer.start();以及this.fastRemotingServer.start();
       C)this.fileWatchService.start();
       D) this.brokerOuterAPI.start();Broker的brokerOuterAPI可以理解为一个Netty客户端,往外发请求的组件。例如发送心跳
       E)this.pullRequestHoldService.start();长轮询请求暂存服务
       F)this.clientHousekeepingService.start();
       G)this.filterServerManager.start();
       H)BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());Broker核心的心跳注册任务
       I)this.brokerStatsManager.start();
       J) this.brokerFastFailure.start();

    1.3 Netty网络框架

    Broker端的NettyRemotingServer:

    NettyRemotingServer#start

        public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            prepareSharableHandlers();
            //K1 Netty服务启动的核心流程
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //Netty的核心服务流程,encoder和decoder,二进制传输协议。
                            //RocketMQ中的二进制传输协议比较复杂,是否能按照JSON自定义二进制协议?
                            //serverHandler负责最关键的网络请求处理。
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                .addLast(defaultEventExecutorGroup,
                                    encoder,
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    connectionManageHandler,
                                    serverHandler
                                );
                        }
                    });
    
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
            //开始Socket监听
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
            //每秒清理过期的异步请求暂存结果。
            this.timer.scheduleAtFixedRate(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        NettyRemotingServer.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }
    

    核心是NettyServerHandler

        @ChannelHandler.Sharable
        class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
                processMessageReceived(ctx, msg);
            }
        }
    
        public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            final RemotingCommand cmd = msg;
            if (cmd != null) {
                switch (cmd.getType()) {
                    case REQUEST_COMMAND:
                        processRequestCommand(ctx, cmd);
                        break;
                    case RESPONSE_COMMAND:
                        processResponseCommand(ctx, cmd);
                        break;
                    default:
                        break;
                }
            }
        }
    
        public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
            final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    

    核心的业务处理是在processorTable,其初始化是在BrokerController#registerProcessor

        public void registerProcessor() {
            /**
             * SendMessageProcessor
             */
            SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
            sendProcessor.registerSendMessageHook(sendMessageHookList);
            sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            /**
             * PullMessageProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
            this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    

    所以关键的业务处理,后面只要去processorTable查找类型的对应Processor就行。

    RocketMQ的同步结果推送与异步结果推送
    RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer , ResponseFuture> 。

    处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。

    处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。 另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。

    1.4 Broker心跳注册过程

    BrokerController#start

            //K2 Broker核心的心跳注册任务,需要深入解读下。
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
        public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
            ...
            //这里才是比较关键的地方。先判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册。
            if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
            }
        }
    
        //K2 Broker注册最核心的部分
        private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
            TopicConfigSerializeWrapper topicConfigWrapper) {
            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills(),
                this.brokerConfig.isCompressedRegister());
    
            if (registerBrokerResultList.size() > 0) {
                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
                if (registerBrokerResult != null) {
                    //注册完保存主从节点的地址
                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                    }
    
                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
    
                    if (checkOrderConfig) {
                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                    }
                }
            }
        }
    

    会向所有NameServer进行注册:

        public List<RegisterBrokerResult> registerBrokerAll(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills,
            final boolean compressed) {
            //使用CopyOnWriteArrayList提升并发安全性
            final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
            List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
            if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    
                final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
                requestHeader.setBrokerAddr(brokerAddr);
                requestHeader.setBrokerId(brokerId);
                requestHeader.setBrokerName(brokerName);
                requestHeader.setClusterName(clusterName);
                requestHeader.setHaServerAddr(haServerAddr);
                requestHeader.setCompressed(compressed);
    
                RegisterBrokerBody requestBody = new RegisterBrokerBody();
                requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
                requestBody.setFilterServerList(filterServerList);
                final byte[] body = requestBody.encode(compressed);
                final int bodyCrc32 = UtilAll.crc32(body);
                requestHeader.setBodyCrc32(bodyCrc32);
                //通过CountDownLatch,保证在所有NameServer上完成注册后再一起结束。
                final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
                for (final String namesrvAddr : nameServerAddressList) {
                    brokerOuterExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                                if (result != null) {
                                    registerBrokerResultList.add(result);
                                }
    
                                log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                            } catch (Exception e) {
                                log.warn("registerBroker Exception, {}", namesrvAddr, e);
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    });
                }
    
                try {
                    countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                }
            }
    
            return registerBrokerResultList;
        }
    

    NameServer端的处理:
    DefaultRequestProcessor#processRequest

        //K2 NameServer处理请求的核心代码
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
    
            if (ctx != null) {
                log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
            }
    
    
            switch (request.getCode()) {
                case RequestCode.PUT_KV_CONFIG:
                    return this.putKVConfig(ctx, request);
                case RequestCode.GET_KV_CONFIG:
                    return this.getKVConfig(ctx, request);
                case RequestCode.DELETE_KV_CONFIG:
                    return this.deleteKVConfig(ctx, request);
                case RequestCode.QUERY_DATA_VERSION:
                    return queryBrokerTopicConfig(ctx, request);
                case RequestCode.REGISTER_BROKER: //Broker注册请求处理。版本默认是当前框架版本
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request); //当前版本
                    } else {
                        return this.registerBroker(ctx, request);
                    }
    

    最终会调用RouteInfoManager对Broker信息进行注册:

        //K2 NameServer 实际处理Broker注册的地方
        public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
            final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
            final RegisterBrokerRequestHeader requestHeader =
                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    
            if (!checksum(ctx, request, requestHeader)) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("crc32 not match");
                return response;
            }
    
            RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
    
            if (request.getBody() != null) {
                try {
                    registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
                } catch (Exception e) {
                    throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
                }
            } else {
                registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
                registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
            }
            //routeInfoManager就是管理路由信息的核心组件。
            RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId(),
                requestHeader.getHaServerAddr(),
                registerBrokerBody.getTopicConfigSerializeWrapper(),
                registerBrokerBody.getFilterServerList(),
                ctx.channel());
    
            responseHeader.setHaServerAddr(result.getHaServerAddr());
            responseHeader.setMasterAddr(result.getMasterAddr());
    
            byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
            response.setBody(jsonValue);
    
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    

    看看RouteInfoManager 管理的路由信息:

    public class RouteInfoManager {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        //几个关键的Table
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    

    2.发送消息

    2.1 普通消息发送DefaultMQProducer

    DefaultMQProducer#start

        public void start() throws MQClientException {
            this.setProducerGroup(withNamespace(this.producerGroup));
            this.defaultMQProducerImpl.start();
            if (null != traceDispatcher) {
                try {
                    traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
                } catch (MQClientException e) {
                    log.warn("trace dispatcher start failed ", e);
                }
            }
        }
    

    DefaultMQProducerImpl#start()

        public void start() throws MQClientException {
            this.start(true);
        }
        //K2 消息生产者的启动方法
        public void start(final boolean startFactory) throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
    
                    this.checkConfig();
                    //修改当前的instanceName为当前进程ID
                    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                        this.defaultMQProducer.changeInstanceNameToPID();
                    }
                    //客户端核心的MQ客户端工厂 对于事务消息发送者,在这里面会完成事务消息的发送者的服务注册
                    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                    //注册MQ客户端工厂示例
                    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
    
                    this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                    //启动示例 --所有客户端组件都交由mQClientFactory启动
                    if (startFactory) {
                        mQClientFactory.start();
                    }
    
                    log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The producer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
    
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    
            this.startScheduledTask();
    
        }
    

    总结:

    • 1)RocketMQ的所有客户端实例,包括生产者和消费者,都是统一交由mQClientFactory组件来启动,也就是说,所有客户端的启动流程是固定的,不同客户端的区别只是在于他们在启动前注册的一些信息不同。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable
    • 2)MQClientInstance#start启动了很多服务
      this.mQClientAPIImpl.fetchNameServerAddr();
      this.mQClientAPIImpl.start();
      this.startScheduledTask();
      this.pullMessageService.start();
      this.rebalanceService.start();
      this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

    DefaultMQProducer#send(Message)
    -> DefaultMQProducerImpl#send(Message)
    -> DefaultMQProducerImpl#sendDefaultImpl

    • 1)TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());获取Topic信息
    • 2)MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);Producer根据发送者负载均衡策略,计算把消息发到哪个MessageQueue中(Producer在获取路由信息后,会选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索引自增然后取模的方式。.sendLatencyFaultEnable默认是关闭的,Broker故障延迟机制,表示一种发送消息失败后一定时间内不再往同一个Queue重复发送的机制)
    • 3)sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);实际发送消息的方法(根据MessageQueue 获取对应的Broker地址)

    Producer如何管理Borker路由信息?

    Producer需要拉取Broker列表,然后跟Broker建立连接等等很多核心的流程,其实都是在发送消息时建立的。Send方法中,首先需要获得Topic的路由信息。这会从本地缓存中获取,如果本地缓存中没有,就从NameServer中去申请。

        //找路由表的过程都是先从本地缓存找,本地缓存没有,就去NameServer上申请
        private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
                //Producer向NameServer获取更新Topic的路由信息
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                //还是从本地缓存中寻找Topic的路由信息
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
            }
    
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                return topicPublishInfo;
            } else {
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
                return topicPublishInfo;
            }
        }
    

    2.2 事务消息TransactionMQProducer

        //事务消息的启动过程。启动过程中会完成Processor的注册
        @Override
        public void start() throws MQClientException {
            this.defaultMQProducerImpl.initTransactionEnv();
            super.start();
        }
    
        public void initTransactionEnv() {
            TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
            if (producer.getExecutorService() != null) {
                this.checkExecutor = producer.getExecutorService();
            } else {
                this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
                this.checkExecutor = new ThreadPoolExecutor(
                    producer.getCheckThreadPoolMinSize(),
                    producer.getCheckThreadPoolMaxSize(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.checkRequestQueue);
            }
        }
    

    这里唯一区别就是多了一个线程池checkExecutor。

        public TransactionSendResult sendMessageInTransaction(final Message msg,
            final Object arg) throws MQClientException {
            if (null == this.transactionListener) {
                throw new MQClientException("TransactionListener is null", null);
            }
    
            msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
            return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
        }
    

    DefaultMQProducerImpl#sendMessageInTransaction

    • 1)TransactionListener transactionListener = getCheckListener();获取监听器
    • 2)设置消息属性
      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    • 3)发送消息sendResult = this.send(msg);
    • 4)发送成功,执行本地事务localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
    • 5)发送失败(各种错误原因),设置为回滚状态
    • 6)根据状态进行处理this.endTransaction(msg, sendResult, localTransactionState, localException);这里会向broker发起提交或者回滚请求

    客户端处理Broker回查请求:
    ClientRemotingProcessor#processRequest
    -> RequestCode.CHECK_TRANSACTION_STATE
    ClientRemotingProcessor#checkTransactionState
    -> DefaultMQProducerImpl#checkTransactionState
    -> this.checkExecutor.submit(request);
    -> localTransactionState = transactionListener.checkLocalTransaction(message);

    3.消费消息

    DefaultMQPushConsumer#start

        public void start() throws MQClientException {
            setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
            this.defaultMQPushConsumerImpl.start();
            if (null != traceDispatcher) {
                try {
                    traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
                } catch (MQClientException e) {
                    log.warn("trace dispatcher start failed ", e);
                }
            }
        }
    

    DefaultMQPushConsumerImpl#start

    • 1)CLUSTERING模式,this.defaultMQPushConsumer.changeInstanceNameToPID();
    • 2)this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);客户端示例工厂,生产者也是交由这个工厂启动的。
    • 3)this.rebalanceImpl.setAllocateMessageQueueStrategy(
      this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());负载均衡策略
    • 4)this.pullAPIWrapper = new PullAPIWrapper();
    • 5)广播模式与集群模式的最本质区别就是offset存储的地方不一样。
      广播模式是在消费者本地存储offset:this.offsetStore = new LocalFileOffsetStore();
      集群模式是在Broker远端存储offset:this.offsetStore = new RemoteBrokerOffsetStore();
    • 6)消费者服务
      顺序消费监听创建ConsumeMessageOrderlyService;
      并发消费监听创建ConsumeMessageConcurrentlyService;
      this.consumeMessageService.start();
    • 7)注册消费者。与生产者类似,客户端只要按要求注册即可,后续会随mQClientFactory一起启动。
      mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
    • 8) mQClientFactory.start();
    • 9)this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    • 10)this.mQClientFactory.checkClientInBroker();
    • 11)this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    • 12)this.mQClientFactory.rebalanceImmediately();

    消费端负载均衡:

    • AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。
    • AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者
    • AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。
    • AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。
    • AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。
    • AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。

    消费者服务的串联:

    • DefaultMQPushConsumerImpl#start
    • this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    • instance = new MQClientInstance()
    • this.pullMessageService = new PullMessageService(this);
    • PullMessageService#run
    • PullMessageService#pullMessage
    • DefaultMQPushConsumerImpl#pullMessage
    • DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest()消费者消息服务处理消费到的消息
    • ConsumeMessageConcurrentlyService#submitConsumeRequest
      ConsumeMessageOrderlyService#submitConsumeRequest
    • this.consumeExecutor.submit(consumeRequest);并发和顺序消费的线程池线程数都为20
    • ConsumeMessageConcurrentlyService.ConsumeRequest#run
      ConsumeMessageOrderlyService.ConsumeRequest#run 需要加锁
    • status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);这个listener就是自定义的业务逻辑。

    4.Broker端

    4.1 文件存储

    DefaultMessageStore#putMessage

        //K1 Broker典型的消息存储处理
        //当前版本将默认的写入方式更改成了异步写入机制。
        @Override
        public PutMessageResult putMessage(MessageExtBrokerInner msg) {
            try {
                return asyncPutMessage(msg).get();
            } catch (InterruptedException | ExecutionException e) {
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
            }
        }
    

    DefaultMessageStore#asyncPutMessage
    -> CommitLog#asyncPutMessage

    • 1)迟消息的实现方式,就要修改一下msg的topic和queueID,改为系统默认创建的延迟队列。topic是固定的SCHEDULE_TOPIC_XXXX,queueId是根据延迟级别选择的。
    • 2)加锁putMessageLock.lock();
    • 3)找到最后一个CommitLog文件。最后一个就是当前写的文件
    • 4)mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); 以零拷贝的方式实现消息顺序写。 ByteBuffer.allocateDirect(fileSize)
    • 5)putMessageLock.unlock();
    • 6)submitFlushRequest(result, msg);提交刷盘请求
      同步刷盘机制:向this.flushCommitLogService提交GroupCommitRequest请求,每10ms执行一次flush;
      异步刷盘机制:flushCommitLogService.wakeup();或者commitLogService.wakeup();
    • 7)submitReplicaRequest(result, msg);提交主从同步请求。
      -> HAService#putRequest
      -> this.groupTransferService.putRequest(request);
      HAService有三个核心组件,与Master相关的是acceptSocketService和groupTransferService。其中acceptSocketService主要负责维护Master与Slave之间的TCP连接。groupTransferService主要与主从同步复制有关。而slave相关的则是haClient。

    接下来看看ConsumeQueue和IndexFile的写入:

    • DefaultMessageStore#start
    • this.reputMessageService.start();
    • DefaultMessageStore.ReputMessageService#run,Commit日志分发服务,每隔1毫秒,会检查是否需要(就是看有没有新数据)向ConsumeQueue和IndexFile中转发一次CommitLog写入的消息。
    • DefaultMessageStore.ReputMessageService#doReput
    • DefaultMessageStore.this.doDispatch(dispatchRequest);
      CommitLogDispatcherBuildConsumeQueue#doDispatch
      CommitLogDispatcherBuildIndex#doDispatch
    • 长轮询: 如果有消息到了主节点,并且开启了长轮询。就要通过长轮询机制通知消费者,新消息已经到了,可以消费了。DefaultMessageStore.this.messageArrivingListener.arriving(),实例是NotifyMessageArrivingListener

    定时删除过期文件:

    • DefaultMessageStore#start
    • DefaultMessageStore#addScheduleTask
    • 定时任务:DefaultMessageStore.this.cleanFilesPeriodically();
    • this.cleanCommitLogService.run();定时删除过期commitlog
      this.cleanConsumeQueueService.run();定时删除过期的consumequeue

    4.2 长轮询机制

    RocketMQ对消息消费者提供了Push推模式和Pull拉模式两种消费模式。但是这两种消费模式的本质其实都是Pull拉模式,Push模式可以认为是一种定时的Pull机制。但是这时有一个问题,当使用Push模式时,如果RocketMQ中没有对应的数据,那难道一直进行空轮询吗?如果是这样的话,那显然会极大的浪费网络带宽以及服务器的性能,并且,当有新的消息进来时,RocketMQ也没有办法尽快通知客户端,而只能等客户端下一次来拉取消息了。针对这个问题,RocketMQ实现了一种长轮询机制 long polling。

    长轮询机制简单来说,就是当Broker接收到Consumer的Pull请求时,判断如果没有对应的消息,不用直接给Consumer响应(给响应也是个空的,没意义),而是就将这个Pull请求给缓存起来。当Producer发送消息过来时,增加一个步骤去检查是否有对应的已缓存的Pull请求,如果有,就及时将请求从缓存中拉取出来,并将消息通知给Consumer。

    消费者拉取消息
    BrokerController#registerProcessor
    -> this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    -> PullMessageProcessor#processRequest()
    -> ResponseCode.PULL_NOT_FOUND,消息长轮询1:消费者消费时,没有消息就会被缓存起来。brokerAllowSuspend 客户端初次请求消息时是指定的true。重新唤醒时指定为false,hasSuspendFlag默认都是true。
    this.brokerController.getPullRequestHoldService().suspendPullRequest( topic, queueId, pullRequest);请求是PullRequest

    生产者发送消息
    BrokerController#registerProcessor

        public void registerProcessor() {
            /**
             * SendMessageProcessor
             */
            SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
            sendProcessor.registerSendMessageHook(sendMessageHookList);
            sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            /**
             * PullMessageProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
            this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    

    SendMessageProcessor#processRequest

    • SendMessageProcessor#asyncProcessRequest()
    • SendMessageProcessor#asyncSendMessage
    • DefaultMessageStore#asyncPutMessage(参见4.1)

    接下来看看关联的ReputMessageService

    • DefaultMessageStore#start
    • this.reputMessageService.start();
    • DefaultMessageStore.ReputMessageService#run,Commit日志分发服务,每隔1毫秒,会检查是否需要(就是看有没有新数据)向ConsumeQueue和IndexFile中转发一次CommitLog写入的消息。
    • DefaultMessageStore.ReputMessageService#doReput
    • DefaultMessageStore.this.doDispatch(dispatchRequest);
      CommitLogDispatcherBuildConsumeQueue#doDispatch
      CommitLogDispatcherBuildIndex#doDispatch
    • 长轮询: 如果有消息到了主节点,并且开启了长轮询。就要通过长轮询机制通知消费者,新消息已经到了,可以消费了。DefaultMessageStore.this.messageArrivingListener.arriving(),实例是NotifyMessageArrivingListener
    • NotifyMessageArrivingListener#arriving 长轮询:生产者发送消息后的监听事件
    • this.pullRequestHoldService.notifyMessageArriving()
    • PullRequestHoldService#notifyMessageArriving() 会去this.pullRequestTable.get(key);查找

    相关文章

      网友评论

          本文标题:RocketMQ源码剖析

          本文链接:https://www.haomeiwen.com/subject/sqwdkdtx.html