
Analyzing MapReduce from the Source Code, Part 1: The Preparation Stage

Author: scott_alpha | Published 2020-02-11 21:58

    I. Outline

    This series of articles revisits Hadoop internals. Job preparation breaks into two stages: establishing the connection, and submitting the job. The call chain below serves as a roadmap for the analysis that follows.

    waitForCompletion()
     
    submit();
     
    // 1. Establish the connection
       connect();
          // 1) Create the proxy for submitting the job
          new Cluster(getConfiguration());
             // (1) Determine whether the cluster is local or remote (YARN)
             initialize(jobTrackAddr, conf);
     
    // 2. Submit the job
    submitter.submitJobInternal(Job.this, cluster)
       // 1) Create the staging path for submitting data to the cluster
       Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
     
       // 2) Get the job ID and create the job path
       JobID jobId = submitClient.getNewJobID();
     
       // 3) Copy the jar to the cluster
       copyAndConfigureFiles(job, submitJobDir);
          rUploader.uploadFiles(job, jobSubmitDir);
     
       // 4) Compute the input splits and write the split metadata files
       writeSplits(job, submitJobDir);
          maps = writeNewSplits(job, jobSubmitDir);
          input.getSplits(job);
     
       // 5) Write the XML configuration file to the staging path
       writeConf(conf, submitJobFile);
          conf.writeXml(out);
     
       // 6) Submit the job and return its status
       status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
    

    II. Establishing the Connection

    After the client submits an MR program, the first thing that runs is job.waitForCompletion(true), so the analysis starts from the waitForCompletion method.
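
    For orientation, here is a minimal driver sketch showing where that call sits. WordCountDriver, WordCountMapper, and WordCountReducer are hypothetical names used only for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {  // hypothetical driver class
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);    // hypothetical mapper
        job.setReducerClass(WordCountReducer.class);  // hypothetical reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Everything analyzed below starts from this call:
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }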

    /**
       * Submit the job to the cluster and wait for it to finish. 
       * @param verbose print the progress to the user
       * @return true if the job succeeded
       * @throws IOException thrown if the communication with the 
       *         <code>JobTracker</code> is lost
       */
      public boolean waitForCompletion(boolean verbose
                                       ) throws IOException, InterruptedException,
                                                ClassNotFoundException {
        if (state == JobState.DEFINE) {
          submit();    // submit the job (the key step)
        }
        if (verbose) {
          monitorAndPrintJob();  // monitor the job status and print progress
        } else {
          // get the completion poll interval from the client.
          int completionPollIntervalMillis = 
            Job.getCompletionPollInterval(cluster.getConf());
          while (!isComplete()) {
            try {
              Thread.sleep(completionPollIntervalMillis);
            } catch (InterruptedException ie) {
            }
          }
        }
        return isSuccessful();
      }
    

    Step into the submit method:

    /**
       * Submit the job to the cluster and return immediately.
       * @throws IOException
       */
      public void submit() 
             throws IOException, InterruptedException, ClassNotFoundException {
        ensureState(JobState.DEFINE);
        setUseNewAPI();
        // 1. Establish the connection
        connect(); 
        final JobSubmitter submitter = 
            getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
          public JobStatus run() throws IOException, InterruptedException, 
          ClassNotFoundException {
         // 2. Submit the job
            return submitter.submitJobInternal(Job.this, cluster); 
          }
        });
        state = JobState.RUNNING;
        LOG.info("The url to track the job: " + getTrackingURL());
       }
    

    Step into the connect method:

    private synchronized void connect()
              throws IOException, InterruptedException, ClassNotFoundException {
        if (cluster == null) {
          cluster = 
            ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                       public Cluster run()
                              throws IOException, InterruptedException, 
                                     ClassNotFoundException {
                     // 1) Create the proxy for submitting the job
                         return new Cluster(getConfiguration()); 
                       }
                     });
        }
      }
    
    public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
          throws IOException {
        this.conf = conf;
        this.ugi = UserGroupInformation.getCurrentUser();
    // Determine whether the cluster is local or remote
        initialize(jobTrackAddr, conf); 
      }
      
      private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
          throws IOException {
    
        synchronized (frameworkLoader) {
          for (ClientProtocolProvider provider : frameworkLoader) {
            LOG.debug("Trying ClientProtocolProvider : "
                + provider.getClass().getName());
            ClientProtocol clientProtocol = null; 
            try {
              if (jobTrackAddr == null) {
            // For a remote cluster the provider creates a YARN proxy; for a local run, a local proxy
                clientProtocol = provider.create(conf);  
              } else {
                clientProtocol = provider.create(jobTrackAddr, conf);
              }
    
              if (clientProtocol != null) {
                clientProtocolProvider = provider;
                client = clientProtocol;
                LOG.debug("Picked " + provider.getClass().getName()
                    + " as the ClientProtocolProvider");
                break;
              }
              else {
                LOG.debug("Cannot pick " + provider.getClass().getName()
                    + " as the ClientProtocolProvider - returned null protocol");
              }
            } 
            catch (Exception e) {
              LOG.info("Failed to use " + provider.getClass().getName()
                  + " due to error: " + e.getMessage());
            }
          }
        }
        ...
      }
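
    Which provider wins in the loop above is driven by the mapreduce.framework.name setting: with "yarn", the YARN provider returns a YARN proxy (YARNRunner); with "local" (the default), the local provider returns a LocalJobRunner. A minimal sketch of setting it on the client side:

    // Sketch: choosing the framework on the client side.
    // "mapreduce.framework.name" defaults to "local"; "yarn" selects the YARN runner.
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn");   // or "local" for a local run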
    

    III. Submitting the Job

    Next, look at the submitJobInternal method, which submits the job to the cluster. Its main steps are:

    • Check the job's input and output specifications
    • Compute the input splits for the job
    • Set up the job-related bookkeeping information
    • Copy the required jar and configuration files to the file system
    • Submit the job and monitor its status
    /**
       * Internal method for submitting jobs to the system.
       */
      JobStatus submitJobInternal(Job job, Cluster cluster) 
      throws ClassNotFoundException, InterruptedException, IOException {
    // Check the output specification; an exception is thrown if the output path already exists
        checkSpecs(job);    
    
        ...
    
    // 1) Create the staging directory for submitting data to the cluster
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    
        ....
    
    // 2) Get the JobID and create the job path
        JobID jobId = submitClient.getNewJobID(); 
        job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString()); // create the JobID folder under the staging directory
        
        ...
    
        try {
          .....
         
      // 3) Copy the jar to the cluster
          copyAndConfigureFiles(job, submitJobDir);
        
          .....
    
          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          
          // Create the splits for the job
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // 4) Compute the map-side input splits and write the split metadata files
          int maps = writeSplits(job, submitJobDir);  
          conf.setInt(MRJobConfig.NUM_MAPS, maps);
    
          ...
    
      // 5) Write the job configuration to job.xml under the staging/JobID directory
          writeConf(conf, submitJobFile); 
          
          //
          // Now, actually submit the job (using the submit name)
          //
          printTokens(jobId, job.getCredentials());
      // 6) Actually submit the job and return its status
          status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); 
          if (status != null) {
            return status;
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)     
          jtFs.delete(submitJobDir, true); // delete everything under the staging/JobID directory
          }
        }
      }
    

    Next, look at the writeSplits method:

    private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
          Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
        JobConf jConf = (JobConf)job.getConfiguration();
        int maps;
        if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);  // compute splits with the new API (the key path)
        } else {  
          maps = writeOldSplits(jConf, jobSubmitDir);
        }
        return maps;
      }
    

    Step into the writeNewSplits method:

    private <T extends InputSplit> int writeNewSplits(JobContext job, 
    Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = job.getConfiguration();
        InputFormat<?, ?> input =
          ReflectionUtils.newInstance(job.getInputFormatClass(), conf); 
    List<InputSplit> splits = input.getSplits(job); // compute the splits (the key step)
        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    
        // sort the splits into order based on size, so that the biggest
        // go first
        Arrays.sort(array, new SplitComparator());
        JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
            jobSubmitDir.getFileSystem(conf), array);
        return array.length;
      }
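
    The InputFormat instantiated by reflection above is whatever the driver configured; if the driver sets nothing, job.getInputFormatClass() falls back to TextInputFormat, whose split logic is inherited from FileInputFormat:

    // If the driver never calls this, getInputFormatClass() defaults to TextInputFormat.
    job.setInputFormatClass(TextInputFormat.class);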
    

    Now look at the getSplits method of FileInputFormat:

    /** 
       * Generate the list of files and make them into FileSplits.
       * @param job the job context
       * @throws IOException
       */
      public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = new StopWatch().start();
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
              long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize); // compute the split size (key point)
    
              long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  // decide whether to cut another split (key point)
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
          ....  // elided: the final tail split, non-splitable and zero-length file handling
        }
      }
    }
        return splits;
      }
    

    Two points here deserve attention (see the worked sketch after this list):

    • The split size comes from Math.max(minSize, Math.min(maxSize, blockSize)). By default minSize is 1 and maxSize is Long.MAX_VALUE (9223372036854775807), while blockSize is the HDFS block size, so the default split size equals the block size.
    • Whether a file keeps being split is decided by ((double) bytesRemaining)/splitSize > SPLIT_SLOP. SPLIT_SLOP is 1.1: another split is cut only while the remaining bytes divided by the split size exceed 1.1.
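
    A small worked sketch of the two rules, using illustrative numbers (a 128 MB block and a 260 MB file):

    // Worked example of the split rules above; all numbers are illustrative.
    long minSize   = 1L;                     // default minimum split size
    long maxSize   = Long.MAX_VALUE;         // default maximum split size
    long blockSize = 128L * 1024 * 1024;     // 128 MB HDFS block
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));  // = 128 MB

    double SPLIT_SLOP = 1.1;
    long bytesRemaining = 260L * 1024 * 1024;  // a 260 MB file
    int numSplits = 0;
    // 260/128 = 2.03 > 1.1, so cut a split; then 132/128 = 1.03 <= 1.1, so stop.
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      bytesRemaining -= splitSize;
      numSplits++;
    }
    if (bytesRemaining != 0) numSplits++;    // the final, possibly oversized, tail split
    System.out.println(numSplits);           // prints 2 (one 128 MB split + one 132 MB split)
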
    Now return to the submitJobInternal method and look at the submitClient.submitJob call. submitClient comes in two flavors, local and YARN; the YARN flavor is used as the example here.
    public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
      throws IOException, InterruptedException {
        
        addHistoryToken(ts);
        
        // Construct necessary information to start the MR AM
        ApplicationSubmissionContext appContext =
          createApplicationSubmissionContext(conf, jobSubmitDir, ts);
    
    // Submit the job to the ResourceManager
        try {
          ApplicationId applicationId =
              resMgrDelegate.submitApplication(appContext);
    
          ApplicationReport appMaster = resMgrDelegate
              .getApplicationReport(applicationId);
          String diagnostics =
              (appMaster == null ?
                  "application report is null" : appMaster.getDiagnostics());
          if (appMaster == null
              || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
              || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
            throw new IOException("Failed to run job : " +
                diagnostics);
          }
          return clientCache.getClient(jobId).getJobStatus(jobId);
        } catch (YarnException e) {
          throw new IOException(e);
        }
      }
    

    The last step is monitoring the job status and waiting for the execution result; see the monitorAndPrintJob method.
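
    A simplified sketch of such a polling loop (this is not the actual monitorAndPrintJob code, which also fetches and prints task completion events):

    // Simplified polling sketch; NOT the real monitorAndPrintJob implementation.
    while (!job.isComplete()) {
      System.out.printf("map %.0f%% reduce %.0f%%%n",
          job.mapProgress() * 100, job.reduceProgress() * 100);
      Thread.sleep(1000);  // the real code uses the configured progress-monitor interval
    }
    System.out.println("Job " + (job.isSuccessful() ? "succeeded" : "failed"));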
