一、为什么使用共享变量?
当Spark在使用某个算子对RDD进行运算时,如果需要用到外部变量,比如对RDD[Int]中的每个element乘以一个系数factor,得到一个新的RDD(定义在main方法中,driver进程):
val num = 3 # 外部变量
val newRdd = rdd.map(num => num * factor) # transformation操作,lazy方式执行
newRdd.count # action操作,立即执行
那么需要将factor从driver端通过网络传输到所有task中(task数=RDD的partition数,每个task只能处理自己的那个factor副本),如果说1个executor负责处理多个task,那么该executor将会得到多份相同的factor,这种方式无疑增加了网络传输的开销和内存开销。
很显然,如果同一个executor节点上的所有task能够共享同一份factor副本,那么将有效减小网络传输的开销和内存开销,这时候“共享变量”就闪亮登场了,这里的“共享“是指被多个task所共享。
Spark的共享变量包括两类:广播变量和累加变量。
上图红色箭头表示使用共享变量,蓝色箭头表示不使用共享变量。
二、Broadcast Variable 广播变量
2.1 广播变量的特点
- 广播变量即将一个外部变量从driver端高效地通过网络发送到各executor节点,采取的是高效地“广播”方式,即先发送给离driver端最近(Spark内部定义了这种近的衡量方法)的一个executor(比如executor A),然后executor A会将该变量发送到它附近的executor,以此类推这种指数型增长的发送方式,可以有效地节省网络传输开销;
- 一个application可能涉及到多个stage,在每个stage(driver中的DAG Scheduler将main方法代码解析得到DAG,分析每个stage得到该stage的taskset,taskset发送给task scheduler)中Spark会自动发送task所需的common data,只有当广播变量跨stage被task使用,才认为是有效的;
- 由于广播变量会涉及到网络传输,因此必然涉及到对象的序列化和反序列化,driver端的外部变量被序列化之后得到byte数组,通过网络发送到集群其他机器的executor节点并以序列化的格式缓存在内存中,再反序列化为对象(在task执行之前完成);
- executor只能读取(read-only)广播变量,而不能修改它;
- 广播变量是从driver(即main方法)中定义并发出(sc.broadcast(factor)),在executor端接收并使用(broadcast.value),比如:
# 被广播的外部变量list
val sparkSession = SparkSession.builder().master("yarn").appName("Datalake")getOrCreate()
val sc = sparkSession.sparkContext
// driver端定义广播变量
val listB = sc.broadcast(list)
// 初始化操作,用mappartition比map更加高效
val rddMap: RDD[(Int, Row)] = oldRdd.mapPartitions {
partition => {
// initialization
// executor端接收广播变量
InitUtil.init(listB.value)
partition.map(row => (getPartitionOrder(row), getNewRow(row)))
}
}
2.2 典型case:小表和大表做join操作
大表和小表做join操作时,可以把小表broadcast到各个节点,从而就可以把join操作转变成普通的操作(hashmap.get),以避免耗时的shuffle操作。
代码可以参考博客:https://www.jianshu.com/p/0c77036ad01b
三、Accumulator 累加变量
3.1 累加变量特点
- 累加变量主要用于多个task对同一个变量进行共享性(并行)的操作;
- 累加器主要分为三种,即:LongAccumulator(长整数)、DoubleAccumulator(浮点数)、CollectionAccumulator(集合);
- 可以实现自定义的累加器(比如BigDecimalAccumulator),比如:
import java.math.BigInteger
import org.apache.spark.util.AccumulatorV2
class BigIntegerAccumulator extends AccumulatorV2 {
var num = BigInteger.ZERO
def BigIntegerAccumulator(num: BigInteger) {
this.num = num
}
override def isZero: Boolean = {
num.compareTo(BigInteger.ZERO)
}
override def copy(): AccumulatorV2[Nothing, Nothing] = new BigIntegerAccumulator()
override def reset(): Unit = {
num = BigInteger.ZERO
}
override def add(v: Nothing): Unit = {
}
override def merge(other: AccumulatorV2[Nothing, Nothing]): Unit = {
num = num.add(other.value)
}
override def value(): BigInteger = {
num
}
}
import org.apache.spark.sql.SparkSession
class Main {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
val sc = spark.sparkContext
// 直接new自定义的累加器
val bigIntegerAccumulator = new BigIntegerAccumulator()
// 然后在SparkContext上注册一下
sc.register(bigIntegerAccumulator, "bigIntegerAccumulator")
bigIntegerAccumulator.reset()
rdd.repartition(6).map(element => {
println(element)
bigIntegerAccumulator.add(new BigInteger("1"))
})
rdd.count()
// rdd.count()
}
}
- 重点:累加变量的最终结果应该不受累加顺序的影响,比如StringAccumulator就是一个错误的例子,就相当于开了多个线程,每个线程随机sleep若干毫秒然后往StringBuffer中追加字符,但是最后追加出来的字符串是无法被预测的;
- executor端只能对累加变量做累加操作,driver端只能读取累加变量的值;
3.2 使用陷进:避免重复累加
每执行一个task,就会对累加变量做一次累加操作。我们知道Spark存在两种操作,即transformation和action,前者是lazy方式执行,只有遇到action操作才会执行之前的transformation操作(当前action和上一个action之间的)。设想,如果一个application的main方法中,对一个RDD先后执行了1个transfromation操作和2个action操作的话,那么同样的transformation操作会被执行两次,这就会导致执行了2*partition个数(个)task,那么累加变量就被重复多做了“parttiion个数”次累加操作。比如:
class Main {
def main(args: Array[String]): Unit = {
......
rdd.repartition(6).map(element => {
println(element)
bigIntegerAccumulator.add(new BigInteger("1"))
})
rdd.count()
rdd.count()
}
}
解决方法就是:对上一次的transformation计算结果进行cache,这样的话遇到第二个count操作时就不会再做一次transformation操作了,比如:
class Main {
def main(args: Array[String]): Unit = {
......
rdd.repartition(6).map(element => {
println(element)
bigIntegerAccumulator.add(new BigInteger("1"))
}).cache()
rdd.count()
rdd.count()
}
}
执行第一个count时,会执行 rdd.repartition(6).map() 操作并将结果进行了cache;执行第二个count时,不会再执行rdd.repartition(6).map() 操作。
参考:
https://blog.csdn.net/qq_35866165/article/details/86671302
网友评论