美文网首页消息中间件选型
RocketMQ源码解析(六)-Broker#MessageSt

RocketMQ源码解析(六)-Broker#MessageSt

作者: 空挡 | 来源:发表于2019-01-10 22:48 被阅读0次

    Broker将消息存储抽象成MessageStore接口,默认实现类是DefaultMessageStore。主要提供如下方法:

    • 保存消息,包括单条和批量保存
    • 根据topic、queue和offset批量获取消息,consumer使用该方法来拉取消息
    • 根据消息offset读取消息详情,根据messageId查询消息时使用该方法
    • 根据messageKey查询消息,可提供给终端用户使用
      下面我们根据一个MessageStore的数据结构图来看下消息是如何存储的

    数据结构图

    MessageStore数据结构图
    【注】以上图片转载自博客RocketMQ消息存储流程图及数据结构

    数据结构

    通过上面的图可以看到消息存储涉及到一下几个数据结构:
    CommitLog,存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会有一个offset,代表在commitLog中的偏移量。举个例子,当前commitLog文件的大小是12413435字节,那下一条消息到来后它的offset就是12413436。这个说法不是非常准确,但是offset大概是这么计算来的。commitLog并不是一个文件,而是一系列文件(上图中的MappedFile)。每个文件的大小都是固定的(默认1G),写满一个会生成一个新的文件,新文件的文件名就是它存储的第一条消息的offset。
    ConsumeQueue,既然所有消息都是存储在一个commitLog中,但是consumer是按照topic+queue的维度来消费消息的,没有办法直接从commitLog中读取,所以针对每个topic的每个queue都会生成consumeQueue文件。ConsumeQueue文件中存储的是消息在commitLog中的offset,可以理解成一个按queue建的索引,每条消息占用20字节(上图中的一个cq)。跟commitLog一样,每个Queue文件也是一系列连续的文件组成,每个文件默认放30w个offset。
    IndexFile,CommitLog的另外一种形式的索引文件,只是索引的是messageKey,每个MsgKey经过hash后计算存储的slot,然后将offset存到IndexFile的相应slot上。根据msgKey来查询消息时,可以先到IndexFile中查询offset,然后根据offset去commitLog中查询消息详情。

    线程服务

    MessageStore除了上面的数据结构以外,还需要相应的服务来对数据做操作。
    IndexService,负责读写IndexFile的服务
    ReputMessageService,消息存储到commitLog后,MessageStore的接口调用就直接返回了,后续由ReputMessageService负责将消息分发到ConsumeQueueIndexService
    HAService,负责将master-slave之间的消息数据同步
    以上就是MessageStore的整体结构了,下面看下它的启动过程。

    MessageStore启动

    启动入口在DefaultMessageStore.start()方法:

    public void start() throws Exception {
            //1、写lock 文件,尝试获取lock文件锁,保证磁盘上的文件只会被一个messageStore读写
            lock = lockFile.getChannel().tryLock(0, 1, false);
            if (lock == null || lock.isShared() || !lock.isValid()) {
                throw new RuntimeException("Lock failed,MQ already started");
            }
    
            lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
            lockFile.getChannel().force(true);
            //2、启动FlushConsumeQueueService,是一个单线程的服务,定时将consumeQueue文件的数据刷新到磁盘,周期由参数flushIntervalConsumeQueue设置,默认1sec
            this.flushConsumeQueueService.start();
            //3、启动CommitLog
            this.commitLog.start();
            //4、消息存储指标统计服务,RT,TPS...
            this.storeStatsService.start();
            //5、针对master,启动延时消息调度服务
            if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
                this.scheduleMessageService.start();
            }
            //6、启动reputMessageService,该服务负责将CommitLog中的消息offset记录到cosumeQueue文件中
            if (this.getMessageStoreConfig().isDuplicationEnable()) {
                this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
            } else {
                this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
            }
            this.reputMessageService.start();
            //7、启动haService,数据主从同步的服务
            this.haService.start();
            //8、对于新的broker,初始化文件存储的目录
            this.createTempFile();
            //9、启动定时任务
            this.addScheduleTask();
            this.shutdown = false;
        }
    

    以上就是整个MessageStore服务启动的过程,其中有几项下面解释一下:

    • 第2步,数据写入文件后,因为多级缓存的原因不会马上写到磁盘上,所以会有一个单独的线程定时调用flush,这里是flush consumeQueue文件的。CommitLogIndexFile的也有类似的逻辑,只是不是在这里启动的
    • 第3步,启动CommitLog,CommitLog.start()代码如下:
        public void start() {
            //加载刷盘服务
            this.flushCommitLogService.start();
            //storePool flush
            if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                this.commitLogService.start();
            }
        }
    

    FlushCommitLogService,跟第2步类似的,该服务负责将CommitLog的数据flush到磁盘,针对同步刷盘和异步刷盘,有两种实现方式
    CommitLogService,这个service只有在采用内存池缓存消息的时候才需要启动。在使用内存池的时候,这个服务会定时将内存池中的数据刷新到FileChannel中,这个我们后面讲CommitLog的文章中再详细讲。

    • 第5步,在consumer的时候讲过,如果消息失败,broker会延时重发。对于延时重发消息(topic=SCHEDULE_TOPIC_XXXX),这个服务负责检查是否有消息到了发送时间,到了时间则从延时队列中取出后重新发送
    • 第7步,如果是Master,HAService默认监听10912端口,接收slave的连接请求,然后将消息推送给slave;如果是Slave,则通过该服务连接Master接收数据
    • 第9步,这里的定时任务主要有以下几个:
    1. 定时清理过期的commitLog、cosumeQueue和Index数据文件, 默认文件写满后会保存72小时
    2. 定时自检commitLog和consumerQueue文件,校验文件是否完整。主要用于监控,不会做修复文件的动作。
    3. 定时检查commitLog的Lock时长(因为在write或者flush时侯会lock),如果lock的时间过长,则打印jvm堆栈,用于监控。

    以上就是整个启动的过程了,后续的文章开始讲解Broker是怎样接收Producer消息,还有怎样将消息交给Consumer的。

    相关文章

      网友评论

        本文标题:RocketMQ源码解析(六)-Broker#MessageSt

        本文链接:https://www.haomeiwen.com/subject/eilnrqtx.html