美文网首页
rocketmq源码11-broker消息消费流程

rocketmq源码11-broker消息消费流程

作者: modou1618 | 来源:发表于2019-02-27 22:17 被阅读0次

    一 总体流程

    总体流程.png

    二 broker消息消费处理

    2.1 PullMessageProcessor.processRequest()

    2.1.1 请求报文解析,响应报文构建

    • 设置报文id,表示响应的是哪个请求报文
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
    
    response.setOpaque(request.getOpaque());
    

    2.1.1.1 PullMessageRequestHeader

    • 解析请求报文头
    public class PullMessageRequestHeader implements CommandCustomHeader {
        @CFNotNull
        private String consumerGroup;//消费组
        @CFNotNull
        private String topic;//
        @CFNotNull
        private Integer queueId;//mq id
        @CFNotNull
        private Long queueOffset;//消费起始偏移
        @CFNotNull
        private Integer maxMsgNums;//最大消息数量,32
        @CFNotNull
    /*
        private final static int FLAG_COMMIT_OFFSET = 0x1 << 0;//pull消息时提交偏移
        private final static int FLAG_SUSPEND = 0x1 << 1;
        private final static int FLAG_SUBSCRIPTION = 0x1 << 2;//pull消息时提交消费配置
        private final static int FLAG_CLASS_FILTER = 0x1 << 3;
    */
        private Integer sysFlag;//消费标记
        @CFNotNull
        private Long commitOffset;//已消费的偏移位置
        @CFNotNull
        private Long suspendTimeoutMillis;
        @CFNullable
        private String subscription;
        @CFNotNull
        private Long subVersion;
        private String expressionType;
    }
    

    2.1.1.2 PullMessageResponseHeader

    public class SubscriptionData implements Comparable<SubscriptionData> {
        public final static String SUB_ALL = "*";
        private boolean classFilterMode = false;//过滤模式
        private String topic;
        private String subString;//消费的tag组装的串
        private Set<String> tagsSet = new HashSet<String>();//tag集合
        private Set<Integer> codeSet = new HashSet<Integer>();//tag的hash值的集合
        private long subVersion = System.currentTimeMillis();
        private String expressionType;//substring类型
        private String filterClassSource;//filter server使用
    }
    

    2.1.1.3 PullMessageResponseHeader

    • 构建响应报文头
    public class PullMessageResponseHeader implements CommandCustomHeader {
        @CFNotNull
        private Long suggestWhichBrokerId;
        @CFNotNull
        private Long nextBeginOffset;
        @CFNotNull
        private Long minOffset;
        @CFNotNull
        private Long maxOffset;
    }
    

    2.1.2 参数检查

    • broker可读性检查
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }
    
    • topic消费组配置检查,获取消费组配置,检查是否是能允许消费。
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    }
    
    if (!subscriptionGroupConfig.isConsumeEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    }
    
    • 获取消费标记
    final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
    final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
    
    final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
    
    • 获取topic配置,检查是否在本broker上可读
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    }
    
    if (!PermName.isReadable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
        return response;
    }
    
    • 检查请求头queue id有效性,在1-读队列数之间
    if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
            requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    }
    
    • 消费配置,配置了过滤tag,则组装过滤结构ConsumerFilterData
    if (hasSubscriptionFlag) {
        try {
            subscriptionData = FilterAPI.build(
                requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
            );
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(
                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion()
                );
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        }
    }
    
    • 无消息过滤配置
      获取服务端消费组信息
      服务端是广播模式,客户端消费组配置不支持广播则返回错误
      获取服务端消费信息,请求版本低于服务端版本则返回错误
      非tag方式过滤,则检查过滤配置的版本
    else {
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo) {
            log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }
    
        if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
        }
    
        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        if (null == subscriptionData) {
            log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }
    
        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
            log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
        }
        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                requestHeader.getConsumerGroup());
            if (consumerFilterData == null) {
                response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                return response;
            }
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                response.setRemark("the consumer's consumer filter data not latest");
                return response;
            }
        }
    }
    
    • 非tag方式过滤需要broker支持属性过滤
    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
        && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    }
    

    2.1.3 初始化MessageFilter

    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }
    

    2.1.4 获取消息

    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), re
    

    2.1.5 消息处理

    • 记录响应状态描述信息
    response.setRemark(getMessageResult.getStatus().name());
    
    • 响应头记录下次消费起始偏移,本次消息最小偏移,最大偏移
    responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
    responseHeader.setMinOffset(getMessageResult.getMinOffset());
    responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
    
    • 支持从服务端读消息,则在消费缓慢时配置建议下次消费的broker id
    if (getMessageResult.isSuggestPullingFromSlave()) {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
    } else {
        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
    }
    
    • 根据服务端角色,读配置,消费速度,配置建议的下次消费的broker id
    switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
        case ASYNC_MASTER:
        case SYNC_MASTER:
            break;
        case SLAVE:
            if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
            break;
    }
    if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
        // consume too slow ,redirect to another machine
        if (getMessageResult.isSuggestPullingFromSlave()) {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
        }
        // consume ok
        else {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
        }
    } else {
        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
    }
    
    • 设置响应码
    switch (getMessageResult.getStatus()) {
        case FOUND:
            response.setCode(ResponseCode.SUCCESS);
            break;
        case MESSAGE_WAS_REMOVING:
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            break;
        case NO_MATCHED_LOGIC_QUEUE:
        case NO_MESSAGE_IN_QUEUE:
            if (0 != requestHeader.getQueueOffset()) {
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
    
                // XXX: warn and notify me
                log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                    requestHeader.getQueueOffset(),
                    getMessageResult.getNextBeginOffset(),
                    requestHeader.getTopic(),
                    requestHeader.getQueueId(),
                    requestHeader.getConsumerGroup()
                );
            } else {
                response.setCode(ResponseCode.PULL_NOT_FOUND);
            }
            break;
        case NO_MATCHED_MESSAGE:
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            break;
        case OFFSET_FOUND_NULL:
            response.setCode(ResponseCode.PULL_NOT_FOUND);
            break;
        case OFFSET_OVERFLOW_BADLY:
            response.setCode(ResponseCode.PULL_OFFSET_MOVED);
            // XXX: warn and notify me
            log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
            break;
        case OFFSET_OVERFLOW_ONE:
            response.setCode(ResponseCode.PULL_NOT_FOUND);
            break;
        case OFFSET_TOO_SMALL:
            response.setCode(ResponseCode.PULL_OFFSET_MOVED);
            log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                getMessageResult.getMinOffset(), channel.remoteAddress());
            break;
        default:
            assert false;
            break;
    }
    
    • 成功,堆内存传输则读取数据写入响应消息中
      非堆内存数据,则netty 0拷贝方式,直接从netty channel发送响应数据
    switch (response.getCode()) {
    case ResponseCode.SUCCESS:
    
        this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            getMessageResult.getMessageCount());
    
        this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            getMessageResult.getBufferTotalSize());
    
        this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
        if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
            final long beginTimeMills = this.brokerController.getMessageStore().now();
            final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
            this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                requestHeader.getTopic(), requestHeader.getQueueId(),
                (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
            response.setBody(r);
        } else {
            try {
                FileRegion fileRegion =
                    new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        getMessageResult.release();
                        if (!future.isSuccess()) {
                            log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                        }
                    }
                });
            } catch (Throwable e) {
                log.error("transfer many message by pagecache exception", e);
                getMessageResult.release();
            }
    
            response = null;
        }
        break;
    
    • 无消息可消费,若支持longpulling,则构建PullRequest,等待新消息生产到达后可立即消费。
    case ResponseCode.PULL_NOT_FOUND:
    
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }
    
        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
    

    2.1.6 提交消费偏移

    服务端允许提交消费偏移
    客户端请求提交消费偏移
    主服务端才可以提交消费偏移
    ConsumerOffsetManager执行消费偏移持久化

    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
    
    • 偏移删除错误,则生成校验事件消息触发校验
    case ResponseCode.PULL_OFFSET_MOVED:
        if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
            || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
            MessageQueue mq = new MessageQueue();
            mq.setTopic(requestHeader.getTopic());
            mq.setQueueId(requestHeader.getQueueId());
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
    
            OffsetMovedEvent event = new OffsetMovedEvent();
            event.setConsumerGroup(requestHeader.getConsumerGroup());
            event.setMessageQueue(mq);
            event.setOffsetRequest(requestHeader.getQueueOffset());
            event.setOffsetNew(getMessageResult.getNextBeginOffset());
            this.generateOffsetMovedEvent(event);
            log.warn(
                "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
                responseHeader.getSuggestWhichBrokerId());
        } else {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                responseHeader.getSuggestWhichBrokerId());
        }
    
        break;
    

    2.2 存储层获取消息

    messageStore.getMessage

    • 获取commitLog的最大偏移量
    final long maxOffsetPy = this.commitLog.getMaxOffset();
    
    • 根据topic和queueid获取消费队列
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    
    • 获取消费队列中的最大索引偏移和最小索引偏移
    minOffset = consumeQueue.getMinOffsetInQueue();
    
    public long getMinOffsetInQueue() {
        return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
    }
    
    maxOffset = consumeQueue.getMaxOffsetInQueue();
    
    public long getMaxOffsetInQueue() {
        return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
    }
    //最后一个consumerQueue的mappedfile的文件名+可读记录的偏移量即已持久化或提交持久化的消息偏移
    public long getMaxOffset() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
        return 0;
    }
    
    • 取消息的偏移索引不在偏移索引范围内,则修正下次读消息的偏移索引
    if (maxOffset == 0) {
        status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    } else if (offset < minOffset) {
        status = GetMessageStatus.OFFSET_TOO_SMALL;
        nextBeginOffset = nextOffsetCorrection(offset, minOffset);
    } else if (offset == maxOffset) {
        status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
        nextBeginOffset = nextOffsetCorrection(offset, offset);
    } else if (offset > maxOffset) {
        status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
        if (0 == minOffset) {
            nextBeginOffset = nextOffsetCorrection(offset, minOffset);
        } else {
            nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
        }
    } 
    
    • 偏移索引在范围内,获取消费队列中消息所在的mappedfile,截取可消费消息对应的文件映射内存
    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
    
    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
    //索引*长度得到消费消息的逻辑偏移
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        if (offset >= this.getMinLogicOffset()) {
    //获取消息所在mappedfile文件,通过偏移计算是第几个mappedfile,链表根据索引获取目标mappedfile
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
    //从文件映射内存中代理一段内存,从消费偏移开始到已提交持久化的偏移位置截止
    //未持久化的消费队列的消息不可被消费
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            }
        }
        return null;
    }
    
    • 遍历消息,获取消息的commitlog物理偏移,大小,tag
    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                    int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                    long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
    ...
    
    • 检查请求消费的消息是否已不在内存中,仅保存在磁盘上
    boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
    
    private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
        return (maxOffsetPy - offsetPy) > memory;
    }
    
    • 一次性消费的消息数达到上限则跳出循环
    if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
        isInDisk)) {
        break;
    }
    
    • 消费的消息过滤无需消费的消息
    boolean extRet = false, isTagsCodeLegal = true;
    if (consumeQueue.isExtAddr(tagsCode)) {
        extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
        if (extRet) {
            tagsCode = cqExtUnit.getTagsCode();
        } else {
            // can't find ext content.Client will filter messages by tag also.
            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                tagsCode, offsetPy, sizePy, topic, group);
            isTagsCodeLegal = false;
        }
    }
    
    if (messageFilter != null
        && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
    
        continue;
    }
    
    • 根据消息commitlog物理偏移和大小获取实际的消息
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    
    • 计算下次消费的偏移索引
    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    
    • 消息已超过内存存储量,则建议从节点消费消息
    long diff = maxOffsetPy - maxPhyOffsetPulling;
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    getResult.setSuggestPullingFromSlave(diff > memory);
    

    2.3 消费偏移持久化

    ConsumerOffsetManager

    • 保存消费偏移
    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }
    
    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
    }
    
    • 父类ConfigManager支持定时持久化到文件中和启动时从文件加载

    2.4 long polling支持

    PullRequestHoldService

    • 无消息时,保存拉消息请求
    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if (prev != null) {
                mpr = prev;
            }
        }
    
        mpr.addPullRequest(pullRequest);
    }
    
    • 线程周期行检查long polling请求,若有消息则响应消费端
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }
    
                long beginLockTimestamp = this.systemClock.now();
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        log.info("{} service end", this.getServiceName());
    }
    
    • 响应消费端
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
    //获取存储未响应的消费请求
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();
    
                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
    //检查生产的消息是否超过请求消费的偏移
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
    //检查消息是否匹配消费过滤条件
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                        // match by bit map, need eval again when properties is not null.
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }
    
                        if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }
    //longpolling请求超时,则也触发消费请求响应处理,若无消息则丢弃longpulling请求
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
    
                    replayList.add(request);
                }
    
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }
    
    • 异步线程处理longpolling消费请求响应,超时或有生产消息到达。
    public void executeRequestWhenWakeup(final Channel channel,
        final RemotingCommand request) throws RemotingCommandException {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);//处理消费请求
    
                    if (response != null) {//有消息可消费,则channel响应
                        response.setOpaque(request.getOpaque());
                        response.markResponseType();
                        try {
                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    if (!future.isSuccess()) {
                                        log.error("processRequestWrapper response to {} failed",
                                            future.channel().remoteAddress(), future.cause());
                                        log.error(request.toString());
                                        log.error(response.toString());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            log.error("processRequestWrapper process request over, but response failed", e);
                            log.error(request.toString());
                            log.error(response.toString());
                        }
                    }
                } catch (RemotingCommandException e1) {
                    log.error("excuteRequestWhenWakeup run", e1);
                }
            }
        };
        this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
    }
    

    2.5 偏移校验

    • 创建一个消息,发送到内部topic OFFSET_MOVED_EVENT。
    private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
        try {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
            msgInner.setTags(event.getConsumerGroup());
            msgInner.setDelayTimeLevel(0);
            msgInner.setKeys(event.getConsumerGroup());
            msgInner.setBody(event.encode());
            msgInner.setFlag(0);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));
    
            msgInner.setQueueId(0);
            msgInner.setSysFlag(0);
            msgInner.setBornTimestamp(System.currentTimeMillis());
            msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
            msgInner.setStoreHost(msgInner.getBornHost());
    
            msgInner.setReconsumeTimes(0);
    
            PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        } catch (Exception e) {
            log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
        }
    }
    

    相关文章

      网友评论

          本文标题:rocketmq源码11-broker消息消费流程

          本文链接:https://www.haomeiwen.com/subject/ouyquqtx.html