How a Spark Task Is Executed on the Executor


Author: 嘻嘻是小猪 | Published 2020-06-06 09:16
    1. CoarseGrainedExecutorBackend#receive() receives the LaunchTask message and handles it (a simplified sketch of this dispatch pattern follows the snippet)
    override def receive: PartialFunction[Any, Unit] = {
    ......  
    // The LaunchTask message is received here
      case LaunchTask(data) =>
          if (executor == null) {
            exitExecutor(1, "Received LaunchTask command but executor was null")
          } else {
            val taskDesc = TaskDescription.decode(data.value) // decode the TaskDescription
            logInfo("Got assigned task " + taskDesc.taskId)
            executor.launchTask(this, taskDesc)  // launch the task
          }
    ......
    }
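
    To make the dispatch shape above concrete, here is a minimal, self-contained Scala sketch (not Spark source) of the same PartialFunction[Any, Unit] pattern: a case class plays the role of LaunchTask, and the handler decodes a payload before launching. All names (MiniBackendDemo, MiniLaunchTask, MiniKillTask) are hypothetical.

    // Minimal sketch (not Spark source): the same PartialFunction dispatch shape
    // used by CoarseGrainedExecutorBackend#receive. All names here are made up.
    object MiniBackendDemo {
      case class MiniLaunchTask(payload: String)   // stands in for LaunchTask(data)
      case class MiniKillTask(taskId: Long)

      val receive: PartialFunction[Any, Unit] = {
        case MiniLaunchTask(payload) =>
          val taskDesc = payload.toUpperCase        // stands in for TaskDescription.decode
          println(s"Got assigned task $taskDesc")
        case MiniKillTask(taskId) =>
          println(s"Killing task $taskId")
      }

      def main(args: Array[String]): Unit = {
        receive(MiniLaunchTask("task-0"))           // dispatched like an incoming RPC message
        receive(MiniKillTask(42L))
      }
    }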
    

    2. org.apache.spark.executor.Executor#launchTask builds a TaskRunner and submits it to the thread pool (a stand-alone sketch of the same pattern follows the snippet)

    // Maintains the list of running tasks.
    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
    
     // Start worker thread pool 
    private val threadPool = {
        val threadFactory = new ThreadFactoryBuilder()
        ......
          })
          .build()
     Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
    }
    
    def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
        val tr = new TaskRunner(context, taskDescription) // TaskRunner implements Runnable
        runningTasks.put(taskDescription.taskId, tr)
        threadPool.execute(tr) // submit the TaskRunner to the thread pool for execution
      }
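
    The pattern here is simple: keep a map of running work keyed by task id, and hand each Runnable to a cached thread pool. Below is a minimal stand-alone sketch of that pattern, assuming none of Spark's classes; MiniExecutorDemo, MiniRunner and the task ids are hypothetical.

    import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

    // Minimal sketch (not Spark source): track Runnables in a ConcurrentHashMap and
    // execute them on a cached thread pool, mirroring Executor#launchTask.
    object MiniExecutorDemo {
      private val runningTasks = new ConcurrentHashMap[Long, Runnable]
      private val threadPool = Executors.newCachedThreadPool()

      class MiniRunner(taskId: Long) extends Runnable {
        override def run(): Unit = {
          println(s"running task $taskId on ${Thread.currentThread().getName}")
          runningTasks.remove(taskId)               // drop from the map once finished
        }
      }

      def launchTask(taskId: Long): Unit = {
        val tr = new MiniRunner(taskId)
        runningTasks.put(taskId, tr)                // bookkeeping, like Executor.runningTasks
        threadPool.execute(tr)                      // run asynchronously on the pool
      }

      def main(args: Array[String]): Unit = {
        (1L to 3L).foreach(launchTask)
        threadPool.shutdown()
        threadPool.awaitTermination(5, TimeUnit.SECONDS)
      }
    }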
    
    3. org.apache.spark.executor.Executor.TaskRunner#run deserializes the Task and calls its run method (a serialization round-trip sketch follows the snippet)
    
    /**
     * The task to run. This will be set in run() by deserializing the task binary coming
     * from the driver. Once it is set, it will never be changed.
     */
    @volatile var task: Task[Any] = _
    override def run(): Unit = {
    ......
    // Deserialize the real Task here; the key is taskDescription.serializedTask
    task = ser.deserialize[Task[Any]](
             taskDescription.serializedTask,Thread.currentThread.getContextClassLoader)
    ......
    // Finally invoke the Task's run method here
    val value = try {
              val res = task.run(
                taskAttemptId = taskId,
                attemptNumber = taskDescription.attemptNumber,
                metricsSystem = env.metricsSystem)
              threwException = false
              res
            }
     ......
    }
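
    The deserialization step is easier to picture with a tiny round-trip. The sketch below is not Spark's SerializerInstance; it uses plain java.io serialization and a hypothetical MiniTask class to show how a task object can travel as a ByteBuffer (like serializedTask) and be rebuilt on the executor side.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // Minimal sketch (not Spark source): a "task" is written into a ByteBuffer on one
    // side and deserialized back into a live object on the other.
    class MiniTask(val partitionId: Int) extends Serializable {
      def run(): String = s"computed partition $partitionId"
    }

    object SerializedTaskDemo {
      def serialize(task: MiniTask): ByteBuffer = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(task); out.close()
        ByteBuffer.wrap(bytes.toByteArray)          // what the driver would ship over the wire
      }

      def deserialize(buf: ByteBuffer): MiniTask = {
        val arr = new Array[Byte](buf.remaining()); buf.get(arr)
        val in = new ObjectInputStream(new ByteArrayInputStream(arr))
        in.readObject().asInstanceOf[MiniTask]      // what TaskRunner#run rebuilds
      }

      def main(args: Array[String]): Unit = {
        val task = deserialize(serialize(new MiniTask(7)))
        println(task.run())
      }
    }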
    
    4. org.apache.spark.scheduler.Task#run() invokes org.apache.spark.scheduler.Task#runTask()
      We have finally reached the last call (a stripped-down sketch of this template-method shape follows below):
    def runTask(context: TaskContext): T // an abstract method implemented by subclasses, e.g. ShuffleMapTask and ResultTask
    
    [Figure: the subclasses of Task]
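
    To see why runTask() is where the real work hides, here is a stripped-down sketch of the same template-method shape. The classes are hypothetical and far simpler than Spark's ShuffleMapTask/ResultTask: run() is the fixed skeleton, runTask() is the hook each concrete task type fills in.

    // Minimal sketch (not Spark source): run() handles the shared bookkeeping,
    // runTask() carries the subclass-specific logic.
    object TaskHierarchyDemo {
      abstract class MiniTask[T] {
        final def run(taskAttemptId: Long): T = {
          println(s"setting up context for attempt $taskAttemptId")  // shared setup lives here
          try runTask() finally println("cleaning up task resources")
        }
        def runTask(): T                                             // implemented by subclasses
      }

      class MiniShuffleMapTask extends MiniTask[Unit] {
        override def runTask(): Unit = println("writing shuffle output for this partition")
      }

      class MiniResultTask extends MiniTask[Int] {
        override def runTask(): Int = { println("computing final result"); 42 }
      }

      def main(args: Array[String]): Unit = {
        new MiniShuffleMapTask().run(taskAttemptId = 1L)
        println(new MiniResultTask().run(taskAttemptId = 2L))
      }
    }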

    The main flow of task execution is quite simple: CoarseGrainedExecutorBackend receives the message, wraps the transmitted TaskDescription in a TaskRunner, and hands it to the Executor's internal thread pool.
    During processing, the key step is deserializing the real Task object from the ByteBuffer in org.apache.spark.scheduler.TaskDescription#serializedTask.
    Finally, Task's own run method calls the subclass-implemented runTask(), which is where the actual computation lives.

    That's a wrap!
