美文网首页Android进阶之路
Flutter之深入Framework框架——TaskRunne

Flutter之深入Framework框架——TaskRunne

作者: 码农的地中海 | 来源:发表于2022-07-07 17:23 被阅读0次

一、Task执行原理流程:

1.当Driver中SchedulerBackend给ExecutorBackend发送launchTask之后,首先会反序列化TaskDescription。

2.Executor会通过launchTask来执行Task

3.Executor会通过TaskRunner在ThreadPool来运行具体的Task,TaskRunner的run方法中首先会通过调用statueUpdate给Driver发送信息汇报自己的状态,说明自己是running状态

4.TaskRunner,内部会做一些准备工作,例如反序列化Task的依赖,然后通过网络来获取需要的文件、Jar等

5.然后是反序列化Task本身

6.调用反序列化后的Task.run方法来执行任务并获得执行的结果,其中Task的run方法会导致调用的时候会导致task的抽象方法runTask的调用,在Task的runTask内部会调用RDD的iterator()方法,该方法就是我们针对当前Task所对应的Partition进行计算的关键所在,在处理内部会迭代Partition的元素并给我们自定义的function进行处理。

对于shuffleMapTask,首先要要对RDD以及其依赖关系进行反序列化。最终计算会调用RDD的compute方法。具体计算时,有具体的RDD,例如Map

7.把执行结果序列化,并根据大小不同的结果传回给Driver的方式

8.CoresGraninExecutorBackend给DriverEndpoint发送StatusUpdate来传输结果,DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter去分别处理Task执行成功或者失败的不同情况,然后告诉DAGScheduler任务处理结束的状况。

二、源码阅读

从Executor中的TaskRunner开始入手。看run方法:

1、对序列化的task数据,进行反序列化
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)/2
2、把文件、jar包拷贝过来
updateDependencies(taskFiles, taskJars)
3、通过正式的反序列化操作,将整个task的数据集反序列化
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
4、用task的run方法执行task,这个res就是对于ShuffleMapTask来说就是MapStatus,封装了ShuffleMapTask计算的数据的输出位置信息。后面的task就会去联系MapOutputTracker来获取上个ShuffleMapTask的位置拉取数据
val res = task.run(
  taskAttemptId = taskId,
  attemptNumber = attemptNumber,
  metricsSystem = env.metricsSystem)
看下run方法()

创建了一个TaskContextImpl,就是task的上下文,里面记录了task执行的一些全局性变量,比如task重试了几次,属于哪个stage,要处理Rdd哪个partition。然后调用抽象方法runTask()(具体实现就在子类ShuffleMapTask和ResultTask中)

5、下面同样分两种情况
ShuffleMapTask:

一个ShuffleMapTask会将一个RDD的元素,切分为多个bucket,基于一个在ShuffleDependency中指定的partitioner,默认就是HashPartitioner。

接下来看到它的runTask方法,这里注意,ShuffleMapTask的runTask有MapStatus返回值

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  //对task要处理的rdd相关数据,做一些反序列化操作
    这个RDD,是怎么拿到的呢?
    多个task运行在多个Executor中,都是并行运行,可能都不在同一个地方。
    但是一个stage的task,其实要处理的rdd是一样的。
    所以task怎么拿到自己要处理的那个rdd的数据
    这里会通过broadcast variable直接拿到
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    //获取ShuffleManager,从ShuffleManager中获取ShuffleWriter
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    //最重要的一行代码
    首先调用了Rdd的iterator方法,并且传入了,当前task要处理哪个partition
    所以,核心的逻辑,就在rdd的iterator方法中,这里实现了对Rdd某个partition执行我们自己的算子|
    执行完了算子后,返回的数据,都是通过ShuffleWriter,经过HashPartition进行分区之后,写入自己对于的分区bucket
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    //最后返回结果MapStatus
    MapStatus里面封装了ShuffleMapTask计算后的数据存储在哪里,其实就是BlockManager的相关信息
    BlockManager,是spark底层的内存、数据、磁盘数据管理的组件。
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}

接着看RDD的iterator方法

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    //CacheManager后续再看
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    //进行RDD partition的计算
    computeOrReadCheckpoint(split, context)
  }
}
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
    //checkPoint后面再看
  if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
def compute(split: Partition, context: TaskContext): Iterator[T]

compute是个虚方法。从子类中可以看到,compute是针对RDD中的某个partition执行我们给这个RDD定义的算子和函数。Spark内部进行了封装,还实现了一些其他的逻辑。到此,就执行了计算操作,并返回新的RDD的partition的数据

6、run方法中最后调用Executor所在的CoraseGrainedExecutorBackend的statusUpdate方法,通知它分区结束。会向SchedulerBackend发送信息(有空再继续深入研究)
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
ResultTask:
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  //进行基本的反序列化
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  //执行通过rdd的iterator进行算子和函数操作
  func(context, rdd.iterator(partition, context))
}

文末

从源码来看,先对Task运行结果value进行一些序列化操作, 然后统计Task运行时候的一些时间信息,比如GC的时间,Task运行时间等;在将结果进行一些封装之后,在序列化为serializedDirectResult,这里面用到了BlockManager组件(shuffle底层的内存管理组件,后面再单独分析它),接着就调用executor所在的CoarseGrainedExecutorBackend的statusUpdate()方法发送StatusUpdate消息给Driver的SparkDeploySchedulerBackend。
更多flutter学习,可以前往领取免费学习资料:【一键领取】

实战混合式开发Flutter手册

task的运行一开始不是直接调用底层的task的run方法直接处理job–>stage–>taskSet–>task这条路线的task任务的,它是通过分层和分工的思想来完成。

相关文章

网友评论

    本文标题:Flutter之深入Framework框架——TaskRunne

    本文链接:https://www.haomeiwen.com/subject/ioypbrtx.html