The previous article covered how a consumer pulls messages. The entry point is:
public class PullMessageService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
}
}
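A dedicated thread loops, taking pull requests from pullRequestQueue (the queue of pull requests) and processing them; when the queue is empty, take() blocks until a request arrives. So where does the data in this queue come from? That brings us to another important consumer feature: rebalancing (load balancing).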
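The hand-off between the two sides can be sketched with a LinkedBlockingQueue. This is a minimal sketch, not RocketMQ code: PullRequestSketch and PullRequestQueueSketch are hypothetical names, a message queue is represented by a plain string, and a timed poll replaces take() so the example cannot hang.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, minimal stand-in for RocketMQ's PullRequest.
final class PullRequestSketch {
    final String messageQueue;
    final long nextOffset;

    PullRequestSketch(String messageQueue, long nextOffset) {
        this.messageQueue = messageQueue;
        this.nextOffset = nextOffset;
    }
}

// The queue connecting the rebalance side (producer) and the pull thread (consumer).
final class PullRequestQueueSketch {
    private final BlockingQueue<PullRequestSketch> pullRequestQueue = new LinkedBlockingQueue<>();

    // Rebalance side: roughly what executePullRequestImmediately does.
    void executePullRequestImmediately(PullRequestSketch request) {
        pullRequestQueue.add(request);
    }

    // Pull-thread side. PullMessageService uses take(), which blocks indefinitely;
    // a timed poll is used here so the sketch can be exercised without hanging.
    PullRequestSketch takeOrNull(long timeoutMillis) {
        try {
            return pullRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }
}
```

The queue fully decouples the two sides: whoever produces requests never talks to the pull thread directly, it only enqueues.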
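First, what does rebalancing do? From the consumer's point of view, it periodically refreshes the set of message queues the consumer is responsible for: at one moment the consumer may be consuming queue A, at the next queue B. Why design it this way?
Looking at how rebalancing is implemented:
Using the topics it subscribes to, the consumer fetches from the name server (the registry) all message queues of each topic, together with the routing data (the actual address of the Broker that hosts each message queue).
An allocation strategy then assigns a subset of those queues to this consumer. Queues it consumed before may now be assigned to other consumers; whether it keeps any of its previous queues depends on the strategy.
If the Broker behind a consumer's queues fails, retrying against it forever is pointless. Rebalancing with a suitable allocation strategy can hand the consumer queues that are still available, so resources are not wasted.
In addition, the strategy allocates based on all message queues of the topic and all consumers in the same consumer group. With the average allocation strategy, adding consumers lowers the load on each one; conversely, when queues are added, the extra load is spread evenly instead of piling onto a single consumer.
In short, this is load balancing, and the exact behavior depends on the strategy.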
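To make the load argument concrete, here is a minimal sketch of average allocation, modeled on the contiguous-block idea behind RocketMQ's AllocateMessageQueueAveragely. Assumptions: queues and consumer IDs are plain strings, both lists are already sorted identically on every consumer, and AverageAllocationSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class AverageAllocationSketch {
    // Returns the contiguous block of mqAll assigned to currentCid.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or nothing to allocate
        }
        int mqCount = mqAll.size();
        int cidCount = cidAll.size();
        // With more queues than consumers, the first `mod` consumers get one extra queue.
        int mod = mqCount % cidCount;
        int averageSize;
        if (mqCount <= cidCount) {
            averageSize = 1;
        } else if (mod > 0 && index < mod) {
            averageSize = mqCount / cidCount + 1;
        } else {
            averageSize = mqCount / cidCount;
        }
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqCount - startIndex); // <= 0 means this consumer gets nothing
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }
}
```

With 8 queues and 3 consumers this yields blocks of 3, 3 and 2 queues; with a fourth consumer every consumer gets 2. With 2 queues and 3 consumers, the third consumer gets none, which is why adding consumers beyond the queue count does not help.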
Let's start with where rebalancing is started.
public class MQClientInstance {
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
}
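The code above is straightforward: it is the startup of the client instance (MQClientInstance), which runs when the consumer starts. RebalanceService is structured like PullMessageService: both extend ServiceThread, which implements Runnable.
Structure of RebalanceService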
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();
}
}
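Like PullMessageService, it is a Runnable (through ServiceThread), so starting it is straightforward.
Starting RebalanceService
The parent class ServiceThread already implements start():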
public abstract class ServiceThread implements Runnable {
private Thread thread;
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
}
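The loop in RebalanceService relies on ServiceThread's waitForRunning: it sleeps for waitInterval but can be woken up early. The sketch below is a simplified stand-in under the assumption that a plain Java monitor is enough; the real ServiceThread uses its own latch and a notification flag, and PeriodicServiceSketch / CountingServiceSketch are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for ServiceThread: a plain monitor replaces the real class's latch.
abstract class PeriodicServiceSketch implements Runnable {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Object lock = new Object();
    private final long intervalMillis;
    private volatile boolean stopped = false;
    private boolean notified = false; // guarded by lock; remembers a wakeup that arrived early

    PeriodicServiceSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Starts the worker thread at most once, like ServiceThread.start().
    boolean start() {
        if (!started.compareAndSet(false, true)) {
            return false;
        }
        Thread thread = new Thread(this, getClass().getSimpleName());
        thread.setDaemon(true);
        thread.start();
        return true;
    }

    // Ends the current wait early so the next round runs now.
    void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    void shutdown() {
        stopped = true;
        wakeup();
    }

    // Waits up to intervalMillis, or less if wakeup() was called.
    private void waitForRunning() {
        synchronized (lock) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
            while (!notified) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break;
                }
                try {
                    TimeUnit.NANOSECONDS.timedWait(lock, remaining);
                } catch (InterruptedException e) {
                    stopped = true; // treat an interrupt as a stop request
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            notified = false; // consume the wakeup
        }
    }

    @Override
    public void run() {
        while (!stopped) {
            waitForRunning();
            if (!stopped) {
                onRound(); // RebalanceService calls mqClientFactory.doRebalance() at this point
            }
        }
    }

    protected abstract void onRound();
}

// Counts rounds instead of rebalancing, so the loop can be observed.
final class CountingServiceSketch extends PeriodicServiceSketch {
    private final AtomicInteger rounds = new AtomicInteger();

    CountingServiceSketch(long intervalMillis) {
        super(intervalMillis);
    }

    @Override
    protected void onRound() {
        rounds.incrementAndGet();
    }

    int rounds() {
        return rounds.get();
    }
}
```

The notified flag is the important design choice: a wakeup() that arrives before the thread starts waiting is remembered rather than lost, so the next round still runs immediately.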
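Now let's see how the rebalancing itself is done.
Back in RebalanceService's run method, the code shows that rebalancing is delegated to the client instance (MQClientInstance).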
public class RebalanceService extends ServiceThread {
private final MQClientInstance mqClientFactory;
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
// delegate rebalancing to the client instance
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
public class MQClientInstance {
// client instance: maps each consumer group to its consumer implementation
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
// go one level down: the implementation here is DefaultMQPushConsumerImpl
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
}
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
@Override
public void doRebalance() {
if (!this.pause) {
// go one more level down; only the implementation matters
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
}
public abstract class RebalanceImpl {
// subscription data of this consumer, keyed by topic
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// rebalance each subscribed topic
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
}
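An MQ client has a single MQClientInstance, which keeps track of the consumer for each group in that client; within one client, each group can have only one consumer.
Each consumer has its own consumer implementation instance (DefaultMQPushConsumerImpl here), which holds the corresponding rebalance component (RebalanceImpl). RebalanceImpl reassigns queues based on the subscription data it maintains.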
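The delegation chain above (client instance, then one consumer per group, then one rebalanceByTopic call per subscribed topic) can be sketched as follows. All class names are hypothetical; rebalanceByTopic is injected as a callback, and the per-topic try/catch mirrors the one in RebalanceImpl.doRebalance.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// One consumer (one group) with its subscribed topics; rebalanceByTopic is injected.
final class GroupConsumerSketch {
    private final Set<String> subscribedTopics;
    private final Consumer<String> rebalanceByTopic;

    GroupConsumerSketch(Set<String> subscribedTopics, Consumer<String> rebalanceByTopic) {
        this.subscribedTopics = new LinkedHashSet<>(subscribedTopics);
        this.rebalanceByTopic = rebalanceByTopic;
    }

    // Like RebalanceImpl.doRebalance: one failing topic must not block the others.
    void doRebalance() {
        for (String topic : subscribedTopics) {
            try {
                rebalanceByTopic.accept(topic);
            } catch (RuntimeException e) {
                System.err.println("rebalanceByTopic failed for " + topic + ": " + e.getMessage());
            }
        }
    }
}

// Like MQClientInstance: one entry per consumer group.
final class ClientInstanceSketch {
    private final Map<String, GroupConsumerSketch> consumerTable = new ConcurrentHashMap<>();

    // putIfAbsent rejects a second consumer for the same group in this instance.
    boolean registerConsumer(String group, GroupConsumerSketch consumer) {
        return consumerTable.putIfAbsent(group, consumer) == null;
    }

    void doRebalance() {
        for (GroupConsumerSketch consumer : consumerTable.values()) {
            consumer.doRebalance();
        }
    }
}
```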
Details of the rebalance implementation
There are two message models: clustering and broadcasting.
Clustering
public abstract class RebalanceImpl {
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// ... omitted: broadcasting mode
break;
}
case CLUSTERING: {
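// ------------------------------------------------------------
// 1. Get the queues and the consumer IDs of the same group
// all queues of this topic (route data fetched from the name server)
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// consumer IDs of the whole group, taken from one randomly chosen broker of the topic; this works because every consumer sends heartbeats to all brokers of the topic, which effectively registers it with each broker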
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
// ------------------------------------------------------------
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
// 2. Reassign the queues
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// ------------------------------------------------------------
// 3. Check whether the newly allocated queues differ from the ones consumed before;
// if they differ, clean up the related caches and sync the cached data
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
// ------------------------------------------------------------
}
break;
}
}
}
}
Roughly three steps:
1. Get the queues and the consumer IDs of the same group
2. Reassign the queues
3. Update the queue cache
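Step 2 sorts both mqAll and cidAll before calling the strategy. That matters because every consumer in the group runs the allocation on its own, with no coordinator: a deterministic strategy only yields non-overlapping assignments if all consumers agree on each consumer's position. The check below is a hypothetical illustration of that point, not RocketMQ code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SortedViewSketch {
    // Position of `me` in its own copy of the consumer ID list, optionally sorted first.
    static int positionOf(String me, List<String> cidView, boolean sortFirst) {
        List<String> view = new ArrayList<>(cidView);
        if (sortFirst) {
            Collections.sort(view);
        }
        return view.indexOf(me);
    }

    // A deterministic strategy only gives disjoint assignments if every consumer
    // ends up at a different position; this checks exactly that.
    static boolean positionsDistinct(Map<String, List<String>> viewByConsumer, boolean sortFirst) {
        Set<Integer> seen = new HashSet<>();
        for (Map.Entry<String, List<String>> e : viewByConsumer.entrySet()) {
            int pos = positionOf(e.getKey(), e.getValue(), sortFirst);
            if (pos < 0 || !seen.add(pos)) {
                return false;
            }
        }
        return true;
    }
}
```

If three consumers receive the same IDs in different orders, without sorting each of them may see itself at position 0 and claim the same queues; after sorting they land on positions 0, 1 and 2.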
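1. Get the queues and the consumer IDs of the same group
Let's start with how this step works.
The topic's queue information is first read from a cache. topicSubscribeInfoTable is filled when the consumer starts: it pulls all route data of the topic from the name server and stores it there.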
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
Next come the IDs of all consumers in the same group. How are they obtained?
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
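Keeping the queues in sync: how the rebalance component's queue cache is updated
At startup, the client schedules a task that periodically refreshes the route data, which also refreshes the subscription data. The code is below. Its implementation is not the focus here, so we only locate the code to confirm that it eventually refreshes topicSubscribeInfoTable as well.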
public class MQClientInstance {
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
// ... some code omitted
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// ... some code omitted
}
}
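The refresh can be sketched with a ScheduledExecutorService. In this sketch the name-server query is a hypothetical Function<String, Set<String>>, queues are strings, and RouteCacheSketch is a hypothetical name. It also shows why the try/catch in the original Runnable matters: per the ScheduledExecutorService contract, if one run of a task scheduled with scheduleAtFixedRate throws, all later runs are suppressed.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class RouteCacheSketch {
    private final Map<String, Set<String>> topicSubscribeInfoTable = new ConcurrentHashMap<>();
    private final Set<String> topics;
    // Hypothetical stand-in for a name-server route query: topic -> queue IDs.
    private final Function<String, Set<String>> nameServer;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "route-refresh-sketch");
        t.setDaemon(true);
        return t;
    });

    RouteCacheSketch(Set<String> topics, Function<String, Set<String>> nameServer) {
        this.topics = new HashSet<>(topics);
        this.nameServer = nameServer;
    }

    // One refresh pass. Exceptions are caught per topic so the periodic task keeps running.
    void refreshOnce() {
        for (String topic : topics) {
            try {
                Set<String> queues = nameServer.apply(topic);
                if (queues != null) {
                    topicSubscribeInfoTable.put(topic, Collections.unmodifiableSet(new HashSet<>(queues)));
                }
            } catch (RuntimeException e) {
                System.err.println("route refresh failed for " + topic + ": " + e.getMessage());
            }
        }
    }

    // 10 ms initial delay, as in startScheduledTask above.
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::refreshOnce, 10, periodMillis, TimeUnit.MILLISECONDS);
    }

    Set<String> queuesOf(String topic) {
        return topicSubscribeInfoTable.get(topic);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```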
Getting the IDs of all consumers in the same group
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
Let's look at how this line is implemented.
public class MQClientInstance {
public List<String> findConsumerIdList(final String topic, final String group) {
// randomly pick one broker of the topic and get its master address
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
// not in the cache: fetch the route from the name server again and cache it
this.updateTopicRouteInfoFromNameServer(topic);
// read the address from the cache again
brokerAddr = this.findBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
// ask that broker for the IDs of all consumers in the group and return them
try {
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
} catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
}
}
return null;
}
public String findBrokerAddrByTopic(final String topic) {
// randomly pick a broker of this topic and return its master address
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
List<BrokerData> brokers = topicRouteData.getBrokerDatas();
if (!brokers.isEmpty()) {
int index = random.nextInt(brokers.size());
BrokerData bd = brokers.get(index % brokers.size());
return bd.selectBrokerAddr();
}
}
return null;
}
}
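Every consumer periodically sends heartbeats to all brokers of the topic, which also registers it with them. As a result, every broker of the topic holds the same cids (consumer ID list), so it is enough to ask one randomly chosen broker.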
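A small simulation of that argument, assuming each broker keeps one set of consumer IDs for the group (HeartbeatRegistrySketch is a hypothetical name): since every heartbeat goes to every broker, asking any one of them returns the same list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class HeartbeatRegistrySketch {
    // brokerName -> consumer IDs registered for one consumer group (simplified)
    private final Map<String, Set<String>> consumerIdsByBroker = new TreeMap<>();
    private final Random random;

    HeartbeatRegistrySketch(List<String> brokerNames, long seed) {
        for (String broker : brokerNames) {
            consumerIdsByBroker.put(broker, new TreeSet<>());
        }
        this.random = new Random(seed);
    }

    // Every consumer sends its heartbeat to every broker.
    void heartbeatToAll(String clientId) {
        for (Set<String> ids : consumerIdsByBroker.values()) {
            ids.add(clientId);
        }
    }

    // Mirrors findConsumerIdList: ask one randomly chosen broker.
    List<String> findConsumerIdList() {
        List<String> brokers = new ArrayList<>(consumerIdsByBroker.keySet());
        if (brokers.isEmpty()) {
            return null;
        }
        String chosen = brokers.get(random.nextInt(brokers.size()));
        return new ArrayList<>(consumerIdsByBroker.get(chosen));
    }
}
```

Heartbeats are periodic, though, so right after a consumer joins or leaves the brokers may briefly disagree; subsequent rebalance rounds then work from fresher data.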
The code that sends the periodic heartbeats:
public class MQClientInstance {
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
}
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
try {
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
}
}
}
}
2. Reassign the queues
The allocation strategies are fairly involved, so we will not go into them in detail here.
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
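The strategy is pluggable, and the try/catch around allocate means a failing strategy aborts the current round: the method returns before updateProcessQueueTableInRebalance, so the consumer keeps its current queues. Below is a sketch of that contract with a round-robin ("circle") strategy, similar in spirit to RocketMQ's AllocateMessageQueueAveragelyByCircle. The interface is shaped like the allocate(...) call above but is hypothetical, queues are strings, and while the source catches Throwable, the sketch catches only RuntimeException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical strategy interface, shaped like the allocate(...) call above.
interface QueueAllocatorSketch {
    String getName();

    List<String> allocate(String consumerGroup, String currentCid, List<String> mqAll, List<String> cidAll);
}

// Round-robin allocation: queue i goes to the consumer at position i % cidAll.size().
final class CircleAllocatorSketch implements QueueAllocatorSketch {
    @Override
    public String getName() {
        return "CIRCLE_SKETCH";
    }

    @Override
    public List<String> allocate(String consumerGroup, String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return result;
        }
        for (int i = index; i < mqAll.size(); i += cidAll.size()) {
            result.add(mqAll.get(i));
        }
        return result;
    }
}

final class SafeAllocation {
    // An empty Optional means "abort this round and keep the current queues";
    // a null result from the strategy is treated as "no queues", as in the code above.
    static Optional<List<String>> tryAllocate(QueueAllocatorSketch strategy, String group, String cid,
                                              List<String> mqAll, List<String> cidAll) {
        try {
            List<String> result = strategy.allocate(group, cid, mqAll, cidAll);
            return Optional.of(result == null ? Collections.<String>emptyList() : result);
        } catch (RuntimeException e) {
            System.err.println("allocate failed, strategy=" + strategy.getName() + ": " + e);
            return Optional.empty();
        }
    }
}
```

Compared with the contiguous average strategy, round-robin interleaves queues, so queues of the same broker tend to be spread across consumers.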
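3. Update the queue cache
After allocation, the consumer checks whether the new queues differ from the ones it maintained before. If they do, the information about the old queues must be cleaned up, and any ongoing work on the old queues must stop.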
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
Let's see how the queues are actually updated.
public abstract class RebalanceImpl {
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// ------------------------------------------------------------
// 1. Remove old queues
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
// walk processQueueTable, compare each entry with the newly allocated queues, and clean up the ones no longer allocated
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
// not in the new allocation: this queue is now maintained by another consumer
if (!mqSet.contains(mq)) {
// mark the ProcessQueue as dropped
pq.setDropped(true);
// clean up the cache and remove the entry from the table
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
// ------------------------------------------------------------
// 2. Build new pull requests for the newly allocated queues
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// remove the stale (dirty) offset
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// compute the offset to start pulling from for this queue
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// ------------------------------------------------------------
// 3. Dispatch the new pull requests
this.dispatchPullRequest(pullRequestList);
// ------------------------------------------------------------
return changed;
}
}
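Stripped of the topic filter, pull-expiry handling, ordered-consumption locking and offset computation, the two passes above reduce to a set difference. A minimal sketch under those simplifications, with hypothetical names and queues as strings:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ProcessQueue: only the dropped flag matters here.
final class ProcessQueueSketch {
    volatile boolean dropped;
}

final class RebalanceResultSketch {
    final List<String> newPullRequests; // queues that would get a fresh PullRequest
    final boolean changed;

    RebalanceResultSketch(List<String> newPullRequests, boolean changed) {
        this.newPullRequests = newPullRequests;
        this.changed = changed;
    }
}

final class ProcessQueueTableSketch {
    private final Map<String, ProcessQueueSketch> processQueueTable = new ConcurrentHashMap<>();

    RebalanceResultSketch update(Set<String> allocated) {
        boolean changed = false;
        // Pass 1: drop queues that are no longer assigned to this consumer.
        Iterator<Map.Entry<String, ProcessQueueSketch>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ProcessQueueSketch> entry = it.next();
            if (!allocated.contains(entry.getKey())) {
                entry.getValue().dropped = true; // signals in-flight work on this queue to stop
                it.remove();
                changed = true;
            }
        }
        // Pass 2: register newly assigned queues; putIfAbsent guards against duplicates.
        List<String> newPullRequests = new ArrayList<>();
        for (String mq : allocated) {
            if (processQueueTable.putIfAbsent(mq, new ProcessQueueSketch()) == null) {
                newPullRequests.add(mq);
                changed = true;
            }
        }
        Collections.sort(newPullRequests); // deterministic order, for readability only
        return new RebalanceResultSketch(newPullRequests, changed);
    }

    ProcessQueueSketch get(String mq) {
        return processQueueTable.get(mq);
    }
}
```

Running the same allocation twice reports no change, which is why messageQueueChanged is only called when the assignment really moves.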
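3.1 Removing old queues
The code above shows that an old queue is removed when the new allocation no longer contains it.
The question, then, is how the old queue is removed: besides taking it out of processQueueTable, what else has to happen?
As mentioned earlier, in clustering mode offsets are stored on the Broker. So when an old queue is removed, shouldn't its consumption progress be synced to the Broker? Otherwise, how would the consumer that takes over the queue know where to resume?
Look at the code below: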
public class RebalancePushImpl extends RebalanceImpl {
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
// persist the offset (to the Broker)
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
// remove the local cache entry
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
// ... some code omitted
return true;
}
}
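Persisting the offset and removing the cached offset (RemoteBrokerOffsetStore). persist(mq) sends the locally cached offset of the queue to the Broker through updateConsumeOffsetToBroker: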
public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
public void removeOffset(MessageQueue mq) {
if (mq != null) {
this.offsetTable.remove(mq);
log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
offsetTable.size());
}
}
}
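The hand-over of consumption progress can be sketched with two maps: a local offset table per consumer and a shared table standing in for the Broker. The names are hypothetical and the network call is replaced by a direct method call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the offsets a Broker stores for one consumer group.
final class BrokerOffsetsSketch {
    private final Map<String, Long> committed = new ConcurrentHashMap<>();

    void updateConsumerOffset(String mq, long offset) {
        committed.put(mq, offset);
    }

    // -1 means "no committed offset yet".
    long queryConsumerOffset(String mq) {
        return committed.getOrDefault(mq, -1L);
    }
}

// Hypothetical stand-in for RemoteBrokerOffsetStore: a local cache plus the broker copy.
final class OffsetStoreSketch {
    private final Map<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();
    private final BrokerOffsetsSketch broker;

    OffsetStoreSketch(BrokerOffsetsSketch broker) {
        this.broker = broker;
    }

    void updateOffset(String mq, long offset) {
        offsetTable.computeIfAbsent(mq, k -> new AtomicLong()).set(offset);
    }

    // Push the cached offset of one queue to the broker.
    void persist(String mq) {
        AtomicLong offset = offsetTable.get(mq);
        if (offset != null) {
            broker.updateConsumerOffset(mq, offset.get());
        }
    }

    void removeOffset(String mq) {
        offsetTable.remove(mq);
    }

    boolean hasLocalOffset(String mq) {
        return offsetTable.containsKey(mq);
    }

    // The consumer taking over a queue starts from the broker's committed offset.
    long readFromBroker(String mq) {
        long offset = broker.queryConsumerOffset(mq);
        if (offset >= 0) {
            updateOffset(mq, offset);
        }
        return offset;
    }

    // Same order as removeUnnecessaryMessageQueue: persist first, then drop the local entry.
    void handOver(String mq) {
        persist(mq);
        removeOffset(mq);
    }
}
```

The order is what matters: if the local entry were dropped before persisting, the latest progress would be lost and the next owner would likely resume from an older committed offset and consume some messages again.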
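3.2 Building new pull requests for the newly allocated queues
The logic of this step is clear, so we skip it.
3.3 Dispatching the new pull requests
This step is simple: each newly created pull request is handed over for execution, which puts it into the PullMessageService pullRequestQueue from the beginning of this article.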
public class RebalancePushImpl extends RebalanceImpl {
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
}
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
}
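Broadcasting mode is comparatively simple: the consumer only needs to check whether the cached set of message queues has changed and, if so, remove the old queues. Offsets do not need to be synced either, because in broadcasting mode the offsets only need to be known by the client itself.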
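For contrast, a sketch of the broadcasting case, assuming queues are strings (BroadcastRebalanceSketch is a hypothetical name): every consumer takes the whole queue set of the topic, so there is no consumer ID lookup and no strategy, and the only question is whether the set changed.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

final class BroadcastRebalanceSketch {
    private Set<String> assigned = new TreeSet<>();

    // Broadcasting: the "allocation" is simply the whole queue set of the topic.
    // Returns true when the set changed, i.e. old queues must be removed or new ones added.
    boolean rebalance(Set<String> mqSet) {
        if (assigned.equals(mqSet)) {
            return false;
        }
        assigned = new TreeSet<>(mqSet);
        return true;
    }

    Set<String> assigned() {
        return Collections.unmodifiableSet(assigned);
    }
}
```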