美文网首页大数据
Spark任务如何执行?

Spark任务如何执行?

作者: 阿猫阿狗Hakuna | 来源:发表于2019-02-15 14:01 被阅读10次

    Spark执行模型

           Spark执行模型可以分为三部分:创建逻辑计划,将其翻译为物理计划,在集群上执行task。
           可以在http://<driver-node>:4040上查看关于Spark Jobs的信息。对于已经完成的Spark应用,可以在http://<server-url>:18080上查看信息。
           下面来浏览一下这三个阶段。

    逻辑执行计划

           第一阶段,逻辑执行计划被创建。这个计划展示了哪些steps被执行。回顾一下,当对一个Dataset执行一个转换操作,会有一个新的Dataset被创建。这时,新的Dataset会指向其父Dataset,最终形成一个有向无环图(DAG)。

    物理执行计划

           行动操作会触发逻辑DAG图向物理执行计划的转换。Spark Catalyst query optimizer会为DataFrames创建物理执行计划,如下图所示:


    image.png

           物理执行计划标识执行计划的资源,例如内存分区和计算任务。

    查看逻辑执行计划和物理执行计划

           可以调用explain(true)方法查看逻辑和物理执行计划。如下例所示:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    var file = “maprfs:///data/flights20170102.json”
    case class Flight(_id: String, dofW: Long, carrier: String,
    origin: String, dest: String, crsdephour: Long, crsdeptime:
    Double, depdelay: Double,crsarrtime: Double, arrdelay: Double,
    crselapsedtime: Double, dist: Double) extends Serializable
    val df = spark.read.format(“json”).option(“inferSchema”, “true”).
    load(file).as[Flight]
    val df2 = df.filter($”depdelay” > 40)
    df2.take(1)
    result:
    Array[Flight] = Array(Flight(MIA_IAH_2017-01-01_AA_2315,
    7,AA,MIA,IAH,20,2045.0,80.0,2238.0,63.0,173.0,964.0))
    df2.explain(true)
    
    result:
    == Parsed Logical Plan ==
    ‘Filter (‘depdelay > 40)
    +- Relation[_id#8,arrdelay#9,…] json
    == Analyzed Logical Plan ==
    _id: string, arrdelay: double…
    Filter (depdelay#15 > cast(40 as double))
    +- Relation[_id#8,arrdelay#9…] json
    == Optimized Logical Plan ==
    Filter (isnotnull(depdelay#15) && (depdelay#15 > 40.0))
    +- Relation[_id#8,arrdelay#9,…] json
    == Physical Plan ==
    *Project [_id#8, arrdelay#9,…]
    +- *Filter (isnotnull(depdelay#15) && (depdelay#15 > 40.0))
    +- *FileScan json [_id#8,arrdelay#9,…] Batched: false, Format:
    JSON, Location: InMemoryFileIndex[maprfs:///..],
    

           在web页面http://<driver-node>:4040/SQL/上可以看到计划生成的更多细节。


    image.png

           在以下的代码中,我们看到df3的物理计划由FileScan、Filter、Project、HashAggregate、Exchange以及HashAggregate组成。Exchange是由groupBy转换导致的shuffle。Spark在每次shuffle之前对Exchange的数据进行hash aggregation。在shuffle后会针对之前的子aggragation进行一次hash aggregation。

    val df3 = df2.groupBy(“carrier”).count
    df3.collect
    result:
    Array[Row] = Array([UA,2420], [AA,757], [DL,1043], [WN,244])
    df3.explain
    result:
    == Physical Plan ==
    *HashAggregate(keys=[carrier#124], functions=[count(1)])
    +- Exchange hashpartitioning(carrier#124, 200)
    +- *HashAggregate(keys=[carrier#124], functions=[partial_
    count(1)])
    +- *Project [carrier#124]
    +- *Filter (isnotnull(depdelay#129) && (depdelay#129 >
    40.0))
    +- *FileScan json [carrier#124,depdelay#129]
    
    image.png
    image.png

    在集群上执行tasks

           第三阶段,tasks在集群上被调度执行。scheduler将根据转换操作将DAG图划分为stage。窄依赖转换操作(没有数据移动的转换)将被分组到一个单一的stage中。


    image.png

           每个stage由基于partitions的task组成,这些任务将并行执行相同计算。


    image.png
           scheduler将这些stage task提交给task scheduler,task scheduler通过cluster manager启动task。
    image.png

           以下是关于执行组成的一些总结:

    • Task:单台机器上运行的执行单元。
    • Stage:基于partitions的一组task,执行并行计算。
    • Job:具有一个或多个stages。
    • Pipelining:当数据集转换操作时没有数据移动时,将Datasets折叠为单一stage。
    • DAG:数据集操作时的逻辑视图。

           Tasks的数量取决于partitions:在第一个阶段读取文件时,有2个partitions;shuffle过后,partitions的数量为200.可以通过rdd.partitions.size方法查看Dataset的partition数量。

    df3.rdd.partitions.size
    result: Int = 200
    df2.rdd.partitions.size
    result: Int = 2
    

    相关文章

      网友评论

        本文标题:Spark任务如何执行?

        本文链接:https://www.haomeiwen.com/subject/jpppeqtx.html