美文网首页
Spark-累加器

Spark-累加器

作者: 布莱安托 | 来源:发表于2020-07-06 19:56 被阅读0次

我们在算子中使用算子外定义的变量时,通常是将Driver中定义好的变量,传递一个副本给Executor,在Executor中操作该副本并不会对Driver中的变量产生影响,例如:

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccumulatorDemo").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Array(1,2,3,4), 2)
    var sum = 0
    rdd.foreach(sum += _)
    println("sum: " + sum)
  }
}

结果:

sum: 0

累加器是一个分布式只写变量,用来对信息进行聚合,可以实现所有分区处理时更新共享变量的功能,例如:

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccumulatorDemo").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Array(1,2,3,4), 2)

    val accumulator = sc.longAccumulator

    rdd.foreach(accumulator.add(_))

    println("sum: " + accumulator.value)

  }
}

得到结果:

sum: 10

累加器并不是只能做加法运算,累加器是对元素进行累加运算,我们用到的LongAccumulator是Spark实现的对于长整形的累加器,它实现了抽象类AccumulatorV2

对于抽象类AccumulatorV2 他的定义如下:

abstract class AccumulatorV2[IN, OUT] extends Serializable

看到他有两个泛型,一个表示输入的参数类型,一个表示输出的参数类型,同时它继承了Serializable,使得累加器对象能够被序列化后在中传输。

我们可以继承AccumulatorV2来实现一个自定义的累加器,例如:

class MyAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {

  val list = new util.ArrayList[String]()

  override def isZero: Boolean = {
    list.isEmpty
  }

  override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
    new MyAccumulator
  }

  override def reset(): Unit = {
    list.clear()
  }

  override def add(v: String): Unit = {
    if (v.contains("h")) list.add(v)
  }

  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  override def value: util.ArrayList[String] = list
}

其中需要实现覆盖父类的6个抽象方法,分别是:

  1. isZero:判断累加器是否为初始状态
  2. copy:复制一个累加器对象
  3. reset:重置累加器
  4. add:执行累加操作
  5. merge:合并另一个累加器
  6. value:获取累加器的结果

我们使用自定义的累加器进行累加:

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccumulatorDemo").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Array("hadoop", "spark", "hive", "scala", "world"), 2)

    val accumulator = new MyAccumulator
    sc.register(accumulator)

    rdd.foreach(accumulator.add)

    println("sum: " + accumulator.value)

  }
}

结果:

sum: [hive, hadoop]

使用自定义累加其是,需要调用SparkContext对象的register方法注册累加器对象

相关文章

  • Spark-累加器

    我们在算子中使用算子外定义的变量时,通常是将Driver中定义好的变量,传递一个副本给Executor,在Exec...

  • 09-flink-Accumulator(累加器)

    09-flink-Accumulator(累加器) 概念 Accumulator(累加器):累加器主要作用在用户操...

  • spark-天池O2O竞赛

    地址转移到 : spark-天池O2O竞赛

  • MongoDB聚合管道——累加器(转)

    累加器(Accumulators) 累加器本来只能使用与project。当在project中使用时,累加器则是针对...

  • Spark累加器

    1. spark 累加器的原理: 2. 系统累加器 2. 自定义累加器(实现单词统计的累加)

  • Spark累加器(Accumulator)

    什么是累加器 累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)累加器用来把Exec...

  • spark之广播变量&累加器

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。累加器...

  • Spark快速入门(9) 高级话题:累加器变量

    本节我们会介绍一种在tasks之间共享可读写变量的方式,就是累加器变量。 累加器变量 累加器变量是在tasks之间...

  • Hive 入门

    Hive官网 Hive概述 Hive 的底层执行引擎有 :MapReduce,Tez,Spark- Hive on...

  • Spark 之累加器

    1. Overview 本文将通过闭包的概念引出累加器,并介绍累加器的基本使用以及如何自定义累加器,文章最后将说明...

网友评论

      本文标题:Spark-累加器

      本文链接:https://www.haomeiwen.com/subject/gmbgqktx.html