美文网首页
rocketmq之消息存储学习笔记

rocketmq之消息存储学习笔记

作者: heyong | 来源:发表于2019-07-24 15:34 被阅读0次

    一、存储总体结构

    image.png

    从上面的图中可以看出,Broker都是通过DefaultMessageStore实现数据的存储和读取。消息的存储主要是通过调用DefaultMessageStore.putMessage(),实现消息的存储。

    二、消息存储过程

    本节将以消息发送存储为突破点,一点一点揭开RocketMQ 存储设计的神秘面纱。消息存储入口: org.apache.rocketmq.store.DefaultMessageStore#putMessage 。

        public PutMessageResult putMessage(MessageExtBrokerInner msg) {
            if (this.shutdown) {
                log.warn("message store has shutdown, so putMessage is forbidden");
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
                    // 如果当前节点是从节点,拒绝写
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is slave mode, so putMessage is forbidden ");
                }
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
    
            // 当前broker状态,是否可写
            if (!this.runningFlags.isWriteable()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
                }
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            } else {
                this.printTimes.set(0);
            }
                    // 消息主题长度限制
            if (msg.getTopic().length() > Byte.MAX_VALUE) {
                log.warn("putMessage message topic length too long " + msg.getTopic().length());
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
            }
                    // 消息属性限制
            if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
                return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
            }
                    // 是否os cache刷新繁忙
            if (this.isOSPageCacheBusy()) {
                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
            }
                    // 写数据
            long beginTime = this.getSystemClock().now();
            PutMessageResult result = this.commitLog.putMessage(msg);
            long eclipseTime = this.getSystemClock().now() - beginTime;
            if (eclipseTime > 500) {
                log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }
            return result;
        }
    

    获取到对应的MappedFile写数据

      MappedFile unlockMappedFile = null;
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    

    在写入CommitLog 之前,先申请putMessageLock,也就是将消息存储到CornrnitLog 文件中是串行的。

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
            try {
                long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
                this.beginTimeInLock = beginLockTimestamp;
                // Here settings are stored timestamp, in order to ensure an orderly
                // global
                msg.setStoreTimestamp(beginLockTimestamp);
    
                if (null == mappedFile || mappedFile.isFull()) {
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                }
                if (null == mappedFile) {
                    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
                }
    
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                switch (result.getStatus()) {
                    case PUT_OK:
                        break;
                    case END_OF_FILE:
                        unlockMappedFile = mappedFile;
                        // Create a new file, re-write the message
                        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                        if (null == mappedFile) {
                            // XXX: warn and notify me
                            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                            beginTimeInLock = 0;
                            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                        }
                            // 写数据
                        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                        break;
                    case MESSAGE_SIZE_EXCEEDED:
                    case PROPERTIES_SIZE_EXCEEDED:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                    case UNKNOWN_ERROR:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                    default:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                }
    
                eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
                beginTimeInLock = 0;
            } finally {
                putMessageLock.unlock();
       }
    

    调用appendMessagesInner写数据

        public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
            assert messageExt != null;
            assert cb != null;
    
            int currentPos = this.wrotePosition.get();
    
            if (currentPos < this.fileSize) {
                // 调用slice共享缓冲区
                ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
                // 写入位置
                byteBuffer.position(currentPos);
                AppendMessageResult result = null;
                if (messageExt instanceof MessageExtBrokerInner) {
                    // 追加消息
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
                } else if (messageExt instanceof MessageExtBatch) {
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
                } else {
                    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
                }
                // 更新写入位置
                this.wrotePosition.addAndGet(result.getWroteBytes());
                this.storeTimestamp = result.getStoreTimestamp();
                return result;
            }
            log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
    

    创建全局唯一id,消息ID 有16 字节,消息ID 组成如图4-4 所示

    image.png
                // PHY OFFSET
                long wroteOffset = fileFromOffset + byteBuffer.position();
    
                this.resetByteBuffer(hostHolder, 8);
                String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
    
    

    消息存储格式

    RocketMQ 消息存储格式如下。

    • TOTALSIZE : 该消息条目总长度,4 字节。
    • MAGICCODE : 魔数, 4 字节。固定值Oxdaa320a7 。
    • BODYCRC : 消息体crc校验码, 4 字节。
    • QUEUEID : 消息消费队列ID , 4 字节。
    • FLAG : 消息FLAG , RocketMQ 不做处理, 供应用程序使用,默认4 字节。
    • QUEUEOFFSET :消息在消息消费队列的偏移量, 8 字节。
    • PHYSICALOFFSET : 消息在CommitLog 文件中的偏移量, 8 字节。
    • SYSFLAG : 消息系统Flag ,例如是否压缩、是否是事务消息等, 4 字节。
    • BORNTIMESTAMP : 消息生产者调用消息发送API 的时间戳, 8 字节。
    • BORNHOST :消息发送者IP 、端口号, 8 字节。
    • STORETIMESTAMP : 消息存储时间戳, 8 字节。
    • STOREHOSTADDRESS: Broker 服务器IP+端口号, 8 字节。
    • RECONSUMETIMES : 消息重试次数, 4 字节。
    • Prepared Transaction Offset : 事务消息物理偏移量, 8 字节。
    • BodyLength :消息体长度, 4 字节。
    • Body : 消息体内容,长度为bodyLen th 中存储的值。
    • TopieLength : 主题存储长度, 1 字节,表示主题名称不能超过255 个字符。
    • Topic : 主题,长度为TopieL e n g th 中存储的值。
    • PropertiesLength : 消息属性长度, 2 字节, 表示消息属性长度不能超过6 553 6 个字符。
    • Properties : 消息属性,长度为PropertiesLength 中存储的值。

    上述表示CommitLog 条目是不定长的,每一个条目的长度存储在前4 个字节中。

    如果消息长度+END_FILE_MIN_BLANK_LENGTH 大于CommitLog 文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE, Broker 会重新创建一个新的CommitLog 文件来存储该消息。

    从这里可以看出,每个CommitLog 文件最少会空闲8个字节,高4 字节存储当前文件剩余空间,低4 字节存储魔数: CommitLog.BLANK_MAGIC_CODE 。

                // Determines whether there is sufficient free space
                if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                    this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                    // 1 TOTALSIZE
                    this.msgStoreItemMemory.putInt(maxBlank);
                    // 2 MAGICCODE
                    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                    // 3 The remaining space may be any value
                    // Here the length of the specially set maxBlank
                    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                    byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
                }
    

    将消息内容存储到ByteBuffer 中,然后创建AppendMessageResult 。这里只是将消息存储在MappedFile 对应的内存映射Buffer 中,并没有刷写到磁盘。

    DefaultAppendMessageCallback#doAppend 只是将消息追加在内存中, 需要根据是同步刷盘还是异步刷盘方式,将内存中的数据持久化到磁盘。

    AppendMessageResult 的属性:

    • AppendMessageStatus status :消息追加结果,取值PUT_OK : 追加成功; END_OF_FILE: 超过文件大小; MESSAGE_SIZE_EXCEEDED :消息长度超过最大允许长度:PROPERTIES_SIZE_EXCEEDED :消息、属性超过最大允许长度; UNKNOWN_ERROR :未知异常。

    • long wroteOffset :消息的物理偏移量。

    • String msgld :消息ID 。

    • long storeTimestamp :消息存储时间戳。

    • long logicsOffset :消息消费队列逻辑偏移量,类似于数组下标。

    • long pagecacheRT = 0 :当前未使用。

    • int msgNum = 1 :消息条数,批量消息发送时消息条数。

    相关文章

      网友评论

          本文标题:rocketmq之消息存储学习笔记

          本文链接:https://www.haomeiwen.com/subject/adwxrctx.html