什么是广播变量
广播变量:分布式共享只读变量。
广播变量用来高效分发较大
的对象。向所有工作节点发送一个较大的只读
值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。
为什么需要广播变量
假设有一个集合数据量为500M,程序运行使用调用Action算子
,任务会以task
的形式发送到Executor
中执行的。task
会拿到这个数据的副本,并且每个task
都会有一个副本。这样有什么问题呢?若task数量过大,资源占用就越高,Execuort的资源占用=task数量*副本数据量
。

如上图,其实数据量只有500M,但是每个
Execuort
需要占用1000M的资源空间。若task
更多,资源占用便更多。若数据量小还没什么,但是数据量一大,那就比较致命了,所以
广播变量
出现了,就很好的解决了该问题。使用广播变量之后,数据副本将不再是每个task
一个数据副本,而是该数据副本只在Execuort
存放一份,由Task
去读取
。注意:
只读不写
。
这样就极大的减少资源占用
工作中RDD的分区数
一般指定为任务cpu总核数*2~3
个
假设服务器总核数为100个。
若分区数=50个,就会造成50个cpu浪费,得不到应用。
若分区数=1000个,就会造成时间片来回切换,导致服务器性能下降。
若分区数=100个,虽然同一时间确实可以一同工作,但是有些任务小处理速度比较快,很快就完成了,然后又会处于空闲状态,得不到利用。
所以设置分区数为 cpu总核数 的2~3倍比较合理,不仅不会造成资源浪费,也不会导致服务器性能下降。
使用广播变量步骤:
(1)调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。
(2)通过广播变量.value
,访问该对象的值。
(3)变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。
广播变量的好处
场景:
- 大表join小表
- task使用driver数据的时候
原因: 当数据在Driver中,后续所有task都需要该数据的时候,spark默认向每个task都发送一份该数据,此时数据占用的总内存大小 = task个数 * 数据大小,此时占用的空间比较大
好处:
- 减少内存占用: 可以将该数据广播出去, 数据广播之后,会将该数据发送到task所在的executor中,后续task需要数据的时候从executor获取即可,任务运行过程中, 数据占用的总内存大小 = executor个数 * 数据大小
- 大小join小表的时候,将小表广播出去能够减少shuffle操作
如何使用:
- 广播数据: val bc = sc.broadcast(数据)
- 取出广播的值: bc.value
案例:
大表join小表 的案例演示
不使用广播变量
@Test
def mapJoin(): Unit ={
val conf =new SparkConf().setMaster("local[*]").setAppName("test")
val sc=new SparkContext(conf)
//大表 学生表 (班级id->(学号,姓名,年龄,性别))
val studentList=List(
1->("1001","张三",18,"男"),
2->("1002","赵四",16,"男"),
1->("1003","翠花",17,"女"),
3->("1004","福来",16,"男"),
2->("1005","牛儿娃",18,"男"),
1->("1007","妞妞",17,"女"),
3->("1008","岳岳",19,"男")
)
// 小表 班级表
val classList=List(
1->"一年级一班",
2->"一年级二班",
3->"一年级三班",
)
// 大表
val maxJoinRdd= sc.parallelize(studentList,2)
// 小表
val minJoinRdd =sc.parallelize(classList,2)
//关联
val data: RDD[(Int, ((String, String, Int, String), Option[String]))] = maxJoinRdd.leftOuterJoin(minJoinRdd)
data.foreach({
case (classId, ((stuNO, stuName, age, sex), className))=>{
println(s"班级id:$classId,班级名称:$className -> 学号:$stuNO,姓名:$stuName,年龄:$age,性别:$sex")
}
})
}
运行结果
班级id:1,班级名称:Some(一年级一班) -> 学号:1001,姓名:张三,年龄:18,性别:男
班级id:2,班级名称:Some(一年级二班) -> 学号:1002,姓名:赵四,年龄:16,性别:男
班级id:1,班级名称:Some(一年级一班) -> 学号:1003,姓名:翠花,年龄:17,性别:女
班级id:2,班级名称:Some(一年级二班) -> 学号:1005,姓名:牛儿娃,年龄:18,性别:男
班级id:1,班级名称:Some(一年级一班) -> 学号:1007,姓名:妞妞,年龄:17,性别:女
班级id:3,班级名称:Some(一年级三班) -> 学号:1004,姓名:福来,年龄:16,性别:男
班级id:3,班级名称:Some(一年级三班) -> 学号:1008,姓名:岳岳,年龄:19,性别:男
看看 web-ui
6个task
image.png
一个join竟然产生了两个stage(本身就有一个)
image.png
注意 Storage Memory
image.png
使用广播变量
@Test
def mapJoin3(): Unit ={
val conf =new SparkConf().setMaster("local[*]").setAppName("test")
val sc=new SparkContext(conf)
//大表 学生表 (班级id->(学号,姓名,年龄,性别))
val studentList=List(
1->("1001","张三",18,"男"),
2->("1002","赵四",16,"男"),
1->("1003","翠花",17,"女"),
3->("1004","福来",16,"男"),
2->("1005","牛儿娃",18,"男"),
1->("1007","妞妞",17,"女"),
3->("1008","岳岳",19,"男")
)
// 小表 班级表
val classList=List(
1->"一年级一班",
2->"一年级二班",
3->"一年级三班",
)
val classMap: Broadcast[Map[Int, String]] = sc.broadcast(classList.toMap)
// 大表
val maxJoinRdd= sc.parallelize(studentList,2)
// 关联
val data: RDD[(Int, ((String, String, Int, String), String))] = maxJoinRdd.map(e=>{
// 通过.value 取出广播变量的值
val value: Map[Int, String]= classMap.value
(e._1,(e._2,value.getOrElse(e._1,"")))
})
// 打印
data.foreach({
case (classId, ((stuNO, stuName, age, sex), className))=>{
println(s"班级id:$classId,班级名称:$className -> 学号:$stuNO,姓名:$stuName,年龄:$age,性别:$sex")
}
})
}
打印结果
班级id:1,班级名称:一年级一班 -> 学号:1001,姓名:张三,年龄:18,性别:男
班级id:3,班级名称:一年级三班 -> 学号:1004,姓名:福来,年龄:16,性别:男
班级id:2,班级名称:一年级二班 -> 学号:1002,姓名:赵四,年龄:16,性别:男
班级id:2,班级名称:一年级二班 -> 学号:1005,姓名:牛儿娃,年龄:18,性别:男
班级id:1,班级名称:一年级一班 -> 学号:1003,姓名:翠花,年龄:17,性别:女
班级id:1,班级名称:一年级一班 -> 学号:1007,姓名:妞妞,年龄:17,性别:女
班级id:3,班级名称:一年级三班 -> 学号:1008,姓名:岳岳,年龄:19,性别:男
两个task
image.png
一个stage
image.png
Executor 占用比上面的高许多,因为小表资源都存到Executor
中了。
image.png
你是否对这段代码由疑问?要是这样写,不用join算子也可以呀(主要我有疑问,不懂,我可以测试嘛)。
// 关联
val data: RDD[(Int, ((String, String, Int, String), String))] = maxJoinRdd.map(e=>{
val value: Map[Int, String]= classMap.value
(e._1,(e._2,value.getOrElse(e._1,"")))
})
@Test
def mapJoin4(): Unit ={
val conf =new SparkConf().setMaster("local[*]").setAppName("test")
val sc=new SparkContext(conf)
//大表 学生表 (班级id->(学号,姓名,年龄,性别))
val studentList=List(
1->("1001","张三",18,"男"),
2->("1002","赵四",16,"男"),
1->("1003","翠花",17,"女"),
3->("1004","福来",16,"男"),
2->("1005","牛儿娃",18,"男"),
1->("1007","妞妞",17,"女"),
3->("1008","岳岳",19,"男")
)
// 小表 班级表
val classList=List(
1->"一年级一班",
2->"一年级二班",
3->"一年级三班",
)
val classMap=classList.toMap
// 大表
val maxJoinRdd= sc.parallelize(studentList,2)
// 关联
val data: RDD[(Int, ((String, String, Int, String), String))] = maxJoinRdd.map(e=>{
val value: Map[Int, String]= classMap
(e._1,(e._2,value.getOrElse(e._1,"")))
})
data.foreach({
case (classId, ((stuNO, stuName, age, sex), className))=>{
println(s"班级id:$classId,班级名称:$className -> 学号:$stuNO,姓名:$stuName,年龄:$age,性别:$sex")
}
})
}
image.png
image.png
image.png
咦?好像没啥呢?也只有一个job,一个stage。除了Executor
不存储资源,感觉和第二个案例没啥区别。
但是第二个案例和第三个案例还是有区别的,小表定义在Driver
若没有进行广播,那么会每个Task
都会有一份小表数据,若Task
很多资源占用也会增大。数据广播出去之后,数据只有Executor
持有,其他Task
只需要读取这一份数据即可,减少资源占用。
第二个案例之所以需要那么样写,主要还是因为join
是spark的操作,类型必须为RDD
数据类型。
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope {
leftOuterJoin(other, defaultPartitioner(self, other))
}
对于广播变量
这点一定要想明白。
明白为什么需要广播变量
,其实不用也可以,用了能给我带来什么好处。
大表join小表时,需要根据连接的key给两个表的数据分组聚合。广播小表之后,每个executor都有小表数据,每个executor中的大表数据可以直接根据广播的小表中相同key聚合,不用shuffle。
网友评论