美文网首页
Map阶段的切片逻辑

Map阶段的切片逻辑

作者: 苏坡闷 | 来源:发表于2019-03-07 17:31 被阅读0次

    1.切片逻辑入口 (JobSubmitter类中的submitJobInternal方法中)

      JobStatus submitJobInternal(Job job, Cluster cluster) 
      throws ClassNotFoundException, InterruptedException, IOException {
        ...
         
          // Create the splits for the job
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
          int maps = writeSplits(job, submitJobDir);
          ...
          }
        }
      }
    
    1. writeSplits()
    //切片方法主方法
    private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
          Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
        JobConf jConf = (JobConf)job.getConfiguration();
        int maps;
        //根据是否使用了新的API来确定使用那种切片方法
        if (jConf.getUseNewMapper()) {
          maps = writeNewSplits(job, jobSubmitDir);
        } else {
          maps = writeOldSplits(jConf, jobSubmitDir);
        }
        return maps;
    }
    

    3.writeNewSplits() 方法

    @SuppressWarnings("unchecked")
    private <T extends InputSplit>
    int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
        Configuration conf = job.getConfiguration();
        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    
        // 切片需要根据输入格式,调用getSplits()
        List<InputSplit> splits = input.getSplits(job);
        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    
        // sort the splits into order based on size, so that the biggest
        // go first
        Arrays.sort(array, new SplitComparator());
        //写入切片信息
        JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
            jobSubmitDir.getFileSystem(conf), array);
        return array.length;
    }
    

    4.默认的切片逻辑

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = new StopWatch().start();      //以纳秒为单位测试执行时间
    
         // getFormatMinSplitSize()=1
         // getMinSplitSize(job)尝试获取mapreduce.input.fileinputformat.split.minsize=0
         // minSize默认是1
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        // 尝试获取mapreduce.input.fileinputformat.split.maxsize,默认没有则使用Long.MaxValue
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        // 获取输入路径下所有文件的状态信息
        List<FileStatus> files = listStatus(job);
    
        // 遍历每一个文件
        //由此可以看出这种切片策略是按照文件来切
        for (FileStatus file: files) {
            Path path = file.getPath();
            // 获取文件大小
            long length = file.getLen();
            if (length != 0) {
                BlockLocation[] blkLocations;   //File所属块的地址
                if (file instanceof LocatedFileStatus) {
                    blkLocations = ((LocatedFileStatus) file).getBlockLocations();   //本地文件,直接获取文件信息
                } else {
                    FileSystem fs = path.getFileSystem(job.getConfiguration());     //远程文件,需调取远程文件系统来获取块信息
                    blkLocations = fs.getFileBlockLocations(file, 0, length);
                }
                // 判断文件是否可切,如果可切,进行切割,
                if (isSplitable(job, path)) {
                    // 获取块大小,默认HDFS为128M,windows本地为32M,1.x版HDFS为64M
                    long blockSize = file.getBlockSize();
                    // 可以通过控制每片的大小,控制切片总数
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                    // 获取待切部分
                    long bytesRemaining = length;
                    // 判断待切部分 / 片大小 是否 > 1.1,如果大于,那就切一片,继续判断
                    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                    blkLocations[blkIndex].getHosts(),
                                    blkLocations[blkIndex].getCachedHosts()));
                        //起始位置分别为0,splitSize,2*splitSize........
                        bytesRemaining -= splitSize;
                    }
                    // 剩余部分/ 片大小 <=1.1 ,整个剩余部分作为1片! 每个文件,有可能最后一片会大于块大小,但是不会超过块的1.1倍
                    if (bytesRemaining != 0) {
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
                    }
                } else { // not splitable
                    // 文件不可切,整个文件作为1片
                    splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
                }
            } else { 
                //Create empty hosts array for zero length files
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        // Save the number of input files for metrics/loadgen
        //设置mapreduce.input.fileinputformat.numinputfiles的值为输入文件的数量
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
        }
        return splits;
    }
    

    4.1在此处可能会发生的问题

    如果splitSize等于blocksize,则InputSplit的起始位置与相应块在文件中offset一致,但如果splitSize小于blocksize,即通过参数mapreduce.input.fileinputformat.split.maxsize控制每个InputSplit的大小,那么InputSplit的起始位置就会位于文件块的内部。上述分割InputSplit的逻辑完全是针对大小进行的,那么就会存在将一行记录划分到两个InputSplit中的可能。但上述代码并未对这种情况进行处理,这也意味着在Map阶段存在数据不完整的可能,Hadoop当然不会允许这种情况的发生,而RecordReader就负责处理这种情况,下面以LineRecordReader为例,看看Hadoop是如何处理记录跨InputSplit的。LineRecordReader中的构造方法在初始化的时候调用一次,此方法将定位InputSplit中第一个换行符的位置,并将实际读取数据的位置定位到第一个换行符之后的位置,如果是第一个InputSplit则不用如此处理,而被过滤的内容则由读取该InputSplit之前的Split的LineRecordReader负责读取,确定第一个换行符位置的代码片段为: image.png

    读取InputSplit内容的代码位于next()方法中,具体代码为:


    image.png

    readLine()的功能是读取一行的数据到Text中,因为在分割FileSplit时是基于size的,如果一行被分割到两个split中,比如s1和s2中,在读取s1中的最后一行数据时,会一直读取到s2中的第一个换行符,这是在next()方法中实现的,而在处理s2时,则要将已经读取的数据跳过以避免重复读取,这是在构造方法中实现的。

    5.isSplitable()

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // 根据文件的后缀名,获取相关的压缩编码器
        final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        // 如果文件是个普通文件,没有后缀名或者后缀名不是一个压缩格式,返回true
        if (null == codec) {
            return true;
        }
        // 如果文件是压缩格式,继续判断当前压缩格式是否可切
        return codec instanceof SplittableCompressionCodec;
    }
    

    6.1JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法


    image.png

    6.2进入createFile()方法:


    image.png

    6.3进入writeNewSplits()方法,它将切片数据写入切片文件,并得到切片元数据信息SplitMetaInfo数组info:


    image.png
    6.4最后进入writeJobSplitMetaInfo()方法,查看分片的元数据信息文件是如何产生的:
    image.png

    以上内容均基于TextInputFormat切片策略
    参考文档
    Hadoop-2.4.1源码分析--MapReduce作业切片(Split)过程
    https://blog.csdn.net/u010010428/article/details/51469994

    相关文章

      网友评论

          本文标题:Map阶段的切片逻辑

          本文链接:https://www.haomeiwen.com/subject/mgncpqtx.html