美文网首页
spark outer join push down filte

spark outer join push down filte

作者: 鸿乃江边鸟 | 来源:发表于2021-11-11 09:06 被阅读0次

背景

对于spark的不同的join操作来说,有些谓词是能下推,是有谓词是不能下推的,今天我们实地操作一番,从现象和源码级别分析一下,这到底是怎么回事。

版本&环境

spark 3.2.0
macbook pro

理论基础

1. 参考hive OuterJoinBehavior

我们解释一下几个名词:

  • Preserved Row table (留存表)
    在join操作中返回所有行的表
  • Null Supplying table (补空表)
    在join操作中,对于不匹配的行,补bull的表
  • During Join predicate (join中谓词)
    在join中on 语句中的谓词,例如:在 R1 join R2 on R1.x = 5,R1.x = 5 我们称之为 join中谓词
  • After Join predicate (join后谓词)
    在join中,位于where中的谓词

2. join type

根据当前spark版本,我们把join类型分为以下多种类型,也就是我们进行验证的各种join类型

  • inner
  • outer | full | fullouter
  • leftouter | left
  • rightouter | right
  • leftsemi | semi
  • leftanti | anti
  • cross

因为 fullouter join和inner join以及leftsemi/anti join 在join中谓词和join后谓词是没有区别的,所以我们不探讨
ross join 没有on操作这么一说,所以我们也不探讨

注意:理论只是理论,在实际应用中会做一些优化,这和理论是有区别

3. sql解析

对于spark来说,任何一个sql的解析都会经过以下几个阶段:

Unresolved Logical Plan -> Analyzer Logical Plan -> Optimzer Logical Plan -> SparkPlan -> ExecutedPlan

实践分析

运行以下代码:

 def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("delta-merge")
      .config("spark.master", "local[1]")
      .config("spark.app.name", "demo")
      .config("spark.sql.adaptive.autoBroadcastJoinThreshold", -1)
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .config(SQLConf.PLAN_CHANGE_LOG_LEVEL.key, "warn")
      .getOrCreate()
    spark.sparkContext.setLogLevel("info")

    import spark.implicits._

    val df1 = Seq(
      (BigDecimal("11"), 1),
      (BigDecimal("22"), 2),
      (BigDecimal("33"), 3)).toDF("decNum1", "intNum1")
    df1.write
      .mode(SaveMode.Overwrite)
      .parquet("df1.parquet")
    val df2 = Seq(
      (BigDecimal("1111"), 1),
      (BigDecimal("2222"), 2),
      (BigDecimal("4444"), 4)).toDF("decNum2", "intNum2")
    df2.write
      .mode(SaveMode.Overwrite)
      .parquet("df2.parquet")

    spark.sql("select null > 2").show(2)


    val dfP1 = spark.read.parquet("df1.parquet")
    val dfP2 = spark.read.parquet("df2.parquet")
    dfP1.createOrReplaceTempView("tbl1")
    dfP2.createOrReplaceTempView("tbl2")
    val dfResult = spark.sql("select * from tbl1 join tbl2 on intNum1 == intNum2 where intNum1 > 1")
    dfResult.show(40, false)
    dfResult.explain("extended")
    println("==========")

    dfResult.queryExecution.tracker.rules map {
      case (key, value: RuleSummary) if (value.numEffectiveInvocations > 0) =>
        println(s"$key, $value")
      case (_, _) =>

    }
    Thread.sleep(10000000L)

  }

spark.sql.adaptive.autoBroadcastJoinThreshold 和spark.sql.autoBroadcastJoinThreshold设置为-1
是为了把SMJ(sort merge join)转换为BHJ(broastcast hash join)给禁掉,这样就能看到我们想要的结果。

SQLConf.PLAN_CHANGE_LOG_LEVEL.key和sparkcontext的log级别进行调整
是为了能够打印出sql所经历的逻辑计划优化规则以及物理规则,这样我们就很清楚的知道该条sql被洗礼的过程。

df3.explain("extended") 是为了更加清晰直观的打印出各个阶段的计划,方便追踪。

df3.queryExecution.tracker.rules 是为了打印出sql在逻辑计划阶段所经历的解析以及优化规则,排序不分先后,因为后端是用java.util.HashMap存储的。

  • leftouter-join中谓词-留存表
    运行
    val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")

ResolveRelations规则只是用catalog元数据解析出parquet表,如下:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 'Project [*]                                                     'Project [*]
 +- 'Join LeftOuter, (('intNum1 = 'intNum2) AND ('intNum1 > 1))   +- 'Join LeftOuter, (('intNum1 = 'intNum2) AND ('intNum1 > 1))
!   :- 'UnresolvedRelation [tbl1], [], false                         :- SubqueryAlias tbl1
!   +- 'UnresolvedRelation [tbl2], [], false                         :  +- View (`tbl1`, [decNum1#33,intNum1#34])
!                                                                    :     +- Relation [decNum1#33,intNum1#34] parquet
!                                                                    +- SubqueryAlias tbl2
!                                                                       +- View (`tbl2`, [decNum2#37,intNum2#38])
!                                                                          +- Relation [decNum2#37,intNum2#38] parquet

PushDownPredicates规则有所变化,只是变化了一下on中两个条件的位置,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join LeftOuter, ((intNum1#34 = intNum2#38) AND (intNum1#34 > 1))   Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
 :- Relation [decNum1#33,intNum1#34] parquet                        :- Relation [decNum1#33,intNum1#34] parquet
 +- Relation [decNum2#37,intNum2#38] parquet                        +- Relation [decNum2#37,intNum2#38] parquet

InferFiltersFromConstraints做了谓词下推,但是下推的是补空表,而不是保留表,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))   Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
 :- Relation [decNum1#33,intNum1#34] parquet                        :- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet                        +- Filter ((intNum2#38 > 1) AND isnotnull(intNum2#38))
!                                                                      +- Relation [decNum2#37,intNum2#38] parquet
 

其实从源码上我们也可以看到其实现,如下:

 case LeftOuter | LeftAnti =>
         val allConstraints = getAllConstraints(left, right, conditionOpt)
         val newRight = inferNewFilter(right, allConstraints)
         join.copy(right = newRight)

结果:

|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|11.000000000000000000|1      |null                   |null   |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+

对应的物理计划:


image
  • leftouter-join中谓词-补空表
    运行
    val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum2 > 1")

这个时候PushDownPredicates规则又有所变化,直接把谓词下推下去了,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join LeftOuter, ((intNum1#34 = intNum2#38) AND (intNum2#38 > 1))   Join LeftOuter, (intNum1#34 = intNum2#38)
 :- Relation [decNum1#33,intNum1#34] parquet                        :- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet                        +- Filter (intNum2#38 > 1)
!                                                                      +- Relation [decNum2#37,intNum2#38] parquet
  

源码实现部分参考如下:

case LeftOuter | LeftAnti | ExistenceJoin(_) =>
          // push down the right side only join filter for right sub query
          val newLeft = left
          val newRight = rightJoinConditions.
            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
          val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

          Join(newLeft, newRight, joinType, newJoinCond, hint)

InferFiltersFromConstraints的规则,也就只是加了isnotnull(intNum2#38)判断,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join LeftOuter, (intNum1#34 = intNum2#38)        Join LeftOuter, (intNum1#34 = intNum2#38)
 :- Relation [decNum1#33,intNum1#34] parquet      :- Relation [decNum1#33,intNum1#34] parquet
!+- Filter (intNum2#38 > 1)                       +- Filter (isnotnull(intNum2#38) AND (intNum2#38 > 1))
    +- Relation [decNum2#37,intNum2#38] parquet      +- Relation [decNum2#37,intNum2#38] parquet
           

结果:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|11.000000000000000000|1      |null                   |null   |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+

对应的物理计划:


在这里插入图片描述
  • leftouter-join后谓词-留存表
    运行
    val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum1 > 1")

PushDownPredicates规则把filter进行了下推,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Filter (intNum1#34 > 1)                          Join LeftOuter, (intNum1#34 = intNum2#38)
!+- Join LeftOuter, (intNum1#34 = intNum2#38)     :- Filter (intNum1#34 > 1)
!   :- Relation [decNum1#33,intNum1#34] parquet   :  +- Relation [decNum1#33,intNum1#34] parquet
!   +- Relation [decNum2#37,intNum2#38] parquet   +- Relation [decNum2#37,intNum2#38] parquet
           

InferFiltersFromConstraints规则把谓词进行了推导,补空表也进行了下推,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join LeftOuter, (intNum1#34 = intNum2#38)        Join LeftOuter, (intNum1#34 = intNum2#38)
!:- Filter (intNum1#34 > 1)                       :- Filter (isnotnull(intNum1#34) AND (intNum1#34 > 1))
 :  +- Relation [decNum1#33,intNum1#34] parquet   :  +- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet      +- Filter ((intNum2#38 > 1) AND isnotnull(intNum2#38))
!                                                    +- Relation [decNum2#37,intNum2#38] parquet
           

运行结果如下:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+

对应的物理计划:


在这里插入图片描述
  • leftouter-join后谓词-补空表
    运行:
    val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")

但是多了一条EliminateOuterJoin规则,这个规则会把left join操作,变换为inner join,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
 Filter (intNum2#38 > 1)                          Filter (intNum2#38 > 1)
!+- Join LeftOuter, (intNum1#34 = intNum2#38)     +- Join Inner, (intNum1#34 = intNum2#38)
    :- Relation [decNum1#33,intNum1#34] parquet      :- Relation [decNum1#33,intNum1#34] parquet
    +- Relation [decNum2#37,intNum2#38] parquet      +- Relation [decNum2#37,intNum2#38] parquet
           

PushDownPredicates规则和InferFiltersFromConstraints分析和leftouter-join后谓词-留存表 一样,只不过join类型变成了inner join(由于EliminateOuterJoin变换的),也是会进行下推.
结果如下:


+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
+---------------------+-------+-----------------------+-------+

对应的物理计划:

在这里插入图片描述
  • rightouter join中谓词-留存表
    运行:
    val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 and  intNum2 > 1")

PushDownPredicates规则只是把join条件的位置进行了变化,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join RightOuter, ((intNum1#34 = intNum2#38) AND (intNum2#38 > 1))   Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
 :- Relation [decNum1#33,intNum1#34] parquet                         :- Relation [decNum1#33,intNum1#34] parquet
 +- Relation [decNum2#37,intNum2#38] parquet                         +- Relation [decNum2#37,intNum2#38] parquet

而InferFiltersFromConstraints会衍生出下推,如:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))   Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
!:- Relation [decNum1#33,intNum1#34] parquet                         :- Filter ((intNum1#34 > 1) AND isnotnull(intNum1#34))
!+- Relation [decNum2#37,intNum2#38] parquet                         :  +- Relation [decNum1#33,intNum1#34] parquet
!                                                                    +- Relation [decNum2#37,intNum2#38] parquet

结果:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|null                 |null   |1111.000000000000000000|1      |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+

对应的物理计划:

在这里插入图片描述
  • rightouter join中谓词-补空表
    运行:
    val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")

PushDownPredicates规则会把补空表进行下推,如:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join RightOuter, ((intNum1#34 = intNum2#38) AND (intNum1#34 > 1))   Join RightOuter, (intNum1#34 = intNum2#38)
!:- Relation [decNum1#33,intNum1#34] parquet                         :- Filter (intNum1#34 > 1)
!+- Relation [decNum2#37,intNum2#38] parquet                         :  +- Relation [decNum1#33,intNum1#34] parquet
!                                                                    +- Relation [decNum2#37,intNum2#38] parquet
       

InferFiltersFromConstraints规则,会添加isnull的判断:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join RightOuter, (intNum1#34 = intNum2#38)       Join RightOuter, (intNum1#34 = intNum2#38)
!:- Filter (intNum1#34 > 1)                       :- Filter (isnotnull(intNum1#34) AND (intNum1#34 > 1))
 :  +- Relation [decNum1#33,intNum1#34] parquet   :  +- Relation [decNum1#33,intNum1#34] parquet
 +- Relation [decNum2#37,intNum2#38] parquet      +- Relation [decNum2#37,intNum2#38] parquet
           

结果:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|null                 |null   |1111.000000000000000000|1      |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+

对应的物理计划:


在这里插入图片描述
  • rightouter join后谓词-留存表
    运行:
    val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")

PushDownPredicates规则会把留存表的谓词下推到join之后,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Filter (intNum2#38 > 1)                          Join RightOuter, (intNum1#34 = intNum2#38)
!+- Join RightOuter, (intNum1#34 = intNum2#38)    :- Relation [decNum1#33,intNum1#34] parquet
!   :- Relation [decNum1#33,intNum1#34] parquet   +- Filter (intNum2#38 > 1)
    +- Relation [decNum2#37,intNum2#38] parquet      +- Relation [decNum2#37,intNum2#38] parquet

InferFiltersFromConstraints则会进行衍生,如下:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join RightOuter, (intNum1#34 = intNum2#38)       Join RightOuter, (intNum1#34 = intNum2#38)
!:- Relation [decNum1#33,intNum1#34] parquet      :- Filter ((intNum1#34 > 1) AND isnotnull(intNum1#34))
!+- Filter (intNum2#38 > 1)                       :  +- Relation [decNum1#33,intNum1#34] parquet
!   +- Relation [decNum2#37,intNum2#38] parquet   +- Filter (isnotnull(intNum2#38) AND (intNum2#38 > 1))
!                                                    +- Relation [decNum2#37,intNum2#38] parquet
           

结果:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+

对应的物理计划:


在这里插入图片描述
  • rightouter join后谓词-补空表
    运行:
    val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 where intNum1 > 1")

EliminateOuterJoin的规则和PushDownPredicates以及InferFiltersFromConstraints的分析和 leftouter-join后谓词-补空表一样,此处不再累赘

结果:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
+---------------------+-------+-----------------------+-------+

对应的物理计划:


在这里插入图片描述

结论

left join 留存表 补空表
join中谓词 不下推 下推
join后谓词 下推 下推
right join 留存表 补空表
join中谓词 不下推 下推
join后谓词 下推 下推

合并一下就是

outer join 留存表 补空表
join中谓词 不下推 下推
join后谓词 下推 下推

对比之下,其实 理论上说的 join后谓词 补空表不下推和实践中得出来的下推还是有区别(不同点用黑体进行了区分),也就印证了那句话,实践中会对理论做优化,也和Paxos原理类似。

其实这区别的来源是spark增加了EliminateOuterJoin规则

相关文章

网友评论

      本文标题:spark outer join push down filte

      本文链接:https://www.haomeiwen.com/subject/neuwzltx.html