美文网首页Spark源码精读分析计划Spark深入学习
Spark基本sort shuffle write流程解析

Spark基本sort shuffle write流程解析

作者: LittleMagic | 来源:发表于2019-03-12 22:26 被阅读41次

    shuffle write入口

    先回忆一下基础知识:

    • Spark作业执行的单元从高到低为job→stage→task
    • stage分为ShuffleMapStage与ResultStage,task也分为ShuffleMapTask与ResultTask
    • 调用shuffle类算子会导致stage的划分

    上一篇shuffle机制概述文章已经提到,ShuffleWriter都实现了write()方法。它由o.a.s.scheduler.ShuffleMapTask.runTask()方法来调用:

          val manager = SparkEnv.get.shuffleManager
          writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
          writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
          writer.stop(success = true).get
    

    write()方法就是我们的入手点。

    #1 - o.a.s.shuffle.sort.SortShuffleWriter.write()方法

      /** Write a bunch of records to this task's output */
      override def write(records: Iterator[Product2[K, V]]): Unit = {
        //【创建外部排序器ExternalSorter】
        sorter = if (dep.mapSideCombine) {
          //【如果shuffle依赖中有map端预聚合,如reduceByKey()算子,就传入aggregator和keyOrdering】
          //【aggregator表示预聚合规则,keyOrdering表示key的排序】
          require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
          new ExternalSorter[K, V, C](
            context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        } else {
          // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
          // care whether the keys get sorted in each partition; that will be done on the reduce side
          // if the operation being run is sortByKey.
          //【如果没有map端预聚合,也就不需要传aggregator和keyOrdering,如sortByKey()算子这样的排序就交给reduce做】
          new ExternalSorter[K, V, V](
            context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
        }
    
        //【#2 - 将shuffle数据放入ExternalSorter进行处理】
        sorter.insertAll(records)
    
        // Don't bother including the time to open the merged output file in the shuffle write time,
        // because it just opens a single file, so is typically too fast to measure accurately
        // (see SPARK-3570).
        //【创建输出数据文件与临时数据文件】
        val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
        val tmp = Utils.tempFileWith(output)
        try {
          //【根据shuffle ID和map ID确定shuffle块ID(reduce ID是0)】
          val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
          //【#6 - 将ExternalSorter中的shuffle数据按分区写入临时文件中,返回各个分区的大小】
          val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
          //【#9 - 创建输出索引文件和临时索引文件,写入索引信息,然后将临时文件改成输出文件】
          shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
          //【MapStatus是ShuffleMapTask返回给TaskScheduler的数据结构】
          mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
        } finally {
          if (tmp.exists() && !tmp.delete()) {
            logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
          }
        }
      }
    

    从上面的代码可以有个整体印象:SortShuffleWriter不仅会输出中间数据,还会输出索引,并且它主要借助一个叫ExternalSorter的类来处理数据。下面来看一下ExternalSorter的细节。

    内存缓存

    #2 - o.a.s.util.collection.ExternalSorter.insertAll()方法

      // Data structures to store in-memory objects before we spill. Depending on whether we have an
      // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
      // store them in an array buffer.
      //【两个内存数据结构,注意volatile】
      @volatile private var map = new PartitionedAppendOnlyMap[K, C]
      @volatile private var buffer = new PartitionedPairBuffer[K, C]
    
      def insertAll(records: Iterator[Product2[K, V]]): Unit = {
        // TODO: stop combining if we find that the reduction factor isn't high
        val shouldCombine = aggregator.isDefined
    
        //【如果需要做map端聚合的话】
        if (shouldCombine) {
          // Combine values in-memory first using our AppendOnlyMap
          //【获取aggregator的mergeValue()函数,它将一个新值合并到当前聚合结果中】
          val mergeValue = aggregator.get.mergeValue
          //【获取aggregator的createCombiner()函数,它负责创建聚合过程的初始值】
          val createCombiner = aggregator.get.createCombiner
          var kv: Product2[K, V] = null
    
          //【如果一个key当前已有聚合值,执行mergeValue()合并value;如果无,执行createCombiner()创建初始值】
          val update = (hadValue: Boolean, oldValue: C) => {
            if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
          }
          while (records.hasNext) {
            //【Spillable类中的操作,将已读记录的计数+1】
            addElementsRead()
            kv = records.next()
            //【map是一个PartitionedAppendOnlyMap结构。将分区和key作为键,然后回调上面的update来聚合值】
            map.changeValue((getPartition(kv._1), kv._1), update)
            //【#3 - 检查并执行溢写磁盘】
            maybeSpillCollection(usingMap = true)
          }
        } else {
          // Stick values into our buffer
          //【如果不用做map端聚合】
          while (records.hasNext) {
            addElementsRead()
            val kv = records.next()
            //【buffer是一个PartitionedPairBuffer结构。直接将分区和键值数据加进去】
            buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
            //【#3 - 检查并执行溢写磁盘】
            maybeSpillCollection(usingMap = false)
          }
        }
      }
    

    ExternalSorter在插入数据时,会分需要与不需要map端预聚合两种情况来处理。对需要map端预聚合的情况,采用PartitionedAppendOnlyMap来保存聚合数据。它是一个能够保留分区信息的,且没有remove()方法的映射型结构。之所以能够保留分区,是因为它的键类型是(partition_id, key)二元组。对于不需要预聚合的情况,数据会放入PartitionedPairBuffer中。它是分区的键值对缓存,作用与PartitionedAppendOnlyMap大同小异,不过内部实现由映射换成了变长数组。

    前面的处理都是在内存中进行,当内存不够用时会向磁盘溢写。下面来看看判断及实行溢写的逻辑。

    磁盘溢写

    #3 - o.a.s.util.collection.Spillable.maybeSpill()方法

    Spillable抽象类是ExternalSorter的父类,上面代码#2中调用的maybeSpillCollection()方法就是maybeSpill()方法的简单封装。它会调用maybeSpill()方法检查是否需要溢写,一旦发生了溢写,就重新new出#2中的map或buffer结构,从零开始再缓存。

      /**
       * Spills the current in-memory collection to disk if needed. Attempts to acquire more
       * memory before spilling.
       *
       * @param collection collection to spill to disk
       * @param currentMemory estimated size of the collection in bytes
       * @return true if `collection` was spilled to disk; false otherwise
       */
      protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
        var shouldSpill = false
        //【溢写初始判断条件:读取的记录总数是32的倍数,并且map或者buffer的预估内存占用量大于当前阈值】
        //【溢写阈值是可变的,初始值由spark.shuffle.spill.initialMemoryThreshold参数指定,默认5MB】
        if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
          // Claim up to double our current memory from the shuffle memory pool
          //【先试图向ShuffleMemoryManager申请(2 * 预估占用内存 - 当前阈值)这么多的执行内存】
          //【详情可以深挖acquireMemory()方法,这里暂时先不展开】
          val amountToRequest = 2 * currentMemory - myMemoryThreshold
          val granted = acquireMemory(amountToRequest)
          //【用申请到的量更新阈值】
          myMemoryThreshold += granted
          // If we were granted too little memory to grow further (either tryToAcquire returned 0,
          // or we already had more memory than myMemoryThreshold), spill the current collection
          //【如果申请到的不够,map或buffer预估占用内存量还是大于阈值,确定溢写】
          shouldSpill = currentMemory >= myMemoryThreshold
        }
        //【如果上面判定不需要溢写,但读取的记录总数比spark.shuffle.spill.numElementsForceSpillThreshold大,也还是得溢写】
        //【这个参数默认值是Long.MAX_VALUE】
        shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
        // Actually spill
        if (shouldSpill) {
          _spillCount += 1
          logSpillage(currentMemory)
          //【#4 - 将缓存的集合溢写】
          spill(collection)
          _elementsRead = 0
          _memoryBytesSpilled += currentMemory
          //【释放内存】
          releaseMemory()
        }
        shouldSpill
      }
    

    在真正溢写数据之前,writer会先申请内存扩容,如果申请不到或者申请的过少,才会开始溢写。这符合Spark尽量充分地利用内存的中心思想。

    另外需要注意的是,传入的currentMemory参数含义为“缓存的预估内存占用量”,而不是“缓存的当前占用量”。这是因为PartitionedAppendOnlyMap与PartitionedPairBuffer都能动态扩容,并且具有SizeTracker特征,能够通过采样估计其大小。这是个很有点意思的实现,之后也会专门写文章来分析一下这些数据结构。

    负责溢写数据的spill()方法是抽象方法,其实现仍然在ExternalSorter中。

    #4 - o.a.s.util.collection.ExternalSorter.spill()方法

      /**
       * Spill our in-memory collection to a sorted file that we can merge later.
       * We add this file into `spilledFiles` to find it later.
       *
       * @param collection whichever collection we're using (map or buffer)
       */
      override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
        //【根据指定的比较器comparator进行排序,返回排序结果的迭代器】
        //【如果细看的话,destructiveSortedWritablePartitionedIterator()方法最终采用TimSort算法来排序】
        val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
        //【#5 - 将内存数据溢写到磁盘文件】
        val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
        //【用ArrayBuffer记录所有溢写的磁盘文件】
        spills += spillFile
      }
    

    #5 - o.a.s.util.collection.ExternalSorter.spillMemoryIteratorToDisk()方法

      /**
       * Spill contents of in-memory iterator to a temporary file on disk.
       */
      private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
          : SpilledFile = {
        // Because these files may be read during shuffle, their compression must be controlled by
        // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
        // createTempShuffleBlock here; see SPARK-3426 for more context.
        //【上面英文注释的大意:因为溢写文件在shuffle过程中会被读取,因此它们的压缩不由spill相关参数控制】
        //【所以要创建一个临时块】
        val (blockId, file) = diskBlockManager.createTempShuffleBlock()
    
        // These variables are reset after each flush
        var objectsWritten: Long = 0
        val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
        //【创建溢写文件的DiskBlockObjectWriter】
        //【fileBufferSize对应参数为spark.shuffle.file.buffer(默认值32K)。如果资源充足,可以适当增大,从而减少flush次数】
        val writer: DiskBlockObjectWriter =
          blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
    
        // List of batch sizes (bytes) in the order they are written to disk
        //【记录写入批次的大小】
        val batchSizes = new ArrayBuffer[Long]
    
        // How many elements we have in each partition
        //【记录每个分区的条数】
        val elementsPerPartition = new Array[Long](numPartitions)
    
        // Flush the disk writer's contents to disk, and update relevant variables.
        // The writer is committed at the end of this process.
        //【将内存数据按批次刷到磁盘的方法】
        def flush(): Unit = {
          //【提交写操作后,会产生一个FileSegment对象,表示一个java.io.File的一部分。其中包含offset和length】
          val segment = writer.commitAndGet()
          batchSizes += segment.length
          _diskBytesSpilled += segment.length
          objectsWritten = 0
        }
    
        var success = false
        try {
          //【遍历map或buffer缓存中的记录】
          while (inMemoryIterator.hasNext) {
            val partitionId = inMemoryIterator.nextPartition()
            require(partitionId >= 0 && partitionId < numPartitions,
              s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
            //【逐条写入,更新计数值】
            inMemoryIterator.writeNext(writer)
            elementsPerPartition(partitionId) += 1
            objectsWritten += 1
            //【当写入的记录条数达到批次阈值spark.shuffle.spill.batchSize(默认值10000),将这批刷到磁盘】
            if (objectsWritten == serializerBatchSize) {
              flush()
            }
          }
          //【遍历完毕,将剩余的刷到磁盘】
          if (objectsWritten > 0) {
            flush()
          } else {
            writer.revertPartialWritesAndClose()
          }
          success = true
        } finally {
          if (success) {
            writer.close()
          } else {
            // This code path only happens if an exception was thrown above before we set success;
            // close our stuff and let the exception be thrown further
            writer.revertPartialWritesAndClose()
            if (file.exists()) {
              if (!file.delete()) {
                logWarning(s"Error deleting ${file}")
              }
            }
          }
        }
        //【返回溢写文件对象】
        SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
      }
    

    至此,shuffle write缓存和溢写就完成了。缓存采用了高效的数据结构,并且溢写时会保证顺序。

    与溢写相关的四个配置参数是:

    • spark.shuffle.file.buffer
    • spark.shuffle.spill.initialMemoryThreshold
    • spark.shuffle.spill.numElementsForceSpillThreshold
    • spark.shuffle.spill.batchSize

    既然这个机制叫做sort shuffle,那么输出时也自然少不了排序。下面来看排序逻辑。

    排序与合并

    #6 - o.a.s.util.collection.ExternalSorter.writePartitionedFile()方法

      /**
       * Write all the data added into this ExternalSorter into a file in the disk store. This is
       * called by the SortShuffleWriter.
       *
       * @param blockId block ID to write to. The index file will be blockId.name + ".index".
       * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
       */
      def writePartitionedFile(
          blockId: BlockId,
          outputFile: File): Array[Long] = {
    
        // Track location of each range in the output file
        val lengths = new Array[Long](numPartitions)
        //【创建输出文件的DiskBlockObjectWriter】
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics().shuffleWriteMetrics)
    
        if (spills.isEmpty) {
          // Case where we only have in-memory data
          //【如果不存在溢写文件,那么表示内存够用,根据有无聚合,只取map或buffer中的数据就行了】
          val collection = if (aggregator.isDefined) map else buffer
          //【根据指定的比较器comparator进行排序,返回排序结果的迭代器,与代码#4中逻辑相同】
          val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
          while (it.hasNext) {
            //【取分区ID,依次将分区内的数据按序输出,并记录分区大小】
            val partitionId = it.nextPartition()
            while (it.hasNext && it.nextPartition() == partitionId) {
              it.writeNext(writer)
            }
            //【提交写操作,获得FileSegment】
            val segment = writer.commitAndGet()
            lengths(partitionId) = segment.length
          }
        } else {
          // We must perform merge-sort; get an iterator by partition and write everything directly.
          //【如果存在溢写文件,那么要把溢写文件和缓存数据归并排序。排序完后再根据分区依次写入输出文件中】
          //【#7 - 归并和排序的逻辑在partitionedIterator中调用的merge()方法里】
          for ((id, elements) <- this.partitionedIterator) {
            if (elements.hasNext) {
              for (elem <- elements) {
                writer.write(elem._1, elem._2)
              }
              val segment = writer.commitAndGet()
              lengths(id) = segment.length
            }
          }
        }
    
        writer.close()
        //【记录内存和磁盘的指标,最终返回各个分区的大小,后面有用】
        context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
    
        lengths
      }
    

    可见,在输出之前排序时,需要区分有无溢写文件。如果没有就比较简单,直接对缓存数据排序即可。如果有的话,还需要将溢写文件与缓存数据归并排序。但是,不管有无溢写文件以及有多少个溢写文件,最终所有中间数据都会被合并到一个文件中,而不会分散在多个文件。

    #7 - o.a.s.util.collection.ExternalSorter.partitionedIterator成员

      /**
       * Return an iterator over all the data written to this object, grouped by partition and
       * aggregated by the requested aggregator. For each partition we then have an iterator over its
       * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
       * partition without reading the previous one). Guaranteed to return a key-value pair for each
       * partition, in order of partition ID.
       *
       * For now, we just merge all the spilled files in once pass, but this can be modified to
       * support hierarchical merging.
       * Exposed for testing.
       */
      def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
        val usingMap = aggregator.isDefined
        val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
        if (spills.isEmpty) {
          // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
          // we don't even need to sort by anything other than partition ID
          if (!ordering.isDefined) {
            // The user hasn't requested sorted keys, so only sort by partition ID, not key
            //【如果没有溢写,并且没有排序规则,那么就只按照分区ID排序】
            groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
          } else {
            // We do need to sort by both partition ID and key
            //【如果没有溢写,但有排序规则,那么需要先按分区ID排序,再按key排序】
            groupByPartition(destructiveIterator(
              collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
          }
        } else {
          // Merge spilled and in-memory data
          //【#8 - 如果有溢写,就将溢写文件和缓存数据归并排序】
          merge(spills, destructiveIterator(
            collection.partitionedDestructiveSortedIterator(comparator)))
        }
      }
    

    从上面可以看出,排序时一定会保证按分区ID有序。至于是否按key值有序,需要看有无排序规则。所以从严格意义上讲,这个“排序”并不能完全保证数据有序。

    #8 - o.a.s.util.collection.ExternalSorter.merge()方法

      /**
       * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
       * inside each partition. This can be used to either write out a new file or return data to
       * the user.
       *
       * Returns an iterator over all the data written to this object, grouped by partition. For each
       * partition we then have an iterator over its contents, and these are expected to be accessed
       * in order (you can't "skip ahead" to one partition without reading the previous one).
       * Guaranteed to return a key-value pair for each partition, in order of partition ID.
       */
      private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
          : Iterator[(Int, Iterator[Product2[K, C]])] = {
        //【创建多个SpillReader来读取溢写文件】
        val readers = spills.map(new SpillReader(_))
        val inMemBuffered = inMemory.buffered
        //【按序遍历分区】
        (0 until numPartitions).iterator.map { p =>
          //【将溢写文件与缓存对应分区的数据拼合起来】
          val inMemIterator = new IteratorForPartition(p, inMemBuffered)
          val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
          if (aggregator.isDefined) {
            // Perform partial aggregation across partitions
            //【如果有聚合逻辑,归并时做分区级别的聚合,按keyComparator做key排序】
            (p, mergeWithAggregation(
              iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
          } else if (ordering.isDefined) {
            // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
            // sort the elements without trying to merge them
            //【如果没有聚合逻辑但有排序逻辑,按照ordering做归并排序】
            (p, mergeSort(iterators, ordering.get))
          } else {
            //【什么都没有的话,直接返回拼合后的结果就是了】
            (p, iterators.iterator.flatten)
          }
        }
      }
    

    这里会将所有溢写文件与缓存数据合并起来,并且排序规则与代码#7中仍然一致,这样整个排序与合并过程就完成了。最后,来看数据和索引文件是如何输出的。

    输出数据和索引文件

    #9 - o.a.s.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit()方法

      /**
       * Write an index file with the offsets of each block, plus a final offset at the end for the
       * end of the output file. This will be used by getBlockData to figure out where each block
       * begins and ends.
       *
       * It will commit the data and index file as an atomic operation, use the existing ones, or
       * replace them with new ones.
       *
       * Note: the `lengths` will be updated to match the existing index file if use the existing ones.
       */
      def writeIndexFileAndCommit(
          shuffleId: Int,
          mapId: Int,
          lengths: Array[Long],
          dataTmp: File): Unit = {
        //【创建索引文件和临时索引文件】
        val indexFile = getIndexFile(shuffleId, mapId)
        val indexTmp = Utils.tempFileWith(indexFile)
        try {
          //【BufferedOutputStream用于批量写,性能好】
          val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
          Utils.tryWithSafeFinally {
            // We take in lengths of each block, need to convert it to offsets.
            //【取得之前代码#6中算出的每个分区的大小,累加成索引文件中的偏移量,先写入临时索引文件】
            var offset = 0L
            out.writeLong(offset)
            for (length <- lengths) {
              offset += length
              out.writeLong(offset)
            }
          } {
            out.close()
          }
    
          val dataFile = getDataFile(shuffleId, mapId)
          // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
          // the following check and rename are atomic.
          //【一个executor中能跑多个task,但只有一个IndexShuffleBlockResolver实例,下面用synchronized加锁】
          //【这样,对数据文件和索引文件的检查、提交操作都具有了原子性】
          synchronized {
            //【检查索引和数据文件是否已经存在了有效的对应关系】
            val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
            if (existingLengths != null) {
              // Another attempt for the same task has already written our map outputs successfully,
              // so just use the existing partition lengths and delete our temporary map outputs.
              //【如果已经存在,就是shuffle write早先已经成功完成,这次操作产生的临时文件就无用了,可以删掉】
              System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
              if (dataTmp != null && dataTmp.exists()) {
                dataTmp.delete()
              }
              indexTmp.delete()
            } else {
              // This is the first successful attempt in writing the map outputs for this task,
              // so override any existing index and data files with the ones we wrote.
              //【如果还没存在对应关系,就是第一次shuffle write刚刚成功,删除可能存在的其他索引和数据文件,防止混淆】
              if (indexFile.exists()) {
                indexFile.delete()
              }
              if (dataFile.exists()) {
                dataFile.delete()
              }
              //【然后将临时文件重命名成正式的文件】
              if (!indexTmp.renameTo(indexFile)) {
                throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
              }
              if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
                throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
              }
            }
          }
        } finally {
          if (indexTmp.exists() && !indexTmp.delete()) {
            logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
          }
        }
      }
    

    IndexShuffleBlockResolver类专门负责维护shuffle数据与索引的对应关系,后面的shuffle read阶段也还要用到它。

    从整个shuffle write流程可知,每一个ShuffleMapTask通过SortShuffleWriter只会产生两个文件,一个分区的数据文件,一个索引文件。这与之前的hash shuffle机制相比,文件数量已经大大减少了。

    总结

    sort shuffle write流程简图

    暂未涉及细节的知识点

    • PartitionedAppendOnlyMap与PartitionedPairBuffer的底层实现
    • destructiveSortedWritablePartitionedIterator()方法和排序算法逻辑
    • ShuffleMemoryManager如何分配内存
    • DiskBlockObjectWriter

    相关文章

      网友评论

        本文标题:Spark基本sort shuffle write流程解析

        本文链接:https://www.haomeiwen.com/subject/zamopqtx.html