美文网首页
RocketMQ消息消费以及进度管理解析

RocketMQ消息消费以及进度管理解析

作者: 丑人林宗己 | 来源:发表于2019-06-07 16:40 被阅读0次

    最近 ONS 消息堆积的很严重,并且经常发现部分几乎没有消息消费的消费者也提示堆积,所以有必要深入了解一下
    RocketMQ 的设计思路,来看看堆积量如何计算,以及如何正确的使用 Topic 以及 Consumer 等组件。

    产生的问题背景在于,由于一开始对于RocketMQ不够了解,同时足够懒得原因,导致我们所有业务都仅适用了一个topic,所有业务线通过订阅不同的tag来进行消费,本次深入了解后将进行业务重构,以正确的姿势使用RocketMQ

    本次要排查的问题包括:
    1、消息拉取时模型,是否会将非该消费者消息的消息也拉取到客户端?
    2、如何计算堆积?

    问题1的本质问题是消息拉取的过滤模型在于客户端,还是在服务端?问题2的本质问题是消息如何存储计算?欲探究该问题则需要明确RocketMQ的底层存储模型设计,从顶层设计俯瞰消息队列整个框架。

    底层存储模型

    摘自RocketMQ技术内幕.png

    commitlog 是整个消息队列存储的核心文件,而consumerquque是逻辑消息队列,主要存储commitlog offset消息长度tag的hashcode,用于在消息消费时快速定位消息在commit log文件位置,便于读取消息。IndexFile俗称索引文件,主要存储消息key的hashcode以及commitlog offset,用于通过key快速定位到消息在commit log文件位置,便于读取消息。

    消息拉取模型分析

    找到问题1的答案之前,先思考消息队列投递时做了什么?

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses.
    producer.setNamesrvAddr("localhost:9876");
    //Launch the instance.
    producer.start();
    for (int i = 0; i < 100; i++) {
        //Create a message instance, specifying topic, tag and message body.
        Message msg = new Message("TopicTest" /* Topic */,
            "TagA" /* Tag */,
            ("Hello RocketMQ " +
                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        //Call send message to deliver message to one of brokers.
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    }
    

    以上是代码是从官网的地址copy而来,虽简单但是从其中足以找到消息投递时所需要的基本条件包括namesrvAddrtopictag

    消息投递

    // DefaultProducerImpl#sendDefaultImpl()
    // 省略大部分代码,关键看备注部分
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 从本地缓存或namesrv远程读取topic信息
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                MessageQueue mq = null;
                Exception exception = null;
                SendResult sendResult = null;
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                int times = 0;
                String[] brokersSent = new String[timesTotal];
                for (; times < timesTotal; times++) {
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();‘
                    // 根据某种策略选择一个逻辑消息队列
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    
    

    从文中可以看到,在消息投递的过程中,已经在客户端通过某种策略找到指定的topic下的逻辑队列,逻辑队列具体指的是consumerqueue文件,服务端对应的处理主要是写入,具体有兴趣可以了解SendMessageProcessor类,最终通过DefaultMessageStore实现了数据的写入,但是并未看到写入consumerqueue,因为实现consumerqueue文件写入是通过另外的线程实现的,具体实现请参考ReputMessageService,本文不再深入。

    我们主要知道,在客户端除了上传基本属性数据之外,同时还在客户端选择好了将要写入的逻辑消息队列。

    消息拉取

    消息的拉取在客户端就不进行赘述了,主要看服务端的实现。有兴趣可以了解PullMessageService#run()。服务端则重点查阅PullMessageProcessor#processRequest()

    
    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
             // 构建消息过滤
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }
    
    // 消息过滤的核心源码在ExpressionMessageFilter#isMatchedByConsumeQueue方法
    @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
        if (null == subscriptionData) {
            return true;
        }
    
        if (subscriptionData.isClassFilterMode()) {
            return true;
        }
    
        // by tags code.
        if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
    
            if (tagsCode == null) {
                return true;
            }
    
            if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
                return true;
            }
                    // tagecode其实就是tag的hashcode
            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
        }
      /// ....
    }
    
    
    // 接着PullMessageProcessor#processRequest()往下看
    final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    // 注意该消息读取的参数,包括topic, queueid, queueoffset, 已经消息最大条数
    
    // 通过DefaultMessageStore#getMessage()继续查看
    // 注意,这里的offset是queueoffset,而不是commitlog offset
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        
        // ...
        // 查找consumerqueue
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        
        // 
        SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
        if (bufferConsumeQueue != null) {
            try {
                status = GetMessageStatus.NO_MATCHED_MESSAGE;
    
                long nextPhyFileStartOffset = Long.MIN_VALUE;
                long maxPhyOffsetPulling = 0;
    
                int i = 0;
                final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                    int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                    long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
                                    /// .....
                                    // 消息匹配,这个对象由前文的MessageFilter定义
                    if (messageFilter != null
                        && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                        if (getResult.getBufferTotalSize() == 0) {
                            status = GetMessageStatus.NO_MATCHED_MESSAGE;
                        }
    
                        continue; //不匹配的消息则继续往下来读取
                    }
     
                    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);// offsetPy与sizePy查找commitlog上存储的消息内容
                    
      ///....
    }
    

    以上源码阅读完后,问题1 不攻自破,在服务端上过滤好消息,但是很明显,查阅完整地源码可以清晰地确定,并非是每一次拉取消息都可以过滤到自己想要的消息,即该消费者拉取消息时可能在某一个comsumerqueue上拉取不到消息,因为充斥着同一个topic下的其他tag的消息,也就意味着不是每次拉取都有意义,而阿里云ONS的计费上明显提示拉取消息是要计算费用的。

    消息堆积

    消息堆积意为着服务端要维护消息的消费进度。

    先来看一张图,图中的brokerOffset - consumerOffset = diffTotal, 而diffTotal就是指堆积量,而描述堆积量的指标是消息条数。

    image.png

    从commitlog中来看,由于存储了大量的消息文件,并且消息消费是非顺序消费,继而很难从commitlog中看出哪个
    哪个consumer堆积量。

    那么哪里可以描述清楚消息条数呢?先来深入了解Consumer Queue的设计

    ConsumerQueue

    consumerqueue的设计以topic作为逻辑分区,每个topic下分多个消息队列进行,具体多少消息队列存储参照broker的配置参数,队列名称以数组0开始,比如配置0,1,2,3 四个消息队列。

    配置参数请参考BrokerConfig,其中有一个参数private int defaultTopicQueueNums = 8;

    从语义上理解,堆积量应该指未被消费的存在broker上的消息数量,这是基本认知。

    commitlog存储着broker上所有的消息,设想一下如果每次要查询消息并消费需要从该文件遍历查询,性能之差可想
    而知,为了提高查询的消息,优先想到的是诸如MySQL上的索引设计。同理,consumerqueue的设计之初就是为了
    快速定位到对应的消费者可以消费的消息,当然RocketMQ也提供了indexfile,俗称索引文件,主要是解决通过key
    快速定位消息的方式。

    consumerqueue 消息结构

    摘自RocketMQ技术内幕.png

    consumerqueue的结构设计,在consumequeue的条目设计是固定的,并且它整好对应一条消息。consumerqueue单个文件默认是30w个条目,单个文件长度30w * 20字节。从文件的存储模型可以看出,consumerqueue存储维度是topic,并非是consumer。那么如何找到consumer的堆积量?

    假设

    假设一个topic对应一个consumertopic的堆积量即consumer的堆积量。从这个维度来推理,前文提到部分consumer是几乎没有消息,但是却提示消息堆积即合理,因为堆积的消息并非是该consumer的需要消费的消息,而是该consumerqueue对应的topic的堆积

    论证过程

    rocketmq console后台看到的消费者的堆积数量,看到AdminBrokerProcess#getConsumeStats()

    
    private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,   
        // ...
        for (String topic : topics) {
            // ...
            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                MessageQueue mq = new MessageQueue();
                mq.setTopic(topic);
                mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                mq.setQueueId(i);
    
                OffsetWrapper offsetWrapper = new OffsetWrapper();
                // 核心的问题在于要确定brokerOffset 以及consumerOffset的语义
                long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
                if (brokerOffset < 0)
                    brokerOffset = 0;
    
                long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
                    requestHeader.getConsumerGroup(),
                    topic,
                    i);
                if (consumerOffset < 0)
                    consumerOffset = 0;
    
        // ....
    }
    
    // 队列最大索引
    public long getMaxOffsetInQueue(String topic, int queueId) {
        ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
        if (logic != null) {
            long offset = logic.getMaxOffsetInQueue();
            return offset;
        }
        return 0;
    }
    public long getMaxOffsetInQueue() {
        return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
      // 总的逻辑偏移量 / 20 = 总的消息条数
    }
    
    public static final int CQ_STORE_UNIT_SIZE = 20;// 前文提到每个条目固定20个字节
    
    // 当前消费者的消费进度
    long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic,i);
    if (consumerOffset < 0)
        consumerOffset = 0;
        public long queryOffset(final String group, final String topic, final int queueId) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);// 从offsetTable中读取
        if (null != map) {
            Long offset = map.get(queueId);
            if (offset != null)
                return offset;
        }
        return -1;
    }
    

    核心的问题在于从offset缓存中读取出来的,那么offset的数据 又是哪里来的?

    
    // 通过IDE快速可以很快找到如下代码
    @Override
    public String configFilePath() {
        return
        BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().
            getStorePathRootDir());
    }
    @Override
    public void decode(String jsonString) {
        if (jsonString != null) {
            ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString,ConsumerOffsetManager.class);
            if (obj != null) {
                this.offsetTable = obj.offsetTable;
            }
        }
    }
    public static String getConsumerOffsetPath(final String rootDir) {
        return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
    }
    
    

    也就是说offset的数据是从json文件中加载进来的。

    image.png

    这个文件描述的是topic与消费者的关系,每一个队列对应的消费进度。但是消费是实时更新的,所以必须实时更新消费进度,消费进度的更新是从消息的拉取得到的。

    DefaultStoreMessage

    前文看过该类的部分代码,主要是拉取的部分,这里补充拉取时的offset的值得语义。

    
    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
        long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
    
        // ...
            // offsetPy 是commitlog的逻辑偏移量
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        if (null == selectResult) {
            if (getResult.getBufferTotalSize() == 0) {
                status = GetMessageStatus.MESSAGE_WAS_REMOVING;
            }
    
            nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
            continue;
        }
            // 消息过滤
        if (messageFilter != null
            && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
            if (getResult.getBufferTotalSize() == 0) {
                status = GetMessageStatus.NO_MATCHED_MESSAGE;
            }
            // release...
            selectResult.release();
            continue;
        }
        // ....
    }
    
    // ...
    //
    // 计算下一次开始的offset,是前文的offset
    // i 是ConsumeQueue.CQ_STORE_UNIT_SIZE的倍数
    // ConsumeQueue.CQ_STORE_UNIT_SIZE是每一条consumerqueue中的条目的大小,20字节
    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    
    long diff = maxOffsetPy - maxPhyOffsetPulling;
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    getResult.setSuggestPullingFromSlave(diff > memory);
    
    

    看到此处,可以明确消费者拉取消息时的nextBeginOffset就是consumerqueue的偏移量/20,意味着类似下标数组index
    到此处还要再确认拉取的这个消费进度是不是会更新到到offsetTable?核心看RemoteBrokerOffsetStore

    消息消费

    贴几张图简单了解客户端上报消费进度的过程

    image.png image.png image.png

    至此,可以看到堆积量的实际是根据topic来算,按照前文最开始的假设推断其实是成立的,那么现在那些没有消息堆积的消息为何还会显示堆积就可以理解了。

    总结

    消息消费属于服务端过滤模式,不过其实还要其他的消息过滤模式,只是本文并未提及(Class)。但是由于topic使用的不合理导致消息可能存在拉取不到数据,但是ONS是计算收费的。同时消息的堆积意义明朗,那么使用RocketMQ的姿势也就不言而喻,按照业务合理使用topic以及tag等。

    参考资料

    源码:https://github.com/apache/rocketmq
    官网:http://rocketmq.apache.org/docs/rmq-deployment/
    书籍:《RocketMQ技术内幕》,特别推荐该书,让你对RocketMQ的架构设计,代码有更深的了解

    相关文章

      网友评论

          本文标题:RocketMQ消息消费以及进度管理解析

          本文链接:https://www.haomeiwen.com/subject/mdwmxctx.html