概述
基于Hadoop 2.x。
Hadoop核心三大组件:HDFS负责存储,Yarn负责计算资源的管理,MapReduce负责具体的计算流程。
MapReduce任务启动的时候会启动哪些进程?
所有Java程序的入口都是main方法(public static void main(String[] args))。每执行一个main方法,就启动一个JVM进程。MapReduce有三类进程:
com.hadoop.demo.job.demo1.MyJob#main:我们自己编写的Client,用来设置输入源、输出源、Mapper处理类、Reducer处理类等信息的。负责向Yarn申请Container启动MRAppMaster。
org.apache.hadoop.mapreduce.v2.app.MRAppMaster#main:负责向Yarn申请Container启动YarnChild,控制整个计算流程。
org.apache.hadoop.mapred.YarnChild#main:真正的Map进程和Reduce进程。
Client进程是最先启动的,随后Client进程向Yarn申请Container启动MRAppMaster进程,最后MRAppMaster进程向Yarn申请Container启动YarnChild进程。
以上三个main方法都在同一个jar包,只需要执行不同的jar -jar命令即可。
MapReduce工作机制前期准备工作
在我们编写的MapReduce程序的最后都会有一行提交任务的代码:
job.waitForCompletion(true)
里面封装了大量的细节,我们来看看里面到底干了什么,跟进waitForCompletion方法:
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// 创建与文件系统、Yarn的连接
connect();
// 创建JobSubmitter对象,由JobSubmitter来执行提交操作
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
继续跟进submitJobInternal方法,从submitJobInternal方法上的注释可以看出做了以下这些事情:
1、检查输入和输出目录,如果没有指定输入输出目录或指定的目录已存在则返回错误。
checkSpecs(job);
2、如果有必要的话,设置Job的一些必要的信息到DistributedCache中。addMRFrameworkToDistributedCache方法已被标注@SuppressWarnings("deprecation")。
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
3、根据配置获取Job在文件系统上的总提交目录。这个目录是一个父目录,每个Job会提交jar包、配置文件(根据Configuration生成,是一个XML文件)、分片信息文件到文件系统。
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
举个栗子:hdfs:/tmp/hadoop-yarn/staging/root/.staging
4、提交Job信息到Yarn的ResourceManager,根据Yarn的响应构建一个ApplicationSubmissionContext对象,并返回该Job的ID。
JobID jobId = submitClient.getNewJobID();
这里是一次RPC调用,返回一个job的ID。
5、在第4步获取到job的ID之后,那么这个job在文件系统上的信息提交目录就是:第3步的jobStagingArea + "/" + {jobId}。后续生成的jar包、配置文件、分片信息文件都会提交到这个目录。
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
举个栗子:hdfs:/tmp/hadoop-yarn/staging/root/.staging/{jobId}
6、上传jar包,档案,共享文件等到job的提交目录。
copyAndConfigureFiles(job, submitJobDir);
// 上传共享的文件
uploadFiles(job, files, submitJobDir, mapredSysPerms, replication,
fileSCUploadPolicies, statCache);
// 上传依赖的jar包
uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
fileSCUploadPolicies, statCache);
// 上传档案
uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
archiveSCUploadPolicies, statCache);
// 上传job jar
uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
7、计算分片信息,生成分片信息文件上传到job提交目录。
int maps = writeSplits(job, submitJobDir);
8、上传最终的job配置文件到job提交目录(根据全局唯一的Configuration对象生成一个job.xml文件)。
writeConf(conf, submitJobFile);
conf.writeXml(out);
真正提交任务
org.apache.hadoop.mapred.YARNRunner#submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
只需要三个参数:
jobId:前面向Yarn提交任务后返回的,这里可以大胆揣测jobId在Yarn上肯定有映射。
jobSubmitDir:job的提交目录,里面有job运行所需的所有资源。
ts:凭证,Hadoop安全认证所需。
1、生成任务提交上下文信息ApplicationSubmissionContext对象。
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
我们来看看构建出来的ApplicationSubmissionContext对象包含哪些信息:
// job的ID
private ApplicationId applicationId = null;
// job的优先级
private Priority priority = null;
// Client向Yarn申请到Container容器资源之后启动容器需要的信息。这里需要向ResourceManager提供足够的信息以便在Container中启动MRAppMaster。
// Client需要提供足够的细节信息,如运行application需要的文件和jar包,执行这些jar包需要的命令,一些unix环境设置等。
private ContainerLaunchContext amContainer = null;
// 申请到的Container中的资源,包含CPU核数和内存大小等信息
private Resource resource = null;
// job附加信息(暂时不知道啥作用)
private Set<String> applicationTags = null;
// 向Yarn申请启动MRAppMaster的Container的请求
private List<ResourceRequest> amResourceRequests = null;
// 日志聚集信息
private LogAggregationContext logAggregationContext = null;
// 预约ID(暂时不知道啥作用)
private ReservationId reservationId = null;
// job超时信息
private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
// 这里还有job投放的队列Queue信息
再来看看重要的ContainerLaunchContext对象包含哪些信息:
// 所需的资源:包括job.xml、job.jar、job.split、job.splitmetainfo、分布式缓存的文件等
private Map<String, LocalResource> localResources = null;
private ByteBuffer tokens = null;
private ByteBuffer tokensConf = null;
private Map<String, ByteBuffer> serviceData = null;
// 环境准备信息,用于将Hadoop Jar,Job Jar等放到CLASSPATH下面
private Map<String, String> environment = null;
// MRAppMaster的启动命令。
// 举个栗子:
// $JAVA_HOME/bin/java -Dlog4j.configuration=container-log4j.properties
// -Dyarn.app.container.log.dir=<LOG_DIR>
// -Dyarn.app.container.log.filesize=0
// -Dhadoop.root.logger=INFO,CLA
// -Xmx1638m
// -Djava.net.preferIPv4Stack=true
// -Xmx825955249 org.apache.hadoop.mapreduce.v2.app.MRAppMaster
// 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr
private List<String> commands = null;
private Map<ApplicationAccessType, String> applicationACLS = null;
private ContainerRetryContext containerRetryContext = null;
2、将任务提交到Yarn的ResourceManager。
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl#submitApplication(ApplicationSubmissionContext appContext)
rmClient.submitApplication(request);
这里注意了,这里向Yarn的ResourceManager提交任务后,目的是启动MRAppMaster进程。
MRAppMaster进程是谁启动的?
ResourceManager收到提交job的请求后,会分配一个Container用来运行MRAppMaster进程。Container来源于NodeManager节点,因此MRAppMaster进程运行在某个NodeManager节点上。也就是说MRAppMaster进程是由NodeManager启动的。
3、向ResourceManager提交完任务之后,这里会等待MRAppMaster进程启动,然后MRAppMaster进程会主动跟Client进程建立连接。到这里Client进程就完成了主要的使命,剩下的就是不断与MRAppMaster进程通讯,来获取job的执行情况。
while (true) {
try {
ApplicationReport appReport = getApplicationReport(applicationId);
YarnApplicationState state = appReport.getYarnApplicationState();
if (!waitingStates.contains(state)) {
if(failToSubmitStates.contains(state)) {
throw new YarnException("Failed to submit " + applicationId +
" to YARN : " + appReport.getDiagnostics());
}
LOG.info("Submitted application " + applicationId);
break;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be submitted successfully");
}
// Notify the client through the log every 10 poll, in case the client
// is blocked here too long.
if (++pollCount % 10 == 0) {
LOG.info("Application submission is not finished, " +
"submitted application " + applicationId +
" is still in " + state);
}
try {
Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
String msg = "Interrupted while waiting for application "
+ applicationId + " to be successfully submitted.";
LOG.error(msg);
throw new YarnException(msg, ie);
}
} catch (ApplicationNotFoundException ex) {
// FailOver or RM restart happens before RMStateStore saves
// ApplicationState
LOG.info("Re-submit application " + applicationId + "with the " +
"same ApplicationSubmissionContext");
rmClient.submitApplication(request);
}
}
MRAppMaster进程怎么主动跟Client进程建立连接?
Client进程的通讯地址在job准备阶段被设置到了Configuration对象中,Configuration对象被转化成job.xml文件被job的所有进程共享。MRAppMaster进程启动之后会与这个通讯地址建立连接,来保持与Client进程之间的通讯。
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
网友评论