美文网首页Spark
Spark累加器

Spark累加器

作者: wudl | 来源:发表于2021-06-17 17:44 被阅读0次

    1. spark 累加器的原理:

    累加器用来把Executor端变量信息聚合到Driver端。
    在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
    

    2. 系统累加器

     获取累加器的值
    1. 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    2. 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    
    package com.wudl.core
    import org.apache.spark.{SparkConf, SparkContext}
    /**
     *
     * @author wudl
     * @create 2021/6/17 14:21
     * @description add 的累加
     */
    object AddRDD {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AddRdd")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
        /**
         * 系统的累加器
         */
        val sum = sc.longAccumulator("sum")
        val mapRdd = rdd.map(
          num => {
            sum.add(num)
          }
        )
        // 获取累加器的值
        // 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
        // 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
        mapRdd.collect();
        mapRdd.collect()
        mapRdd.collect()
        println(sum.value)
        sc.stop()
      }
    }
    
    

    2. 自定义累加器(实现单词统计的累加)

    package com.wudl.core
    import org.apache.spark.util.AccumulatorV2
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable
    /**
     *
     * @author wudl
     * @create 2021/6/17 17:08
     * @description  自定义累加器然 统计单词出现的次数
     */
    object AccumulatorWordCount {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AccunlatorCount")
        val sc = new SparkContext(sparkConf)
        val dataRdd = sc.makeRDD(List("java", "spark", "flink", "hbase", "spark"))
        val mywcc = new myAccumulator()
        // 向spark进行注册
        sc.register(mywcc, "workCountWcc")
        dataRdd.foreach(
          word => {
            mywcc.add(word)
          }
        )
        println(mywcc.value)
        sc.stop()
      }
    
      /**
       * 自定义累加器
       */
      class myAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
        private var wcMap = mutable.Map[String, Long]()
        /** 设置初始化状态 */
        override def isZero: Boolean = {
          wcMap.isEmpty
        }
        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
          new myAccumulator
        }
        // 清空上一次的内容
        override def reset(): Unit = {
          wcMap.clear()
        }
        // 添加新元素并且更新
        override def add(v: String): Unit = {
          val newCnt = wcMap.getOrElse(v, 0L) + 1
          wcMap.update(v, newCnt)
        }
        /**
         * Driver 合并多个累加器
         *
         * @param other
         */
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
          val map1 = this.wcMap
          val map2 = other.value
          map2.foreach {
            case (word, count) => {
              val newCount = map1.getOrElse(word, 0L) + count
              map1.update(word, newCount)
            }
          }
        }
        /**
         * 累加的结果
         *
         * @return
         */
        override def value: mutable.Map[String, Long] = {
          wcMap
        }
      }
    }
    
    

    相关文章

      网友评论

        本文标题:Spark累加器

        本文链接:https://www.haomeiwen.com/subject/njgcyltx.html