MapReduce(一):工作机制初探

作者: b91cbec6a902 | 来源:发表于2019-01-08 13:56 被阅读3次

    概述

    基于Hadoop 2.x。
    Hadoop核心三大组件:HDFS负责存储,Yarn负责计算资源的管理,MapReduce负责具体的计算流程。

    MapReduce任务启动的时候会启动哪些进程?
    所有Java程序的入口都是main方法(public static void main(String[] args))。每执行一个main方法,就启动一个JVM进程。MapReduce有三类进程:
    com.hadoop.demo.job.demo1.MyJob#main:我们自己编写的Client,用来设置输入源、输出源、Mapper处理类、Reducer处理类等信息的。负责向Yarn申请Container启动MRAppMaster。
    org.apache.hadoop.mapreduce.v2.app.MRAppMaster#main:负责向Yarn申请Container启动YarnChild,控制整个计算流程。
    org.apache.hadoop.mapred.YarnChild#main:真正的Map进程和Reduce进程。

    Client进程是最先启动的,随后Client进程向Yarn申请Container启动MRAppMaster进程,最后MRAppMaster进程向Yarn申请Container启动YarnChild进程。

    以上三个main方法都在同一个jar包,只需要执行不同的jar -jar命令即可。

    MapReduce工作机制

    前期准备工作

    在我们编写的MapReduce程序的最后都会有一行提交任务的代码:

    job.waitForCompletion(true)
    

    里面封装了大量的细节,我们来看看里面到底干了什么,跟进waitForCompletion方法:

    public void submit() 
           throws IOException, InterruptedException, ClassNotFoundException {
      ensureState(JobState.DEFINE);
      setUseNewAPI();
      // 创建与文件系统、Yarn的连接
      connect();
      // 创建JobSubmitter对象,由JobSubmitter来执行提交操作
      final JobSubmitter submitter = 
          getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
      status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, 
        ClassNotFoundException {
          return submitter.submitJobInternal(Job.this, cluster);
        }
      });
      state = JobState.RUNNING;
      LOG.info("The url to track the job: " + getTrackingURL());
     }
    

    继续跟进submitJobInternal方法,从submitJobInternal方法上的注释可以看出做了以下这些事情:

    1、检查输入和输出目录,如果没有指定输入输出目录或指定的目录已存在则返回错误。

    checkSpecs(job);
    

    2、如果有必要的话,设置Job的一些必要的信息到DistributedCache中。addMRFrameworkToDistributedCache方法已被标注@SuppressWarnings("deprecation")。

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    

    3、根据配置获取Job在文件系统上的总提交目录。这个目录是一个父目录,每个Job会提交jar包、配置文件(根据Configuration生成,是一个XML文件)、分片信息文件到文件系统。

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    

    举个栗子:hdfs:/tmp/hadoop-yarn/staging/root/.staging
    4、提交Job信息到Yarn的ResourceManager,根据Yarn的响应构建一个ApplicationSubmissionContext对象,并返回该Job的ID。

    JobID jobId = submitClient.getNewJobID();
    

    这里是一次RPC调用,返回一个job的ID。

    5、在第4步获取到job的ID之后,那么这个job在文件系统上的信息提交目录就是:第3步的jobStagingArea + "/" + {jobId}。后续生成的jar包、配置文件、分片信息文件都会提交到这个目录。

    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    

    举个栗子:hdfs:/tmp/hadoop-yarn/staging/root/.staging/{jobId}

    6、上传jar包,档案,共享文件等到job的提交目录。

    copyAndConfigureFiles(job, submitJobDir);
    
    // 上传共享的文件
    uploadFiles(job, files, submitJobDir, mapredSysPerms, replication, 
        fileSCUploadPolicies, statCache);
    // 上传依赖的jar包
    uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
        fileSCUploadPolicies, statCache);
    // 上传档案
    uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
        archiveSCUploadPolicies, statCache);
    // 上传job jar
    uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
    

    7、计算分片信息,生成分片信息文件上传到job提交目录。

    int maps = writeSplits(job, submitJobDir);
    

    8、上传最终的job配置文件到job提交目录(根据全局唯一的Configuration对象生成一个job.xml文件)。

    writeConf(conf, submitJobFile);
    
    conf.writeXml(out);
    

    真正提交任务

    org.apache.hadoop.mapred.YARNRunner#submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    只需要三个参数:
    jobId:前面向Yarn提交任务后返回的,这里可以大胆揣测jobId在Yarn上肯定有映射。
    jobSubmitDir:job的提交目录,里面有job运行所需的所有资源。
    ts:凭证,Hadoop安全认证所需。

    1、生成任务提交上下文信息ApplicationSubmissionContext对象。

    ApplicationSubmissionContext appContext =
          createApplicationSubmissionContext(conf, jobSubmitDir, ts);
    

    我们来看看构建出来的ApplicationSubmissionContext对象包含哪些信息:

    // job的ID
    private ApplicationId applicationId = null;
    // job的优先级
    private Priority priority = null;
    // Client向Yarn申请到Container容器资源之后启动容器需要的信息。这里需要向ResourceManager提供足够的信息以便在Container中启动MRAppMaster。
    // Client需要提供足够的细节信息,如运行application需要的文件和jar包,执行这些jar包需要的命令,一些unix环境设置等。
    private ContainerLaunchContext amContainer = null;
    // 申请到的Container中的资源,包含CPU核数和内存大小等信息
    private Resource resource = null;
    // job附加信息(暂时不知道啥作用)
    private Set<String> applicationTags = null;
    // 向Yarn申请启动MRAppMaster的Container的请求
    private List<ResourceRequest> amResourceRequests = null;
    // 日志聚集信息
    private LogAggregationContext logAggregationContext = null;
    // 预约ID(暂时不知道啥作用)
    private ReservationId reservationId = null;
    // job超时信息
    private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
    // 这里还有job投放的队列Queue信息
    
    

    再来看看重要的ContainerLaunchContext对象包含哪些信息:

    // 所需的资源:包括job.xml、job.jar、job.split、job.splitmetainfo、分布式缓存的文件等
    private Map<String, LocalResource> localResources = null;
    private ByteBuffer tokens = null;
    private ByteBuffer tokensConf = null;
    private Map<String, ByteBuffer> serviceData = null;
    // 环境准备信息,用于将Hadoop Jar,Job Jar等放到CLASSPATH下面
    private Map<String, String> environment = null;
    // MRAppMaster的启动命令。
    // 举个栗子:
    // $JAVA_HOME/bin/java -Dlog4j.configuration=container-log4j.properties 
    //  -Dyarn.app.container.log.dir=<LOG_DIR> 
    //  -Dyarn.app.container.log.filesize=0 
    //  -Dhadoop.root.logger=INFO,CLA  
    //  -Xmx1638m 
    //  -Djava.net.preferIPv4Stack=true
    //  -Xmx825955249 org.apache.hadoop.mapreduce.v2.app.MRAppMaster
    //  1><LOG_DIR>/stdout 2><LOG_DIR>/stderr
    private List<String> commands = null;
    private Map<ApplicationAccessType, String> applicationACLS = null;
    private ContainerRetryContext containerRetryContext = null;
    

    2、将任务提交到Yarn的ResourceManager。

    org.apache.hadoop.yarn.client.api.impl.YarnClientImpl#submitApplication(ApplicationSubmissionContext appContext)
    
    rmClient.submitApplication(request);
    

    这里注意了,这里向Yarn的ResourceManager提交任务后,目的是启动MRAppMaster进程。
    MRAppMaster进程是谁启动的?
    ResourceManager收到提交job的请求后,会分配一个Container用来运行MRAppMaster进程。Container来源于NodeManager节点,因此MRAppMaster进程运行在某个NodeManager节点上。也就是说MRAppMaster进程是由NodeManager启动的。

    3、向ResourceManager提交完任务之后,这里会等待MRAppMaster进程启动,然后MRAppMaster进程会主动跟Client进程建立连接。到这里Client进程就完成了主要的使命,剩下的就是不断与MRAppMaster进程通讯,来获取job的执行情况。

    while (true) {
      try {
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            throw new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;
        }
    
        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (enforceAsyncAPITimeout() &&
            elapsedMillis >= asyncApiPollTimeoutMillis) {
          throw new YarnException("Timed out while waiting for application " +
              applicationId + " to be submitted successfully");
        }
    
        // Notify the client through the log every 10 poll, in case the client
        // is blocked here too long.
        if (++pollCount % 10 == 0) {
          LOG.info("Application submission is not finished, " +
              "submitted application " + applicationId +
              " is still in " + state);
        }
        try {
          Thread.sleep(submitPollIntervalMillis);
        } catch (InterruptedException ie) {
          String msg = "Interrupted while waiting for application "
              + applicationId + " to be successfully submitted.";
          LOG.error(msg);
          throw new YarnException(msg, ie);
        }
      } catch (ApplicationNotFoundException ex) {
        // FailOver or RM restart happens before RMStateStore saves
        // ApplicationState
        LOG.info("Re-submit application " + applicationId + "with the " +
            "same ApplicationSubmissionContext");
        rmClient.submitApplication(request);
      }
    }
    

    MRAppMaster进程怎么主动跟Client进程建立连接?
    Client进程的通讯地址在job准备阶段被设置到了Configuration对象中,Configuration对象被转化成job.xml文件被job的所有进程共享。MRAppMaster进程启动之后会与这个通讯地址建立连接,来保持与Client进程之间的通讯。

    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    
    到了这里,MRAppMaster进程已启动。接下来是MRAppMaster进程的Show time ~

    相关文章

      网友评论

        本文标题:MapReduce(一):工作机制初探

        本文链接:https://www.haomeiwen.com/subject/jhgyrqtx.html