在rocketmq里面。消费消息的方式有俩种。
DefaultMQPushConsumer的结构
先来看看DefaultMQPushConsumer的结构
DefaultMQPushConsumer.png
结构如上图所示
从结构看出,DefaultMQPushConsumer具备了以下的功能
1.管理topic,具备创建topic的功能,以及相关的查询功能(如检索队列的偏移量,以及根据msgId获取消息信息,根据关键字查询消息)---MQAdmin接口
2.拉取消息队列信息,消费失败的时候对Broker的通知---MQConsumer
3.订阅/取消订阅主题,注册监听器,消费行为的控制(开始,关闭,暂停,恢复)---MQPushConsumer
在DefaultMQPushConsumer中有一个很重要的成员变量。
DefaultMQPushConsumerImpl
DefaultMQPushConsumerImplDefaultMQPushConsumer的启动是通过成员变量DefaultMQPushConsumerImpl来执行的。
下面直接来看看DefaultMQPushConsumerImpl的启动方法
代码如下
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
默认状态为仅创建
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
MQ客户端实例
private MQClientInstance mQClientFactory;
对应的消费者实例---控制反转
private final DefaultMQPushConsumer defaultMQPushConsumer;
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
//检查配置项
this.checkConfig();
//复制订阅信息到rebalanceImpl(负载服务的成员变量中)
this.copySubscription();
//如果为集群模式,将消费者的实例id改为pid@hostname,避免多个实例的时候,实例名称出现重复
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
//负载均衡服务的初始化---这里面细节还是挺多的,所以暂时忽略
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//由于消费,必然要知道从何处开始消费。对于不同的订阅模式,偏移量的保存也是不一样的
//广播模式,偏移量是保存在各个消费者
//集群模式,偏移量则是保存在broker中
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
//消费者设置偏移量
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
//消费者的实例化 分为俩种,顺序消费,以及并发消费
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//消费服务的启动
this.consumeMessageService.start();
//注册到MQClientInstance中的consumerTable,一个jvm中,每个消费组只能有一个对应的消费者
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
//注册不成功会抛异常
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mq实例工厂的启动---负载服务,以及拉取消息的服务的实现都在其中。
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
//将服务的状态改为运行中
this.serviceState = ServiceState.RUNNING;
break;
省略部分代码
}
省略部分代码
}
}
从启动的流程来看,大致流程如下
1.检查配置
2.复制订阅信息,给与负载服务使用
3.修改消费者实例的id,Pid@hostname,避免一个虚拟机中有多个消费者的时候出现Id重复
4.负载均衡服务的初始化
5.消息偏移量的获取及保存。这样子消费者才知道主题对应的相关队列应该从哪里开始消费。
广播模式---消息队列的偏移量存储在消费者本身
集群模式---消息队列的偏移量存储在broker中
6.消费服务的初始化以及启动
7.mq客户端的启动-负载服务,以及拉取消息的服务的实现都在其中。
从上面这个过程,没有观察到消费者是如何拉取消息的。接下来看看消费者是如何拉取消息的。关注mQClientFactory.start();即可
消费者如何拉取消息(push)方式
public class MQClientInstance {
拉取消息的服务
private final PullMessageService pullMessageService;
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
this.mQClientAPIImpl.start();
this.startScheduledTask();
拉取消息服务的启动
this.pullMessageService.start();
负载服务的启动
this.rebalanceService.start();
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
}
从上面得知,拉取消息的服务就是PullMessageService。
下面先来看看PullMessageService的结构。
PullMessageService
PullMessageService的结构
PullMessageService从上面的图比较清晰,其实PullMessageService是利用线程池去执行各种任务。另外维护了一个拉取消息的一个队列(PullRequest)。下面来看看PullRequest这个结构。
PullMessageService的成员变量的相关结构体
MessageQueue
消息队列
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
private String topic;
private String brokerName;
private int queueId;
}
ProcessQueue
执行队列
public class ProcessQueue {
public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
private final InternalLogger log = ClientLogger.getLog();
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();存储的消息容量
private final Lock lockConsume = new ReentrantLock();
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
private volatile long lastPullTimestamp = System.currentTimeMillis(); 最近拉取队列的时间
private volatile long lastConsumeTimestamp = System.currentTimeMillis(); 最近消费队列的时间
private volatile boolean locked = false;
private volatile long lastLockTimestamp = System.currentTimeMillis();
private volatile boolean consuming = false;
private volatile long msgAccCnt = 0;存储的消息数量
}
PullRequest
public class PullRequest {
private String consumerGroup;属于哪个消费组发起的拉取请求
private MessageQueue messageQueue;消息队列
private ProcessQueue processQueue;处理队列
private long nextOffset;偏移量,从队列哪里开始拉取
private boolean lockedFirst = false;是否被锁住
}
PullMessageService是如何拉取消息的
MQClientInstance中拉取消息的代码
this.pullMessageService.start();
先来看看PullMessageService父类的方法,实现了Runnable接口,但是是一个抽象类,并没有覆写run方法,由子类去覆写。start其实就是统一了启动的入口。PullMessageService实现了run方法。
public abstract class ServiceThread implements Runnable{
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
}
PullMessageService实现的run方法,代码如下
public class PullMessageService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
如果拉取请求队列,一直没有数据的话,就会一直等待
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
所以下面比较清晰,只需要专注pullMessage方法即可
public class PullMessageService extends ServiceThread {
private void pullMessage(final PullRequest pullRequest) {
通过mQClientFactory保存的consumerTable拿到对应消费组的消费者
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
强转之后,再去执行拉取,MQConsumerInner 为DefaultMQPushConsumerImpl 的父类
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
}
通过上面代码,发现是在DefaultMQPushConsumerImpl中实现的。
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
如果请求对应的处理队列已经被废弃了,直接结束
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
更新最近拉取消息的时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
确保这个服务已经起来了,如果不是在运行中,直接结束
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
允许暂停状态的时候,仍然去拉取消息。(延迟拉取---一段时间后拉取)
if (this.isPause()) {
省略日志打印
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
缓存的消息数量
long cachedMessageCount = processQueue.getMsgCount().get();
缓存的消息内容大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
如果当消息还有超过1000条没有被消费完时就不拉去消息。pullThresholdForQueue默认 = 1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
延迟拉取
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
超过1000次,则打印分流日志
if ((queueFlowControlTimes++ % 1000) == 0) {
省略日志打印
}
结束
return;
}
当缓存的消息队列容量大于1000兆的时候,也执行延迟拉取。避免内存溢出以及过多的消费堆积
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
省略日志打印
}
return;
}
if (!this.consumeOrderly) {
对于并发消费方式的处理,待消费的消息中最大的偏移量与最小偏移量的差值大于2000也延迟拉取
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
省略日志打印
}
return;
}
} else {
对于顺序消费的处理
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
订阅数据为空,直接抛异常
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
回调,暂时忽略
};
省略部分代码
try {
拉取方法
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
}
上面的代码设计主要是利用了队列,如果当堆积消息过多,就不再马上拉取消息,而是通过新增请求的方式,将请求加入队列,延迟一段时间再去进行拉取,这样子避免了堆积的消息过多,从而导致服务崩溃。传统的push方式就是Broker给多少消息,consumer就消费多少,这样子当消息陡增的时候,很容易造成内存溢出,以及某个消息很久才可能被消费的问题。代码里面的回调,其实也是蛮重要的,不过暂时忽略,后面可以再看。
再来看看拉取的具体实现
public class PullAPIWrapper {
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
找到对应的broker信息,地址,是否主从,broker版本
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
找不到则从注册中心拉取一遍路由信息,并且存到成员变量中
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
再从成员变量中查找
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
省略部分代码
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);消费组
requestHeader.setTopic(mq.getTopic());主题
requestHeader.setQueueId(mq.getQueueId());队列id
requestHeader.setQueueOffset(offset);消息偏移量
requestHeader.setMaxMsgNums(maxNums);最大的消息数,默认32
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);消费进度
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
进行拉取
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
找不到则抛异常
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
以上代码就是通向broker端获取消息,需要broker的地址(通过队列对应的brokername,再通过路由表可以获取到最后的实际地址),以及需要指定通信的方式,同步还是异步,以及偏移量等。
接下来看看返回结构的结构体
PullResult
public class PullResult {
拉取的状态
private final PullStatus pullStatus;
下次开始拉取的偏移量
private final long nextBeginOffset;
当前队列最小的偏移量
private final long minOffset;
当前队列最大的偏移量
private final long maxOffset;
拉取的消息
private List<MessageExt> msgFoundList;
}
public enum PullStatus {
/**
* Founded
*/
FOUND,
/**
* No new message can be pull
*/
NO_NEW_MSG,
/**
* Filtering results can not match
*/
NO_MATCHED_MSG,
/**
* Illegal offset,may be too big or too small
*/
OFFSET_ILLEGAL
}
当拉取完之后,肯定是通过回调来处理。所以这下可以看看回调的内容了
还是回调之前的代码
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
省略若干代码
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
如果找到消息
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
将返回结果的偏移量(拉取消息后,Broker端的偏移量),作为下次拉取的偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
返回消息若为空,则马上进行再次拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
记录下获取消息中的最小偏移量
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
将消息放入processQueue的msgTreeMap中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
进行消费
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
如果接口返回的偏移量 小于拉取时的偏移量,或者返回数据的最小偏移量小于拉取时的偏移量,说明可能偏移量是错误的,存在重复消费的情况
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
没有匹配的消息,则进行再次拉取
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
再次进行拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
偏移量错误
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
对应的处理队列进行废弃,但是要进行偏移量的纠正,避免下次拉取还是偏移量错误
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
进行偏移量的纠正,10秒后
@Override
public void run() {
try {
更新偏移量
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
持久化偏移量
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
负载服务移除之前的
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
出现异常的话,再进行拉取
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
省略若干代码
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
}
从以上的代码来看,push方式的拉取,其实还是基于pull,只不过是在consumer消费情况允许的前提下,可以一直Pull(就有点像Push一样)
从push方式拉取的特点来说:
1.会根据自身的消费情况(是否堆积了过多消息,消息的数量或者说消息占用的内存大小,或者说消息的偏移量),决定是否继续拉取,或者延迟拉取
这样子做的好处是,可以规避内存溢出,避免消费者堆积的消息过多,导致消息很久才被消费,此外只要多部署几个消费者,就可以缓解单个消费者的压力,因为还是基于消费者pull的方式(消费能力可以的情况下,再去拉消息)。
2.在拉取服务暂停的情况下,也可以进行延时拉取
3.对于偏移量会做矫正,通过每次拉取返回的结果进行矫正,避免一直重复错误,进行没必要的拉取。
网友评论