Spark 2.1 and 2.2 SQL Physical Execution Strategies: Join Source Code Analysis and…

Author: orisonchan | Published: 2018-10-16 23:15

    1. object ExtractEquiJoinKeys

    This is a pattern extractor. The official comment reads:

    A pattern that finds joins with equality conditions that can be evaluated using equi-join. Null-safe equality will be transformed into equality as joining key (replace null with default value).

    So what is null-safe equality? There is a case class EqualNullSafe, documented as:

    expr1 FUNC expr2 - Returns same result as the EQUAL(=) operator for non-null operands, but returns true if both are null, false if one of the them is null.

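    In other words, besides comparing ordinary values as usual, it treats the two sides as equal when both are null and as unequal when exactly one is null. The source shows how this is handled for join keys: each side is wrapped in Coalesce with Literal.default of its data type, so a null key is replaced by the default value of that type (0 for numeric types, for example) rather than by 0 in every case.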
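    To make the semantics concrete, here is a minimal sketch in plain Scala with no Spark dependency. It models a nullable integer column as Option[Int], where None stands for NULL. The object and method names are hypothetical and chosen for illustration only; they are not Catalyst APIs.

```scala
object NullSafeEqualitySketch {
  // Plain `=`: the result is NULL (None) as soon as either operand is NULL.
  def equalTo(l: Option[Int], r: Option[Int]): Option[Boolean] =
    for (a <- l; b <- r) yield a == b

  // Null-safe `<=>`: never NULL; two NULLs compare as equal,
  // and a NULL never equals a non-NULL value.
  def equalNullSafe(l: Option[Int], r: Option[Int]): Boolean = l == r

  // The join-key rewrite: coalesce(x, default), assuming 0 as the Int default.
  def joinKey(x: Option[Int]): Int = x.getOrElse(0)
}
```

    Note that joinKey(None) and joinKey(Some(0)) are both 0, so a NULL key and a genuine 0 would collide if only the coalesced keys were compared. That is presumably why, in the source quoted in this section, the otherPredicates filter only removes EqualTo and leaves EqualNullSafe in place as a residual condition.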

    Source:

    /**
     * A pattern that finds joins with equality conditions that can be evaluated using equi-join.
     *
     * Null-safe equality will be transformed into equality as joining key (replace null with default
     * value).
     */
    object ExtractEquiJoinKeys extends Logging with PredicateHelper {
      /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
      type ReturnType =
        (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
    
      def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
        case join @ Join(left, right, joinType, condition) =>
          logDebug(s"Considering join on: $condition")
          // Find equi-join predicates that can be evaluated before the join, and thus can be used
          // as join keys.
          val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
          val joinKeys = predicates.flatMap {
            case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
            case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
            case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
            // Replace null with default value for joining key, then those rows with null in it could
            // be joined together
            case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
              Some((Coalesce(Seq(l, Literal.default(l.dataType))),
                Coalesce(Seq(r, Literal.default(r.dataType)))))
            case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
              Some((Coalesce(Seq(r, Literal.default(r.dataType))),
                Coalesce(Seq(l, Literal.default(l.dataType)))))
            case other => None
          }
          val otherPredicates = predicates.filterNot {
            case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => false
            case EqualTo(l, r) =>
              canEvaluate(l, left) && canEvaluate(r, right) ||
                canEvaluate(l, right) && canEvaluate(r, left)
            case other => false
          }
    
          if (joinKeys.nonEmpty) {
            val (leftKeys, rightKeys) = joinKeys.unzip
            logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
            Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
          } else {
            None
          }
        case _ => None
      }
    }
    

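    First, all conditions of the join are collected (a conjunction is split recursively into and.left and and.right), and the result is divided into two sequences: joinKeys and otherPredicates. joinKeys holds the (left, right) pair of every equality predicate whose two sides pass canEvaluate() against opposite children, and otherPredicates holds every remaining condition. Note that the filter for otherPredicates only removes EqualTo, so an EqualNullSafe predicate stays in otherPredicates even after its coalesced form has been used as a key. The source of canEvaluate() is: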

    /**
     * Returns true if `expr` can be evaluated using only the output of `plan`.  This method
     * can be used to determine when it is acceptable to move expression evaluation within a query
     * plan.
     *
     * For example consider a join between two relations R(a, b) and S(c, d).
     *
     * - `canEvaluate(EqualTo(a,b), R)` returns `true`
     * - `canEvaluate(EqualTo(a,c), R)` returns `false`
     * - `canEvaluate(Literal(1), R)` returns `true` as literals CAN be evaluated on any plan
     */
    protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
      expr.references.subsetOf(plan.outputSet)
    

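    That is, every attribute referenced by the expression must belong to the output attribute set of the plan.

    A Join that matches the ExtractEquiJoinKeys pattern is then handled by the equi-join cases of the physical join strategy.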
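    The extraction logic of this section can be sketched without Spark. The expression classes below are a hypothetical miniature tree, not Catalyst classes, and a child plan is reduced to the set of column names it outputs. EqualNullSafe is left out to keep the sketch short.

```scala
object EquiJoinKeySketch {
  // Hypothetical mini expression tree; `references` lists the columns an expression reads.
  sealed trait Expr { def references: Set[String] }
  case class Attr(name: String) extends Expr { val references = Set(name) }
  case class Lit(value: Int) extends Expr { val references = Set.empty[String] }
  case class EqualTo(l: Expr, r: Expr) extends Expr { val references = l.references ++ r.references }
  case class GreaterThan(l: Expr, r: Expr) extends Expr { val references = l.references ++ r.references }
  case class And(l: Expr, r: Expr) extends Expr { val references = l.references ++ r.references }

  // Flatten nested ANDs into a list of conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // An expression can be evaluated on a side if it only reads that side's columns.
  def canEvaluate(e: Expr, output: Set[String]): Boolean = e.references.subsetOf(output)

  // Some((leftKey, rightKey)) if the predicate is usable as an equi-join key.
  def asKey(p: Expr, left: Set[String], right: Set[String]): Option[(Expr, Expr)] = p match {
    case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
    case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
    case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
    case _ => None
  }

  // Returns (join keys, remaining predicates).
  def extract(cond: Expr, left: Set[String], right: Set[String]): (Seq[(Expr, Expr)], Seq[Expr]) = {
    val predicates = splitConjuncts(cond)
    val keys = predicates.flatMap(p => asKey(p, left, right))
    val others = predicates.filter(p => asKey(p, left, right).isEmpty)
    (keys, others)
  }
}
```

    For R(a, b) joined with S(c, d) on `c = a AND b > 1 AND a = 5`, the only key is (a, c), reordered so that the left-side expression comes first. `b > 1` and `a = 5` remain as other predicates, the latter because one of its sides references no column.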

    2. object JoinSelection

    Source:

      /**
       * Select the proper physical plan for join based on joining keys and size of logical plan.
       *
       * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
       * predicates can be evaluated by matching join keys. If found,  Join implementations are chosen
       * with the following precedence:
       *
       * - Broadcast: if one side of the join has an estimated physical size that is smaller than the
       *     user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold
       *     or if that side has an explicit broadcast hint (e.g. the user applied the
       *     [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
       *     of the join will be broadcasted and the other side will be streamed, with no shuffling
       *     performed. If both sides of the join are eligible to be broadcasted then the
       * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
       *     table.
       * - Sort merge: if the matching join keys are sortable.
       *
       * If there is no joining keys, Join implementations are chosen with the following precedence:
       * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
       * - CartesianProduct: for Inner join
       * - BroadcastNestedLoopJoin
       */
      object JoinSelection extends Strategy with PredicateHelper {
        def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
          // --- BroadcastHashJoin --------------------------------------------------------------------
          case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
            if canBuildRight(joinType) && canBroadcast(right) =>
            Seq(joins.BroadcastHashJoinExec(
              leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
    
          case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
            if canBuildLeft(joinType) && canBroadcast(left) =>
            Seq(joins.BroadcastHashJoinExec(
              leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
    
          // --- ShuffledHashJoin ---------------------------------------------------------------------
          case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
             if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
               && muchSmaller(right, left) ||
               !RowOrdering.isOrderable(leftKeys) =>
            Seq(joins.ShuffledHashJoinExec(
              leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
    
          case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
             if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
               && muchSmaller(left, right) ||
               !RowOrdering.isOrderable(leftKeys) =>
            Seq(joins.ShuffledHashJoinExec(
              leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
    
          // --- SortMergeJoin ------------------------------------------------------------
          case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
            if RowOrdering.isOrderable(leftKeys) =>
            joins.SortMergeJoinExec(
              leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
    
          // --- Without joining keys ------------------------------------------------------------
    
          // Pick BroadcastNestedLoopJoin if one side could be broadcasted
          case j @ logical.Join(left, right, joinType, condition)
              if canBuildRight(joinType) && canBroadcast(right) =>
            joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), BuildRight, joinType, condition) :: Nil
          case j @ logical.Join(left, right, joinType, condition)
              if canBuildLeft(joinType) && canBroadcast(left) =>
            joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), BuildLeft, joinType, condition) :: Nil
    
          // Pick CartesianProduct for InnerJoin
          case logical.Join(left, right, _: InnerLike, condition) =>
            joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
    
          case logical.Join(left, right, joinType, condition) =>
            val buildSide =
              if (right.stats(conf).sizeInBytes <= left.stats(conf).sizeInBytes) {
                BuildRight
              } else {
                BuildLeft
              }
            // This join could be very slow or OOM
            joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
    
          // --- Cases where this strategy does not apply ---------------------------------------------
          case _ => Nil
        }
      }
    

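    Start with the comment. The strategy first uses ExtractEquiJoinKeys to check whether at least some of the join's predicates can be evaluated by matching join keys. If so, one of three joins is chosen based on those keys and on the plan sizes: broadcast hash join, shuffled hash join, or the most common one, sort merge join. If there are no join keys, there are two options: BroadcastNestedLoopJoin and CartesianProduct.

    Now for the source. When join keys are available, how is the join implementation decided?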

    2.1 Broadcast Join

    Two kinds of methods are involved: canBroadcast() and canBuildX() (canBuildLeft() or canBuildRight()).

    /**
     * Matches a plan whose output should be small enough to be used in broadcast join.
     */
    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.stats(conf).hints.isBroadcastable.getOrElse(false) ||
        (plan.stats(conf).sizeInBytes >= 0 &&
          plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
    }
    

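    As shown, canBroadcast() reads the AUTO_BROADCASTJOIN_THRESHOLD setting. Setting it to -1 disables size-based broadcasting, because no non-negative sizeInBytes can be less than or equal to -1. A plan that carries a broadcast hint still qualifies.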
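    The check can be reproduced in a few lines of plain Scala. Stats below is a hypothetical stand-in for the plan statistics, and the 10 MB threshold used in the usage note is the documented default of spark.sql.autoBroadcastJoinThreshold, stated here as an assumption about your configuration.

```scala
object CanBroadcastSketch {
  // Hypothetical stand-in for plan statistics: an estimated size and a broadcast hint.
  case class Stats(sizeInBytes: BigInt, broadcastHint: Boolean = false)

  // Same shape as canBroadcast(): a hint wins, otherwise the size must be
  // non-negative and no larger than the threshold.
  def canBroadcast(stats: Stats, autoBroadcastJoinThreshold: Long): Boolean =
    stats.broadcastHint ||
      (stats.sizeInBytes >= 0 && stats.sizeInBytes <= autoBroadcastJoinThreshold)
}
```

    With a threshold of 10L * 1024 * 1024, a 1 KB plan qualifies. With -1 it does not, unless broadcastHint is set.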

    private def canBuildRight(joinType: JoinType): Boolean = joinType match {
      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
      case j: ExistenceJoin => true
      case _ => false
    }
    
    private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
      case _: InnerLike | RightOuter => true
      case _ => false
    }
    

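    As shown, canBuildRight() and canBuildLeft() return true when the join is inner-like or is driven by the other side: the right side can be built for LeftOuter, LeftSemi, LeftAnti, and ExistenceJoin, and the left side for RightOuter.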

    2.2 Shuffled Hash Join

    This involves the canBuildLocalHashMap() method, the muchSmaller() method, and the PREFER_SORTMERGEJOIN setting, which is documented as:

    When true, prefer sort merge join over shuffle hash join.

    The source of canBuildLocalHashMap() is:

    /**
     * Matches a plan whose single partition should be small enough to build a hash table.
     *
     * Note: this assume that the number of partition is fixed, requires additional work if it's
     * dynamic.
     */
    private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
      plan.stats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
    }
    

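    This method uses two settings. One is AUTO_BROADCASTJOIN_THRESHOLD, already seen in the broadcast join and very useful for query tuning. The other is SHUFFLE_PARTITIONS, the number of partitions used when shuffling for a join or an aggregation, which defaults to 200 if not set. The check therefore asks whether the plan's average partition size is below the broadcast threshold.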
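    As a worked example of that arithmetic, the sketch below re-implements the comparison with plain parameters. The values 10485760 and 200 are the documented defaults of the two settings and are assumptions about the configuration, not values taken from the article.

```scala
object LocalHashMapSketch {
  // Same comparison as canBuildLocalHashMap(): total size versus threshold * partitions.
  def canBuildLocalHashMap(
      sizeInBytes: BigInt,
      autoBroadcastJoinThreshold: Long,
      numShufflePartitions: Int): Boolean =
    sizeInBytes < BigInt(autoBroadcastJoinThreshold) * numShufflePartitions
}
```

    With the defaults the limit is 10485760 * 200 = 2097152000 bytes, so a 2000000000-byte plan passes and a 2097152000-byte plan does not. With a threshold of -1 the limit is negative and the check always fails, which suggests that disabling automatic broadcast also rules out this path to a shuffled hash join.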

    The source of muchSmaller():

    /**
     * Returns whether plan a is much smaller (3X) than plan b.
     *
     * The cost to build hash map is higher than sorting, we should only build hash map on a table
     * that is much smaller than other one. Since we does not have the statistic for number of rows,
     * use the size of bytes here as estimation.
     */
    private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
      a.stats(conf).sizeInBytes * 3 <= b.stats(conf).sizeInBytes
    }
    

    In other words, "much smaller" means that three times the size of a is at most the size of b.

    2.3 Sort Merge Join

    Sort merge join can be used when the data types of the collected join keys are all orderable.
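    The precedence described in sections 2.1 to 2.3 can be condensed into one decision function. This is a simplified sketch, not the strategy itself. Conf, Side, and the string results are hypothetical, only four join types are modeled, the join keys are assumed to be orderable, and the defaults in Conf are assumptions.

```scala
object JoinSelectionSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Hypothetical configuration and statistics holders (not Spark classes).
  case class Conf(
      broadcastThreshold: Long = 10L * 1024 * 1024,
      shufflePartitions: Int = 200,
      preferSortMergeJoin: Boolean = true)
  case class Side(sizeInBytes: BigInt)

  def canBuildRight(t: JoinType): Boolean = t == Inner || t == LeftOuter
  def canBuildLeft(t: JoinType): Boolean = t == Inner || t == RightOuter
  def canBroadcast(s: Side, c: Conf): Boolean =
    s.sizeInBytes >= 0 && s.sizeInBytes <= c.broadcastThreshold
  def canBuildLocalHashMap(s: Side, c: Conf): Boolean =
    s.sizeInBytes < BigInt(c.broadcastThreshold) * c.shufflePartitions
  def muchSmaller(a: Side, b: Side): Boolean = a.sizeInBytes * 3 <= b.sizeInBytes

  // Mirrors the order of the equi-join cases: broadcast, then shuffled hash, then sort merge.
  def choose(t: JoinType, left: Side, right: Side, c: Conf): String =
    if (canBuildRight(t) && canBroadcast(right, c)) "BroadcastHashJoin(BuildRight)"
    else if (canBuildLeft(t) && canBroadcast(left, c)) "BroadcastHashJoin(BuildLeft)"
    else if (!c.preferSortMergeJoin && canBuildRight(t) &&
      canBuildLocalHashMap(right, c) && muchSmaller(right, left)) "ShuffledHashJoin(BuildRight)"
    else if (!c.preferSortMergeJoin && canBuildLeft(t) &&
      canBuildLocalHashMap(left, c) && muchSmaller(left, right)) "ShuffledHashJoin(BuildLeft)"
    else "SortMergeJoin"
}
```

    Under these assumptions, an inner join of a 500 MB table with a 5 MB table picks a broadcast hash join that builds the right side. A shuffled hash join is only reachable after preferSortMergeJoin is switched off.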

    3. BroadcastHashJoinExec

    case class BroadcastHashJoinExec(
        leftKeys: Seq[Expression],
        rightKeys: Seq[Expression],
        joinType: JoinType,
        buildSide: BuildSide,
        condition: Option[Expression],
        left: SparkPlan,
        right: SparkPlan)
      extends BinaryExecNode with HashJoin with CodegenSupport {
    
      protected override def doExecute(): RDD[InternalRow] = {
        val numOutputRows = longMetric("numOutputRows")
    
        val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
        streamedPlan.execute().mapPartitions { streamedIter =>
          val hashed = broadcastRelation.value.asReadOnlyCopy()
          TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
          join(streamedIter, hashed, numOutputRows)
        }
      }
    }
    
    protected def join(
        streamedIter: Iterator[InternalRow],
        hashed: HashedRelation,
        numOutputRows: SQLMetric): Iterator[InternalRow] = {
    
      val joinedIter = joinType match {
        case _: InnerLike =>
          innerJoin(streamedIter, hashed)
        case LeftOuter | RightOuter =>
          outerJoin(streamedIter, hashed)
        case LeftSemi =>
          semiJoin(streamedIter, hashed)
        case LeftAnti =>
          antiJoin(streamedIter, hashed)
        case j: ExistenceJoin =>
          existenceJoin(streamedIter, hashed)
        case x =>
          throw new IllegalArgumentException(
            s"BroadcastHashJoin should not take $x as the JoinType")
      }
    
      val resultProj = createResultProjection
      joinedIter.map { r =>
        numOutputRows += 1
        resultProj(r)
      }
    }
    

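    Of the two snippets above, the first is the definition and core method of BroadcastHashJoinExec, and the second is the join() method of the HashJoin trait that it extends. HashJoin defines several paired members named (buildXXX, streamedXXX), such as (buildPlan, streamedPlan). They are not reproduced here, so consult the source. The buildXXX side is the small table that gets hashed or broadcast, and the streamedXXX side is the large table. "Streamed" means, in my understanding, that its rows flow through an iterator and are processed one at a time.

    So here, after buildPlan is broadcast, execute() is called on streamedPlan to obtain an RDD[InternalRow], and mapPartitions is called on it so that each partition is joined against the broadcast small table.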
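    The build/stream split can be illustrated with ordinary collections. In the sketch below a Seq of Seqs stands in for the partitions of the streamed RDD, and an immutable Map stands in for the broadcast HashedRelation. Only the inner-join case is shown, and all names are hypothetical.

```scala
object BroadcastHashJoinSketch {
  // Inner equi-join: hash the small build side once, then stream each partition past it.
  def innerJoin[K, A, B](
      streamedPartitions: Seq[Seq[(K, A)]],
      buildSide: Seq[(K, B)]): Seq[Seq[(K, A, B)]] = {
    // Stand-in for the broadcast HashedRelation: key -> all build rows with that key.
    val hashed: Map[K, Seq[B]] =
      buildSide.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    // Stand-in for mapPartitions: every partition probes the same complete table.
    streamedPartitions.map { partition =>
      partition.flatMap { case (k, a) =>
        hashed.getOrElse(k, Seq.empty[B]).map(b => (k, a, b))
      }
    }
  }
}
```

    The streamed side keeps its partitioning and is never moved. Only the build side has to be available in full to every partition.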

    4. ShuffledHashJoinExec

    case class ShuffledHashJoinExec(
        leftKeys: Seq[Expression],
        rightKeys: Seq[Expression],
        joinType: JoinType,
        buildSide: BuildSide,
        condition: Option[Expression],
        left: SparkPlan,
        right: SparkPlan)
      extends BinaryExecNode with HashJoin {
    
      private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
        val buildDataSize = longMetric("buildDataSize")
        val buildTime = longMetric("buildTime")
        val start = System.nanoTime()
        val context = TaskContext.get()
        val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
        buildTime += (System.nanoTime() - start) / 1000000
        buildDataSize += relation.estimatedSize
        // This relation is usually used until the end of task.
        context.addTaskCompletionListener(_ => relation.close())
        relation
      }
    
      protected override def doExecute(): RDD[InternalRow] = {
        val numOutputRows = longMetric("numOutputRows")
        streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
          val hashed = buildHashedRelation(buildIter)
          join(streamIter, hashed, numOutputRows)
        }
      }
    }
    

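    ShuffledHashJoin and BroadcastJoin differ in how the hash table is built. The latter relies on the HashedRelation produced by the broadcast. The former calls zipPartitions, which combines two RDDs that have the same number of partitions and passes the iterators of both RDDs to the mapping function, here (streamIter, buildIter), and then builds a HashedRelation from buildIter. This means that the HashedRelation of BroadcastJoin holds all the data of the small table, whereas the HashedRelation of ShuffledHashJoin holds only the part of the small table that falls in the same partition as the large table's rows.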
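    The same idea in collection form: both sides are first routed to partitions by the hash of the key, and each pair of partitions then builds its own small hash table. The shuffle function is a hypothetical stand-in for the exchange that happens before the operator runs, and zip stands in for zipPartitions.

```scala
object ShuffledHashJoinSketch {
  // Stand-in for the shuffle: route each row to a partition by the hash of its key.
  def shuffle[K, V](rows: Seq[(K, V)], numPartitions: Int): Seq[Seq[(K, V)]] =
    (0 until numPartitions).map { p =>
      rows.filter { case (k, _) => Math.floorMod(k.hashCode, numPartitions) == p }
    }

  def innerJoin[K, A, B](
      streamed: Seq[(K, A)],
      build: Seq[(K, B)],
      numPartitions: Int): Seq[(K, A, B)] = {
    val streamedParts = shuffle(streamed, numPartitions)
    val buildParts = shuffle(build, numPartitions)
    // Stand-in for zipPartitions: partition i of one side meets partition i of the other.
    streamedParts.zip(buildParts).flatMap { case (streamIter, buildIter) =>
      // This hash table covers only this partition's slice of the build side.
      val hashed = buildIter.groupBy(_._1)
      streamIter.flatMap { case (k, a) =>
        hashed.getOrElse(k, Seq.empty[(K, B)]).map { case (_, b) => (k, a, b) }
      }
    }
  }
}
```

    Because equal keys always hash to the same partition index, no match is lost even though each hash table sees only part of the build side.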

    5. SortMergeJoinExec

    case class SortMergeJoinExec(
        leftKeys: Seq[Expression],
        rightKeys: Seq[Expression],
        joinType: JoinType,
        condition: Option[Expression],
        left: SparkPlan,
        right: SparkPlan) extends BinaryExecNode with CodegenSupport {
    
      protected override def doExecute(): RDD[InternalRow] = {
        val numOutputRows = longMetric("numOutputRows")
        val spillThreshold = getSpillThreshold
        left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
          ...
          ...
        }
      }
    }
    

    As shown, the two RDDs are again combined with zipPartitions, and the join is then performed by iterating over each pair of partitions.
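    The body of the zipPartitions closure is elided in the snippet above, so the following is not Spark's implementation. It is a generic textbook merge over one pair of partitions, assuming both inputs are already sorted by an Int key, and it covers only the inner-join case.

```scala
object SortMergeJoinSketch {
  // Inner equi-join of two inputs that are already sorted by key.
  def innerJoin[A, B](left: Vector[(Int, A)], right: Vector[(Int, B)]): Vector[(Int, A, B)] = {
    val out = Vector.newBuilder[(Int, A, B)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1        // left key too small: advance left
      else if (lk > rk) j += 1   // right key too small: advance right
      else {
        // Find the run of equal keys on the right, then pair it with each matching left row.
        var jEnd = j
        while (jEnd < right.length && right(jEnd)._1 == lk) jEnd += 1
        while (i < left.length && left(i)._1 == lk) {
          for (r <- j until jEnd) out += ((lk, left(i)._2, right(r)._2))
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}
```

    Each cursor only moves forward, so apart from emitting the cross product of duplicate keys, both inputs are read once. That is what makes the approach attractive once the data is sorted.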

      Comments

      • f289f782a082: Hello, the article is very in-depth and I am truly impressed!

        The following SQL produces the ExistenceJoin keyword in the Spark SQL execution plan:
        set spark.sql.autoBroadcastJoinThreshold=-1;
        explain
        select *
        from dept d
        where exists (select 1
        from emp e
        where e.deptno = d.deptno
        and e.deptno is not null)
        or d.loc = 'DALLAS';

        For an ordinary semi join, the execution plan contains the keyword LeftSemi instead:
        set spark.sql.autoBroadcastJoinThreshold=-1;
        explain
        select *
        from dept d
        where exists (select 1
        from emp e
        where e.deptno = d.deptno
        and e.deptno is not null)


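        One question: when ExistenceJoin appears in a Spark SQL execution plan, how does it differ from LeftSemi?
        orisonchan: @自由_c195 ExistenceJoin never appears in the initially parsed plan of any SQL you write; it only appears in the optimizer. It is something Spark SQL defines for itself, not a standard kind of join.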

      Article link: https://www.haomeiwen.com/subject/tbwkzftx.html