一、问题
两个RDD进行join操作(即 rdd1.join(rdd2)) 会导致shuffle,这是因为join操作会对key一致的key-vlaue对进行合并,而** key相同的key-value对不太可能会在同一个partition, 因此很有可能是需要进行经过网络进行shuffle的,而shuffle会产生许多中间数据(小文件)并涉及到网络传输,这些通常比较耗时,Spark中要尽量避免shuffle。
二、解决方案
优化方法:将小RDD的数据通过broadcast到每个executor中,各大RDD partition分别和小RDD做join操作。
具体是:在driver端将小RDD转换成数组array并broadcast到各executor端,然后再各executor task中对各partion的大RDD的key-value对和小rdd的key-value对进行join;由于每个executor端都有完整的小RDD,因此小RDD的各partition不需要shuffle到RDD的各partition,小RDD广播到大RDD的各partition后,各partition分别进行join,最后再执行reduce,所有分区的join结果汇总到driver端。
![](https://img.haomeiwen.com/i13578911/ad35f0986bd56e63.png)
三、业务代码
import org.apache.spark.sql.SparkSession
object BigRDDJoinSmallRDD {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[3]").appName("BigRDD Join SmallRDD").getOrCreate()
val sc = sparkSession.sparkContext
val list1 = List(("jame",23), ("wade",3), ("kobe",24))
val list2 = List(("jame", 13), ("wade",6), ("kobe",16))
val bigRDD = sc.makeRDD(list1)
val smallRDD = sc.makeRDD(list2)
println(bigRDD.getNumPartitions)
println(smallRDD.getNumPartitions)
// driver端rdd不broadcast广播smallRDD到各executor,RDD不能被broadcast,需要转换成数组array
val smallRDDB= sc.broadcast(smallRDD.collect())
val joinedRDD = bigRDD.mapPartitions(partition => {
val smallRDDBV = smallRDDB.value // 各个executor端的task读取广播value
partition.map(element => {
//println(joinUtil(element, smallRDDBV))
joinUtil(element, smallRDDBV)
})
})
joinedRDD.foreach(x => println(x))
}
/**
* join操作:对两个rdd中的相同key的value1和value2进行聚合,即(key,value1).join(key,value2)得到(key,(value1, vlaue2))
* 如果bigRDDEle的key和smallRDD的某个key一致,那么返回(key,(value1, vlaue2))
* 该方法会在各executor的task上执行
* */
def joinUtil(bigRDDEle:(String,Int), smallRDD: Array[(String, Int)]): (String, (Int,Int)) = {
var joinEle:(String, (Int, Int)) = null
// 遍历数组smallRDD
smallRDD.foreach(smallRDDEle => {
if(smallRDDEle._1.equals(bigRDDEle._1)){
// 如果bigRDD中某个元素的key和数组smallRDD的key一致,返回join结果
joinEle = (bigRDDEle._1, (bigRDDEle._2, smallRDDEle._2))
}
})
joinEle
}
}
如有错误,敬请指正!
网友评论