RocketMQ Source Code 6 - Client - Pull Consumer

Author: modou1618 | Published: 2019-01-27 13:22

    1 DefaultMQPullConsumer

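    • Consumer group
      String consumerGroup;
    • Long-polling mode (when no message is available, the pull request is held on the broker and answered once a message can be consumed): the maximum time the broker suspends the request
      long brokerSuspendMaxTimeMillis = 1000 * 20;
    • Long-polling mode: the consumer-side timeout for a suspended pull; it should be longer than brokerSuspendMaxTimeMillis
      long consumerTimeoutMillisWhenSuspend = 1000 * 30;
    • Non-long-polling mode: the socket timeout for a pull.
      long consumerPullTimeoutMillis = 1000 * 10;
    • Message model. The default is clustering: each message is consumed once, by exactly one instance in the consumer group. In broadcasting mode every instance consumes each message once, so the consumer group does not share the load.
      MessageModel messageModel = MessageModel.CLUSTERING;
    • Listener invoked when the message queue assignment changes.
      It fires when rebalanceService reallocates the consume queues under the load-balancing strategy and the allocation has changed.
    MessageQueueListener messageQueueListener;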
    
    public interface MessageQueueListener {
        void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
            final Set<MessageQueue> mqDivided);
    }
    
    • Service that reads and writes consume offsets
      OffsetStore offsetStore;
    • Set of topics registered for a PullTaskCallback.
      Set<String> registerTopics = new HashSet<String>();
    public void registerMessageQueueListener(String topic, MessageQueueListener listener) {
        synchronized (this.registerTopics) {
            this.registerTopics.add(topic);
            if (listener != null) {
                this.messageQueueListener = listener;
            }
        }
    }
    
    // Defined in MQPullConsumerScheduleService, which wraps a DefaultMQPullConsumer
    public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
        this.callbackTable.put(topic, callback);
        this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
    }
    
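    • Load-balancing strategy for assigning MessageQueues to consumers; the default allocates them evenly
      AllocateMessageQueueStrategy allocateMessageQueueStrategy
    • Unit mode
      boolean unitMode = false;
    • Maximum number of reconsume attempts from the retry queue; a message still not consumed after that goes to the dead-letter queue
      int maxReconsumeTimes = 16;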
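
    To make the "allocate evenly" default more concrete, here is a minimal sketch of an even split. It is not the RocketMQ AllocateMessageQueueStrategy code: the class name, the use of plain integer queue ids, and the rule that earlier consumers absorb the remainder are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the RocketMQ AllocateMessageQueueStrategy implementation.
final class AverageAllocationSketch {

    // Returns the queue ids assigned to the consumer at position `index`
    // when `queueCount` queues are split as evenly as possible among
    // `consumerCount` consumers. Earlier consumers absorb the remainder.
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        if (consumerCount <= 0 || index < 0 || index >= consumerCount) {
            throw new IllegalArgumentException("invalid consumer position");
        }
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount;
        int size = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            assigned.add(start + i);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Print the split of 8 queues across 3 consumers.
        for (int i = 0; i < 3; i++) {
            System.out.println("consumer " + i + " -> " + allocate(8, 3, i));
        }
    }
}
```

    Every consumer in the group can run the same calculation over the same sorted inputs and arrive at a consistent split without coordinating with the others, which is why a deterministic strategy suits client-side rebalancing.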

    1.1 OffsetStore

    • Initialize the service that reads and writes MessageQueue consume offsets
    if (this.defaultMQPullConsumer.getOffsetStore() != null) {
        this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
    } else {
        switch (this.defaultMQPullConsumer.getMessageModel()) {
            case BROADCASTING:
                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                break;
            case CLUSTERING:
                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                break;
            default:
                break;
        }
        this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
    }
    // On startup, load the persisted consume offsets into memory
    this.offsetStore.load();
    
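    • In broadcasting mode, each instance persists its consume offsets to a local file through LocalFileOffsetStore
    • In clustering mode, the consume offsets of each consumer group are persisted on the broker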

    1.1.1 LocalFileOffsetStore

    • Path of the persisted offsets file
      String storePath;
    • In-memory consume offset of each MessageQueue; ConcurrentMap and AtomicLong handle concurrent access.
      ConcurrentMap<MessageQueue, AtomicLong> offsetTable

    1.1.1.1 load()

    • On initialization, load the consume offsets from the local file
    public void load() throws MQClientException {
        OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
        if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
            offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
    
            for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
                AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                log.info("load consumer's offset, {} {} {}",
                    this.groupName,
                    mq,
                    offset.get());
            }
        }
    }
    

    1.1.1.2 readOffset

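    • Reads the consume offset from memory or from the local file, depending on the read type
    • After reading from the file, updates the in-memory consume offset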
    public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
        if (mq != null) {
            switch (type) {
                case MEMORY_FIRST_THEN_STORE:
                case READ_FROM_MEMORY: {
                    AtomicLong offset = this.offsetTable.get(mq);
                    if (offset != null) {
                        return offset.get();
                    } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                        return -1;
                    }
                }
                case READ_FROM_STORE: {
                    OffsetSerializeWrapper offsetSerializeWrapper;
                    try {
                        offsetSerializeWrapper = this.readLocalOffset();
                    } catch (MQClientException e) {
                        return -1;
                    }
                    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
                        AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                        if (offset != null) {
                            this.updateOffset(mq, offset.get(), false);
                            return offset.get();
                        }
                    }
                }
                default:
                    break;
            }
        }
    
        return -1;
    }
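
    The switch above relies on case fall-through, which is easy to misread. The following sketch restates the same three read types with explicit branches. The class, the string queue names, and the `store` map (standing in for the offsets file) are hypothetical, not RocketMQ types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an offset store; queue names are plain strings here.
final class ReadOffsetSketch {
    enum ReadType { READ_FROM_MEMORY, READ_FROM_STORE, MEMORY_FIRST_THEN_STORE }

    final Map<String, AtomicLong> memory = new ConcurrentHashMap<>();
    final Map<String, Long> store = new HashMap<>(); // stands in for the offsets file

    long readOffset(String queue, ReadType type) {
        if (type != ReadType.READ_FROM_STORE) {
            AtomicLong cached = memory.get(queue);
            if (cached != null) {
                return cached.get();
            }
            if (type == ReadType.READ_FROM_MEMORY) {
                return -1; // memory-only read: do not touch the store
            }
        }
        // READ_FROM_STORE, or MEMORY_FIRST_THEN_STORE after a cache miss.
        Long persisted = store.get(queue);
        if (persisted == null) {
            return -1;
        }
        memory.put(queue, new AtomicLong(persisted)); // refresh the cache
        return persisted;
    }
}
```

    A memory-only read of an uncached queue returns -1, while MEMORY_FIRST_THEN_STORE falls back to the store and caches what it finds, so a later memory-only read succeeds.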
    

    1.1.1.3 updateOffset

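    • The consume offset is updated in one of two ways: increase-only (the stored value changes only when the new offset is larger) or unconditional overwrite.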
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }
    
            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }
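
    The increase-only branch delegates to MixAll.compareAndIncreaseOnly, whose body is not shown in this article. A plausible sketch of such a helper is a compare-and-set loop like the one below; treat it as an assumption about the technique, not as the library's exact code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an increase-only update on an AtomicLong.
final class IncreaseOnlySketch {
    // Raises `target` to `value` only if `value` is larger;
    // returns true when the stored value was changed.
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long prev = target.get();
        while (value > prev) {
            if (target.compareAndSet(prev, value)) {
                return true;
            }
            prev = target.get(); // lost a race; re-read and re-check
        }
        return false;
    }
}
```

    The loop matters when several threads report progress for the same queue: a late-arriving smaller offset can never move the stored value backwards.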
    
    

    1.1.1.4 persistAll

    • Persists the in-memory consume offsets to the local file
    • Called periodically by a scheduled task
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
    
        OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            if (mqs.contains(entry.getKey())) {
                AtomicLong offset = entry.getValue();
                offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
            }
        }
    
        String jsonString = offsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                log.error("persistAll consumer offset Exception, " + this.storePath, e);
            }
        }
    }
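
    The article does not show MixAll.string2File. Because persistAll rewrites the whole offsets file on every run, one common way to do this safely is to write a temporary file first and then move it over the target. The sketch below shows that general technique under that assumption; the class and method names are hypothetical and it is not a copy of the RocketMQ helper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of a write-then-rename file update.
final class SafeFileWriteSketch {
    // Writes `content` to a sibling temp file, then moves it over `target`,
    // so a crash in the middle of the write leaves the old file untouched.
    static void writeSafely(Path target, String content) {
        try {
            Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
            Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the whole file back as a UTF-8 string.
    static String read(Path target) {
        try {
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```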
    

    1.1.2 RemoteBrokerOffsetStore

    • Provides the same functions as LocalFileOffsetStore
    • Persists offsets to the broker instead of to a local file

    1.1.2.1 updateConsumeOffsetToBroker

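    • Looks up the address of the broker that hosts the MessageQueue; if it is not found, refreshes the topic route from namesrv via updateTopicRouteInfoFromNameServer and looks it up again
    • Sends the consume offset to the target broker with updateConsumerOffset, or with updateConsumerOffsetOneway for a one-way request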
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {
    
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }
    
        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);
    
            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }
    

    1.1.2.2 fetchConsumeOffsetFromBroker

    • Gets the address of the broker that hosts the mq
    • Fetches the mq's consume offset from the target broker
    private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {
    
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }
    
        if (findBrokerResult != null) {
            QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
    
            return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }
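
    Both RemoteBrokerOffsetStore methods above start with the same pattern: look the broker up in a local table, refresh the route once on a miss, look again, and fail if it is still unknown. A minimal sketch of that pattern follows. The class, the string-to-string address table, and the `routeRefresher` function (standing in for a name-server query) are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: broker names map to addresses; `routeRefresher`
// stands in for a name-server query and is not a RocketMQ API.
final class BrokerLookupSketch {
    private final Map<String, String> brokerAddrTable = new ConcurrentHashMap<>();
    private final Function<String, String> routeRefresher;

    BrokerLookupSketch(Function<String, String> routeRefresher) {
        this.routeRefresher = routeRefresher;
    }

    String findBrokerAddr(String brokerName) {
        String addr = brokerAddrTable.get(brokerName);
        if (addr == null) {
            // Cache miss: refresh routing once, then look again.
            String refreshed = routeRefresher.apply(brokerName);
            if (refreshed != null) {
                brokerAddrTable.put(brokerName, refreshed);
            }
            addr = brokerAddrTable.get(brokerName);
        }
        if (addr == null) {
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        return addr;
    }
}
```

    Refreshing only once per call keeps a missing broker from turning into a retry loop; the caller sees the failure and can decide whether to try again later.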
    

    1.2 DefaultMQPullConsumerImpl

    • Configuration
      DefaultMQPullConsumer defaultMQPullConsumer;
    • Start time
      long consumerStartTimestamp = System.currentTimeMillis();
    • Hooks
    private final RPCHook rpcHook;
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    
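    • Client state
      volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    • Communication interface
      MQClientInstance mQClientFactory;
    • Consume offset persistence service
      OffsetStore offsetStore;
    • Service that balances mq consumption across the instances of a consumer group
      RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
    • Wrapper interface for pulling messages on the consumer side
      PullAPIWrapper pullAPIWrapper;
    • Consume type: pull mode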
    @Override
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_ACTIVELY;
    }
    
    • Where to start consuming: the first time, start from the last offset
    @Override
    public ConsumeFromWhere consumeFromWhere() {
        return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    }
    
    public enum ConsumeFromWhere {
        // Start consuming from the last offset
        CONSUME_FROM_LAST_OFFSET,
        // Start consuming from the first offset
        CONSUME_FROM_FIRST_OFFSET,
        // Start consuming from a timestamp
        CONSUME_FROM_TIMESTAMP,
    }
    

    1.2.1 Pulling messages

    1.2.1.1 Synchronous pull: pullSyncImpl

    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
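        /* Parameter checks:
         * the client state is ServiceState.RUNNING
         * a target MessageQueue is specified
         * the consume offset is not negative
         * the maximum number of messages is positive
         */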
        this.makeSureStateOK();
        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }
        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }
        // Record the topic subscription
        this.subscriptionAutomatically(mq.getTopic());
        // Build the pull flags
        /*
         * int FLAG_COMMIT_OFFSET = 0x1 << 0; the broker also persists the consume offset
         * int FLAG_SUSPEND = 0x1 << 1;       long-poll when there is no message
         * int FLAG_SUBSCRIPTION = 0x1 << 2;  subscription type: consume by tag
         * int FLAG_CLASS_FILTER = 0x1 << 3;  handled by the filter server
         */
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
        // Build the subscription data
        SubscriptionData subscriptionData;
        try {
            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                mq.getTopic(), subExpression);
        } catch (Exception e) {
            throw new MQClientException("parse subscription error", e);
        }
        // Pull timeout: a blocking (long-polling) pull uses the suspend timeout
        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
        // Call through pullAPIWrapper
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            0L,
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );
        // Process the returned result
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        // Invoke the hooks
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }
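
    The sysFlag passed to the broker packs four booleans into one int. The sketch below re-creates that packing from the four constants listed in pullSyncImpl. The constants come from that listing; the class itself and the `hasSuspendFlag` helper are illustrative assumptions rather than the PullSysFlag source.

```java
// Hypothetical re-creation of the flag layout listed in pullSyncImpl;
// the constants mirror that listing, the class itself is illustrative.
final class PullSysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1 << 0;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    // Sets one bit per enabled option.
    static int buildSysFlag(boolean commitOffset, boolean suspend,
                            boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    // True when the long-polling bit is set.
    static boolean hasSuspendFlag(int sysFlag) {
        return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
    }
}
```

    With these constants, the call buildSysFlag(false, block, true, false) from pullSyncImpl sets only the subscription bit for a non-blocking pull and adds the suspend bit for a blocking one. The commit-offset bit is never set, so a pull by itself does not ask the broker to persist the consume offset.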
    

    1.2.1.2 Asynchronous pull: pullAsyncImpl

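    • The flow is similar to the synchronous call, except that the result is handled in an asynchronous callback, which invokes the matching PullCallback method on success or on exception.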
    private void pullAsyncImpl(
        final MessageQueue mq,
        final String subExpression,
        final long offset,
        final int maxNums,
        final PullCallback pullCallback,
        final boolean block,
        final long timeout) throws MQClientException, RemotingException, InterruptedException {
       ...
            this.pullAPIWrapper.pullKernelImpl(
                mq,
                subscriptionData.getSubString(),
                0L,
                offset,
                maxNums,
                sysFlag,
                0,
                this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
                timeoutMillis,
                CommunicationMode.ASYNC,
                new PullCallback() {
    
                    @Override
                    public void onSuccess(PullResult pullResult) {
                        pullCallback
                            .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
                    }
    
                    @Override
                    public void onException(Throwable e) {
                        pullCallback.onException(e);
                    }
                });
        } catch (MQBrokerException e) {
            throw new MQClientException("pullAsync unknow exception", e);
        }
    }
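
    The anonymous PullCallback above is an adapter: it post-processes a successful result before handing it to the caller's callback and forwards failures unchanged. A generic sketch of that wrapping, using a hypothetical `Callback` interface rather than the RocketMQ PullCallback type:

```java
import java.util.function.Function;

// Hypothetical sketch of wrapping a user callback with a post-processing step.
final class CallbackAdapterSketch {
    // Minimal stand-in for a pull callback; not the RocketMQ PullCallback type.
    interface Callback<T> {
        void onSuccess(T result);
        void onException(Throwable e);
    }

    // Wraps `user` so that successful results pass through `postProcess` first,
    // while failures are forwarded unchanged.
    static <T> Callback<T> wrap(Callback<T> user, Function<T, T> postProcess) {
        return new Callback<T>() {
            @Override
            public void onSuccess(T result) {
                user.onSuccess(postProcess.apply(result));
            }

            @Override
            public void onException(Throwable e) {
                user.onException(e);
            }
        };
    }
}
```

    Keeping the post-processing inside the wrapper means the caller's callback always receives results in the same processed form as the synchronous path returns.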
    

    1.3 RebalancePullImpl

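    • The load-balancing strategy logic is implemented in the parent class RebalanceImpl
    • Implements the listener callback for changes to the balanced queue assignment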
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }
    }
    

    1.3.1 RebalanceImpl

    • Used mainly by the push consumer; covered later

    1.4 PullAPIWrapper

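    1.4.1 Message pull proxy

    • Gets the address of the broker that hosts the message queue
    • Pulls messages from that broker address

    1.4.2 Processing the pull result

    • Decodes the message bodies
    • Filters the messages to consume by tag
    • Runs the hooks
    • Returns the messages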
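
    The tag-filtering step can be sketched as follows. The `Msg` class is a hypothetical stand-in for a decoded message, and treating an empty tag set as "no filter" is an assumption made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of client-side tag filtering after decoding.
final class TagFilterSketch {
    // Minimal stand-in for a decoded message; not the RocketMQ MessageExt type.
    static final class Msg {
        final String tag;
        final String body;

        Msg(String tag, String body) {
            this.tag = tag;
            this.body = body;
        }
    }

    // Keeps messages whose tag is in `subscribedTags`;
    // an empty set is treated as "no tag filter".
    static List<Msg> filterByTag(List<Msg> decoded, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return decoded;
        }
        List<Msg> kept = new ArrayList<>(decoded.size());
        for (Msg msg : decoded) {
            if (msg.tag != null && subscribedTags.contains(msg.tag)) {
                kept.add(msg);
            }
        }
        return kept;
    }
}
```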

          Article link: https://www.haomeiwen.com/subject/ozjgjqtx.html