In the previous article, we took a first look at how a client prepares a Job and submits it to the ResourceManager. In this article, we give a brief overview of how this Job is submitted and started on the YARN side.
First, the Client sends the ApplicationSubmissionContext and the ContainerLaunchContext to the ResourceManager.
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  addHistoryToken(ts);
  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);
    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " + diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
    throws YarnException, IOException {
  ....
  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  // Note this call:
  // TODO: YARN-1763: Handle RM failovers during the submitApplication call.
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  while (true) {
    try {
      YarnApplicationState state =
          getApplicationReport(applicationId).getYarnApplicationState();
      if (!state.equals(YarnApplicationState.NEW) &&
          !state.equals(YarnApplicationState.NEW_SAVING)) {
        LOG.info("Submitted application " + applicationId);
        break;
      }
      ....
We can see that the return value of rmClient.submitApplication(request) is not used here. Instead, the client keeps calling getApplicationReport(applicationId) and uses the YarnApplicationState in the returned ApplicationReport to decide whether the Job was submitted successfully.
An ApplicationReport contains the following information:
- ApplicationId
- Application user
- Application queue
- Application name
- Host on which the ApplicationMaster is running
- RPC port of the ApplicationMaster
- Tracking URL
- YarnApplicationState of the application
- Diagnostic information in case of errors
- Start time of the application
- Client Token of the application (if security is enabled)
Among these, the "Host on which the ApplicationMaster is running" and the "RPC port of the ApplicationMaster" are what tell the Client where to go to read the status information of the MapReduce Job.
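As a rough illustration, here is a minimal sketch of how a client could read these fields through the YarnClient API. This is not the article's source code: the class AppReportExample and the printed fields are our own choices, but YarnClient and ApplicationReport are the standard hadoop-yarn-client types.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class AppReportExample {
  public static void printAmLocation(ApplicationId appId) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new Configuration());
    yarnClient.start();
    try {
      ApplicationReport report = yarnClient.getApplicationReport(appId);
      // Where the AM is running: enough for the client to contact it
      // directly for MapReduce job status.
      System.out.println("AM host:  " + report.getHost());
      System.out.println("AM port:  " + report.getRpcPort());
      System.out.println("State:    " + report.getYarnApplicationState());
      System.out.println("Tracking: " + report.getTrackingUrl());
      if (report.getYarnApplicationState() == YarnApplicationState.FAILED) {
        System.out.println("Diagnostics: " + report.getDiagnostics());
      }
    } finally {
      yarnClient.stop();
    }
  }
}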
Next, the ClientRMService inside the ResourceManager receives the data structures sent by the Client and performs some validation.
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  String user = null;
  try {
    // Safety
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw RPCUtil.getRemoteException(ie);
  }

  // Check whether app has already been put into rmContext,
  // If it is, simply return the response
  if (rmContext.getRMApps().get(applicationId) != null) {
    LOG.info("This is an earlier submitted application: " + applicationId);
    return SubmitApplicationResponse.newInstance();
  }
  .....
The ResourceManager then asks the YarnScheduler to allocate resources, that is, to allocate a Container for the ApplicationMaster.
@VisibleForTesting
public static final class ScheduleTransition implements
    MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
  @Override
  public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
      RMAppAttemptEvent event) {
    ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
    if (!subCtx.getUnmanagedAM()) {
      // Need reset #containers before create new attempt, because this request
      // will be passed to scheduler, and scheduler will deduct the number after
      // AM container allocated
      // Currently, following fields are all hard code,
      // TODO: change these fields when we want to support
      // priority/resource-name/relax-locality specification for AM containers
      // allocation.
      appAttempt.amReq.setNumContainers(1);
      appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
      appAttempt.amReq.setResourceName(ResourceRequest.ANY);
      appAttempt.amReq.setRelaxLocality(true);

      // AM resource has been checked when submission
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
              Collections.singletonList(appAttempt.amReq),
              EMPTY_CONTAINER_RELEASE_LIST, null, null);
      if (amContainerAllocation != null
          && amContainerAllocation.getContainers() != null) {
        assert (amContainerAllocation.getContainers().size() == 0);
      }
      return RMAppAttemptState.SCHEDULED;
    } else {
      // save state and then go to LAUNCHED state
      appAttempt.storeAttempt();
      return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
    }
  }
}
The ResourceManager then sends an AMLauncherEventType.LAUNCH event to the ApplicationMasterLauncher.
private void launchAttempt() {
  // Send event to launch the AM Container
  eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
}
After receiving this event, the ApplicationMasterLauncher starts an AMLauncher.
@Override
public synchronized void handle(AMLauncherEvent appEvent) {
  AMLauncherEventType event = appEvent.getType();
  RMAppAttempt application = appEvent.getAppAttempt();
  switch (event) {
    case LAUNCH:
      launch(application);
      break;
    case CLEANUP:
      cleanup(application);
      break;
    default:
      break;
  }
}
This AMLauncher then tells the ContainerManagerImpl of the NodeManager to start the ApplicationMaster.
private void launch() throws IOException, YarnException {
  connect();
  ContainerId masterContainerID = masterContainer.getId();
  ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
  LOG.info("Setting up container " + masterContainer
      + " for AM " + application.getAppAttemptId());
  ContainerLaunchContext launchContext =
      createAMContainerLaunchContext(applicationContext, masterContainerID);

  StartContainerRequest scRequest =
      StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
  List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
  list.add(scRequest);
  StartContainersRequest allRequests =
      StartContainersRequest.newInstance(list);

  StartContainersResponse response =
      containerMgrProxy.startContainers(allRequests);
  if (response.getFailedRequests() != null
      && response.getFailedRequests().containsKey(masterContainerID)) {
    Throwable t =
        response.getFailedRequests().get(masterContainerID).deSerialize();
    parseAndThrowException(t);
  } else {
    LOG.info("Done launching container " + masterContainer + " for AM "
        + application.getAppAttemptId());
  }
}
The startContainers() method of ContainerManagerImpl:
@Override
public StartContainersResponse startContainers(
    StartContainersRequest requests) throws YarnException, IOException {
  if (blockNewContainerRequests.get()) {
    throw new NMNotYetReadyException(
        "Rejecting new containers as NodeManager has not"
        + " yet connected with ResourceManager");
  }
  UserGroupInformation remoteUgi = getRemoteUgi();
  NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
  authorizeUser(remoteUgi, nmTokenIdentifier);
  List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
  Map<ContainerId, SerializedException> failedContainers =
      new HashMap<ContainerId, SerializedException>();
  for (StartContainerRequest request : requests.getStartContainerRequests()) {
    ContainerId containerId = null;
    try {
      ContainerTokenIdentifier containerTokenIdentifier =
          BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
      verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
          containerTokenIdentifier);
      containerId = containerTokenIdentifier.getContainerID();
      startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
          request);
      succeededContainers.add(containerId);
    } catch (YarnException e) {
      failedContainers.put(containerId, SerializedException.newInstance(e));
    } catch (InvalidToken ie) {
      failedContainers.put(containerId, SerializedException.newInstance(ie));
      throw ie;
    } catch (IOException e) {
      throw RPCUtil.getRemoteException(e);
    }
  }
  ....
After receiving the notification from the AMLauncher, ContainerManagerImpl checks that the Container-related information sent by the AMLauncher is valid. If validation passes, it localizes the required resources so that they are available when the Container runs.
Map<String, LocalResource> cntrRsrc = ctxt.getLocalResources();
if (!cntrRsrc.isEmpty()) {
  try {
    for (Map.Entry<String, LocalResource> rsrc : cntrRsrc.entrySet()) {
      try {
        LocalResourceRequest req =
            new LocalResourceRequest(rsrc.getValue());
        List<String> links = container.pendingResources.get(req);
        if (links == null) {
          links = new ArrayList<String>();
          container.pendingResources.put(req, links);
        }
        links.add(rsrc.getKey());
        switch (rsrc.getValue().getVisibility()) {
          case PUBLIC:
            container.publicRsrcs.add(req);
            break;
          case PRIVATE:
            container.privateRsrcs.add(req);
            break;
          case APPLICATION:
            container.appRsrcs.add(req);
            break;
        }
      } catch (URISyntaxException e) {
        LOG.info("Got exception parsing " + rsrc.getKey()
            + " and value " + rsrc.getValue());
        throw e;
      }
    }
  } catch (URISyntaxException e) {
    LOG.warn("Failed to parse resource-request", e);
    container.cleanup();
    container.metrics.endInitingContainer();
    return ContainerState.LOCALIZATION_FAILED;
  }
Here we can see that a LocalResource can have one of three visibility levels (a small example of declaring one follows the list):
- PUBLIC: All the LocalResources that are marked PUBLIC are accessible for containers of any user.
- PRIVATE: LocalResources that are marked PRIVATE are shared among all applications of the same user on the node.
- APPLICATION: All the resources that are marked as having the APPLICATION scope are shared only among containers of the same application on the node.
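As a small illustration of these visibility levels, here is a minimal sketch of how a client could declare a LocalResource with APPLICATION visibility when preparing a ContainerLaunchContext. The class name and the HDFS path below are hypothetical; the LocalResource record and its setters are the standard YARN API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public class LocalResourceExample {
  public static LocalResource jarResource(Configuration conf) throws Exception {
    Path jarPath = new Path("hdfs:///apps/demo/app.jar"); // hypothetical path
    FileStatus status = FileSystem.get(conf).getFileStatus(jarPath);

    LocalResource jar = Records.newRecord(LocalResource.class);
    jar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    jar.setType(LocalResourceType.FILE);
    // APPLICATION: cached only for containers of this application on the node;
    // PUBLIC/PRIVATE would widen sharing to all users / all apps of this user.
    jar.setVisibility(LocalResourceVisibility.APPLICATION);
    jar.setSize(status.getLen());
    jar.setTimestamp(status.getModificationTime());
    return jar;
  }
}

A map of link name to such LocalResource records is what the client passes to ContainerLaunchContext.setLocalResources(), and it is exactly this map that ctxt.getLocalResources() returns in the NodeManager code above.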
Once resource localization is complete, the container is launched through the ContainersLauncher.
@Override
public void handle(ContainersLauncherEvent event) {
  // TODO: ContainersLauncher launches containers one by one!!
  Container container = event.getContainer();
  ContainerId containerId = container.getContainerId();
  switch (event.getType()) {
    case LAUNCH_CONTAINER:
      Application app =
          context.getApplications().get(
              containerId.getApplicationAttemptId().getApplicationId());
      ContainerLaunch launch =
          new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
              event.getContainer(), dirsHandler, containerManager);
      containerLauncher.submit(launch);
      running.put(containerId, launch);
      break;
    case RECOVER_CONTAINER:
      app = context.getApplications().get(
          containerId.getApplicationAttemptId().getApplicationId());
      launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
          exec, app, event.getContainer(), dirsHandler, containerManager);
      containerLauncher.submit(launch);
      running.put(containerId, launch);
      break;
    case CLEANUP_CONTAINER:
      ContainerLaunch launcher = running.remove(containerId);
      if (launcher == null) {
        // Container not launched. So nothing needs to be done.
        return;
      }
      // Cleanup a container whether it is running/killed/completed, so that
      // no sub-processes are alive.
      try {
        launcher.cleanupContainer();
      } catch (IOException e) {
        LOG.warn("Got exception while cleaning container " + containerId
            + ". Ignoring.");
      }
      break;
  }
}
The call() method of ContainerLaunch:
try {
  ....
  try {
    // /////////// Write out the container-script in the nmPrivate space.
    List<Path> appDirs = new ArrayList<Path>(localDirs.size());
    for (String localDir : localDirs) {
      Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
      Path userdir = new Path(usersdir, user);
      Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
      appDirs.add(new Path(appsdir, appIdStr));
    }
    containerScriptOutStream =
        lfs.create(nmPrivateContainerScriptPath,
            EnumSet.of(CREATE, OVERWRITE));

    // Set the token location too.
    environment.put(
        ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
        new Path(containerWorkDir,
            FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());

    // Sanitize the container's environment
    sanitizeEnv(environment, containerWorkDir, appDirs, containerLogDirs,
        localResources, nmPrivateClasspathJarDir);

    // Write out the environment
    exec.writeLaunchEnv(containerScriptOutStream, environment, localResources,
        launchContext.getCommands());
    // /////////// End of writing out container-script

    // /////////// Write out the container-tokens in the nmPrivate space.
    tokensOutStream =
        lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE));
    Credentials creds = container.getCredentials();
    creds.writeTokenStorageToStream(tokensOutStream);
    // /////////// End of writing out container-tokens
  } finally {
    IOUtils.cleanup(LOG, containerScriptOutStream, tokensOutStream);
  }

  // LaunchContainer is a blocking call. We are here almost means the
  // container is launched, so send out the event.
  dispatcher.getEventHandler().handle(new ContainerEvent(
      containerID,
      ContainerEventType.CONTAINER_LAUNCHED));
  context.getNMStateStore().storeContainerLaunched(containerID);

  // Check if the container is signalled to be killed.
  if (!shouldLaunchContainer.compareAndSet(false, true)) {
    LOG.info("Container " + containerIdStr + " not launched as "
        + "cleanup already called");
    ret = ExitCode.TERMINATED.getExitCode();
  } else {
    exec.activateContainer(containerID, pidFilePath);
    ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
        nmPrivateTokensPath, user, appIdStr, containerWorkDir,
        localDirs, logDirs);
  }
} catch (Throwable e) {
  LOG.warn("Failed to launch container.", e);
  dispatcher.getEventHandler().handle(new ContainerExitEvent(
      containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
      e.getMessage()));
  return ret;
} finally {
  completed.set(true);
  ....
From the code above we can see that ContainerLaunch blocks until the Container finishes executing, and then reports the result to the ApplicationMaster or the ResourceManager.
With that, the ApplicationMaster is fully started.
Inside the ApplicationMaster, the number of Mappers is determined from the InputSplits; the ApplicationMaster then requests resources from the ResourceManager via ResourceRequests, and the resulting Containers are launched on the NodeManagers.
The process by which the ApplicationMaster allocates Containers for Mappers and Reducers is the same as the process described above for allocating the ApplicationMaster's own Container; a sketch of what such a request could look like follows.
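This is a minimal sketch assuming an AMRMClient-based ApplicationMaster; the MapperContainerRequests class and all memory/vcore/priority values below are hypothetical placeholders, not MapReduce's actual defaults.

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.util.Records;

public class MapperContainerRequests {
  public static void requestMappers(AMRMClient<ContainerRequest> amRmClient,
                                    int numSplits) {
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(1024);     // hypothetical per-mapper memory (MB)
    capability.setVirtualCores(1);  // hypothetical per-mapper vcores

    Priority priority = Records.newRecord(Priority.class);
    priority.setPriority(20);       // hypothetical map-task priority

    // One request per InputSplit; null nodes/racks means "anywhere",
    // like ResourceRequest.ANY for the AM container above.
    for (int i = 0; i < numSplits; i++) {
      amRmClient.addContainerRequest(
          new ContainerRequest(capability, null, null, priority));
    }
  }
}

Note that this four-argument ContainerRequest constructor defaults relaxLocality to true, mirroring the appAttempt.amReq.setRelaxLocality(true) we saw in ScheduleTransition.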