美文网首页
rocketmq源码10-消息生产及落盘流程

rocketmq源码10-消息生产及落盘流程

作者: modou1618 | 来源:发表于2019-02-25 22:37 被阅读0次

    一 消息生产,落盘流程

    1.1 生产者发送流程

    1.1.1 Message

    • 发送的消息结构体
    • 格式
    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
    
        private String topic;
        private int flag;
        private Map<String, String> properties;
        private byte[] body;
    }
    
    • 实例化
    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;
    
        if (tags != null && tags.length() > 0)
            this.setTags(tags);
    
        if (keys != null && keys.length() > 0)
            this.setKeys(keys);
    
        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }
    
    public static final String PROPERTY_KEYS = "KEYS";
    public static final String PROPERTY_TAGS = "TAGS";
    
    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }
    
    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }
    

    1.1.2 创建消息唯一id

    • 设置消息id属性
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    
    public static void setUniqID(final Message msg) {
            if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
                msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
            }
        }
    
    • 生成消息id
    static {
        LEN = 4 + 2 + 4 + 4 + 2;
        ByteBuffer tempBuffer = ByteBuffer.allocate(10);
        tempBuffer.position(2);
        tempBuffer.putInt(UtilAll.getPid());
        tempBuffer.position(0);
        try {
            tempBuffer.put(UtilAll.getIP());
        } catch (Exception e) {
            tempBuffer.put(createFakeIP());
        }
        tempBuffer.position(6);
        tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
        FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
        setStartTime(System.currentTimeMillis());
        COUNTER = new AtomicInteger(0);
    }
    
    public static String createUniqID() {
        StringBuilder sb = new StringBuilder(LEN * 2);
        sb.append(FIX_STRING);
        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
        return sb.toString();
    }
    
    private static byte[] createUniqIDBuffer() {
        ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
        long current = System.currentTimeMillis();
        if (current >= nextStartTime) {
            setStartTime(current);
        }
        buffer.position(0);
        buffer.putInt((int) (System.currentTimeMillis() - startTime));
        buffer.putShort((short) COUNTER.getAndIncrement());
        return buffer.array();
    }
    

    1.1.3 组装消息

    public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
        RemotingCommand cmd = new RemotingCommand();
        cmd.setCode(code);
        cmd.customHeader = customHeader;
        setCmdVersion(cmd);
        return cmd;
    }
    
    private static void setCmdVersion(RemotingCommand cmd) {
        if (configVersion >= 0) {
            cmd.setVersion(configVersion);
        } else {
            String v = System.getProperty(REMOTING_VERSION_KEY);
            if (v != null) {
                int value = Integer.parseInt(v);
                cmd.setVersion(value);
                configVersion = value;
            }
        }
    }
    

    1.1.4 发送流程

    image.png

    1.2 broker处理

    • 注册发送消息处理函数
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    

    1.2.1 处理流程图

    image.png

    1.2.2 mapperFileQueue.getLastMappedFile

    • 获取最后一个mappedFile,用于存储commitlog,之前的mappedfile肯定已经写满消息了。
    //写拷贝链表存储
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    
    public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;
    
        while (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }
    
        return mappedFileLast;
    }
    
    • 按偏移新建
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
    //获取最后一个mapperfile
        MappedFile mappedFileLast = getLastMappedFile();
    //为空表示需要新建,则计算作为文件名的偏移量,
        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }
    //最后一个mappedfile已经写满,则最后一个文件名加文件大小得到作为新文件名的偏移量
        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }
    //需要新建文件
        if (createOffset != -1 && needCreate) {
    //计算两个文件的名称+路径
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;
    
            if (this.allocateMappedFileService != null) {
    //使用分配服务创建mappedfile,一次创建两个
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {//直接对象实例化一个mappedfile
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }
    
            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
    //首个mappedfile创建则标记
                    mappedFile.setFirstCreateInQueue(true);
                }
    //添加到写拷贝链表中
                this.mappedFiles.add(mappedFile);
            }
    
            return mappedFile;
        }
    
        return mappedFileLast;
    }
    
    • mappedFile实例化,使用map映射文件到内存空间中,避免文件数据在系统内存空间和用户内存空间之间的一次拷贝
    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;
    
        ensureDirOK(this.file.getParent());
    
        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    //使用map映射文件到内存空间中,避免文件数据在系统内存空间和用户内存空间之间的一次拷贝
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("create file channel " + this.fileName + " Failed. ", e);
            throw e;
        } catch (IOException e) {
            log.error("map file " + this.fileName + " Failed. ", e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
    

    1.2.2.1 allocateMappedFileService

    • putRequestAndReturnMappedFile,构建AllocateRequest,异步初始化mappedfile。
      一次创建两个,本次返回第一个,第一个mappedfile使用完后,下次调用putRequestAndReturnMappedFile函数请求时从requestTable map中根据文件路径获取到第二个
    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        int canSubmitRequests = 2;// 默认一次创建2个mappedfile
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//支持使用堆外内存创建mappedfile
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
            }
        }
    
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    
        if (nextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            canSubmitRequests--;
        }
    
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }
    
        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }
    //构建请求放入阻塞队列中
    //countDownLatch等待mappedfile创建成功
        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    
        return null;
    }
    
    • 异步创建mappedfile,
      支持使用堆外内存创建mappedfile用于写消息,读文件仍使用mmap文件映射的内存,从而读写分离。
      文件预热,mmap文件映射内存的每page写一个字节0并刷新到磁盘上。每1000页sleep(0)释放一次cpu, mlock锁定文件映射内存,禁止swap换到磁盘上,标记内存为LibC.MADV_WILLNEED
    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            req = this.requestQueue.take();
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());//超时还未创建,则请求从表中删除,不再处理
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());
                return true;
            }
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;
            }
    
            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();
    
                MappedFile mappedFile;
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//使用堆外内存创建mappedfile
                    try {
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }
    
                long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
                if (eclipseTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }
    
                // pre write mappedFile
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMapedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {//mappedfile文件预热,
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }
    
                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            if (null != req) {
                requestQueue.offer(req);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            if (req != null && isSuccess)//通知等待线程
                req.getCountDownLatch().countDown();
        }
        return true;
    }
    

    1.2.3 mappedFile.appendMessage

    • 保存消息
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
    
        int currentPos = this.wrotePosition.get();
    //当前写消息写到的位置
        if (currentPos < this.fileSize) {
    //堆外内存写,还是mmap文件映射内存写
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);//调整位置
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {
    //存储消息,参数为文件名其实偏移,存储内存,剩余空间,消息体
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }//更新写偏移位置,和写时间戳
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    
    • 实际写内存函数doAppend()
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
        final MessageExtBrokerInner msgInner) {
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
    
        // PHY OFFSET,写消息的物理偏移地址
        long wroteOffset = fileFromOffset + byteBuffer.position();
    //生产消息id
        this.resetByteBuffer(hostHolder, 8);
        String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
    
        //topic-queueid为key,存储队列偏移量到topicQueueTable中
        keyBuilder.setLength(0);
        keyBuilder.append(msgInner.getTopic());
        keyBuilder.append('-');
        keyBuilder.append(msgInner.getQueueId());
        String key = keyBuilder.toString();
        Long queueOffset = CommitLog.this.topicQueueTable.get(key);
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }
    
        // Transaction messages that require special handling
       ...
    
        /**
         * Serialize message
         */
        final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
    
        final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
    
        if (propertiesLength > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long. length={}", propertiesData.length);
            return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
        }
    
        final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
        final int topicLength = topicData.length;
    
        final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
    
        final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
    
        // Exceeds the maximum message
        if (msgLen > this.maxMessageSize) {
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }
    
        // Determines whether there is sufficient free space
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {//没有足够空间存储消息,标记当前文件结束。返回结果
            this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }
       //写消息
        ...
    
        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    ...
        return result;
    }
    

    1.2.4 END_OF_FILE返回值

    • 当前mappedfile空间不够写消息
    • 重新分配新的mappedfile写消息
    case END_OF_FILE:
        unlockMappedFile = mappedFile;
        // Create a new file, re-write the message
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        if (null == mappedFile) {
            // XXX: warn and notify me
            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        break;
    

    1.2.5 磁盘持久化

    1.2.5.1同步刷盘GroupCommitService

    • 线程处理同步刷盘请求
    • countDownLatch等待刷盘完成
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            try {
    //切换刷盘请求到read列表中,作为本次刷盘处理目标。
                this.waitForRunning(10);
    //执行read列表中的刷盘请求
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }
    
        synchronized (this) {
            this.swapRequests();
        }
    
        this.doCommit();
    
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    
    //无刷盘请求,则直接执行刷盘,有刷盘请求则按请求刷盘,countdownlatch通知放入刷盘请求的等待方。
    private void doCommit() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {//线程结束,只处理读请求
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    boolean flushOK = false;
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
    
                        if (!flushOK) {
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }
    
                    req.wakeupCustomer(flushOK);
                }
    
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
    
                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    //mappedFileQueue刷盘接口
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
    //获取未刷盘的第一个mappedfile
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);//刷盘
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;//更新刷盘的位置
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;//更新刷盘时间
            }
        }
    
        return result;
    }
    //mappedfile刷盘接口
    public int flush(final int flushLeastPages) {
    //检查是否有未刷盘的数据
            if (this.isAbleToFlush(flushLeastPages)) {
                if (this.hold()) {//增加引用计数
                    int value = getReadPosition();
    
                    try {
                        //We only append data to fileChannel or mappedByteBuffer, never both.  
                        if (writeBuffer != null || this.fileChannel.position() != 0) {//堆外内存刷盘
                            this.fileChannel.force(false);
                        } else {//mapped 文件映射内存刷盘
                            this.mappedByteBuffer.force();
                        }
                    } catch (Throwable e) {
                        log.error("Error occurred when force data to disk.", e);
                    }
    //更新刷盘位置为写入消息长度
                    this.flushedPosition.set(value);
                    this.release();//释放引用计数
                } else {
                    log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                    this.flushedPosition.set(getReadPosition());
                }
            }
            return this.getFlushedPosition();
        }
    

    1.2.5.2异步刷盘

    1.2.5.2.1 mmap文件映射内存刷盘,FlushRealTimeService
    • 线程周期执行刷盘服务
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();//刷盘周期,少于配置页数可不刷盘
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();//最少刷盘页数数据,
    
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();//必须刷盘的周期
    
            boolean printFlushProgress = false;
    
            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }
    
            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }
    
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
    
                long begin = System.currentTimeMillis();
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
    
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
    
        this.printFlushProgress();
    
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    
    1.2.5.2.2 堆外内存刷盘 CommitRealTimeService
    • 周期提交堆外内存数据到文件mmap映射内存中,然后由FlushRealTimeService周期刷盘到磁盘中
    public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
               ...
                try {//提交堆外内存数据到文件映射内存中
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                   ...
            }
    
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
    }
    
    • mappedfilequeue.commit()提交数据到文件映射内存中
    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {//提交
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;//计算提交偏移量
        }
    
        return result;
    }
    
    public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) {//有新数据未提交
            if (this.hold()) {
                commit0(commitLeastPages);//提交
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }
    
        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
    
        return this.committedPosition.get();
    }
    
    protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();
    
        if (writePos - this.committedPosition.get() > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);//获取堆外内存,写入文件映射内存中。
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }
    

    1.2.6 主从同步

    • 同步刷新方式,调用haservice同步
    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
    
    }
    

    1.3 ReputMessageService

    • 生成consumequeue和indexfile
    private void doReput() {
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {//存在新的消息,需要构建消费队列和索引文件
    
            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }
    
            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) {//获取新消息
                try {
                    this.reputFromOffset = result.getStartOffset();
    
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {//解析消息,组装重构请求DispatchRequest
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = dispatchRequest.getMsgSize();
    
                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {//构建索引文件和消费队列文件
                                DefaultMessageStore.this.doDispatch(dispatchRequest);
    //master且支持longpolling,则通知有新的请求达到,可处理
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }
    
                                this.reputFromOffset += size;
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {//从节点进行统计
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {//无消息体,则继续处理下一消息
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {
    
                            if (size > 0) {
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
    
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }
    

    1.3.1 消费队列文件构建

    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    
            @Override
            public void dispatch(DispatchRequest request) {
                final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
                switch (tranType) {
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                        //非事务消息构建消费队列
    DefaultMessageStore.this.putMessagePositionInfo(request);
                        break;
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        break;
                }
            }
        }
    
    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    //查找消费队列,topic+queue id
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    //添加消息信息
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }
    
    • 写入消息物理偏移,消息大小,消息tag信息到消费队列文件的mappedfile中
     boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                    request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
    

    1.3.2 索引文件构建

        class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    
            @Override
            public void dispatch(DispatchRequest request) {
                if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                    DefaultMessageStore.this.indexService.buildIndex(request);
                }
            }
        }
    
    • 获取索引文件的内存映射,写入消息索引信息
    //唯一key+topic生成索引
    if (req.getUniqKey() != null) {
        indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
        if (indexFile == null) {
            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
            return;
        }
    }
    //消息的key+topic生成索引
    if (keys != null && keys.length() > 0) {
        String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
        for (int i = 0; i < keyset.length; i++) {
            String key = keyset[i];
            if (key.length() > 0) {
                indexFile = putKey(indexFile, msg, buildKey(topic, key));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }
        }
    }
    
    • 索引文件写入key+消息在commitlog中的物理偏移位置
    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    //存放固定数量索引
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            int keyHash = indexKeyHashMethod(key);//计算hash
            int slotPos = keyHash % this.hashSlotNum;//计算hash所在桶
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//计算桶的起始偏移
    
            FileLock fileLock = null;
    
            try {
    
                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
    //获取下一索引存储位置的索引
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
    
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
    
                timeDiff = timeDiff / 1000;
    
                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
    //计算索引实际存储位置
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
    //写入索引hash,消息commitlog物理偏移,时间,索引的桶内索引
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    //写入本桶的下一可用索引
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
              //更新索引文件的统计数据
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
    
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);
    
                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }
    
        return false;
    }
    

    相关文章

      网友评论

          本文标题:rocketmq源码10-消息生产及落盘流程

          本文链接:https://www.haomeiwen.com/subject/stnbyqtx.html