一、广播变量
1、广播变量的优点
不需要每个task带上一份变量副本,而是变成每个节点的executor存一份副本。这样的话, 就可以让变量产生的副本数量大大减少。
2、广播变量的用法
//将mapRdd广播后返回broadcastValue
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(mapRdd)
//获取广播变量的值
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
3、广播变量的原理
初始的时候,在Driver端有一个副本数据。广播变量后,task运行的时候,在使用副本数据前,首先在所在本地Executor对应的BlockManager中,尝试获取副本数据;如果本地没有,即从Driver端拉取副本数据,并且保存在所在本地的BlockManager中;此后这个Executor上所有的task,都会直接使用本地BlockManager中的副本数据。另Executor的BlockManager除了从Driver端拉取数据,也可能从其他节点的BlockManager中拉去副本数据。
BlockManager:负责管理某个Executor对应的内存和磁盘的数据,尝试本地BlockManager中招map数据。
4、优化说明
假设有50个Executor,共1000个task;若每个map数据10M。默认情况下,1000个副本10M共10G数据。在集群中,通过网络传输,耗费10G的内存资源;如果使用了广播变量,50个Executor即50个副本10M共500M数据。而且Executor的BlockManager不一定都从Driver传输到本地,还可能从最近的节点的Executor的BlockManager中拉取数据,网络传输速度大大增加,传输数据大大减少。
10G/500M=20倍,极大的提高了性能。
二、代码实例
1、准备数据
//订单数据
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
//产品数据
p0001,xiaomi,1000,2
p0002,appale,1000,3
p0003,samsung,1000,4
2、代码开发
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkBroadCast {
def main(args: Array[String]): Unit = {
//构造Spark程序执行环境
val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
//如果集群运行,则不需要设置setMaster("local[*]")
val sc = new SparkContext(conf)
//设置日志级别
sc.setLogLevel("WARN")
//创建RDD,读取产品信息数据
//产品记录样例:p0001,xiaomi,1000,2
val productRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\prts.txt")
val productMapRdd: Array[(String, String)] = productRdd.map(x => {
(x.split(",")(0), x)
}).collect()
// productMapRdd.foreach(println)
/**
* (p0001,p0001,xiaomi,1000,2)
* (p0002,p0002,appale,1000,3)
* (p0003,p0003,samsung,1000,4)
*/
//将产品数据作为广播变量
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(productMapRdd)
//读取订单记录:1001,20150710,p0001,2
val ordersRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\orders.txt")
//将订单记录按照分区处理
val productAndOrderRdd: RDD[String] = ordersRdd.mapPartitions(eachPartition => {
//获取产品广播变量的数据并转换为map类型,目的是通过getOrElse获取产品数据
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
//处理分区内的订单数据记录
val finalStr = eachPartition.map(eachLine => {
//将每条订单记录按照逗号拆分,返回集合类型
val ordersGet: Array[String] = eachLine.split(",")
//产品的map类型,通过key(订单的产品id)获取对应的产品记录,返回产品数据记录
val getProductStr: String = getBroadCastMap.getOrElse(ordersGet(2), "")
//订单记录拼接产品记录
eachLine + "\t" + getProductStr
})
finalStr
})
productAndOrderRdd.foreach(println)
/**
* 1001,20150710,p0001,2 p0001,xiaomi,1000,2
* 1002,20150710,p0002,3 p0002,appale,1000,3
* 1002,20150710,p0003,3 p0003,samsung,1000,4
*/
//关闭Spark环境
sc.stop()
}
}
三、注意事项
- 能不能将一个RDD使用广播变量广播出去?
不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。 - 广播变量只能在Driver端定义,不能在Executor端定义。
- 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
- 当Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
- 当Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
网友评论