顺序消息的实现
顺序消息进行消费时,若是第一次消费失败,可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT
,下一次会继续消费此消息。
顺序消息的消费失败时的重试逻辑,具体代码在ProccessQueue中,顺序消费时手动从processQueue中取消息,内部是从msgTreeMap中取出消息后,将消息添加到consumingMsgOrderlyTreeMap中,若是消费成功,将该消息从consumingMsgOrderlyTreeMap中删除即可。若是消费失败,执行makeMessageToConsumeAgain方法,将这些消息再放回msgTreeMap。
顺序消费时有回滚和重试的逻辑,但是新版本不建议使用。回滚和重试的逻辑和上面相同,回滚时将消息重新放回treeMap,提交时不用操作treeMap,但是需要根据consumingMsgOrderlyTreeMap找到当前消费的offset,从下一个继续消费。
顺序消息消费时使用同一个线程,可以看一下ConsumeMessageOrderlyService
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(), // 迷惑性代码...
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
因为queue的长度是Integer.MAX_VALUE,因此在进行消费时使用的是一个线程,并且有序执行。
顺序消息的消费使用同一个线程是在ConsumeMessageOrderlyService.ConsumeRequest和ProcessQueue中实现的。
// ProcessQueue
private volatile boolean consuming = false;
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
// 如果有消息可以进行消费,并且当前queue没有消费,则将dispatchToConsume和consuming置为true
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;
this.consuming = true;
}
if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
return dispatchToConsume;
}
// ConsumeMessageOrderlyService
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) { // putMessage返回true时,才将request提交到线程池
// 如果已经开始对该queue进行消费了,就不会再次提交任务
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
// 提交给线程池的任务
// 主要代码
class ConsumeRequest implements Runnable {
@Override
public void run() {
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
// 如果可以继续消费,直接在当前线程中轮询消费该ProcessQueue即可
for (boolean continueConsume = true; continueConsume; ) {
// 在consumerImpl中的pullMessage方法中持续给ProcessQueue添加消息
// 手动从ProcessQueue中取消息
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
if (!msgs.isEmpty()) {
try {
this.processQueue.getLockConsume().lock();
//消费消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
} finally {
this.processQueue.getLockConsume().unlock();
}
// 处理消费结果,若是成功继续消费
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
}
}
看代码可以发现,如果顺序消息消费失败的话,即消费返回SUSPEND_CURRENT_QUEUE_A_MONENT
时,当前线程会停止消费,在processConsumeResult时,会提交新的任务到线程池,在新的线程中继续消费该消息。
核心逻辑是保证一个ProcessQueue只在一个线程中轮询消费消息。
发送顺序消息时会添加一个队列选择器,将需要有序的消息发送到同一个队列。消费端拉取特定queue的数据时天生有序,在消费时使用同一个线程进行消费,因此就实现了顺序消息。
事务消息
二阶段提交加补偿机制
第一阶段提交消息到broker,broker将topic修改为RMQ_SYS_TRANS_HALF_TOPIC
,存入对consumer不可见的topic/queue。如果此阶段写入成功,执行transactionListener.executeLocalTransaction()
。
第二阶段,根据本地事务的执行结果提交或者回滚第一阶段提交至broker的消息,这里使用的是OneWay方法,可靠性低,可能出现失败或者超时的情况。
broker端处理RequestCode.END_TRANSACTION
的请求,如果是commit,则将原来的消息取出,更改为正确的topic/queue,并进行落盘,然后添加Op状态。如果是rollback,则直接添加Op状态即可。
添加Op状态是将消息添加到Op队列中,Op队列是为了补偿逻辑时减少判断。
补偿逻辑:
BrokerController启动时会启动TransactionMessageCheckService,默认每隔60s检查一次HALF_TOPIC下所有的queue中的消息,检查步骤如下
- 先判断当前queue和对应的opQueue是否添加过消息,如果没有,遍历下一个queue,若有,进行下一步判断
- 获取对应的opQueue中的消息,若是没有消息,遍历下一个queue,若有,进行下一步判断
- 遍历当前queue
- 如果当前偏移量已经添加了oP状态,直接遍历至下一个偏移量,否则进行下一步判断
- 获取当前消息,若为null,遍历下一个偏移量,若不为null,进行下一步判断
- 若当前消息需要舍弃或者跳过,遍历下一个偏移量,否则进行下一步判断
- 判断当前消息是否需要check,若暂时不需要,重新走判断流程
- 若是需要check,broker端给producer发送
CHECK_TRANSACTION_STATE
消息,producer端接收到消息后,执行TransactionListener.checkLocalTransaction
,将check结果回发给broker。
网友评论