Purpose: runs the function func on every element of the dataset.
package com.atguigu
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark28_RDD_Action5 {
  def main(args: Array[String]): Unit = {
    // Prepare the Spark configuration object
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    // Obtain the Spark context object
    val sc = new SparkContext(conf)
    // Action operator: foreach
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
    // collect() brings the data back to the Driver, so this foreach is Array.foreach
    rdd.collect().foreach(println)
    println("********************")
    // Code out here runs on the Driver
    rdd.foreach{
      // Code in here runs on the Executors
      println
    }
    // Release resources
    sc.stop()
  }
}
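Conclusion: code outside an operator runs on the Driver, while code inside an operator runs on the Executors.
Calling foreach directly on the RDD sends the numbers 1, 2, 3, 4, 5, 6 to multiple Executors, so there is no way to know which one prints first. In local mode the Executor tasks run as threads inside the Driver process (the log shows "executor driver"), which is why their output still appears in the same console.
Calling collect first gathers the data to the Driver, and the Driver prints it from its own memory in the original order.
The output is as follows: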
1
2
3
4
5
6
********
19/07/31 19:40:18 INFO SparkContext: Starting job: foreach at Action.scala:18
19/07/31 19:40:18 INFO DAGScheduler: Got job 1 (foreach at Action.scala:18) with 4 output partitions
19/07/31 19:40:18 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at Action.scala:18)
19/07/31 19:40:18 INFO DAGScheduler: Parents of final stage: List()
19/07/31 19:40:18 INFO DAGScheduler: Missing parents: List()
19/07/31 19:40:18 INFO DAGScheduler: Submitting ResultStage 1 (ParallelCollectionRDD[0] at makeRDD at Action.scala:12), which has no missing parents
19/07/31 19:40:18 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 1304.0 B, free 1444.8 MB)
19/07/31 19:40:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 938.0 B, free 1444.8 MB)
19/07/31 19:40:18 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.68.1:12401 (size: 938.0 B, free: 1444.8 MB)
19/07/31 19:40:18 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
19/07/31 19:40:18 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 1 (ParallelCollectionRDD[0] at makeRDD at Action.scala:12)
19/07/31 19:40:18 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
19/07/31 19:40:18 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4, localhost, executor driver, partition 0, PROCESS_LOCAL, 5885 bytes)
19/07/31 19:40:18 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5, localhost, executor driver, partition 1, PROCESS_LOCAL, 5889 bytes)
19/07/31 19:40:18 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6, localhost, executor driver, partition 2, PROCESS_LOCAL, 5885 bytes)
19/07/31 19:40:18 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7, localhost, executor driver, partition 3, PROCESS_LOCAL, 5889 bytes)
19/07/31 19:40:18 INFO Executor: Running task 0.0 in stage 1.0 (TID 4)
19/07/31 19:40:18 INFO Executor: Running task 1.0 in stage 1.0 (TID 5)
19/07/31 19:40:18 INFO Executor: Running task 2.0 in stage 1.0 (TID 6)
19/07/31 19:40:18 INFO Executor: Running task 3.0 in stage 1.0 (TID 7)
4
19/07/31 19:40:18 INFO Executor: Finished task 3.0 in stage 1.0 (TID 7). 829 bytes result sent to driver
19/07/31 19:40:18 INFO Executor: Finished task 1.0 in stage 1.0 (TID 5). 919 bytes result sent to driver
5
6
2
3
1
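The interleaved output above becomes easier to read once you know which elements live in which partition. The following is a minimal sketch, not part of the original program: the object name ForeachOrderSketch and the helper partitionLayout are hypothetical, and the partition count is fixed at 4 on the assumption that it matches the "4 output partitions" reported in the log. It uses glom() to group each partition's elements into an array and collect() to bring the result back to the Driver.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, for illustration only.
object ForeachOrderSketch {
  // Returns the elements of the RDD grouped by partition, collected to the Driver.
  // The slice count of 4 is an assumption taken from the log above.
  def partitionLayout(sc: SparkContext): Array[Array[Int]] = {
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 4)
    // glom() turns each partition into a single Array; collect() runs the job
    // and returns the per-partition arrays in partition order.
    rdd.glom().collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ForeachOrderSketch").setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      // This loop runs on the Driver, so the lines print in partition order.
      partitionLayout(sc).zipWithIndex.foreach { case (elems, idx) =>
        println(s"partition $idx: ${elems.mkString(", ")}")
      }
    } finally {
      sc.stop()
    }
  }
}
```

With four partitions the six elements should be split as 1 | 2, 3 | 4 | 5, 6, which is consistent with the log above: 5 and 6 are printed together, as are 2 and 3. Order is preserved inside a partition, but the four tasks finish in no fixed order relative to each other.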