《Hadoop-MapReduce源码解析》之二: org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

Author: 大数据ZRL | Published 2024-03-02 14:59

    1. org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

      /**
       * Internal method for submitting jobs to the system.
       * The job submission process involves:
       *   1. Checking the input and output specifications of the job.
       *   2. Computing the InputSplits for the job.
       *   3. Setting up the requisite accounting information for the
       *      DistributedCache of the job, if necessary.
       *   4. Copying the job's jar and configuration to the map-reduce system
       *      directory on the distributed file-system.
       *   5. Submitting the job to the JobTracker and optionally monitoring
       *      its status.
       *
       * @param job the configuration to submit
       * @param cluster the handle to the Cluster
       * @throws ClassNotFoundException
       * @throws InterruptedException
       * @throws IOException
       */
      JobStatus submitJobInternal(Job job, Cluster cluster) 
      throws ClassNotFoundException, InterruptedException, IOException {
    
        // Validate the output specification of the job.
        // Typically this checks whether the output path already exists and
        // throws an exception if it does, so existing output is never overwritten.
        checkSpecs(job);
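        // (Added note) For file-based output formats this delegates to
        // FileOutputFormat#checkOutputSpecs, which throws
        // FileAlreadyExistsException when the output directory exists.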
    
        // Unless explicitly turned off, Hadoop by default loads two resources,
        // in classpath order:
        //   core-default.xml: read-only defaults for Hadoop.
        //   core-site.xml: site-specific configuration for a given Hadoop installation.
        Configuration conf = job.getConfiguration();
    
        // Resolve any symlinks in the path taken from
        // MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH
        // ("mapreduce.application.framework.path") and add the resolved URI
        // to the distributed cache:
        // MRJobConfig.CACHE_ARCHIVES = "mapreduce.job.cache.archives";
        // conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
        //       : archives + "," + uri.toString());
        addMRFrameworkToDistributedCache(conf);
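        // (Added note) The framework path, when set, typically points at a
        // tarball on HDFS, e.g. (illustrative value)
        // hdfs:///mapred/framework/mr-framework.tar.gz#mr-framework;
        // if the property is unset, addMRFrameworkToDistributedCache() does nothing.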
    
        // Get the staging directory where job-specific files are placed.
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
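        // (Added note) Under YARN this resolves to
        // <yarn.app.mapreduce.am.staging-dir>/<user>/.staging, with
        // /tmp/hadoop-yarn/staging as the default staging root.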
        
        //configure the command line options correctly on the submitting dfs
        InetAddress ip = InetAddress.getLocalHost();
        if (ip != null) {
          submitHostAddress = ip.getHostAddress();
          submitHostName = ip.getHostName();
          // Record the submitting host's name and address in the configuration.
          conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
          conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
        }
    
        // submitClient is explained later; it is either a LocalJobRunner or a YARNRunner.
        // Create the application (on YARN), generate a JobId, and return it.
        JobID jobId = submitClient.getNewJobID();
        job.setJobID(jobId);
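        // (Added note) A JobID renders as job_<cluster timestamp>_<sequence>,
        // e.g. job_1600000000000_0001 (example value).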
    
        // Job submission path: new Path(Path parent, String child) joins the
        // two arguments into a single new path.
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        // Job status
        JobStatus status = null;
        try {
          conf.set(MRJobConfig.USER_NAME,
              UserGroupInformation.getCurrentUser().getShortUserName());
          conf.set("hadoop.http.filter.initializers", 
              "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
          conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
          LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
              + " as the submit dir");
          // Get delegation tokens for the dir: obtained from the NameNode(s)
          // corresponding to the given path (the job submission directory).
          TokenCache.obtainTokensForNamenodes(job.getCredentials(),
              new Path[] { submitJobDir }, conf);
          
          // Obtain delegation tokens from all NameNodes the job depends on.
          populateTokenCache(conf, job.getCredentials());
    
          // Generate a secret key to authenticate shuffle transfers.
          if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
            KeyGenerator keyGen;
            try {
              keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
              keyGen.init(SHUFFLE_KEY_LENGTH);
            } catch (NoSuchAlgorithmException e) {
              throw new IOException("Error generating shuffle secret key", e);
            }
            SecretKey shuffleKey = keyGen.generateKey();
            TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
                job.getCredentials());
          }
          // In Hadoop MapReduce, a spill writes part of the in-memory data to
          // disk to free memory. To keep that data safe, when encrypted
          // intermediate spill is enabled the maximum number of
          // ApplicationMaster (AM) attempts is set to 1.
          if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
            conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
            LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                    "data spill is enabled");
          }
    
          // Configure the user's jobconf from the command-line options
          // -libjars, -files and -archives, and upload and configure the
          // files, libjars, jobjar and archives associated with the job.
          copyAndConfigureFiles(job, submitJobDir);
          
          // Path of the job conf file, i.e. new Path(jobSubmitDir, "job.xml").
          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          
          // Create the splits for the job: compute the input splits and
          // return their count, which becomes the number of map tasks.
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
          int maps = writeSplits(job, submitJobDir);  
          conf.setInt(MRJobConfig.NUM_MAPS, maps);
          LOG.info("number of splits:" + maps);
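          // (Added note) writeSplits() delegates to InputFormat#getSplits()
          // and persists the split metadata as the job.split and
          // job.splitmetainfo files under submitJobDir, where the
          // ApplicationMaster later reads them.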
    
          // If the computed number of maps exceeds the configured (or default)
          // maximum, throw an exception.
          int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
              MRJobConfig.DEFAULT_JOB_MAX_MAP);
          if (maxMaps >= 0 && maxMaps < maps) {
            throw new IllegalArgumentException("The number of map tasks " + maps +
                " exceeded limit " + maxMaps);
          }
    
          // write "queue admins of the queue to which job is being submitted"
          // to job file.
          String queue = conf.get(MRJobConfig.QUEUE_NAME,
              JobConf.DEFAULT_QUEUE_NAME);
          AccessControlList acl = submitClient.getQueueAdmins(queue);
          conf.set(toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
    
          // removing jobtoken referrals before copying the jobconf to HDFS
          // as the tasks don't need this setting, actually they may break
          // because of it if present as the referral will point to a
          // different job.
          TokenCache.cleanUpTokenReferral(conf);
    
          if (conf.getBoolean(
              MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
              MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
            // Add HDFS tracking ids: identifiers that can be used to correlate
            // token usage across multiple client sessions.
            ArrayList<String> trackingIds = new ArrayList<String>();
            for (Token<? extends TokenIdentifier> t :
                job.getCredentials().getAllTokens()) {
              trackingIds.add(t.decodeIdentifier().getTrackingId());
            }
            conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
                trackingIds.toArray(new String[trackingIds.size()]));
          }
    
          // Set reservation info if it exists.
          // reservationId is the job's globally unique reservation identifier,
          // or null if the job has no associated reservation.
          ReservationId reservationId = job.getReservationId();
          if (reservationId != null) {
            conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
          }
    
          // Write the job conf file to the submit dir on the JobTracker's
          // file system.
          writeConf(conf, submitJobFile);
          
          //
          // Now, actually submit the job (using the submit name)
          //
          printTokens(jobId, job.getCredentials());
          // Formally submit the job, to YARN or to the local runner.
          status = submitClient.submitJob(  // see《Hadoop-MapReduce源码解析》Part 3
              jobId, submitJobDir.toString(), job.getCredentials());
          if (status != null) {
            // Return the job's status after submission.
            return status;
          } else {
            // The job failed to launch.
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
              // Clean up the staging directory.
              jtFs.delete(submitJobDir, true);
    
          }
        }
      }
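
For context, here is how client code typically reaches submitJobInternal: Job#waitForCompletion calls Job#submit, which builds a JobSubmitter and invokes the method above. The sketch below is a minimal, illustrative driver (the class name WordCountDriver and the argument paths are placeholders; TokenCounterMapper and IntSumReducer are stock classes bundled with Hadoop):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

    public class WordCountDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(TokenCounterMapper.class); // bundled with Hadoop
        job.setCombinerClass(IntSumReducer.class);    // bundled with Hadoop
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // checkSpecs() above rejects the job if this output path already exists
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // waitForCompletion() -> Job#submit() -> JobSubmitter#submitJobInternal()
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }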
    
