美文网首页
Accumulator

Accumulator

作者: 一个人一匹马 | 来源:发表于2019-02-21 11:25 被阅读0次

    Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

    val sumAccumulator = sc.accumulator(0)
    val arr = Array(1, 2, 3, 4, 5)
    val rdd = sc.parallelize(arr)
    rdd.foreach(num => sumAccumulator += num)
    println(sumAccumulator.value)

    Java版本

    /**
     * 累加变量
     * @author Administrator
     *
     */
    public class AccumulatorVariable {
    
    public static void main(String[] args) {
    
    SparkConf conf = new SparkConf()​​​​.setAppName("Accumulator").setMaster("local");
    
    ​​JavaSparkContext sc = new JavaSparkContext(conf);
    
    ​​// 创建Accumulator变量
    ​​// 需要调用SparkContext的accumulator()方法
    ​​final Accumulator<Integer> sum = sc.accumulator(0);
    
    ​​List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
    
    ​​JavaRDD<Integer> numbers = sc.parallelize(numberList);
    
    ​​numbers.foreach(new VoidFunction<Integer>() {
    
    ​​​private static final long serialVersionUID = 1L;
    
    @Override
    public void call(Integer t) throws Exception {
    ​​​​// 然后在函数内部,就可以对Accumulator变量,调用add()方法,累加值
    ​​​​sum.add(t);  
    ​​​}
    ​​});
    
    // 在driver程序中,可以调用Accumulator的value()方法,获取其值
    ​​System.out.println(sum.value());  
    
    ​​sc.close();
    ​}
    }
    

    Scala版本

    object AccumulatorVariable {
    
    def main(args: Array[String]) {
    
    val conf = new SparkConf().setAppName("AccumulatorVariable").setMaster("local")
    
    val sc = new SparkContext(conf)
    
    val sum = sc.accumulator(0)
    
    val numberArray = Array(1,2,3,4,5)
    
    val numbers = sc.parallelize(numberArray, 1)
    
    numbers.foreach { num => sum += num }
    
    println(sum)
    
      }
    
    }

    相关文章

      网友评论

          本文标题:Accumulator

          本文链接:https://www.haomeiwen.com/subject/adcryqtx.html