[SPARK][CORE] 面试问题之 Shuffle read

作者: Tim在路上 | 来源:发表于2022-06-04 11:36 被阅读0次

    欢迎关注微信公众号“Tim在路上”
    之前我们已经了解了shuffle writer的详细过程,那么生成文件后会发生什么呢?以及它们是如何被读取呢?读取是内存的操作吗?这些问题也随之产生,那么今天我们将先来了解了shuffle reader的细枝末节。

    在文章Spark Shuffle概述中我们已经知道,在ShuffleManager中不仅定义了getWriter来获取map writer的实现方式, 同时还定义了getReader来获取读取shuffle文件的实现方式。 在Spark中调用有两个调用getReader的抽象类的重要实现,分别是ShuffledRDD和ShuffleRowRDD。前者是与RDD API交互,后面一个是DataSet Api的交互实现。在Spark 3.0后其核心已经变成了Spark SQL,所以我们重点从ShuffleRowRDD调用getReader开始讲起。

    从ShuffleRowRDD开始

    ShuffleRowRDD主要是被ShuffleExchangeExec调用。这里简单介绍下ShuffleExchangeExec操作算子。它主要负责两件事:首先,准备ShuffleDependency,它根据父节点所需的分区方案对子节点的输出行进行分区。其次,添加一个ShuffleRowRDD并指定准备好的ShuffleDependency作为此RDD的依赖项。


    2927.png
    class ShuffledRowRDD(
        var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
        metrics: Map[String, SQLMetric],
        partitionSpecs: Array[ShufflePartitionSpec])
      extends RDD[InternalRow](dependency.rdd.context,Nil)
    
    

    ShuffleRowRDD继承自RDD[InternalRow], 同时内部维护着三个参数,分别是dependency,metrics和partitionSpecs。dependency封装着shuffleIdshuffleHandlenumPartitions 可以基于其判断出shuffleWriter采用了哪种方式。partitionSpecs定义了分区规范的类型。

    目前在spark 3.2版本中partitionSpecs的实现类主要有以下四个:

    • CoalescedPartitionSpec用于coalesce shuffle partitions 逻辑规则
    • PartialReducerPartitionSpec参与了 skew join 优化
    • PartialMapperPartitionSpec用于本地随机读取器
    • CoalescedMapperPartitionSpec用于优化本地随机读取器

    不同类型的分区规范其实质是代表不同的随机读取的参数。我们都知道在Spark Shuffle中getReader仅有且唯一的一个实现方式, 即BlockStoreShuffleReader 的实现。但是不同的分区规范意味将给共享的reader器传递不同的参数, 下面是ShuffleRowRDD中的简化代码:

    // ShuffleRowRDD
    override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
      val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
      // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
      // as well as the `tempMetrics` for basic shuffle metrics.
      val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
      val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
        // CoalescedPartitionSpec会读取map task为所有reducer所产生的shuffle file
        case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
          SparkEnv.get.shuffleManager.getReader(
            dependency.shuffleHandle,
            startReducerIndex,
            endReducerIndex,
            context,
            sqlMetricsReporter)
       // PartialReducerPartitionSpec 读取map task为一个reducer产生的部分数据
        case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
          SparkEnv.get.shuffleManager.getReader(
            dependency.shuffleHandle,
            startMapIndex,
            endMapIndex,
            reducerIndex,
            reducerIndex + 1,
            context,
            sqlMetricsReporter)
       // PartialMapperPartitionSpec读取shuffle map文件的部分
       case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
            SparkEnv.get.shuffleManager.getReader(
              dependency.shuffleHandle,
              mapIndex,
              mapIndex + 1,
              startReducerIndex,
              endReducerIndex,
              context,
              sqlMetricsReporter)
    ...
        reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
      }
    
    

    其实从上面传的参数中就可以看出点端倪CoalescedPartitionSpec(startReducerIndex,endReducer-Index) 读取map task为所有reducer所产生的shuffle file;PartialReducerPartitionSpec(startMap-Index, endMapIndex,reducerIndex,reducerIndex + 1) 可以看出每次读取只会为一个reducer读取部分数据。

    从上面代码可以看出ShuffleRowRDD 使用 read() 方法遍历 shuffle 数据并将其返回给客户端,那么接下来我们就详细的看下getReader是如何实现的?

    ShuffleReader调用前的准备

    SortShuffleManager是ShuffleManager的唯一实现,里面也实现getReader方法,那么就让我们从getReader开始。

    override def getReader[K, C](
        handle: ShuffleHandle,
        startMapIndex: Int,
        endMapIndex: Int,
        startPartition: Int,
        endPartition: Int,
        context: TaskContext,
        metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
      val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
      val (blocksByAddress, canEnableBatchFetch) =
        // 是否开启了push-based shuffle, 后续再分享,这里先跳过
        if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
          val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
            handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
          (res.iter, res.enableBatchFetch)
        } else {
          // [1] 使用mapOutputTracker获取shuffle块的位置
          val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
            handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
          (address, true)
        }
      // [2] 创建一个BlockStoreShuffleReader实例,该实例将负责将shuffle文件从mapper传递到 reducer 任务
      new BlockStoreShuffleReader(
        handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
        shouldBatchFetch =
          canEnableBatchFetch &&canUseBatchFetch(startPartition, endPartition, context))
    }
    
    

    可以看到getReader主要做了两件事:

    • [1] 使用mapOutputTracker获取shuffle块的位置
    • [2] 创建一个BlockStoreShuffleReader实例,该实例将负责将shuffle文件从mapper传递到reducer 任务

    那么Spark中如何保存和获取shuffle块的位置呢?

    在spark中有两种mapOutputTracker,两种mapOutputTracker 都是在创建SparkEnv时创建。

    其中第一个是MapOutputTrackerMaster,它驻留在驱动程序中并跟踪每个阶段的map output输出, 并与DAGScheduler进行通信。

    另一个是MapOutputTrackerWorker,位于执行器上,它负责从MapOutputTrackerMaster获取shuffle 元数据信息。

    MapOutputTrackerMaster:

    1. DAGScheduler在创建 shuffle map 阶段时会调用registerShuffle方法,从下面的代码可以看出在创建ShuffleMapStage会调用registerShuffle,其实质是在向 shuffleStatuses 映射器中放入shuffleid, 并为其值创建一个新的new ShuffleStatus(numMaps)。
    def createShuffleMapStage[K, V, C](
        shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
      val rdd = shuffleDep.rdd
      ...
      stageIdToStage(id) = stage
      shuffleIdToMapStage(shuffleDep.shuffleId) = stage
      updateJobIdStageIdMaps(jobId, stage)
    
      if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
        // 在创建ShuffleMapStage会调用registerShuffle
        mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length,
          shuffleDep.partitioner.numPartitions)
      }
      stage
    }
    
    def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit = {
        if (pushBasedShuffleEnabled) {
          if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces)).isDefined) {
            throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
          }
        } else {
          // 可以看到其实质是在向 shuffleStatuses 放入shuffleid, 创建ShuffleStatus
          if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps)).isDefined) {
            throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
          }
        }
      }
    
    
    1. 到目前位置master tracker存放了一个shuffleid, 表明DAG中存在一个shuffle, 但还是不知道map output file的具体位置。
    // DAGScheduler中
    private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
    
      case smt: ShuffleMapTask =>
         val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
         ...
         mapOutputTracker.registerMapOutput(
            shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
      }
    
    def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
        shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
    }
    
    

    从上面代码可以看出,在每次 shuffle map 阶段的任务终止时,DAGScheduler都会向MapOutputTrackerMaster发送状态更新。跟踪器将有关特定 shuffle 文件的位置和大小的信息添加到在注册步骤中初始化 的shuffleStatuses map中。


    3tled.png

    MapOutputTrackerWorker:

    当worker tracker 没有缓存shuffle信息, 这时就必须发送GetMapOutputStatuses消息来从master tracker获取它。

    再回过头来看看,在getReader中通过mapOutputTracker获取shuffle块的位置的方法。

    // mapOutTracker
    private def getMapSizesByExecutorIdImpl(
        shuffleId: Int,
        startMapIndex: Int,
        endMapIndex: Int,
        startPartition: Int,
        endPartition: Int,
        useMergeResult: Boolean): MapSizesByExecutorId = {
      logDebug(s"Fetching outputs for shuffle$shuffleId")
      // [1] 获取mapOutputStatuses
      val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf,
        // EnableBatchFetch can be set to false during stage retry when the
        // shuffleDependency.shuffleMergeEnabled is set to false, and Driver
        // has already collected the mergedStatus for its shuffle dependency.
        // In this case, boolean check helps to insure that the unnecessary
        // mergeStatus won't be fetched, thus mergedOutputStatuses won't be
        // passed to convertMapStatuses. See details in [SPARK-37023].
        if (useMergeResult)fetchMergeResultelse false)
      ...
    }
    
    

    从上面可以看出获取具体的map output 位置的实现在getStatuses方法中。下面我们来具体分析下:

    private def getStatuses(
        shuffleId: Int,
        conf: SparkConf,
        canFetchMergeResult: Boolean): (Array[MapStatus], Array[MergeStatus]) = {
      // push-based shuffle 开启,获取MergeStatus, 现暂不考虑
      if (canFetchMergeResult) {
        ...
      } else {
        val statuses = mapStatuses.get(shuffleId).orNull
        // [1] 如果mapStatuses不包含statuses, 就向master tracker发送GetMapOutputStatuses消息
        if (statuses == null) {
          logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
          val startTimeNs = System.nanoTime()
    fetchingLock.withLock(shuffleId) {
            var fetchedStatuses =mapStatuses.get(shuffleId).orNull
            if (fetchedStatuses == null) {
              logInfo("Doing the fetch; tracker endpoint = " +trackerEndpoint)
              val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
              try {
                fetchedStatuses =
                  MapOutputTracker.deserializeOutputStatuses[MapStatus](fetchedBytes, conf)
              } catch {
                ...
              }
              logInfo("Got the map output locations")
              mapStatuses.put(shuffleId, fetchedStatuses)
            }
            (fetchedStatuses, null)
          }
        // [2] 如果mapStatuses包含statuses, 直接返回
        } else {
          (statuses, null)
        }
      }
    }
    
    

    从getStatuses可以看出:

    • [1] 如果mapStatuses不包含statuses, 就向master tracker发送GetMapOutputStatuses消息
    • [2] 如果mapStatuses包含statuses, 直接返回
    private[spark] sealed trait MapStatus extends ShuffleOutputStatus {
      def location: BlockManagerId
    
      def updateLocation(newLoc: BlockManagerId): Unit
    
      def getSizeForBlock(reduceId: Int): Long
    
      def mapId: Long
    }
    
    

    可见MapStatus中包含了location, mapId等信息。

    最后,回到getReader方法中,通过SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId获取shuffle块信息后,再将其作为 shuffle 块的及其物理位置传递给BlockStoreShuffleReader。

    那么接下来就我们再来分析下BlockStoreShuffleReader的实现

    为避免冗长将BlockStoreShuffleReader放到下一讲进行分析。
    欢迎关注微信公众号“Tim在路上”

    相关文章

      网友评论

        本文标题:[SPARK][CORE] 面试问题之 Shuffle read

        本文链接:https://www.haomeiwen.com/subject/ntvkmrtx.html