MapReduce

作者: 初夏时的猫 | 来源:发表于2021-12-05 10:56 被阅读0次

提交

提交源码

//true代表打印执行过程
boolean b = job.waitForCompletion(true);

Job.java

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,                                            ClassNotFoundException {
//Job现在状态还是DEFINE,会执行submit
    if (state == JobState.DEFINE) {
      submit();
    if (verbose) {
//监控打印job的信息,执行完临时目录的内容就删掉了
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
    }
public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    //确认状态
    ensureState(JobState.DEFINE);
    //设置新旧api兼容
    setUseNewAPI();
    //设置连接,如果没设置用户,doAs设置当前主机为登录用户
    connect();
    //设置提交job的fs,客户端,状态等信息
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(),          cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
       //墨迹半天终于提交了
        return       submitter.submitJobInternal(Job.this, cluster);
    //提交流程结束,JobState改变为running
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());

提交进JobSubmitter.java

JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //检查输出路径 
    checkSpecs(job);
    //进去看一眼,全是site.xml,存的都是配置文件
    Configuration conf = job.getConfiguration();
//字面意思,添加MR框架去分布式缓存
//大概就是把配置文件发给每个maptask和//reducetask    addMRFrameworkToDistributedCache(conf);
    //会生成一个临时目录,可为什么在d盘啊
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    //生成了唯一标识jobid
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
//在刚才生成的临时目录下定义路径,准备生成
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
   //配置suffleid/qeque什么的
   ...
   //这个方法一执行,刚才的定义的路径就生成文件夹了
  //进到JobSubmitter里了,执行rUploader.uploadResources
   //然后进JobResourceUploader里,mkdir生成了文件夹
  //uploadJobJar,集群模式没jar,要提交到集群,所以先提交到文件夹里
   copyAndConfigureFiles(job, submitJobDir);
   //路径是文件夹路径/job.xml
   Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
     //切片源码,执行完切片文件放入刚才文件夹
      int maps = writeSplits(job, submitJobDir);
     //设置切片个数,切片赋给MRJobConfig.NUM_MAPS了
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
     ...
  //把xml放进来了,都是配置信息
   writeConf(conf, submitJobFile);
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
    //设置job为的status变为running
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());

切片

切片源码

JobSubmitter.java

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    //jobSubmitDir 存储路径(就是生成那个临时文件夹)
    //jConf是job的信息,id什么的
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    //hadoop3.x的切片方式,我就进入这个方法。
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
   //hadoop2.x的切片方式
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
 //hadoop3.x具体切片
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
   //切片了!!!!!!!!!!!
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

InputFormat有很多实现类。默认进入FileInputFormat.java实现类
FileInputFormat.java

 public static long getMaxSplitSize(JobContext context) {
   //SPLIT_MAXSIZE不设置没有
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
   //切片最小的size,getFormatMinSplitSize()返回1
   //getMinSplitSize()设置在yarn-default里,默认值是0
   //mapreduce.input.fileinputformat.split.minsize
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    //切片最大的size
    long maxSize = getMaxSplitSize(job);
    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
   ...
    循环遍历文件列表进行切片,说明是按照文件进行切分
    for (FileStatus file: files) {
             if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //文件是否可以切割
        if (isSplitable(job, path)) {
          //获取块大小
          long blockSize = file.getBlockSize();
           //获取切片大小
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
          //一个文件的大小
          long bytesRemaining = length;
          //SPLIT_SLOP=1.1 
          //如果文件大于切片大小的1.1倍就切片
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          //感觉就是循环切完片了,如果还剩下点,就放在最后一片
            if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
      }else { 
          // not splitable
      }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
  }
图片.png
图片.png
图片.png

提交


图片.png
切片
图片.png

相关文章

网友评论

      本文标题:MapReduce

      本文链接:https://www.haomeiwen.com/subject/btsixrtx.html