美文网首页
Spark的共享变量--累加变量和广播变量及其应用示例

Spark的共享变量--累加变量和广播变量及其应用示例

作者: alexlee666 | 来源:发表于2019-10-24 20:39 被阅读0次

    一、为什么使用共享变量?

    当Spark在使用某个算子对RDD进行运算时,如果需要用到外部变量,比如对RDD[Int]中的每个element乘以一个系数factor,得到一个新的RDD(定义在main方法中,driver进程):

    val num = 3      # 外部变量
    val newRdd = rdd.map(num => num * factor)    # transformation操作,lazy方式执行
    newRdd.count      # action操作,立即执行
    
    

    那么需要将factor从driver端通过网络传输到所有task中task数=RDD的partition数,每个task只能处理自己的那个factor副本),如果说1个executor负责处理多个task,那么该executor将会得到多份相同的factor,这种方式无疑增加了网络传输的开销和内存开销
    很显然,如果同一个executor节点上的所有task能够共享同一份factor副本,那么将有效减小网络传输的开销和内存开销,这时候“共享变量”就闪亮登场了,这里的“共享“是指被多个task所共享。

    Spark的共享变量包括两类:广播变量和累加变量

    Spark共享变量的优点
    上图红色箭头表示使用共享变量,蓝色箭头表示不使用共享变量。

    二、Broadcast Variable 广播变量

    2.1 广播变量的特点

    • 广播变量即将一个外部变量从driver端高效地通过网络发送到各executor节点,采取的是高效地“广播”方式,即先发送给离driver端最近(Spark内部定义了这种近的衡量方法)的一个executor(比如executor A),然后executor A会将该变量发送到它附近的executor,以此类推这种指数型增长的发送方式,可以有效地节省网络传输开销;
    • 一个application可能涉及到多个stage,在每个stage(driver中的DAG Scheduler将main方法代码解析得到DAG,分析每个stage得到该stage的taskset,taskset发送给task scheduler)中Spark会自动发送task所需的common data,只有当广播变量跨stage被task使用,才认为是有效的;
    • 由于广播变量会涉及到网络传输,因此必然涉及到对象的序列化和反序列化,driver端的外部变量被序列化之后得到byte数组,通过网络发送到集群其他机器的executor节点并以序列化的格式缓存在内存中,再反序列化为对象(在task执行之前完成);
    • executor只能读取(read-only)广播变量,而不能修改它;
    • 广播变量是从driver(即main方法)中定义并发出(sc.broadcast(factor)),在executor端接收并使用(broadcast.value),比如:
    # 被广播的外部变量list
    val sparkSession = SparkSession.builder().master("yarn").appName("Datalake")getOrCreate()
    val sc = sparkSession.sparkContext
    // driver端定义广播变量
    val listB = sc.broadcast(list)
    // 初始化操作,用mappartition比map更加高效
    val rddMap: RDD[(Int, Row)] = oldRdd.mapPartitions {
         partition => {
           // initialization
           // executor端接收广播变量
           InitUtil.init(listB.value)
           partition.map(row => (getPartitionOrder(row), getNewRow(row)))
         }
    }
    
    

    2.2 典型case:小表和大表做join操作

    大表和小表做join操作时,可以把小表broadcast到各个节点,从而就可以把join操作转变成普通的操作(hashmap.get),以避免耗时的shuffle操作。
    代码可以参考博客:https://www.jianshu.com/p/0c77036ad01b


    三、Accumulator 累加变量

    3.1 累加变量特点

    • 累加变量主要用于多个task对同一个变量进行共享性(并行)的操作
    • 累加器主要分为三种,即:LongAccumulator(长整数)、DoubleAccumulator(浮点数)、CollectionAccumulator(集合)
    • 可以实现自定义的累加器(比如BigDecimalAccumulator),比如:
    import java.math.BigInteger
    
    import org.apache.spark.util.AccumulatorV2
    
    class BigIntegerAccumulator extends AccumulatorV2 {
      var num = BigInteger.ZERO
    
      def BigIntegerAccumulator(num: BigInteger)  {
        this.num = num
      }
    
      override def isZero: Boolean = {
        num.compareTo(BigInteger.ZERO)
      }
    
      override def copy(): AccumulatorV2[Nothing, Nothing] = new BigIntegerAccumulator()
    
      override def reset(): Unit = {
        num = BigInteger.ZERO
      }
    
      override def add(v: Nothing): Unit = {
    
      }
    
      override def merge(other: AccumulatorV2[Nothing, Nothing]): Unit = {
        num = num.add(other.value)
      }
    
      override def value(): BigInteger = {
        num
      }
    }
    
    
    
    
    import org.apache.spark.sql.SparkSession
    
    class Main {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
        val sc = spark.sparkContext
        // 直接new自定义的累加器
        val bigIntegerAccumulator = new BigIntegerAccumulator()
        // 然后在SparkContext上注册一下
        sc.register(bigIntegerAccumulator, "bigIntegerAccumulator")
        bigIntegerAccumulator.reset()
        rdd.repartition(6).map(element => {
          println(element)
          bigIntegerAccumulator.add(new BigInteger("1"))
        })
        rdd.count()
        // rdd.count()
      }
    }
    
    
    • 重点:累加变量的最终结果应该不受累加顺序的影响,比如StringAccumulator就是一个错误的例子,就相当于开了多个线程,每个线程随机sleep若干毫秒然后往StringBuffer中追加字符,但是最后追加出来的字符串是无法被预测的;
    • executor端只能对累加变量做累加操作,driver端只能读取累加变量的值;

    3.2 使用陷进:避免重复累加

    每执行一个task,就会对累加变量做一次累加操作。我们知道Spark存在两种操作,即transformation和action,前者是lazy方式执行,只有遇到action操作才会执行之前的transformation操作(当前action和上一个action之间的)。设想,如果一个application的main方法中,对一个RDD先后执行了1个transfromation操作和2个action操作的话,那么同样的transformation操作会被执行两次,这就会导致执行了2*partition个数(个)task,那么累加变量就被重复多做了“parttiion个数”次累加操作。比如:

    class Main {
      def main(args: Array[String]): Unit = {
        ......
        rdd.repartition(6).map(element => {
          println(element)
          bigIntegerAccumulator.add(new BigInteger("1"))
        })
        rdd.count()
        rdd.count()
      }
    }
    
    

    解决方法就是:对上一次的transformation计算结果进行cache,这样的话遇到第二个count操作时就不会再做一次transformation操作了,比如:

    class Main {
      def main(args: Array[String]): Unit = {
        ......
        rdd.repartition(6).map(element => {
          println(element)
          bigIntegerAccumulator.add(new BigInteger("1"))
        }).cache()
        rdd.count()
        rdd.count()
      }
    }
    
    

    执行第一个count时,会执行 rdd.repartition(6).map() 操作并将结果进行了cache;执行第二个count时,不会再执行rdd.repartition(6).map() 操作。


    参考:
    https://blog.csdn.net/qq_35866165/article/details/86671302

    相关文章

      网友评论

          本文标题:Spark的共享变量--累加变量和广播变量及其应用示例

          本文链接:https://www.haomeiwen.com/subject/ycylvctx.html