Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。
val sumAccumulator = sc.accumulator(0)
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr)
rdd.foreach(num => sumAccumulator += num)
println(sumAccumulator.value)
Java版本
/**
* 累加变量
* @author Administrator
*
*/
public class AccumulatorVariable {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Accumulator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 创建Accumulator变量
// 需要调用SparkContext的accumulator()方法
final Accumulator<Integer> sum = sc.accumulator(0);
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
numbers.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
// 然后在函数内部,就可以对Accumulator变量,调用add()方法,累加值
sum.add(t);
}
});
// 在driver程序中,可以调用Accumulator的value()方法,获取其值
System.out.println(sum.value());
sc.close();
}
}
Scala版本
object AccumulatorVariable {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AccumulatorVariable").setMaster("local")
val sc = new SparkContext(conf)
val sum = sc.accumulator(0)
val numberArray = Array(1,2,3,4,5)
val numbers = sc.parallelize(numberArray, 1)
numbers.foreach { num => sum += num }
println(sum)
}
}
网友评论