美文网首页
Spark累加器及广播变量的探讨

Spark累加器及广播变量的探讨

作者: 喵星人ZC | 来源:发表于2019-05-19 14:37 被阅读0次

    场景:

    val a = 10
    val rdd = sc.parallelize(List(1,2,3,4,5,6))
    rdd.mapPartition(x => {
    
    //TODO需要用到Driver端的变量 a
    
    })
    

    1、定义一个变量a(driver端)
    2、计算时(executor端)需要用到变量a
    3、假设rdd有1万个Partition,那么没有广播变量的情况下,a会被发送到每个分区,这将耗费极大的资源。而且executor端变更了a的值,driver端也不会变,因为driver和executor数据不能共享

    基于以上场景Spark就催生了广播变量以及累加器 。

    一、广播变量

    a是在driver端创建的,但是因为需要在excutor端使用,所以driver会把a以task的形式发送到excutor端,如果有很多个task,就会有很多给excutor端携带很多个a,如果这个a换成一个数据集非常大的数据时,就可能会造成内存溢出(如下图所示)。这个时候就引出了广播变量。

    image.png

    使用广播变量后,广播变量允许程序员在每台机器上缓存一个只读变量,而不是将其副本与任务一起发送

    image.png

    从代码层面看使用广播变量和不使用广播变量的区别
    不使用广播变量

    package com.soul.bigdata.spark.core4
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object BroadCastApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("AccumulatorApp").setMaster("local[2]")
    
        val sc = new SparkContext(conf)
    
        commonJoin(sc)
    
        Thread.sleep(30000 * 10)
    
        sc.stop()
      }
    
      def commonJoin(sc: SparkContext): Unit = {
        val info1 = sc.parallelize(Array(("601", "张三"), ("602", "李四")))
        val info2 = sc.parallelize(Array(("601", "哈弗", "25"), ("603", "浙大", "22"), ("603", "深大", "26")))
          .map(x => (x._1, (x._2, x._3)))
        //TODO 需得到 601,张三,哈弗
        info1.join(info2).map(x => {
          //(601,(张三,(哈弗,25)))
          x._1 + "," + x._2._1 + "," + x._2._2._1
        }).foreach(println)
      }
    
    }
    
    
    image.png

    使用广播变量后

    package com.soul.bigdata.spark.core4
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object BroadCastApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("AccumulatorApp").setMaster("local[2]")
    
        val sc = new SparkContext(conf)
    
        broadcastJoin(sc)
    
    
        Thread.sleep(30000 * 10)
    
        sc.stop()
      }
    
      def broadcastJoin(sc: SparkContext): Unit = {
        //小数据 -> 广播
        val info1 = sc.parallelize(Array(("601", "张三"), ("602", "李四"))).collectAsMap() //转成Map 可以通过get得到key
    
        //Driver数据才需广播
        val broadcastinfo1 = sc.broadcast(info1)
    
        //大数据
        val info2 = sc.parallelize(Array(("601", "哈弗", "25"), ("603", "浙大", "22"), ("603", "深大", "26")))
          .map(x => (x._1, (x._2, x._3)))
    
        //broadcst以后就不会用Join实现。而是大表数据读取出来一条就和广播出去的小表记录做匹配
    
        info2.mapPartitions(x => {
          val broadcastMap = broadcastinfo1.value
    
          for ((key, value) <- x if broadcastMap.contains(key))
              //TODO 需得到 601,张三,哈弗
            yield (key, broadcastMap.get(key).getOrElse(), value._1)
    
        }).foreach(println)
      }
    
    }
    
    image.png

    一个存在shuffle,一个不存在shuffle。性能对比一目了然,但是广播的前提是你的数据不能太大,否则也会发生OOM。
    注意事项

    • 只能广播RDD的结果数据,不能直接广播RDD
    • 广播变量只能在Driver端定义,不能在Executor端定义

    一、累加器

    1、先看一个列子

    package com.soul.bigdata.spark.core4
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object AccumulatorApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("AccumulatorApp").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val line = sc.textFile("file:///D:\\RZ-G6\\2019G6\\data\\wordcount.txt")
    
        var i = 0
        val result = line.map(x => {
          i = i + 1
          x
        })
        result.collect().foreach(println)
        //0  driver端数据与executor端数据不能共享导致 所以Spark就引出累加器
        println("不用累加器统计 word lines is " + i)
        sc.stop()
    
      }
    }
    

    运行结果为0


    image.png

    依然是因为driver端数据与executor端数据不能共享导致,所以Spark就引出累加器。

    使用累加器之后

    package com.soul.bigdata.spark.core4
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object AccumulatorApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("AccumulatorApp").setMaster("local[2]")
        val sc = new SparkContext(conf)
        var accu = sc.longAccumulator("MyAccumulator")
        println("累加器原始值: " + accu.value)
    
        val line = sc.textFile("file:///D:\\RZ-G6\\2019G6\\data\\wordcount.txt")
    
           val result2 = line.map(x => {
             accu.add(1)//有一行数据就增加1
             x
           })
    
           //.foreach(println) //必须触发一个action算子 将结果返回到Driver    累加器的值只有Driver可以读取   executor端只能累加计数器得到值,但不能获取
           result2.collect()
           println("使用累加器统计 word lines is " +accu.value)
    
    
        sc.stop()
    
      }
    }
    
    image.png
    image.png

    累加器的作用:提供了将工作节点中的值聚合到驱动器程序中的简单语法

    注意事项:累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新

    相关文章

      网友评论

          本文标题:Spark累加器及广播变量的探讨

          本文链接:https://www.haomeiwen.com/subject/fgtqzqtx.html