美文网首页大数据
Spark任务如何执行?

Spark任务如何执行?

作者: 阿猫阿狗Hakuna | 来源:发表于2019-02-15 14:01 被阅读10次

Spark执行模型

       Spark执行模型可以分为三部分:创建逻辑计划,将其翻译为物理计划,在集群上执行task。
       可以在http://<driver-node>:4040上查看关于Spark Jobs的信息。对于已经完成的Spark应用,可以在http://<server-url>:18080上查看信息。
       下面来浏览一下这三个阶段。

逻辑执行计划

       第一阶段,逻辑执行计划被创建。这个计划展示了哪些steps被执行。回顾一下,当对一个Dataset执行一个转换操作,会有一个新的Dataset被创建。这时,新的Dataset会指向其父Dataset,最终形成一个有向无环图(DAG)。

物理执行计划

       行动操作会触发逻辑DAG图向物理执行计划的转换。Spark Catalyst query optimizer会为DataFrames创建物理执行计划,如下图所示:


image.png

       物理执行计划标识执行计划的资源,例如内存分区和计算任务。

查看逻辑执行计划和物理执行计划

       可以调用explain(true)方法查看逻辑和物理执行计划。如下例所示:

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
var file = “maprfs:///data/flights20170102.json”
case class Flight(_id: String, dofW: Long, carrier: String,
origin: String, dest: String, crsdephour: Long, crsdeptime:
Double, depdelay: Double,crsarrtime: Double, arrdelay: Double,
crselapsedtime: Double, dist: Double) extends Serializable
val df = spark.read.format(“json”).option(“inferSchema”, “true”).
load(file).as[Flight]
val df2 = df.filter($”depdelay” > 40)
df2.take(1)
result:
Array[Flight] = Array(Flight(MIA_IAH_2017-01-01_AA_2315,
7,AA,MIA,IAH,20,2045.0,80.0,2238.0,63.0,173.0,964.0))
df2.explain(true)
result:
== Parsed Logical Plan ==
‘Filter (‘depdelay > 40)
+- Relation[_id#8,arrdelay#9,…] json
== Analyzed Logical Plan ==
_id: string, arrdelay: double…
Filter (depdelay#15 > cast(40 as double))
+- Relation[_id#8,arrdelay#9…] json
== Optimized Logical Plan ==
Filter (isnotnull(depdelay#15) && (depdelay#15 > 40.0))
+- Relation[_id#8,arrdelay#9,…] json
== Physical Plan ==
*Project [_id#8, arrdelay#9,…]
+- *Filter (isnotnull(depdelay#15) && (depdelay#15 > 40.0))
+- *FileScan json [_id#8,arrdelay#9,…] Batched: false, Format:
JSON, Location: InMemoryFileIndex[maprfs:///..],

       在web页面http://<driver-node>:4040/SQL/上可以看到计划生成的更多细节。


image.png

       在以下的代码中,我们看到df3的物理计划由FileScan、Filter、Project、HashAggregate、Exchange以及HashAggregate组成。Exchange是由groupBy转换导致的shuffle。Spark在每次shuffle之前对Exchange的数据进行hash aggregation。在shuffle后会针对之前的子aggragation进行一次hash aggregation。

val df3 = df2.groupBy(“carrier”).count
df3.collect
result:
Array[Row] = Array([UA,2420], [AA,757], [DL,1043], [WN,244])
df3.explain
result:
== Physical Plan ==
*HashAggregate(keys=[carrier#124], functions=[count(1)])
+- Exchange hashpartitioning(carrier#124, 200)
+- *HashAggregate(keys=[carrier#124], functions=[partial_
count(1)])
+- *Project [carrier#124]
+- *Filter (isnotnull(depdelay#129) && (depdelay#129 >
40.0))
+- *FileScan json [carrier#124,depdelay#129]
image.png
image.png

在集群上执行tasks

       第三阶段,tasks在集群上被调度执行。scheduler将根据转换操作将DAG图划分为stage。窄依赖转换操作(没有数据移动的转换)将被分组到一个单一的stage中。


image.png

       每个stage由基于partitions的task组成,这些任务将并行执行相同计算。


image.png
       scheduler将这些stage task提交给task scheduler,task scheduler通过cluster manager启动task。
image.png

       以下是关于执行组成的一些总结:

  • Task:单台机器上运行的执行单元。
  • Stage:基于partitions的一组task,执行并行计算。
  • Job:具有一个或多个stages。
  • Pipelining:当数据集转换操作时没有数据移动时,将Datasets折叠为单一stage。
  • DAG:数据集操作时的逻辑视图。

       Tasks的数量取决于partitions:在第一个阶段读取文件时,有2个partitions;shuffle过后,partitions的数量为200.可以通过rdd.partitions.size方法查看Dataset的partition数量。

df3.rdd.partitions.size
result: Int = 200
df2.rdd.partitions.size
result: Int = 2

相关文章

  • Spark任务如何执行?

    Spark执行模型 Spark执行模型可以分为三部分:创建逻辑计划,将其翻译为物理计划,在集群上执行task...

  • 脚本提交spark任务会自动重试

    1)如果spark任务执行成功,不会自动重试2)如果spark任务执行失败,手动提交时不会重试3)如果spark任...

  • SparkCore(二)

    每种部署模式如何提交任务? Client模式yarn 本地通过Spark-Submit提交任务,执行Main进程,...

  • spark-submit 参数讲解

    spark-submit 可以提交任务到 spark 集群执行,也可以提交到 hadoop 的 yarn 集群执行...

  • spark任务执行过程

    ​ 在学习了Spark RDD和RDD操作之后,是不是很想快点写个Spark程序来巩固一下所学的知识。学习大数...

  • spark wordcount

    wordcount java编写spark执行 maven pom 项目结构 java代码 bash提交任务到Spark

  • 【Spark】Spark作业执行原理--执行任务

    本篇结构: CoarseGrainedExecutorBackend 接收 LaunchTask 消息 Execu...

  • spark四种运行模式

    1. spark的核心组件 1.1 Driver Spark 驱动器节点,用于执行 Spark 任务中的 mai...

  • 利用Akka获取Spark任务的返回结果

    通过spark-submit提交的任务都需要指定Main类作为程序的入口,Main类执行结束即Spark任务终结。...

  • Spark 的shell操作

    执行spark的任务的工具 一、spark-submit 相当于hadoop jar命令-->提交MapReduc...

网友评论

    本文标题:Spark任务如何执行?

    本文链接:https://www.haomeiwen.com/subject/jpppeqtx.html