一、存储总体结构
image.png从上面的图中可以看出,Broker都是通过DefaultMessageStore实现数据的存储和读取。消息的存储主要是通过调用DefaultMessageStore.putMessage(),实现消息的存储。
二、消息存储过程
本节将以消息发送存储为突破点,一点一点揭开RocketMQ 存储设计的神秘面纱。消息存储入口: org.apache.rocketmq.store.DefaultMessageStore#putMessage 。
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// 如果当前节点是从节点,拒绝写
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// 当前broker状态,是否可写
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
// 消息主题长度限制
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// 消息属性限制
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
// 是否os cache刷新繁忙
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
// 写数据
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg);
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
获取到对应的MappedFile写数据
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
在写入CommitLog 之前,先申请putMessageLock,也就是将消息存储到CornrnitLog 文件中是串行的。
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// 写数据
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
调用appendMessagesInner写数据
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
// 调用slice共享缓冲区
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 写入位置
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
// 追加消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 更新写入位置
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
创建全局唯一id,消息ID 有16 字节,消息ID 组成如图4-4 所示
image.png // PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
消息存储格式
RocketMQ 消息存储格式如下。
- TOTALSIZE : 该消息条目总长度,4 字节。
- MAGICCODE : 魔数, 4 字节。固定值Oxdaa320a7 。
- BODYCRC : 消息体crc校验码, 4 字节。
- QUEUEID : 消息消费队列ID , 4 字节。
- FLAG : 消息FLAG , RocketMQ 不做处理, 供应用程序使用,默认4 字节。
- QUEUEOFFSET :消息在消息消费队列的偏移量, 8 字节。
- PHYSICALOFFSET : 消息在CommitLog 文件中的偏移量, 8 字节。
- SYSFLAG : 消息系统Flag ,例如是否压缩、是否是事务消息等, 4 字节。
- BORNTIMESTAMP : 消息生产者调用消息发送API 的时间戳, 8 字节。
- BORNHOST :消息发送者IP 、端口号, 8 字节。
- STORETIMESTAMP : 消息存储时间戳, 8 字节。
- STOREHOSTADDRESS: Broker 服务器IP+端口号, 8 字节。
- RECONSUMETIMES : 消息重试次数, 4 字节。
- Prepared Transaction Offset : 事务消息物理偏移量, 8 字节。
- BodyLength :消息体长度, 4 字节。
- Body : 消息体内容,长度为bodyLen th 中存储的值。
- TopieLength : 主题存储长度, 1 字节,表示主题名称不能超过255 个字符。
- Topic : 主题,长度为TopieL e n g th 中存储的值。
- PropertiesLength : 消息属性长度, 2 字节, 表示消息属性长度不能超过6 553 6 个字符。
- Properties : 消息属性,长度为PropertiesLength 中存储的值。
上述表示CommitLog 条目是不定长的,每一个条目的长度存储在前4 个字节中。
如果消息长度+END_FILE_MIN_BLANK_LENGTH 大于CommitLog 文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE, Broker 会重新创建一个新的CommitLog 文件来存储该消息。
从这里可以看出,每个CommitLog 文件最少会空闲8个字节,高4 字节存储当前文件剩余空间,低4 字节存储魔数: CommitLog.BLANK_MAGIC_CODE 。
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
将消息内容存储到ByteBuffer 中,然后创建AppendMessageResult 。这里只是将消息存储在MappedFile 对应的内存映射Buffer 中,并没有刷写到磁盘。
DefaultAppendMessageCallback#doAppend 只是将消息追加在内存中, 需要根据是同步刷盘还是异步刷盘方式,将内存中的数据持久化到磁盘。
AppendMessageResult 的属性:
-
AppendMessageStatus status :消息追加结果,取值PUT_OK : 追加成功; END_OF_FILE: 超过文件大小; MESSAGE_SIZE_EXCEEDED :消息长度超过最大允许长度:PROPERTIES_SIZE_EXCEEDED :消息、属性超过最大允许长度; UNKNOWN_ERROR :未知异常。
-
long wroteOffset :消息的物理偏移量。
-
String msgld :消息ID 。
-
long storeTimestamp :消息存储时间戳。
-
long logicsOffset :消息消费队列逻辑偏移量,类似于数组下标。
-
long pagecacheRT = 0 :当前未使用。
-
int msgNum = 1 :消息条数,批量消息发送时消息条数。
网友评论