美文网首页
Spark利用广播变量简化大表和小表的join操作

Spark利用广播变量简化大表和小表的join操作

作者: alexlee666 | 来源:发表于2019-10-24 20:38 被阅读0次

一、问题

两个RDD进行join操作(即 rdd1.join(rdd2)) 会导致shuffle,这是因为join操作会对key一致的key-vlaue对进行合并,而** key相同的key-value对不太可能会在同一个partition, 因此很有可能是需要进行经过网络进行shuffle的,而shuffle会产生许多中间数据(小文件)并涉及到网络传输,这些通常比较耗时,Spark中要尽量避免shuffle


二、解决方案

优化方法:将小RDD的数据通过broadcast到每个executor中,各大RDD partition分别和小RDD做join操作
具体是:在driver端将小RDD转换成数组array并broadcast到各executor端,然后再各executor task中对各partion的大RDD的key-value对和小rdd的key-value对进行join;由于每个executor端都有完整的小RDD,因此小RDD的各partition不需要shuffle到RDD的各partition,小RDD广播到大RDD的各partition后,各partition分别进行join,最后再执行reduce所有分区的join结果汇总到driver端

利用广播变量简化大表和小表的join操作

三、业务代码

import org.apache.spark.sql.SparkSession

object BigRDDJoinSmallRDD {

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder().master("local[3]").appName("BigRDD Join SmallRDD").getOrCreate()
    val sc = sparkSession.sparkContext
    val list1 = List(("jame",23), ("wade",3), ("kobe",24))
    val list2 = List(("jame", 13), ("wade",6), ("kobe",16))
    val bigRDD = sc.makeRDD(list1)
    val smallRDD = sc.makeRDD(list2)

    println(bigRDD.getNumPartitions)
    println(smallRDD.getNumPartitions)

    // driver端rdd不broadcast广播smallRDD到各executor,RDD不能被broadcast,需要转换成数组array
    val  smallRDDB= sc.broadcast(smallRDD.collect())

    val joinedRDD = bigRDD.mapPartitions(partition => {
      val smallRDDBV = smallRDDB.value  // 各个executor端的task读取广播value
      partition.map(element => {
        //println(joinUtil(element, smallRDDBV))
        joinUtil(element, smallRDDBV)
      })
    })
    joinedRDD.foreach(x => println(x))



  }


/**
  * join操作:对两个rdd中的相同key的value1和value2进行聚合,即(key,value1).join(key,value2)得到(key,(value1, vlaue2))
  * 如果bigRDDEle的key和smallRDD的某个key一致,那么返回(key,(value1, vlaue2))
  * 该方法会在各executor的task上执行
  * */
 def joinUtil(bigRDDEle:(String,Int), smallRDD: Array[(String, Int)]): (String, (Int,Int)) = {
   var joinEle:(String, (Int, Int)) = null
   // 遍历数组smallRDD
   smallRDD.foreach(smallRDDEle => {
      if(smallRDDEle._1.equals(bigRDDEle._1)){
        // 如果bigRDD中某个元素的key和数组smallRDD的key一致,返回join结果
        joinEle = (bigRDDEle._1, (bigRDDEle._2, smallRDDEle._2))
      }
    })
   joinEle
 }

}

如有错误,敬请指正!

相关文章

网友评论

      本文标题:Spark利用广播变量简化大表和小表的join操作

      本文链接:https://www.haomeiwen.com/subject/ectumctx.html