
SortShuffle in Spark

Author: 海轩_fan | Published 2020-07-24 18:18

The two run modes of SortShuffleManager

SortShuffleManager runs in one of two modes: the normal (sort-based) mode and the bypass mode.
Normal mode:


[figure: normal mode]

Bypass mode:


[figure: bypass mode]
The normal mode is the default (before Spark 1.2, HashShuffleManager was used instead). The bypass mode is used only when the number of shuffle output partitions (dep.partitioner.numPartitions) is no greater than spark.shuffle.sort.bypassMergeThreshold and mapSideCombine (set to true by aggregating operators) is false, as the source below and the example sketch after it show:
private[spark] object SortShuffleWriter {
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
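
As a concrete illustration (a hypothetical job; the app name, data sizes, and partition counts are arbitrary), raising the threshold above the shuffle's partition count lets a non-aggregating shuffle take the bypass path, while an aggregating operator such as reduceByKey never can, because it sets mapSideCombine = true:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example: raise the threshold above the shuffle's partition
// count so shouldBypassMergeSort can return true.
val conf = new SparkConf()
  .setAppName("bypass-demo")
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
val sc = new SparkContext(conf)

val pairs = sc.parallelize(1 to 1000000).map(i => (i % 300, i))

// groupByKey disables map-side combine, so with 300 <= 400 partitions
// this shuffle takes the bypass path.
val grouped = pairs.groupByKey(300)

// reduceByKey sets mapSideCombine = true, so it never bypasses,
// regardless of the threshold.
val sums = pairs.reduceByKey(_ + _, 300)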
// Create the ShuffleHandle
override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      // canUseSerializedShuffle is a helper that decides whether this shuffle should use
      // the optimized serialized shuffle path or fall back to the path that operates on
      // deserialized objects. SerializedShuffleHandle can be used only when the serializer
      // supports relocation of its serialized objects, no aggregator is defined, and
      // numPartitions is at most 2^24.
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }
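
The conditions described in that comment map onto checks like the following (a simplified sketch of SortShuffleManager.canUseSerializedShuffle as it appears in Spark 2.x; the real method also logs why a shuffle is rejected):

// Simplified sketch; the named partition-limit constant and logging are omitted.
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  // 1. The serializer must support relocating serialized objects
  //    (KryoSerializer does, JavaSerializer does not).
  dependency.serializer.supportsRelocationOfSerializedObjects &&
    // 2. No aggregator may be defined.
    dependency.aggregator.isEmpty &&
    // 3. At most 2^24 output partitions, because the partition id is packed
    //    into the upper bits of an 8-byte record pointer.
    dependency.partitioner.numPartitions <= (1 << 24)
}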

The effect of spark.shuffle.sort.bypassMergeThreshold

Given the process above, raising spark.shuffle.sort.bypassMergeThreshold to at least the number of shuffle partitions skips the sort step for some shuffle operators; it can also avoid copying each row before the shuffle, as the source below shows:

 private def needToCopyObjectsBeforeShuffle(
      partitioner: Partitioner,
      serializer: Serializer): Boolean = {
    val conf = SparkEnv.get.conf
    val shuffleManager = SparkEnv.get.shuffleManager
    val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    if (sortBasedShuffleOn) {
      val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
      if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
        false
      } else if (serializer.supportsRelocationOfSerializedObjects) {
        // The serializer supports relocation of its serialized objects
        false
      } else {
        true
      }
    } else {
      true
    }
  }

val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
      if (needToCopyObjectsBeforeShuffle(part, serializer)) {
        // The shuffle may buffer object references, so each row must be
        // defensively copied before being handed to the shuffle writer.
        rdd.mapPartitionsInternal { iter =>
          val getPartitionKey = getPartitionKeyExtractor()
          iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
        }
      } else {
        // Safe to reuse one mutable pair per partition, avoiding an
        // allocation per row.
        rdd.mapPartitionsInternal { iter =>
          val getPartitionKey = getPartitionKeyExtractor()
          val mutablePair = new MutablePair[Int, InternalRow]()
          iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
        }
      }
    }

In the source above, if the serializer does not support relocation of its serialized objects, each row may be copied before the shuffle. Of the serializers bundled with Spark, JavaSerializer does not support relocation, while KryoSerializer does.
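
In practice, then, avoiding the copy path is often just a matter of switching the serializer (a minimal sketch; MyRecord is a hypothetical application class, and registering classes with Kryo is optional but shrinks the serialized output):

import org.apache.spark.SparkConf

case class MyRecord(id: Int, value: String) // hypothetical application class

val conf = new SparkConf()
  // KryoSerializer supports relocation of serialized objects, so
  // needToCopyObjectsBeforeShuffle can return false.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optional: register frequently shuffled classes with Kryo.
  .registerKryoClasses(Array(classOf[MyRecord]))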
