美文网首页
RocketMQ源码剖析

RocketMQ源码剖析

作者: 王侦 | 来源:发表于2023-02-15 19:11 被阅读0次

1.各个组件启动源码、框架结构

1.1 NameServer启动

NamesrvStartup#main

  • 1)NamesrvController controller = createNamesrvController(args);创建controller
    1-1)检测命令行参数
    1-2)创建两个核心配置:NamesrvConfig和NettyServerConfig
    1-3)解析-c 和 -p 参数,赋值到上面两个配置中
    1-4)ROCKETMQ_HOME环境变量检测
    1-5)controller = new NamesrvController(namesrvConfig, nettyServerConfig);创建controller
    1-6) controller.getConfiguration().registerConfig(properties);注册一下所有的配置
  • 2)start(controller);启动controller
    2-1)controller.initialize();初始化
     A)this.kvConfigManager.load();加载KV配置
     B)this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);创建NettyServer网络处理对象
     C) this.remotingExecutor = Executors.newFixedThreadPool()Netty服务器的工作线程池
     D)this.registerProcessor();注册NameServer的Processor 注册到RemotingServer中。
     E)NamesrvController.this.routeInfoManager.scanNotActiveBroker();定时任务:每间隔10S扫描一次Broker,移除不活跃的Broker
     F)NamesrvController.this.kvConfigManager.printAllPeriodically();定时任务:每间隔10min打印一次KV配置
    2-2)Runtime.getRuntime().addShutdownHook()服务关闭钩子,在服务正常关闭时执行。
    2-3) controller.start();启动服务
     A)this.remotingServer.start();启动remotingServer。
     B)不为null也启动this.fileWatchService.start();

NameServer的核心作用:

  • 一是维护Broker的服务地址并进行及时的更新。
  • 二是给Producer和Consumer提供服务获取Broker列表。

1.2 Broker启动

BrokerStartup#main

  • 1)createBrokerController(args)创建controller
    1-1)检测命令行参数
    1-2)创建四个核心配置:BrokerConfig、NettyServerConfig、NettyClientConfig和MessageStoreConfig
    1-3)解析-c 参数,赋值到上面四个配置中
    1-4)ROCKETMQ_HOME环境变量检测
    1-5)处理NamesrcAddr
    1-6)通过brokerId判断主从:ASYNC_MASTER、SYNC_MASTER、SLAVE;Dledger集群的所有Broker节点ID都是-1
    1-7)解析-p和-m参数,赋值到上面四个配置中
    1-8) controller = new BrokerController(),创建核心的BrokerController
    1-9)controller.getConfiguration().registerConfig(properties);注册配置
    1-10)controller.initialize();初始化BrokerController。
     A)加载磁盘上的配置信息(json)。topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager
     B)构建消息存储管理组件DefaultMessageStore,外层还会包装插件AbstractPluginMessageStore
     C)加载磁盘文件this.messageStore.load();
     D)this.remotingServer = new NettyRemotingServer()
     E)this.fastRemotingServer = new NettyRemotingServer()这个fastRemotingServer与RemotingServer功能基本差不多,处理VIP端口请求
     F) this.sendMessageExecutor发送消息的线程池
     G) this.pullMessageExecutor处理consumer的pull请求的线程池
     H) this.replyMessageExecutor回复消息的线程池
     I)this.queryMessageExecutor查询消息的线程池
     J)this.adminBrokerExecutor
     K)this.clientManageExecutor管理客户端的线程池
     L) this.heartbeatExecutor心跳请求线程池
     M)this.endTransactionExecutor
     N) this.consumerManageExecutor
     O)this.registerProcessor();Broker注册Processor
     P)BrokerController.this.getBrokerStats().record();定时进行broker统计的任务
     Q)BrokerController.this.consumerOffsetManager.persist();定时进行consumer消费Offset持久化到磁盘的任务
     R)BrokerController.this.consumerFilterManager.persist();对consumer的filter过滤器进行持久化的任务。这里可以看到,消费者的filter是被下推到了Broker来执行的。
     S)BrokerController.this.protectBroker();定时进行broker保护
     T)BrokerController.this.printWaterMark();定时打印水位线 U)BrokerController.this.getMessageStore().dispatchBehindByte
    s()定时进行落后commitlog分发的任务
     V)this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());或者BrokerController.this.brokerOuterAPI.fetchNameServerAddr();设置NameServer的地址列表。可以从配置加载,也可以发远程请求加载
     W)initialTransaction();
     X)initialAcl(); 权限控制
     Y)initialRpcHooks();
    1-11)Runtime.getRuntime().addShutdownHook()服务关闭钩子,在服务正常关闭时执行。
  • 2)start()启动controller
     A)this.messageStore.start();存储组件,这里启动服务主要是为了将CommitLog的写入事件分发给ComsumeQueue和IndexFile
     B)this.remotingServer.start();以及this.fastRemotingServer.start();
     C)this.fileWatchService.start();
     D) this.brokerOuterAPI.start();Broker的brokerOuterAPI可以理解为一个Netty客户端,往外发请求的组件。例如发送心跳
     E)this.pullRequestHoldService.start();长轮询请求暂存服务
     F)this.clientHousekeepingService.start();
     G)this.filterServerManager.start();
     H)BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());Broker核心的心跳注册任务
     I)this.brokerStatsManager.start();
     J) this.brokerFastFailure.start();

1.3 Netty网络框架

Broker端的NettyRemotingServer:

NettyRemotingServer#start

    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        prepareSharableHandlers();
        //K1 Netty服务启动的核心流程
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        //Netty的核心服务流程,encoder和decoder,二进制传输协议。
                        //RocketMQ中的二进制传输协议比较复杂,是否能按照JSON自定义二进制协议?
                        //serverHandler负责最关键的网络请求处理。
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
        //开始Socket监听
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
        //每秒清理过期的异步请求暂存结果。
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

核心是NettyServerHandler

    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());

核心的业务处理是在processorTable,其初始化是在BrokerController#registerProcessor

    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

所以关键的业务处理,后面只要去processorTable查找类型的对应Processor就行。

RocketMQ的同步结果推送与异步结果推送
RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer , ResponseFuture> 。

处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。

处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。 另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。

1.4 Broker心跳注册过程

BrokerController#start

        //K2 Broker核心的心跳注册任务,需要深入解读下。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        ...
        //这里才是比较关键的地方。先判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册。
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }
    //K2 Broker注册最核心的部分
    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());

        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                //注册完保存主从节点的地址
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }

                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }

会向所有NameServer进行注册:

    public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {
        //使用CopyOnWriteArrayList提升并发安全性
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);

            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            //通过CountDownLatch,保证在所有NameServer上完成注册后再一起结束。
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

        return registerBrokerResultList;
    }

NameServer端的处理:
DefaultRequestProcessor#processRequest

    //K2 NameServer处理请求的核心代码
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

        if (ctx != null) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }


        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER: //Broker注册请求处理。版本默认是当前框架版本
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request); //当前版本
                } else {
                    return this.registerBroker(ctx, request);
                }

最终会调用RouteInfoManager对Broker信息进行注册:

    //K2 NameServer 实际处理Broker注册的地方
    public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader =
            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

        if (request.getBody() != null) {
            try {
                registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
            } catch (Exception e) {
                throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
            }
        } else {
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
        }
        //routeInfoManager就是管理路由信息的核心组件。
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            registerBrokerBody.getTopicConfigSerializeWrapper(),
            registerBrokerBody.getFilterServerList(),
            ctx.channel());

        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());

        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

看看RouteInfoManager 管理的路由信息:

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    //几个关键的Table
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

2.发送消息

2.1 普通消息发送DefaultMQProducer

DefaultMQProducer#start

    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

DefaultMQProducerImpl#start()

    public void start() throws MQClientException {
        this.start(true);
    }
    //K2 消息生产者的启动方法
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();
                //修改当前的instanceName为当前进程ID
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                //客户端核心的MQ客户端工厂 对于事务消息发送者,在这里面会完成事务消息的发送者的服务注册
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //注册MQ客户端工厂示例
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                //启动示例 --所有客户端组件都交由mQClientFactory启动
                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        this.startScheduledTask();

    }

总结:

  • 1)RocketMQ的所有客户端实例,包括生产者和消费者,都是统一交由mQClientFactory组件来启动,也就是说,所有客户端的启动流程是固定的,不同客户端的区别只是在于他们在启动前注册的一些信息不同。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable
  • 2)MQClientInstance#start启动了很多服务
    this.mQClientAPIImpl.fetchNameServerAddr();
    this.mQClientAPIImpl.start();
    this.startScheduledTask();
    this.pullMessageService.start();
    this.rebalanceService.start();
    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

DefaultMQProducer#send(Message)
-> DefaultMQProducerImpl#send(Message)
-> DefaultMQProducerImpl#sendDefaultImpl

  • 1)TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());获取Topic信息
  • 2)MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);Producer根据发送者负载均衡策略,计算把消息发到哪个MessageQueue中(Producer在获取路由信息后,会选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索引自增然后取模的方式。.sendLatencyFaultEnable默认是关闭的,Broker故障延迟机制,表示一种发送消息失败后一定时间内不再往同一个Queue重复发送的机制)
  • 3)sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);实际发送消息的方法(根据MessageQueue 获取对应的Broker地址)

Producer如何管理Borker路由信息?

Producer需要拉取Broker列表,然后跟Broker建立连接等等很多核心的流程,其实都是在发送消息时建立的。Send方法中,首先需要获得Topic的路由信息。这会从本地缓存中获取,如果本地缓存中没有,就从NameServer中去申请。

    //找路由表的过程都是先从本地缓存找,本地缓存没有,就去NameServer上申请
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //Producer向NameServer获取更新Topic的路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            //还是从本地缓存中寻找Topic的路由信息
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

2.2 事务消息TransactionMQProducer

    //事务消息的启动过程。启动过程中会完成Processor的注册
    @Override
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.initTransactionEnv();
        super.start();
    }
    public void initTransactionEnv() {
        TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
        if (producer.getExecutorService() != null) {
            this.checkExecutor = producer.getExecutorService();
        } else {
            this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
            this.checkExecutor = new ThreadPoolExecutor(
                producer.getCheckThreadPoolMinSize(),
                producer.getCheckThreadPoolMaxSize(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.checkRequestQueue);
        }
    }

这里唯一区别就是多了一个线程池checkExecutor。

    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }

DefaultMQProducerImpl#sendMessageInTransaction

  • 1)TransactionListener transactionListener = getCheckListener();获取监听器
  • 2)设置消息属性
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
  • 3)发送消息sendResult = this.send(msg);
  • 4)发送成功,执行本地事务localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
  • 5)发送失败(各种错误原因),设置为回滚状态
  • 6)根据状态进行处理this.endTransaction(msg, sendResult, localTransactionState, localException);这里会向broker发起提交或者回滚请求

客户端处理Broker回查请求:
ClientRemotingProcessor#processRequest
-> RequestCode.CHECK_TRANSACTION_STATE
ClientRemotingProcessor#checkTransactionState
-> DefaultMQProducerImpl#checkTransactionState
-> this.checkExecutor.submit(request);
-> localTransactionState = transactionListener.checkLocalTransaction(message);

3.消费消息

DefaultMQPushConsumer#start

    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

DefaultMQPushConsumerImpl#start

  • 1)CLUSTERING模式,this.defaultMQPushConsumer.changeInstanceNameToPID();
  • 2)this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);客户端示例工厂,生产者也是交由这个工厂启动的。
  • 3)this.rebalanceImpl.setAllocateMessageQueueStrategy(
    this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());负载均衡策略
  • 4)this.pullAPIWrapper = new PullAPIWrapper();
  • 5)广播模式与集群模式的最本质区别就是offset存储的地方不一样。
    广播模式是在消费者本地存储offset:this.offsetStore = new LocalFileOffsetStore();
    集群模式是在Broker远端存储offset:this.offsetStore = new RemoteBrokerOffsetStore();
  • 6)消费者服务
    顺序消费监听创建ConsumeMessageOrderlyService;
    并发消费监听创建ConsumeMessageConcurrentlyService;
    this.consumeMessageService.start();
  • 7)注册消费者。与生产者类似,客户端只要按要求注册即可,后续会随mQClientFactory一起启动。
    mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
  • 8) mQClientFactory.start();
  • 9)this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  • 10)this.mQClientFactory.checkClientInBroker();
  • 11)this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  • 12)this.mQClientFactory.rebalanceImmediately();

消费端负载均衡:

  • AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。
  • AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者
  • AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。
  • AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。
  • AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。
  • AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。

消费者服务的串联:

  • DefaultMQPushConsumerImpl#start
  • this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
  • instance = new MQClientInstance()
  • this.pullMessageService = new PullMessageService(this);
  • PullMessageService#run
  • PullMessageService#pullMessage
  • DefaultMQPushConsumerImpl#pullMessage
  • DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest()消费者消息服务处理消费到的消息
  • ConsumeMessageConcurrentlyService#submitConsumeRequest
    ConsumeMessageOrderlyService#submitConsumeRequest
  • this.consumeExecutor.submit(consumeRequest);并发和顺序消费的线程池线程数都为20
  • ConsumeMessageConcurrentlyService.ConsumeRequest#run
    ConsumeMessageOrderlyService.ConsumeRequest#run 需要加锁
  • status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);这个listener就是自定义的业务逻辑。

4.Broker端

4.1 文件存储

DefaultMessageStore#putMessage

    //K1 Broker典型的消息存储处理
    //当前版本将默认的写入方式更改成了异步写入机制。
    @Override
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        try {
            return asyncPutMessage(msg).get();
        } catch (InterruptedException | ExecutionException e) {
            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
        }
    }

DefaultMessageStore#asyncPutMessage
-> CommitLog#asyncPutMessage

  • 1)迟消息的实现方式,就要修改一下msg的topic和queueID,改为系统默认创建的延迟队列。topic是固定的SCHEDULE_TOPIC_XXXX,queueId是根据延迟级别选择的。
  • 2)加锁putMessageLock.lock();
  • 3)找到最后一个CommitLog文件。最后一个就是当前写的文件
  • 4)mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); 以零拷贝的方式实现消息顺序写。 ByteBuffer.allocateDirect(fileSize)
  • 5)putMessageLock.unlock();
  • 6)submitFlushRequest(result, msg);提交刷盘请求
    同步刷盘机制:向this.flushCommitLogService提交GroupCommitRequest请求,每10ms执行一次flush;
    异步刷盘机制:flushCommitLogService.wakeup();或者commitLogService.wakeup();
  • 7)submitReplicaRequest(result, msg);提交主从同步请求。
    -> HAService#putRequest
    -> this.groupTransferService.putRequest(request);
    HAService有三个核心组件,与Master相关的是acceptSocketService和groupTransferService。其中acceptSocketService主要负责维护Master与Slave之间的TCP连接。groupTransferService主要与主从同步复制有关。而slave相关的则是haClient。

接下来看看ConsumeQueue和IndexFile的写入:

  • DefaultMessageStore#start
  • this.reputMessageService.start();
  • DefaultMessageStore.ReputMessageService#run,Commit日志分发服务,每隔1毫秒,会检查是否需要(就是看有没有新数据)向ConsumeQueue和IndexFile中转发一次CommitLog写入的消息。
  • DefaultMessageStore.ReputMessageService#doReput
  • DefaultMessageStore.this.doDispatch(dispatchRequest);
    CommitLogDispatcherBuildConsumeQueue#doDispatch
    CommitLogDispatcherBuildIndex#doDispatch
  • 长轮询: 如果有消息到了主节点,并且开启了长轮询。就要通过长轮询机制通知消费者,新消息已经到了,可以消费了。DefaultMessageStore.this.messageArrivingListener.arriving(),实例是NotifyMessageArrivingListener

定时删除过期文件:

  • DefaultMessageStore#start
  • DefaultMessageStore#addScheduleTask
  • 定时任务:DefaultMessageStore.this.cleanFilesPeriodically();
  • this.cleanCommitLogService.run();定时删除过期commitlog
    this.cleanConsumeQueueService.run();定时删除过期的consumequeue

4.2 长轮询机制

RocketMQ对消息消费者提供了Push推模式和Pull拉模式两种消费模式。但是这两种消费模式的本质其实都是Pull拉模式,Push模式可以认为是一种定时的Pull机制。但是这时有一个问题,当使用Push模式时,如果RocketMQ中没有对应的数据,那难道一直进行空轮询吗?如果是这样的话,那显然会极大的浪费网络带宽以及服务器的性能,并且,当有新的消息进来时,RocketMQ也没有办法尽快通知客户端,而只能等客户端下一次来拉取消息了。针对这个问题,RocketMQ实现了一种长轮询机制 long polling。

长轮询机制简单来说,就是当Broker接收到Consumer的Pull请求时,判断如果没有对应的消息,不用直接给Consumer响应(给响应也是个空的,没意义),而是就将这个Pull请求给缓存起来。当Producer发送消息过来时,增加一个步骤去检查是否有对应的已缓存的Pull请求,如果有,就及时将请求从缓存中拉取出来,并将消息通知给Consumer。

消费者拉取消息
BrokerController#registerProcessor
-> this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
-> PullMessageProcessor#processRequest()
-> ResponseCode.PULL_NOT_FOUND,消息长轮询1:消费者消费时,没有消息就会被缓存起来。brokerAllowSuspend 客户端初次请求消息时是指定的true。重新唤醒时指定为false,hasSuspendFlag默认都是true。
this.brokerController.getPullRequestHoldService().suspendPullRequest( topic, queueId, pullRequest);请求是PullRequest

生产者发送消息
BrokerController#registerProcessor

    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

SendMessageProcessor#processRequest

  • SendMessageProcessor#asyncProcessRequest()
  • SendMessageProcessor#asyncSendMessage
  • DefaultMessageStore#asyncPutMessage(参见4.1)

接下来看看关联的ReputMessageService

  • DefaultMessageStore#start
  • this.reputMessageService.start();
  • DefaultMessageStore.ReputMessageService#run,Commit日志分发服务,每隔1毫秒,会检查是否需要(就是看有没有新数据)向ConsumeQueue和IndexFile中转发一次CommitLog写入的消息。
  • DefaultMessageStore.ReputMessageService#doReput
  • DefaultMessageStore.this.doDispatch(dispatchRequest);
    CommitLogDispatcherBuildConsumeQueue#doDispatch
    CommitLogDispatcherBuildIndex#doDispatch
  • 长轮询: 如果有消息到了主节点,并且开启了长轮询。就要通过长轮询机制通知消费者,新消息已经到了,可以消费了。DefaultMessageStore.this.messageArrivingListener.arriving(),实例是NotifyMessageArrivingListener
  • NotifyMessageArrivingListener#arriving 长轮询:生产者发送消息后的监听事件
  • this.pullRequestHoldService.notifyMessageArriving()
  • PullRequestHoldService#notifyMessageArriving() 会去this.pullRequestTable.get(key);查找

相关文章

网友评论

      本文标题:RocketMQ源码剖析

      本文链接:https://www.haomeiwen.com/subject/sqwdkdtx.html