美文网首页
Spark之广播变量

Spark之广播变量

作者: 万事万物 | 来源:发表于2021-07-21 17:28 被阅读0次

什么是广播变量

广播变量:分布式共享只读变量。
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

为什么需要广播变量

假设有一个集合数据量为500M,程序运行使用调用Action算子,任务会以task的形式发送到Executor中执行的。task会拿到这个数据的副本,并且每个task都会有一个副本。这样有什么问题呢?若task数量过大,资源占用就越高,Execuort的资源占用=task数量*副本数据量

草图
如上图,其实数据量只有500M,但是每个Execuort需要占用1000M的资源空间。若task更多,资源占用便更多。
若数据量小还没什么,但是数据量一大,那就比较致命了,所以广播变量出现了,就很好的解决了该问题。使用广播变量之后,数据副本将不再是每个task一个数据副本,而是该数据副本只在Execuort存放一份,由Task读取
注意:只读不写
广播变量
这样就极大的减少资源占用

工作中RDD的分区数一般指定为任务cpu总核数*2~3
假设服务器总核数为100个。
若分区数=50个,就会造成50个cpu浪费,得不到应用。
若分区数=1000个,就会造成时间片来回切换,导致服务器性能下降。
若分区数=100个,虽然同一时间确实可以一同工作,但是有些任务小处理速度比较快,很快就完成了,然后又会处于空闲状态,得不到利用。
所以设置分区数为 cpu总核数 的2~3倍比较合理,不仅不会造成资源浪费,也不会导致服务器性能下降。

使用广播变量步骤:

(1)调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。
(2)通过广播变量.value,访问该对象的值。
(3)变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。

广播变量的好处

场景:

  1. 大表join小表
  2. task使用driver数据的时候
    原因: 当数据在Driver中,后续所有task都需要该数据的时候,spark默认向每个task都发送一份该数据,此时数据占用的总内存大小 = task个数 * 数据大小,此时占用的空间比较大

好处:

  1. 减少内存占用: 可以将该数据广播出去, 数据广播之后,会将该数据发送到task所在的executor中,后续task需要数据的时候从executor获取即可,任务运行过程中, 数据占用的总内存大小 = executor个数 * 数据大小
  2. 大小join小表的时候,将小表广播出去能够减少shuffle操作

如何使用:

  1. 广播数据: val bc = sc.broadcast(数据)
  2. 取出广播的值: bc.value

案例:

大表join小表 的案例演示

不使用广播变量

 @Test
  def mapJoin(): Unit ={
    val conf =new SparkConf().setMaster("local[*]").setAppName("test")
    val sc=new SparkContext(conf)

    //大表 学生表 (班级id->(学号,姓名,年龄,性别))
    val studentList=List(
      1->("1001","张三",18,"男"),
      2->("1002","赵四",16,"男"),
      1->("1003","翠花",17,"女"),
      3->("1004","福来",16,"男"),
      2->("1005","牛儿娃",18,"男"),
      1->("1007","妞妞",17,"女"),
      3->("1008","岳岳",19,"男")
    )

    // 小表 班级表
    val classList=List(
      1->"一年级一班",
      2->"一年级二班",
      3->"一年级三班",
    )

    // 大表
    val maxJoinRdd= sc.parallelize(studentList,2)
    // 小表
    val minJoinRdd =sc.parallelize(classList,2)

    //关联
    val data: RDD[(Int, ((String, String, Int, String), Option[String]))] = maxJoinRdd.leftOuterJoin(minJoinRdd)

    data.foreach({
      case (classId, ((stuNO, stuName, age, sex), className))=>{
        println(s"班级id:$classId,班级名称:$className -> 学号:$stuNO,姓名:$stuName,年龄:$age,性别:$sex")
      }
    })
  }

运行结果

班级id:1,班级名称:Some(一年级一班) -> 学号:1001,姓名:张三,年龄:18,性别:男
班级id:2,班级名称:Some(一年级二班) -> 学号:1002,姓名:赵四,年龄:16,性别:男
班级id:1,班级名称:Some(一年级一班) -> 学号:1003,姓名:翠花,年龄:17,性别:女
班级id:2,班级名称:Some(一年级二班) -> 学号:1005,姓名:牛儿娃,年龄:18,性别:男
班级id:1,班级名称:Some(一年级一班) -> 学号:1007,姓名:妞妞,年龄:17,性别:女
班级id:3,班级名称:Some(一年级三班) -> 学号:1004,姓名:福来,年龄:16,性别:男
班级id:3,班级名称:Some(一年级三班) -> 学号:1008,姓名:岳岳,年龄:19,性别:男

看看 web-ui
6个task

image.png

一个join竟然产生了两个stage(本身就有一个)

image.png

注意 Storage Memory

image.png

使用广播变量

  @Test
  def mapJoin3(): Unit ={
    val conf =new SparkConf().setMaster("local[*]").setAppName("test")
    val sc=new SparkContext(conf)

    //大表 学生表 (班级id->(学号,姓名,年龄,性别))
    val studentList=List(
      1->("1001","张三",18,"男"),
      2->("1002","赵四",16,"男"),
      1->("1003","翠花",17,"女"),
      3->("1004","福来",16,"男"),
      2->("1005","牛儿娃",18,"男"),
      1->("1007","妞妞",17,"女"),
      3->("1008","岳岳",19,"男")
    )

    // 小表 班级表
    val classList=List(
      1->"一年级一班",
      2->"一年级二班",
      3->"一年级三班",
    )

    val classMap: Broadcast[Map[Int, String]] = sc.broadcast(classList.toMap)

    // 大表
    val maxJoinRdd= sc.parallelize(studentList,2)

    // 关联
    val data: RDD[(Int, ((String, String, Int, String), String))] = maxJoinRdd.map(e=>{
      // 通过.value 取出广播变量的值
      val value: Map[Int, String]= classMap.value
      (e._1,(e._2,value.getOrElse(e._1,"")))
    })

    // 打印
    data.foreach({
      case (classId, ((stuNO, stuName, age, sex), className))=>{
        println(s"班级id:$classId,班级名称:$className -> 学号:$stuNO,姓名:$stuName,年龄:$age,性别:$sex")
      }
    })
  }

打印结果

班级id:1,班级名称:一年级一班 -> 学号:1001,姓名:张三,年龄:18,性别:男
班级id:3,班级名称:一年级三班 -> 学号:1004,姓名:福来,年龄:16,性别:男
班级id:2,班级名称:一年级二班 -> 学号:1002,姓名:赵四,年龄:16,性别:男
班级id:2,班级名称:一年级二班 -> 学号:1005,姓名:牛儿娃,年龄:18,性别:男
班级id:1,班级名称:一年级一班 -> 学号:1003,姓名:翠花,年龄:17,性别:女
班级id:1,班级名称:一年级一班 -> 学号:1007,姓名:妞妞,年龄:17,性别:女
班级id:3,班级名称:一年级三班 -> 学号:1008,姓名:岳岳,年龄:19,性别:男

两个task

image.png

一个stage

image.png

Executor 占用比上面的高许多,因为小表资源都存到Executor中了。

image.png

你是否对这段代码由疑问?要是这样写,不用join算子也可以呀(主要我有疑问,不懂,我可以测试嘛)。

    // 关联
    val data: RDD[(Int, ((String, String, Int, String), String))] = maxJoinRdd.map(e=>{
      val value: Map[Int, String]= classMap.value
      (e._1,(e._2,value.getOrElse(e._1,"")))
    })
  @Test
  def mapJoin4(): Unit ={
    val conf =new SparkConf().setMaster("local[*]").setAppName("test")
    val sc=new SparkContext(conf)

    //大表 学生表 (班级id->(学号,姓名,年龄,性别))
    val studentList=List(
      1->("1001","张三",18,"男"),
      2->("1002","赵四",16,"男"),
      1->("1003","翠花",17,"女"),
      3->("1004","福来",16,"男"),
      2->("1005","牛儿娃",18,"男"),
      1->("1007","妞妞",17,"女"),
      3->("1008","岳岳",19,"男")
    )

    // 小表 班级表
    val classList=List(
      1->"一年级一班",
      2->"一年级二班",
      3->"一年级三班",
    )

    val classMap=classList.toMap

    // 大表
    val maxJoinRdd= sc.parallelize(studentList,2)

    // 关联
    val data: RDD[(Int, ((String, String, Int, String), String))] = maxJoinRdd.map(e=>{
      val value: Map[Int, String]= classMap
      (e._1,(e._2,value.getOrElse(e._1,"")))
    })

    data.foreach({
      case (classId, ((stuNO, stuName, age, sex), className))=>{
        println(s"班级id:$classId,班级名称:$className -> 学号:$stuNO,姓名:$stuName,年龄:$age,性别:$sex")
      }
    })
 }
image.png
image.png
image.png

咦?好像没啥呢?也只有一个job,一个stage。除了Executor不存储资源,感觉和第二个案例没啥区别。

但是第二个案例和第三个案例还是有区别的,小表定义在Driver若没有进行广播,那么会每个Task都会有一份小表数据,若Task很多资源占用也会增大。数据广播出去之后,数据只有Executor持有,其他Task只需要读取这一份数据即可,减少资源占用。

第二个案例之所以需要那么样写,主要还是因为join 是spark的操作,类型必须为RDD数据类型。

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope {
    leftOuterJoin(other, defaultPartitioner(self, other))
}

对于广播变量这点一定要想明白。

明白为什么需要广播变量,其实不用也可以,用了能给我带来什么好处。

大表join小表时,需要根据连接的key给两个表的数据分组聚合。广播小表之后,每个executor都有小表数据,每个executor中的大表数据可以直接根据广播的小表中相同key聚合,不用shuffle。

相关文章

  • Spark 之广播变量

    1. Background Spark 中有两种共享变量,其中一个是累加器,另一个是广播变量。前者解决了 Spar...

  • Spark之广播变量

    什么是广播变量 广播变量:分布式共享只读变量。广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,...

  • Spark-broadcast

    参见Spark相关--共享变量-广播变量-broadcast

  • spark广播变量

  • Spark广播变量

    原文链接

  • Spark—广播变量

    广播变量 Spark有两种共享变量——累加器、广播变量。广播变量可以让程序高效地向所有工作节点发送一个较大的只读值...

  • spark广播变量

    广播变量的好处:如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于...

  • Spark的广播变量机制

    Spark广播变量 什么是广播变量? 在同一个Execute共享同一份计算逻辑的变量 广播变量使用场景 我现在要在...

  • spark使用广播变量

  • Spark广播变量应用

    一、广播变量 1、广播变量的优点 不需要每个task带上一份变量副本,而是变成每个节点的executor存一份副本...

网友评论

      本文标题:Spark之广播变量

      本文链接:https://www.haomeiwen.com/subject/suwwultx.html