Spark任务从提交到执行完成有很多步骤,整体上可以划分为三个阶段:
-
应用的提交;
-
执行环境的准备;
-
任务的调度和执行。
一、执行流程概述
Spark有多种不同的运行模式,在不同模式下这三个阶段的执行流程也不太相同。
以on yarn模式为例,Spark应用提交shell命令如下:
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
$SPARK_HOME/examples/jars/spark-examples*.jar
Spark应用执行过程可以划分如下三个阶段:
第一步:应用的提交
-
Driver端:
-
解析参数,验证参数合法性
-
检查和准备依赖jar包
-
确定运行的主类,也就是应用的入口
-
Executor端:未创建
第二步:执行环境的准备
-
Driver端:
-
进入应用的main函数,开始执行
-
首先创建SparkContext对象,在创建时会执行
-
初始化各个服务模块和通信的RPC环境
-
向cluster manager申请资源
-
Executor端:
-
在Worker节点启动Executor
-
初始化Executor,启动各个服务模块
-
连接到Driver端,汇报Executor的状态
第三步:任务的调度和执行
-
Driver端:
-
执行处理任务代码
-
Job分解为Stage,并将Stage划分为Task
-
提交Task到Executor端
-
接受Executor端的状态和结果信息
-
Executor端:
-
启动TaskRunner线程,执行接收到的Task
-
向Driver端汇报执行状态
-
向Driver端返回执行结果
二、执行流程详解
以如下代码为例,讲解Spark应用执行的各个阶段。
# HelloWorld.scala
import scala.math.random
import org.apache.spark.sql.SparkSession
object HelloWorld {
def main(args: Array[String]) {
val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
}
}
1、第一阶段:应用的提交
这个阶段主要在Driver端完成,主要目标是:准备依赖jar包并确定Spark应用的执行主类。具体的任务包括:
-
解析任务提交的参数,并对参数进行解析和保存。
-
准备任务启动参数制定的依赖文件或者程序包。
-
根据Spark应用的执行模式和应用的编写语言,来确定执行的主类名称。
-
实例化执行主类,生成SparkApplication对象,并调用SparkApplication.start()函数来运行Spark应用(如果是Java/Scala代码则执行Spark应用中的main函数)。
注意:第1阶段完成时,Driver端并没有向资源管理平台申请任何资源,也没有启动任何Spark内部的服务。
2、第二阶段:执行环境的准备
通过第1阶段,已经找到了运行在Driver端的Spark应用的执行主类,并创建了SparkApplication对象:app。此时,在app.start()函数中会直接调用主类的main函数开始执行应用,从而进入第2阶段。
第二阶段主要目标是:创建SparkSession(包括SparkContext和SparkEnv),完成资源的申请和Executor的创建。第2阶段完成后Task的执行环境就准备好了。
也就是说,第2阶段不仅会在Driver端进行初始化,而且还要准备好Executor。这一阶段的任务主要是在Driver端执行创建SparkSession的代码来完成,也就是执行下面一行代码:
val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
第二阶段的Driver端主要完成以下步骤:
- 创建SparkContext和SparkEnv对象,在创建这两个对象时,向Cluster Manager申请资源,启动各个服务模块,并对服务模块进行初始化。
- 这些服务模块包括:DAG调度服务,任务调度服务,shuffle服务,文件传输服务,数据块管理服务,内存管理服务等。
第2阶段的Executor端主要完成以下步骤:
- Driver端向Cluster Manager申请资源,若是Yarn模式会在NodeManager上创建ApplicationMaster,并由ApplicationMaster向Cluster Manager来申请资源,并启动Container,在Container中启动Executor。
- 在启动Executor时向Driver端注册BlockManager服务,并创建心跳服务RPC环境,通过该RPC环境向Driver汇报Executor的状态信息。
第二阶段执行完成后的Spark集群状态如下:
Spark集群状态3、第三阶段:任务的调度和执行
通过第2阶段已经完成了Task执行环境的初始化,此时,在Driver端已经完成了SparkContext和SparkEnv的创建,资源已经申请到了,并且已经启动了Executor。
这一阶段会执行接下来的数据处理的代码:
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
第3阶段Driver端主要完成以下步骤:
-
执行Spark的处理代码,当执行map操作时,生成新的RDD;
-
当执行Action操作时,触发Job的提交,此时会执行以下步骤:
-
根据RDD的血缘,把Job划分成相互依赖的Stage;
-
把每个Stage拆分成一个或多个Task;
-
把这些Task提交给已经创建好的Executor去执行;
-
获取Executor的执行状态信息,直到Executor完成所有Task的执行;
-
获取执行结果和最终的执行状态。
网友评论