1. Accumulator(累加器):分布式共享只写变量
求和示例:对rdd(两个分区)数据求和,driver端的sum传到不同executor分区计算,结果并不会返回driver端,如下代码所示:
val sc = SparkSession.builder.master("local").appName("test").getOrCreate()
val rdd = sc.sparkContext.parallelize(List(
1, 2, 3, 4
),2)
var sum = 0
rdd.foreach(e => {
sum += e
println(sum)
})
println("***")
println(sum)
sc.stop()
输出结果如下,每个分区获取sum独立计算结果。driver端sum值不变:
1
3
3
7
***
0
Acc变量会将每个分区计算结果返回driver端再合并结果。
val sc = SparkSession.builder.master("local").appName("test").getOrCreate()
val rdd = sc.sparkContext.parallelize(List(
1, 2, 3, 4
),2)
val sum = sc.sparkContext.longAccumulator("sum")
rdd.foreach(e => {
sum.add(e)
println(sum.value)
})
println("***")
println(sum.value)
sc.stop()
通过累加器执行,结果复合预期:
1
3
3
7
***
10
累加器原理:
少加:转换算子中调用累加器,如果转换后的rdd没有调用行动算子,累加器不会执行。
多加:如果算子中调用累加器后多次执行行动算子,则会多加一次。
executor之间acc不能互相访问,只有dirver端可以调用各分区的acc结果。
应用实例:wordcount
自己构造累加器实现wc功能,不需要reduceByKey的shullfe操作
object Test {
def main(args: Array[String]): Unit = {
val sc = SparkSession.builder.master("local").appName("test").getOrCreate()
val rdd = sc.sparkContext.parallelize(List(
"hello word", "hello spark"
), 2)
//1. reduceBykey会产生shullfe操作
val res = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
res.collect().foreach(println)
//2. 通过累加器实现
val wc = new WcAccumulator()
sc.sparkContext.register(wc, "wc")
rdd.flatMap(_.split(" ")).foreach(
word => {
wc.add(word)
}
)
println(wc.value)
sc.stop()
}
/*
AccumulatorV2[IN, OUT]
IN:输入类型
OUT:返回类型
*/
class WcAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
private var map = mutable.Map[String, Long]()
//判断是否为初始状态
override def isZero: Boolean = map.isEmpty
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new WcAccumulator()
}
override def reset(): Unit = map.clear()
//累加器计算规则
override def add(v: String): Unit = {
val count = map.getOrElse(v, 0L) + 1
map.update(v, count)
}
//driver合并多个累加器:将other的OUT合并到当前的OUT
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
other.value.foreach {
case (word, count) => {
val newCount=this.map.getOrElse(word,0L)+count
this.map.update(word,newCount)
}
}
}
//累加器结果
override def value: mutable.Map[String, Long] = map
}
}
输出结果如下:两种方式结果一致
(word,1)
(hello,2)
(spark,1)
Map(spark -> 1, word -> 1, hello -> 2)
2. broadcast variables(广播变量):分布式共享只读变量
闭包数据以task为单位发送,一个executor中如果有多个tasks,则会包含多个重复的闭包数据。广播变量实现了一个executor(JVM)只保存一份闭包数据在内存中,多个tasks共享此数据。
//封装
val bc = sc.broadcast(values)
//获取
bc.value
网友评论