美文网首页Spark
Spark原理 | 任务执行流程

Spark原理 | 任务执行流程

作者: 木衍小哥 | 来源:发表于2021-12-24 09:48 被阅读0次

    Spark任务从提交到执行完成有很多步骤,整体上可以划分为三个阶段:

    • 应用的提交;

    • 执行环境的准备;

    • 任务的调度和执行。

    Spark任务执行流程

    一、执行流程概述

    Spark有多种不同的运行模式,在不同模式下这三个阶段的执行流程也不太相同。

    以on yarn模式为例,Spark应用提交shell命令如下:

    $SPARK_HOME/bin/spark-submit \
     --class org.apache.spark.examples.SparkPi \
     --master yarn \
     --deploy-mode client \
     $SPARK_HOME/examples/jars/spark-examples*.jar
    

    Spark应用执行过程可以划分如下三个阶段:

    第一步:应用的提交

    • Driver端:

    • 解析参数,验证参数合法性

    • 检查和准备依赖jar包

    • 确定运行的主类,也就是应用的入口

    • Executor端:未创建

    第二步:执行环境的准备

    • Driver端:

    • 进入应用的main函数,开始执行

    • 首先创建SparkContext对象,在创建时会执行

    • 初始化各个服务模块和通信的RPC环境

    • 向cluster manager申请资源

    • Executor端:

    • 在Worker节点启动Executor

    • 初始化Executor,启动各个服务模块

    • 连接到Driver端,汇报Executor的状态

    第三步:任务的调度和执行

    • Driver端:

    • 执行处理任务代码

    • Job分解为Stage,并将Stage划分为Task

    • 提交Task到Executor端

    • 接受Executor端的状态和结果信息

    • Executor端:

    • 启动TaskRunner线程,执行接收到的Task

    • 向Driver端汇报执行状态

    • 向Driver端返回执行结果

    二、执行流程详解

    以如下代码为例,讲解Spark应用执行的各个阶段。

    # HelloWorld.scala
    
    import scala.math.random
    import org.apache.spark.sql.SparkSession
    
    object HelloWorld {
      def main(args: Array[String]) {
            val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
            val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
            rdd.collect()
        }
    }
    

    1、第一阶段:应用的提交

    这个阶段主要在Driver端完成,主要目标是:准备依赖jar包并确定Spark应用的执行主类。具体的任务包括:

    1. 解析任务提交的参数,并对参数进行解析和保存。

    2. 准备任务启动参数制定的依赖文件或者程序包。

    3. 根据Spark应用的执行模式和应用的编写语言,来确定执行的主类名称。

    4. 实例化执行主类,生成SparkApplication对象,并调用SparkApplication.start()函数来运行Spark应用(如果是Java/Scala代码则执行Spark应用中的main函数)。

    注意:第1阶段完成时,Driver端并没有向资源管理平台申请任何资源,也没有启动任何Spark内部的服务。

    2、第二阶段:执行环境的准备

    通过第1阶段,已经找到了运行在Driver端的Spark应用的执行主类,并创建了SparkApplication对象:app。此时,在app.start()函数中会直接调用主类的main函数开始执行应用,从而进入第2阶段。

    第二阶段主要目标是:创建SparkSession(包括SparkContext和SparkEnv),完成资源的申请和Executor的创建。第2阶段完成后Task的执行环境就准备好了。

    也就是说,第2阶段不仅会在Driver端进行初始化,而且还要准备好Executor。这一阶段的任务主要是在Driver端执行创建SparkSession的代码来完成,也就是执行下面一行代码:

    val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
    

    第二阶段的Driver端主要完成以下步骤:

    • 创建SparkContext和SparkEnv对象,在创建这两个对象时,向Cluster Manager申请资源,启动各个服务模块,并对服务模块进行初始化。
    • 这些服务模块包括:DAG调度服务,任务调度服务,shuffle服务,文件传输服务,数据块管理服务,内存管理服务等。

    第2阶段的Executor端主要完成以下步骤:

    • Driver端向Cluster Manager申请资源,若是Yarn模式会在NodeManager上创建ApplicationMaster,并由ApplicationMaster向Cluster Manager来申请资源,并启动Container,在Container中启动Executor。
    • 在启动Executor时向Driver端注册BlockManager服务,并创建心跳服务RPC环境,通过该RPC环境向Driver汇报Executor的状态信息。

    第二阶段执行完成后的Spark集群状态如下:

    Spark集群状态

    3、第三阶段:任务的调度和执行

    通过第2阶段已经完成了Task执行环境的初始化,此时,在Driver端已经完成了SparkContext和SparkEnv的创建,资源已经申请到了,并且已经启动了Executor。

    这一阶段会执行接下来的数据处理的代码:

    val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
    rdd.collect()
    

    第3阶段Driver端主要完成以下步骤:

    • 执行Spark的处理代码,当执行map操作时,生成新的RDD;

    • 当执行Action操作时,触发Job的提交,此时会执行以下步骤:

    • 根据RDD的血缘,把Job划分成相互依赖的Stage;

    • 把每个Stage拆分成一个或多个Task;

    • 把这些Task提交给已经创建好的Executor去执行;

    • 获取Executor的执行状态信息,直到Executor完成所有Task的执行;

    • 获取执行结果和最终的执行状态。

    参考资料

    1. Spark Scheduler 内部原理剖析
    2. 如何理解Spark应用的执行过程

    相关文章

      网友评论

        本文标题:Spark原理 | 任务执行流程

        本文链接:https://www.haomeiwen.com/subject/aitkqrtx.html