
An Analysis of Spark Shuffle Write

Author: WestC | Published 2020-09-16 10:37

The shuffle step is usually the performance bottleneck of a distributed application, so a good shuffle implementation (both shuffle write and shuffle read) is critical to the performance of a distributed computing engine.

Recent versions of Spark ship three ShuffleWriter implementations (the original hash-based shuffle has been removed), each covering a different scenario. The three writers are:

* UnsafeShuffleWriter
* BypassMergeSortShuffleWriter
* SortShuffleWriter

So in which scenario is each writer used, and how does Spark decide which one to apply?

1. When a shuffle is registered on the driver, a different ShuffleHandle is returned depending on the scenario:
    override def registerShuffle[K, V, C](
          shuffleId: Int,
          numMaps: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
        if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
          // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
          // need map-side aggregation, then write numPartitions files directly and just concatenate
          // them at the end. This avoids doing serialization and deserialization twice to merge
          // together the spilled files, which would happen with the normal code path. The downside is
          // having multiple files open at a time and thus more memory allocated to buffers.
          new BypassMergeSortShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
          new SerializedShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else {
          // Otherwise, buffer map outputs in a deserialized form:
          new BaseShuffleHandle(shuffleId, numMaps, dependency)
        }
      }
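
The two branch conditions above are worth spelling out. The following is condensed from the Spark source of the same vintage as the snippet above (config and constant names can shift slightly between versions): bypass requires no map-side aggregation and a small partition count; serialized (unsafe) shuffle additionally requires a serializer that can relocate serialized bytes.

    // Condensed from SortShuffleWriter.shouldBypassMergeSort: bypass is only
    // possible without map-side aggregation and when the partition count is at
    // most spark.shuffle.sort.bypassMergeThreshold (default 200).
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
      if (dep.mapSideCombine) {
        false
      } else {
        val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
        dep.partitioner.numPartitions <= bypassMergeThreshold
      }
    }

    // Condensed from SortShuffleManager.canUseSerializedShuffle: the serializer
    // must support relocating serialized objects, there must be no aggregator,
    // and the partition count must fit in the bits reserved for it (2^24).
    def canUseSerializedShuffle(dep: ShuffleDependency[_, _, _]): Boolean = {
      dep.serializer.supportsRelocationOfSerializedObjects &&
        dep.aggregator.isEmpty &&
        dep.partitioner.numPartitions <= (1 << 24)
    }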
    
    
2. On the executor side, when shuffle data is about to be written, the writer matching the handle is obtained and performs the actual write.

The correspondence between handles and writers is as follows:

| Handle                       | Writer                        |
|------------------------------|-------------------------------|
| SerializedShuffleHandle      | UnsafeShuffleWriter           |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter  |
| BaseShuffleHandle            | SortShuffleWriter             |
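
Concretely, SortShuffleManager.getWriter pattern-matches on the handle type to pick the writer; a condensed version (constructor arguments elided) looks like this:

    override def getWriter[K, V](
        handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = {
      handle match {
        case unsafeHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
          new UnsafeShuffleWriter(/* ... */)          // serialized, possibly off-heap path
        case bypassHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
          new BypassMergeSortShuffleWriter(/* ... */) // one file per partition, then concat
        case baseHandle: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
          new SortShuffleWriter(/* ... */)            // general sort-and-spill path
      }
    }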

How the three writers compare when writing data:

Similarities:

1. While writing, all of them have a phase in which part of the data is first written to disk.
2. Once all records are processed, the previously written files (possibly together with data still in memory) are read back, merged, and written out.
3. Each ShuffleMapTask ultimately emits one data file plus one index file (the index layout is sketched right after this list).
4. Within each data file, records are sorted by partition id.
5. The final on-disk data (excluding the index file) contains only each record's key and value; the partition id is not stored per record.
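
On point 3: the index file is simply numPartitions + 1 longs, where entry i is the byte offset at which partition i begins in the data file. A minimal reader sketch using plain JDK streams (a hypothetical helper, not Spark's IndexShuffleBlockResolver API):

    import java.io.{DataInputStream, FileInputStream}

    // Returns the (startOffset, endOffset) byte range of partition `reduceId`
    // inside the data file, by reading two consecutive longs from the index file.
    def partitionRange(indexFile: String, reduceId: Int): (Long, Long) = {
      val in = new DataInputStream(new FileInputStream(indexFile))
      try {
        in.skipBytes(reduceId * 8)  // each offset entry is one 8-byte long
        val start = in.readLong()   // where partition `reduceId` begins
        val end = in.readLong()     // where the next partition begins, i.e. our end
        (start, end)
      } finally in.close()
    }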
    

Differences:

* BypassMergeSortShuffleWriter: writing starts with one file per partition; no execution-memory allocation or spilling is involved (a toy sketch of this flow follows the list).
* SortShuffleWriter: spilled files are ordered by partition; if an ordering or aggregator is defined, keys are also sorted within each partition.
* UnsafeShuffleWriter: when acquiring memory, it may allocate off-heap memory; output is ordered by partition but unordered within each partition.
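
To make the first bullet concrete, here is a self-contained toy sketch of the bypass strategy (plain Scala I/O with a hypothetical hash partitioner, not Spark's actual classes): every record is routed straight to its partition's temp file, and the files are concatenated at the end; nothing is buffered, sorted, or spilled.

    import java.io._

    object BypassSketch {
      // Writes records as "key,value" lines: one temp file per partition, then a
      // concatenation pass. Returns the per-partition byte lengths -- the
      // information an index file would be built from.
      def write(records: Iterator[(String, String)], numPartitions: Int, out: File): Array[Long] = {
        val partFiles = Array.tabulate(numPartitions)(i => File.createTempFile(s"part-$i-", ".tmp"))
        val writers = partFiles.map(f => new BufferedWriter(new FileWriter(f)))
        for ((key, value) <- records) {
          val pid = (key.hashCode % numPartitions + numPartitions) % numPartitions // toy partitioner
          writers(pid).write(s"$key,$value\n") // route straight to the partition's file
        }
        writers.foreach(_.close())
        val lengths = new Array[Long](numPartitions)
        val merged = new FileOutputStream(out)
        try {
          for (i <- 0 until numPartitions) {   // concatenate in partition order
            val bytes = java.nio.file.Files.readAllBytes(partFiles(i).toPath)
            merged.write(bytes)
            lengths(i) = bytes.length
            partFiles(i).delete()
          }
        } finally merged.close()
        lengths
      }
    }

This also shows why bypass only pays off for small partition counts: the write phase keeps numPartitions files (and their buffers) open at once, which is exactly the downside noted in the registerShuffle comment above.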
    
