美文网首页
spark broadcast variables and Ac

spark broadcast variables and Ac

作者: caster | 来源:发表于2021-05-31 21:15 被阅读0次

    1. Accumulator(累加器):分布式共享只写变量

    求和示例:对rdd(两个分区)数据求和,driver端的sum传到不同executor分区计算,结果并不会返回driver端,如下代码所示:

    val sc = SparkSession.builder.master("local").appName("test").getOrCreate()
    
    val rdd = sc.sparkContext.parallelize(List(
      1, 2, 3, 4
    ),2)
    var sum = 0
    rdd.foreach(e => {
      sum += e
      println(sum)
    })
    println("***")
    println(sum)
    
    sc.stop()
    

    输出结果如下,每个分区获取sum独立计算结果。driver端sum值不变:

    1
    3
    3
    7
    ***
    0
    

    Acc变量会将每个分区计算结果返回driver端再合并结果。

    val sc = SparkSession.builder.master("local").appName("test").getOrCreate()
    
    val rdd = sc.sparkContext.parallelize(List(
      1, 2, 3, 4
    ),2)
    val sum = sc.sparkContext.longAccumulator("sum")
    rdd.foreach(e => {
      sum.add(e)
      println(sum.value)
    })
    println("***")
    println(sum.value)
    
    sc.stop()
    

    通过累加器执行,结果复合预期:

    1
    3
    3
    7
    ***
    10
    

    累加器原理:
    少加:转换算子中调用累加器,如果转换后的rdd没有调用行动算子,累加器不会执行。
    多加:如果算子中调用累加器后多次执行行动算子,则会多加一次。
    executor之间acc不能互相访问,只有dirver端可以调用各分区的acc结果。
    应用实例:wordcount
    自己构造累加器实现wc功能,不需要reduceByKey的shullfe操作

    object Test {
      def main(args: Array[String]): Unit = {
    
        val sc = SparkSession.builder.master("local").appName("test").getOrCreate()
    
        val rdd = sc.sparkContext.parallelize(List(
          "hello word", "hello spark"
        ), 2)
    
        //1. reduceBykey会产生shullfe操作
        val res = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        res.collect().foreach(println)
    
        //2. 通过累加器实现
        val wc = new WcAccumulator()
        sc.sparkContext.register(wc, "wc")
    
        rdd.flatMap(_.split(" ")).foreach(
          word => {
            wc.add(word)
          }
        )
        println(wc.value)
    
        sc.stop()
    
      }
    
      /*
        AccumulatorV2[IN, OUT]
        IN:输入类型
        OUT:返回类型
       */
      class WcAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    
        private var map = mutable.Map[String, Long]()
        //判断是否为初始状态
        override def isZero: Boolean = map.isEmpty
    
        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
          new WcAccumulator()
        }
    
        override def reset(): Unit = map.clear()
    
        //累加器计算规则
        override def add(v: String): Unit = {
          val count = map.getOrElse(v, 0L) + 1
          map.update(v, count)
        }
    
        //driver合并多个累加器:将other的OUT合并到当前的OUT
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
          other.value.foreach {
            case (word, count) => {
              val newCount=this.map.getOrElse(word,0L)+count
              this.map.update(word,newCount)
            }
          }
        }
        //累加器结果
        override def value: mutable.Map[String, Long] = map
      }
    
    }
    

    输出结果如下:两种方式结果一致

    (word,1)
    (hello,2)
    (spark,1)
    Map(spark -> 1, word -> 1, hello -> 2)
    

    2. broadcast variables(广播变量):分布式共享只读变量

    闭包数据以task为单位发送,一个executor中如果有多个tasks,则会包含多个重复的闭包数据。广播变量实现了一个executor(JVM)只保存一份闭包数据在内存中,多个tasks共享此数据。

    //封装
    val bc = sc.broadcast(values)
    //获取
    bc.value
    

    相关文章

      网友评论

          本文标题:spark broadcast variables and Ac

          本文链接:https://www.haomeiwen.com/subject/qelusltx.html