主要分析FileInputFormat类的createInputSplits方法
参数minNumSplits,通常是readFile等读取文件操作的并行度决定的。
(1)minNumSplits = Math.max(minNumSplits, this.numSplits);
先获取这个minNumSplits变量,在并行度与numSplits中取最大值(numSplits通常是在FileInputFormat类的configure方法中定义)
(2)获取这个路径下的所有文件的总长度 totalLength
(3)之后进行unsplittable
分支判断
这个unsplittable用来判断文件格式是否能够spliit,有些文件格式如avro,deflate等在块级别上是不能分割的(are not splittable),只能整个文件读取。
- 文件格式不支持在块级别上分割
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
一个文件可能占据多个block块。
FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len, hosts.toArray(new String[hosts.size()]));
一个文件就是一个split,这个split可能是占据多个block块的。
所以,如果该文件格式是不支持split的,那么分片的数目就等于该路径下文件的数目。
- 文件格式支持在块级别上分割
a、final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);
先计算分片最大大小maxSplitSize ,totalLength/minNumSplits(不能够整除就加1)。
b、获取splitSize即分片大小
this.minSplitSize是在FileInputFormat类configure方法中定义的,如果该值小于等于block大小,
那么minSplitSize就是configure中定义的minSplitSize;
否则的话minSplitSize就是block size。
splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
halfSplit就是splitSize右移1位(相当于除以2)
定义最后一个分片的大小,可以是一般的1.1倍
final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
c、开始对一个文件开始分片
- 文件长度大于0
<1> 先获取当前文件所占据的block,并进行排序
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len); Arrays.sort(blocks);
<2> 进行循环,文件的总字节为bytesUnassigned
,每个split的字节为splitSize
,最后一个split的最大字节为maxBytesForLastSplit
。
<2>的第一部分:
先分析下getBlockIndexForPosition方法:
该方法检索BlockLocation的索引,该索引包含给定文件描述的文件部分偏移量。
四个参数:
blocks
:该文件的所有block块(按照offset排序)
offset
:文件当前内容位置的offset
startIndex
:block的index顺序
先去遍历该文件的每一个block(按照offset排序过),获取该block的start位置和end位置;
如果当前文件内容offset在start和end之间,则进入if分支;
接下来再去判断当前block的分片内容是不是超过一半了,没有超过一半在返回下一个block的索引位置,否则返回当前索引位置(即返回包含大部分数据的block索引位置)。
<2>的第二部分:
blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
返回当前split超过一半内容所在的block,并进行初始化分片即FileInputSplit实例
<3> 分配最后一个分片split
网友评论