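Introduction
- This article walks through how DefaultLitePullConsumer pulls messages. The key idea is that RebalanceImpl drives the start of every pull task.
- DefaultLitePullConsumer differs from DefaultMQPullConsumer in who does the pulling: with the former, the consumer itself pulls messages in the background; with the latter, the application code has to issue each pull.
DefaultLitePullConsumer pull flow
DefaultLitePullConsumer message pull
- The overall pull flow of DefaultLitePullConsumer is shown above; the core of it is the start sequence.
- The pull process is driven by RebalanceService, and the rebalance logic is the same as for the push consumer.
Message pull example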
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;

public class LitePullConsumerSubscribe {
    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        // 1. Create the DefaultLitePullConsumer object
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
        // 2. Set the namesrv address
        litePullConsumer.setNamesrvAddr("localhost:9876");
        // 3. Subscribe to the topic
        litePullConsumer.subscribe("TopicTest", "*");
        // 4. Start the consumer
        litePullConsumer.start();
        try {
            // 5. Consume messages in a loop
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}
- 1. Create the DefaultLitePullConsumer object.
- 2. Set the namesrv address.
- 3. Subscribe to the topic.
- 4. Start the consumer.
- 5. Consume messages in a loop.
Creating the consumer
DefaultLitePullConsumer
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
// The pull implementation: DefaultLitePullConsumerImpl
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
private String consumerGroup;
private long brokerSuspendMaxTimeMillis = 1000 * 20;
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
private long consumerPullTimeoutMillis = 1000 * 10;
private MessageModel messageModel = MessageModel.CLUSTERING;
// Callback listener for message queue changes: MessageQueueListener
private MessageQueueListener messageQueueListener;
private OffsetStore offsetStore;
// Strategy for allocating MessageQueues
private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
private boolean unitMode = false;
private boolean autoCommit = true;
private int pullThreadNums = 20;
private long autoCommitIntervalMillis = 5 * 1000;
private int pullBatchSize = 10;
private long pullThresholdForAll = 10000;
private int consumeMaxSpan = 2000;
private int pullThresholdForQueue = 1000;
private int pullThresholdSizeForQueue = 100;
private long pollTimeoutMillis = 1000 * 5;
private long topicMetadataCheckIntervalMillis = 30 * 1000;
public DefaultLitePullConsumer(final String consumerGroup) {
this(null, consumerGroup, null);
}
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
// Create the DefaultLitePullConsumerImpl object
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
}
- 1. Create the DefaultLitePullConsumer object.
- 2. DefaultLitePullConsumer holds a DefaultLitePullConsumerImpl object, which implements the actual pulling.
DefaultLitePullConsumerImpl
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final InternalLogger log = ClientLogger.getLog();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final RPCHook rpcHook;
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
// The MQClientInstance at the core of message consumption
protected MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private OffsetStore offsetStore;
// Load balancing (rebalance) of consume queues
private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
private enum SubscriptionType {
NONE, SUBSCRIBE, ASSIGN
}
private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
private SubscriptionType subscriptionType = SubscriptionType.NONE;
private long pullTimeDelayMillsWhenException = 1000;
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
private DefaultLitePullConsumer defaultLitePullConsumer;
// Pull tasks, keyed by MessageQueue
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
// The message queues assigned to this consumer
private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
// Pulled messages waiting to be consumed
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
private final ScheduledExecutorService scheduledExecutorService;
private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
private long consumeRequestFlowControlTimes = 0L;
private long queueFlowControlTimes = 0L;
private long queueMaxSpanFlowControlTimes = 0L;
private long nextAutoCommitDeadline = -1L;
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MonitorMessageQueueChangeThread");
}
});
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
}
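- DefaultLitePullConsumerImpl is the class that implements message pulling and consumption.
- RebalanceImpl is the load balancer that assigns consume queues to this pull consumer.
- taskTable maps each MessageQueue to its pull task.
- consumeRequestCache holds the ConsumeRequest objects waiting to be consumed.
Subscribing to a topic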
DefaultLitePullConsumerImpl
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
}
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
// Load balancing (rebalance) of consume queues
private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
// assignedMessageQueue holds the MessageQueues this consumer is responsible for
private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
if (topic == null || topic.equals("")) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
// 1. Build the subscription data object
setSubscriptionType(SubscriptionType.SUBSCRIBE);
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
topic, subExpression);
// 2. Store the subscription data in the rebalancer
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
// 3. Register the callback for message queue changes
this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
// 4. Hand rebalanceImpl to assignedMessageQueue
assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
if (serviceState == ServiceState.RUNNING) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
} catch (Exception e) {
throw new MQClientException("subscribe exception", e);
}
}
}
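- 1. Build the subscription data object.
- 2. Store the subscription data in the MessageQueue rebalancer (rebalanceImpl).
- 3. Register the callback that handles MessageQueue reassignment.
- 4. Hand rebalanceImpl to assignedMessageQueue.
Starting the pull consumer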
DefaultLitePullConsumerImpl
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
protected MQClientInstance mQClientFactory;
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultLitePullConsumer.changeInstanceNameToPID();
}
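// Initialize the client instance used for pulling messages
initMQClientFactory();
// Initialize the MessageQueue rebalancer
initRebalanceImpl();
initPullAPIWrapper();
// Initialize the consume offset store
initOffsetStore();
// Start the MQClientInstance (the core step)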
mQClientFactory.start();
startScheduleTask();
this.serviceState = ServiceState.RUNNING;
operateAfterRunning();
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
private void initMQClientFactory() throws MQClientException {
// Create (or reuse) the MQClientInstance object
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
}
}
public class MQClientInstance {
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
if (null == group || null == consumer) {
return false;
}
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {
log.warn("the consumer group[" + group + "] exist already.");
return false;
}
return true;
}
}
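- initMQClientFactory creates (or reuses) the MQClientInstance object and registers this consumer with it.
- The MQClientInstance holds the core client-side machinery used for pulling messages.
- consumerTable maps each consumer group to its consumer instance.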
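The registration rule in registerConsumer boils down to putIfAbsent: the first consumer registered under a group wins, and a later registration under the same group is rejected. A minimal sketch of that rule follows. The class name ConsumerRegistrySketch is hypothetical, and plain Object values stand in for MQConsumerInner; it is not the RocketMQ class itself.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the consumerTable handling in MQClientInstance.
final class ConsumerRegistrySketch {
    // group -> consumer instance (Object replaces MQConsumerInner here)
    private final ConcurrentMap<String, Object> consumerTable = new ConcurrentHashMap<>();

    // Returns true only for the first registration of a given group.
    public boolean register(String group, Object consumer) {
        if (group == null || consumer == null) {
            return false;
        }
        // putIfAbsent returns the previous value, or null if the key was free
        return consumerTable.putIfAbsent(group, consumer) == null;
    }

    public static void main(String[] args) {
        ConsumerRegistrySketch registry = new ConsumerRegistrySketch();
        System.out.println(registry.register("LitePullConsumer", new Object())); // true
        System.out.println(registry.register("LitePullConsumer", new Object())); // false
    }
}
```

One consequence of this design is that two consumers with the same group name cannot share a single client instance in one process.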
MQClientInstance
public class MQClientInstance {
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
}
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
}
- rebalanceService is what drives the entire pull flow.
- pullMessageService plays no part in this pull flow.
RebalanceService
public class RebalanceService extends ServiceThread {
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
}
public class MQClientInstance {
public void doRebalance() {
// consumerTable maps the consumer group (here "LitePullConsumer") to its DefaultLitePullConsumerImpl
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
}
}
}
}
}
- The core job of RebalanceService is to periodically trigger a rebalance on DefaultLitePullConsumerImpl.
- The method to look at is doRebalance on DefaultLitePullConsumerImpl.
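The loop described above can be sketched in isolation: wait for an interval, then ask every registered consumer to rebalance, without letting one failing consumer stop the others. The names RebalanceLoopSketch and Rebalanceable are hypothetical stand-ins for RebalanceService and MQConsumerInner, and the sketch catches RuntimeException where the original catches Throwable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of RebalanceService + MQClientInstance#doRebalance.
final class RebalanceLoopSketch {
    interface Rebalanceable {
        void doRebalance();
    }

    private final Map<String, Rebalanceable> consumerTable = new ConcurrentHashMap<>();

    public void register(String group, Rebalanceable consumer) {
        consumerTable.put(group, consumer);
    }

    // One pass over all consumers; returns how many rebalanced without error.
    public int rebalanceOnce() {
        int succeeded = 0;
        for (Rebalanceable consumer : consumerTable.values()) {
            try {
                consumer.doRebalance();
                succeeded++;
            } catch (RuntimeException e) {
                // Keep going so one broken consumer cannot block the rest
            }
        }
        return succeeded;
    }

    // Runs a fixed number of rounds, sleeping between them.
    public void run(int rounds, long waitMillis) {
        try {
            for (int i = 0; i < rounds; i++) {
                Thread.sleep(waitMillis);
                rebalanceOnce();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        RebalanceLoopSketch loop = new RebalanceLoopSketch();
        loop.register("LitePullConsumer", () -> System.out.println("rebalance LitePullConsumer"));
        loop.run(2, 10);
    }
}
```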
RebalanceImpl
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
// RebalanceLitePullImpl extends RebalanceImpl
private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
public void doRebalance() {
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance(false);
}
}
}
public abstract class RebalanceImpl {
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>();
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
public void doRebalance(final boolean isOrder) {
// Get this consumer's subscriptions, keyed by topic
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
}
public class RebalanceLitePullImpl extends RebalanceImpl {
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
}
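- DefaultLitePullConsumerImpl's doRebalance calls doRebalance on RebalanceLitePullImpl.
- doRebalance iterates over every subscribed topic and runs rebalanceByTopic on each.
- rebalanceByTopic allocates MessageQueues: in clustering mode it distributes the topic's MessageQueues among the consumers in the consumer group, using the same allocation strategy as the push consumer.
- When the set of MessageQueues assigned to this consumer changes, messageQueueChanged on RebalanceLitePullImpl is invoked.
- The messageQueueListener object is the MessageQueueListenerImpl of DefaultLitePullConsumerImpl.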
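To make the clustering-mode allocation concrete, here is a simplified sketch of an averaging allocation in the spirit of the default AllocateMessageQueueAveragely strategy. It is an illustration under stated assumptions, not the RocketMQ implementation: queues are modelled as integers, the class name AverageAllocationSketch is hypothetical, and both lists are assumed to be sorted already, as rebalanceByTopic does before calling the strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a sorted queue list evenly across sorted consumer ids.
final class AverageAllocationSketch {
    public static List<Integer> allocate(String currentCid, List<Integer> mqAll, List<String> cidAll) {
        List<Integer> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or nothing to allocate
        }
        int base = mqAll.size() / cidAll.size(); // queues every consumer gets
        int mod = mqAll.size() % cidAll.size();  // leftovers go to the first `mod` consumers
        int size = index < mod ? base + 1 : base;
        int start = index * base + Math.min(index, mod);
        for (int i = 0; i < size; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = List.of(0, 1, 2, 3, 4, 5, 6, 7);
        List<String> consumers = List.of("c0", "c1", "c2");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
        // c0 -> [0, 1, 2]
        // c1 -> [3, 4, 5]
        // c2 -> [6, 7]
    }
}
```

Because every consumer in the group sorts the same inputs and runs the same function, each one arrives at a consistent, non-overlapping share without coordinating with the others.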
MessageQueueListenerImpl
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
switch (messageModel) {
case BROADCASTING: // broadcasting mode
updateAssignedMessageQueue(topic, mqAll);
updatePullTask(topic, mqAll);
break;
case CLUSTERING: // clustering mode
updateAssignedMessageQueue(topic, mqDivided);
updatePullTask(topic, mqDivided);
break;
default:
break;
}
}
}
private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
// assignedMessageQueue records the MessageQueues assigned for each topic
this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue);
}
private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
while (it.hasNext()) {
// Cancel and remove pull tasks for queues this consumer no longer owns
Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!mqNewSet.contains(next.getKey())) {
next.getValue().setCancelled(true);
it.remove();
}
}
}
// Start pull tasks for the newly assigned MessageQueues
startPullTask(mqNewSet);
}
private void startPullTask(Collection<MessageQueue> mqSet) {
for (MessageQueue messageQueue : mqSet) {
if (!this.taskTable.containsKey(messageQueue)) {
PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
this.taskTable.put(messageQueue, pullTask);
this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
}
}
}
}
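- updateAssignedMessageQueue records the latest MessageQueue assignment.
- updatePullTask cancels the pull tasks of queues that are no longer assigned, and startPullTask starts a pull task for each newly assigned MessageQueue.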
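The reconciliation performed by updatePullTask and startPullTask can be sketched on its own: cancel and drop tasks whose queue is no longer assigned, then create tasks only for queues that do not have one yet. The sketch below is a simplification under stated assumptions: queues are plain strings, the per-topic filter is omitted, no thread pool is involved, and PullTaskTableSketch and Task are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of taskTable reconciliation after a rebalance.
final class PullTaskTableSketch {
    static final class Task {
        volatile boolean cancelled;
    }

    private final ConcurrentMap<String, Task> taskTable = new ConcurrentHashMap<>();

    public Task get(String queue) {
        return taskTable.get(queue);
    }

    // Applies a new assignment and returns the queues whose tasks were just started.
    public List<String> update(Set<String> assigned) {
        Iterator<Map.Entry<String, Task>> it = taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Task> entry = it.next();
            if (!assigned.contains(entry.getKey())) {
                entry.getValue().cancelled = true; // the running task sees this flag and stops
                it.remove();
            }
        }
        List<String> started = new ArrayList<>();
        for (String queue : assigned) {
            if (!taskTable.containsKey(queue)) {
                taskTable.put(queue, new Task());
                started.add(queue);
            }
        }
        return started;
    }

    public static void main(String[] args) {
        PullTaskTableSketch table = new PullTaskTableSketch();
        System.out.println(table.update(new TreeSet<>(List.of("q0", "q1")))); // [q0, q1]
        System.out.println(table.update(new TreeSet<>(List.of("q1", "q2")))); // [q2]
    }
}
```

Tasks for queues that stay assigned are left untouched, so a rebalance does not interrupt pulling on unaffected queues.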
DefaultLitePullConsumerImpl
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
public class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
public PullTaskImpl(final MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
@Override
public void run() {
if (!this.isCancelled()) {
// Get the ProcessQueue from assignedMessageQueue
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
long offset = nextPullOffset(messageQueue);
long pullDelayTimeMills = 0;
try {
SubscriptionData subscriptionData;
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic();
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else {
String topic = this.messageQueue.getTopic();
subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
}
// Perform the actual pull
PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
// Put the pulled messages into the processQueue
processQueue.putMessage(pullResult.getMsgFoundList());
// submitConsumeRequest queues the messages for consumption
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
}
break;
case OFFSET_ILLEGAL:
log.warn("The pull request offset illegal, {}", pullResult.toString());
break;
default:
break;
}
// Update the offset for the next pull
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Throwable e) {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
}
// Reschedule this pull task
if (!this.isCancelled()) {
scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
}
}
}
}
private void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
} catch (InterruptedException e) {
log.error("Submit consumeRequest error", e);
}
}
public class ConsumeRequest {
private final List<MessageExt> messageExts;
private final MessageQueue messageQueue;
private final ProcessQueue processQueue;
public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
final ProcessQueue processQueue) {
this.messageExts = messageExts;
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
}
}
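- PullTaskImpl is the task that pulls messages.
- PullTaskImpl#run performs the pull, updates the offset for the next pull, and then resubmits itself via scheduledThreadPoolExecutor.schedule().
- consumeRequestCache holds the pulled ConsumeRequest objects that are waiting to be consumed.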
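The self-rescheduling pattern used by PullTaskImpl is worth seeing in isolation: the task does one unit of work, then hands itself back to the scheduler unless it has been told to stop. In the following sketch, incrementing a counter stands in for the pull, and reaching a target run count stands in for cancellation. The class name SelfReschedulingTaskSketch and the 1 ms delay are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a task that reschedules itself after each run.
final class SelfReschedulingTaskSketch {
    // Runs the task until it has executed targetRuns times; returns the run count.
    public static int runUntil(final int targetRuns) {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                // Stand-in for "pull one batch and update the offset"
                if (runs.incrementAndGet() >= targetRuns) {
                    done.countDown(); // treated as cancelled: do not reschedule
                    return;
                }
                // Hand this same task back to the scheduler for the next round
                scheduler.schedule(this, 1, TimeUnit.MILLISECONDS);
            }
        };
        scheduler.schedule(task, 0, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runUntil(3)); // 3
    }
}
```

Compared with a fixed-rate schedule, rescheduling from inside the task lets each run pick its own delay, which is how PullTaskImpl backs off after an exception.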
Consuming messages
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized List<MessageExt> poll(long timeout) {
try {
checkServiceState();
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit();
}
long endTime = System.currentTimeMillis() + timeout;
ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() > 0) {
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0)
break;
}
}
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
List<MessageExt> messages = consumeRequest.getMessageExts();
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
// Reset each message's topic (strips the namespace prefix, if a namespace is configured)
this.resetTopic(messages);
return messages;
}
} catch (InterruptedException ignore) {
}
return Collections.emptyList();
}
}
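- poll takes a pending ConsumeRequest from consumeRequestCache, skips requests whose ProcessQueue has been dropped, updates the consume offset, and returns the messages.
- resetTopic does not redirect messages to a retry queue. In the RocketMQ source it strips the namespace prefix from each message's topic when a namespace is configured, so the application sees the topic name it subscribed with.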
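The heart of poll is a timed take from a blocking queue that discards stale entries. The sketch below shows that idea under simplifying assumptions: messages are plain strings, a boolean flag replaces ProcessQueue#isDropped, offset tracking and auto-commit are left out, and the deadline handling is slightly simplified compared with the original. PollSketch and Request are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of poll(timeout) over a cache of consume requests.
final class PollSketch {
    static final class Request {
        final List<String> messages;
        final boolean dropped; // stands in for getProcessQueue().isDropped()

        Request(List<String> messages, boolean dropped) {
            this.messages = messages;
            this.dropped = dropped;
        }
    }

    private final BlockingQueue<Request> cache = new LinkedBlockingQueue<>();

    public void submit(Request request) {
        cache.add(request);
    }

    // Waits up to timeoutMillis for a live request; returns an empty list on timeout.
    public List<String> poll(long timeoutMillis) {
        long endTime = System.currentTimeMillis() + timeoutMillis;
        try {
            while (true) {
                long remaining = Math.max(0, endTime - System.currentTimeMillis());
                Request request = cache.poll(remaining, TimeUnit.MILLISECONDS);
                if (request == null) {
                    return Collections.emptyList(); // nothing arrived before the deadline
                }
                if (!request.dropped) {
                    return request.messages;
                }
                // Dropped request: its queue was reassigned, so skip it and try again
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        PollSketch consumer = new PollSketch();
        consumer.submit(new Request(List.of("stale"), true));
        consumer.submit(new Request(List.of("m1", "m2"), false));
        System.out.println(consumer.poll(100)); // [m1, m2]
        System.out.println(consumer.poll(10));  // []
    }
}
```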