美文网首页spark
任务执行-源码分析

任务执行-源码分析

作者: 专职掏大粪 | 来源:发表于2021-09-21 07:55 被阅读0次

    CoarseGrainedExecutorBackend.receive
    收到LaunchTask消息

    // Message handler of the executor backend (excerpt; the closing brace and any further
    // cases are not shown here).
    //  - RegisteredExecutor: creates the Executor and sends LaunchedExecutor to the driver.
    //  - LaunchTask: decodes the task description and hands it to the executor.
    override def receive: PartialFunction[Any, Unit] = {
       case RegisteredExecutor =>
         logInfo("Successfully registered with driver")
         try {
           executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
             resources = _resources)
           driver.get.send(LaunchedExecutor(executorId))
         } catch {
           // Any non-fatal failure while creating the executor exits with code 1.
           case NonFatal(e) =>
             exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
         }
    
       case LaunchTask(data) =>
         if (executor == null) {
           // No executor has been created yet, so the task cannot be launched.
           exitExecutor(1, "Received LaunchTask command but executor was null")
         } else {
           // Deserialize the task description from the message payload.
           val taskDesc = TaskDescription.decode(data.value)
           logInfo("Got assigned task " + taskDesc.taskId)
           // Record the resources assigned to this task, keyed by task id.
           taskResources(taskDesc.taskId) = taskDesc.resources
          // Hand the task to the executor (the "compute object") to run it.
           executor.launchTask(this, taskDesc)
         }
    

    使用executor的线程池threadPool执行task

     /**
      * Launches the described task on this executor's thread pool.
      *
      * A TaskRunner is created for the task, stored in `runningTasks` under the task id,
      * and then submitted to `threadPool`. If `decommissioned` is set, an error is logged
      * after the submission.
      *
      * @param context         backend passed through to the TaskRunner
      * @param taskDescription description of the task to run
      */
     def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
       val runner = new TaskRunner(context, taskDescription, plugins)
       // Record the runner under its task id, then submit it to the pool.
       runningTasks.put(taskDescription.taskId, runner)
       threadPool.execute(runner)
       if (decommissioned) {
         log.error(s"Launching a task while in decommissioned state.")
       }
     }
    

    TaskRunner.run

    // Run the task: the attempt id and number, the metrics system, the task's resources
    // and the plugins are passed in, and the result is bound to `res`.
       val res = task.run(
                taskAttemptId = taskId,
                attemptNumber = taskDescription.attemptNumber,
                metricsSystem = env.metricsSystem,
                resources = taskDescription.resources,
                plugins = plugins)
    

    Task.run

    // Inside Task.run: invoke runTask with the task context.
    runTask(context)
    

    Task.run 最终调用 runTask(context) 来执行任务;具体的计算逻辑由每个任务各自的 runTask 实现。

    相关文章

      网友评论

        本文标题:任务执行-源码分析

        本文链接:https://www.haomeiwen.com/subject/ctzmgltx.html