美文网首页
Hadoop中job提交split的原理解析

Hadoop中job提交split的原理解析

作者: 吃货大米饭 | 来源:发表于2019-07-05 11:12 被阅读0次

    一、定义

    1、block:block是物理切块,在文件上传到HDFS文件系统后,对大文将以每128MB的大小切分若干,存放在不同的DataNode上。
    2、split:split是逻辑切片,在mapreduce中的map task开始之前,将文件按照指定的大小切割成若干个部分,每一部分称为一个split,默认是split的大小与block的大小相等,均为128MB。
    注意:在hadoop1.x版本中,block默认的大小为64MB,在hadoop2.x版本修改成了128MB

    2、参数设置

    1、block的默认配置在hdfs-site.xml中配置

    <property>
      <name>dfs.blocksize</name>
      <value>134217728</value>
      <description>
            文件的默认块大小(以字节为单位)。还可以可以使用以下后缀(不区分大小写)(例如128k,512m,1g等),或提供完整的字节大小(例如134217728为128 MB)。
      </description>
    </property>
    

    注意:默认配置就是最佳实践
    2、split大小由minSizemaxSizeblocksize决定,以下是默认配置情况下的。

    • long minSize = 1L
    • long maxSize = Long.MAX_VALUE
    • blocksize = 134217728
      由上面三个参数就可以计算出分片大小了
      在mapreduce的FileInputFormat类中的getSplits() 方法对文件进行split,算法如下:
      Math.max(minSize,Math.min(maxSize, blockSize)),其中maxSize是取得longValueMax的值
      1.如果blockSize小于maxSize && blockSize 大于 minSize之间,那么split就是blockSize(一对一);
      2.如果blockSize小于maxSize && blockSize 小于 minSize之间,那么split就是minSize;
      3.如果blockSize大于maxSize && maxSize 大于 minSize之间,那么split就是maxSize(多对一);
      4.如果blockSize大于maxSize && maxSize 小于 minSize之间,那么split就是maxSize(不存在这种关系)。
      注意:split大小如何调整,split只能是一个文件的分片,不能让多个小文件“划入”一个split中。
      文件的最后一个分片可能会超过128MB,由于常量SPLIT_SLOP = 1.1决定,大小范围在:0MB < lastSplit < 128+12.8 MB。也就是说:当文件大小满足bytesRemaining)/splitSize(默认128m) > SPLIT_SLOP进行切片。

    这里拿一段HadoopRDD的getPartition方法来说:
    假设分区数是1.
    获取的分片的过程通过调用FileInputFormat.getSplits来实现分片

    public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
        Stopwatch sw = new Stopwatch().start();
        //获取所有的FileStatus
        //ListStatus 方法里面:
        // 1,判断是否需要递归 
        //2,接着是创建路径过滤器,筛选掉一些我们不需要的文件,入以_,.开头的
        //3,根据 mapreduce.input.fileinputformat.list-status.num-threads 决定是并发还是单线程 
        FileStatus[] files = listStatus(job);
        
        // Save the number of input files for metrics/loadgen
        job.setLong(NUM_INPUT_FILES, files.length);
        long totalSize = 0;                           // compute total size
        for (FileStatus file: files) {                // check we have valid files
          if (file.isDirectory()) {
            throw new IOException("Not a file: "+ file.getPath());
          }
          totalSize += file.getLen();
        }
      //获取目标分片 goalsize 和最小 minsize ,numSplits在spark中为min(你设置的分区数,2)。
        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
        long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
          FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
    
        // generate splits
        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
        NetworkTopology clusterMap = new NetworkTopology();
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            FileSystem fs = path.getFileSystem(job);
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
    //判断文件是否支持切分,不压缩或者压缩方式为 BZip2Codec 支持切分 
            if (isSplitable(fs, path)) {
              long blockSize = file.getBlockSize();
    //Math.max(minSize, Math.min(goalSize, blockSize))
              long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    
              long bytesRemaining = length;
    //bytesRemaining/128m(默认)>1.1进行切片
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                    length-bytesRemaining, splitSize, clusterMap);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                    splitHosts[0], splitHosts[1]));
                bytesRemaining -= splitSize;
              }
    
              if (bytesRemaining != 0) {
                String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                    - bytesRemaining, bytesRemaining, clusterMap);
                splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                    splitHosts[0], splitHosts[1]));
              }
            } else {
              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
              splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.elapsedMillis());
        }
        return splits.toArray(new FileSplit[splits.size()]);
      }
    

    相关文章

      网友评论

          本文标题:Hadoop中job提交split的原理解析

          本文链接:https://www.haomeiwen.com/subject/ixfshctx.html