美文网首页
Spark从入门到放弃—累加器和广播变量

Spark从入门到放弃—累加器和广播变量

作者: HaloZhang | 来源:发表于2021-02-26 15:06 被阅读0次

    简介

    累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于MapReduce,即分布式的改变,然后聚合这些改变。累加器用来把 Executor端变量信息聚合到 Driver 端。在Driver程序中定义的变量,在Executor端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。Spark原生地支持数值型(numeric)的累加器,包含longAccumulator和DoubleAccumulator,运行在集群中的任务可以使用add方法来将数值累加到累加器上。但是这些任务只能做累加操作,不能读取累加器的值,只有任务的控制节点才可以使用value方法来获取累加器的值,除了数值类型的累加器外,程序开发人员可以通过继承AccumulatorV2类来支持新的类型。

    累加器运行示意图

    广播变量(Broadcast Variables)允许程序员在每一台机器上都缓存一个只读变量,而不是将其副本与任务一起发送到每个执行节点上。它可以一种很高效的方式向所有的工作节点发送一个较大的只读值,以供一个或者多个Spark操作使用。除此之外,Spark也尝试着使用高效的广播算法来分发广播变量,从而降低网络传输造成的损失。

    累加器创建以及使用

    累加器的应用场景相对于RDD来说较为单一,它的一个常见用途是对作业执行过程中的事件进行计数。下面举一个简单的例子来说明累加器的使用,以对一个数组中的所有元素进行累加为例,代码如下:

    val accum = sc.longAccumulator("My Acc")
    sc.makeRDD(1 to 9).foreach(x => accum.add(x)) //对RDD中的每个元素都进行累加
    println(accum.value)
    
    结果为:

    自定义累加器

    如果我们需要创建属于我们自己定义的类型的累加器,那么可以通过继承AccumulatorV2来实现。AccumulatorV2抽象类拥有许多需要实现的抽象方法:

    • reset方法
      是用来将累加器置为0
    • add方法
      将一个值累加到累加器中
    • merge方法
      用来将另外一个同样类型的累加器合并到当前的累加器
    • copy方法
      用来创建当前累加器的一个副本
    • isZero方法
      返回当前的累加器是否是0值
    • value方法
      定义了当前累加的值

    上述都是抽象方法,我们在子类中需要重写这些方法。
    下面以WordCount为例,实现一个自定义的Accumulator,代码如下:

    package rdd.builder
    
    import org.apache.spark.util.AccumulatorV2
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    object RDD_Memory {
        def main(args: Array[String]): Unit = {
            val SparkConnf = new SparkConf().setMaster("local[*]").setAppName("RDD")
            val sc = new SparkContext(SparkConnf)
            val rdd = sc.makeRDD(List("hello", "spark", "hello", "scala"))
    
            // 累加器 : WordCount
            // 创建累加器对象
            val wcAcc = new MyAccumulator()
            // 向Spark进行注册自定义累加器
            sc.register(wcAcc, "wordCountAcc")
    
            rdd.foreach(
                word => {
                    // 数据的累加(使用累加器)
                    wcAcc.add(word)
                }
            )
    
            // 获取累加器累加的结果
            println(wcAcc.value)
    
            sc.stop()
        }
    
        class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
            private var wcMap = mutable.Map[String, Long]()
    
            // 判断是否初始状态
            override def isZero: Boolean = {
                wcMap.isEmpty
            }
    
            override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
                new MyAccumulator()
            }
    
            override def reset(): Unit = {
                wcMap.clear()
            }
    
            // 获取累加器需要计算的值
            override def add(word: String): Unit = {
                val newCnt = wcMap.getOrElse(word, 0L) + 1
                wcMap.update(word, newCnt)
            }
    
            // Driver合并多个累加器
            override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    
                val map1 = this.wcMap
                val map2 = other.value
    
                map2.foreach{
                    case ( word, count ) => {
                        val newCount = map1.getOrElse(word, 0L) + count
                        map1.update(word, newCount)
                    }
                }
            }
    
            // 累加器结果
            override def value: mutable.Map[String, Long] = {
                wcMap
            }
        }
    }
    
    运行结果: WordCount结果

    广播变量

    为什么要将变量定义为广播变量

    如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会有Driver端进行分发。一般来说,如果这个变量不是广播变量,那么每个task都会持有该变量的一个副本,这在task数目十分多的情况下,Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源。如果将这个资源声明为广播变量,那么此时每个Executor节点只会包含一份该变量的副本,那么此Executor上启动的task都可以访问到此变量,故节省了网络传输的成本和服务器的资源。
    下面的两幅图展示了不使用和使用广播变量的区别:

    不使用广播变量: 使用广播变量:

    一个简单的例子如下,此例子是为了统计当前RDD中元素的key在广播变量中出现的次数,代码如下:

    package rdd.builder
    
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.util.AccumulatorV2
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    object RDD_Memory {
        def main(args: Array[String]): Unit = {
            val SparkConnf = new SparkConf().setMaster("local[*]").setAppName("RDD")
            val sc = new SparkContext(SparkConnf)
    
            val rdd1 = sc.makeRDD(List(
                ("a", 1),("b", 2),("c", 3),("d", 9)
            ))
            val map = mutable.Map(("a", 4),("b", 5),("c", 6), ("e", 8))
    
            // 封装广播变量
            val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
    
            rdd1.map {
                case (w, c) => {
                    // 方法广播变量
                    val l: Int = bc.value.getOrElse(w, 0)
                    (w, (c, l))
                }
            }.collect().foreach(println)
    
            sc.stop()
        }
    }
    
    结果如下:

    参考

    相关文章

      网友评论

          本文标题:Spark从入门到放弃—累加器和广播变量

          本文链接:https://www.haomeiwen.com/subject/coumfltx.html