一、定义
1、block:block是物理切块,在文件上传到HDFS文件系统后,对大文将以每128MB的大小切分若干,存放在不同的DataNode上。
2、split:split是逻辑切片,在mapreduce中的map task开始之前,将文件按照指定的大小切割成若干个部分,每一部分称为一个split,默认是split的大小与block的大小相等,均为128MB。
注意:在hadoop1.x版本中,block默认的大小为64MB,在hadoop2.x版本修改成了128MB
2、参数设置
1、block的默认配置在hdfs-site.xml中配置
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>
文件的默认块大小(以字节为单位)。还可以可以使用以下后缀(不区分大小写)(例如128k,512m,1g等),或提供完整的字节大小(例如134217728为128 MB)。
</description>
</property>
注意:默认配置就是最佳实践
2、split大小由minSize、maxSize、blocksize决定,以下是默认配置情况下的。
- long minSize = 1L
- long maxSize = Long.MAX_VALUE
- blocksize = 134217728
由上面三个参数就可以计算出分片大小了
在mapreduce的FileInputFormat类中的getSplits() 方法对文件进行split,算法如下:
Math.max(minSize,Math.min(maxSize, blockSize)),其中maxSize是取得longValueMax的值
1.如果blockSize小于maxSize && blockSize 大于 minSize之间,那么split就是blockSize(一对一);
2.如果blockSize小于maxSize && blockSize 小于 minSize之间,那么split就是minSize;
3.如果blockSize大于maxSize && maxSize 大于 minSize之间,那么split就是maxSize(多对一);
4.如果blockSize大于maxSize && maxSize 小于 minSize之间,那么split就是maxSize(不存在这种关系)。
注意:split大小如何调整,split只能是一个文件的分片,不能让多个小文件“划入”一个split中。
文件的最后一个分片可能会超过128MB,由于常量SPLIT_SLOP = 1.1决定,大小范围在:0MB < lastSplit < 128+12.8 MB。也就是说:当文件大小满足bytesRemaining)/splitSize(默认128m) > SPLIT_SLOP进行切片。
这里拿一段HadoopRDD的getPartition方法来说:
假设分区数是1.
获取的分片的过程通过调用FileInputFormat.getSplits来实现分片
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
Stopwatch sw = new Stopwatch().start();
//获取所有的FileStatus
//ListStatus 方法里面:
// 1,判断是否需要递归
//2,接着是创建路径过滤器,筛选掉一些我们不需要的文件,入以_,.开头的
//3,根据 mapreduce.input.fileinputformat.list-status.num-threads 决定是并发还是单线程
FileStatus[] files = listStatus(job);
// Save the number of input files for metrics/loadgen
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
//获取目标分片 goalsize 和最小 minsize ,numSplits在spark中为min(你设置的分区数,2)。
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//判断文件是否支持切分,不压缩或者压缩方式为 BZip2Codec 支持切分
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
//Math.max(minSize, Math.min(goalSize, blockSize))
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;
//bytesRemaining/128m(默认)>1.1进行切片
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits.toArray(new FileSplit[splits.size()]);
}
网友评论