RocketMQ client consumption flow
This post focuses only on the push mode with concurrent consumption under the clustering message model.
Component overview
DefaultMQPushConsumerImpl
- RebalanceImpl: the load-balancing (queue allocation) implementation
- PullAPIWrapper: pulls messages
- OffsetStore: stores consumption progress
- ConsumeMessageService: the consumption service
- MQClientInstance: the core client implementation
MQClientInstance
- netty client, with the business thread pool isolated from the callback thread pool
- scheduled tasks
- RebalanceService: schedules rebalancing
- PullMessageService: schedules message-pull tasks
- defaultMQProducer: an internal producer
MQClientInstance and consumers form a one-to-many relationship: all producers and consumers that share the same InstanceName use the same MQClientInstance.
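For intuition, MQClientManager caches instances keyed by the client id that ClientConfig.buildMQClientId() produces (ip@InstanceName). A minimal sketch of the idea, assuming a simplified constructor (the real one takes more arguments):

private final ConcurrentHashMap<String, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>();

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {
    String clientId = clientConfig.buildMQClientId(); // ip@InstanceName
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance = new MQClientInstance(clientConfig.cloneClientConfig(), clientId); // simplified
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev; // another thread created it first; reuse that one
        }
    }
    return instance;
}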
Startup: DefaultMQPushConsumerImpl.start()
- Generate the InstanceName; if the user did not set one, the pid is used.
- Create the MQClientInstance. Producers and consumers that share an InstanceName use the same MQClientInstance, which is the core of the client.
In other words, a single MQClientInstance can hold multiple consumers and schedules all of them centrally.
this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);
// the consumer is registered with mQClientFactory below, so mQClientFactory ends up holding
// a reference to every consumer with the same InstanceName
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
- Wire up the load-balancing implementation rebalanceImpl
- Create the PullAPIWrapper, which is responsible for pulling messages
- Create the OffsetStore according to the message model
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // broadcasting: progress is stored locally
this.offsetStore =
new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING: // clustering: progress is stored on the broker
this.offsetStore =
new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
The OffsetStore is responsible for reading consumption progress and syncing it back.
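Abridged, the OffsetStore contract looks like this (the method set follows the client interface; imports and a few methods omitted):

public interface OffsetStore {
    // restore progress at startup (from a local file or the broker)
    void load() throws MQClientException;
    // record progress in memory after messages are consumed
    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
    // read progress from memory or from the store
    long readOffset(final MessageQueue mq, final ReadOffsetType type);
    // flush in-memory progress (to the broker for clustering, to a local file for broadcasting)
    void persistAll(final Set<MessageQueue> mqs);
    // drop the progress of a queue that was rebalanced away
    void removeOffset(MessageQueue mq);
}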
- Create the ConsumeMessageService according to the listener type and start it.
For concurrent consumption no extra thread is started.
For orderly consumption a scheduled task is started; it calls the consumer's RebalanceImpl.lockAll() method, sending requests to the broker to lock the queues assigned to this consumer.
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
{
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
- Start the MQClientInstance. When multiple consumers reference the same MQClientInstance, it is started only once.
mQClientFactory.start();
- Initialization (a minimal usage example follows this list):
// fetch routing info for the subscribed topics from the nameserver
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// send heartbeats to every broker in the routing info
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// wake up mQClientFactory's rebalance service
this.mQClientFactory.rebalanceImmediately();
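For reference, a minimal push consumer that exercises this startup path might look like the following (group, topic, and nameserver address are placeholders; packages assume the 3.x client):

import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;

public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876"); // placeholder address
        consumer.subscribe("DemoTopic", "*");      // placeholder topic
        // a MessageListenerConcurrently makes start() pick ConsumeMessageConcurrentlyService
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println("received " + msgs.size() + " message(s)");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start(); // triggers everything described above
    }
}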
Startup: MQClientInstance.start()
A given MQClientInstance is started only once.
1. Start the netty client
this.mQClientAPIImpl.start(); // embedded netty client
2. Start the scheduled tasks
this.startScheduledTask(); // starts 5 scheduled tasks
- Fetch the nameserver address from a remote endpoint, so the local address list can be refreshed when it changes.
The remote address is hard-coded, so this is currently unused.
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
- Refresh topic routing info, so routing changes can be detected.
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
- Persist consumption progress to the broker; this ultimately calls DefaultMQPushConsumerImpl.offsetStore.persistAll.
This shows that offset persistence is asynchronous, which is one reason duplicate messages can occur.
MQClientInstance.this.persistAllConsumerOffset();
- Send heartbeats to the brokers
MQClientInstance.this.cleanOfflineBroker(); // clean up offline brokers
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); // send heartbeats
- Dynamically resize the consume thread pool, based on DefaultMQPushConsumer's adjustThreadPoolNumsThreshold parameter and the message backlog inside the consumer
MQClientInstance.this.adjustThreadPool();
- Start the scheduling services
// pull-message thread
this.pullMessageService.start();
// Start rebalance service
// rebalance thread
this.rebalanceService.start();
// Start push service: the internal producer, used to send retry messages when consumption fails
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
Message pull flow
The pull flow starts from rebalancing. Once MQClientInstance's rebalanceService starts, it periodically calls the doRebalance method of every consumer, at a 10s interval.
private static long WaitInterval = 1000 * 10; // 10s interval
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
this.waitForRunning(WaitInterval); // wait
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
public void doRebalance() {
// call doRebalance on every consumer
for (String group : this.consumerTable.keySet()) { // consumerTable holds the consumer references
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null) {
try {
impl.doRebalance();
} catch (Exception e) {
log.error("doRebalance exception", e);
}
}
}
}
// the consumer ultimately calls doRebalance on its own load-balancing implementation
@Override
public void doRebalance() {
if (this.rebalanceImpl != null) { // the consumer delegates to its own rebalanceImpl
this.rebalanceImpl.doRebalance();
}
}
The rebalance implementation
First get the subscription info, then loop over the topics and rebalance each one.
public void doRebalance() {
// get the topics this consumer subscribes to
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// rebalance this topic
this.rebalanceByTopic(topic);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
// when the topic set changes, remove the ProcessQueues of topics no longer subscribed
this.truncateMessageQueueNotMyTopic();
}
Rebalancing differs between the clustering and broadcasting modes; broadcasting is not discussed here.
In RocketMQ a topic has multiple queues. Rebalancing means distributing those queues sensibly across all consumers of a consumer group.
There are several allocation algorithms, all implementing AllocateMessageQueueStrategy; the default is AllocateMessageQueueAveragely.
// first gather the inputs the allocation needs
// all queues of the topic
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// all client ids consuming this topic
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
Then allocate is called; the returned list is the set of queues assigned to the current consumer:
public List<MessageQueue> allocate(//
final String consumerGroup,//
final String currentCID,//
final List<MessageQueue> mqAll,//
final List<String> cidAll//
);
The cidAll entries that distinguish clients are each client's ip@InstanceName, which is why two clients on the same ip must not share an InstanceName.
For example, AllocateMessageQueueAveragely contains this line:
// find our own index in the client list; if two clients share an InstanceName their index is identical, so they are assigned the same queues
int index = cidAll.indexOf(currentCID);
Note that this allocation involves no synchronization or validation: clients never communicate and never know which queues the others took. Everything relies on "good faith". As long as every consumer in a group uses the same strategy the assignment works out; if consumers in one group use different strategies, the queue assignment becomes inconsistent.
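For concreteness, the core of AllocateMessageQueueAveragely is roughly the following (parameter checks omitted):

List<MessageQueue> result = new ArrayList<MessageQueue>();
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1
        : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1
                                  : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;

For example, 8 queues across 3 consumers split 3/3/2: index 0 gets queues 0-2, index 1 gets 3-5, index 2 gets 6-7, provided all three compute with the same strategy over identically sorted inputs.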
Pull tasks
For every queue assigned to it, RocketMQ creates a pull task: a PullRequest carrying a ProcessQueue.
These are stored in PullMessageService's pullRequestQueue, which is a LinkedBlockingQueue.
Once PullMessageService starts, it takes pull tasks out of this blocking queue and performs the actual message pulls.
After queue allocation completes:
// returns whether the assigned queues changed
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet) { // mqSet: the queues assigned to this consumer
boolean changed = false;
// processQueueTable (a ConcurrentHashMap) holds the previously assigned queues
// and their corresponding ProcessQueue pull tasks
Iterator<Entry<MessageQueue, ProcessQueue>> it =
this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) { // same topic?
if (!mqSet.contains(mq)) { // assigned last time, but not this time
pq.setDropped(true); // disable the pull task via the volatile dropped flag
// remove the queue's progress from the OffsetStore, committing it first
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}",
consumerGroup, mq);
}
}
// if the last pull was more than 120000 ms ago, also remove it
else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error(
    "[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
    consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
// handle newly assigned queues
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
// create a pull task
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(new ProcessQueue());
// compute the offset to start pulling from
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
pullRequest.setNextOffset(nextOffset);
pullRequestList.add(pullRequest);
changed = true;
// remember it for comparison in the next rebalance
this.processQueueTable.put(mq, pullRequest.getProcessQueue());
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// push the pull tasks into the blocking queue; ultimately calls
// PullMessageService.executePullRequestImmediately, which does
// this.pullRequestQueue.put(pullRequest);
this.dispatchPullRequest(pullRequestList);
return changed;
}
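For orientation, the parts of ProcessQueue this flow relies on are roughly these (abridged; field names follow the client source):

public class ProcessQueue {
    // snapshot of in-flight messages for one queue, keyed by queue offset
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    // flipped to true during rebalance once the queue is no longer ours;
    // volatile so the pull and consume threads observe it immediately
    private volatile boolean dropped = false;
    // updated on every pull; isPullExpired() compares it against the 120000 ms threshold
    private volatile long lastPullTimestamp = System.currentTimeMillis();
}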
Pulling messages
By now we know that each queue maps to one pull task (a PullRequest with its ProcessQueue) sitting in the blocking queue, and that disabling a task sets its dropped flag to true.
Who performs the pull? MQClientInstance's PullMessageService.
After it starts, PullMessageService takes pull tasks from the blocking queue, finds the owning consumer group, and calls pullMessage.
PullMessageService is single-threaded, so all message pulling happens on a single thread.
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
// take one pull task from the blocking queue
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
}
log.info(this.getServiceName() + " service end");
}
private void pullMessage(final PullRequest pullRequest) {
// find the consumer that owns this group and call its pullMessage
final MQConsumerInner consumer =
this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// calls the consumer's pullMessage, which ultimately calls pullAPIWrapper.pullKernelImpl
impl.pullMessage(pullRequest);
}
}
DefaultMQPushConsumerImpl.pullMessage
It first runs flow-control and other checks; if they fail, executePullRequestLater() puts the task back into the queue for a later attempt (the checks are sketched after the code below).
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
// hand it to the scheduled executor
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() { // re-enqueue after the delay
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
}
It also checks whether the task has been disabled. A task that completes normally is put back into the queue to wait for the next pull.
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) { // check the volatile dropped flag
log.info("the pull request[{}] is droped.", pullRequest.toString());
// a dropped task is simply discarded; live tasks are re-enqueued after use
return;
}
Once all checks pass, a PullCallback is created and the pull is issued asynchronously.
Because the network layer is netty, every request is in fact asynchronous; "synchronous" operations are merely async calls converted to sync (see the sketch after the code below).
this.pullAPIWrapper.pullKernelImpl(//
pullRequest.getMessageQueue(), // 1
subExpression, // 2
subscriptionData.getSubVersion(), // 3
pullRequest.getNextOffset(), // 4
this.defaultMQPushConsumer.getPullBatchSize(), // 5
sysFlag, // 6
commitOffsetValue,// 7
BrokerSuspendMaxTimeMillis, // 8
ConsumerTimeoutMillisWhenSuspend, // 9
CommunicationMode.ASYNC, // 10
pullCallback// 11
);
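As a hypothetical, self-contained illustration of that async-to-sync remark (sendAsync and Response are placeholders, not RocketMQ APIs; the real client pairs a ResponseFuture with a CountDownLatch keyed by the request's opaque id):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public final class SyncOverAsync {
    static final class Response {
        final byte[] body;
        Response(byte[] body) { this.body = body; }
    }

    // the async layer completes the future from an IO thread
    static CompletableFuture<Response> sendAsync(final byte[] request) {
        final CompletableFuture<Response> future = new CompletableFuture<Response>();
        new Thread(new Runnable() {
            public void run() { future.complete(new Response(request)); } // simulated IO
        }).start();
        return future;
    }

    // a "sync" call is just the async call plus a bounded wait
    static Response invokeSync(byte[] request, long timeoutMillis) throws Exception {
        return sendAsync(request).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Response r = invokeSync(new byte[] { 1, 2, 3 }, 3000);
        System.out.println("got " + r.body.length + " bytes");
    }
}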
When the request succeeds, the callback fires. The FOUND case is the one worth studying; the other statuses mean no new messages, an illegal offset, and so on.
// RocketMQ keeps its own consumption statistics here, also accessible externally via
// consumer.getDefaultMQPushConsumerImpl().getConsumerStatsManager()
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult =
DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
// stats
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null ||
pullResult.getMsgFoundList().isEmpty()) {
// empty result: put the request straight back
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
// stats
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullResult.getMsgFoundList().size());
boolean dispathToConsume =
processQueue.putMessage(pullResult.getMsgFoundList());
// hand the messages off for consumption
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
}
// (re-enqueueing of the pull request elided here)
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
}
}
Starting consumption
Here there is batch-splitting logic, driven by consumeMessageBatchMaxSize.
It depends on this field: private int consumeMessageBatchMaxSize = 1;
If it is set above 1, each batch can only succeed or fail as a whole (see the configuration note after the code below).
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
else {
for (int total = 0; total < msgs.size();) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
}
else {
break;
}
}
// create a consume job
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue,
messageQueue);
// submit it to the thread pool
this.consumeExecutor.submit(consumeRequest);
}
}
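If batching is wanted, the knob is set on the consumer before start(); a small sketch (group name is a placeholder):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
// deliver up to 16 messages per consumeMessage() call; the listener returns a single
// status for the whole list, so the batch succeeds or is retried as a unit
consumer.setConsumeMessageBatchMaxSize(16);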
ConsumeRequest implements Runnable. Its run method:
@Override
public void run() {
// check the dropped flag yet again
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped {}",
this.messageQueue);
return;
}
// the user's listener implementation
MessageListenerConcurrently listener =
ConsumeMessageConcurrentlyService.this.messageListener;
// create the context
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
// this context is used by hooks; message tracing in 4.5 is implemented via this hook and context
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer
.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// invoke the hook: ConsumeMessageHook
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
try {
// replace the retry topic with the original topic
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
// invoke the user's listener
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//
RemotingHelper.exceptionSimpleDesc(e),//
ConsumeMessageConcurrentlyService.this.consumerGroup,//
msgs,//
messageQueue);
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) { // a null return or a thrown exception counts as failure
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",//
ConsumeMessageConcurrentlyService.this.consumerGroup,//
msgs,//
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// add by fuhaining@yolo24.com
if (consumeMessageLog.isInfoEnabled()) {
StringBuilder keys = new StringBuilder();
for (MessageExt msg : msgs) {
keys.append(msg.getMsgId()).append(",");
}
consumeMessageLog.info("concurrently - " + status.name() + " : " +
keys.deleteCharAt(keys.length() - 1).toString());
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS ==
status);
// invoke the hook
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(
ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(),
consumeRT);
// check once more
if (!processQueue.isDropped()) {
// process the result
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
else {
log.warn(
    "processQueue is dropped without process consume result. messageQueue={}, msgTreeMap={}, msgs={}",
    new Object[] { messageQueue, processQueue.getMsgTreeMap(), msgs });
}
}
Retry messages
// ConsumeMessageConcurrentlyService.processConsumeResult
// stats are recorded earlier in the method, and ackIndex is derived from the consume
// status there (see the sketch after this block)
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // broadcasting skipped
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>
(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// send the message back for retry
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// messages that could not be sent back are retried locally via a delayed consume task
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0) {
// commit the progress to the OffsetStore
// the OffsetStore only records it in memory; the scheduled task described earlier syncs it to the broker
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
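For context, the ackIndex used in the loop above is derived from the consume status near the top of processConsumeResult; roughly (stats calls omitted):

int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
    // everything up to ackIndex counts as consumed; by default that is the whole batch
    if (ackIndex >= consumeRequest.getMsgs().size()) {
        ackIndex = consumeRequest.getMsgs().size() - 1;
    }
    break;
case RECONSUME_LATER:
    // -1 makes the retry loop above send every message in the batch back to the broker
    ackIndex = -1;
    break;
default:
    break;
}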
Messages that need a retry are sent back to the broker. Only the original message's id and commit-log offset go back; the broker uses them to read the original message body.
Generating the retry message:
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
// send the message back to wherever it was originally stored
String brokerAddr =(null != brokerName) ?
this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000);
}
catch (Exception e) {
// if sending back fails, fall back to the internal producer;
// newMsg is a retry-topic message rebuilt from the original msg (construction elided above)
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
// the consumerSendMessageBack method
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroupWithProjectGroup);
// the original topic
requestHeader.setOriginTopic(msg.getTopic());
// the original message's commit-log offset
requestHeader.setOffset(msg.getCommitLogOffset());
// the retry delay level
requestHeader.setDelayLevel(delayLevel);
// record the original message id
requestHeader.setOriginMsgId(msg.getMsgId());
// send via netty
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
Flow summary
- MQClientInstance's rebalanceService thread starts and periodically calls the doRebalance method of each consumer's load-balancing implementation, RebalanceImpl.
- RebalanceImpl computes the queues belonging to this consumer using the allocation strategy AllocateMessageQueueStrategy.
- Based on how the assignment changed, new pull tasks (PullRequest/ProcessQueue) are created, or existing ProcessQueues are disabled.
- New pull tasks are put into the pullRequestQueue, a LinkedBlockingQueue, of MQClientInstance's PullMessageService.
- The PullMessageService thread takes tasks from the queue and calls the owning consumer's PullAPIWrapper.pullKernelImpl to send the pull request.
- Pulling is asynchronous; in the callback the messages are wrapped into ConsumeMessageConcurrentlyService.ConsumeRequest tasks and submitted to ConsumeMessageConcurrentlyService's consume thread pool.
- The user's listener implementation is finally invoked to consume the messages.
- Messages that fail consumption are sent back to the broker, generating retry messages.
- On success the progress is written to the consumer's OffsetStore and periodically flushed back to the broker.