美文网首页
rocketmq之消息存储学习笔记

rocketmq之消息存储学习笔记

作者: heyong | 来源:发表于2019-07-24 15:34 被阅读0次

一、存储总体结构

image.png

从上面的图中可以看出,Broker都是通过DefaultMessageStore实现数据的存储和读取。消息的存储主要是通过调用DefaultMessageStore.putMessage(),实现消息的存储。

二、消息存储过程

本节将以消息发送存储为突破点,一点一点揭开RocketMQ 存储设计的神秘面纱。消息存储入口: org.apache.rocketmq.store.DefaultMessageStore#putMessage 。

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
                // 如果当前节点是从节点,拒绝写
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        // 当前broker状态,是否可写
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }
                // 消息主题长度限制
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
                // 消息属性限制
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }
                // 是否os cache刷新繁忙
        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }
                // 写数据
        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);
        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return result;
    }

获取到对应的MappedFile写数据

  MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

在写入CommitLog 之前,先申请putMessageLock,也就是将消息存储到CornrnitLog 文件中是串行的。

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;
            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                        // 写数据
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
   }

调用appendMessagesInner写数据

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            // 调用slice共享缓冲区
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            // 写入位置
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {
                // 追加消息
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // 更新写入位置
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

创建全局唯一id,消息ID 有16 字节,消息ID 组成如图4-4 所示

image.png
            // PHY OFFSET
            long wroteOffset = fileFromOffset + byteBuffer.position();

            this.resetByteBuffer(hostHolder, 8);
            String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

消息存储格式

RocketMQ 消息存储格式如下。

  • TOTALSIZE : 该消息条目总长度,4 字节。
  • MAGICCODE : 魔数, 4 字节。固定值Oxdaa320a7 。
  • BODYCRC : 消息体crc校验码, 4 字节。
  • QUEUEID : 消息消费队列ID , 4 字节。
  • FLAG : 消息FLAG , RocketMQ 不做处理, 供应用程序使用,默认4 字节。
  • QUEUEOFFSET :消息在消息消费队列的偏移量, 8 字节。
  • PHYSICALOFFSET : 消息在CommitLog 文件中的偏移量, 8 字节。
  • SYSFLAG : 消息系统Flag ,例如是否压缩、是否是事务消息等, 4 字节。
  • BORNTIMESTAMP : 消息生产者调用消息发送API 的时间戳, 8 字节。
  • BORNHOST :消息发送者IP 、端口号, 8 字节。
  • STORETIMESTAMP : 消息存储时间戳, 8 字节。
  • STOREHOSTADDRESS: Broker 服务器IP+端口号, 8 字节。
  • RECONSUMETIMES : 消息重试次数, 4 字节。
  • Prepared Transaction Offset : 事务消息物理偏移量, 8 字节。
  • BodyLength :消息体长度, 4 字节。
  • Body : 消息体内容,长度为bodyLen th 中存储的值。
  • TopieLength : 主题存储长度, 1 字节,表示主题名称不能超过255 个字符。
  • Topic : 主题,长度为TopieL e n g th 中存储的值。
  • PropertiesLength : 消息属性长度, 2 字节, 表示消息属性长度不能超过6 553 6 个字符。
  • Properties : 消息属性,长度为PropertiesLength 中存储的值。

上述表示CommitLog 条目是不定长的,每一个条目的长度存储在前4 个字节中。

如果消息长度+END_FILE_MIN_BLANK_LENGTH 大于CommitLog 文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE, Broker 会重新创建一个新的CommitLog 文件来存储该消息。

从这里可以看出,每个CommitLog 文件最少会空闲8个字节,高4 字节存储当前文件剩余空间,低4 字节存储魔数: CommitLog.BLANK_MAGIC_CODE 。

            // Determines whether there is sufficient free space
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

将消息内容存储到ByteBuffer 中,然后创建AppendMessageResult 。这里只是将消息存储在MappedFile 对应的内存映射Buffer 中,并没有刷写到磁盘。

DefaultAppendMessageCallback#doAppend 只是将消息追加在内存中, 需要根据是同步刷盘还是异步刷盘方式,将内存中的数据持久化到磁盘。

AppendMessageResult 的属性:

  • AppendMessageStatus status :消息追加结果,取值PUT_OK : 追加成功; END_OF_FILE: 超过文件大小; MESSAGE_SIZE_EXCEEDED :消息长度超过最大允许长度:PROPERTIES_SIZE_EXCEEDED :消息、属性超过最大允许长度; UNKNOWN_ERROR :未知异常。

  • long wroteOffset :消息的物理偏移量。

  • String msgld :消息ID 。

  • long storeTimestamp :消息存储时间戳。

  • long logicsOffset :消息消费队列逻辑偏移量,类似于数组下标。

  • long pagecacheRT = 0 :当前未使用。

  • int msgNum = 1 :消息条数,批量消息发送时消息条数。

相关文章

  • rocketmq之消息存储学习笔记

    一、存储总体结构 从上面的图中可以看出,Broker都是通过DefaultMessageStore实现数据的存储和...

  • RocketMQ阅读笔记之消息存储

    消息存储部分是RocketMQ的重要组成部分,良好的存储机制会有效降低延迟,提高整体效率。RocketMQ利用到了...

  • RocketMQ消息存储

    RocketMQ消息存储 1 CommitLog 要想知道RocketMQ如何存储消息,我们先看看CommitLo...

  • RocketMQ的消息存储格式

    总体代码 我们可以通过阅读RocketMQ的消息存储代码来了解RocketMQ的消息存储格式,消息的存储入口是De...

  • RocketMQ——RocketMQ消息存储

    DefaultMQPushConsumer 属性consumerGroup消费组名称messageModel消息消...

  • 四、设计

    1 消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架...

  • rocketMQ 设计

    1 消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架...

  • RocketMq消息存储

    一个消息中间件最核心的东西就是消息存储结构。 这是kafka的消息存储: 每个topic_partition对应一...

  • RocketMQ:消息存储

    通常来说我们对分布式队列有高可靠性的要求,所以数据要进行持久化存储。 消息生产者发送消息到MQ。 MQ收到消息,将...

  • rocketMQ消息存储

网友评论

      本文标题:rocketmq之消息存储学习笔记

      本文链接:https://www.haomeiwen.com/subject/adwxrctx.html