
[Spark Featured] The EliminateOuterJoin Optimization Rule

Author: 熊本极客 | Published 2023-10-09 17:34

    1. Use cases of the EliminateOuterJoin optimization rule

    Question: why eliminate outer joins?
    Answer: eliminating outer joins improves execution efficiency. An inner join keeps only the rows that match on both sides, a left join must additionally keep every row of the left table, a right join every row of the right table, and a full join all rows of both tables. In terms of processing efficiency, the four join types therefore rank: inner join > left join = right join > full join.
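
    To watch the rule fire, here is a minimal self-contained sketch (the tables, columns, and app name are made up for illustration): the null-rejecting filter on a right-side column lets the optimizer rewrite the LeftOuter join as Inner.

    import org.apache.spark.sql.SparkSession

    object EliminateOuterJoinDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("EliminateOuterJoinDemo")
          .getOrCreate()
        import spark.implicits._

        val employees   = Seq((1, "alice", 100), (2, "bob", 300)).toDF("emp_id", "emp_name", "dept_id")
        val departments = Seq((100, "eng"), (200, "hr")).toDF("dept_id", "dept_name")

        // The left outer join pads dept_name with null where no department
        // matches; the null-rejecting filter below discards exactly those
        // padded rows, so the optimized plan should show "Join Inner".
        employees
          .join(departments, Seq("dept_id"), "left_outer")
          .filter($"dept_name".isNotNull)
          .explain(true)

        spark.stop()
      }
    }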

    2. EliminateOuterJoin source-code walkthrough

      // Excerpt from org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin
      object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {

        private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
          // Split the filter into conjuncts (plus its inferred constraints) and
          // group them by which side of the join they reference
          val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
          val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
          val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))

          lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
          lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)

          join.joinType match {
            case RightOuter if leftHasNonNullPredicate => Inner       // 1. RightOuter with a null-rejecting filter on the left side becomes Inner
            case LeftOuter if rightHasNonNullPredicate => Inner       // 2. LeftOuter with a null-rejecting filter on the right side becomes Inner
            case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner   // 3. FullOuter with null-rejecting filters on both sides becomes Inner
            case FullOuter if leftHasNonNullPredicate => LeftOuter    // 4. FullOuter with a null-rejecting filter on the left side becomes LeftOuter
            case FullOuter if rightHasNonNullPredicate => RightOuter  // 5. FullOuter with a null-rejecting filter on the right side becomes RightOuter
            case o => o
          }
        }

        def apply(plan: LogicalPlan): LogicalPlan = plan transform {
          // Match a Filter whose child is an outer Join
          case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
            val newJoinType = buildNewJoinType(f, j)
            // Keep the plan unchanged if the join type stays the same;
            // otherwise rebuild the Filter over a Join with the new type
            if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
        }
      }
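
    The conversion above hinges on canFilterOutNull, which the excerpt calls but does not define. Conceptually it asks: if every attribute referenced by a predicate were null, would the predicate evaluate to false or null and thus filter the row out? The following is a simplified sketch of that idea rather than the exact Spark source (the real version also bails out on non-deterministic expressions, correlated subqueries, and unevaluable expressions):

      // Simplified sketch of canFilterOutNull; details differ across Spark versions
      private def canFilterOutNull(e: Expression): Boolean = {
        // Bind the predicate's attribute references to positions in a row
        // whose fields are all null
        val attributes = e.references.toSeq
        val emptyRow = new GenericInternalRow(attributes.length) // all fields are null
        val boundE = BindReferences.bindReference(e, attributes)
        val v = boundE.eval(emptyRow)
        // A predicate yielding false or null on an all-null row rejects the
        // null-padded rows an outer join produces
        v == null || v == false
      }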
    
    [Figure: EliminateOuterJoin execution flow]

    Question 1: why is a left outer join converted to an inner join when there is a filter on its right-side table?
    Answer: a left join pads the right table's columns with null whenever there is no matching row. In the example below, the right table carries the predicate departments.dept_id < 200; a null-padded row can never satisfy it and is filtered out anyway, so the result is equivalent to an inner join.

    spark-sql> explain extended SELECT * FROM employees LEFT JOIN departments ON employees.dept_id = departments.dept_id WHERE departments.dept_id < 200;
    
    == Parsed Logical Plan ==
    'Project [*]
    +- 'Filter ('departments.dept_id < 200)
       +- 'Join LeftOuter, ('employees.dept_id = 'departments.dept_id)
          :- 'UnresolvedRelation [employees], [], false
          +- 'UnresolvedRelation [departments], [], false
    
    == Analyzed Logical Plan ==
    emp_id: int, emp_name: string, dept_id: int, dept_id: int, dept_name: string, location_id: int
    Project [emp_id#102, emp_name#103, dept_id#104, dept_id#105, dept_name#106, location_id#107]
    +- Filter (dept_id#105 < 200)
       +- Join LeftOuter, (dept_id#104 = dept_id#105)
          :- SubqueryAlias spark_catalog.tpcds_text_varchar_5.employees
          :  +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
          +- SubqueryAlias spark_catalog.tpcds_text_varchar_5.departments
             +- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Join Inner, (dept_id#104 = dept_id#105)
    :- Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
    :  +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
    +- Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
       +- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
    
    == Physical Plan ==
    *(2) BroadcastHashJoin [dept_id#104], [dept_id#105], Inner, BuildLeft, false
    :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#138]
    :  +- *(1) Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
    :     +- Scan hive tpcds_text_varchar_5.employees [emp_id#102, emp_name#103, dept_id#104], HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
    +- *(2) Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
       +- Scan hive tpcds_text_varchar_5.departments [dept_id#105, dept_name#106, location_id#107], HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
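
    Conversely, a right-side predicate that tolerates null blocks the conversion. As an illustrative counter-example (assuming the same employees and departments tables and a running SparkSession named spark), replacing the filter with departments.dept_id IS NULL should keep the LeftOuter join: canFilterOutNull returns false for IS NULL, because the null-padded rows are exactly the ones the filter wants to keep.

    // IS NULL is satisfied by the null-padded rows, so the optimizer
    // should keep "Join LeftOuter" in the optimized plan
    spark.sql(
      """SELECT * FROM employees
        |LEFT JOIN departments ON employees.dept_id = departments.dept_id
        |WHERE departments.dept_id IS NULL""".stripMargin)
      .explain(true)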
    

    Question 2: why must the EliminateOuterJoin rule run before predicate pushdown (PushDownPredicates)?

      // Excerpt from Optimizer.defaultBatches: note that EliminateOuterJoin
      // is listed before PushDownPredicates
      def defaultBatches: Seq[Batch] = {
        val operatorOptimizationRuleSet =
          Seq(
            // Operator push down
            PushProjectionThroughUnion,
            ReorderJoin,
            EliminateOuterJoin,   // eliminate outer joins
            PushDownPredicates,   // predicate pushdown
            // ... remaining rules omitted
          )
        // ...
      }
    

    Answer: predicate pushdown moves filter conditions as close to the data source as possible so that irrelevant data can be skipped early. EliminateOuterJoin, however, decides the conversion from the original position of the filter: its pattern only matches a Filter sitting directly on top of a Join. If PushDownPredicates ran first, the filter would already have been pushed below the join, the Filter(Join(...)) pattern would no longer match, and the outer join could never be eliminated. Hence the outer-join elimination rule must run before the predicate pushdown rule.
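
    One way to observe the interplay yourself (assuming a running SparkSession named spark and the same tables as above) is Spark's spark.sql.optimizer.excludedRules configuration, sketched below: with EliminateOuterJoin excluded, the optimized plan should keep Join LeftOuter even though PushDownPredicates still pushes the filter below the join.

    // Exclude EliminateOuterJoin, then compare the optimized plan with the
    // earlier output: the join should stay LeftOuter
    spark.conf.set("spark.sql.optimizer.excludedRules",
      "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin")
    spark.sql(
      """SELECT * FROM employees
        |LEFT JOIN departments ON employees.dept_id = departments.dept_id
        |WHERE departments.dept_id < 200""".stripMargin)
      .explain(true)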
