[SPARK][CORE] Interview Questions: BypassMergeSortShuffleWriter

Author: Tim在路上 | Published 2022-05-20 23:12

    Welcome to follow the WeChat official account "Tim在路上".
    BypassMergeSortShuffleWriter is, as its name suggests, the bypass branch of sort-based shuffle: a hash-style implementation living inside the sort-based shuffle framework. In the map phase, records are written to different files according to their partition, one file per partition. These partition files are then concatenated into a single output file and an index is generated for it. Reducers locate and consume the different partitions of that output file through IndexShuffleBlockResolver.

    In BypassMergeSortShuffleWriter, records are never cached in memory; every record is ultimately flushed to disk.

    While writing, BypassMergeSortShuffleWriter opens a separate serializer and file stream for every partition at the same time, so performance degrades badly when the number of reduce partitions is very large.

    The ShuffleWriter is invoked from ShuffleMapTask.runTask, and every map task calls runTask exactly once.

    BypassMergeSortShuffleWriter Source Code Walkthrough

    First, let's review the ShuffleWriter flow. A shuffle happens between two stages separated by a wide dependency, because computation inside a stage is pipelined. The stage upstream of the shuffle is the map stage and the downstream stage is the reduce stage, so the shuffle write happens in the map stage. The ShuffleWriter is driven from ShuffleMapStage; each ShuffleMapStage contains multiple ShuffleMapTasks, and the number of map tasks equals the number of partitions of the map-side RDD.

    Each ShuffleMapTask therefore invokes the writer interface inside its runTask, but it does not call a concrete implementation directly. Which implementation is used is decided by the ShuffleHandle that was returned when the shuffle was registered with the ShuffleManager at the time the wide dependency was created.
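
    To make that selection concrete, here is a minimal, self-contained Scala sketch (stub types, not the Spark source) of the idea behind SortShuffleManager.getWriter: the handle type stored on the ShuffleDependency at registration time is what later maps to a concrete writer.

    object GetWriterSketch {

      // Stand-ins for the handle types returned by SortShuffleManager.registerShuffle.
      sealed trait ShuffleHandle
      case object SerializedShuffleHandle extends ShuffleHandle
      case object BypassMergeSortShuffleHandle extends ShuffleHandle
      case object BaseShuffleHandle extends ShuffleHandle

      // Stand-ins for the three concrete writers of the sort-based shuffle.
      sealed trait Writer
      case object UnsafeShuffleWriter extends Writer
      case object BypassMergeSortShuffleWriter extends Writer
      case object SortShuffleWriter extends Writer

      // The handle decided at registration time fully determines the writer a map task gets.
      def getWriter(handle: ShuffleHandle): Writer = handle match {
        case SerializedShuffleHandle      => UnsafeShuffleWriter
        case BypassMergeSortShuffleHandle => BypassMergeSortShuffleWriter
        case BaseShuffleHandle            => SortShuffleWriter
      }

      def main(args: Array[String]): Unit =
        println(getWriter(BypassMergeSortShuffleHandle)) // BypassMergeSortShuffleWriter
    }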

    When ShuffleMapTask invokes the writer, it first goes through ShuffleWriteProcessor, which controls the lifecycle of the ShuffleWriter. Let's look at the implementation of write in ShuffleWriteProcessor:

    // ShuffleWriteProcessor
    def write(
        rdd: RDD[_],
        dep: ShuffleDependency[_, _, _],
        mapId: Long,
        context: TaskContext,
        partition: Partition): MapStatus = {
      var writer: ShuffleWriter[Any, Any] = null
      try {
        // [1] Get the ShuffleManager from SparkEnv and, via the dependency's shuffleHandle, obtain the concrete ShuffleWriter implementation.
        val manager = SparkEnv.get.shuffleManager
        writer = manager.getWriter[Any, Any](
          dep.shuffleHandle,
          mapId,
          context,
          createMetricsReporter(context))
        // [2] Call the shuffleWriter's write method, passing in the iterator of the current RDD partition.
        writer.write(
          rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        // [3] After the write finishes, stop the writer to get the MapStatus (on failure, data is cleaned up instead).
        val mapStatus = writer.stop(success = true)
        // [4] If the write succeeded, initiate push-based shuffle; this will be covered in detail later.
        if (mapStatus.isDefined) {
          // Initiate shuffle push process if push based shuffle is enabled
          // The map task only takes care of converting the shuffle data file into multiple
          // block push requests. It delegates pushing the blocks to a different thread-pool -
          // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
          if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
            manager.shuffleBlockResolver match {
              case resolver: IndexShuffleBlockResolver =>
                val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
                new ShuffleBlockPusher(SparkEnv.get.conf)
                  .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
              case _ =>
            }
          }
        }
        mapStatus.get
      }
    ...
    }
    
    

    ShuffleWriteProcessor mainly does three things:

    • [1] Get the ShuffleManager from SparkEnv and, via the dependency's shuffleHandle, obtain the concrete ShuffleWriter implementation
    • [2] Call the shuffleWriter's write method, passing in the iterator of the current RDD partition
    • [3] After the write finishes, stop the writer to get the MapStatus (on failure, data is cleaned up instead)

    As you can see, every ShuffleMapTask returns a MapStatus when it finishes. The task result is wrapped in a CompletionEvent and sent to the DAGScheduler on the driver. If the completed task is a ShuffleMapTask, the DAGScheduler registers the map output status with the MapOutputTracker.

    So how does BypassMergeSortShuffleWriter actually write the map-side data?

    // BypassMergeSortShuffleWriter
    @Override
    public void write(Iterator<Product2<K, V>> records) throws IOException {
      assert (partitionWriters == null);
      // [1] Create the ShuffleMapOutputWriter that will commit the output of all partitions of this map task
      ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
          .createMapOutputWriter(shuffleId, mapId, numPartitions);
      try {
        // If there are no records, commit all (empty) partitions immediately, get the partition lengths and build the MapStatus
        if (!records.hasNext()) {
          partitionLengths = mapOutputWriter.commitAllPartitions(
            ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
          mapStatus = MapStatus$.MODULE$.apply(
            blockManager.shuffleServerId(), partitionLengths, mapId);
          return;
        }
        final SerializerInstance serInstance = serializer.newInstance();
        final long openStartTime = System.nanoTime();
        // [2] Create a DiskBlockObjectWriter write stream and a FileSegment slot for each partition
        partitionWriters = new DiskBlockObjectWriter[numPartitions];
        partitionWriterSegments = new FileSegment[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
          // [2.1] For each partition, create a temporary file and block id, and open a write stream for it
          final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
              blockManager.diskBlockManager().createTempShuffleBlock();
          final File file = tempShuffleBlockIdPlusFile._2();
          final BlockId blockId = tempShuffleBlockIdPlusFile._1();
          DiskBlockObjectWriter writer =
            blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
          if (partitionChecksums.length > 0) {
            writer.setChecksum(partitionChecksums[i]);
          }
          partitionWriters[i] = writer;
        } 
        // Creating the file to write to and creating a disk writer both involve interacting with
        // the disk, and can take a long time in aggregate when we open many files, so should be
        // included in the shuffle write time.
        writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
        // [3] Write each record to the write stream of its target partition, then commit
        while (records.hasNext()) {
          final Product2<K, V> record = records.next();
          final K key = record._1();
          partitionWriters[partitioner.getPartition(key)].write(key, record._2());
        }
    
        // [3.1] Commit and flush the write stream of every partition, one by one
        for (int i = 0; i < numPartitions; i++) {
          try (DiskBlockObjectWriter writer = partitionWriters[i]) {
            partitionWriterSegments[i] = writer.commitAndGet();
          }
        }
        // [4] Concatenate the FileSegments of all partitions into one file; writeMetadataFileAndCommit is called along the way to generate the index file
        partitionLengths = writePartitionedData(mapOutputWriter);
        mapStatus = MapStatus$.MODULE$.apply(
          blockManager.shuffleServerId(), partitionLengths, mapId);
      } catch (Exception e) {
        try {
          mapOutputWriter.abort(e);
        } catch (Exception e2) {
    logger.error("Failed to abort the writer after failing to write map output.", e2);
          e.addSuppressed(e2);
        }
        throw e;
      }
    }
    
    

    In summary, the Bypass writer works in four steps:

    • [1] Create the ShuffleMapOutputWriter that will commit the output of all partitions of this map task

    • [2] Create a DiskBlockObjectWriter write stream and a FileSegment for each partition

      • [2.1] For each partition, create a temporary file and block id, and open and maintain a DiskBlockObjectWriter write stream for it
    • [3] Write each record to the write stream of its target partition, as chosen by the partitioner (see the sketch after this list), then commit

      • [3.1] Commit and flush the write stream of every partition, one by one
    • [4] Concatenate the FileSegments of all partitions into one file; writeMetadataFileAndCommit is called along the way to generate the index file
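
    For step [3], the target write stream is selected by the dependency's partitioner. As a minimal, self-contained sketch (not Spark source) of the common case, HashPartitioner maps key.hashCode into [0, numPartitions) with a non-negative modulus:

    object PartitionRoutingSketch {

      // Same behaviour as the nonNegativeMod helper used by Spark's HashPartitioner.
      def nonNegativeMod(x: Int, mod: Int): Int = {
        val raw = x % mod
        if (raw < 0) raw + mod else raw
      }

      // null keys go to partition 0; everything else is hashed modulo numPartitions.
      def getPartition(key: Any, numPartitions: Int): Int = key match {
        case null => 0
        case k    => nonNegativeMod(k.hashCode, numPartitions)
      }

      def main(args: Array[String]): Unit = {
        val numPartitions = 4
        // Each record would be appended to partitionWriters(getPartition(key, numPartitions)).
        Seq("a" -> 1, "b" -> 2, "c" -> 3).foreach { case (k, v) =>
          println(s"record ($k, $v) -> partition ${getPartition(k, numPartitions)}")
        }
      }
    }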

    So when writing, Bypass generates one FileSegment per reduce partition for every map task; during the write it concurrently creates a temporary file and maintains an I/O write stream for every partition, and only at the end concatenates them into a single file. With a very large number of partitions it would have to keep a very large number of I/O streams open, which is why Bypass is gated by a partition-count threshold (spark.shuffle.sort.bypassMergeThreshold). The source code also shows that Bypass keeps no in-memory buffer of records for sorting or combining; records are written straight to the streams, which is why Bypass cannot handle operators that require map-side pre-aggregation.
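
    Here is a minimal, self-contained sketch (not Spark source) of this gate, mirroring the condition checked by SortShuffleWriter.shouldBypassMergeSort, where spark.shuffle.sort.bypassMergeThreshold defaults to 200:

    object BypassGateSketch {

      // Bypass is only chosen when there is no map-side combine and the number of
      // reduce partitions does not exceed spark.shuffle.sort.bypassMergeThreshold.
      def shouldBypassMergeSort(
          mapSideCombine: Boolean,
          numPartitions: Int,
          bypassMergeThreshold: Int = 200): Boolean = {
        // Map-side aggregation needs records to be buffered and combined in memory
        // first, which the buffer-less bypass path cannot do.
        if (mapSideCombine) false
        else numPartitions <= bypassMergeThreshold
      }

      def main(args: Array[String]): Unit = {
        println(shouldBypassMergeSort(mapSideCombine = false, numPartitions = 150)) // true
        println(shouldBypassMergeSort(mapSideCombine = false, numPartitions = 500)) // false
        println(shouldBypassMergeSort(mapSideCombine = true,  numPartitions = 50))  // false
      }
    }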

    So, given that BypassMergeSortShuffleWriter belongs to sort-based shuffle, does it actually do any sorting?

    Next, let's see how Bypass concatenates the per-partition FileSegments into one file, which means looking in detail at how writePartitionedData is implemented.

    private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
      // Track location of the partition starts in the output file
      if (partitionWriters != null) {
        final long writeStartTime = System.nanoTime();
        try {
          for (int i = 0; i < numPartitions; i++) {
            // [1] Get the FileSegment temp file of each partition, together with the partition's output writer
            final File file = partitionWriterSegments[i].file();
            ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
            if (file.exists()) {
              if (transferToEnabled) {
                // Using WritableByteChannelWrapper to make resource closing consistent between
                // this implementation and UnsafeShuffleWriter.
                Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
                if (maybeOutputChannel.isPresent()) {
                  writePartitionedDataWithChannel(file, maybeOutputChannel.get());
                } else {
                  writePartitionedDataWithStream(file, writer);
                }
              } else {
                // [2] Merge the FileSegments into a single output file
                writePartitionedDataWithStream(file, writer);
              }
              if (!file.delete()) {
    logger.error("Unable to delete file for partition {}", i);
              }
            }
          }
        } finally {
          writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
        }
        partitionWriters = null;
      }
      // [3] Commit all partitions, passing in each partition's data length; writeMetadataFileAndCommit is called to generate the index file that records every partition's offset
      return mapOutputWriter.commitAllPartitions(getChecksumValues(partitionChecksums))
        .getPartitionLengths();
    }
    
    

    writePartitionedData works in three steps:

    • [1] Get the FileSegment temp file of each partition, together with the partition's output writer
    • [2] Merge the FileSegments into a single output file
    • [3] Commit all partitions, passing in each partition's data length; writeMetadataFileAndCommit is called to generate the index file that records every partition's offset (see the sketch below)
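
    As a minimal, self-contained sketch (not Spark source) of what that index conceptually contains: IndexShuffleBlockResolver stores cumulative byte offsets derived from the partition lengths, so a reducer can seek directly to its partition inside the single data file.

    object IndexFileSketch {

      // offsets(i) is where partition i starts; offsets(numPartitions) is the data file length.
      def offsetsFromLengths(partitionLengths: Array[Long]): Array[Long] =
        partitionLengths.scanLeft(0L)(_ + _)

      def main(args: Array[String]): Unit = {
        val partitionLengths = Array(120L, 0L, 340L, 80L) // bytes written for each reduce partition
        val offsets = offsetsFromLengths(partitionLengths)
        println(offsets.mkString(", ")) // 0, 120, 120, 460, 540
        // The reducer for partition 2 reads bytes [offsets(2), offsets(3)) = [120, 460).
      }
    }
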
    [Figure: bypass.png]

    To summarize: BypassMergeSortShuffleWriter is a hash-style implementation. There is no sorting and no in-memory buffering; each map task generates as many FileSegments as there are reduce partitions and finally merges them into one data file, and an index file is generated from the partition lengths. BypassMergeSortShuffleWriter therefore performs well when the number of partitions is small. Each task ultimately produces 2 files, so the total number of files produced is 2 * M (for example, 1000 map tasks leave 2000 files, regardless of the number of reduce partitions).
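
    A quick worked comparison (assumed numbers, not a benchmark) of final file counts against the old hash shuffle without consolidation, which left one file per map task per reduce partition:

    object FileCountSketch {
      def main(args: Array[String]): Unit = {
        val mapTasks = 1000        // M
        val reducePartitions = 200 // R
        println(s"bypass (sort-based): ${2 * mapTasks} files")                // 2 * M = 2000
        println(s"hash shuffle:        ${mapTasks * reducePartitions} files") // M * R = 200000
        // Note: during the write, bypass still opens R temporary files and streams per
        // map task before concatenating them, hence the bypassMergeThreshold gate above.
      }
    }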

    That's it for today. Based on the discussion above, here are a few interview questions to take away:

    1. What is the difference between BypassMergeSortShuffleWriter and hash shuffle?
    2. Why was HashShuffleManager not kept, but instead reimplemented as one of the writers inside SortShuffleManager?

    Welcome to follow the WeChat official account "Tim在路上".
