美文网首页
RocketMQ源码解析——存储部分(8)操作消息相关日志的中介

RocketMQ源码解析——存储部分(8)操作消息相关日志的中介

作者: szhlcy | 来源:发表于2021-07-01 13:51 被阅读0次

    @[toc]

    作用

    前面介绍了RocketMQ的一些主要的日志文件,CommitLogConsumeQueueIndexFile的结构和存储操作原理。这些文件的处理类都在不同的类中处理的。RocketMQ中提供了DefaultMessageStore来对这些类进行一个封装聚合和额外的扩展。比如过期消息的清理,新消息的保存,消息的查询,消息的刷盘等。除此之外也提供一些服务启动时候的一些逻辑,比如从磁盘加载元数据,服务停止时候的消息保存逻辑等。

    分析

    构造方法DefaultMessageStore

    DefaultMessageStore的构造方法,主要是给该类的一些成员变量进行赋值或者初始化。这些成员变量都是DefaultMessageStore间接操作CommitLog文件或者是ConsumeQueue的对象以及持久化等相关的操作类。

    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
            final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
            //消息到达监听器,这个跟PullRequestHoldService一起使用的,消息到达后调用PullRequestHoldService中的notifyMessageArriving方法
            this.messageArrivingListener = messageArrivingListener;
            //broker配置
            this.brokerConfig = brokerConfig;
            //消息存储配置
            this.messageStoreConfig = messageStoreConfig;
            //broker状态管理类,统计broker相关信息,比如消息数量,topic数量等
            this.brokerStatsManager = brokerStatsManager;
            //提前主动申请内存文件服务类,用于CommitLog
            this.allocateMappedFileService = new AllocateMappedFileService(this);
            //
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                this.commitLog = new DLedgerCommitLog(this);
            } else {
                this.commitLog = new CommitLog(this);
            }
            //管理ConsumeQueue文件的集合
            this.consumeQueueTable = new ConcurrentHashMap<>(32);
            //将ConsumeQueue逻辑队列刷盘的服务类,1秒刷一次,最多尝试3次,
            this.flushConsumeQueueService = new FlushConsumeQueueService();
            //清理物理文件服务,定期清理72小时之前的物理文件。
            this.cleanCommitLogService = new CleanCommitLogService();
            //清理逻辑文件服务,定期清理在逻辑队列中的物理偏移量小于commitlog中的最小物理偏移量的数据,同时也清理Index中物理偏移量小于commitlog中的最小物理偏移量的数据
            this.cleanConsumeQueueService = new CleanConsumeQueueService();
            //存储层内部统计服务
            this.storeStatsService = new StoreStatsService();
            //index文件的存储
            this.indexService = new IndexService(this);
            //用于commitlog数据的主备同步
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                this.haService = new HAService(this);
            } else {
                this.haService = null;
            }
            //用于转发CommitLog中消息到ConsumeQueue中
            this.reputMessageService = new ReputMessageService();
            //用于监控延迟消息,并到期后执行,吧延迟消息写入CommitLog
            this.scheduleMessageService = new ScheduleMessageService(this);
            //临时日志文件存储池
            this.transientStorePool = new TransientStorePool(messageStoreConfig);
    
            if (messageStoreConfig.isTransientStorePoolEnable()) {
                this.transientStorePool.init();
            }
            //启动allocateMappedFileService
            this.allocateMappedFileService.start();
            //启动indexService
            this.indexService.start();
            //消息转存任务的集合
            this.dispatcherList = new LinkedList<>();
            //负责把CommitLog消息转存到ConsumeQueue文件
            this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
            //负责吧CommitLog消息转存到Index文件
            this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
            //在保存日志数据文件的根目录 {user.home}/store 路径下 创建一个一个名字为lock 的文件
            File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
            MappedFile.ensureDirOK(file.getParent());
            //创建根目录文件的随机文件流,权限是读写
            lockFile = new RandomAccessFile(file, "rw");
        }
    

     这里对一些重要的变量进行讲解

    变量对象 作用描述
    messageArrivingListener 消息到达监听器,这个跟PullRequestHoldService一起使用的,消息到达后调用PullRequestHoldService中的notifyMessageArriving方法 ,在push模式下会调用
    brokerConfig broker配置
    messageStoreConfig 消息存储相关配置
    brokerStatsManager broker状态管理类,统计broker相关信息,比如消息数量,topic数量等
    allocateMappedFileService 提前主动申请内存文件服务类,用于CommitLog
    commitLog CommitLog日志文件的生成和处理类,根据不同的情况可能是普通的CommitLog页可能是基于Raft协议扩展实现的CommitLog
    consumeQueueTable 管理ConsumeQueue文件的集合
    flushConsumeQueueService 将ConsumeQueue逻辑队列刷盘的服务类,1秒刷一次,最多尝试3次,
    cleanCommitLogService 清理物理文件CommitLog服务,定期清理72小时之前的物理文件。或者CommitLog日志文件的在磁盘占比达到了75以上就会进行清理
    cleanConsumeQueueService 清理逻辑文件服务,定期清理在逻辑队列中的物理偏移量小于commitlog中的最小物理偏移量的数据,同时也清理Index中物理偏移量小于commitlog中的最小物理偏移量的数据
    storeStatsService 存储层内部统计服务
    indexService index文件的存储和读取类
    haService 高可用的服务类,如果是基于Raft协议的高可用那么这个类就是null
    reputMessageService 用于转发CommitLog中消息到ConsumeQueue中
    scheduleMessageService 用于监控延迟消息,并到期后执行,吧延迟消息真实的信息写入CommitLog
    transientStorePool 临时日志文件存储池,在CommitLog使用的临时存储池的时候会用到
    dispatcherList 消息转存任务的集合 , 用于把CommitLog的消息日志同时转存一份到ConsumeQueue中和Index中,用于构建索引和消费队列

    内部方法分析

     这里分析Broker按照启动的前后顺序来进行分析。

    Broker启动后服务于日志相关的类启动的start方法

     Broker在启动时会创建一个DefaultMessageStore,利用前面说的构造器进行创建,然后会调用start方法对成员变量中的一些类进行启动。

       public void start() throws Exception {
            //获取lock文件的唯一FileChannel对象,然后获取文件锁
            lock = lockFile.getChannel().tryLock(0, 1, false);
            //如果锁是null 或者 锁是共享类型的 或者 锁不是有效的,则抛出异常
            if (lock == null || lock.isShared() || !lock.isValid()) {
                throw new RuntimeException("Lock failed,MQ already started");
            }
            //写入lock 四个字符到lock文件
            lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
            lockFile.getChannel().force(true);
            {
                /**
                 * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
                 * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
                 * 3. Calculate the reput offset according to the consume queue;
                 * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
                 */
                //获取CommitLog的最小物理偏移量
                long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
                //比较ConsumeQueue文件记录的最大物理偏移量和实际的物理偏移量的大小,取ConsumeQueue文件记录的
                for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                    for (ConsumeQueue logic : maps.values()) {
                        //如果ConsumeQueue记录的最大物理偏移量 大于 CommitLog的最小偏移量,则选用ConsumeQueue记录的偏移量,
                        // 因为ConsumeQueue记录下来的表示已经换发到了ConsumeQueue了,不需要处理,只需要处理没有转发的
                        if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                            maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                        }
                    }
                }
                //记录的小于0,则置位0
                if (maxPhysicalPosInLogicQueue < 0) {
                    maxPhysicalPosInLogicQueue = 0;
                }
                /**
                 *  如果逻辑文件记录的物理偏移量  小于 实际的最小物理偏移量,则置位实际的。
                 *   1. 如果有人移除了ConsumeQueue文件 或者 保存文件的磁盘损坏了
                 *   2. 从别的broker上面复制CommitLog到一个新启动的broker机器中
                 *  这几种情况都可能使得逻辑日志ConsumeQueue文件记录的最小物理偏移量为0
                 *
                 */
                if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
                    maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
                    /**
                     * This happens in following conditions:
                     * 1. If someone removes all the consumequeue files or the disk get damaged.
                     * 2. Launch a new broker, and copy the commitlog from other brokers.
                     *
                     * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
                     * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
                     */
                    log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
                }
                log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
                    maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
                //启动reputMessageService 来按照CommitLog 生成ConsumeQueue
                this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
                this.reputMessageService.start();
    
                /**
                 *  1. Finish dispatching the messages fall behind, then to start other services.
                 *  2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
                 */
                //等待同步完成
                while (true) {
                    if (dispatchBehindBytes() <= 0) {
                        break;
                    }
                    Thread.sleep(1000);
                    log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
                }
                this.recoverTopicQueueTable();
            }
            //如果没有使用DLegerCommitLog,就使用HaService来做高可用
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                this.haService.start();
                //如果是master,则启动延迟消息的检测任务
                this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
            }
            //刷新ConsumeQueue的服务启动
            this.flushConsumeQueueService.start();
            //CommitLog刷新的服务启动
            this.commitLog.start();
            //存储状态检测的服务启动
            this.storeStatsService.start();
            //创建临时文件,来表示是否正常关机
            this.createTempFile();
            //启动其他服务。比如清除过期日志的服务等
            this.addScheduleTask();
            this.shutdown = false;
        }
    

     这里发现有很大的一段逻辑实在判断CommitLog文件的最小物理偏移量和ConsumeQueue记录的最大物理移量的比较,这里比较的原因是。reputMessageService这个类,会对CommitLog中的消息进行转存到ConsumeQueue中。这里可以参考前面的ConsumeQueue的分析的文章CommitLog的分析文章。还有一些别的任务也会同时启动,这里就不仔细分析。

    Broker启动加载文件的 load方法

     Broker在启动后,会对机器中的可能存在的日志先关的文件比如CommitLog,ConsumeQueue,IndexFile等先进行恢复处理。这个恢复的处理逻辑就在 load方法中。

        public boolean load() {
            boolean result = true;
    
            try {
                //是否存在abort文件,如果存在说明上次服务关闭时异常关闭的
                boolean lastExitOK = !this.isTempFileExist();
                log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
                //加载定时任务的配置文件,解析延迟级别配置信息
                if (null != scheduleMessageService) {
                    result = result && this.scheduleMessageService.load();
                }
    
                // 加载 Commit Log文件
                result = result && this.commitLog.load();
    
                // 加载 Consume Queue文件
                result = result && this.loadConsumeQueue();
                //检查前面3个文件是不是加载成功
                if (result) {
                    //加载成功则继续加载checkpoint文件
                    this.storeCheckpoint =
                        new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                    //加载indexFile
                    this.indexService.load(lastExitOK);
                    //进行文件的恢复逻辑
                    this.recover(lastExitOK);
    
                    log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
                }
            } catch (Exception e) {
                log.error("load exception", e);
                result = false;
            }
    
            if (!result) {
                this.allocateMappedFileService.shutdown();
            }
    
            return result;
        }
    
    
    
    
        private void recover(final boolean lastExitOK) {
            long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
            //上次服务关闭是不是正常关闭
            if (lastExitOK) {
                //正常关闭情况关闭
                this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
            } else {
                //异常情况关闭
                this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
            }
            //恢复topic消费相关相关的缓存
            this.recoverTopicQueueTable();
        }
    

     这里可以看到文件的恢复存在一种情况,就是上次Broker的关闭时异常关闭的情况。这种情况下对应的CommitLog的恢复与正常情况的恢复是不一样的。整体的 load方法逻辑如下:

    1. 检查abort文件是不是存在,如果存在表示上次是异常关闭,这个文件是一个空文件,在启动之后会创建,正常关闭的情况会删除掉。
    2. 加载延迟消息相关的配置,加载 Commit Log文件,加载Consume Queue文件
    3. 如果步骤2成功加载,则加载checkpoint文件,加载indexFile然后进行文件的恢复逻辑
    4. 对于文件的恢复逻辑在recover方法中,会调用CommitLog类中的方法

     这里有几个点没有进行分析,可以看前面的文章。第一个就是CommitLog文件正常恢复和异常恢复相关逻辑在前面的CommitLog的文章中有分析。第二个是延迟消息相关的逻辑在延迟消息的原理ScheduleMessageService中。第三个IndexFile的逻辑在IndexFile消息索引日志文件相关的IndexService

    存储消息的putMessage方法

    DefaultMessageStore中的putMessage方法其实主要是检查存储状态,校验消息和记录消息存储的耗时和次数,主要的逻辑还是在CommitLog类中。这里不对CommitLog类中的逻辑进行分析

        public PutMessageResult putMessage(MessageExtBrokerInner msg) {
            //检查存储状态,检查操作系统是不是繁忙
            PutMessageStatus checkStoreStatus = this.checkStoreStatus();
            if (checkStoreStatus != PutMessageStatus.PUT_OK) {
                return new PutMessageResult(checkStoreStatus, null);
            }
            //校验消息的topic长度和属性长度
            PutMessageStatus msgCheckStatus = this.checkMessage(msg);
            if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
                return new PutMessageResult(msgCheckStatus, null);
            }
            long beginTime = this.getSystemClock().now();
            PutMessageResult result = this.commitLog.putMessage(msg);
            //计算耗时
            long elapsedTime = this.getSystemClock().now() - beginTime;
            if (elapsedTime > 500) {
                log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
            }
            //记录消息的存储时间,分为13个耗时区间段来存储,每个区间段统计放入的消息个
            this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
            //如果消息存储失败,则统计失败的次数
            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }
    
            return result;
        }
    
        private PutMessageStatus checkStoreStatus() {
            if (this.shutdown) {
                log.warn("message store has shutdown, so putMessage is forbidden");
                return PutMessageStatus.SERVICE_NOT_AVAILABLE;
            }
            //salve  机器不能存储消息
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("broke role is slave, so putMessage is forbidden");
                }
                return PutMessageStatus.SERVICE_NOT_AVAILABLE;
            }
            //检查是不是可写的状态
            if (!this.runningFlags.isWriteable()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
                        "the broker's disk is full, write to logic queue error, write to index file error, etc");
                }
                return PutMessageStatus.SERVICE_NOT_AVAILABLE;
            } else {
                this.printTimes.set(0);
            }
            //如果写入CommitLog时间超过1m,则认为操作系统的pageCache繁忙
            if (this.isOSPageCacheBusy()) {
                return PutMessageStatus.OS_PAGECACHE_BUSY;
            }
            return PutMessageStatus.PUT_OK;
        }
    
    获取消息的getMessage方法

     获取消息的逻辑比较长,主要复杂点在于计算消息的偏移量信息。主要逻辑如下

    1. 检查服务的状态
    2. 获取CommitLog的最大物理偏移量,根据要获取的消息的topic和queueId找到对应的ConsumeQueue,并获取最大逻辑偏移量和最小逻辑偏移量
    3. 根据获取的逻辑偏移量进行一系列的校验,校验通过则根据ConsumeQueue的消息单元中记录的消息物理偏移量和传入的偏移量进行对比寻找
    4. 找到对应的消息的物理偏移量,然后进行封装最后返回
        public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
            final int maxMsgNums,
            final MessageFilter messageFilter) {
            if (this.shutdown) {
                log.warn("message store has shutdown, so getMessage is forbidden");
                return null;
            }
            //检查服务的状态是否是可读
            if (!this.runningFlags.isReadable()) {
                log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
                return null;
            }
    
            long beginTime = this.getSystemClock().now();
    
            GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
            long nextBeginOffset = offset;
            long minOffset = 0;
            long maxOffset = 0;
    
            GetMessageResult getResult = new GetMessageResult();
            //获取存储的消息的最大的物理偏移量
            final long maxOffsetPy = this.commitLog.getMaxOffset();
            //根据topic和queueId 先找到对应的逻辑日志ConsumeQueue
            ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
            //如果逻辑日志不为空,获取最小和最大的逻辑偏移量
            if (consumeQueue != null) {
                minOffset = consumeQueue.getMinOffsetInQueue();
                maxOffset = consumeQueue.getMaxOffsetInQueue();
                //如果最大逻辑日志为0,说明没有存任何逻辑消息记录
                if (maxOffset == 0) {
                    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                    nextBeginOffset = nextOffsetCorrection(offset, 0);
                } else if (offset < minOffset) {
                    //消息偏移量小于最小的偏移量
                    status = GetMessageStatus.OFFSET_TOO_SMALL;
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else if (offset == maxOffset) {
                    //偏移等于最大偏移量,则表示消息溢出了
                    status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                    nextBeginOffset = nextOffsetCorrection(offset, offset);
                } else if (offset > maxOffset) {
                    //大于最大逻辑偏移量,则严重溢出
                    status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                    if (0 == minOffset) {
                        nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                    } else {
                        nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                    }
                } else {
                    //根据偏移量获取对应的逻辑消息
                    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                    if (bufferConsumeQueue != null) {
                        try {
                            status = GetMessageStatus.NO_MATCHED_MESSAGE;
    
                            long nextPhyFileStartOffset = Long.MIN_VALUE;
                            long maxPhyOffsetPulling = 0;
    
                            int i = 0;
                            //计算最大获取的消息长度  = 单个逻辑单元长度 * 获取的消息个数 = 20*maxMsgNums
                            final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            //是否开启磁盘降幅
                            final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                            //依次获取消息
                            for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                                //获取消息的物理偏移量 在CommitLog中的位置
                                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                                //获取消息的大小
                                int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                                //获取消息的tagcode
                                long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
                                //当前消息的CommitLog的偏移量
                                maxPhyOffsetPulling = offsetPy;
    
                                if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                    if (offsetPy < nextPhyFileStartOffset)
                                        continue;
                                }
                                /**
                                 * 检查获取的消息是不是在磁盘上
                                 * 1. 计算系统的内存大小,然后✖️ 对应的访问消息在内存中的最大比率参数(accessMessageInMemoryMaxRatio)得到存储消息的内存大小memory
                                 * 2. 比较CommitLog的最大偏移量和当前消息的偏移量差,如果小于memory 则在内存中获取消息。否则从磁盘获取消息
                                 */
                                boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                                /**
                                 * 消息是否获取完毕。如果已经获取的消息的大小,或者消息的数量,达到了设置的上限也会直接返回
                                 * - 从磁盘获取时
                                 *   maxTransferBytesOnMessageInDisk : 一批次从磁盘获取消息的最大允许大小
                                 *   maxTransferCountOnMessageInDisk :  一次从磁盘获取消息的最大允许数量
                                 *
                                 * - 从内存获取
                                 *   maxTransferBytesOnMessageInMemory:一批次从内存获取消息的最大允许大小
                                 *   maxTransferCountOnMessageInMemory:一次从内存获取消息的最大允许数量
                                 */
                                if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                    isInDisk)) {
                                    break;
                                }
                                //
                                boolean extRet = false, isTagsCodeLegal = true;
                                //检查是否有ConsumeQueue扩展文件,是的则从扩展文件获取
                                if (consumeQueue.isExtAddr(tagsCode)) {
                                    extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                    if (extRet) {
                                        tagsCode = cqExtUnit.getTagsCode();
                                    } else {
                                        // can't find ext content.Client will filter messages by tag also.
                                        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                            tagsCode, offsetPy, sizePy, topic, group);
                                        isTagsCodeLegal = false;
                                    }
                                }
                                //如果有ConsumeQueue对应的过滤器,则进行过滤
                                if (messageFilter != null
                                    && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
    
                                    continue;
                                }
                                //从CommitLog获取消息
                                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                                if (null == selectResult) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                    }
    
                                    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                    continue;
                                }
                                //如果有CommitLog先关的过滤器则进行处理
                                if (messageFilter != null
                                    && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
                                    // release...
                                    selectResult.release();
                                    continue;
                                }
                                //增加消息存储服务的相关消息获数量记录
                                this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                                getResult.addMessage(selectResult);
                                status = GetMessageStatus.FOUND;
                                nextPhyFileStartOffset = Long.MIN_VALUE;
                            }
                            //开启磁盘降幅,
                            if (diskFallRecorded) {
                                long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                                brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                            }
                            //计算下一个开始的位置
                            nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            //当前消息的偏移量和最大偏移量的差距
                            long diff = maxOffsetPy - maxPhyOffsetPulling;
                            long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                                * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                            //如果剩余的偏移差  大于 设置的内存百分比的大小 则建议从 salve节点拉取
                            getResult.setSuggestPullingFromSlave(diff > memory);
                        } finally {
    
                            bufferConsumeQueue.release();
                        }
                    } else {
                        //如果在ConsumeQueue中没有对应的消息,那么
                        status = GetMessageStatus.OFFSET_FOUND_NULL;
                        nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                        log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                            + maxOffset + ", but access logic queue failed.");
                    }
                }
            } else {
                //如果ConsumeQueue没有则返回
                status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            }
            //无论是否找到,都增加对应的记录
            if (GetMessageStatus.FOUND == status) {
                this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
            } else {
                this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
            }
            long elapsedTime = this.getSystemClock().now() - beginTime;
            this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
            //设置返回的信息
            getResult.setStatus(status);
            getResult.setNextBeginOffset(nextBeginOffset);
            getResult.setMaxOffset(maxOffset);
            getResult.setMinOffset(minOffset);
            return getResult;
        }
    

     在DefaultMessageStore中还有一些别的方法,基本都是跟获取消息有关系的,但是最最后几乎都是调用上面说的getMessage方法。因此这里就不在进行分析了。

    相关文章

      网友评论

          本文标题:RocketMQ源码解析——存储部分(8)操作消息相关日志的中介

          本文链接:https://www.haomeiwen.com/subject/fmpjultx.html