foreach(func)

Author: yayooo | Published: 2019-08-06 20:19

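    Purpose: runs the function func on every element of the dataset. foreach returns nothing, so it is used for side effects, such as updating an accumulator or writing to an external system.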
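Because foreach returns Unit, it only matters through what its function does to the outside world. The plain-Scala sketch below (no Spark needed) counts even numbers by updating a counter from inside foreach. The object and method names are hypothetical, and java.util.concurrent.atomic.LongAdder is only a local stand-in: in a real Spark job, a Driver-side variable mutated inside rdd.foreach is not updated, because each task works on its own deserialized copy of the closure. That is why Spark provides accumulators, such as the one returned by sc.longAccumulator.

```scala
import java.util.concurrent.atomic.LongAdder

object ForeachSideEffectSketch {

  // Counts the even numbers in `data` purely through a side effect:
  // foreach itself returns Unit, so the result lives in `counter`.
  // LongAdder is a local stand-in for a Spark accumulator (assumption).
  def countEvens(data: Seq[Int]): Long = {
    val counter = new LongAdder
    data.foreach { x =>
      if (x % 2 == 0) counter.increment()
    }
    counter.sum()
  }

  def main(args: Array[String]): Unit = {
    val data = List(1, 2, 3, 4, 5, 6)
    println(countEvens(data)) // prints 3
  }
}
```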

    package com.atguigu
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark28_RDD_Action5 {
    
        def main(args: Array[String]): Unit = {
    
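            // Create the Spark configuration object
            val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    
            // Create the Spark context
            val sc = new SparkContext(conf)
    
            // Action operator: foreach
            val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
    
            // collect() is an action that returns every element to the Driver as a local Array;
            // the foreach after it is the ordinary Scala Array method and runs on the Driver
            rdd.collect().foreach(println)
            println("********************")
            // Driver-side code: invoking the RDD action
            rdd.foreach{
                // Executor-side code: this function is shipped to the Executors
                // and called once for every element of each partition
                println
            }
    
            // Release resources
            sc.stop()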
    
        }
    }
    

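    Conclusion: code outside an operator runs on the Driver, while the code inside an operator (the function passed to it) runs on the Executors.
    Printing directly with rdd.foreach sends the numbers 1, 2, 3, 4, 5, 6 to several Executor tasks, one per partition, which run in parallel, so there is no way to know which one prints first.
    With collect, the elements are first gathered into the Driver's memory, and the Driver prints them there in order.
    This program runs in local mode, so the Executor lives inside the Driver's JVM (the log shows "executor driver") and both outputs reach the same console; on a cluster, println inside rdd.foreach writes to each Executor's stdout instead.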
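The difference between the two calls can be imitated without Spark. In this sketch, which is an analogy rather than how Spark actually schedules work, each inner Seq stands for a partition and each Future stands for a task running the foreach function on it; collect is modelled as concatenating the partitions in partition order. The split of 1 to 6 into four partitions and all names here are assumptions for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForeachVsCollectSketch {

  // Assumed split of 1..6 into 4 partitions; each inner Seq is one partition.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1), Seq(2, 3), Seq(4), Seq(5, 6))

  // Stand-in for rdd.foreach(f): one Future per partition plays the role of a task.
  // Tasks run concurrently, so the order across partitions is not fixed,
  // while elements inside one partition are still visited in order.
  def foreachOnExecutors(parts: Seq[Seq[Int]])(f: Int => Unit): Unit = {
    val tasks = parts.map(part => Future(part.foreach(f)))
    Await.result(Future.sequence(tasks), 10.seconds)
    ()
  }

  // Stand-in for rdd.collect(): partitions are concatenated in partition order
  // into a single local collection on the "Driver".
  def collectToDriver(parts: Seq[Seq[Int]]): Seq[Int] = parts.flatten

  def main(args: Array[String]): Unit = {
    collectToDriver(partitions).foreach(println)        // always 1 to 6 in order
    println("********************")
    foreachOnExecutors(partitions)(x => println(x))     // order may vary between runs
  }
}
```

Running main prints 1 through 6 in order before the asterisks; after them the order can change from run to run, although elements of the same partition (2 before 3, 5 before 6) keep their relative order.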

    Output:

    1
    2
    3
    4
    5
    6
    ********
    19/07/31 19:40:18 INFO SparkContext: Starting job: foreach at Action.scala:18
    19/07/31 19:40:18 INFO DAGScheduler: Got job 1 (foreach at Action.scala:18) with 4 output partitions
    19/07/31 19:40:18 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at Action.scala:18)
    19/07/31 19:40:18 INFO DAGScheduler: Parents of final stage: List()
    19/07/31 19:40:18 INFO DAGScheduler: Missing parents: List()
    19/07/31 19:40:18 INFO DAGScheduler: Submitting ResultStage 1 (ParallelCollectionRDD[0] at makeRDD at Action.scala:12), which has no missing parents
    19/07/31 19:40:18 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 1304.0 B, free 1444.8 MB)
    19/07/31 19:40:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 938.0 B, free 1444.8 MB)
    19/07/31 19:40:18 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.68.1:12401 (size: 938.0 B, free: 1444.8 MB)
    19/07/31 19:40:18 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
    19/07/31 19:40:18 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 1 (ParallelCollectionRDD[0] at makeRDD at Action.scala:12)
    19/07/31 19:40:18 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
    19/07/31 19:40:18 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4, localhost, executor driver, partition 0, PROCESS_LOCAL, 5885 bytes)
    19/07/31 19:40:18 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5, localhost, executor driver, partition 1, PROCESS_LOCAL, 5889 bytes)
    19/07/31 19:40:18 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6, localhost, executor driver, partition 2, PROCESS_LOCAL, 5885 bytes)
    19/07/31 19:40:18 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7, localhost, executor driver, partition 3, PROCESS_LOCAL, 5889 bytes)
    19/07/31 19:40:18 INFO Executor: Running task 0.0 in stage 1.0 (TID 4)
    19/07/31 19:40:18 INFO Executor: Running task 1.0 in stage 1.0 (TID 5)
    19/07/31 19:40:18 INFO Executor: Running task 2.0 in stage 1.0 (TID 6)
    19/07/31 19:40:18 INFO Executor: Running task 3.0 in stage 1.0 (TID 7)
    4
    19/07/31 19:40:18 INFO Executor: Finished task 3.0 in stage 1.0 (TID 7). 829 bytes result sent to driver
    19/07/31 19:40:18 INFO Executor: Finished task 1.0 in stage 1.0 (TID 5). 919 bytes result sent to driver
    5
    6
    2
    3
    1
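In this output, 5 and 6 appear together, as do 2 and 3, and the log reports 4 output partitions. That is consistent with Spark cutting the local List passed to makeRDD into contiguous slices. The sketch below is a plain-Scala reimplementation of that index arithmetic, written under the assumption that it mirrors what ParallelCollectionRDD does for a non-Range Seq; it is not Spark code, and the object name is hypothetical.

```scala
object PartitionSliceSketch {

  // Splits `data` into numSlices contiguous ranges: slice i covers indices
  // [i * length / numSlices, (i + 1) * length / numSlices), using Long math
  // to avoid overflow (assumed to match Spark's slicing of a local Seq).
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    val length = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end   = (((i + 1) * length) / numSlices).toInt
      data.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    slice(List(1, 2, 3, 4, 5, 6), 4).zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i -> ${part.mkString(", ")}")
    }
  }
}
```

For List(1, 2, 3, 4, 5, 6) and 4 slices this yields [1], [2, 3], [4], [5, 6], so the printed order 4, 5, 6, 2, 3, 1 corresponds to the tasks for partitions 2, 3, 1 and 0 happening to print in that order.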
    

          Article link: https://www.haomeiwen.com/subject/pxxtdctx.html