消息从producer发送到了broker之后,消息的订阅者就可以订阅消费消息。
-
rocketmq消息拉取有两种方式:推(push)和拉(pull),推其实也只是对pull的一层封装,本质还是拉的方式。
-
rocketmq消息消费模式:集群消费和广播消费,不同的消费方式区别还是有点大的,消息队列分配、消息ack机制和消息消费进度管理上都有区别。
-
消息消费方式:并行消费和顺序消费。并行消费是ConsumeMessageConcurrentlyService进行处理,顺序消费是ConsumeMessageOrderlyService进行处理,顺序消费rocketmq只支持单队列的顺序。
我们以Push方式(DefaultMQPushConsumer,底层封装的仍是Pull拉取)来进行分析:
启动一个consumer实例来进行消费,需要设置以下信息:
-
consumeGroup:设置消费者所在消费组
-
consumeFromWhere:设置消费偏移量,CONSUME_FROM_FIRST_OFFSET:从最开始消息消费,CONSUME_FROM_TIMESTAMP:从某个时间戳开始消费消息
-
subscribe:设置订阅信息,主要是订阅的topic以及tag信息
-
messageListener:设置消息消费业务处理逻辑,实际处理消息的业务逻辑
以上这些信息都是后续消息拉取和消费所依赖的基本信息,非常重要。
/**
 * Demo entry point: creates a push consumer, subscribes it to every tag of
 * "TopicTest", registers a concurrent listener that prints each received batch,
 * and starts the consumer.
 *
 * @param args unused
 * @throws InterruptedException declared by the original sample
 * @throws MQClientException    if the consumer cannot be configured or started
 */
public static void main(String[] args) throws InterruptedException, MQClientException {
    // Business callback: print the batch together with the consuming thread's
    // name, then acknowledge the whole batch as consumed.
    final MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    };

    // The constructor argument is the consumer group this instance joins.
    final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    // Where to start consuming from.
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // Topic plus tag expression; "*" subscribes to every tag.
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(printingListener);

    consumer.start();
    System.out.printf("Consumer Started.%n");
}
接下来看下DefaultMQPushConsumer:这是消息消费的具体实例
消息消费实例在创建的时候,设置了consumerGroup信息和负载均衡策略,默认的负载均衡策略是平均分配的策略,这里rocketmq用到了策略模式
//创建实例的构造函数
/**
 * Creates a push consumer for the given group, with no RPC hook and with
 * {@link AllocateMessageQueueAveragely} as the queue-allocation strategy.
 *
 * @param consumerGroup name of the consumer group this instance belongs to
 */
public DefaultMQPushConsumer(final String consumerGroup) {
    this(
        consumerGroup,
        null,
        new AllocateMessageQueueAveragely());
}
/**
 * Creates a push consumer with an explicit RPC hook and queue-allocation strategy.
 *
 * @param consumerGroup                name of the consumer group this instance belongs to
 * @param rpcHook                      hook handed to the internal implementation
 * @param allocateMessageQueueStrategy strategy used to distribute message queues among consumers
 */
public DefaultMQPushConsumer(
    final String consumerGroup,
    RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    // Both fields are set before the implementation object is built, because that
    // object receives a reference to this consumer.
    this.consumerGroup = consumerGroup;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    // Pulling and consuming are delegated to this internal object.
    this.defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
这里列举下DefaultMQPushConsumer的数据结构,简单认识下:
DefaultMQPushConsumer属性 //内部实现,这里大多数功能都是委托给它来处理
// Internal implementation; most of this class's work is delegated to it.
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// Consumer group name.
private String consumerGroup;
// Message consumption model (defaults to clustering).
private MessageModel messageModel = MessageModel.CLUSTERING;
// Where to start consuming from (defaults to the last offset).
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// Timestamp to start consuming from; initialised to 30 minutes before "now".
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
// Queue-allocation (load-balancing) strategy.
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
// Subscriptions, keyed by topic.
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
// Business callback that consumes the messages.
private MessageListener messageListener;
// Consumption-progress store; its concrete type depends on clustering vs. broadcasting.
private OffsetStore offsetStore;
// Minimum number of consumer threads.
private int consumeThreadMin = 20;
// Maximum number of consumer threads.
private int consumeThreadMax = 64;
// Threshold for dynamically adjusting the thread pool size.
private long adjustThreadPoolNumsThreshold = 100000;
// Maximum span for concurrent consumption: 2000 messages.
private int consumeConcurrentlyMaxSpan = 2000;
// Per-queue cap on the number of locally cached (pulled but unconsumed) messages; used for flow control.
private int pullThresholdForQueue = 1000;
// Per-queue cap on the size of locally cached messages; used for flow control.
// NOTE(review): presumably in MiB — confirm against the code that compares it.
private int pullThresholdSizeForQueue = 100;
private int pullThresholdForTopic = -1;
private int pullThresholdSizeForTopic = -1;
private long pullInterval = 0;
// Maximum number of messages handed to the listener in one batch.
private int consumeMessageBatchMaxSize = 1;
// Number of messages requested per pull.
private int pullBatchSize = 32;
private boolean postSubscriptionWhenPull = false;
private boolean unitMode = false;
// Maximum re-consume attempts. Per the original note, -1 means 16 attempts, and
// re-consumption is implemented via delayed messages — TODO confirm.
private int maxReconsumeTimes = -1;
private long suspendCurrentQueueTimeMillis = 1000;
// Consume timeout; per the original note the unit is minutes (15 minutes) — TODO confirm.
private long consumeTimeout = 15;
// Used for message tracing.
private TraceDispatcher traceDispatcher = null;
消息拉取消费的源头始于DefaultMQPushConsumerImpl.start(),这个是案发第一现场:
/**
 * Starts this push consumer.
 *
 * When the state is CREATE_JUST it validates the configuration, copies the
 * subscriptions into the rebalance component, obtains the shared client instance,
 * chooses an offset store and a consume service, registers this consumer with the
 * client instance and starts it. Any other known state is rejected with an
 * exception. After the switch it refreshes topic subscribe info, sends a heartbeat
 * and triggers an immediate rebalance.
 *
 * @throws MQClientException if the consumer group is already registered on the
 *                           client instance, or the service state is RUNNING,
 *                           START_FAILED or SHUTDOWN_ALREADY
 */
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
// Mark as failed up front; only a fully successful start flips it to RUNNING below.
this.serviceState = ServiceState.START_FAILED;
// Validate the consumer configuration.
this.checkConfig();
// Copy the subscriptions from defaultMQPushConsumer into rebalanceImpl.
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// Create or fetch the shared MQClientInstance. Per the original note it is keyed
// by IP@InstanceName, so an application has a single instance — not visible here.
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// A user-supplied offset store wins; otherwise pick one based on the message model.
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// Broadcasting: consumption progress is managed locally on the client.
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
// Clustering: consumption progress is managed remotely on the broker.
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// Load per-queue consumption progress. Per the original note, broadcasting reads a
// local file while the clustering implementation of load() is empty — TODO confirm.
this.offsetStore.load();
// Choose the consume service: orderly and concurrent listeners use different services.
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// Start the consume service.
this.consumeMessageService.start();
// Register this consumer under its consumer group name. NOTE(review): the original
// author warns that all consumers of one group must share identical subscriptions,
// otherwise subscriptions overwrite each other and messages get lost.
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
// Roll back so the caller may retry with a different group name.
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// Start the client instance (network client, pull service, rebalance service,
// scheduled tasks — see MQClientInstance.start()).
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// Refresh routing info for the subscribed topics.
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// Send a heartbeat to the brokers.
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// Trigger message-queue rebalancing right away.
this.mQClientFactory.rebalanceImmediately();
}
这里主要是做了下面这些事情:
-
将defaultMQPushConsumer中的订阅消息拷贝到rebalanceImpl中
-
创建或获取MQClientInstance,它是以IP@InstanceName作为key、MQClientInstance作为value放在map中,一个应用中只有一个实例
-
设置PullAPIWrapper,这个是拉取和处理消息的包装器
-
设置消息消费进度管理器,集群消费RemoteBrokerOffsetStore,广播消费LocalFileOffsetStore
-
加载消息队列消费进度this.offsetStore.load(),LocalFileOffsetStore从本地文件进行加载,RemoteBrokerOffsetStore是一个空的实现,其实最后是从broker获取消息消费进度
-
设置消息消费服务,并行消费是ConsumeMessageConcurrentlyService,顺序消费是ConsumeMessageOrderlyService
-
启动消息消费服务线程,this.consumeMessageService.start(),没有消息过来时,线程是阻塞的
-
注册消息消费类,以consumerGroup作为key,所以一个group中,必须消息订阅信息都是相同,不然会出现订阅信息覆盖,导致最终消息丢失
-
MQClientInstance线程启动,这个和DefaultMQPushConsumerImpl是一样的重头戏,它里面开启了Netty客户端线程,拉取消息线程,消息消费的负载均衡,启动定时任务等。这里稍后分析,重中之重。
-
更新主题订阅信息
-
发送心跳信息给所有的broker
-
唤醒消息队列负载均衡线程
订阅信息的拷贝:订阅信息的拷贝主要是为了后面的消息拉取过滤和消息消费队列的负载均衡
-
把consumer的订阅信息,按topic的维度进行封装,因为一个consumer可能会订阅多个topic主题信息。封装的信息主要是topic、subString、tag和tag的hashCode信息
-
集群消费模式下,为这个consumerGroup构建一个重试主题信息,topic为%RETRY%+consumerGroup,订阅信息是*所有的
/**
 * Copies the consumer's topic subscriptions into the rebalance component and, in
 * clustering mode, adds a subscription for the consumer group's retry topic.
 * Also adopts the consumer's message listener when no inner listener is set yet.
 *
 * @throws MQClientException wrapping any exception raised while building or
 *                           storing the subscription data
 */
private void copySubscription() throws MQClientException {
    try {
        // One consumer may subscribe to several topics; store one entry per topic.
        final Map<String, String> declared = this.defaultMQPushConsumer.getSubscription();
        if (declared != null) {
            for (final Map.Entry<String, String> each : declared.entrySet()) {
                final String subscribedTopic = each.getKey();
                final String expression = each.getValue();
                final SubscriptionData data = FilterAPI.buildSubscriptionData(
                    this.defaultMQPushConsumer.getConsumerGroup(), subscribedTopic, expression);
                this.rebalanceImpl.getSubscriptionInner().put(subscribedTopic, data);
            }
        }

        if (this.messageListenerInner == null) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }

        // Kept as a switch so a null message model still fails inside this try block.
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case CLUSTERING: {
                // Clustering mode: also subscribe to every message of this group's retry topic.
                final String groupRetryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                final SubscriptionData retryData = FilterAPI.buildSubscriptionData(
                    this.defaultMQPushConsumer.getConsumerGroup(), groupRetryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(groupRetryTopic, retryData);
                break;
            }
            default:
                // Broadcasting (and any other model) adds no retry subscription.
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
MQClientInstance启动流程:这里的东西都很关键,他是整个JVM共用的,多个consumer实例都是使用的一个MQClientInstance
-
rocketmq网络通信是基于netty的,rocketmq基于netty实现了一个私有化协议,所以需要开启一个netty客户端线程
-
启动定时任务,主要是获取nameServer地址,更新主题路由信息,给所有broker发送心跳信息,持久化消息消费进度信息
-
开启拉取消息服务线程,这个线程会不停地从消息拉取请求队列里面拿出请求,然后拉取消息进行消费
-
开启消息队列负载均衡线程
/**
 * Starts the client instance's background services the first time it is called.
 *
 * Only the CREATE_JUST state does any work. RUNNING and SHUTDOWN_ALREADY are
 * no-ops, and START_FAILED is reported as an error.
 *
 * @throws MQClientException if a previous start attempt failed
 */
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            case CREATE_JUST:
                // Assume failure until every service below has started.
                this.serviceState = ServiceState.START_FAILED;
                // No name server address configured: go and fetch one.
                if (this.clientConfig.getNamesrvAddr() == null) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Client-side network communication.
                this.mQClientAPIImpl.start();
                // Periodic background tasks.
                this.startScheduledTask();
                // Thread that takes pull requests off the queue and pulls messages.
                this.pullMessageService.start();
                // Message-queue rebalancing thread.
                this.rebalanceService.start();
                // Internal producer owned by this client instance.
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case SHUTDOWN_ALREADY:
            default:
                break;
        }
    }
}
客户端向集群中所有的broker上报心跳信息:这里上报的消息主要是当前客户端的consumer信息和producer信息
-
consumer信息:groupName、消费偏移量、主题消息订阅信息
-
producer信息:producer group name
当前客户端往集群中的所有broker发送心跳信息,如果当前客户端只有producer信息,就只往master broker发送心跳信息,这样节省网络开销提升性能。定时任务会每30秒向所有broker发送心跳信息,同步consumer和producer信息。
/**
 * Sends this client's heartbeat (its producer and consumer data) to every known
 * broker address. When the client has no consumers, only addresses whose broker
 * id equals {@code MixAll.MASTER_ID} are contacted. Send failures are logged and
 * never propagated.
 */
private void sendHeartbeatToAllBroker() {
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean noProducer = heartbeatData.getProducerDataSet().isEmpty();
    final boolean noConsumer = heartbeatData.getConsumerDataSet().isEmpty();
    if (noProducer && noConsumer) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }
    if (this.brokerAddrTable.isEmpty()) {
        return;
    }

    // Running count of heartbeat rounds; used below to throttle the success log.
    final long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    for (Entry<String, HashMap<Long, String>> brokerEntry : this.brokerAddrTable.entrySet()) {
        final String brokerName = brokerEntry.getKey();
        final HashMap<Long, String> addrById = brokerEntry.getValue();
        if (addrById == null) {
            continue;
        }
        for (Map.Entry<Long, String> addrEntry : addrById.entrySet()) {
            final Long id = addrEntry.getKey();
            final String addr = addrEntry.getValue();
            if (addr == null) {
                continue;
            }
            // A producer-only client skips every non-master broker.
            if (noConsumer && id != MixAll.MASTER_ID) {
                continue;
            }
            try {
                final int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                // Remember the version returned for this broker address.
                if (!this.brokerVersionTable.containsKey(brokerName)) {
                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                }
                this.brokerVersionTable.get(brokerName).put(addr, version);
                // Log only every 20th round to keep the log quiet.
                if (times % 20 == 0) {
                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                    log.info(heartbeatData.toString());
                }
            } catch (Exception e) {
                if (this.isBrokerInNameServer(addr)) {
                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                } else {
                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                        id, addr, e);
                }
            }
        }
    }
}
Consumer消费负载均衡:rocketmq的负载均衡都是在客户端进行实现的,producer发送的时候轮询主题队列进行发送消息,consumer消费消息的时候按消息队列和消费者进行负载均衡。
/**
 * Background service that repeatedly waits for {@code waitInterval} (or an earlier
 * wake-up) and then asks the client instance to rebalance, until it is stopped.
 */
public class RebalanceService extends ServiceThread {
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (true) {
            if (this.isStopped()) {
                break;
            }
            // Block for the configured interval, then run one rebalance round.
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }
}
public class MQClientInstance {
    /**
     * Runs one rebalance round on every registered consumer. A failure in one
     * consumer is logged and does not stop the others.
     */
    public void doRebalance() {
        for (MQConsumerInner consumer : this.consumerTable.values()) {
            if (consumer == null) {
                continue;
            }
            try {
                consumer.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
RebalanceService负载均衡线程不停地进行消费的负载均衡处理,按照每个consumerGroup进行doRebalance。然后每个consumer按每个topic进行rebalance,我们以集群消费模式去进行分析:
/**
 * Rebalances every subscribed topic in turn, then drops process queues whose
 * topic is no longer subscribed.
 *
 * @param isOrder whether the consumer consumes in order; forwarded to the
 *                per-topic rebalance
 */
public void doRebalance(final boolean isOrder) {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
    if (subscriptions != null) {
        for (final String topic : subscriptions.keySet()) {
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                // Failures on retry topics are deliberately not logged.
                final boolean isRetryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
                if (!isRetryTopic) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    // Always runs, even when there are no subscriptions.
    this.truncateMessageQueueNotMyTopic();
}
/**
 * Recomputes which message queues of {@code topic} this consumer should process.
 *
 * In broadcasting mode every queue of the topic is taken. In clustering mode the
 * configured allocation strategy picks this client's share from the sorted queue
 * list and the sorted consumer-id list. If the resulting set differs from the
 * current one, {@code messageQueueChanged} is invoked.
 *
 * @param topic   topic to rebalance
 * @param isOrder whether consumption is orderly; forwarded to the process-queue update
 */
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// Broadcasting: this consumer handles all queues of the topic.
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// Queue routing info for the topic, read from the locally cached table.
// NOTE(review): per the original note a scheduled task keeps it fresh from the name server.
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// All consumer ids of this group for the topic. NOTE(review): per the original note
// any broker can answer, because every consumer heartbeats its subscriptions to
// all brokers — the lookup itself is not visible here.
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
// A missing retry topic is not worth a warning.
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// Sort both lists so the strategy is given the same ordering on every run.
Collections.sort(mqAll);
Collections.sort(cidAll);
// The configured allocation strategy.
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// Ask the strategy which queues belong to this client, given all queues and all consumers.
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// Drop queues no longer assigned to this consumer and create pull requests for newly assigned ones.
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
广播和集群的负载均衡不同,主要不同是广播是需要处理主题下的所有消息队列,集群是需要处理根据负载均衡策略产生的队列。
/**
 * Brings {@code processQueueTable} in line with the queues assigned for {@code topic}.
 *
 * First pass: for entries of this topic, marks as dropped and removes those that
 * are no longer in {@code mqSet}, and (for passive consumption) those whose pull
 * has expired. Second pass: for every assigned queue not yet in the table, creates
 * a process queue and a pull request starting at the computed offset. The new pull
 * requests are dispatched at the end.
 *
 * @param topic   topic being rebalanced
 * @param mqSet   queues assigned to this consumer for the topic
 * @param isOrder when true, a new queue is only added after it has been locked
 * @return true if any queue was removed from or added to the table
 */
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
// No longer assigned to this consumer: mark as dropped, then try to remove it.
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
// Still assigned, but its pull has expired.
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
// Drop it. If removed, the second pass below sees the queue as missing and re-adds it.
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
// Orderly consumption must hold the queue lock before adding the queue.
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// Offset at which pulling starts; a negative value means it could not be determined.
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// Hand over the new pull requests. NOTE(review): per the original note they end up in
// PullMessageService's pullRequestQueue — dispatchPullRequest is not visible here.
this.dispatchPullRequest(pullRequestList);
return changed;
}
-
移除不属于当前consumer的MessageQueue,因为负载均衡进行了队列分配,所以当前consumer只消费分配给自己的消息队列MessageQueue。
-
当前consumer消费的消息队列都是存放在processQueueTable中,ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable,每个消息队列对应一个处理队列。负载均衡新分配的队列会生成PullRequest消息拉取请求,并通过computePullFromWhere计算消息消费偏移量。这里创建消息拉取请求,把PullRequest添加到了PullMessageService中的pullRequestQueue中,这样就唤醒了PullMessageService消息拉取线程。
PullMessageService是具体处理消息拉取请求的,他是继承自ServiceThread,也可看成一个拉取消息的线程,在后台不停地从阻塞队列里拿消息拉取请求,然后从broker拉取消息消费。
/**
 * Service loop: until stopped, blocks on the pull-request queue and processes
 * each request as it arrives. Unexpected exceptions are logged and the loop
 * keeps running.
 */
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // take() blocks until a pull request is available.
            final PullRequest nextRequest = this.pullRequestQueue.take();
            this.pullMessage(nextRequest);
        } catch (InterruptedException interrupted) {
            // Deliberately ignored: the loop condition re-checks isStopped().
            // NOTE(review): presumably shutdown interrupts this thread — confirm.
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
//具体消息的拉取,这个是交给DefaultMQPushConsumerImpl来处理的
/**
 * Executes one pull for the message queue described by {@code pullRequest}.
 *
 * Steps visible in this (abridged) excerpt:
 * 1. Skip the request if its process queue has been dropped.
 * 2. Flow control: if the locally cached message count or size (MiB) exceeds the
 *    per-queue threshold, re-schedule the request and return.
 * 3. Orderly consumption: proceed only when the process queue is locked; the
 *    first time after locking, recompute the offset to pull from.
 * 4. Issue an asynchronous pull. On success the callback advances the request's
 *    next offset, stores the messages in the process queue, submits them for
 *    consumption and re-schedules the same request. On failure the request is
 *    re-scheduled after a delay.
 *
 * Lines marked "..." were elided by the excerpt. The names
 * {@code subscriptionData}, {@code subExpression}, {@code sysFlag} and
 * {@code commitOffsetValue} are defined in those elided parts.
 *
 * @param pullRequest the pull request to execute; it is re-used for the next pull
 */
public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
    //...

    // Flow control: do not pull more while too many messages are cached locally.
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        // Warn only once per 1000 flow-control hits.
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (!this.consumeOrderly) {
        //...
    } else {
        // Orderly consumption: the process queue must be locked before pulling.
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                // First pull after the lock was obtained: recompute the start offset.
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                //...
                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }

    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                // Post-process the raw pull result.
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // Advance the offset for the next pull. In the original excerpt this
                        // statement had been swallowed by the preceding comment line, so the
                        // request would have pulled the same offset again.
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            // Nothing left after processing: pull again right away.
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                            // Cache the messages in the process queue, then submit them for consumption.
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
                            // Re-schedule the same request: delayed when a pull interval is set, immediately otherwise.
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
                        //...
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            // Pull failures on retry topics are not logged.
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    };

    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}
PullMessageService在处理消息拉取请求的时候做了这些事情:
- 获取当前消息消费处理队列,判断当前待处理队列缓存未处理消息个数和消息大小,这个是进行消息消费流量控制的,防止客户端处理不过来继续拉取消息把客户端搞挂了。未处理消息个数超过1000个或消息大小超过100M就延迟拉取。
- 顺序消费的时候,拉取消息前需要获取消息消费处理队列的锁。这个是顺序消费里面的内容,后续顺序消息里面再讲。其实顺序消息处理就是做什么之前先去获取锁,创建消息拉取请求之前获取锁,执行消息拉取请求的时候先去获取锁,拉取到的消息消费之前先去获取锁。
- 这里把消息消费封装成PullCallback,用Netty的ChannelFutureListener异步回调,待消息拉取成功来回调PullCallback进行消息消费处理。
- 处理通过Netty拉取到的消息pullAPIWrapper.processPullResult,这里会把拉取到的消息用消费者订阅信息的tag进行过滤下,因为在broker端是用tag的hashcode进行过滤的,所以这里要用tag整串进行匹配下。
- 把拉取到的消息放到消息消费处理队列中processQueue.putMessage(pullResult.getMsgFoundList()),供后续消费
- 提交消息消费请求到ConsumeMessageService中
- 重新创建消息拉取请求,broker端会返回消息下一个拉取偏移量。这里拿到这个下一个消息偏移量重新构建消息拉取请求,继续拉取消息。这里就是Push方式,他是封装的Pull方式,不停地构建拉取请求拉取消息。
消息消费最后是把拉取到的消息封装成ConsumeRequest提交给ConsumeMessageService进行处理的,ConsumeMessageService对于并行消费和顺序消费分别有对应不同的实现类ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService,在它们里面是有个消费处理线程池,ConsumeRequest也是一个任务,提交到线程池中分配线程进行处理。
先来看下ConsumeMessageOrderlyService:
/**
 * Orderly consume task for one message queue (abridged excerpt: {@code status},
 * {@code context}, {@code hasException} and {@code consumeBatchSize} are declared
 * in parts not shown).
 *
 * Under the per-queue lock object it repeatedly takes a batch from the process
 * queue, invokes the business listener while holding the process queue's consume
 * lock, and processes the result. If the process queue is not locked (clustering
 * mode), it schedules a later lock-and-retry instead.
 */
public void run() {
// Obtain the lock object for this message queue before consuming.
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// Broadcasting proceeds directly; clustering requires the ProcessQueue to be locked and the lock not expired.
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
// After consuming continuously for too long, stop and re-submit this task a little later.
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
try {
// Acquire the ProcessQueue's consume lock.
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
// Leaves the loop; the finally block below still releases the consume lock.
break;
}
// Invoke the business listener on a read-only view of the batch.
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
// Handle the consume result; its return value decides whether the loop continues.
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
// Nothing left to consume for this queue.
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// Not locked yet: try to lock later and consume again.
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
- 获取MessageQueue队列的锁,获取到锁之后才能进去消费,没有获取到锁,延迟再进行消费
- 获取消息消费处理队列ProcessQueue的锁,这里是可重入锁
- 调用消息业务处理逻辑
- 处理消息消费结果
ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService的处理流程没有太大的区别,只是并行消费不需要获取MessageQueue和ProcessQueue 的锁。
接下来看下消息消费处理结果:
/**
 * Handles the outcome of a concurrent consume request.
 *
 * Messages at indexes above the ack index count as failed. In broadcasting mode
 * they are only logged. In clustering mode they are sent back to the broker, and
 * any that cannot be sent back are retried locally later. Finally the request's
 * remaining messages are removed from the process queue and the resulting offset
 * is written to the offset store.
 *
 * @param status         result returned by the business listener
 * @param context        consume context carrying the ack index
 * @param consumeRequest the request whose messages were consumed
 */
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
// Clamp the ack index to the last message of the batch.
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
// -1 makes the loops below start at index 0, so the whole batch counts as failed.
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// Broadcasting: failed messages are not retried, only logged.
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
// Clustering: send each failed message back to the broker.
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
// Send-back failed: bump the reconsume count and keep the message for a local retry.
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
// Take these messages out of the request so removeMessage below does not remove
// them from the process queue, then re-submit them for later consumption.
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// Remove the handled messages from the process queue and record the returned offset,
// unless it is negative or the queue has been dropped.
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// NOTE(review): the meaning of the third argument (true) is not visible here — confirm in OffsetStore.
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
- 消息消费成功的时候ackIndex会被设置为consumeRequest.getMsgs().size() - 1,但是消费处理失败的时候,ackIndex会被设置为-1,这样这一批消息都会被从头重新消费一次,所以这里存在重复消费的问题
- 集群模式下消息消费失败会进行重试,会把这个消息发送到broker,broker会为每个消费组设置一个retry topic,这个消费组的consumer在构建消息订阅信息的时候,就已经为这个消费者构建了重试队列的订阅信息。如果发送重试消息失败,会把这个消息消费请求提交到本地,延迟再进行消费。
- 更新MessageQueue的消息消费偏移量,不同的消费进度管理方式处理方式不同,但是这里都是先更新到本地缓存中, 后续有定时任务会进行消费进度的持久化。
网友评论