1. spark 累加器的原理:
累加器用来把Executor端变量信息聚合到Driver端。
在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
2. 系统累加器
获取累加器的值
1. 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
2. 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
package com.wudl.core
import org.apache.spark.{SparkConf, SparkContext}
/**
*
* @author wudl
* @create 2021/6/17 14:21
* @description add 的累加
*/
object AddRDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AddRdd")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
/**
* 系统的累加器
*/
val sum = sc.longAccumulator("sum")
val mapRdd = rdd.map(
num => {
sum.add(num)
}
)
// 获取累加器的值
// 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
// 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
mapRdd.collect();
mapRdd.collect()
mapRdd.collect()
println(sum.value)
sc.stop()
}
}
2. 自定义累加器(实现单词统计的累加)
package com.wudl.core
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
*
* @author wudl
* @create 2021/6/17 17:08
* @description 自定义累加器然 统计单词出现的次数
*/
object AccumulatorWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AccunlatorCount")
val sc = new SparkContext(sparkConf)
val dataRdd = sc.makeRDD(List("java", "spark", "flink", "hbase", "spark"))
val mywcc = new myAccumulator()
// 向spark进行注册
sc.register(mywcc, "workCountWcc")
dataRdd.foreach(
word => {
mywcc.add(word)
}
)
println(mywcc.value)
sc.stop()
}
/**
* 自定义累加器
*/
class myAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
private var wcMap = mutable.Map[String, Long]()
/** 设置初始化状态 */
override def isZero: Boolean = {
wcMap.isEmpty
}
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new myAccumulator
}
// 清空上一次的内容
override def reset(): Unit = {
wcMap.clear()
}
// 添加新元素并且更新
override def add(v: String): Unit = {
val newCnt = wcMap.getOrElse(v, 0L) + 1
wcMap.update(v, newCnt)
}
/**
* Driver 合并多个累加器
*
* @param other
*/
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
val map1 = this.wcMap
val map2 = other.value
map2.foreach {
case (word, count) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
}
/**
* 累加的结果
*
* @return
*/
override def value: mutable.Map[String, Long] = {
wcMap
}
}
}
网友评论