[Spark Series 3] Spark 3.0.1 AQE (Adaptive Query Execution)

Author: 鸿乃江边鸟 | Published 2020-12-01 15:46

Introduction to AQE

As noted in the Spark configuration documentation, AQE existed as far back as Spark 1.6; in the Spark 2.x era, Intel's big data team built and field-tested a prototype; and in Spark 3.0, Databricks and Intel jointly contributed the new AQE to the community.

AQE configuration in Spark 3.0.1

| Config | Default | Official description | Notes |
| --- | --- | --- | --- |
| spark.sql.adaptive.enabled | false | Whether to enable adaptive query execution | Set to true to enable AQE |
| spark.sql.adaptive.coalescePartitions.enabled | true | Whether to coalesce contiguous shuffle partitions (merged up to the spark.sql.adaptive.advisoryPartitionSizeInBytes threshold) | On by default; see Analysis 1 |
| spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | The initial number of shuffle partitions before coalescing; defaults to the value of spark.sql.shuffle.partitions | See Analysis 2 |
| spark.sql.adaptive.coalescePartitions.minPartitionNum | (none) | The minimum number of shuffle partitions after coalescing; defaults to the cluster's default parallelism | See Analysis 3 |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | The advisory size of a shuffle partition, used both when coalescing partitions and when handling skewed joins | See Analysis 3 |
| spark.sql.adaptive.skewJoin.enabled | true | Whether to adaptively handle data skew in joins | |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Skew factor; both skewedPartitionFactor and skewedPartitionThresholdInBytes must be exceeded | See Analysis 4 |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Skew size threshold; both skewedPartitionFactor and skewedPartitionThresholdInBytes must be exceeded | See Analysis 4 |
| spark.sql.adaptive.logLevel | debug | Log level for plan changes made by adaptive execution | Raise to info to observe adaptive plan changes |
| spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 0.2 | Non-empty partition ratio threshold for broadcast join conversion; a side whose non-empty partition ratio is below this value will not be converted to a broadcast join | See Analysis 5 |
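
To make these settings concrete, here is a minimal sketch of turning them on for a SparkSession; the specific values are illustrative assumptions, not tuning recommendations:

    import org.apache.spark.sql.SparkSession

    // Minimal sketch: enable AQE and the knobs discussed above.
    // The values below are illustrative, not tuning advice.
    val spark = SparkSession.builder()
      .appName("aqe-demo")
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "400")
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      .config("spark.sql.adaptive.logLevel", "info")
      .getOrCreate()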

Analysis 1

In OptimizeSkewedJoin.scala (OptimizeSkewedJoin is a physical-plan rule) we can see where ADVISORY_PARTITION_SIZE_IN_BYTES, i.e. spark.sql.adaptive.advisoryPartitionSizeInBytes, is used:

     /**
       * The goal of skew join optimization is to make the data distribution more even. The target size
       * to split skewed partitions is the average size of non-skewed partition, or the
       * advisory partition size if avg size is smaller than it.
       */
      private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
        val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
        val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
        // It's impossible that all the partitions are skewed, as we use median size to define skew.
        assert(nonSkewSizes.nonEmpty)
        math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
      }
    
    

Here:

1. nonSkewSizes contains the sizes of the task's non-skewed partitions
2. targetSize returns max(average size of the non-skewed partitions, advisorySize), where advisorySize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; so targetSize is not necessarily the value of spark.sql.adaptive.advisoryPartitionSizeInBytes
3. medianSize is the median of the task's partition sizes (a worked sketch follows this list)
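
As a sanity check, here is a self-contained sketch of the same computation with the skew test inlined, assuming the config defaults from the table (factor 5, threshold 256MB, advisory size 64MB) and made-up partition sizes:

    // Self-contained sketch of targetSize; sizes are made up.
    val factor = 5
    val thresholdBytes = 256L * 1024 * 1024   // skewedPartitionThresholdInBytes
    val advisoryBytes = 64L * 1024 * 1024     // advisoryPartitionSizeInBytes

    def isSkewed(size: Long, medianSize: Long): Boolean =
      size > medianSize * factor && size > thresholdBytes

    def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
      val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
      math.max(advisoryBytes, nonSkewSizes.sum / nonSkewSizes.length)
    }

    val mb = 1024L * 1024
    // One hot 2GB partition is filtered out; the rest average 20MB, below the
    // 64MB advisory size, so targetSize falls back to the advisory size.
    println(targetSize(Seq(10 * mb, 20 * mb, 30 * mb, 2048 * mb), 25 * mb) / mb) // 64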

Analysis 2

In SQLConf.scala:

    def numShufflePartitions: Int = {
        if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
          getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
        } else {
          defaultNumShufflePartitions
        }
      }
    

Starting with Spark 3.0.1, if both AQE and shuffle partition coalescing are enabled, the shuffle partition number comes from spark.sql.adaptive.coalescePartitions.initialPartitionNum. When a query contains multiple shuffle stages, raising this initial partition count gives the coalescing step finer-grained partitions to merge, which makes the coalescing noticeably more effective. A small sketch of the fallback behaviour follows.
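
The fallback can be mimicked in isolation; the function below is illustrative (only the config names in the comments are real):

    // Sketch: initialPartitionNum wins only when AQE and coalescing are both
    // enabled; otherwise spark.sql.shuffle.partitions applies.
    def effectiveShufflePartitions(
        adaptiveEnabled: Boolean,          // spark.sql.adaptive.enabled
        coalesceEnabled: Boolean,          // ...coalescePartitions.enabled
        initialPartitionNum: Option[Int],  // ...coalescePartitions.initialPartitionNum
        defaultNumShufflePartitions: Int   // spark.sql.shuffle.partitions
    ): Int =
      if (adaptiveEnabled && coalesceEnabled) {
        initialPartitionNum.getOrElse(defaultNumShufflePartitions)
      } else {
        defaultNumShufflePartitions
      }

    println(effectiveShufflePartitions(true, true, Some(1000), 200))  // 1000
    println(effectiveShufflePartitions(false, true, Some(1000), 200)) // 200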

Analysis 3

In CoalesceShufflePartitions.scala (CoalesceShufflePartitions is a physical-plan rule as well), the following is executed:

     if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
          plan
        } else {
          // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
          // we should skip it when calculating the `partitionStartIndices`.
          val validMetrics = shuffleStages.flatMap(_.mapStats)
    
          // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
          // in that case. For example when we union fully aggregated data (data is arranged to a single
          // partition) and a result of a SortMergeJoin (multiple partitions).
          val distinctNumPreShufflePartitions =
            validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
          if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
            // We fall back to Spark default parallelism if the minimum number of coalesced partitions
            // is not set, so to avoid perf regressions compared to no coalescing.
            val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
              .getOrElse(session.sparkContext.defaultParallelism)
            val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
              validMetrics.toArray,
              advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
              minNumPartitions = minPartitionNum)
            // This transformation adds new nodes, so we must use `transformUp` here.
            val stageIds = shuffleStages.map(_.id).toSet
            plan.transformUp {
              // even for shuffle exchange whose input RDD has 0 partition, we should still update its
              // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
              // number of output partitions.
              case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
                CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
            }
          } else {
            plan
          }
        }
      }
    

In other words:

1. If the partitioning was specified by the user, e.g. via a repartition call, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and the coalescing optimization is skipped entirely
2. If several shuffles feed in with different pre-shuffle partition counts (for example a union of fully aggregated data and the output of a SortMergeJoin), spark.sql.adaptive.coalescePartitions.minPartitionNum likewise has no effect and coalescing is skipped
3. Otherwise the actual merging is delegated to ShufflePartitionsUtil.coalescePartitions, analyzed below (see also the sketch after this list)
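
Point 1 can be seen with an explicit repartition; this hedged sketch assumes the SparkSession from the earlier configuration example:

    // An explicit repartition(100) pins the shuffle's partition count
    // (canChangeNumPartitions = false), so the coalescing rule skips it and
    // the stage keeps exactly 100 partitions.
    val pinned = spark.range(0, 1000000).repartition(100)
    pinned.selectExpr("id % 10 as k")
      .groupBy("k").count()  // planner-introduced shuffles remain eligible
      .collect()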

Analysis 4

In OptimizeSkewedJoin.scala we also see:

    /**
       * A partition is considered as a skewed partition if its size is larger than the median
       * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
       * ADVISORY_PARTITION_SIZE_IN_BYTES.
       */
      private def isSkewed(size: Long, medianSize: Long): Boolean = {
        size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
          size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
      }
    
1. OptimizeSkewedJoin is a physical-plan rule that uses isSkewed to decide whether the data is skewed; a partition counts as skewed only when it exceeds both SKEW_JOIN_SKEWED_PARTITION_FACTOR times the median and SKEW_JOIN_SKEWED_PARTITION_THRESHOLD
2. medianSize is the median of the task's partition sizes (a worked sketch follows)
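
With the defaults (factor 5, threshold 256MB), both conditions really are needed, as this small sketch with made-up sizes shows:

    val mb = 1024L * 1024
    def isSkewed(size: Long, medianSize: Long): Boolean =
      size > medianSize * 5 && size > 256 * mb

    // 10x the median but under the 256MB threshold: NOT skewed.
    println(isSkewed(100 * mb, 10 * mb)) // false
    // More than 5x the median AND over the threshold: skewed.
    println(isSkewed(300 * mb, 50 * mb)) // true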

Analysis 5

AdaptiveSparkPlanExec.getFinalPhysicalPlan calls the reOptimize method, and reOptimize re-runs optimization on the logical plan:

    private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
        logicalPlan.invalidateStatsCache()
        val optimized = optimizer.execute(logicalPlan)
        val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
        val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
        (newPlan, optimized)
      }
    

And this optimizer contains a DemoteBroadcastHashJoin rule:

    @transient private val optimizer = new RuleExecutor[LogicalPlan] {
        // TODO add more optimization rules
        override protected def batches: Seq[Batch] = Seq(
          Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
        )
      }
    
    

DemoteBroadcastHashJoin in turn decides whether a broadcast join should be ruled out:

    case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {
    
      private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
        case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
          && stage.mapStats.isDefined =>
          val mapStats = stage.mapStats.get
          val partitionCnt = mapStats.bytesByPartitionId.length
          val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
          partitionCnt > 0 && nonZeroCnt > 0 &&
            (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
        case _ => false
      }
    
      def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
        case j @ Join(left, right, _, _, hint) =>
          var newHint = hint
          if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
            newHint = newHint.copy(leftHint =
              Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
          }
          if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
            newHint = newHint.copy(rightHint =
              Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
          }
          if (newHint.ne(hint)) {
            j.copy(hint = newHint)
          } else {
            j
          }
      }
    }
    

shouldDemote is the check that rules a side out of broadcast join (a worked sketch follows this list):

1. It only applies to a ShuffleQueryStageExec whose result and map output statistics are available
2. If the non-empty partition ratio is below nonEmptyPartitionRatioForBroadcastJoin, i.e. spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin, the sort merge join will not be converted to a broadcast join
3. This tends to come up in SQL that joins first and then runs a group by
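
A concrete sketch of the ratio check with made-up map output statistics (default threshold 0.2):

    // Made-up stats: 200 shuffle partitions, only 30 of them non-empty.
    val bytesByPartitionId: Array[Long] =
      Array.fill(30)(1024L * 1024) ++ Array.fill(170)(0L)
    val partitionCnt = bytesByPartitionId.length            // 200
    val nonZeroCnt = bytesByPartitionId.count(_ > 0)        // 30
    val ratio = nonZeroCnt * 1.0 / partitionCnt             // 0.15
    // 0.15 < 0.2, so this side gets the NO_BROADCAST_HASH hint: no broadcast.
    println(ratio < 0.2) // true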

Analysis of ShufflePartitionsUtil.coalescePartitions (the core partition-coalescing code)

coalescePartitions is shown below:

    def coalescePartitions(
          mapOutputStatistics: Array[MapOutputStatistics],
          advisoryTargetSize: Long,
          minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
        // If `minNumPartitions` is very large, it is possible that we need to use a value less than
        // `advisoryTargetSize` as the target size of a coalesced task.
        val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
        // The max at here is to make sure that when we have an empty table, we only have a single
        // coalesced partition.
        // There is no particular reason that we pick 16. We just need a number to prevent
        // `maxTargetSize` from being set to 0.
        val maxTargetSize = math.max(
          math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
        val targetSize = math.min(maxTargetSize, advisoryTargetSize)
    
        val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
        logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
          s"actual target size $targetSize.")
    
        // Make sure these shuffles have the same number of partitions.
        val distinctNumShufflePartitions =
          mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
        // The reason that we are expecting a single value of the number of shuffle partitions
        // is that when we add Exchanges, we set the number of shuffle partitions
        // (i.e. map output partitions) using a static setting, which is the value of
        // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
        // number of partitions, they will have the same number of shuffle partitions
        // (i.e. map output partitions).
        assert(
          distinctNumShufflePartitions.length == 1,
          "There should be only one distinct value of the number of shuffle partitions " +
            "among registered Exchange operators.")
    
        val numPartitions = distinctNumShufflePartitions.head
        val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
        var latestSplitPoint = 0
        var coalescedSize = 0L
        var i = 0
        while (i < numPartitions) {
          // We calculate the total size of i-th shuffle partitions from all shuffles.
          var totalSizeOfCurrentPartition = 0L
          var j = 0
          while (j < mapOutputStatistics.length) {
            totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
            j += 1
          }
    
          // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
          // new coalesced partition.
          if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
            partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
            latestSplitPoint = i
            // reset postShuffleInputSize.
            coalescedSize = totalSizeOfCurrentPartition
          } else {
            coalescedSize += totalSizeOfCurrentPartition
          }
          i += 1
        }
        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)
    
        partitionSpecs
      }
    
1. totalPostShuffleInputSize first computes the total size of the shuffle data
2. maxTargetSize is max(ceil(totalPostShuffleInputSize / minNumPartitions), 16), where minNumPartitions is the value of spark.sql.adaptive.coalescePartitions.minPartitionNum
3. targetSize is min(maxTargetSize, advisoryTargetSize), where advisoryTargetSize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; this is why that config is only advisory and not necessarily the actual target size
4. The while loop merges adjacent partitions: for each shuffle partition index it sums that partition's size across all shuffles, and keeps accumulating contiguous partitions until adding the next one would exceed targetSize, at which point a new coalesced partition begins (see the sketch after this list)
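
The greedy merge can be replayed on made-up sizes; this self-contained sketch assumes a single shuffle and a 64MB targetSize:

    // Merge adjacent partitions until adding the next would exceed targetSize.
    // Sizes are in MB for readability.
    val sizes = Array(10L, 5L, 60L, 20L, 30L, 70L)
    val targetSize = 64L
    val splits = scala.collection.mutable.ArrayBuffer(0)
    var coalescedSize = 0L
    for (i <- sizes.indices) {
      if (i > splits.last && coalescedSize + sizes(i) > targetSize) {
        splits += i                  // start a new coalesced partition at i
        coalescedSize = sizes(i)
      } else {
        coalescedSize += sizes(i)
      }
    }
    // Split points 0, 2, 3, 5 -> coalesced groups [10,5], [60], [20,30], [70]
    // (a single oversized partition is never split here; splitting is the job
    // of the skew-join rule).
    println(splits.mkString(", ")) // 0, 2, 3, 5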

Analysis of OptimizeSkewedJoin.optimizeSkewJoin (the core skew-handling code)

optimizeSkewJoin is shown below:

    def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
        case smj @ SortMergeJoinExec(_, _, joinType, _,
            s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
            s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
            if supportedJoinTypes.contains(joinType) =>
          assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
          val numPartitions = left.partitionsWithSizes.length
          // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
          val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
          val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
          logDebug(
            s"""
              |Optimizing skewed join.
              |Left side partitions size info:
              |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
              |Right side partitions size info:
              |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
            """.stripMargin)
          val canSplitLeft = canSplitLeftSide(joinType)
          val canSplitRight = canSplitRightSide(joinType)
          // We use the actual partition sizes (may be coalesced) to calculate target size, so that
          // the final data distribution is even (coalesced partitions + split partitions).
          val leftActualSizes = left.partitionsWithSizes.map(_._2)
          val rightActualSizes = right.partitionsWithSizes.map(_._2)
          val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
          val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
    
          val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
          val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
          val leftSkewDesc = new SkewDesc
          val rightSkewDesc = new SkewDesc
          for (partitionIndex <- 0 until numPartitions) {
            val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
            val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
            val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
    
            val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
            val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
            val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
    
            // A skewed partition should never be coalesced, but skip it here just to be safe.
            val leftParts = if (isLeftSkew && !isLeftCoalesced) {
              val reducerId = leftPartSpec.startReducerIndex
              val skewSpecs = createSkewPartitionSpecs(
                left.mapStats.shuffleId, reducerId, leftTargetSize)
              if (skewSpecs.isDefined) {
                logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
                  s"${skewSpecs.get.length} parts.")
                leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
              }
              skewSpecs.getOrElse(Seq(leftPartSpec))
            } else {
              Seq(leftPartSpec)
            }
    
            // A skewed partition should never be coalesced, but skip it here just to be safe.
            val rightParts = if (isRightSkew && !isRightCoalesced) {
              val reducerId = rightPartSpec.startReducerIndex
              val skewSpecs = createSkewPartitionSpecs(
                right.mapStats.shuffleId, reducerId, rightTargetSize)
              if (skewSpecs.isDefined) {
                logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
                  s"${skewSpecs.get.length} parts.")
                rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
              }
              skewSpecs.getOrElse(Seq(rightPartSpec))
            } else {
              Seq(rightPartSpec)
            }
    
            for {
              leftSidePartition <- leftParts
              rightSidePartition <- rightParts
            } {
              leftSidePartitions += leftSidePartition
              rightSidePartitions += rightSidePartition
            }
          }
    
          logDebug("number of skewed partitions: " +
            s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
          if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
            val newLeft = CustomShuffleReaderExec(
              left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
            val newRight = CustomShuffleReaderExec(
              right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
            smj.copy(
              left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
          } else {
            smj
          }
      }
    
1. The SortMergeJoinExec pattern means the rule applies to sort merge joins
2. assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) guarantees that the two sides being joined have the same number of partitions
3. The median partition size of each side is computed: leftMedSize and rightMedSize
4. The target partition size of each side is computed: leftTargetSize and rightTargetSize
5. The loop checks every partition on both sides for skew: a partition is split only if it is skewed and has not already been coalesced; otherwise it is left untouched
6. createSkewPartitionSpecs then:
   1. obtains the per-map-task sizes of the corresponding shuffle partition
   2. splits them into multiple slices according to targetSize
7. If any skew was found, the shuffle stages are wrapped in CustomShuffleReaderExec for the rest of the execution; ultimately ShuffledRowRDD.compute matches case PartialMapperPartitionSpec to read the data, with spark.sql.adaptive.fetchShuffleBlocksInBatch batching the shuffle-block fetches to reduce IO (an end-to-end sketch follows this list)
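
As a hedged end-to-end illustration (whether the rule actually fires depends on data volume relative to the thresholds above), here is a join with one hot key and where to look for the effect:

    import org.apache.spark.sql.functions._

    // Left side: ~90% of rows share key 0, so its shuffle partition is huge.
    val left = spark.range(0, 10000000)
      .withColumn("k", when(rand() < 0.9, lit(0)).otherwise(col("id") % 100))
    val right = spark.range(0, 100).withColumnRenamed("id", "k")

    val joined = left.join(right, "k")
    joined.count()
    // After execution, the adaptive plan (Spark UI or explain()) shows a
    // SortMergeJoin marked isSkewJoin=true reading via CustomShuffleReader.
    joined.explain()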

Where OptimizeSkewedJoin / CoalesceShufflePartitions are invoked

In AdaptiveSparkPlanExec:

    @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
        ReuseAdaptiveSubquery(conf, context.subqueryCache),
        CoalesceShufflePartitions(context.session),
        // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
        // added by `CoalesceShufflePartitions`. So they must be executed after it.
        OptimizeSkewedJoin(conf),
        OptimizeLocalShuffleReader(conf)
      )
    

So both rules are invoked from AdaptiveSparkPlanExec, with CoalesceShufflePartitions running before OptimizeSkewedJoin. AdaptiveSparkPlanExec itself is inserted by the InsertAdaptiveSparkPlan rule, which in turn is applied in QueryExecution.

And in InsertAdaptiveSparkPlan.shouldApplyAQE and supportAdaptive we can see:

    private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
        conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
          plan.find {
            case _: Exchange => true
            case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
            case p => p.expressions.exists(_.find {
              case _: SubqueryExpression => true
              case _ => false
            }.isDefined)
          }.isDefined
        }
      }
    
    private def supportAdaptive(plan: SparkPlan): Boolean = {
        // TODO migrate dynamic-partition-pruning onto adaptive execution.
        sanityCheck(plan) &&
          !plan.logicalLink.exists(_.isStreaming) &&
          !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
        plan.children.forall(supportAdaptive)
      }
    
    

If none of these conditions hold, AQE is not applied either; to force it on anyway, set spark.sql.adaptive.forceApply to true (the documentation notes that this is an internal configuration). A sketch of the difference follows.
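
For instance (again assuming the SparkSession from the earlier configuration example):

    // No Exchange and no subquery: shouldApplyAQE is false, so the plan is
    // not wrapped in AdaptiveSparkPlan even though AQE is enabled.
    spark.range(10).filter("id > 5").explain()

    // A groupBy introduces an Exchange, so this plan does get the
    // AdaptiveSparkPlan wrapper.
    spark.range(10).groupBy("id").count().explain()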

Note:

The following configurations are deprecated as of Spark 3.0.1:

    spark.sql.adaptive.skewedPartitionMaxSplits    
    spark.sql.adaptive.skewedPartitionRowCountThreshold    
    spark.sql.adaptive.skewedPartitionSizeThreshold   
    

This article draws in part on:
    https://mp.weixin.qq.com/s?__biz=MzA5MTc0NTMwNQ==&mid=2650718363&idx=1&sn=d20fffebafdd2bed6939eaeb39f5e6e3
    https://mp.weixin.qq.com/s/RvFpXWpV8APcGTHhftS6NQ
