美文网首页
Spark从入门到放弃—累加器和广播变量

Spark从入门到放弃—累加器和广播变量

作者: HaloZhang | 来源:发表于2021-02-26 15:06 被阅读0次

简介

累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于MapReduce,即分布式的改变,然后聚合这些改变。累加器用来把 Executor端变量信息聚合到 Driver 端。在Driver程序中定义的变量,在Executor端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。Spark原生地支持数值型(numeric)的累加器,包含longAccumulator和DoubleAccumulator,运行在集群中的任务可以使用add方法来将数值累加到累加器上。但是这些任务只能做累加操作,不能读取累加器的值,只有任务的控制节点才可以使用value方法来获取累加器的值,除了数值类型的累加器外,程序开发人员可以通过继承AccumulatorV2类来支持新的类型。

累加器运行示意图

广播变量(Broadcast Variables)允许程序员在每一台机器上都缓存一个只读变量,而不是将其副本与任务一起发送到每个执行节点上。它可以一种很高效的方式向所有的工作节点发送一个较大的只读值,以供一个或者多个Spark操作使用。除此之外,Spark也尝试着使用高效的广播算法来分发广播变量,从而降低网络传输造成的损失。

累加器创建以及使用

累加器的应用场景相对于RDD来说较为单一,它的一个常见用途是对作业执行过程中的事件进行计数。下面举一个简单的例子来说明累加器的使用,以对一个数组中的所有元素进行累加为例,代码如下:

val accum = sc.longAccumulator("My Acc")
sc.makeRDD(1 to 9).foreach(x => accum.add(x)) //对RDD中的每个元素都进行累加
println(accum.value)
结果为:

自定义累加器

如果我们需要创建属于我们自己定义的类型的累加器,那么可以通过继承AccumulatorV2来实现。AccumulatorV2抽象类拥有许多需要实现的抽象方法:

  • reset方法
    是用来将累加器置为0
  • add方法
    将一个值累加到累加器中
  • merge方法
    用来将另外一个同样类型的累加器合并到当前的累加器
  • copy方法
    用来创建当前累加器的一个副本
  • isZero方法
    返回当前的累加器是否是0值
  • value方法
    定义了当前累加的值

上述都是抽象方法,我们在子类中需要重写这些方法。
下面以WordCount为例,实现一个自定义的Accumulator,代码如下:

package rdd.builder

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object RDD_Memory {
    def main(args: Array[String]): Unit = {
        val SparkConnf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(SparkConnf)
        val rdd = sc.makeRDD(List("hello", "spark", "hello", "scala"))

        // 累加器 : WordCount
        // 创建累加器对象
        val wcAcc = new MyAccumulator()
        // 向Spark进行注册自定义累加器
        sc.register(wcAcc, "wordCountAcc")

        rdd.foreach(
            word => {
                // 数据的累加(使用累加器)
                wcAcc.add(word)
            }
        )

        // 获取累加器累加的结果
        println(wcAcc.value)

        sc.stop()
    }

    class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
        private var wcMap = mutable.Map[String, Long]()

        // 判断是否初始状态
        override def isZero: Boolean = {
            wcMap.isEmpty
        }

        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
            new MyAccumulator()
        }

        override def reset(): Unit = {
            wcMap.clear()
        }

        // 获取累加器需要计算的值
        override def add(word: String): Unit = {
            val newCnt = wcMap.getOrElse(word, 0L) + 1
            wcMap.update(word, newCnt)
        }

        // Driver合并多个累加器
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {

            val map1 = this.wcMap
            val map2 = other.value

            map2.foreach{
                case ( word, count ) => {
                    val newCount = map1.getOrElse(word, 0L) + count
                    map1.update(word, newCount)
                }
            }
        }

        // 累加器结果
        override def value: mutable.Map[String, Long] = {
            wcMap
        }
    }
}
运行结果: WordCount结果

广播变量

为什么要将变量定义为广播变量

如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会有Driver端进行分发。一般来说,如果这个变量不是广播变量,那么每个task都会持有该变量的一个副本,这在task数目十分多的情况下,Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源。如果将这个资源声明为广播变量,那么此时每个Executor节点只会包含一份该变量的副本,那么此Executor上启动的task都可以访问到此变量,故节省了网络传输的成本和服务器的资源。
下面的两幅图展示了不使用和使用广播变量的区别:

不使用广播变量: 使用广播变量:

一个简单的例子如下,此例子是为了统计当前RDD中元素的key在广播变量中出现的次数,代码如下:

package rdd.builder

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object RDD_Memory {
    def main(args: Array[String]): Unit = {
        val SparkConnf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(SparkConnf)

        val rdd1 = sc.makeRDD(List(
            ("a", 1),("b", 2),("c", 3),("d", 9)
        ))
        val map = mutable.Map(("a", 4),("b", 5),("c", 6), ("e", 8))

        // 封装广播变量
        val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)

        rdd1.map {
            case (w, c) => {
                // 方法广播变量
                val l: Int = bc.value.getOrElse(w, 0)
                (w, (c, l))
            }
        }.collect().foreach(println)

        sc.stop()
    }
}
结果如下:

参考

相关文章

网友评论

      本文标题:Spark从入门到放弃—累加器和广播变量

      本文链接:https://www.haomeiwen.com/subject/coumfltx.html