概览
RocketMQ的消息存储主要是在${ROCKETMQ_HOME}/store文件夹下,message消息主要存储在commitlog文件夹下,RocketMQ消息存储和索引是分开隔离的,已Topic为主题的消息索引存储在consumequeue文件夹下,通过MessageQueue映射为ConsumeQueue的文件就存储在这个文件夹下,然后index主要是以消息key和offset的对应关系,以类似HashMap的方式存储,方便消息查询。
image.png本片文章主要介绍消息存储组织结构、Message是如何快速存储都MappedFile文件中的。MappedFile文件就是一个个以首条消息的offset为名称的存储文件,如上图commitlog文件夹下展示的00000000000000000000、00000000001073741824等,每一个mappedFile文件的大小约为102410241024=1G。
image.pngDefaultMessageStore
DefaultMessageStore是消息相关操作的主要服务,包括消息的存储、查询、定时清除等等。这里主要介绍其中消息存储相关的事物,包括是否开启TransientStorePool临时消息存储池,一次创建2个MappedFile文件的AllocateMappedFileService消息存储预创建服务,还有历史存储文件mappedFile加载加载到直接内存MappedByteBuffer和对应的mmap文件映射等。
# org.apache.rocketmq.store.DefaultMessageStore
// MappedFile 分配服务
private final AllocateMappedFileService allocateMappedFileService;
// 是否开启
// 消息临时存储
private final TransientStorePool transientStorePool;
this.transientStorePool = new TransientStorePool(messageStoreConfig);
// 根据是否开启 transientStorePoolEnable,存在两种初始化情况。
// transientStorePoolEnable 为 true 表示内容先存储在堆外内存(直接内存),然后通过 Commit 线程将数据提交到FileChannel中,再通过 Flush 线程将内存映射 Buffer 中的数据持久化到磁盘中。
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
//加载历史mappedFile文件,进行便于文件查询和消费
// load Commit Log
result = result && this.commitLog.load();
TransientStorePool
TransientStorePool是短暂的消息存储池。这里先进行简单介绍,具体作用到应用的时候详细介绍。这里直接开辟默认5个1G的直接内存ByteBuffer,用来临时存储消息。它还引入了内存锁的机制,避免直接内存的数据被替换到系统中的Swap分区中,提高系统存储性能,使RocketMQ消息低延迟、高吞吐量。
public class TransientStorePool {
// availableBuffers 个数,可通过在broker中配置文件中设置 transientStorePool,默认值为 5
private final int poolSize;
// 每个 ByteBuffer 大小,默认为 mappedFileSizeCommitLog,表明 TransientStorePool 为 commitlog 文件服务
private final int fileSize;
// 直接内存,ByteBuffer 容器,双端队列
private final Deque<ByteBuffer> availableBuffers;
/**
* 创建默认的堆外内存
* It's a heavy init method.
*/
public void init() {
for (int i = 0; i < poolSize; i++) {
// 利用 NIO 直接直接分配,堆外内存(直接内存),在系统中的内存,非 JVM 内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
// 内存地址
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
// 内存锁定
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
}
CommitLog
CommitLog主要有消息的刷盘的存储服务、消息的刷盘服务,存储消息的回调等等,这里主要介绍MappedFileQueue,它是对${ROCKET_HOME}/store/commitlog目录的封装,主要用来管理多个MappedFile。
public class CommitLog {
// 映射文件队列,ROCKETMQ_HOME/commitlog 文件夹下的文件对应
protected final MappedFileQueue mappedFileQueue;
// 默认消息存储服务
protected final DefaultMessageStore defaultMessageStore;
// commitLog 刷盘操作
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交条内存映射 MappedByteBuffer 中
private final FlushCommitLogService commitLogService;
// 存储消息到 mappedFile 的回调映射
private final AppendMessageCallback appendMessageCallback;
// 消息解码服务线程
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
// topic-queue-id,offset;消息的key,和在 commitlog 中的 offset,方便消息存储时的索引
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
public CommitLog(final DefaultMessageStore defaultMessageStore) {
// 在这里组织 commitlog 的对应的 MappedFile 文件,然后进行相应的文件操作,文件映射,刷线到磁盘文件
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
// 异步、同步刷盘服务初始化
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盘服务为 GroupCommitService
this.flushCommitLogService = new GroupCommitService();
} else {
// 异步刷盘服务为 FlushRealTimeService
this.flushCommitLogService = new FlushRealTimeService();
}
// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交条内存映射 MappedByteBuffer 中
this.commitLogService = new CommitRealTimeService();
// 存储消息到 mappedFile 的回调映射
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
// putMessage 到 mappedFile 时是否使用可重入锁,默认使用自旋锁
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
}
MappedFileQueue
MappedFile 的存储集合和管理器,是对 ${ROCKET_HOME}/store/commitlog 文件夹的封装。主要用来管理MappedFile文件,包括消息的查询、提交、落盘的刷新,历史MappedFile文件的预热加载和直接内存映射mmap操作,过期文件的删除、追加消息的最后一个MappedFile文件的获取和创建等。
public class MappedFileQueue {
// 存储路径${ROCKET_HOME}/store/commitlog,该目录下会存在多个内存映射文件
private final String storePath;
// 单个文件的存储大小
private final int mappedFileSize;
// mappedFile 文件集合
// 一个线程安全的 ArrayList 的变种,通过可 reentrantLock 可重入锁实现数组的新建和数组旧有内容的 copy 到新建的数组,然后返回新建的数组
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// 创建 MappedFile 服务类
private final AllocateMappedFileService allocateMappedFileService;
// 当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘
// MappedFile 中的 MappedByteBuffer 中数据写入磁盘的指针,该指针之前的所有数据全部持久化到磁盘
private long flushedWhere = 0;
// Java 应用程序态数据要写入nio内存映射的ByteBuffer的提交了位置的指针
// commitWhere 只有开启 transientStorePool 的前提下才有作用;
// commitWhere 代表着 transientStorePool 中直接内存 ByteBuffer 需要提交数据到 MappedByteBuffer 直接内存的,位置为已经提交了数据的位置。下次要提交的开始位置,上次提交的结尾位置。
private long committedWhere = 0;
/**
* 项目启动,加载 commitlog 文件夹下对应的文件
* @return
*/
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
// 根据文件名(offset)排序
Arrays.sort(files);
for (File file : files) {
// 如果物理文件大小 != mappedFileSize,说明文件被破坏了,返回false
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
// 更新 mappedFile 文件指针
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
// 加入映射文件集合
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
/**
* 获取最后存储消息的映射mappedFile
*
* @param startOffset mappedFile 开始写文件的offset
* @param needCreate 是否需要创建新的 mappedFile 文件
* @return
*/
//
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
// 创建映射文件的起始偏移量,也就是即将的mappedfile文件名称
long createOffset = -1;
// 获取最后一个映射文件,如果为null或者写满则会执行创建逻辑
MappedFile mappedFileLast = getLastMappedFile();
// mappedFileLast == null,表示需要创建新的 mappedFile 文件,创建新文件的offset值;
if (mappedFileLast == null) {
// 计算将要创建的映射文件的起始偏移量
// 如果startOffset<=mappedFileSize则起始偏移量为0
// 如果startOffset>mappedFileSize则起始偏移量为是mappedFileSize的倍数
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 映射文件满了,创建新的映射文件
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 创建的映射文件的偏移量等于最后一个映射文件的起始偏移量 + 映射文件的大小(commitlog文件大小)
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
// 创建新的映射文件
if (createOffset != -1 && needCreate) {
// 构造commitlog 文件名称
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// nextNextFilePath,预先创建下一个 mappedFile 文件,通过 allocateMappedFileService 服务,一起创建两个文件,预先创建下一个文件
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
// 如果上述方式失败则通过new创建映射文件
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
// 是否是 MappedFileQueue 队列中第一个文件
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
}
MappedFile
CommitLog、MappedFileQueue、MappedFile的关系如图:
image.png
MappedFile是RocketMQ消息存储的终极Boss,重中之重。涉及MapedFile的预创建和映射、历史数据MappedFile的磁盘文件预热。MappedByteBuffer是通过NIO方式创建的内存映射对象。ByteBuffer writeBuffer是直接内存从TransientStorePool中借来的,他们两个是在内存中用来存放消息的,其中区别下面详细介绍。这里先从CommitLog文件存放消息说起。
public class MappedFile extends ReferenceResource {
// 当前JVM实例中 MappedFile 虚拟内存
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 当前JVM实例中MappedFile对象个数
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 即将写入消息的mappedFile 的位置
// 当前 MappedFile 文件的写指针,从 0 开始(内存映射文件的写指针)
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 当前文件的提交到 MappedBuffer的指针,如果开启 transientStorePoolEnable,则数据会存储在 TransientStorePool 中,然后提交到内存映射 ByteBuffer 中,再刷写到磁盘
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 刷写到磁盘指针,该指针之前的数据持久化到磁盘中
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 文件通道
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
// 堆外内存 ByteBuffer,如果不为空,数据首先将存储在该 Buffer 中,然后提交到 MappedFile 对应的内存映射文件 Buffer。
// transientStorePoolEnable 为true时不为空。
protected ByteBuffer writeBuffer = null;
// 堆内存池,transientStorePoolEnable 为true 时启用
protected TransientStorePool transientStorePool = null;
// 文件名称
private String fileName;
// mappedFile 文件的开始偏移量地址
private long fileFromOffset;
// 物理文件
private File file;
// NIO 物理文件对应的内存映射Buffer
private MappedByteBuffer mappedByteBuffer;
// 文件最后一次内容写入的时间
private volatile long storeTimestamp = 0;
// 是否是 MappedFileQueue 队列中第一个文件
private boolean firstCreateInQueue = false;
}
CommitLog#putMessage方法是来存放消息的,存放消息到系统内存映射中,并没有落入磁盘中,等待同步刷盘、或者异步刷盘,然后是消息存储的高可用。
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
// 消息存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
// 循环冗余校验码,检测数据在网络中传输是否发生变化
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
// 存储服务统计功能服务
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// topic
String topic = msg.getTopic();
int queueId = msg.getQueueId();
// 事务回滚消息标志
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 如果是非事务消息,或者事务消息的 commit 操作;进而判断是不是延迟消息,存储到特殊的延迟消息队列;然后事务消息存储也进行了同样的消息 topic 的转换,从而实现了消息的事务;事务消息非提交阶段,先进行另一个 topic 的储存,如果事务提交了,才进行,存储到消息的真正的topic 中去。
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// 如果是延迟级别消息
if (msg.getDelayTimeLevel() > 0) {
// 设置消息延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 延迟消息topic
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 延迟消息消息队列Id
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
// 将真实的 topic 放入 message 属性中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 替换为延迟消息topic
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// 消息诞生地址 ipv6 设置
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
// 消息存储地址 ipv6 设置
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 最后一个 消息 存储 commitlog 消息映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 自旋锁 或 可重入锁,上锁;消息写入 commitlog 的映射文件是串行的
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 开始上锁时间
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
// 消息存储时间,确定消息全局有序
msg.setStoreTimestamp(beginLockTimestamp);
//mappedFile==null标识CommitLog文件还未创建,第一次存消息则创建CommitLog文件
//mappedFile.isFull()表示mappedFile文件已满,需要重新创建CommitLog文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise;新文件,造成脏数据
}
// mappedFile==null说明创建CommitLog文件失败抛出异常,创建失败可能是磁盘空间不足或者权限不够
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// 追加消息到 mappedFile 文件中
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
// 针对一条消息足够长,然后 mappedFile 文件不够存储,需要创建新的 mappedFile 进行消息存放。
case END_OF_FILE:
// 上一个 mappedFile 暂存文件,需要解锁这个 mappedFile
unlockMappedFile = mappedFile;
// broker 重新开辟,新的 commitlog 文件
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// 开辟成功,再将消息写入
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
// 存储消息花费时间
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 最后释放存储消息的锁
putMessageLock.unlock();
}
// 存储消息花费时间 > 500
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
// 上一个有空闲空间,但不够存储消息的 mappedFile 文件,
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 解锁 mappedFile 的内存锁定
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
// topic 下存放消息次数
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
// topic 下存放消息字节数
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// handle 硬盘刷新
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
// 返回存储消息的结果
return putMessageResult;
}
PutMessage重要步骤
- 获取上次最后一个写入消息的存储文件MappedFile,MappedFile文件的获取在后面会详细接受。
// 最后一个 消息 存储 commitlog 消息映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
- 向MappedFile文件追加消息,如果返回END_OF_FILE代表这个整备追加消息的MappedFile文件写满了,不够存储本条消息,然后再去获取这最后下一个MappedFile文件。
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
// 针对一条消息足够长,然后 mappedFile 文件不够存储,需要创建新的 mappedFile 进行消息存放。
case END_OF_FILE:
// 上一个 mappedFile 暂存文件,需要解锁这个 mappedFile
unlockMappedFile = mappedFile;
// broker 重新开辟,新的 commitlog 文件
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// 开辟成功,再将消息写入
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
- 写MappedFile文件是会被mlock内存锁定,防止被交换到Swap分区中,写满的MappedFile文件进行锁定解除。
// 上一个有空闲空间,但不够存储消息的 mappedFile 文件,
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 解锁 mappedFile 的内存锁定
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
- 写入内存的消息进行刷盘,然后是HA消息存储的高可用,Broker存储消息的复制,这两部分内容也很重要,下次在介绍,在本章不是重点内容。
// handle 硬盘刷新
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
最后一个MappedFile的获取
这是MappedFile设计的经典,现在重点介绍。创建MappedFile对象有两种方式。
第一种:通过构造方法,new MappedFile()一个对象。然后进行MapepdFile对象MappedByteBuffer的内存映射。
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}
/**
* MappedFile 初始化,并做好 mappedFile 和 mappedByteBuffer 的NIO 直接内存映射关系
*
* @param fileName 物理文件路径
* @param fileSize mappedFileSize 文件大小
* @throws IOException
*/
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
// 物理文件名称
this.file = new File(fileName);
// 文件开始位置
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
// 确保文件路径存在,不存在,进行路径文件创建
ensureDirOK(this.file.getParent());
try {
// 通过 RandomAccessFile 创建读写文件通道,并将文件内容使用NIO 的内存映射 Buffer 将文件映射到内存中
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// 物理文件对应的内存映射Buffer
// 通过 NIO 文件通道和mappedFileSize 大小,创建内存映射文件 mappedByteBuffer
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
// 当前JVM实例中 MappedFile 虚拟内存
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
// 当前JVM实例中MappedFile对象个数
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
第二种:预创建MappedFile,通过allocateMappedFileService服务一次创建两个MappedFile对象。
// 优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
// 如果上述方式失败则通过new创建映射文件
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
AllocateMappedFileService
AllocateMappedFileService是预创建MappedFile文件的服务,通过一次构造两个创建MappedFile的AllocateRequest然后放入队列requestQueue中,通过CountDownLatch线程同步协调器等待mmapOperation()方法,创建MappedFile对象,并返回。RocketMQ中预分配MappedFile的设计非常巧妙,下次获取时候直接返回就可以不用等待MappedFile创建分配所产生的时间延迟。
CountDownLatch协调两个线程之间的通信
image.png/**
* 预先创建 MappedFile 文件,只是先创建2个创建mappedFile 文件的请求,放入队列中,具体 mappedFile 文件的创建和文件内存直接映射由 mmapOperation() 方法来实现。
* @param nextFilePath 创建 mappedFile 文件的全路径名称
* @param nextNextFilePath 创建下一个 mappedFile 文件的全路径名称
* @param fileSize 文件大小
* @return
*/
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
// 默认提交两个请求
int canSubmitRequests = 2;
// 是否启用 transientStorePool
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// SLAVE 节点中 transientByteBuffer 即使没有足够的 ByteBuffer,也不支持快速失败
// 启动快速失败策略时,计算TransientStorePool中剩余的buffer数量减去requestQueue中待分配的数量后,剩余的buffer数量
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
// 可用的 ByteBuffer - requestQueue,还剩余可用的 ByteBuffer 数量
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
}
}
// 创建分配请求
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
// 判断requestTable中是否存在该路径的分配请求,如果存在则说明该请求已经在排队中
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//该路径没有在排队
if (nextPutOK) {
// byteBuffer 数量不够,则快速失败
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextFilePath);
return null;
}
// 数量充足的话,将指定的元素插入到此优先级队列中
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
// 请求数量 -1
canSubmitRequests--;
}
// 下下个请求的处理
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
// 报错,日志
if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
// 下一个分配请求,获取当前请求,然后通过线程协调器CountDownLatch,协调另一个线程进行完mmpOperation操作后,返回创建好的MappedFile文件
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
// 默认等待5s,等待 mmapOperation 操作创建 mappedFile
// 调用此方法的线程会被阻塞,直到 CountDownLatch 的 count 为 0;等到 mmapOperation() finally countDownLatch 为 0
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
// 成功从 requestTable 中移除请求,并返回 mappedFile 文件
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
具体创建MappedFile对象
AllocateMappedFileService服务开启了一个线程,不停地从创建MappedFile对象的请求队列requestQueue中获取AllocateRequest,并实时创建MappedFile对象,并通过CountDownLatch通知putRequestAndReturnMappedFile() 方法已经创建了MappedFile对象,从而获取返回。
/**
* 开始 mappedFile 文件分配服务,从 requestQueue 中获取创建 mappedFile 的文件请求
*/
public void run() {
log.info(this.getServiceName() + " service started");
// 除非停止,否则一直在进行 mmap 映射操作
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
MmapOperation具体创建MappedFile对象
在这里创建MappedFile对象,也有两种情况,区别在于是否启用TransientStorePool消息暂存池,它里面有默认5个1G的直接内存,可以通过直接内存赋值给MappedFile的writerBuffer对象,省去了开辟内存的时间;还有一种是通过MappedFile的NIO创建的MappedByteBuffer直接内存映射来存储消息,需要进行文件的map映射操作,开辟内存空间。这两种方式的对比会在下面介绍。
第一种:不启用transientStorePool对象,通过构造方法创建。
第二种:通过ServerLoader.load的方式创建,如果失败了,再去尝试构造方法的方式。
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
// 为每一个 mappedFile 文件,进行init中的mmap 映射操作
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
// spi 加载失败,使用构造方法创建 mappedFile
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 不开启 transientStorePool,直接内存映射
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
MappedFile文件预热
通过 mmap 建立内存映射仅是将文件磁盘地址和虚拟地址通过映射对应起来,此时物理内存并没有填充磁盘文件内容。
当实际发生文件读写时,产生缺页中断并陷入内核,然后才会将磁盘文件内容读取至物理内存。针对上述场景,RocketMQ 设计了 MappedFile 预热机制。
当 RocketMQ 开启 MappedFile 内存预热(warmMapedFileEnable),且 MappedFile 文件映射空间大小大于等于 mapedFileSizeCommitLog(1 GB) 时,调用 warmMappedFile 方法对 MappedFile 进行预热。
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 对 mappedFile 进行预热
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
MappedFile创建后,需要对 MappedFile 文件进行预热,将内存和磁盘映射起来,然后每页写入占位数据0,然后将这些0数据,刷新到磁盘,进行磁盘预热。
当调用Mmap进行内存映射后,OS只是建立了虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。
程序要访问数据时,OS会检查该部分的分页是否已经在内存中,如果不在,则发出一次 缺页中断。X86的Linux中一个标准页面大小是4KB,
那么1G的CommitLog需要发生 1024KB/4KB=256次 缺页中断,才能使得对应的数据完全加载至物理内存中。
为什么每个页都需要写入数据呢?
RocketMQ在创建并分配MappedFile的过程中预先写入了一些随机值到Mmap映射出的内存空间里。原因在于:
仅分配内存并进行mlock系统调用后并不会为程序完全锁定这些分配的内存,原因在于其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。
锁定的内存可能是写时复制的,这个时候,这个内存空间可能会改变。这个时候,写入假的临时值,这样就可以针对每一个内存分页的写入操做会强制 Linux 为当前进程分配一个独立、私有的内存页。
写时复制
写时复制:子进程依赖使用父进程开创的物理空间。
内核只为新生成的子进程创建虚拟空间结构,它们来复制于父进程的虚拟究竟结构,但是不为这些段分配物理内存,它们共享父进程的物理空间,当父子进程中有更改相应段的行为发生时,再为子进程相应的段分配物理空间。
https:www.cnblogs.com/biyeymyhjob/archive/2012/07/20/2601655.html
为了避免OS检查分页是否在内存中的过程出现大量缺页中断,RocketMQ在做Mmap内存映射的同时进行了madvise系统调用,
目的是使OS做一次内存映射后,使对应的文件数据尽可能多的预加载至内存中,降低缺页中断次数,从而达到内存预热的效果。
RocketMQ通过map+madvise映射后预热机制,将磁盘中的数据尽可能多的加载到PageCache中,保证后续对ConsumeQueue和CommitLog的读取过程中,能够尽可能从内存中读取数据,提升读写性能。
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
// 创建一个新的字节缓冲区,其内容是此缓冲区内容的共享子序列
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
// warmMappedFile 每间隔 OS_PAGE_SIZE 向 mappedByteBuffer 写入一个 0,此时对应页恰好产生一个缺页中断,操作系统为对应页分配物理内存
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// 刷盘方式是同步策略时,进行刷盘操作
// 每修改 pages 个分页刷一次盘,相当于 4096 * 4k = 16M,每 16 M刷一次盘,1G 文件 1024M/16M = 64 次
// force flush when flush disk type is sync
// 如果刷盘策略为同步刷盘,需要对每个页进行刷盘
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
// 防止垃圾回收
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// force flush when prepare load finished
// 前面对每个页,写入了数据(0 占位用,防止被内存交互),进行了刷盘,然后这个操作是对所有的内存进行刷盘。
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
// 刷盘,强制将此缓冲区内容的任何更改写入包含映射文件的存储设备
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
//通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。
this.mlock();
}
通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
// RocketMQ的做法是,在做Mmap内存映射的同时进行madvise系统调用,目的是使OS做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。
{
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}
MappedByteBuffer VS WriterBuffer
MappedByteBuffer和WriterBuffer都是MappedFile对象的成员属性,都是用来存放消息的,只有开启了TransientStorePool,才会向writerBuffer直接内存写入消息,然后commit消息到FileChannle中,然后再flush到磁盘。否则就是存储在NIO创建的MappedByteBuffer直接内存中,然后刷新到磁盘。
image.png从TransientStorePool中借取的MappedFile中的writerBuffer与MappedFile的MappedByteBuffer在数据处理上的差异在什么地方呢?
分析其代码,TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请对外内存,消息数据在写入内存的时候是写入预申请的内存中。在异步刷盘的时候,再由刷盘线程将这些内存中的修改写入文件。
那么与直接使用MappedByteBuffer相比差别在什么地方呢?
MappedByteBuffer 和 WriteBuffer 都会经过,PageCache 这个操作进行写入磁盘。
MappedByteBuffer写入数据,写入的是MappedByteBuffer映射的磁盘文件对应的Page Cache,可能会慢一点。而TransientStorePool方案下写入的则为纯粹的内存,并不是PageCache,因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。然后再经过刷盘将直接内存中的数据经过Page Cache 写入磁盘。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下回丢失更多的消息。
Mmap的写入操作是:Mmap的MappedByteBuffer映射直接内存,直接内存映射文件,然后文件会对应Page Cache,也就是 MmapedByteBuffer的直接内存可能是Page Cache的东西,然后通过写Page Cache,然后再写入磁盘。
FileChannle:是写直接内存,这个效率比较高,然后直接内存满了,在落盘的时候,再去经过Page Cache,落入磁盘。WriterBuffer的写入方式实际也就是FileChannel的写入方式,Mmap在写入4k一下的文件比较快,然后FileChannel写入文件大于4k时,比Mmap方式的要快,可能是因为PageCache 是4k,然后写着就可能去落盘了。而FileChannel 是写满了直接内存,才去经过PageCache,这样写入直接内存的效率更高,然后再经过Page Cache,当大于4k的时候,大于Page Cache的内存的时候,就是FileChannel快了。大概因为FileChannel是基于Block(块)的。
Mmap VS FileChannle参考https://juejin.cn/post/6844903842472001550
网友评论