美文网首页大数据玩转大数据
Spark Core - 高效的使用 RDD join

Spark Core - 高效的使用 RDD join

作者: 希尔大 | 来源:发表于2019-02-09 16:02 被阅读4次

    Spark 作为分布式的计算框架,最为影响其执行效率的地方就是频繁的网络传输。所以一般的,在不存在数据倾斜的情况下,想要提高 Spark job 的执行效率,就尽量减少 job 的 shuffle 过程(减少 job 的 stage),或者退而减小 shuffle 带来的影响,join 操作也不例外。

    所以,针对 spark RDD 的 join 操作的使用,提供一下几条建议:

    1. 尽量减少参与 join 的 RDD 的数据量。
    2. 尽量避免参与 join 的 RDD 都具有重复的key。
    3. 尽量避免或者减少 shuffle 过程。
    4. 条件允许的情况下,使用 map-join 完成 join。

    我们来举个例子,现在一共有两个 RDD,一个是元素为 (String, Double) 类型的 userScoresRDD, 其 key 代表用户 id,其 value 代表用户游戏的历史分数,id 与分数为一对多的关系。另一个为元素为 (String, String) 的 userMobileRDD, 其 key 代表用户的id,value 代表用户的手机号码。我们现在需要得到每个用户的最高分以及其手机号,使得可以使用短信的方式向每个用户告知其最高的游戏记录(发短信有些浪费了)。

    尽量减少参与 join 的 RDD 的数据量

    按照套路,先举个反例,如下:

    代码 1

      def joinGetUserBestScoreWithMobile1(userScoresRDD: RDD[(String, Double)],
                                         userMobileRDD: RDD[(String, String)])
                                            : RDD[(String, (Double, String))] = {
        val userScoreAndMobile = userScoresRDD.join(userMobileRDD)
        userScoreAndMobile.reduceByKey((x, y) => if (x._1 > y._1) x else y)
      }
    

    在上面的例子中,先进行的 join 操作,在用户的每条游戏记录上都添加了一枚手机号,然后在带着手机号的 RDD 上通过 reduceByKey 得到每个用户最高分已经手机号。

    这样做明显会影响效率,我们明显可以先算出每个用户的最高分,然后在去得到他的手机号:

    代码 2

      def joinGetUserBestScoreWithMobile2(userScoresRDD: RDD[(String, Double)],
                                          userMobilesRDD: RDD[(String, String)])
                                            : RDD[(String, (Double, String))] = {
        val userBestScore = userScoresRDD.reduceByKey((x, y) => if (x > y) x else y)
        userBestScore.join(userMobilesRDD)
      }
    

    两种都使用的reduceByKey,但后者会明显减少参与 join 操作的数据量,即减少了shuffle 的时间,又减少了计算的时间,增加效率,降低了数据的冗余。

    尽量避免参与 join 的 RDD 都具有重复的key

    此条建议是为了避免发生两个RDD full join 而笛卡尔积的情况。

    在我们的例子中,假如每个用户都拥有多个手机号,为了避免 full join 而使数据暴增,我们可以在代码2的基础上,先对 userMobilesRDD 使用 combileByKey 进行处理,减少重复的 key。

    尽量避免或者减少 shuffle 过程

    Join 怎么才能避免或减少 shuffle 操作呢? 我们知道只有父子RDD的依赖关系为宽依赖的时候,才会发生shuffle,所以关键就是控制父子RDD的依赖关系。join 操作有两个父RDD(即被join的RDD),一个子RDD(join后的结果),首先需要了解一下join操作时依赖的判断过程。下面即为过程源代码:

    Spark 源代码 org.apache.spark.rdd.CoGroupedRDD

      override def getDependencies: Seq[Dependency[_]] = {
        rdds.map { rdd: RDD[_] =>
          if (rdd.partitioner == Some(part)) {
            logDebug("Adding one-to-one dependency with " + rdd)
            new OneToOneDependency(rdd)
          } else {
            logDebug("Adding shuffle dependency with " + rdd)
            new ShuffleDependency[K, Any, CoGroupCombiner](
              rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
          }
        }
      }
    

    其中 part 为 join 所使用的分区器,rdds 为参加 join 的RDD。通过代码,我们就可以了解到,当父RDD与 join操作 使用相同的分区器的时候,父子RDD才会建立窄依赖(OneToOneDependency)关系,否则就使用宽依赖关系,并且 shuffle 使用join操作的分区器来进行分区。

    所以最差情况下,如下图一,两个父RDD的分区器与 join 使用的分区都不相同(一般是父RDD的分区器都为 None),两个父RDD到子RDD,都会进行shuffle操作:

    图一

    好一点的情况,如下图二,即只有一个父RDD的分区器与 join操作 所使用的相同。这样只会在一个RDD上发生 shuffle。

    图二

    最后就是最完美的情况,两个父RDD的分区器都与join操作使用的分区器相同。如下图三,不会发生任何shuffle操作:

    图三

    所以,我们可以实际情况,至少减少一次不必要的 shuffle 操作。

    下一步我们要做的就是指定父RDD与 join 操作的的分区器为相同的。我们知道,许多的宽依赖操作都可以为其指定分区器,以决定其生成的RDD所使用的分区器,比如 reduceByKey。当然 join 操作也例外,所以我们可以在 join 的时候传入指定的分区器,这样来达到我们想要减少 shuffle 的目的。但是,当我们为两个父RDD指定了相同的分区器的时候,就不需要再为 join 操作传入指定的分区器,这是因为join操作会拿到两个父RDD的中分区器中分区数多的那个分区器作为默认分区器。

    关于 join 操作获取默认分区器的详细,具体请看源代码(org.apache.spark.Partitioner 的 defaultPartitioner)

    实践一下

    让我们回到我们的例子,可以发现我们在使用 reduceByKey 生成 userBestScoreRDD 的时候,使用的是 userMobilesRDD 的分区器(或者在 join 时将要被使用的分区器)。

      def joinGetUserBestScoreWithMobile4(userScoresRDD: RDD[(String, Double)],
                                          userMobilesRDD: RDD[(String, String)]): RDD[(String, (Double, Option[String]))] = {
        // 如果 userMobilesRDD 存在已知的 partitioner,就直接获取
        // 没有就构建返回 userMobilesRDD 将要默认使用的 HashPartitioner.
        val mobileRDDPartitioner = userMobilesRDD.partitioner match {
          case (Some(p)) => p
          case (None) => new HashPartitioner(userMobilesRDD.partitions.length)
        }
        //
        val userBestScoreRDD = userScoresRDD.reduceByKey(mobileRDDPartitioner,
                                                   (x,y) => if (x > y) x else y)
        // 在做 join 的时候。至少省去了一次 shuffle 的所带来的代价。
        userBestScoreRDD.join(userMobilesRDD)
      }
    

    仔细分析的话,在整个joinGetUserBestScoreWithMobile4方法里,相比于之前的代码示例,我们至少减少了一次shuffle操作。这取决于userMobilesRDD的分区器情况。如果userMobilesRDD没有分区器(为None),则userMobilesRDD在参与join的时候会进行 shuffle 操作,而userBestScoreRDD则不会发生shuffle操作。则一共的shuffle次数为2(加上 reduceByKey 一次).这也就是我们所说的“好一点的情况”。

    如果userMobilesRDD已经有了分区器,则 userMobilesRDD 与 userBestScoreRDD 在join的时候都不需要shuffle,所以仅仅 reduceByKey 进行了一次shuffle.这也就是我们所说的“完美情况”。

    两个分区器怎样才叫做相同,具体要看分区器 equals 方法的实现,以HashPartitioner为例,分区数相同,分区器就相同。

    条件允许的情况下,使用 map-join 完成 join

    Map join 想必都很熟悉,就不在写介绍了。Spark core 没有提供 map-join 的实现,具体的实现方案就是将小的 RDD 持久化到driver中后,广播到大RDD的各个分区中,自己实现 join 操作。较为通用的代码如下:

      def manualBroadCastHashJoin[K: ClassTag, V1: ClassTag, V2: ClassTag](
                                          smallRDD: RDD[(K, V1)],
                                          bigRDD: RDD[(K, V2)],
                                          sc: SparkContext): RDD[(K, (V1, V2))] = {
        val smallDataLocaled: Map[K, V1] = smallRDD.collectAsMap()
        bigRDD.sparkContext.broadcast(smallDataLocaled)
    
        bigRDD.mapPartitions(p => {
          p.flatMap {
            case (k, v2) =>
              smallDataLocaled.get(k) match {
                case None => Seq.empty[(K, (V1, V2))]
                case Some(v1) => Seq((k, (v1, v2)))
              }
          }
        }, preservesPartitioning = true)
      }
    

    End!!

    相关文章

      网友评论

        本文标题:Spark Core - 高效的使用 RDD join

        本文链接:https://www.haomeiwen.com/subject/kyfzsqtx.html