Hadoop-MapReduce Source Code Analysis, Part 2: org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

Author: 大数据ZRL | Published 2024-03-02 14:59

1. org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

  /**
   * Internal method for submitting jobs to the system.
   * The job submission process involves:
   *   1. Checking the input and output specifications of the job.
   *   2. Computing the InputSplits for the job.
   *   3. Setting up the requisite accounting information for the
   *      DistributedCache of the job, if necessary.
   *   4. Copying the job's jar and configuration to the map-reduce system
   *      directory on the distributed file-system.
   *   5. Submitting the job to the JobTracker and optionally monitoring
   *      its status.
   * Params:
   *   job – the configuration to submit
   *   cluster – the handle to the Cluster
   * Throws:
   *   ClassNotFoundException
   *   InterruptedException
   *   IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    // Validate the output specification of the job: typically, check whether
    // the output path already exists and throw an exception if it does, so
    // that existing output is not overwritten.
    checkSpecs(job);

    // Unless explicitly turned off, Hadoop by default loads two resources, in
    // classpath order:
    //   core-default.xml: read-only defaults for Hadoop.
    //   core-site.xml: site-specific configuration for a given Hadoop installation.
    Configuration conf = job.getConfiguration();

    // Resolve any symlinks in the path configured by
    // MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH
    // ("mapreduce.application.framework.path") and add the resolved URI to the
    // distributed cache:
    // MRJobConfig.CACHE_ARCHIVES = "mapreduce.job.cache.archives";
    // conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
    //       : archives + "," + uri.toString());
    addMRFrameworkToDistributedCache(conf);
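    // For illustration only (example values, not defaults): the MR
    // "distributed cache deploy" docs point the framework path at an HDFS
    // tarball with a fragment alias, e.g.
    //   mapreduce.application.framework.path =
    //       hdfs:///mapred/framework/hadoop-mapreduce.tar.gz#mrframework
    // The fragment (#mrframework) becomes the link name in the container, and
    // mapreduce.application.classpath must then reference that alias.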

    // Get the staging directory where job-specific files are placed.
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
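    // Under YARN this is typically
    // <yarn.app.mapreduce.am.staging-dir>/<user>/.staging, i.e.
    // /tmp/hadoop-yarn/staging/<user>/.staging with the default setting.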
    
    // Configure the command line options correctly on the submitting dfs.
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      // Record the host name and address of the submitting machine.
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }

    // submitClient is covered later in this series: it is either
    // LocalJobRunner or YARNRunner.
    // It creates the application, generates a JobID, and returns it.
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
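    // Under YARN the JobID is derived from the new ApplicationId and renders
    // as job_<clusterTimestamp>_<sequence>, e.g. job_1709370000000_0001.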

    // The job submission path: new Path(Path parent, String child) joins the
    // two arguments into a single path.
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // Job status
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // Get a delegation token for the dir: obtain delegation tokens from the
      // namenode that corresponds to the given path (the job file directory).
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      // Obtain delegation tokens from all configured namenodes.
      populateTokenCache(conf, job.getCredentials());
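      // Besides the submit dir, populateTokenCache also reads the
      // comma-separated namenode list in MRJobConfig.JOB_NAMENODES
      // ("mapreduce.job.hdfs-servers"), so jobs touching other HDFS clusters
      // obtain tokens for them as well.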

      // Generate a secret to authenticate shuffle transfers.
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
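      // The shuffle secret is an HmacSHA1 key (SHUFFLE_KEYGEN_ALGORITHM):
      // reducers sign their map-output fetch URLs with it via
      // SecureShuffleUtils, and the NodeManager's ShuffleHandler verifies the
      // HMAC before serving data.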
      // In Hadoop MapReduce, a spill writes part of the in-memory data to disk
      // to free up memory. To keep that data secure, when encrypted
      // intermediate spills are enabled, the maximum number of
      // ApplicationMaster (AM) attempts is set to 1.
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                " data spill is enabled");
      }
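      // Note: CryptoUtils.isEncryptedSpillEnabled(conf) checks
      // MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA
      // ("mapreduce.job.encrypted-intermediate-data"), which defaults to false.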

      // Configure the user's jobconf from the command-line options -libjars,
      // -files and -archives, and upload and configure the files, libjars,
      // jobjars and archives associated with the job.
      copyAndConfigureFiles(job, submitJobDir);
      
      // The job conf file path, i.e. new Path(jobSubmitDir, "job.xml").
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job: compute the input splits and return
      // their count, which is the number of map tasks.
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);  
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);
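      // For FileInputFormat-based jobs each split is sized as
      //   splitSize = max(minSize, min(maxSize, blockSize))
      // with minSize/maxSize taken from
      // mapreduce.input.fileinputformat.split.minsize / .maxsize, so the map
      // count usually tracks the number of HDFS blocks in the input.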

      // If the computed number of maps exceeds the configured (or default)
      // maximum, throw an exception.
      int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
          MRJobConfig.DEFAULT_JOB_MAX_MAP);
      if (maxMaps >= 0 && maxMaps < maps) {
        throw new IllegalArgumentException("The number of map tasks " + maps +
            " exceeded limit " + maxMaps);
      }

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // 将“作业提交到什么队列”写入job文件
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // Remove jobtoken referrals before copying the jobconf to HDFS, as the
      // tasks don't need this setting; in fact they may break if it is
      // present, because the referral would point to a different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids: identifiers that can be used to correlate
        // token usage across multiple client sessions.
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists. reservationId is the job's
      // globally unique reservation identifier, or null if the job has no
      // associated reservation.
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write the job conf (job.xml) to the submit dir on the JobTracker's
      // file system.
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // Actually submit the job to YARN or to the local runner.
      status = submitClient.submitJob(  // see Part 3 of this series
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        // Return the status of the submitted job.
        return status;
      } else {
        // The job could not be launched.
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          // Clean up the staging directory.
          jtFs.delete(submitJobDir, true);

      }
    }
  }
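
To see where this method sits in the public API: Job.waitForCompletion(true) calls Job.submit(), which creates a JobSubmitter and invokes submitJobInternal(job, cluster). Below is a minimal driver sketch that exercises this path; it uses the identity Mapper/Reducer base classes, and the class name SubmitDemo plus the two path arguments are illustrative, not taken from the source above.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class SubmitDemo {
    public static void main(String[] args) throws Exception {
      Job job = Job.getInstance(new Configuration(), "submit-demo");
      job.setJarByClass(SubmitDemo.class);
      // Identity map/reduce: with the default TextInputFormat the types are
      // LongWritable (byte offset) and Text (the line itself).
      job.setMapperClass(Mapper.class);
      job.setReducerClass(Reducer.class);
      job.setOutputKeyClass(LongWritable.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      // The output path must not exist yet, or checkSpecs(job) above will
      // fail the submission.
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      // waitForCompletion() -> submit() -> JobSubmitter.submitJobInternal(...)
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }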
