Spark Broadcast

作者: 进击的码奴 | 来源:发表于2018-11-01 18:55 被阅读4次

    Broadcast Variables(广播变量)

    Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

    (广播变量容许程序在每台机器上保存一份只读变量缓存,而不是为每一个任务发送一个副本。他们可以被使用,例如以一种高效的方式给每一个节点拷贝一份大的数据集。spark 试图通过分布式广播变量来有效低减少通信开销。)

    以上直意,意思是这样。就是spark执行任务的时候会用到共享变量。一般方式是为每个task拷贝一份共享变量。那么问题来了,如果一个机器上有上百个task那么就要拷贝上百次,延迟不说,对内存溢出造成隐患。那么使用广播变量后,所有任务公用一个只读变量。有点类似于readonly,那么只需要传输一次,且内存保留一个副本,大大提高效率。

    那么广播变量这么有用,大家铁定多用,不知道有木有遇到空指正bug的。

    spark Broadcast 空指正异常:(看下面代码 查查他有没有毛病)

    package com.migu.dpi

    import java.util

    import org.apache.spark.{SparkConf, SparkContext}

    import org.apache.spark.broadcast.Broadcast

    object Test2 {

    //代码纯属为了引出问题 不要较真

      var data: Broadcast[java.util.ArrayList[String]] =null

      def formatFlag(sc: SparkContext, datas: util.ArrayList[String]):Unit = {

    data = sc.broadcast(datas)

    }

    def main(args: Array[String]):Unit = {

    var datas: util.ArrayList[String] =new util.ArrayList[String]()

    datas.add("mmp")

    var conf =new SparkConf().setAppName(Test2.getClass.getName).setMaster("local[2]")

    var sc =new SparkContext(conf)

    formatFlag(sc, datas)

    //...... args(0) is a path

    sc.textFile(args(0)).map(x => {

    x +data.value.get(0)

    }).foreach(println)

      }

    }

    如果没有那么这样呢,提交到集群呢?

    package com.migu.dpi

    import java.util

    import org.apache.spark.{SparkConf, SparkContext}

    import org.apache.spark.broadcast.Broadcast

    object Test2 {

    //代码纯属为了引出问题 不要较真

      var data: Broadcast[java.util.ArrayList[String]] =null

      def formatFlag(sc: SparkContext, datas: util.ArrayList[String]):Unit = {

    data = sc.broadcast(datas)

    }

    def main(args: Array[String]):Unit = {

    var datas: util.ArrayList[String] =new util.ArrayList[String]()

    datas.add("mmp")

    var conf =new SparkConf().setAppName(Test2.getClass.getName)

    var sc =new SparkContext(conf)

    formatFlag(sc, datas)

    //...... args(0) is a path

    sc.textFile(args(0)).map(x => {

    x +data.value.get(0)

    }).foreach(println)

      }

    }

    这时候你会发现,第二种情况,报data广播变量空指正异常。但是你会想,不是格式化过了吗?是的,我当时也是这么想的。

    但是 我们 来看看spark提交任务是怎么干的。

    Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

    (意译:spark行为是通过一个stage集执行来完成的。stage是被分布式shuffle动作分割的。spark自动将广播变量广播到每一个需要的节点上。广播变量广播是在每一个运行任务之前完成的。下面没用不译了

    spark 对任务分发是通过 预读码 然后 根据编译中代码中行为动作和转换动作来做shuffle切分的。切分好后把各个任务发送到各个节点,在任务执行前,把广播变量发过去。具体谁前谁后,我猜是广播变量在前,因为task很多,有些广播变量贯穿整个stage,所以我也就不读源码直接猜了。

    那么问题来了,代码没运行你的广播变量格式化个毛啊。所以除非本地运行,否则其他机器全空指正异常。

    造成这个现象的根本原因是——全局变量的滥用。所以自我检讨一下,能用局部变量别用全局,除非局部变量造成内存溢出,为了防止OOM。其他别为了省几个变量的空间而使用全局变量。

    相关文章

      网友评论

        本文标题:Spark Broadcast

        本文链接:https://www.haomeiwen.com/subject/zhaxxqtx.html