RocketMq DefaultLitePullConsumer

Author: 晴天哥_王志 | Published 2020-07-05 09:49

    Introduction

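    • This article walks through the message-pull flow of DefaultLitePullConsumer. The key idea is that RebalanceImpl drives the startup of every pull task.
    • DefaultLitePullConsumer differs from DefaultMQPullConsumer in who does the pulling: with the former, the consumer itself pulls messages and the application only calls poll(); with the latter, business code has to issue every pull itself.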

    DefaultLitePullConsumer pull flow

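    [Figure: DefaultLitePullConsumer message pull flow]
    • The figure above shows the whole pull flow of DefaultLitePullConsumer; the core of it is the start() sequence.
    • RebalanceService drives the pulling, and the load-balancing logic is the same as for push consumers.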

    Message pull example

    import java.util.List;
    
    import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class LitePullConsumerSubscribe {
    
        public static volatile boolean running = true;
    
        public static void main(String[] args) throws Exception {
            // 1. Create the DefaultLitePullConsumer with its consumer group
            DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
            // 2. Set the name server address
            litePullConsumer.setNamesrvAddr("localhost:9876");
            // 3. Subscribe to the topic ("*" matches every tag)
            litePullConsumer.subscribe("TopicTest", "*");
            // 4. Start the consumer
            litePullConsumer.start();
            try {
                // 5. Poll in a loop to consume messages
                while (running) {
                    List<MessageExt> messageExts = litePullConsumer.poll();
                    System.out.printf("%s%n", messageExts);
                }
            } finally {
                litePullConsumer.shutdown();
            }
        }
    }
    
    • 1. Create the DefaultLitePullConsumer object.
    • 2. Set the name server address.
    • 3. Subscribe to the topic to consume.
    • 4. Start the consumer.
    • 5. Consume messages in a loop.

    Creating the consumer

    DefaultLitePullConsumer

    public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
        // The pull logic is implemented by DefaultLitePullConsumerImpl
        private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
        private String consumerGroup;
        private long brokerSuspendMaxTimeMillis = 1000 * 20;
        private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
        private long consumerPullTimeoutMillis = 1000 * 10;
        private MessageModel messageModel = MessageModel.CLUSTERING;
        // Callback listener for message queue changes
        private MessageQueueListener messageQueueListener;
        private OffsetStore offsetStore;
        // Strategy for allocating MessageQueues among consumers
        private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
        private boolean unitMode = false;
        private boolean autoCommit = true;
        private int pullThreadNums = 20;
        private long autoCommitIntervalMillis = 5 * 1000;
        private int pullBatchSize = 10;
        private long pullThresholdForAll = 10000;
        private int consumeMaxSpan = 2000;
        private int pullThresholdForQueue = 1000;
        private int pullThresholdSizeForQueue = 100;
        private long pollTimeoutMillis = 1000 * 5;
        private long topicMetadataCheckIntervalMillis = 30 * 1000;
    
        public DefaultLitePullConsumer(final String consumerGroup) {
            this(null, consumerGroup, null);
        }
    
        public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
            this.namespace = namespace;
            this.consumerGroup = consumerGroup;
            // Create the DefaultLitePullConsumerImpl object
            defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
        }
    }
    
    • 1. Create the DefaultLitePullConsumer object.
    • 2. DefaultLitePullConsumer holds a DefaultLitePullConsumerImpl object.

    DefaultLitePullConsumerImpl

    public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
        private final InternalLogger log = ClientLogger.getLog();
        private final long consumerStartTimestamp = System.currentTimeMillis();
        private final RPCHook rpcHook;
        private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
        private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
        // The MQClientInstance at the core of message consumption
        protected MQClientInstance mQClientFactory;
        private PullAPIWrapper pullAPIWrapper;
        private OffsetStore offsetStore;
        // Load balancer for the consumed message queues
        private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
    
        private enum SubscriptionType {
            NONE, SUBSCRIBE, ASSIGN
        }
    
        private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
        private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
        private SubscriptionType subscriptionType = SubscriptionType.NONE;
        private long pullTimeDelayMillsWhenException = 1000;
        private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
        private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
        private DefaultLitePullConsumer defaultLitePullConsumer;
        // Pull tasks, keyed by MessageQueue
        private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
            new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
        // Message queues assigned to this consumer
        private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
        // Pulled messages waiting to be consumed
        private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
        private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
        private final ScheduledExecutorService scheduledExecutorService;
        private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
        private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
        private long consumeRequestFlowControlTimes = 0L;
        private long queueFlowControlTimes = 0L;
        private long queueMaxSpanFlowControlTimes = 0L;
        private long nextAutoCommitDeadline = -1L;
        private final MessageQueueLock messageQueueLock = new MessageQueueLock();
    
        public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
            this.defaultLitePullConsumer = defaultLitePullConsumer;
            this.rpcHook = rpcHook;
    
            this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
                this.defaultLitePullConsumer.getPullThreadNums(),
                new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
            );
    
            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "MonitorMessageQueueChangeThread");
                }
            });
            this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
        }
    }
    
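    • DefaultLitePullConsumerImpl is the class that implements message pulling and consumption.
    • RebalanceImpl is the load balancer for the message queues being pulled.
    • taskTable maps each MessageQueue to its pull task.
    • consumeRequestCache holds the pending consume requests, that is, pulled batches not yet handed to the application.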

    Subscribing to a topic

    DefaultLitePullConsumerImpl

    public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
    
        @Override
        public void subscribe(String topic, String subExpression) throws MQClientException {
            this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
        }
    }
    
    
    public class DefaultLitePullConsumerImpl implements MQConsumerInner {
        // Load balancer for the consumed message queues
        private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
    
        private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
            new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
        // assignedMessageQueue holds the MessageQueues this consumer is responsible for
        private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
    
        private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
    
        public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
            try {
                if (topic == null || topic.equals("")) {
                    throw new IllegalArgumentException("Topic can not be null or empty.");
                }
                // 1. Build the subscription data
                setSubscriptionType(SubscriptionType.SUBSCRIBE);
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                    topic, subExpression);
                // 2. Store the subscription data in the load balancer
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                // 3. Register the callback for message queue changes
                this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
                // 4. Hand rebalanceImpl to assignedMessageQueue
                assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
    
                if (serviceState == ServiceState.RUNNING) {
                    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                    updateTopicSubscribeInfoWhenSubscriptionChanged();
                }
            } catch (Exception e) {
                throw new MQClientException("subscribe exception", e);
            }
        }
    }
    
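    • 1. Build the subscription data.
    • 2. The MessageQueue load balancer stores the subscription data.
    • 3. Register the callback that handles MessageQueue reassignment.
    • 4. assignedMessageQueue keeps a reference to rebalanceImpl.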
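
    The listing calls setSubscriptionType(SubscriptionType.SUBSCRIBE) without showing its body, and the class carries the message "Subscribe and assign are mutually exclusive." The following is a minimal sketch of how such a guard could work. The class name SubscriptionModeGuard and the exact check are assumptions made for illustration, not code taken from RocketMQ.

```java
/** Illustrative stand-in for the subscribe/assign mode guard; not the RocketMQ class. */
final class SubscriptionModeGuard {

    enum SubscriptionType { NONE, SUBSCRIBE, ASSIGN }

    private SubscriptionType subscriptionType = SubscriptionType.NONE;

    /** The first call fixes the mode; a later call with the other mode is rejected. */
    synchronized void setSubscriptionType(SubscriptionType type) {
        if (subscriptionType == SubscriptionType.NONE) {
            subscriptionType = type;
        } else if (subscriptionType != type) {
            // Assumed behaviour: mixing subscribe() and assign() is an error.
            throw new IllegalStateException("Subscribe and assign are mutually exclusive.");
        }
    }

    synchronized SubscriptionType current() {
        return subscriptionType;
    }
}
```

    Under this assumption, repeated subscribe() calls are fine, while calling assign() on a consumer that has already subscribed fails fast instead of silently mixing the two modes.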

    Starting the pull consumer

    DefaultLitePullConsumerImpl

    public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
        protected MQClientInstance mQClientFactory;
    
        public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    this.checkConfig();
                    if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        this.defaultLitePullConsumer.changeInstanceNameToPID();
                    }
                    // Initialize the client instance responsible for pulling messages
                    initMQClientFactory();
                    // Initialize the MessageQueue load balancer
                    initRebalanceImpl();
                    initPullAPIWrapper();
                    // Initialize the consume offset store
                    initOffsetStore();
                    // The key step: start the MQClientInstance
                    mQClientFactory.start();
                    startScheduleTask();
                    this.serviceState = ServiceState.RUNNING;
                    operateAfterRunning();
    
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
        }
    
        private void initMQClientFactory() throws MQClientException {
            // Get or create the MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
    
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
        }
    }
    
    
    public class MQClientInstance {
    
        private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
        private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
        private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    
        public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
            if (null == group || null == consumer) {
                return false;
            }
    
            MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
            if (prev != null) {
                log.warn("the consumer group[" + group + "] exist already.");
                return false;
            }
    
            return true;
        }
    }
    
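    • initMQClientFactory obtains (or creates) the MQClientInstance and registers this consumer with it.
    • MQClientInstance carries the core machinery behind message pulling.
    • consumerTable maps each consumer group to its consumer instance.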
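
    The registration in registerConsumer relies on ConcurrentMap.putIfAbsent, so only the first consumer for a given group is kept. A minimal JDK-only sketch of that behaviour follows; ConsumerRegistry and lookup are hypothetical names, and a plain Object stands in for MQConsumerInner.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative registry: one consumer per group, first registration wins. */
final class ConsumerRegistry {

    private final ConcurrentMap<String, Object> consumerTable = new ConcurrentHashMap<>();

    /** Returns true only if no consumer was registered for the group before. */
    boolean registerConsumer(String group, Object consumer) {
        if (group == null || consumer == null) {
            return false;
        }
        // putIfAbsent returns the previous value, or null if the slot was empty.
        return consumerTable.putIfAbsent(group, consumer) == null;
    }

    Object lookup(String group) {
        return consumerTable.get(group);
    }
}
```

    A second registration for the same group returns false and leaves the first consumer in place, which is why a consumer group can only be started once per client instance.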

    MQClientInstance

    public class MQClientInstance {
    
        public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
            this.clientConfig = clientConfig;
            this.instanceIndex = instanceIndex;
            this.nettyClientConfig = new NettyClientConfig();
            this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
            this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
            this.clientRemotingProcessor = new ClientRemotingProcessor(this);
            this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    
            if (this.clientConfig.getNamesrvAddr() != null) {
                this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            }
    
            this.clientId = clientId;
            this.mQAdminImpl = new MQAdminImpl(this);
            this.pullMessageService = new PullMessageService(this);
            this.rebalanceService = new RebalanceService(this);
            this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
            this.defaultMQProducer.resetClientConfig(clientConfig);
            this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
        }
    
        public void start() throws MQClientException {
    
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // If not specified,looking address from name server
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        this.mQClientAPIImpl.start();
                        // Start various schedule tasks
                        this.startScheduledTask();
                        // Start pull service
                        this.pullMessageService.start();
                        // Start rebalance service
                        this.rebalanceService.start();
                        // Start push service
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    }
    
    • rebalanceService is what drives the whole pull flow.
    • pullMessageService plays no part in the lite pull flow.

    RebalanceService

    public class RebalanceService extends ServiceThread {
    
        private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
        private final InternalLogger log = ClientLogger.getLog();
        private final MQClientInstance mqClientFactory;
    
        public RebalanceService(MQClientInstance mqClientFactory) {
            this.mqClientFactory = mqClientFactory;
        }
    
        @Override
        public void run() {
            while (!this.isStopped()) {
                this.waitForRunning(waitInterval);
                this.mqClientFactory.doRebalance();
            }
        }
    }
    
    
    public class MQClientInstance {
    
        public void doRebalance() {
            // consumerTable maps the consumer group (here "LitePullConsumer") to its DefaultLitePullConsumerImpl
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Throwable e) {
                    }
                }
            }
        }
    }
    
    • The main job of RebalanceService is to trigger a rebalance on DefaultLitePullConsumerImpl every waitInterval milliseconds (20000 by default).
    • The method to focus on is DefaultLitePullConsumerImpl#doRebalance.
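
    The fan-out in MQClientInstance#doRebalance can be sketched with the JDK alone. In the sketch below, RebalanceFanOut, ConsumerInner and the returned success count are assumptions added for illustration; the point is that a failure in one consumer is caught so the remaining consumers are still rebalanced. The sketch catches RuntimeException, whereas the listing above catches Throwable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative fan-out of one rebalance round over all registered consumers. */
final class RebalanceFanOut {

    /** Hypothetical stand-in for MQConsumerInner. */
    interface ConsumerInner {
        void doRebalance();
    }

    private final ConcurrentMap<String, ConsumerInner> consumerTable = new ConcurrentHashMap<>();

    void register(String group, ConsumerInner consumer) {
        consumerTable.putIfAbsent(group, consumer);
    }

    /** Runs one round and returns how many consumers completed it without throwing. */
    int doRebalance() {
        int succeeded = 0;
        for (Map.Entry<String, ConsumerInner> entry : consumerTable.entrySet()) {
            try {
                entry.getValue().doRebalance();
                succeeded++;
            } catch (RuntimeException e) {
                // Isolate the failure so the other groups are still rebalanced.
                System.err.println("doRebalance failed for group " + entry.getKey() + ": " + e);
            }
        }
        return succeeded;
    }
}
```

    In the real client a background thread repeats such a round after each wait interval; the sketch only covers a single round.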

    RebalanceImpl

    public class DefaultLitePullConsumerImpl implements MQConsumerInner {
        // RebalanceLitePullImpl extends RebalanceImpl 
        private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
    
        public void doRebalance() {
            if (this.rebalanceImpl != null) {
                this.rebalanceImpl.doRebalance(false);
            }
        }
    }
    
    
    public abstract class RebalanceImpl {
    
        protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
        protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
            new ConcurrentHashMap<String, Set<MessageQueue>>();
        protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
            new ConcurrentHashMap<String, SubscriptionData>();
    
        public void doRebalance(final boolean isOrder) {
            // Get the subscription table (topic -> SubscriptionData)
            Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
            if (subTable != null) {
                for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                    final String topic = entry.getKey();
                    try {
                        this.rebalanceByTopic(topic, isOrder);
                    } catch (Throwable e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("rebalanceByTopic Exception", e);
                        }
                    }
                }
            }
    
            this.truncateMessageQueueNotMyTopic();
        }
    
    
        public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
            return subscriptionInner;
        }
    
    
        private void rebalanceByTopic(final String topic, final boolean isOrder) {
            switch (messageModel) {
                case BROADCASTING: {
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    if (mqSet != null) {
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                        if (changed) {
                            this.messageQueueChanged(topic, mqSet, mqSet);
                            log.info("messageQueueChanged {} {} {} {}",
                                consumerGroup,
                                topic,
                                mqSet,
                                mqSet);
                        }
                    } else {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                    break;
                }
                case CLUSTERING: {
    
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                    if (mqSet != null && cidAll != null) {
                        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                        mqAll.addAll(mqSet);
                        Collections.sort(mqAll);
                        Collections.sort(cidAll);
                        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                        List<MessageQueue> allocateResult = null;
                        try {
                            allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                        } catch (Throwable e) {
                            return;
                        }
    
                        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                        if (allocateResult != null) {
                            allocateResultSet.addAll(allocateResult);
                        }
    
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        if (changed) {
                            this.messageQueueChanged(topic, mqSet, allocateResultSet);
                        }
                    }
                    break;
                }
                default:
                    break;
            }
        }
    }
    
    
    public class RebalanceLitePullImpl extends RebalanceImpl {
    
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
            if (messageQueueListener != null) {
                try {
                    messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
                } catch (Throwable e) {
                    log.error("messageQueueChanged exception", e);
                }
            }
        }
    }
    
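    • DefaultLitePullConsumerImpl#doRebalance calls doRebalance on RebalanceLitePullImpl, which inherits it from RebalanceImpl.
    • That method iterates over every subscribed topic and runs rebalanceByTopic on each.
    • rebalanceByTopic performs the MessageQueue allocation: in clustering mode it sorts the topic's MessageQueues and the consumer IDs of the group, then asks the AllocateMessageQueueStrategy for this client's share. The allocation strategy is the same one push consumers use.
    • When the set of MessageQueues assigned to this consumer changes, RebalanceLitePullImpl#messageQueueChanged is invoked.
    • The messageQueueListener it calls is the MessageQueueListenerImpl of DefaultLitePullConsumerImpl.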
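
    To make the allocation step concrete, here is a hedged sketch of an "average" split in the spirit of the default AllocateMessageQueueAveragely strategy. It is not the library code: queues are reduced to integer IDs, and AverageAllocationSketch and its allocate signature are hypothetical. Both lists are sorted first, as in rebalanceByTopic, so that every client computes the same partition independently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative average allocation of queue IDs among sorted consumer IDs. */
final class AverageAllocationSketch {

    static List<Integer> allocate(String currentCid, List<Integer> mqAll, List<String> cidAll) {
        List<Integer> queues = new ArrayList<>(mqAll);
        List<String> cids = new ArrayList<>(cidAll);
        Collections.sort(queues);
        Collections.sort(cids);

        List<Integer> result = new ArrayList<>();
        int index = cids.indexOf(currentCid);
        if (index < 0 || queues.isEmpty()) {
            return result; // unknown consumer or nothing to allocate
        }

        int mod = queues.size() % cids.size();
        boolean getsExtra = mod > 0 && index < mod; // the first `mod` consumers take one more queue
        int averageSize;
        if (queues.size() <= cids.size()) {
            averageSize = 1;
        } else {
            averageSize = queues.size() / cids.size() + (getsExtra ? 1 : 0);
        }
        int startIndex = getsExtra ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queues.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(queues.get(startIndex + i));
        }
        return result;
    }
}
```

    With 8 queues and 3 consumers, this gives the first two consumers three queues each and the last one two; with more consumers than queues, the surplus consumers receive an empty list.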

    MessageQueueListenerImpl

    public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
        private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
    
        class MessageQueueListenerImpl implements MessageQueueListener {
            @Override
            public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
                switch (messageModel) {
                    case BROADCASTING: // broadcasting mode
                        updateAssignedMessageQueue(topic, mqAll);
                        updatePullTask(topic, mqAll);
                        break;
                    case CLUSTERING: // clustering mode
                        updateAssignedMessageQueue(topic, mqDivided);
                        updatePullTask(topic, mqDivided);
                        break;
                    default:
                        break;
                }
            }
        }
    
        private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
            // assignedMessageQueue records the MessageQueues assigned for each topic
            this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue);
        }
    
        private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
            Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
            while (it.hasNext()) {
                // Cancel and remove pull tasks for queues this consumer no longer owns
                Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
                if (next.getKey().getTopic().equals(topic)) {
                    if (!mqNewSet.contains(next.getKey())) {
                        next.getValue().setCancelled(true);
                        it.remove();
                    }
                }
            }
            // Start pull tasks for the assigned MessageQueues
            startPullTask(mqNewSet);
        }
    
        private void startPullTask(Collection<MessageQueue> mqSet) {
            for (MessageQueue messageQueue : mqSet) {
                if (!this.taskTable.containsKey(messageQueue)) {
                    PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
                    this.taskTable.put(messageQueue, pullTask);
                    this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
                }
            }
        }
    }
    
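    • updateAssignedMessageQueue records the latest MessageQueue assignment.
    • updatePullTask cancels the tasks of queues that were taken away, and startPullTask schedules a pull task for every newly assigned queue.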
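
    The reconciliation done by updatePullTask and startPullTask can be sketched as a diff between the current task table and the new assignment. The version below is a simplification under stated assumptions: it handles a single topic, uses String queue names instead of MessageQueue, and only creates the task objects rather than scheduling them. PullTaskTableSketch and PullTask are hypothetical names.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative reconciliation of running pull tasks against a new queue assignment. */
final class PullTaskTableSketch {

    static final class PullTask {
        final String queue;
        volatile boolean cancelled;

        PullTask(String queue) {
            this.queue = queue;
        }
    }

    final ConcurrentMap<String, PullTask> taskTable = new ConcurrentHashMap<>();

    void updatePullTask(Set<String> newQueues) {
        // 1. Cancel and drop tasks whose queue is no longer assigned to us.
        Iterator<Map.Entry<String, PullTask>> it = taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, PullTask> entry = it.next();
            if (!newQueues.contains(entry.getKey())) {
                entry.getValue().cancelled = true;
                it.remove();
            }
        }
        // 2. Create tasks only for queues that do not have one yet.
        for (String queue : newQueues) {
            taskTable.computeIfAbsent(queue, PullTask::new);
        }
    }
}
```

    Queues that stay assigned keep their existing task untouched, so a rebalance does not interrupt pulls that are unaffected by it.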

    DefaultLitePullConsumerImpl

    public class DefaultLitePullConsumerImpl implements MQConsumerInner {
        
        private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
    
        public class PullTaskImpl implements Runnable {
            private final MessageQueue messageQueue;
            private volatile boolean cancelled = false;
    
            public PullTaskImpl(final MessageQueue messageQueue) {
                this.messageQueue = messageQueue;
            }
    
            @Override
            public void run() {
    
                if (!this.isCancelled()) {
                    // Get the ProcessQueue from assignedMessageQueue
                    ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
    
                    long offset = nextPullOffset(messageQueue);
                    long pullDelayTimeMills = 0;
                    try {
                        SubscriptionData subscriptionData;
                        if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                            String topic = this.messageQueue.getTopic();
                            subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                        } else {
                            String topic = this.messageQueue.getTopic();
                            subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                                topic, SubscriptionData.SUB_ALL);
                        }
                        // Perform the actual pull
                        PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                                synchronized (objLock) {
                                    if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                                        // Put the pulled messages into the processQueue
                                        processQueue.putMessage(pullResult.getMsgFoundList());
                                        // submitConsumeRequest queues the batch for consumption
                                        submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                                    }
                                }
                                break;
                            case OFFSET_ILLEGAL:
                                log.warn("The pull request offset illegal, {}", pullResult.toString());
                                break;
                            default:
                                break;
                        }
                        // Update the offset for the next pull
                        updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                    } catch (Throwable e) {
                        pullDelayTimeMills = pullTimeDelayMillsWhenException;
                    }
    
                    // Reschedule this pull task
                    if (!this.isCancelled()) {
                        scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
                    } else {
                    }
                }
            }
        }
    
        private void submitConsumeRequest(ConsumeRequest consumeRequest) {
            try {
                consumeRequestCache.put(consumeRequest);
            } catch (InterruptedException e) {
                log.error("Submit consumeRequest error", e);
            }
        }
    
    
        public class ConsumeRequest {
            private final List<MessageExt> messageExts;
            private final MessageQueue messageQueue;
            private final ProcessQueue processQueue;
    
            public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
                final ProcessQueue processQueue) {
                this.messageExts = messageExts;
                this.messageQueue = messageQueue;
                this.processQueue = processQueue;
            }
        }
    }
    
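    • PullTaskImpl is the task that pulls messages for one MessageQueue.
    • PullTaskImpl#run performs the pull, updates the offset for the next pull, and then resubmits itself through scheduledThreadPoolExecutor.schedule(); after an exception it waits pullTimeDelayMillsWhenException before the next attempt.
    • consumeRequestCache holds the ConsumeRequest objects for pulled messages that are waiting to be consumed.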
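
    The self-rescheduling pattern of PullTaskImpl can be isolated in a few lines. The sketch below assumes a caller-supplied Runnable (pullOnce) in place of the real pull and result handling; SelfReschedulingTask and its constructor parameters are hypothetical. After each round the task resubmits itself, immediately on success and after a delay on failure, until it is cancelled.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Illustrative task that re-submits itself after every round until cancelled. */
final class SelfReschedulingTask implements Runnable {

    private final ScheduledExecutorService executor;
    private final Runnable pullOnce;        // assumed stand-in for one pull round
    private final long delayOnErrorMillis;  // back-off applied after a failed round
    private volatile boolean cancelled;

    SelfReschedulingTask(ScheduledExecutorService executor, Runnable pullOnce, long delayOnErrorMillis) {
        this.executor = executor;
        this.pullOnce = pullOnce;
        this.delayOnErrorMillis = delayOnErrorMillis;
    }

    void cancel() {
        cancelled = true;
    }

    @Override
    public void run() {
        if (cancelled) {
            return;
        }
        long delayMillis = 0;
        try {
            pullOnce.run();
        } catch (RuntimeException e) {
            delayMillis = delayOnErrorMillis; // slow down after a failure
        }
        if (!cancelled) {
            try {
                executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // The executor was shut down in the meantime; stop quietly.
            }
        }
    }
}
```

    Because each round schedules the next one instead of looping, a cancelled task simply stops being resubmitted, and a pool of a few threads can serve many queues.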

    Consuming messages

    public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
        public synchronized List<MessageExt> poll(long timeout) {
            try {
                checkServiceState();
                if (timeout < 0)
                    throw new IllegalArgumentException("Timeout must not be negative");
    
                if (defaultLitePullConsumer.isAutoCommit()) {
                    maybeAutoCommit();
                }
                long endTime = System.currentTimeMillis() + timeout;
    
                ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    
                if (endTime - System.currentTimeMillis() > 0) {
                    while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                        consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                        if (endTime - System.currentTimeMillis() <= 0)
                            break;
                    }
                }
    
                if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                    List<MessageExt> messages = consumeRequest.getMessageExts();
                    long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                    assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                    // Reset the topic on the returned messages
                    this.resetTopic(messages);
                    return messages;
                }
            } catch (InterruptedException ignore) {
    
            }
    
            return Collections.emptyList();
        }
    }
    
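    • poll takes the next pending ConsumeRequest from consumeRequestCache and returns its messages, skipping requests whose ProcessQueue has been dropped.
    • resetTopic adjusts the topic of the returned messages before they reach the caller. In the RocketMQ source it appears to strip the namespace prefix rather than do anything retry-related, so it has no visible effect in this example.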
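
    The deadline handling in poll can be sketched with a plain BlockingQueue. In this simplified version, messages are Strings and a boolean flag replaces ProcessQueue#isDropped(); PollSketch and its ConsumeRequest are hypothetical stand-ins, and offset bookkeeping and auto-commit are left out.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative poll(): wait up to a deadline and skip requests from dropped queues. */
final class PollSketch {

    static final class ConsumeRequest {
        final List<String> messages;
        final boolean dropped; // assumed stand-in for ProcessQueue#isDropped()

        ConsumeRequest(List<String> messages, boolean dropped) {
            this.messages = messages;
            this.dropped = dropped;
        }
    }

    final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<>();

    List<String> poll(long timeoutMillis) {
        if (timeoutMillis < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        long endTime = System.currentTimeMillis() + timeoutMillis;
        try {
            ConsumeRequest request = consumeRequestCache.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            // Discard requests of dropped queues for as long as time remains.
            while (request != null && request.dropped) {
                long remaining = endTime - System.currentTimeMillis();
                if (remaining <= 0) {
                    request = null;
                    break;
                }
                request = consumeRequestCache.poll(remaining, TimeUnit.MILLISECONDS);
            }
            return request == null ? Collections.<String>emptyList() : request.messages;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
    }
}
```

    An empty list therefore means either that nothing arrived before the deadline or that only batches from dropped queues were available, which is why callers typically loop on poll.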


    Article link: https://www.haomeiwen.com/subject/urpbfktx.html