对于消费者如何拉取消息的方式,大家应该起码有一个概念了。
无非就是通过RebalanceService先进行队列的重新分配,随后通过PullMessageService一直轮询地去拉取消息。
那么拉取到消息如何进行消费呢?
首先定位一下入口,通过拉取消息的回调,来看看获取消息之后如何进行消费。
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
。。。省略部分代码
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
如果确实是没有消息可以被消费
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
再次发起拉取消息的请求
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
拉取到的消息的最小偏移量
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
这个函数主要的作用就是将待消费的消息,进行一个临时的存储,放入到处理队列(processQueue)的成员变量(msgTreeMap)中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
提交消费请求---同拉取相似,一样用的是线程池的方式。
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
提交消费请求后,决定是否继续拉取消息
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
break;
省略没有拉取到消息 以及偏移量修正的代码逻辑。
}
}
}
};
进行拉取消息,更新回调
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
从上面的代码可以得知,获取到消息之后,也是通过发送消费消息的请求,以进行消息的消费。
ConsumeMessageService的submitConsumeRequest方法。
在看如何消费之前,需要先看看消费服务的结构。
ConsumeMessageService
结构
public interface ConsumeMessageService {
void start();
void shutdown(long awaitTerminateMillis);
void updateCorePoolSize(int corePoolSize);
void incCorePoolSize();
void decCorePoolSize();
int getCorePoolSize();
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume);
}
从这个接口的功能来看,必然也是线程池,可以控制核心线程数,以及开始/关闭。
另外提供了2个消费方法,提交消费申请或者直接消费。
接下来看看实现类。因为本次看的是并发消费,所以直接看看这个接口对应的实现类---并发消费。
ConsumeMessageConcurrentlyService---并发消费
结构上相对来说比较简单,所以就不直接画图了。先看看对应的成员变量
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();//日志
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MessageListenerConcurrently messageListener;//消息监听器
private final BlockingQueue<Runnable> consumeRequestQueue;//消费请求队列
private final ThreadPoolExecutor consumeExecutor;//消费的线程池
private final String consumerGroup;//消费组
private final ScheduledExecutorService scheduledExecutorService;//定时消费线程池,主要用于延时提交消费请求---单线程线程池
private final ScheduledExecutorService cleanExpireMsgExecutors;//清理过期消息---单线程线程池
}
从成员变量上来看,功能还是挺多的,可以延迟提交消费请求,清理过期消息
接下来看看并发消费服务的实例化
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
无界队列
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
消费线程池,为定长线程池,默认20线程
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
如何提交消费请求
首先还是看看提交消费请求的实现
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
默认为1.
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
consumeBatchSize,消息每次最多拉取32条,可以通过配置调整。
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
}
从上面的代码来看,其实每一个消息都会封装成一个消费的请求。
那么是如何进行消费的?由于是线程池消费的,所以我们只需要关注ConsumeRequest即可。
如何进行消费
由于是线程池消费的,所以我们只需要关注ConsumeRequest即可。
还是先来看看结构
ConsumeRequest的结构
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
}
}
从结构上来看比较简单,由3个成员变量组成。本次消费的消息,对应的消息队列,以及处理队列。
再来看看消费的实现。这个时候关注run方法即可。
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
}
@Override
public void run() {
if (this.processQueue.isDropped()) {
如果刚刚好进行队列的重新分配,processQueue废弃状态为true,那么说明该队列也不由该消费者消费,直接返回
return;
}
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
给消息的属性中设置开始消费的时间。
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
通过监听器进行消费,并且返回消费状态
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
消费状态为空,说明消费失败,待会重试
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
如果处理队列没有被废弃
if (!processQueue.isDropped()) {
处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
}
消息监听器会返回消费的结果,最后根据结果再做处理。
对于消费结果的处理
代码如下
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
--------------------------------------------------------------------------------------------------------------------------------
消费成功
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex 为消息列表长度-1
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
----------------------------------------------------------------------------------------------------------------------------------
消费失败,稍后重试
case RECONSUME_LATER:
ackIndex设置为-1
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
---------------------------------------------------------------------------------------------------------------------------------
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
如果消费成功,ackIndex=size -1,自然不会开始遍历,为-1就会开始遍历
从代码来看,只会打印日志
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
用于存储sendback失败的消息
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
如果消费成功,ackIndex=size -1,自然不会开始遍历,为-1就会开始遍历
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
将消费失败的消息发送回broker
boolean result = this.sendMessageBack(msg, context);
if (!result) {
如果sendback失败,保存起来, 重复消费次数+1
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
对sendback失败的消息进行再次消费。
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
将本次消费的消息移除,避免重复消费,返回处理队列中最小的偏移量。
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
处理队列没有被废弃且偏移量大于0
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
将偏移量进行同步
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
并发消费的过程
从代码逻辑来看,并发消费,其实是通过线程池对每一个消费请求进行消费。
网友评论