我们在运行Spark Application的时候,或许没有注意过jobId这个东西。或许,HistoryServer中的这个tab我们都没注意看过。
在这个tab中,我们可以看到,有多个jobId。那么它们是怎样生成的呢?
上面这个Application,是下面这段代码:
def main(args: Array[String]): Unit = {
val sparkConfig = new SparkConf
sparkConfig.setMaster("spark://alstonwilliams:7077")
val sparkContext = new SparkContext(sparkConfig)
/**
*
*
* test1RDDStep1
* | map
* test1RDDStep2 ->(map) test1RDDStep3
* | foreach | foreach
*
*
*
* */
val test1RDDStep1: RDD[String] = sparkContext.parallelize(Seq("A", "B", "C", "D"))
val test1RDDStep2 = test1RDDStep1.map(str => {str + str})
test1RDDStep2.foreach(println(_))
val test1RDDStep3 = test1RDDStep2.map(str => {str + str})
test1RDDStep3.foreach(println(_))
}
我们可以看到,有几个action,就有几个jobId。
我们也可以从Spark的源码中来验证这一点。
DAGScheduler这个方法,每个Action都会调用。我们可以看到,其中就会生成一个新的jobId.
网友评论