简介
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于MapReduce,即分布式的改变,然后聚合这些改变。累加器用来把 Executor端变量信息聚合到 Driver 端。在Driver程序中定义的变量,在Executor端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。Spark原生地支持数值型(numeric)的累加器,包含longAccumulator和DoubleAccumulator,运行在集群中的任务可以使用add方法来将数值累加到累加器上。但是这些任务只能做累加操作,不能读取累加器的值,只有任务的控制节点才可以使用value方法来获取累加器的值,除了数值类型的累加器外,程序开发人员可以通过继承AccumulatorV2类来支持新的类型。
累加器运行示意图广播变量(Broadcast Variables)允许程序员在每一台机器上都缓存一个只读变量,而不是将其副本与任务一起发送到每个执行节点上。它可以一种很高效的方式向所有的工作节点发送一个较大的只读值,以供一个或者多个Spark操作使用。除此之外,Spark也尝试着使用高效的广播算法来分发广播变量,从而降低网络传输造成的损失。
累加器创建以及使用
累加器的应用场景相对于RDD来说较为单一,它的一个常见用途是对作业执行过程中的事件进行计数。下面举一个简单的例子来说明累加器的使用,以对一个数组中的所有元素进行累加为例,代码如下:
val accum = sc.longAccumulator("My Acc")
sc.makeRDD(1 to 9).foreach(x => accum.add(x)) //对RDD中的每个元素都进行累加
println(accum.value)
结果为:
自定义累加器
如果我们需要创建属于我们自己定义的类型的累加器,那么可以通过继承AccumulatorV2来实现。AccumulatorV2抽象类拥有许多需要实现的抽象方法:
- reset方法
是用来将累加器置为0 - add方法
将一个值累加到累加器中 - merge方法
用来将另外一个同样类型的累加器合并到当前的累加器 - copy方法
用来创建当前累加器的一个副本 - isZero方法
返回当前的累加器是否是0值 - value方法
定义了当前累加的值
上述都是抽象方法,我们在子类中需要重写这些方法。
下面以WordCount为例,实现一个自定义的Accumulator,代码如下:
package rdd.builder
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object RDD_Memory {
def main(args: Array[String]): Unit = {
val SparkConnf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(SparkConnf)
val rdd = sc.makeRDD(List("hello", "spark", "hello", "scala"))
// 累加器 : WordCount
// 创建累加器对象
val wcAcc = new MyAccumulator()
// 向Spark进行注册自定义累加器
sc.register(wcAcc, "wordCountAcc")
rdd.foreach(
word => {
// 数据的累加(使用累加器)
wcAcc.add(word)
}
)
// 获取累加器累加的结果
println(wcAcc.value)
sc.stop()
}
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
private var wcMap = mutable.Map[String, Long]()
// 判断是否初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}
override def reset(): Unit = {
wcMap.clear()
}
// 获取累加器需要计算的值
override def add(word: String): Unit = {
val newCnt = wcMap.getOrElse(word, 0L) + 1
wcMap.update(word, newCnt)
}
// Driver合并多个累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
val map1 = this.wcMap
val map2 = other.value
map2.foreach{
case ( word, count ) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
}
// 累加器结果
override def value: mutable.Map[String, Long] = {
wcMap
}
}
}
运行结果:
WordCount结果
广播变量
为什么要将变量定义为广播变量
如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会有Driver端进行分发。一般来说,如果这个变量不是广播变量,那么每个task都会持有该变量的一个副本,这在task数目十分多的情况下,Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源。如果将这个资源声明为广播变量,那么此时每个Executor节点只会包含一份该变量的副本,那么此Executor上启动的task都可以访问到此变量,故节省了网络传输的成本和服务器的资源。
下面的两幅图展示了不使用和使用广播变量的区别:
一个简单的例子如下,此例子是为了统计当前RDD中元素的key在广播变量中出现的次数,代码如下:
package rdd.builder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object RDD_Memory {
def main(args: Array[String]): Unit = {
val SparkConnf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(SparkConnf)
val rdd1 = sc.makeRDD(List(
("a", 1),("b", 2),("c", 3),("d", 9)
))
val map = mutable.Map(("a", 4),("b", 5),("c", 6), ("e", 8))
// 封装广播变量
val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
rdd1.map {
case (w, c) => {
// 方法广播变量
val l: Int = bc.value.getOrElse(w, 0)
(w, (c, l))
}
}.collect().foreach(println)
sc.stop()
}
}
结果如下:
网友评论