美文网首页
共享变量:累加器

共享变量:累加器

作者: ryancao_b9b9 | 来源:发表于2020-05-01 21:51 被阅读0次

    一、使用场景
    上一篇提到了共享变量可以解决Task重复拷贝Driver端变量的问题,但广播变量在Task中只读不能修改。实际的应用场景会在Spark Streaming应用中记录某些事件的数量,广播变量无法满足需求,所以出现了累加器机制。

    二、使用特性
    1、累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加。
    2、累加器不会改变Spark Lazy计算的特点(只会在Job触发的时候进行相关累加操作)。

    三、使用方式(简单)
    注意:系统自带Long和Double类型的累加器
    简单示例如下:

    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        val accum = sc.longAccumulator("longAccum") //统计奇数的个数
        val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{
          if(n%2!=0) accum.add(1L) 
          n%2==0
        }).reduce(_+_)
        println("sum: "+sum)
        println("accum: "+accum.value)
        sc.stop()
    

    统计结果

    sum: 20
    accum: 5
    

    四、常见问题
    1、少加

    val accum = sc.longAccumulator("longAccum")
        val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
          accum.add(1L)
          n+1
        })
        println("accum: "+accum.value)
    

    结果:

    accum1:0
    

    原因:
    累加器不会改变spark的lazy的计算模型,即在打印的时候像map这样的transformation还没有真正的执行,从而累加器的值也就不会更新。

    2、多加

    val accum = sc.longAccumulator("longAccum")
        val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
          accum.add(1L)
          n+1
        })
        numberRDD.count
        println("accum1:"+accum.value)
        numberRDD.reduce(_+_)
        println("accum2: "+accum.value)
    

    结果:

    accum1:9
    accum2: 18
    

    原因:
    虽然只在map里进行了累加器加1的操作,但是两次得到的累加器的值却不一样,这是由于count和reduce都是action类型的操作,触发了两次作业的提交,所以map算子实际上被执行了了两次,在reduce操作提交作业后累加器又完成了一轮计数,所以最终累加器的值为18。究其原因是因为count虽然促使numberRDD被计出来,但是由于没有对其进行缓存,所以下次再次需要使用numberRDD这个数据集是,还需要从并行化数据集的部分开始执行计算。解释到这里,这个问题的解决方法也就很清楚了,就是在count之前调用numberRDD的cache方法(或persist),这样在count后数据集就会被缓存下来,reduce操作就会读取缓存的数据集而无需从头开始计算了。
    改进代码如下:

    val accum = sc.longAccumulator("longAccum")
        val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
          accum.add(1L)
          n+1
        })
        numberRDD.cache().count
        println("accum1:"+accum.value)
        numberRDD.reduce(_+_)
        println("accum2: "+accum.value)
    

    五、自定义累加器
    自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:CollectionAccumulator类,这个类允许以集合的形式收集spark应用执行过程中的一些信息。例如,我们可以用这个类收集Spark处理数据时的一些细节,当然,由于累加器的值最终要汇聚到driver端,为了避免 driver端的outofmemory问题,需要对收集的信息的规模要加以控制,不宜过大。
    实现自定义类型累加器需要继承AccumulatorV2并至少覆写下例中出现的方法,下面这个累加器可以用于在程序运行过程中收集一些文本类信息,最终以Set[String]的形式返回。

    import java.util
    import org.apache.spark.util.AccumulatorV2
    
    class LogAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
      private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
     
      override def isZero: Boolean = {
        _logArray.isEmpty
      }
     
      override def reset(): Unit = {
        _logArray.clear()
      }
     
      override def add(v: String): Unit = {
        _logArray.add(v)
      }
     
      override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {
        other match {
          case o: LogAccumulator => _logArray.addAll(o.value)
        }
     
      }
     
      override def value: java.util.Set[String] = {
        java.util.Collections.unmodifiableSet(_logArray)
      }
     
      override def copy(): AccumulatorV2[String, util.Set[String]] = {
        val newAcc = new LogAccumulator()
        _logArray.synchronized{
          newAcc._logArray.addAll(_logArray)
        }
        newAcc
      }
    }
    

    测试类:

    import java.util
     
    import org.apache.spark.util.AccumulatorV2
     
    class LogAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
      private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
     
      override def isZero: Boolean = {
        _logArray.isEmpty
      }
     
      override def reset(): Unit = {
        _logArray.clear()
      }
     
      override def add(v: String): Unit = {
        _logArray.add(v)
      }
     
      override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {
        other match {
          case o: LogAccumulator => _logArray.addAll(o.value)
        }
     
      }
     
      override def value: java.util.Set[String] = {
        java.util.Collections.unmodifiableSet(_logArray)
      }
     
      override def copy(): AccumulatorV2[String, util.Set[String]] = {
        val newAcc = new LogAccumulator()
        _logArray.synchronized{
          newAcc._logArray.addAll(_logArray)
        }
        newAcc
      }
    }
    

    本例中利用自定义的收集器收集过滤操作中被过滤掉的元素,当然这部分的元素的数据量不能太大。
    运行结果如下:

    sum; 32
    7cd 4b 2a 
    

    相关文章

      网友评论

          本文标题:共享变量:累加器

          本文链接:https://www.haomeiwen.com/subject/ghdcghtx.html