美文网首页
spark源码阅读之ExternalSorter

spark源码阅读之ExternalSorter

作者: cclucc | 来源:发表于2018-04-26 19:33 被阅读0次

在SortShuffleWriter中调用ExternalSorter的两个方法insertAll和writePartitionedFile

1】、blockManager

2】、diskBlockManager

3】、serializerManager

4】、fileBufferSize

spark.shuffle.file.buffer=32k

5】、serializerBatchSize

spark.shuffle.spill.batchSize=10000

6】、map(PartitionedAppendOnlyMap)

private var data = new Array[AnyRef](2 * capacity)

即消耗的并不是Storage的内存

7】、buffer(PartitionedPairBuffer)

8】、forceSpillFiles(ArrayBuffer[SpilledFile])

PartitionedAppendOnlyMap 放不下,要落地,那么不能硬生生的写磁盘,所以需要个buffer,然后把buffer再一次性写入磁盘文件,buffer的大小由fileBufferSize决定

9】、spills(ArrayBuffer[SpilledFile])

10】、insertAll
insertAll方法将数据存储在缓冲区

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
   
    val shouldCombine = aggregator.isDefined
     //shouldCombine为true则缓冲区为PartitionedAppendOnlyMap反之为PartitionedPairBuffer
    if (shouldCombine) {
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      //缓存区中对于新放入数据的更新策略,如果缓冲区中没有key则新建一个Combiner,反之将合并value
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        //map中放入一条数据,计数加一,用于判断是否进行spill(当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,这个过程就是spill),详情见下文的maybeSpillCollection以及maybeSpill方法
        addElementsRead()
        kv = records.next()
        // 数据通过partitioner获取到partitionID,按照partitionID,key将数据存放在内存AppendOnlyMap对象中
        map.changeValue((getPartition(kv._1), kv._1), update)
        //判断是否需要进行spill
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

我们看一下上段代码中中提到的PartitionedAppendOnlyMap.changeValue方法:

override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)
    super.afterUpdate()
    newValue
  }

当被混入的集合的每次update操作以后,需要执行SizeTracker的afterUpdate方法,afterUpdate会判断这是第几次更新,需要的话就会使用SizeEstimator的estimate方法来估计下集合的大小。由于SizeEstimator的调用开销比较大,注释上说会是数毫秒,所以不能频繁调用。所以SizeTracker会记录更新的次数,发生estimate的次数是指数级增长的,基数是1.1,所以调用estimate时更新的次数会是1.1, 1.1 * 1.1, 1.1 * 1.1 *1.1, ....

这是指数的初始增长是很慢的, 1.1的96次方会是1w, 1.1 ^ 144次方是100w,即对于1w次update,它会执行96次estimate,对10w次update执行120次estimate, 对100w次update执行144次estimate,对1000w次update执行169次。

11】、maybeSpillCollection
我们看一下上段代码中中提到的maybeSpillCollection方法,他的作用是检查一次PartitionedAppendOnlyMap是否需要spill,所以每放一条记录就会检查一次PartitionedAppendOnlyMap是否需要spill。

private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {
      estimatedSize = map.estimateSize()
      if (maybeSpill(map, estimatedSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else {
      estimatedSize = buffer.estimateSize()
      if (maybeSpill(buffer, estimatedSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
    }

    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
  }

从代码中我们可以看到如果需要spill,缓冲区会被释放,重新new一个缓冲区,那么新的缓冲区的内存空间会有多大呢,这个在maybeSpill方法中有详细的解释
12】、maybeSpill

如果每放一条记录就检查一次PartitionedAppendOnlyMap的内存,假设检查一次内存1ms, 1kw 就不得了的时间了。所以肯定是不行的,所以 estimateSize其实是使用采样算法来做的。

(1)、放入数据每32次且currentMemory (estimatedSize)大于myMemoryThreshold

(2)、满足1,则向 shuffleMemoryManager 申请新的缓冲区内存,新的内存大于要 2 * currentMemory - myMemoryThreshold 的内存

(3)、满足1、2或者内存中放入的记录大于numElementsForceSpillThreshold时可以进行spill

注:

currentMemory 通过map的estimatedSize获取

myMemoryThreshold可设置spark.shuffle.spill.initialMemoryThreshold配置,默认5 * 1024 * 1024

shuffleMemoryManager 可分配的内存是ExecutorHeapMemeory * 0.2 * 0.8

numElementsForceSpillThreshold通过spark.shuffle.spill.numElementsForceSpillThreshold配置,默认值Long.MaxValue

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }

13】、writePartitionedFile

将in memory(map)以及spillFiles真实的写入文件

def writePartitionedFile(
      blockId: BlockId,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)
    val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
      context.taskMetrics().shuffleWriteMetrics)

    if (spills.isEmpty) {
      // 如果map过程中没有数据落地到磁盘则对缓冲区的数据进行排序即可
      val collection = if (aggregator.isDefined) map else buffer
      //destructiveSortedWritablePartitionedIterator调用partitionedDestructiveSortedIterator对map进行排序,如果map阶段不需要combiner的操作则无需对数据按照key值排序,只需要按照partitionID排序即可
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      while (it.hasNext) {
        val partitionId = it.nextPartition()
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer)
        }
        val segment = writer.commitAndGet()
        lengths(partitionId) = segment.length
      }
    } else {
      //如果有spill文件则需要获取到各个partition的iterator,逐个读取记录写入磁盘,下文有详细介绍怎么获取到各个partition的iterator
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          val segment = writer.commitAndGet()
          lengths(id) = segment.length
        }
      }
    }

destructiveSortedWritablePartitionedIterator调用partitionedDestructiveSortedIterator对map进行排序,首先构建comparator,如果传入key比较器则进行partitionID排序之后进行key排序,反之仅仅按照partitionID排序。接着调用destructiveSortedIteratordestructiveSortedIterator是真正的排序器,根据传入comparator的comparator以破坏map特性为代价使对map排序时不需要占用额外空间

val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
destructiveSortedIterator(comparator)

并返回一个 WritablePartitionedIterator对象。WritablePartitionedIterator可以使用 BlockObjectWriter来写入它的元素。

14】、partitionedIterator

partitionedIterator返回一个对应partitionID的iterators

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
    val usingMap = aggregator.isDefined
    val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
    if (spills.isEmpty) {
      //如果spillFile为空则只需要返回缓冲区排序后的iterator
      if (!ordering.isDefined) {
        groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
      } else {
        groupByPartition(destructiveIterator(
          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
      }
    } else {
      // 如果spillFile不为空则需要将缓冲区以及spillFile合并排序后返回各个partitions的iterator,这里使用的是对外排序的思想,下文中详细介绍了merge方法,注意此时传参spills和缓冲区的的iterator都是已经进行过排序的
      merge(spills, destructiveIterator(
        collection.partitionedDestructiveSortedIterator(comparator)))
    }
  }

15】、merge

将in memory(map)的数据以及spillFiles中的数据按照partitionID读取到内存中合并并按照comparator重新排序(堆外排序),返回一个对应partitionID的iterators
传入参数

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val readers = spills.map(new SpillReader(_))
    val inMemBuffered = inMemory.buffered
    (0 until numPartitions).iterator.map { p =>
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)
      //将spills以及缓冲区取出当前partition的数据合并后按需排序写入文件
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
      if (aggregator.isDefined) {
        // Perform partial aggregation across partitions
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }

16】、mergeWithAggregation

将传入的iterator根据是否需要排序,返回一个对应partitionID的iterator。如果iterator不需要排序,则在next时进行combine需要多做一些工作,而如果iterator进行过排序,则在直接combine时直接combine下一个key值相同的value即可

17】、mergeSort

将传入的iterators按照comparator排序的具体实现

19】、上文提到过spills文件是已经排过序的了,那他们是在何时进行排序的呢?
在maybespill方法中有这么一段代码:

if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }

下面我们来详细看一下spill方法:

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    spills += spillFile
  }

collection.destructiveSortedWritablePartitionedIterator(comparator)将缓冲区数据进行排序,上文中出现过,将数据写入最终文件时也是使用destructiveSortedWritablePartitionedIterator,该方法返回一个可写的iterator,该方法其实是调用了partitionedDestructiveSortedIterator(keyComparator)进行排序的,上下文出现的需要对map进行拍的的地方其实都是在调用partitionedDestructiveSortedIterator方法
spillMemoryIteratorToDisk方法则是将缓冲区数据写入磁盘上的临时文件上

------------------------------------------------------------------------------------------------------------------------------------------------

shuffleMemoryManager 是被Executor 所有正在运行的Task(Core) 共享的,能够分配出去的内存是:ExecutorHeapMemeory * 0.2 * 0.8

上面的数字可通过下面两个配置来更改:

spark.shuffle.memoryFraction=0.2

spark.shuffle.safetyFraction=0.8

PartitionedAppendOnlyMap 放不下,要落地先写入buffer,然后把buffer再一次性写入磁盘文件。这个buffer是由参数fileBufferSize决定,通过下面配置来更改:

spark.shuffle.file.buffer=32k

数据获取的过程中,序列化反序列化,也是需要空间的,所以Spark 对数量做了限制,通过如下参数serializerBatchSize决定,通过下面配置来更改:

spark.shuffle.spill.batchSize=10000

假设一个Executor的可使用的Core为 C个,那么对应需要的内存消耗为:

C * 32k + C * 10000个Record + C * PartitionedAppendOnlyMap

这么看来

,写文件的buffer不是问题,而序列化的batchSize也不是问题,几万或者十几万个Record 而已。那C * PartitionedAppendOnlyMap 到底会有多大呢?我先给个结论:C * PartitionedAppendOnlyMap <shuffleManager可分配的内存空间

PartitionedAppendOnlyMap 通过map.estimateSize()获取占用内存大小,而map.estimateSize()是近似估计,所以会出现oom的情况

如果你内存开的比较大,其实反倒风险更高,因为estimateSize 并不是每次都去真实的算缓存。它是通过采样来完成的,而采样的周期不是固定的,而是指数增长的,比如第一次采样完后,PartitionedAppendOnlyMap 要经过1.1次的update/insert操作之后才进行第二次采样,然后经过1.1*.1.1次之后进行第三次采样,以此递推,假设你内存开的大,那PartitionedAppendOnlyMap可能要经过几十万次更新之后之后才会进行一次采样,然后才能计算出新的大小,这个时候几十万次更新带来的新的内存压力,可能已经让你的GC不堪重负了。


ExternalSorter的关键调用

注:下文中的buffer/map都是指存放inmemery obj的buffer

  1. insertAll【将数据放入buffer,注意此处并不占用storage内存】
  2. ---changeValue(map的方法)
  3. ---maybeSpillCollection
  4. ---maybeSpill
  5. ---insert(buffer的方法)
  6. ---maybeSpillCollection
  7. ---maybeSpill
  8. writePartitionedFile【将buffer以及spillfile中的文件合并为一个文件】
  9. ---destructiveSortedWritablePartitionedIterator(map的方法)
  10. ---partitionedDestructiveSortedIterator(map的方法)
  11. ---destructiveSortedIterator(map的方法)
  12. ---WritablePartitionedIterator (map的方法)
  13. ---partitionedIterator
  14. ---groupByPartition
  15. ---destructiveSortedIterator
  16. ---groupByPartition
  17. ---destructiveSortedIterator
  18. ---merge
  19. ---destructiveSortedIterator
  20. ---mergeWithAggregation
  21. ------mergeSort
  22. ---mergeSort

参考文章:

相关文章

网友评论

      本文标题:spark源码阅读之ExternalSorter

      本文链接:https://www.haomeiwen.com/subject/bemmlftx.html