Source Code Analysis of the YARN Job Submission Process

Author: JX907 | Published: 2018-12-28 14:17
    [Figure: clipboard.png]

    Containers running on YARN fall into two kinds: the ApplicationMaster, which is the first container started for every YARN application, and the containers that run the user's tasks.

    Starting the ApplicationMaster process

    1. The YARN client submits the application to YARN (step 1 in the figure above).

    To submit a job to YARN, the client calls two API methods on org.apache.hadoop.yarn.client.api.YarnClient:

    
    public abstract YarnClientApplication createApplication()
        throws YarnException, IOException;

    public abstract ApplicationId submitApplication(
        ApplicationSubmissionContext appContext)
        throws YarnException, IOException;
    
    

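    createApplication talks to the ResourceManager process over RPC (rmClient.getNewApplication(request)) and asks it to allocate a new application. The result comes back in a GetNewApplicationResponse, which carries the ApplicationId and the maximum resources the cluster can allocate. createApplication returns this result wrapped in a YarnClientApplication.

    After obtaining the YarnClientApplication, the client fills in its context object, org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext: the application name, resources, queue, priority, and the ApplicationMaster launch command (held in a ContainerLaunchContext, the same entity used to launch ordinary containers). Finally it calls the second method above, submitApplication, which sends the ApplicationSubmissionContext to the ResourceManager (rmClient.submitApplication(request);).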

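    2. On receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates a container for the application, and starts the application's ApplicationMaster in that container (step 2 in the figure).

    The corresponding code on the ResourceManager side:

    1) org.apache.hadoop.yarn.server.resourcemanager.ClientRMService handles the client-facing RPC requests to the RM; job submission is handled by its submitApplication method.

    2) org.apache.hadoop.yarn.server.resourcemanager.RMAppManager#submitApplication first instantiates RMAppImpl. Instantiating RMAppImpl initializes the state machine (RMAppImpl, line 155; see reference 2).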

    Finally it calls:

    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    

    This combines an asynchronous dispatcher with the state machine pattern. The handler for these events is registered in ResourceManager#serviceInit:

    rmDispatcher.register(RMAppEventType.class,
        new ApplicationEventDispatcher(rmContext));
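The register/handle pattern above can be sketched in plain Java, independent of Hadoop. Handlers are registered per event-type class, handle() only enqueues, and a separate thread drains the queue and routes each event to its handler. This is a simplified model and not YARN's own dispatcher; MiniDispatcher, AppEventType, dispatchAll and demo are names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified model of an asynchronous event dispatcher.
// It is NOT Hadoop code; every name in this class is illustrative.
class MiniDispatcher {
    // An event carries an enum type; the enum's class selects the handler.
    static final class Event {
        final Enum<?> type;
        final String payload;
        Event(Enum<?> type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    enum AppEventType { START, KILL }

    private final Map<Class<?>, Consumer<Event>> handlers = new ConcurrentHashMap<>();
    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    // Plays the role of rmDispatcher.register(eventTypeClass, handler).
    void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Event> handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Plays the role of getEventHandler().handle(event): enqueue and return at once.
    void handle(Event event) {
        queue.add(event);
    }

    // Drains the queue on a separate thread, then waits for that thread to finish.
    void dispatchAll() {
        Thread worker = new Thread(() -> {
            Event e;
            while ((e = queue.poll()) != null) {
                Consumer<Event> h = handlers.get(e.type.getDeclaringClass());
                if (h != null) {
                    h.accept(e);
                }
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    // Small demo: returns what the registered handler observed.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        MiniDispatcher d = new MiniDispatcher();
        d.register(AppEventType.class, ev -> seen.add(ev.type + ":" + ev.payload));
        d.handle(new Event(AppEventType.START, "application_0001"));
        d.dispatchAll();
        return seen;
    }
}
```

The point of the design is that the caller of handle() never runs the handler itself, so submitting an event stays cheap and the state machine work happens on the dispatcher's thread.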
    

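    3) Starting the ApplicationMaster:

    The RMApp state machine was mentioned above. Within the RM, one RMApp corresponds to one or more RMAppAttempts: if the first RMAppAttempt of an RMApp fails, the RM starts a new RMAppAttempt according to its configuration. For the RMAppAttempt state machine, see reference 3.

    The source where RMApp starts an RMAppAttempt:

    org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl#createAndStartNewAttempt (triggered when the RMApp state machine processes APP_ACCEPTED or ATTEMPT_FAILED) fires the START event of the RMAppAttempt state machine. When the attempt reaches the ALLOCATED state, the ApplicationMasterLauncher in the RM contacts the corresponding NodeManager to start the ApplicationMaster, after which the application attempt is set to the LAUNCHED state. (The resource request process is omitted here.)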

    Code references:

    org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.AttemptStoredTransition,

    org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl#launchAttempt,

    org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher

    State machine transitions:

    // Transitions from ALLOCATED_SAVING State
    .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
        RMAppAttemptState.ALLOCATED,
        RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())

    // Transitions from ALLOCATED State
    .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
        RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
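To make the transition table concrete, here is a minimal table-driven state machine in plain Java. It mirrors only the two transitions quoted above and leaves out the transition hooks (such as AttemptStoredTransition) that run in the real code. It is a sketch of the idea, not Hadoop's implementation; AttemptStateMachine and its method names are illustrative.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch of a table-driven state machine; not Hadoop code.
class AttemptStateMachine {
    enum State { ALLOCATED_SAVING, ALLOCATED, LAUNCHED }
    enum EventType { ATTEMPT_NEW_SAVED, LAUNCHED }

    // table.get(current).get(event) yields the next state when the transition is legal.
    private final Map<State, Map<EventType, State>> table = new EnumMap<>(State.class);
    private State current;

    AttemptStateMachine(State initial) {
        this.current = initial;
    }

    // Registers one legal transition, comparable to one addTransition(...) call.
    AttemptStateMachine addTransition(State from, State to, EventType on) {
        table.computeIfAbsent(from, k -> new EnumMap<EventType, State>(EventType.class))
             .put(on, to);
        return this;
    }

    // Applies an event; an event not registered for the current state is rejected.
    State handle(EventType event) {
        Map<EventType, State> row = table.get(current);
        State next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " at " + current);
        }
        current = next;
        return current;
    }

    State state() {
        return current;
    }

    // Builds a machine holding the two transitions shown in the excerpt above.
    static AttemptStateMachine example() {
        return new AttemptStateMachine(State.ALLOCATED_SAVING)
            .addTransition(State.ALLOCATED_SAVING, State.ALLOCATED, EventType.ATTEMPT_NEW_SAVED)
            .addTransition(State.ALLOCATED, State.LAUNCHED, EventType.LAUNCHED);
    }
}
```

Feeding ATTEMPT_NEW_SAVED and then LAUNCHED to example() walks the attempt from ALLOCATED_SAVING through ALLOCATED to LAUNCHED; any other order throws.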
    

    The RM contacts the NM to actually start the AppMaster process and then moves the state machine to LAUNCHED:

    org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher#run

    try {
      LOG.info("Launching master" + application.getAppAttemptId());
      launch();
      handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
          RMAppAttemptEventType.LAUNCHED));
    } catch(Exception ie) {

    launch() is where the AppMaster is actually started. The key code:

    StartContainersResponse response =
        containerMgrProxy.startContainers(allRequests);
    

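    Here containerMgrProxy is the RPC protocol an AppMaster uses to talk to the NM in order to start containers, stop them, and query their status. Because the AppMaster is itself a special container whose launch is initiated by the RM, this particular call is RM-to-NM communication.

    3. The AppMaster requests resources from the RM and asks the NMs to start containers (steps 3 and 4 in the figure).

    The AppMaster is the first container of a running YARN application and is started by the RM. It is then responsible for running the whole application, including requesting, starting, killing, and checking the status of containers. The ApplicationMaster is application-level: the YARN framework does not provide its implementation (except, for historical reasons, the MapReduce ApplicationMaster), so users have to implement the ApplicationMaster process themselves. Spark's ApplicationMaster is used as the example here.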

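    Requesting resources:

    Submit the request: send the resource request to the RM via heartbeat;

    Obtain containers: receive the allocated containers via heartbeat;

    For each container obtained, contact the NM and start that container;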
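The three steps above can be modeled as a loop. The sketch below is a toy, in-memory simulation in plain Java: FakeResourceManager stands in for the RM and grants at most a fixed number of containers per heartbeat. None of these names are YARN APIs, and the per-heartbeat limit is an assumption made only to show that containers may arrive over several heartbeats. A real AM would use YARN client classes such as AMRMClient for the heartbeat and NMClient to start containers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of an AM's heartbeat-driven allocation loop.
// FakeResourceManager and all method names are hypothetical, not YARN APIs.
class HeartbeatAllocationSketch {
    static final class FakeResourceManager {
        private final int perHeartbeat; // at most this many containers per heartbeat
        private int pending;            // requests received but not yet satisfied
        private int nextId = 1;

        FakeResourceManager(int perHeartbeat) {
            if (perHeartbeat < 1) {
                throw new IllegalArgumentException("perHeartbeat must be >= 1");
            }
            this.perHeartbeat = perHeartbeat;
        }

        // One heartbeat: carries new asks in, returns newly allocated container ids.
        List<String> allocate(int newAsks) {
            pending += newAsks;
            int granted = Math.min(perHeartbeat, pending);
            pending -= granted;
            List<String> out = new ArrayList<>();
            for (int i = 0; i < granted; i++) {
                out.add("container_" + nextId++);
            }
            return out;
        }
    }

    // Requests `wanted` containers and "launches" each one as it arrives.
    // Returns the container ids in launch order.
    static List<String> run(int wanted, int perHeartbeat) {
        FakeResourceManager rm = new FakeResourceManager(perHeartbeat);
        List<String> launched = new ArrayList<>();
        int toAsk = wanted;                              // step 1: ask rides on the first heartbeat
        while (launched.size() < wanted) {
            List<String> allocated = rm.allocate(toAsk); // step 2: heartbeat returns containers
            toAsk = 0;                                   // later heartbeats carry no new asks
            for (String id : allocated) {
                launched.add(id);                        // step 3: a real AM would contact the NM here
            }
        }
        return launched;
    }
}
```

For example, run(5, 2) needs three simulated heartbeats, which grant two, two, and one container.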

    Starting containers:

    Call chain:

    org.apache.spark.deploy.yarn.ApplicationMaster#runExecutorLauncher
    org.apache.spark.deploy.yarn.ApplicationMaster#registerAM
    org.apache.spark.deploy.yarn.ExecutorRunnable#run
    org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer

    try {
      nmClient.startContainer(container.get, ctx)
    } catch {

    nmClient is the client class for the NM:

    org.apache.hadoop.yarn.client.api.impl.NMClientImpl#startContainer

    StartContainersResponse response = proxy
        .getContainerManagementProtocol().startContainers(allRequests);
    

    This call goes through ContainerManagementProtocol to the NM and starts the container.

    The corresponding code on the NM side:

    org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl#startContainers
    

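    The main class of the container process is CoarseGrainedExecutorBackend. (Where is this class name specified?)

    How is the container's launch status reported back to the AM?

    4. The driver completes DAG and task partitioning and sends tasks to the executors (step 5 in the figure).

    The YarnClusterScheduler class in the figure is the TaskScheduler for YARN cluster mode. It extends YarnScheduler, which in turn extends TaskSchedulerImpl. YarnClusterScheduler and YarnScheduler add almost nothing of their own; the main logic lives in TaskSchedulerImpl, which is instantiated in SparkContext (see the Spark source notes, "Section 5: taskscheduler").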

    Driver side:

    org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks
    

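    Executor side, registering and receiving tasks dispatched by the driver:

    org.apache.spark.executor.CoarseGrainedExecutorBackend#receive handles the following message types:

    RegisteredExecutor (registration with the driver succeeded),

    RegisterExecutorFailed (registration with the driver failed),

    LaunchTask (a task dispatched by the driver to the executor),

    KillTask (kill a task),

    StopExecutor (stop the executor; forwards to Shutdown),

    Shutdown (stop the executor)

    A RegisteredExecutor message means registration with the driver has succeeded, and the Executor instance is created at this point: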

    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        try {
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
        } catch {
          case NonFatal(e) =>
            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
        }
    

    A LaunchTask message is a task dispatched by the driver, and the executor's launchTask method is called:

    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc)
      }
    

    Each task actually runs on one thread taken from a thread pool, as the following code shows:

    def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
      val tr = new TaskRunner(context, taskDescription)
      runningTasks.put(taskDescription.taskId, tr)
      threadPool.execute(tr)
    }
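The same idea can be sketched in plain Java: each task is wrapped in a runnable, tracked in a concurrent map by task id, and handed to a thread pool. This is an illustration rather than Spark's Executor; MiniExecutor, its method names, the result strings, and the fixed pool size of 4 are all assumptions chosen for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "one task runs on one pooled thread"; not Spark's Executor.
class MiniExecutor {
    private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
    private final Map<Long, Runnable> runningTasks = new ConcurrentHashMap<>();
    private final Map<Long, String> results = new ConcurrentHashMap<>();

    // Mirrors launchTask: wrap the task, record it as running, hand it to the pool.
    void launchTask(long taskId, String description) {
        Runnable runner = () -> {
            try {
                results.put(taskId, "done:" + description);
            } finally {
                runningTasks.remove(taskId); // a finished task is no longer running
            }
        };
        runningTasks.put(taskId, runner);
        threadPool.execute(runner);
    }

    // Stops accepting tasks and waits for submitted ones; true if all finished in time.
    boolean shutdownAndWait(long timeoutSeconds) {
        threadPool.shutdown();
        try {
            return threadPool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    Map<Long, String> completed() {
        return results;
    }

    int runningCount() {
        return runningTasks.size();
    }
}
```

Registering the task in runningTasks before calling execute matters: the runnable removes its own entry when it finishes, so the entry has to exist before the task can possibly run.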
    

    References:

    State machines:

    1. https://blog.csdn.net/lfdanding/article/details/51786110

    2. http://www.cnblogs.com/shenh062326/p/3586510.html

    3. https://www.cnblogs.com/shenh062326/p/3587108.html

    4. https://blog.csdn.net/wankunde/article/details/78315438

