我们在算子中使用算子外定义的变量时,通常是将Driver中定义好的变量,传递一个副本给Executor,在Executor中操作该副本并不会对Driver中的变量产生影响,例如:
object AccumulatorDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AccumulatorDemo").setMaster("local[4]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(1,2,3,4), 2)
var sum = 0
rdd.foreach(sum += _)
println("sum: " + sum)
}
}
结果:
sum: 0
累加器是一个分布式只写变量,用来对信息进行聚合,可以实现所有分区处理时更新共享变量的功能,例如:
object AccumulatorDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AccumulatorDemo").setMaster("local[4]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(1,2,3,4), 2)
val accumulator = sc.longAccumulator
rdd.foreach(accumulator.add(_))
println("sum: " + accumulator.value)
}
}
得到结果:
sum: 10
累加器并不是只能做加法运算,累加器是对元素进行累加运算,我们用到的LongAccumulator
是Spark实现的对于长整形的累加器,它实现了抽象类AccumulatorV2
对于抽象类AccumulatorV2
他的定义如下:
abstract class AccumulatorV2[IN, OUT] extends Serializable
看到他有两个泛型,一个表示输入的参数类型,一个表示输出的参数类型,同时它继承了Serializable
,使得累加器对象能够被序列化后在中传输。
我们可以继承AccumulatorV2
来实现一个自定义的累加器,例如:
class MyAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {
val list = new util.ArrayList[String]()
override def isZero: Boolean = {
list.isEmpty
}
override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
new MyAccumulator
}
override def reset(): Unit = {
list.clear()
}
override def add(v: String): Unit = {
if (v.contains("h")) list.add(v)
}
override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
list.addAll(other.value)
}
override def value: util.ArrayList[String] = list
}
其中需要实现覆盖父类的6个抽象方法,分别是:
-
isZero
:判断累加器是否为初始状态 -
copy
:复制一个累加器对象 -
reset
:重置累加器 -
add
:执行累加操作 -
merge
:合并另一个累加器 -
value
:获取累加器的结果
我们使用自定义的累加器进行累加:
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AccumulatorDemo").setMaster("local[4]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array("hadoop", "spark", "hive", "scala", "world"), 2)
val accumulator = new MyAccumulator
sc.register(accumulator)
rdd.foreach(accumulator.add)
println("sum: " + accumulator.value)
}
}
结果:
sum: [hive, hadoop]
注:
使用自定义累加其是,需要调用SparkContext对象的register
方法注册累加器对象
网友评论