1. 概述
我们前面可以看到在执行SortShuffleManager
的过程中, 会初始化IndexShuffleBlockResolver
作为各种ShuffleWriter
必须的初始化组件.
IndexShuffleBlockResolver
就像名字描述的一样, 主要用于shuffle blocks, 从逻辑block到物理文件之间的映射关系. 它会确保每个Map过程最终生成的block(也就是blockManager维护的那些block) 会被按照key sort后放在同一个文件里, 然后另外生成一份index file告诉使用者文件的每一段都是哪个key对应的block
2. 主要的方法
2.1 writeIndexFileAndCommit
如下图所示, 写一个index文件, 里面是标记数据文件中每个block起始点的offset, 这些数据后边会被getBlockData
调用来寻找每个block的开始和结束位置
这个方法还需要保证, data和index文件的写是atomic的, 保证两个文件的一致性
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Int,
lengths: Array[Long],
dataTmp: File): Unit = {
// 为了保证原子性, 使用了传统的先写到一个临时文件, check所有都一致, 然后重命名的策略. 如果中间发生失败就抛弃临时文件即可
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// 每个block的大小转换成offset
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} {
out.close()
}
val dataFile = getDataFile(shuffleId, mapId)
// 下面这段同步代码保证了每个executor同时只会执行一个IndexShuffleBlockResolver, 后续的检测和重命名过程是源自的
synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// 如果相关的index已经存在, 就可以直接退出了, 这是因为这个mapTask可能已经运行过了.
// 当然也可能因为其它原因失败, 但总之这次写是不成功的, 直接删除tmp文件完事
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
indexTmp.delete()
} else {
// 写成功的话, 删除旧的data文件和index文件, 用心的覆盖
if (indexFile.exists()) {
indexFile.delete()
}
if (dataFile.exists()) {
dataFile.delete()
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
}
}
}
} finally {
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
}
}
}
2.2 getBlockData
用于根据index文件从data文件中把需要的block提取出来, 好等待后一步进入shuffle过程
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
// 因为设计上是每个shuffle过程生成一个file, 这里根据shuffleId获取到它的index文件
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
// 读取文件的过程
val in = new DataInputStream(new FileInputStream(indexFile))
try {
ByteStreams.skipFully(in, blockId.reduceId * 8)
val offset = in.readLong()
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf,
getDataFile(blockId.shuffleId, blockId.mapId),
offset,
nextOffset - offset)
} finally {
in.close()
}
}
网友评论