精通spark源码-rdd是如何运行的

作者: 曾二爷耶 | 来源:发表于2019-01-13 21:29 被阅读1次

一、spark执行过程的一个例子

// rdd_people: id,年龄
var rdd_people = sc.range(1, 100, 1).map(i=>(i, 20+i%80) )
//rdd_score: id,成绩
var rdd_score =sc.range(1, 100, 1).map(i=>(i  ,i+2))
//两个进行join
var rdd_res = rdd_people.join(rdd_score)
rdd_res.count()

上面的例子就是一个两个数据集进行join然后count的一个操作。
那么在运行这段代码的时候spark内部是如何来处理数据并得到最终得结果的呢。

1.1 spark的角度看你的代码

当你执行下面的代码你会看到一些列连接起来的rdd。那么你上面的那些没有action操作的代码意义就在于组建一个rdd串起来的一个有向无环图(DAG)。

rdd_res.toDebugString
你会得到下面得结果:
(2) MapPartitionsRDD[23] at join at <console>:28 []
 |  MapPartitionsRDD[22] at join at <console>:28 []
 |  CoGroupedRDD[21] at join at <console>:28 []
 +-(2) MapPartitionsRDD[14] at map at <console>:24 []
 |  |  MapPartitionsRDD[13] at range at <console>:24 []
 |  |  ParallelCollectionRDD[12] at range at <console>:24 []
 +-(2) MapPartitionsRDD[17] at map at <console>:24 []
    |  MapPartitionsRDD[16] at range at <console>:24 []
    |  ParallelCollectionRDD[15] at range at <console>:24 []
DAG

1.2 rdd如何得到结果

上面说到我们写的代码都会在spark内部转化成各种rdd的相互连接的dag。那当我们执行count这样的action操作时,spark如何为我们计算并返回结果的呢。
我们在执行count之后可以在spark ui上看到下图。


执行计划

原来spark把这个dag拆分成了几个stage(也就是任务task的集合),再点击某个stage就能看到这个stage下都是那些rdd的操作。

1.3 小结

当你在使用rdd这样的编程范式来表达对数据的处理逻辑时,spark内部就转化成了各种rdd之间的连接关系;使用spark-sql/dataframe也是这样,只是上层的表达方式不同,底层都是各种rdd的连接。最后当你执行count之类的action操作,spark就将这一系列的rdd的连接进行分析,生成一些列的task分发到各个executor上去执行具体的操作,然后收集各个executor的结果最终返回。


二、任务生成流程

2.1 action操作

所谓的action操作其实内部都调用了一个函数sc.runJob 这个函数。sc.runJob进行一些函数闭包的处理还有进度条的控制。而sc又会调用DAGScheduler;DAGScheduler把job提交到一个消息队列中,然后回调handler,handler经过一系列的处理生成task提交到TaskScheduler,由TaskScheduler去把任务分发到各个Executor上运行。


action

2.2 DAGScheduler 都干了啥

总的来说就是切分stage,建立Task,提交Task到taskScheduler。

2.2.1 stage

stage

stage 分两种顾名思义,ResultStage就是最后返回结果的那种stage,shuffleMapStage就是中间的Stage,stage是根据shuffle边界(宽依赖)来划分的,stage之间自然就是shuffle。(关于stage划分之后的文章会有)
源码里会递归的访问rdd发现依赖是ShuffleDependency就会进入下一个stage。

2.2.2 Task

task

task也一样分两种,意思和stage的对应。ResultStage产生的就是ResultTask。
ShuffleTask就负责将rdd的数据计算后使用shuffleWriter把结果写如磁盘。源码片段:

#ShuffleMapTask.scala
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      //这行是关键 rdd.iterator就会调用rdd定义好的计算逻辑产生数据,然后writer进行write。
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    }

三、总结

我们回看一下rdd的执行流程,我们使用spark的api构建rdd之间的关系,最后在action操作的时候,dagScheduler利用依赖关系划分stage,建立任务集,提交Task到TaskScheduler到executor中执行并返回结果。

相关文章

  • 精通spark源码-rdd是如何运行的

    一、spark执行过程的一个例子 上面的例子就是一个两个数据集进行join然后count的一个操作。那么在运行这段...

  • Spark Core

    1.spark core1.1 学习方法 1.2 什么是RDD 1.3 源码解释 1.3.1 源码中体现RDD的五...

  • 《从0到1学习spark》-- RDD

    RDD如何产生 RDD是Spark的基石,是实现Spark数据处理的核心抽象。那么RDD为什么会产生呢? Hado...

  • 【Spark】RDD操作详解1——Transformation和

    Spark算子的作用 下图描述了Spark在运行转换中通过算子对RDD进行转换。 算子是RDD中定义的函数,可以对...

  • RDD(二)

    class RDD源码解析 1.1 RDD源码 1.2 RDD类解释 1.3 RDD class中如何体现RDD的...

  • Spark 控制算子源码解析

    Spark 控制算子源码解析 RDD persist() 算子 使用指定的level来标记RDD进行存储。 可以看...

  • Spark源码之DAGScheduler

    Spark源码之DAGScheduler介绍篇 Spark Application中的RDD经过一系列的Trans...

  • Spark Stream对接kafka 源码分析

    spark RDD理解 DirectInputStream 序言 本文会讲解Spark Stream是如何与Kaf...

  • 1.spark基础-RDD

    1.RDD创建 Spark是以RDD概念为中心运行的。RDD是一个容错的、可以被并行操作的元素集合。创建一个RDD...

  • spark RDD常用函数/操作

    spark RDD常用函数/操作 文中的代码均可以在spark-shell中运行。 transformations...

网友评论

    本文标题:精通spark源码-rdd是如何运行的

    本文链接:https://www.haomeiwen.com/subject/mqllqqtx.html