Hadoop WordCount Job Submission Flow: Source Code Analysis

Author: 吃货大米饭 | Published 2019-08-29 15:45

    1. Overview

    The WordCount job submission flow is concentrated in JobSubmitter.submitJobInternal: it validates the output directory, records submission information (submitting host and user), obtains a JobID, copies the files the job needs to HDFS (job.jar, job.xml, the split files, etc.), and finally performs the actual submission. The walkthrough below uses WordCount as the example.

    2. WordCount driver code

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            //step 1: get the Job instance
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            String input = "data/wc.data";
            String output = "out/";
            // delete the output directory if it already exists
            // (deleteTarget is the author's own helper, not a Hadoop API)
            FileUtils.deleteTarget(output, configuration);
    
            //step 2: set the jar information
            job.setJarByClass(WordCountApp.class);
    
            //step 3: set the custom Mapper and Reducer classes
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
    
            //step 4: set the key and value types emitted by the map phase
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //step 5: set the key and value types emitted by the reduce phase
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //step 6: set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            //step 7: submit the job
            boolean result = job.waitForCompletion(true);//this enters the job submission flow, then polls the job status until completion
            System.exit(result ? 0 : 1);
        }
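
    The driver above references MyMapper and MyReducer, which the article does not list. Below is a minimal word-count sketch of what they typically look like (assuming the usual org.apache.hadoop.io and org.apache.hadoop.mapreduce imports; names and tokenization are illustrative, not the author's exact code):

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split each line into words and emit (word, 1)
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum the counts for each word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }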
    

    3. Walkthrough of the main submission code

    • waitForCompletion mainly submits the job and then monitors its state; here we focus on the submission path
    public boolean waitForCompletion(boolean verbose
                                       ) throws IOException, InterruptedException,
                                                ClassNotFoundException {
        if (state == JobState.DEFINE) {
          //submit the job
          submit();
        }
        if (verbose) {
          //monitor the job and print its status in real time
          monitorAndPrintJob();
        } else {
          // get the completion poll interval from the client.
          int completionPollIntervalMillis = 
            Job.getCompletionPollInterval(cluster.getConf());
          while (!isComplete()) {
            try {
              Thread.sleep(completionPollIntervalMillis);
            } catch (InterruptedException ie) {
            }
          }
        }
        return isSuccessful();
      }
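
    When verbose is false, the client simply sleeps and polls isComplete(). A minimal sketch of tuning that poll interval in the driver above, assuming the standard mapreduce.client.completion.pollinterval property (default 5000 ms):

    // a minimal sketch: poll the job state every second instead of the 5000 ms default
    Configuration configuration = new Configuration();
    configuration.setInt("mapreduce.client.completion.pollinterval", 1000);
    Job job = Job.getInstance(configuration);
    // ... same driver setup as in section 2, then:
    boolean result = job.waitForCompletion(false); // non-verbose: no monitorAndPrintJob output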
    
    • submit does two things: 1. connect to the cluster; 2. submit the job
    public void submit() 
             throws IOException, InterruptedException, ClassNotFoundException {
        ensureState(JobState.DEFINE);
        setUseNewAPI();
        //connect to the cluster (local or YARN)
        connect();
        final JobSubmitter submitter = 
            getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
          public JobStatus run() throws IOException, InterruptedException, 
          ClassNotFoundException {
             //submit the job
            return submitter.submitJobInternal(Job.this, cluster);
          }
        });
        state = JobState.RUNNING;
        LOG.info("The url to track the job: " + getTrackingURL());
       }
    
    • Connecting to the cluster builds a Cluster instance from the configuration; the important part is the Cluster initialization
    private synchronized void connect()
              throws IOException, InterruptedException, ClassNotFoundException {
        if (cluster == null) {
          cluster = 
            ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                       public Cluster run()
                              throws IOException, InterruptedException, 
                                     ClassNotFoundException {
                         return new Cluster(getConfiguration());
                       }
                     });
        }
      }
    
    public Cluster(Configuration conf) throws IOException {
        this(null, conf);
      }
    
      public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
          throws IOException {
        this.conf = conf;
        this.ugi = UserGroupInformation.getCurrentUser();
        initialize(jobTrackAddr, conf);
      }
    
    private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
          throws IOException {
    
        synchronized (frameworkLoader) {
          for (ClientProtocolProvider provider : frameworkLoader) {
            LOG.debug("Trying ClientProtocolProvider : "
                + provider.getClass().getName());
            ClientProtocol clientProtocol = null; 
            try {
              if (jobTrackAddr == null) {
                //depending on your configuration, this creates either a YARNRunner or a LocalJobRunner
                clientProtocol = provider.create(conf);
              } else {
                clientProtocol = provider.create(jobTrackAddr, conf);
              }
              //initialize the Cluster's internal fields
              if (clientProtocol != null) {
                clientProtocolProvider = provider;
                client = clientProtocol;
                LOG.debug("Picked " + provider.getClass().getName()
                    + " as the ClientProtocolProvider");
                break;
              }
              else {
                LOG.debug("Cannot pick " + provider.getClass().getName()
                    + " as the ClientProtocolProvider - returned null protocol");
              }
            } 
            catch (Exception e) {
              LOG.info("Failed to use " + provider.getClass().getName()
                  + " due to error: " + e.getMessage());
            }
          }
        }
    
        if (null == clientProtocolProvider || null == client) {
          throw new IOException(
              "Cannot initialize Cluster. Please check your configuration for "
                  + MRConfig.FRAMEWORK_NAME
                  + " and the correspond server addresses.");
        }
      }
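
    Which provider is picked is controlled by mapreduce.framework.name: YarnClientProtocolProvider returns a YARNRunner when it is set to "yarn", and LocalClientProtocolProvider returns a LocalJobRunner when it is "local". A minimal sketch of forcing the YARN path from the driver (the host names are placeholders):

    // a minimal sketch: select the YARN framework explicitly in the driver
    Configuration conf = new Configuration();
    conf.set(MRConfig.FRAMEWORK_NAME, "yarn");             // i.e. "mapreduce.framework.name"
    conf.set("yarn.resourcemanager.hostname", "rm-host");  // placeholder ResourceManager host
    conf.set("fs.defaultFS", "hdfs://namenode:8020");      // placeholder NameNode address
    Job job = Job.getInstance(conf, "wordcount");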
    
    • The YARNRunner constructor
    public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
          ClientCache clientCache) {
        this.conf = conf;
        try {
          this.resMgrDelegate = resMgrDelegate;
          this.clientCache = clientCache;
          this.defaultFileContext = FileContext.getFileContext(this.conf);
        } catch (UnsupportedFileSystemException ufe) {
          throw new RuntimeException("Error in instantiating YarnClient", ufe);
        }
      }
    
    • The ResourceMgrDelegate constructor
    public ResourceMgrDelegate(YarnConfiguration conf) {
        super(ResourceMgrDelegate.class.getName());
        this.conf = conf;
        //actually creates a YarnClientImpl
        this.client = YarnClient.createYarnClient();
        init(conf);
        //start() actually creates the rmClient (the RPC proxy to the ResourceManager)
        start();
      }
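
    ResourceMgrDelegate is essentially a thin wrapper around the YarnClient API. A minimal sketch of that API used directly, just to show what the delegate wraps (not part of the WordCount driver):

    // a minimal sketch of the YarnClient API that ResourceMgrDelegate wraps
    YarnConfiguration yarnConf = new YarnConfiguration();
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(yarnConf);   // reads the RM address etc. from the configuration
    yarnClient.start();          // creates the RPC client (rmClient) to the ResourceManager
    // YARNRunner later goes through this client to create and submit the MR ApplicationMaster
    YarnClientApplication app = yarnClient.createApplication();
    System.out.println("new application id: "
        + app.getNewApplicationResponse().getApplicationId());
    yarnClient.stop();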
    
    • Next, JobSubmitter.submitJobInternal
    JobStatus submitJobInternal(Job job, Cluster cluster) 
      throws ClassNotFoundException, InterruptedException, IOException {
        
        //validate the output directory: fail if it is not set or already exists
        //validate the jobs output specs 
        checkSpecs(job);
    
        Configuration conf = job.getConfiguration();
        addMRFrameworkToDistributedCache(conf);
        //get the staging path used for files needed while the job runs; default is /tmp/hadoop-yarn/staging/<user>/.staging, configurable via yarn.app.mapreduce.am.staging-dir
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
        //configure the command line options correctly on the submitting dfs
        //record the submitting host name and address
        InetAddress ip = InetAddress.getLocalHost();
        if (ip != null) {
          submitHostAddress = ip.getHostAddress();
          submitHostName = ip.getHostName();
          conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
          conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
        }
         //get a new JobID; this requires an RPC call
        JobID jobId = submitClient.getNewJobID();
        job.setJobID(jobId);
        //the job submit directory: /tmp/hadoop-yarn/staging/<user>/.staging/job_xxxxxx
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        JobStatus status = null;
        try {
          conf.set(MRJobConfig.USER_NAME,
              UserGroupInformation.getCurrentUser().getShortUserName());
          conf.set("hadoop.http.filter.initializers", 
              "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
          conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
          LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
              + " as the submit dir");
          // get delegation token for the dir
          TokenCache.obtainTokensForNamenodes(job.getCredentials(),
              new Path[] { submitJobDir }, conf);
          
          populateTokenCache(conf, job.getCredentials());
    
          // generate a secret to authenticate shuffle transfers
          if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
            KeyGenerator keyGen;
            try {
             
              int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
                  ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
                      MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
                  : SHUFFLE_KEY_LENGTH;
              keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
              keyGen.init(keyLen);
            } catch (NoSuchAlgorithmException e) {
              throw new IOException("Error generating shuffle secret key", e);
            }
            SecretKey shuffleKey = keyGen.generateKey();
            TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
                job.getCredentials());
          }
          //copy the required files to the cluster: -files, -libjars, -archives, and the job jar
          copyAndConfigureFiles(job, submitJobDir);
          
          //the path of job.xml under the submit directory
          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
         
          // Create the splits for the job
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
           // write the split files (job.split and job.splitmetainfo) to the submit directory
           // and get the number of map tasks
          int maps = writeSplits(job, submitJobDir);
          conf.setInt(MRJobConfig.NUM_MAPS, maps);
          LOG.info("number of splits:" + maps);
    
          // write "queue admins of the queue to which job is being submitted"
          // to job file.
          //get the queue name and record its admin ACLs in the job conf
          String queue = conf.get(MRJobConfig.QUEUE_NAME,
              JobConf.DEFAULT_QUEUE_NAME);
          AccessControlList acl = submitClient.getQueueAdmins(queue);
          conf.set(toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
    
          // removing jobtoken referrals before copying the jobconf to HDFS
          // as the tasks don't need this setting, actually they may break
          // because of it if present as the referral will point to a
          // different job.
          TokenCache.cleanUpTokenReferral(conf);
    
          if (conf.getBoolean(
              MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
              MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
            // Add HDFS tracking ids
            ArrayList<String> trackingIds = new ArrayList<String>();
            for (Token<? extends TokenIdentifier> t :
                job.getCredentials().getAllTokens()) {
              trackingIds.add(t.decodeIdentifier().getTrackingId());
            }
            conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
                trackingIds.toArray(new String[trackingIds.size()]));
          }
    
          // Set reservation info if it exists
          ReservationId reservationId = job.getReservationId();
          if (reservationId != null) {
            conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
          }
    
          // Write job file to submit dir
          //write the job configuration (job.xml) to the submit directory
          writeConf(conf, submitJobFile);
          
          //
          // Now, actually submit the job (using the submit name)
          //
          printTokens(jobId, job.getCredentials());
          //actually submit the job to the ResourceManager; under the hood YARNRunner submits the application through rmClient
          status = submitClient.submitJob(
              jobId, submitJobDir.toString(), job.getCredentials());
          if (status != null) {
            return status;
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
              //on failure, delete the submit directory
              jtFs.delete(submitJobDir, true);
          }
        }
      }
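
    At this point the submit directory holds everything the ApplicationMaster needs. A rough sketch of where those files end up, assuming the JobSubmissionFiles helpers (getJobConfPath is already used above; the other getters are shown for illustration):

    // a minimal sketch of the files placed under the submit directory
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    Path jobJar       = JobSubmissionFiles.getJobJar(submitJobDir);           // .../job_xxxxxx/job.jar
    Path jobConf      = JobSubmissionFiles.getJobConfPath(submitJobDir);      // .../job_xxxxxx/job.xml
    Path jobSplit     = JobSubmissionFiles.getJobSplitFile(submitJobDir);     // .../job_xxxxxx/job.split
    Path jobSplitMeta = JobSubmissionFiles.getJobSplitMetaFile(submitJobDir); // .../job_xxxxxx/job.splitmetainfo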
    
    • Inside writeSplits, the main work happens in writeNewSplits
    private <T extends InputSplit>
      int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
        Configuration conf = job.getConfiguration();
        //instantiate the configured InputFormat via reflection; the default is TextInputFormat
        InputFormat<?, ?> input =
          ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
        //the InputFormat splits the input files
        List<InputSplit> splits = input.getSplits(job);
        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    
        // sort the splits into order based on size, so that the biggest
        // go first
        //sort the splits by size, largest first
        Arrays.sort(array, new SplitComparator());
        //write the split files to the submit directory
        JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
            jobSubmitDir.getFileSystem(conf), array);
        return array.length;
      }
    
    • By default the InputFormat is TextInputFormat, whose getSplits method is inherited from FileInputFormat
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        Stopwatch sw = new Stopwatch().start();
        //minimum split size, default 1
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        //maximum split size, default Long.MAX_VALUE
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        //list the status of every input file
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          //if the file is not empty
          if (length != 0) {
            //get the file's block locations
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            //check whether the file can be split
            if (isSplitable(job, path)) {
              //the file's block size, 128 MB by default
              long blockSize = file.getBlockSize();
              //compute the split size: Math.max(minSize, Math.min(maxSize, blockSize))
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
              long bytesRemaining = length;
              //keep creating splits while bytesRemaining / splitSize > SPLIT_SLOP (1.1)
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
              //add the last (possibly smaller) split
              if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
              }
            } else { // not splitable
              //not splittable: the whole file becomes a single split
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.elapsedMillis());
        }
        return splits;
      }
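
    A quick worked example of the split math above, assuming the defaults (128 MB block size, minSize = 1, maxSize = Long.MAX_VALUE, so splitSize = 128 MB): a 300 MB file yields splits of 128 MB, 128 MB and 44 MB, while a 130 MB file stays a single 130 MB split because 130 / 128 ≈ 1.02 is below SPLIT_SLOP (1.1). A minimal standalone sketch that mirrors the loop:

    // a minimal standalone sketch of FileInputFormat's split-size loop
    static long[] sketchSplitSizes(long length, long blockSize, long minSize, long maxSize) {
      long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // computeSplitSize
      final double SPLIT_SLOP = 1.1;
      java.util.List<Long> sizes = new java.util.ArrayList<>();
      long bytesRemaining = length;
      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
        sizes.add(splitSize);
        bytesRemaining -= splitSize;
      }
      if (bytesRemaining != 0) {
        sizes.add(bytesRemaining); // the final, possibly smaller, split
      }
      return sizes.stream().mapToLong(Long::longValue).toArray();
    }

    // sketchSplitSizes(300L << 20, 128L << 20, 1, Long.MAX_VALUE) -> [128 MB, 128 MB, 44 MB]
    // sketchSplitSizes(130L << 20, 128L << 20, 1, Long.MAX_VALUE) -> [130 MB]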
    
