美文网首页
RocketMQ源码解析(八)-Broker#消息发送

RocketMQ源码解析(八)-Broker#消息发送

作者: 空挡 | 来源:发表于2019-01-13 22:13 被阅读0次

    从这篇文章开始,主要看一下Broker处理一条消息的完整过程:接收Producer发来的消息->存储消息->将消息推给Consumer。这一篇主要看下接收Producer消息的过程。

    消息接口

    Broker提供的消息发送的接口有:单条消息、批量消息、RETRY消息。Retry消息即consumer消费失败,要求broker重发的消息。失败的原因有两种,一种是业务端代码处理失败;还有一种是消息在consumer的缓存队列中待的时间超时,consumer会将消息从队列中移除,然后退回给Broker重发。前面在讲Consumer的时候已经讲过了,不再重复。
    下面看一下消息到达Broker后的处理过程。

    消息处理流程

    Producer或者consumer发送消息后,Broker通过SendMessageProcessor做接收和处理。一个消息的包可以只包含了一条消息,也可以包含多条消息。这两种的处理逻辑比较类似,我们只看下单条消息的源码。

    SendMessageProcessor

    对于单发消息是在Processor的sendMessage()方法中处理的。

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                            final RemotingCommand request,
                                            final SendMessageContext sendMessageContext,
                                            final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    
            //1、构建Response的Header
            final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
            final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    
            response.setOpaque(request.getOpaque());
    
            response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
            response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    
            log.debug("receive SendMessage request command, {}", request);
            //2、判断当前时间broker是否提供服务,不提供则返回code为SYSTEM_ERROR的response
            final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
            if (this.brokerController.getMessageStore().now() < startTimstamp) {//broker还没开始提供接收消息服务
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
                return response;
            }
    
            response.setCode(-1);
            //3、检查topic和queue,如果不存在且broker设置中允许自动创建,则自动创建
            super.msgCheck(ctx, requestHeader, response); 
            if (response.getCode() != -1) {
                return response;
            }
    
            final byte[] body = request.getBody();
            
            int queueIdInt = requestHeader.getQueueId();
            //4、获取topic的配置
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
            //5、如果消息中的queueId小于0,则随机选取一个queue
            if (queueIdInt < 0) {
                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
            }
            //6、重新封装request中的message成MessageExtBrokerInner
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic(requestHeader.getTopic());
            msgInner.setQueueId(queueIdInt);
            //7、对于RETRY消息,1)判断是否consumer还存在
            //  2)如果超过最大重发次数,尝试创建DLQ,并将topic设置成DeadQueue,消息将被存入死信队列
            if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
                return response;
            }
    
            msgInner.setBody(body);
            msgInner.setFlag(requestHeader.getFlag());
            MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
            msgInner.setPropertiesString(requestHeader.getProperties());
            msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
            msgInner.setBornHost(ctx.channel().remoteAddress());
            msgInner.setStoreHost(this.getStoreHost());
            msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
            PutMessageResult putMessageResult = null;
            Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
            String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (traFlag != null && Boolean.parseBoolean(traFlag)) {
                //如果是事务消息
                if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
                    return response;
                }
                putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
            } else {
                //8、调用MessageStore接口存储消息
                putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
            }
            //9、根据putResult设置repsonse状态,更新broker统计信息,成功则回复producer,更新context上下文
            return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
        }
    
    • 第3步,broker收到消息后可能存在topic不存在的情况,broker可以设置成自动创建topic,不如不支持自动创建,则返回错误。
    • 第7步,对于retry消息,这里会做两件事情:
      首先,会判断consumer是否还存在,因为consumer将消息返回给broker后,会设置一个延时时间,broker有一个定时任务在扫描到重发时间到了以后,会调用processor。所以要check一下调用的时候consumer还在不在。
      再次,消息重发是有次数限制的,默认是16次。这里会检查是否已经超过最大次数,超过的话会把消息的topic设置成并将topic设置成DeadQueue,这样后续处理中就会放入死信队列。
    • 第8步,调用MessageStore接口存储消息,这个我们后面详细讲
    • 第9步,根据存储状态,设置repsonse状态。这里有一点特殊的地方就是如果成功会写response给客户端,如果失败则由外层统一的错误处理逻辑处理。
    MessageStore保存消息

    调用DefaultMessageStore.putMessage()方法,篇幅原因,一些检查的逻辑代码就不贴了:

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
            ...
            //SLAVE不接收消息写入
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is slave mode, so putMessage is forbidden ");
                }
    
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
            ...
            ...
            //commit log timeout
            if (this.isOSPageCacheBusy()) {
                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
            }
    
            long beginTime = this.getSystemClock().now();
            PutMessageResult result = this.commitLog.putMessage(msg);
    
            long eclipseTime = this.getSystemClock().now() - beginTime;
            ...
            //收集消息store时间
            this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
    
            if (null == result || !result.isOk()) {
                //记录失败次数
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }
    
            return result;
        }
    

    首先判断broker是否是master,并且master当前是可写的。然后判断commitLog上次flush的时候是否超时,如果超时则返回OS_PAGECACHE_BUSY的错误。最终调用commitLog.putMessage()方法保存消息。下面看下CommitLog的方法实现

    CommitLog保存消息
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
            // 1、Set the storage time
            msg.setStoreTimestamp(System.currentTimeMillis());
            // 2、Set the message body BODY CRC (consider the most appropriate setting
            // on the client)
            msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
            // Back to Results
            AppendMessageResult result = null;
    
            StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
            String topic = msg.getTopic();
            int queueId = msg.getQueueId();
    
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
                //非事务消息
                // Delay Delivery
                if (msg.getDelayTimeLevel() > 0) {
                    //3、延时投放消息,变更topic
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
    
                    topic = ScheduleMessageService.SCHEDULE_TOPIC;
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                    // Backup real topic, queueId
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
            }
    
            long eclipseTimeInLock = 0;
            MappedFile unlockMappedFile = null;
            //4、获取当前正在写入文件
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            //5、获取写message的锁
            putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
            try {
                long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
                this.beginTimeInLock = beginLockTimestamp;//记录lock time
    
                // Here settings are stored timestamp, in order to ensure an orderly
                // global
                msg.setStoreTimestamp(beginLockTimestamp);
                //6、新建一个mapp file如果文件不存在或者文件已经写满
                if (null == mappedFile || mappedFile.isFull()) {
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                }
                if (null == mappedFile) {
                    //文件创建失败,则返回错误
                    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
                }
                //7、消息写文件
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                switch (result.getStatus()) {
                    case PUT_OK:
                        break;
                    case END_OF_FILE:
                        //8、如果文件已满,则新建一个文件继续
                        unlockMappedFile = mappedFile;
                        // Create a new file, re-write the message
                        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                        if (null == mappedFile) {
                            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                            beginTimeInLock = 0;
                            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                        }
                        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                        break;
                    case MESSAGE_SIZE_EXCEEDED:
                    case PROPERTIES_SIZE_EXCEEDED:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                    case UNKNOWN_ERROR:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                    default:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                }
    
                eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
                beginTimeInLock = 0;
            } finally {
                //9、释放第5)步中获取到的锁
                putMessageLock.unlock();
            }
    
            if (eclipseTimeInLock > 500) {//写消息时间过长
                log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
            }
            //unlock已经写满的文件,释放内存锁
            if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
            }
    
            PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    
            // Statistics
            storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
            storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
            //10、flush数据到磁盘,分同步和异步
            handleDiskFlush(result, putMessageResult, msg);
            //11、如果broker设置成SYNC_MASTER,则等SLAVE接收到数据后才返回(接收到数据是指offset延后没有超过制定的字节数)
            handleHA(result, putMessageResult, msg);
    
            return putMessageResult;
        }
    
    • 第3步,判断是否是Consumer发来的Retry消息,如果是则修改topic为 SCHEDULE_TOPIC_XXXX,根据延时时长计算queueId。将原始的topic和queueId放到消息的properties字段中。这样这个消息只会被重发的Schedule任务读到。
    • 第4步,前面讲MessageStore的时候讲过,CommitLog是由连续的MappedFile的列表组成的。在同一时间,只有最后一个MappedFile有写入,因为之前的文件都已经写满了,所以这里是取最后一个。
    • 第6步,如果commitLog是第一次启动,或者文件size已经达到maxSize,则新建一个文件
    • 第7步,将消息内容写入MappedFile,这里会传一个callback的参数,真正的写入是在callback中实现的,后面我们再看这个实现
    • 第8步,在上一步中的appendMessage()接口中,如果文件剩余的空间已经不足以写下这条消息,则会用一个EOF消息补齐文件,然后返回一个EOF错误。在收到这个错误时,会新建一个文件,然后重写一次。
    • 第10步,用户可以使用MessageStoreFlushDiskType参数来控制数据flush到磁盘的方式,如果参数值SYNC_FLUSH,则每次写完消息都会做一次flush,完成才会返回结果。如果是ASYNC_FLUSH,只会唤醒flushCommitLogService,由它异步的去检查是否要做flush。
    • 第9步,Broker的主从数据同步也可以有两种方式,如果是SYNC_MASTER,则Master保存消息后,需要将消息同步给slave后才会返回结果。如果ASYNC_MASTER,这里不会做任何操作,由HAService的后台线程做数据同步。

    下面我们看下MappedFile写入消息的实现

    MappedFile消息写入
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
            assert messageExt != null;
            assert cb != null;
            //1、获取当前的write position
            int currentPos = this.wrotePosition.get();
     
            if (currentPos < this.fileSize) {
                //2、生成buffer切片
                ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
                byteBuffer.position(currentPos);
                AppendMessageResult result = null;
                if (messageExt instanceof MessageExtBrokerInner) {
                    //3、写单条消息到byteBuffer
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
                } else if (messageExt instanceof MessageExtBatch) {
                    //3、批量消息到byteBuffer
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
                } else {
                    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
                }
                //4、更新write position,到最新值
                this.wrotePosition.addAndGet(result.getWroteBytes());
                this.storeTimestamp = result.getStoreTimestamp();
                return result;
            }
            log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
    

    消息写入首先是获取ByteBuffer,从第2步中可以发现有个判断,这里是因为MappedFile写数据到文件有两种实现方式:

    • FileChannel获取直接内存映射,收到消息后,将数据写入到这块内存中,内存和物理文件的数据交互由操作系统负责
    • CommitLog启动的时候初始化一块内存池(通过ByteBuffer申请的堆外内存),消息数据首先写入内存池中,然后后台有个线程定时将内存池中的数据commit到FileChannel中。这种方式只有MessageStore是ASYNC模式时才能开启。代码中if判断writeBuffer不为空的情况就是使用的这种写入方式。

    第3步,最终回调的Callback类将数据写入buffer中,消息的序列化也是在callback里面完成的

    序列化和保存

    这里调用的是DefaultAppendMessageCallback.doAppend()方法:

            public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
                final MessageExtBrokerInner msgInner) {
                // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
    
                //1、 PHY OFFSET ,消息偏移
                long wroteOffset = fileFromOffset + byteBuffer.position();//
    
                this.resetByteBuffer(hostHolder, 8);//重置host holder buffer
                //2、生成message ID, 前8位是host后8位是wroteOffset,目的是便于使用msgID来查找消息
                String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
    
                //3、 Record ConsumeQueue information
                keyBuilder.setLength(0);
                keyBuilder.append(msgInner.getTopic());
                keyBuilder.append('-');
                keyBuilder.append(msgInner.getQueueId());
                String key = keyBuilder.toString();
                //4、取得具体Queue的offset,值是当前是Queue里的第几条消息
                Long queueOffset = CommitLog.this.topicQueueTable.get(key);
                if (null == queueOffset) {
                    //5、如果是这个queue的第一条消息,需要初始化
                    queueOffset = 0L;
                    CommitLog.this.topicQueueTable.put(key, queueOffset);
                }
    
                //6、 Transaction messages that require special handling
                final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
                switch (tranType) {
                    // Prepared and Rollback message is not consumed, will not enter the
                    // consumer queuec
                    // 事务消息
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        queueOffset = 0L;
                        break;
                    // 非事务消息
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    default:
                        break;
                }
    
                /**
                 * Serialize message
                 */
                final byte[] propertiesData =
                    msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
    
                final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
    
                if (propertiesLength > Short.MAX_VALUE) {
                    log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                    return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
                }
    
                final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
                final int topicLength = topicData.length;
    
                final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
                //7、计算消息实际存储长度
                final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
    
                // Exceeds the maximum message
                if (msgLen > this.maxMessageSize) {
                    CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                        + ", maxMessageSize: " + this.maxMessageSize);
                    return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
                }
    
                // Determines whether there is sufficient free space
                //8、 如果空间不足,magic code设置成EOF,然后剩余字节随机,保证所有文件大小都是FileSize
                if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                    this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                    // 1 TOTALSIZE
                    this.msgStoreItemMemory.putInt(maxBlank);
                    // 2 MAGICCODE
                    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                    // 3 The remaining space may be any value
                    // Here the length of the specially set maxBlank
                    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                    byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
                }
    
                // Initialization of storage space
                this.resetByteBuffer(msgStoreItemMemory, msgLen);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(msgLen);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
                // 3 BODYCRC
                this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
                // 4 QUEUEID
                this.msgStoreItemMemory.putInt(msgInner.getQueueId());
                // 5 FLAG
                this.msgStoreItemMemory.putInt(msgInner.getFlag());
                // 6 QUEUEOFFSET
                this.msgStoreItemMemory.putLong(queueOffset);
                // 7 PHYSICALOFFSET
                this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
                // 8 SYSFLAG
                this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
                // 9 BORNTIMESTAMP
                this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
                // 10 BORNHOST
                this.resetByteBuffer(hostHolder, 8);
                this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
                // 11 STORETIMESTAMP
                this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
                // 12 STOREHOSTADDRESS
                this.resetByteBuffer(hostHolder, 8);
                this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
                //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
                // 13 RECONSUMETIMES
                this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
                // 14 Prepared Transaction Offset
                this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
                // 15 BODY
                this.msgStoreItemMemory.putInt(bodyLength);
                if (bodyLength > 0)
                    this.msgStoreItemMemory.put(msgInner.getBody());
                // 16 TOPIC
                this.msgStoreItemMemory.put((byte) topicLength);
                this.msgStoreItemMemory.put(topicData);
                // 17 PROPERTIES
                this.msgStoreItemMemory.putShort((short) propertiesLength);
                if (propertiesLength > 0)
                    this.msgStoreItemMemory.put(propertiesData);
    
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                // Write messages to the queue buffer
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
    
                AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                    msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    
                switch (tranType) {
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        break;
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                        // The next update ConsumeQueue information
                        CommitLog.this.topicQueueTable.put(key, ++queueOffset); //Queue offset + 1
                        break;
                    default:
                        break;
                }
                return result;
            }
    

    doAppend()代码主要就是消息的序列化过程:
    第1步,计算消息在CommitLog的offset,之前的文章已经反复提到这个值了,可以看到这个偏移量是一个全局的值,而不是只针对这个文件。这里fileFromOffset就是文件第一条消息的offset,也即是文件名。
    第2步,生成message ID,可以看到这个id是由broker的host和offset拼接起来的,所以很容易根据id得到消息存在哪个broker的哪个文件中。根据id查询消息速度非常快。
    第3步,消息在queue中的偏移量,这里的偏移量值是消息在queue中的序号,从0开始。比如,queue中第10条消息offset就是9
    第8步,如果文件剩余的空间不足以存下这条消息,则剩余空间用一条EOF填充,然后返回EOF错误,前面已经讲过调用保存接口的地方收到这个错误会新起一个文件重新写入。
    第10步,这里就是按照commitLog的格式要求拼装写入bytebuffer了。

    总结

    一条消息从到达Broker到存到文件中的过程就结束了,之前关于消息存储说到的几个点再重复一下:

    1. 所有的消息在存储时都是按顺序存在一起的,不会按topic和queueId做物理隔离
    2. 每条消息存储时都会有一个offset,通过offset是定位到消息位置并获取消息详情的唯一办法,所有的消息查询操作最终都是转化成通过offset查询消息详情
    3. 每条消息存储前都会产生一个Message ID,通过这个id可以快速的得到消息存储的broker和它在CommitLog中的offset
    4. Broker收到消息后的处理线程只负责消息存储,不负责通知consumer或者其它逻辑,最大化消息吞吐量
    5. Broker返回成功不代表消息已经写入磁盘,如果对消息的可靠性要求高的话,可以将FlushDiskType设置成SYNC_FLUSH,这样每次收到消息写入文件后都会做flush操作。

    相关文章

      网友评论

          本文标题:RocketMQ源码解析(八)-Broker#消息发送

          本文链接:https://www.haomeiwen.com/subject/pdrldqtx.html