美文网首页Spark
Spark累加器

Spark累加器

作者: wudl | 来源:发表于2021-06-17 17:44 被阅读0次

1. spark 累加器的原理:

累加器用来把Executor端变量信息聚合到Driver端。
在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

2. 系统累加器

 获取累加器的值
1. 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
2. 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
package com.wudl.core
import org.apache.spark.{SparkConf, SparkContext}
/**
 *
 * @author wudl
 * @create 2021/6/17 14:21
 * @description add 的累加
 */
object AddRDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AddRdd")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
    /**
     * 系统的累加器
     */
    val sum = sc.longAccumulator("sum")
    val mapRdd = rdd.map(
      num => {
        sum.add(num)
      }
    )
    // 获取累加器的值
    // 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    // 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    mapRdd.collect();
    mapRdd.collect()
    mapRdd.collect()
    println(sum.value)
    sc.stop()
  }
}

2. 自定义累加器(实现单词统计的累加)

package com.wudl.core
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
 *
 * @author wudl
 * @create 2021/6/17 17:08
 * @description  自定义累加器然 统计单词出现的次数
 */
object AccumulatorWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AccunlatorCount")
    val sc = new SparkContext(sparkConf)
    val dataRdd = sc.makeRDD(List("java", "spark", "flink", "hbase", "spark"))
    val mywcc = new myAccumulator()
    // 向spark进行注册
    sc.register(mywcc, "workCountWcc")
    dataRdd.foreach(
      word => {
        mywcc.add(word)
      }
    )
    println(mywcc.value)
    sc.stop()
  }

  /**
   * 自定义累加器
   */
  class myAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    private var wcMap = mutable.Map[String, Long]()
    /** 设置初始化状态 */
    override def isZero: Boolean = {
      wcMap.isEmpty
    }
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      new myAccumulator
    }
    // 清空上一次的内容
    override def reset(): Unit = {
      wcMap.clear()
    }
    // 添加新元素并且更新
    override def add(v: String): Unit = {
      val newCnt = wcMap.getOrElse(v, 0L) + 1
      wcMap.update(v, newCnt)
    }
    /**
     * Driver 合并多个累加器
     *
     * @param other
     */
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      val map1 = this.wcMap
      val map2 = other.value
      map2.foreach {
        case (word, count) => {
          val newCount = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
        }
      }
    }
    /**
     * 累加的结果
     *
     * @return
     */
    override def value: mutable.Map[String, Long] = {
      wcMap
    }
  }
}

相关文章

网友评论

    本文标题:Spark累加器

    本文链接:https://www.haomeiwen.com/subject/njgcyltx.html