美文网首页
rocketMq broker对consumer的处理

rocketMq broker对consumer的处理

作者: 圣村的希望 | 来源:发表于2018-12-11 18:17 被阅读0次

      在broker中是会把消息存放在commitLog中的,在后台还会把消息的逻辑位置(类似索引)存放到consumeQueue中。所以在获取消息的时候,broker端是先读取consumeQueue中的消息逻辑位置,拿到offset后再去commitLog中获取消息返回给client端。

      在broker启动初始化的时候回去注册PullMessageProcessor来处理获取消息的请求,接下来从这个类来查看对拉取消息请求的处理:

    final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    

      在PullMessageProcessor中的processRequest可到,获取消息的请求处理也是交给了MessageStore来进行获取消息:

    //根据offset去ConsumeQueue中获取消息在commitLog中的逻辑位置
    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
    
    //从commitLog中获取消息
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    

      从DefaultMessageStore中很简单地就可以看出获取消息的大致逻辑,线程consumeQueue中获取逻辑消息位置然后再去commitLog中获取实际消息。

      ConsumeQueue的维护:在rocketMq中,每个MessageQueue都对应有一个consumeQueue,他类似于是消息的逻辑队列。

    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
            final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        ······
        //这个是一个后台线程,轮询把新写入的消息写入到consumeQueue中
        this.reputMessageService = new ReputMessageService();
    }
    
    class ReputMessageService extends ServiceThread {
        @Override
            public void run() {
                DefaultMessageStore.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        Thread.sleep(1);
                        //新写入的消息添加到consumeQueue
                        this.doReput();
                    } catch (Exception e) {
                        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
                DefaultMessageStore.log.info(this.getServiceName() + " service end");
            }
    }
    
    private void doReput() {
        //从mappedFile中获取到新写入的消息
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
    
        DefaultMessageStore.this.doDispatch(dispatchRequest);
    }
    
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    
            @Override
            public void dispatch(DispatchRequest request) {
                final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
                //prepared消息和rollback消息是不会往consumeQueue中存放的
                switch (tranType) {
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                        DefaultMessageStore.this.putMessagePositionInfo(request);
                        break;
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        break;
                }
            }
        }
    
    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
            //通过topic和queueId获取到对应的consumeQueue,如果没有就创建对应queueId的consumeQueue
            ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
            cq.putMessagePositionInfoWrapper(dispatchRequest);
    }
    
    1. 在DefaultMessageStore初始化的时候实例化了ReputMessageService,这是一个后台线程,主要靠它来进行后台轮询吧新写入到mappedFile中的消息添加到consumeQueue中。
    2. 后台线程不停滴从mappedFile中获取到消息,然后再把消息添加到consumeQueue中,但是也不是所有的消息都会添加到consumeQueue中,事务消息的prepared消息和rollback消息是不会被写入到consumeQueue中,所以事务消息再没有被commit的时候是不会被consumer消费的。
    3. 存放消息到consumeQueue中时,也是先去查看当前内存是否已经有对应的consumeQueue,没有就进行新建messageQueue对应的consumeQueue,用messageQueue的id作为key,consumeQueue作为value放到map中,然后把这个map添加到对应的topic下。所以这里可以看出MessageQueue和ConsumeQueue是一一对应的。
    //这个是DefaultMessageStore中的一个属性
    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
    
    

    相关文章

      网友评论

          本文标题:rocketMq broker对consumer的处理

          本文链接:https://www.haomeiwen.com/subject/irlehqtx.html