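When message consumption fails and the message is sent back to the broker, it carries a delay level. Before the broker stores the message, it rewrites the message's topic according to that delay level.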
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
msg.setStoreTimestamp(System.currentTimeMillis());
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// Change the topic to "SCHEDULE_TOPIC_XXXX"
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Back up the original topic and queue id so the message can be forwarded later
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
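// Update the topic and queue id on the message
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// ... (the code that appends to the commit log and builds putMessageResult is omitted here,
// including the opening of the callback that is closed below)
return putMessageResult;
});
}
}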
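The topic rewrite above can be sketched without any RocketMQ dependency. This is a minimal illustration, not the broker's code: `Msg` is a hypothetical stand-in for MessageExtBrokerInner, the property keys `REAL_TOPIC` and `REAL_QID` are assumed values of MessageConst.PROPERTY_REAL_TOPIC and MessageConst.PROPERTY_REAL_QUEUE_ID, and the mapping "queue id = delay level - 1" is an assumption about what delayLevel2QueueId does.

```java
import java.util.HashMap;
import java.util.Map;

public class DelayTopicRewriteSketch {
    // Assumed constant values; verify them against your RocketMQ version.
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    /** Hypothetical stand-in for MessageExtBrokerInner. */
    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    // Assumed mapping: delay level N is stored in queue N - 1 of the schedule topic.
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    static void rewriteForDelay(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, leave it untouched
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // clamp to the highest configured level
        }
        // Back up the real destination before overwriting it.
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        // Redirect the message to the internal schedule topic.
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }

    public static void main(String[] args) {
        Msg msg = new Msg("%RETRY%demo-group", 0, 20);
        rewriteForDelay(msg, 18);
        System.out.println(msg.topic + " / queue " + msg.queueId + " / " + msg.properties);
    }
}
```

Because the real topic and queue id travel inside the message properties, the broker needs no extra lookup table to forward the message later.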
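So how are the messages in the schedule topic SCHEDULE_TOPIC_XXXX consumed? This topic is internal to the broker, so both producing to it and consuming from it must be implemented inside the broker.
Let's start with the MessageStore interface.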
MessageStore
This interface defines how data is read from, written to, and queried in the commitLog.
public interface MessageStore {
boolean load();
void start() throws Exception;
void shutdown();
void destroy();
default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
return CompletableFuture.completedFuture(putMessage(msg));
}
default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
return CompletableFuture.completedFuture(putMessages(messageExtBatch));
}
PutMessageResult putMessage(final MessageExtBrokerInner msg);
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter);
long getMaxOffsetInQueue(final String topic, final int queueId);
long getMinOffsetInQueue(final String topic, final int queueId);
long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset);
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
MessageExt lookMessageByOffset(final long commitLogOffset);
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
String getRunningDataInfo();
HashMap<String, String> getRuntimeInfo();
long getMaxPhyOffset();
long getMinPhyOffset();
long getEarliestMessageTime(final String topic, final int queueId);
long getEarliestMessageTime();
long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset);
long getMessageTotalInQueue(final String topic, final int queueId);
SelectMappedBufferResult getCommitLogData(final long offset);
boolean appendToCommitLog(final long startOffset, final byte[] data);
void executeDeleteFilesManually();
QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end);
void updateHaMasterAddress(final String newAddr);
long slaveFallBehindMuch();
long now();
int cleanUnusedTopic(final Set<String> topics);
void cleanExpiredConsumerQueue();
boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
long dispatchBehindBytes();
long flush();
boolean resetWriteOffset(long phyOffset);
long getConfirmOffset();
void setConfirmOffset(long phyOffset);
boolean isOSPageCacheBusy();
long lockTimeMills();
boolean isTransientStorePoolDeficient();
LinkedList<CommitLogDispatcher> getDispatcherList();
ConsumeQueue getConsumeQueue(String topic, int queueId);
BrokerStatsManager getBrokerStatsManager();
// This method is the entry point for delayed scheduling
void handleScheduleMessageService(BrokerRole brokerRole);
}
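A quick look at the methods makes the purpose of the interface clear: it is the layer that interacts with the commitLog. Next, let's look at its implementation class, DefaultMessageStore.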
DefaultMessageStore
The implementation of handleScheduleMessageService:
public class DefaultMessageStore implements MessageStore {
private final ScheduleMessageService scheduleMessageService;
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {
if (this.scheduleMessageService != null) {
if (brokerRole == BrokerRole.SLAVE) {
this.scheduleMessageService.shutdown();
} else {
// Only a master broker runs delayed scheduling
this.scheduleMessageService.start();
}
}
}
}
Now let's look at how start() works.
public class ScheduleMessageService extends ConfigManager {
// Flag marking whether the service has been started
private final AtomicBoolean started = new AtomicBoolean(false);
// The timer
private Timer timer;
// Delay level -> delay time in milliseconds
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
// Delay level -> consume offset of the queue for that level
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
public void start() {
// Guard against starting twice
if (started.compareAndSet(false, true)) {
// Create a Timer backed by a daemon thread
this.timer = new Timer("ScheduleMessageTimerThread", true);
// Walk the delay level table; each level maps to a different delay time
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// Create a timer task from the delay level and offset; its first run is delayed by FIRST_DELAY_TIME
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// Persist the offsets periodically
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
}
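As the code above shows, one timer task is created for each delay level, and each task runs a DeliverDelayedMessageTimerTask.
After that, one more timer task is created to persist the offsets at a fixed rate.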
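The "one task per delay level" structure can be tried out in isolation. The sketch below is only an illustration under assumptions: it builds a level table from a space-separated string such as "1s 5s 10s" (the article does not show how delayLevelTable is filled, so `parseDelayLevels` is a hypothetical helper), then schedules one `java.util.Timer` task per level, starting each from the offset saved for that level or from 0.

```java
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DelayLevelTimerSketch {
    /** Hypothetical helper: turns "1s 5s 1m" into {1=1000, 2=5000, 3=60000}. */
    static Map<Integer, Long> parseDelayLevels(String levelString) {
        Map<Integer, Long> table = new TreeMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMillis;
            switch (part.charAt(part.length() - 1)) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                case 'd': unitMillis = 86_400_000L; break;
                default: throw new IllegalArgumentException("Unknown time unit in " + part);
            }
            table.put(i + 1, value * unitMillis); // levels are numbered from 1
        }
        return table;
    }

    /** Schedules one task per level; each task records the offset it starts from. */
    static CountDownLatch startOneTaskPerLevel(Timer timer, Map<Integer, Long> table,
            Map<Integer, Long> offsetTable, Map<Integer, Long> startedAt, long firstDelayMillis) {
        CountDownLatch latch = new CountDownLatch(table.size());
        for (Integer level : table.keySet()) {
            final long offset = offsetTable.getOrDefault(level, 0L); // no saved offset: start at 0
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    startedAt.put(level, offset);
                    latch.countDown();
                }
            }, firstDelayMillis);
        }
        return latch;
    }

    public static void main(String[] args) throws Exception {
        Map<Integer, Long> table = parseDelayLevels("1s 5s 10s 30s 1m");
        Map<Integer, Long> offsets = new ConcurrentHashMap<>();
        offsets.put(2, 42L); // pretend level 2 had already consumed 42 entries
        Map<Integer, Long> startedAt = new ConcurrentHashMap<>();

        Timer timer = new Timer("SketchTimerThread", true); // daemon thread, as in the article
        CountDownLatch latch = startOneTaskPerLevel(timer, table, offsets, startedAt, 100L);
        latch.await(5, TimeUnit.SECONDS);
        timer.cancel();
        System.out.println("levels=" + table.keySet() + ", started=" + startedAt.size());
    }
}
```

A single Timer runs all tasks on one thread, so a slow task for one level delays the others; that trade-off applies to this sketch as well.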
How delays are handled per delay level
public class ScheduleMessageService extends ConfigManager {
class DeliverDelayedMessageTimerTask extends TimerTask {
private final int delayLevel;
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
if (isStarted()) {
// The core of the implementation
this.executeOnTimeup();
}
} catch (Exception e) {
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
}
}
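As the code above shows, the real work happens in the executeOnTimeup method. If it throws, the task is rescheduled from the same offset after DELAY_FOR_A_PERIOD.
Let's look at how it is implemented.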
public class ScheduleMessageService extends ConfigManager {
class DeliverDelayedMessageTimerTask extends TimerTask {
private final int delayLevel;
private final long offset;
public void executeOnTimeup() {
// 1. Find the consume queue for the schedule topic and this delay level (it is created if it does not exist).
// If the queue is still empty, nothing is delivered and the task is simply rescheduled.
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
// The message is due: load it from the commitLog by physical offset and size
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
// 2. Restore the original message
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
// 3. Write the restored message back to the store
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// The write failed: schedule a retry from this offset after DELAY_FOR_A_PERIOD
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
// Record the offset reached for this delay level
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
// Stop this run
return;
}
} catch (Exception e) {
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
}
}
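The code above is fairly involved, but the core idea is simple: read entries from the queue, restore each message that is due, and write it back to the store. The first entry that is not yet due ends the scan, and the task reschedules itself for the remaining countdown.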
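The scan loop is easier to follow once the storage details are stripped away. The following is a simplified sketch, not the broker's code: the queue is modelled as an array of deliver timestamps (in the listing above that value is read from tagsCode), `ScanResult` is a hypothetical type, and the 100 ms value of DELAY_FOR_A_WHILE is an assumption. Put failures and timestamp correction are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class DelayQueueScanSketch {
    static final long DELAY_FOR_A_WHILE = 100L; // assumed value, in milliseconds

    /** Hypothetical result type: what was delivered, where to resume, how long to wait. */
    static final class ScanResult {
        final List<Long> delivered = new ArrayList<>();
        int nextOffset;
        long nextDelayMillis;
    }

    /**
     * Scans one delay level's queue starting at offset. Entries are delivered in
     * order until the first one that is not yet due; the scan then stops and
     * asks to be rescheduled after that entry's remaining countdown.
     */
    static ScanResult scan(long[] deliverTimestamps, int offset, long now) {
        ScanResult result = new ScanResult();
        for (int i = offset; i < deliverTimestamps.length; i++) {
            long countdown = deliverTimestamps[i] - now;
            if (countdown > 0) {
                result.nextOffset = i;              // resume at the entry that is not due yet
                result.nextDelayMillis = countdown; // wake up exactly when it becomes due
                return result;
            }
            result.delivered.add(deliverTimestamps[i]); // due: restore and re-put in the real code
        }
        // Everything currently in the queue was delivered; poll again shortly.
        result.nextOffset = deliverTimestamps.length;
        result.nextDelayMillis = DELAY_FOR_A_WHILE;
        return result;
    }

    public static void main(String[] args) {
        long[] queue = {100L, 200L, 300L, 400L};
        ScanResult r = scan(queue, 0, 250L);
        System.out.println("delivered=" + r.delivered + ", nextOffset=" + r.nextOffset
                + ", nextDelayMillis=" + r.nextDelayMillis);
    }
}
```

Stopping at the first entry that is not due works because all messages in one queue share the same delay, so their deliver times follow storage order.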
Restoring the message
The restore method is shown below.
public class ScheduleMessageService extends ConfigManager {
class DeliverDelayedMessageTimerTask extends TimerTask {
private final int delayLevel;
private final long offset;
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
// Restore the topic: the real topic and queue id were saved in the message properties
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
int queueId = Integer.parseInt(queueIdStr);
msgInner.setQueueId(queueId);
return msgInner;
}
}
}
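The restore step mirrors the rewrite done when the message was stored. Here is a minimal sketch of just that part, with the same caveats as before: `Msg` is a hypothetical stand-in for the broker's message classes, and the property keys `REAL_TOPIC`, `REAL_QID`, and `DELAY` are assumed values of the MessageConst constants used in the listing.

```java
import java.util.HashMap;
import java.util.Map;

public class RestoreDelayedMessageSketch {
    // Assumed constant values; verify them against your RocketMQ version.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";
    static final String DELAY = "DELAY";

    /** Hypothetical stand-in for MessageExt / MessageExtBrokerInner. */
    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();
    }

    static Msg restore(Msg stored) {
        Msg restored = new Msg();
        restored.properties.putAll(stored.properties);
        // Drop the delay level so the re-stored message is not redirected
        // to the schedule topic a second time.
        restored.properties.remove(DELAY);
        // The real destination was backed up in the properties at store time.
        restored.topic = restored.properties.get(REAL_TOPIC);
        restored.queueId = Integer.parseInt(restored.properties.get(REAL_QUEUE_ID));
        return restored;
    }

    public static void main(String[] args) {
        Msg stored = new Msg();
        stored.topic = "SCHEDULE_TOPIC_XXXX";
        stored.queueId = 2;
        stored.properties.put(REAL_TOPIC, "TopicA");
        stored.properties.put(REAL_QUEUE_ID, "3");
        stored.properties.put(DELAY, "3");

        Msg restored = restore(stored);
        System.out.println(restored.topic + " / queue " + restored.queueId);
    }
}
```

Note that the sketch would throw a NumberFormatException if the backed-up queue id were missing, just as an unguarded Integer.parseInt would.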
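In short, delayed scheduling works by assigning one timer task to each delay level. Each task consumes the messages in the queue for its level, where "consuming" simply means restoring the message and writing it into the queue of its real topic.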