
Spark Shuffle Writer Source Code Analysis

Author: 幸福的小棉袄 | Published 2018-03-11 14:07

    Shuffle Writer Types

    A job is split into multiple stages by Spark's DAGScheduler, and downstream stages depend on upstream stages: the upstream stage does the map work while the downstream stage does the reduce work. Adjacent stages are connected to each other through a shuffle. The shuffle has two sides, the ShuffleWriter and the ShuffleReader: the writer runs in the upstream (map) stage and the reader in the downstream (reduce) stage.
      Let's first look at the three ShuffleWriter implementations:
      1. BypassMergeSortShuffleWriter. Essentially the same as the old hash-based shuffle, except that the map task's per-partition outputs are concatenated into a single data file, accompanied by an index file.
      2. UnsafeShuffleWriter. Uses Java Unsafe to operate on memory directly, avoiding the overhead of Java objects and GC pauses, so it is more efficient.
      3. SortShuffleWriter. Differs from the other two in that it can sort (and combine) records on the map side.
      Each writer has its own applicable scenario (a minimal selection sketch follows this list):
      1. No map-side aggregation and the number of output partitions is at most 200 (spark.shuffle.sort.bypassMergeThreshold): BypassMergeSortShuffleWriter.
      2. No map-side aggregation, the number of partitions is at most 16777216 (2^24), and the Serializer supports relocation of serialized objects: UnsafeShuffleWriter.
      3. Neither (1) nor (2) holds: SortShuffleWriter.
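    To make the selection criteria concrete, here is a minimal sketch of the decision logic in plain Scala. The names ShuffleDep and chooseWriter are made up for illustration, and the conditions are paraphrased from SortShuffleManager rather than being the exact API.

    // Illustrative sketch only: paraphrased writer-selection logic.
    case class ShuffleDep(
        mapSideCombine: Boolean,
        numPartitions: Int,
        serializerSupportsRelocation: Boolean)

    def chooseWriter(dep: ShuffleDep, bypassMergeThreshold: Int = 200): String = {
      if (!dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold) {
        "BypassMergeSortShuffleWriter"
      } else if (!dep.mapSideCombine &&
                 dep.serializerSupportsRelocation &&
                 dep.numPartitions <= (1 << 24)) {
        "UnsafeShuffleWriter"
      } else {
        "SortShuffleWriter"
      }
    }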
    Having introduced the ShuffleWriter types, the rest of this article walks through how SortShuffleWriter works in detail.

    How SortShuffleWriter Works

    Prerequisites

    org.apache.spark.util.collection.AppendOnlyMap
    org.apache.spark.util.collection.PartitionedPairBuffer
    org.apache.spark.util.collection.TimSort
    org.apache.spark.shuffle.sort.SortShuffleWriter
      AppendOnlyMap and PartitionedPairBuffer are the two in-memory data structures used during the shuffle; both are backed by an Array. When a map-side combine or sort is needed, the AppendOnlyMap structure (via PartitionedAppendOnlyMap) is used; otherwise the PartitionedPairBuffer is used. TimSort, a merge-sort-based algorithm, is used to sort the in-memory data.
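      A conceptual sketch (not Spark's implementation) of how the two structures are used: the map combines values per (partitionId, key), while the buffer simply appends records and defers all ordering work until later.

    import scala.collection.mutable

    // Map path: used when a map-side combine (or ordering) is required.
    // Combines values in place, keyed by (partitionId, key).
    val combineMap = mutable.HashMap.empty[(Int, String), Int]
    def insertWithCombine(partitionId: Int, key: String, value: Int): Unit =
      combineMap((partitionId, key)) = combineMap.getOrElse((partitionId, key), 0) + value

    // Buffer path: used when no map-side combine is needed.
    // Just appends ((partitionId, key), value) records.
    val pairBuffer = mutable.ArrayBuffer.empty[((Int, String), Int)]
    def insertPlain(partitionId: Int, key: String, value: Int): Unit =
      pairBuffer.append(((partitionId, key), value))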
      Let's start with the SortShuffleWriter.write source code:

    /** Write a bunch of records to this task's output */
      override def write(records: Iterator[Product2[K, V]]): Unit = {
        sorter = if (dep.mapSideCombine) {
          // A map-side combine is required
          require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
          new ExternalSorter[K, V, C](
            context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        } else {
          // No map-side combine (or ordering) is required
          // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
          // care whether the keys get sorted in each partition; that will be done on the reduce side
          // if the operation being run is sortByKey.
          new ExternalSorter[K, V, V](
            context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
        }
        sorter.insertAll(records)
        // Don't bother including the time to open the merged output file in the shuffle write time,
        // because it just opens a single file, so is typically too fast to measure accurately
        // (see SPARK-3570).
        val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
        val tmp = Utils.tempFileWith(output)
        try {
          val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
          val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
          shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
          mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
        } finally {
          if (tmp.exists() && !tmp.delete()) {
            logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
          }
        }
      }
    

    Next, let's look at the ExternalSorter class, located at org.apache.spark.util.collection.ExternalSorter. The write method above delegates to ExternalSorter's insertAll method, which does most of the shuffle-write work. We will look at the insertAll source first and then describe what it does.

    def insertAll(records: Iterator[Product2[K, V]]): Unit = {
        // TODO: stop combining if we find that the reduction factor isn't high
        val shouldCombine = aggregator.isDefined
        if (shouldCombine) {
          // Combine values in-memory first using our AppendOnlyMap
          val mergeValue = aggregator.get.mergeValue
          val createCombiner = aggregator.get.createCombiner
          var kv: Product2[K, V] = null
          val update = (hadValue: Boolean, oldValue: C) => {
            if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
          }
          while (records.hasNext) {
            addElementsRead()
            kv = records.next()
            map.changeValue((getPartition(kv._1), kv._1), update)
            // Check whether the in-memory collection needs to be spilled
            maybeSpillCollection(usingMap = true)
          }
        } else {
          // Stick values into our buffer
          while (records.hasNext) {
            addElementsRead()
            val kv = records.next()
            buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
            // Check whether the in-memory collection needs to be spilled
            maybeSpillCollection(usingMap = false)
          }
        }
      }
    

    insertAll does the following:
    (1) Depending on whether a map-side combine is needed (shouldCombine), it uses either the map or the buffer. With the map, each incoming record looks up its (partitionId, key) entry; when the key is already present, its value is merged in place. This raises a question: what if records with the same key have already been spilled to different files on disk? In that case, when an iterator is eventually requested, a multi-way merge iterator over the in-memory data and the spilled files is returned. Each call to next() uses a priority queue, i.e. a heap ordered by the smallest key hash at the head of each per-spill iterator, to find all elements that share that smallest hash and an equal key (always possible because every spill file is sorted), merges them, and returns the merged result. This completes the final aggregation. A minimal sketch of the map-side update closure follows this list.
    (2) Decide whether the in-memory collection needs to be spilled.
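    The update closure in insertAll is built from the aggregator's createCombiner and mergeValue functions. Here is a minimal, standalone sketch of that pattern (plain Scala with a HashMap standing in for AppendOnlyMap, not Spark's code):

    // createCombiner initialises the combined value the first time a key is seen;
    // mergeValue folds each later value into the existing combiner.
    val createCombiner: Int => Int = v => v
    val mergeValue: (Int, Int) => Int = (c, v) => c + v

    var kv: (String, Int) = null
    val update: (Boolean, Int) => Int = (hadValue, oldValue) =>
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)

    // Feed a few records through the update closure.
    val demoMap = scala.collection.mutable.HashMap.empty[String, Int]
    for (record <- Seq(("a", 1), ("b", 2), ("a", 3))) {
      kv = record
      demoMap(kv._1) = update(demoMap.contains(kv._1), demoMap.getOrElse(kv._1, 0))
    }
    // demoMap: Map(a -> 4, b -> 2)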
    The next point of interest is whether the in-memory collection is about to run out of memory, which is handled in the maybeSpillCollection method. Here is its source:

     /**
       * Spill the current in-memory collection to disk if needed.
       *
       * @param usingMap whether we're using a map or buffer as our current in-memory collection
       */
      private def maybeSpillCollection(usingMap: Boolean): Unit = {
        var estimatedSize = 0L
        if (usingMap) {
          estimatedSize = map.estimateSize()
          if (maybeSpill(map, estimatedSize)) {
            map = new PartitionedAppendOnlyMap[K, C]
          }
        } else {
          estimatedSize = buffer.estimateSize()
          if (maybeSpill(buffer, estimatedSize)) {
            buffer = new PartitionedPairBuffer[K, C]
          }
        }
    
        if (estimatedSize > _peakMemoryUsedBytes) {
          _peakMemoryUsedBytes = estimatedSize
        }
      }
    

    This method first estimates the current size of the collection to decide whether it risks running out of memory. Since both PartitionedAppendOnlyMap and PartitionedPairBuffer extend SizeTracker, the map and the buffer both use SizeTracker's estimateSize method. For every record processed, estimateSize is called once to estimate how much memory has already been used. Here is the method:

    /**
       * Estimate the current size of the collection in bytes. O(1) time.
       */
      def estimateSize(): Long = {
        assert(samples.nonEmpty)
        val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
        (samples.last.size + extrapolatedDelta).toLong
      }
    

    Here bytesPerUpdate is how many bytes the collection grew per update between the last two samples, i.e. on average how much the collection's size increases each time it is updated:

    bytesPerUpdate=(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
    

    bytesPerUpdate is then taken as the recent average growth per update: the current update count minus the update count at the last sample is multiplied by bytesPerUpdate, and the result is added to the size recorded by the last sample.

    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
        (samples.last.size + extrapolatedDelta).toLong
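    As a concrete illustration of the extrapolation, with made-up numbers:

    // Hypothetical numbers, just to illustrate estimateSize's arithmetic.
    val lastSampleSize       = 10000L  // bytes recorded at the last sample
    val lastSampleNumUpdates = 500L    // update count at the last sample
    val bytesPerUpdate       = 20.0    // average growth per update between the last two samples
    val numUpdates           = 600L    // current update count

    val extrapolatedDelta = bytesPerUpdate * (numUpdates - lastSampleNumUpdates) // 2000.0
    val estimatedSize = (lastSampleSize + extrapolatedDelta).toLong              // 12000 bytes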
    

    Back in maybeSpillCollection: once the size of the map or buffer has been estimated, maybeSpill is called to decide whether a spill should happen. This method lives in org.apache.spark.util.collection.Spillable:

    /**
       * Spills the current in-memory collection to disk if needed. Attempts to acquire more
       * memory before spilling.
       *
       * @param collection collection to spill to disk
       * @param currentMemory estimated size of the collection in bytes
       * @return true if `collection` was spilled to disk; false otherwise
       */
      protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
        var shouldSpill = false
        if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
          // Claim up to double our current memory from the shuffle memory pool
          val amountToRequest = 2 * currentMemory - myMemoryThreshold
          val granted = acquireMemory(amountToRequest)
          myMemoryThreshold += granted
          // If we were granted too little memory to grow further (either tryToAcquire returned 0,
          // or we already had more memory than myMemoryThreshold), spill the current collection
          shouldSpill = currentMemory >= myMemoryThreshold
        }
        shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
        // Actually spill
        if (shouldSpill) {
          _spillCount += 1
          logSpillage(currentMemory)
          spill(collection)
          _elementsRead = 0
          _memoryBytesSpilled += currentMemory
          releaseMemory()
        }
        shouldSpill
      }
    

    The method receives the collection and its estimated size. The check is only performed once every 32 records. When the current size reaches the threshold, the sorter first tries to claim more memory from the shuffle memory pool (up to double the current size); only if it still cannot grow enough does it spill the collection to disk. A short worked example of the memory request follows.
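    To illustrate the grow-or-spill decision with made-up numbers:

    // Hypothetical numbers illustrating maybeSpill's memory request.
    var myMemoryThreshold = 5L * 1024 * 1024            // 5 MB already claimed
    val currentMemory     = 8L * 1024 * 1024            // estimated collection size: 8 MB

    // Try to claim up to double the current size from the shuffle memory pool.
    val amountToRequest = 2 * currentMemory - myMemoryThreshold   // 11 MB requested
    val granted = 2L * 1024 * 1024                      // suppose the pool grants only 2 MB
    myMemoryThreshold += granted                        // threshold becomes 7 MB

    // Still smaller than the 8 MB collection, so the collection will be spilled.
    val shouldSpill = currentMemory >= myMemoryThreshold          // true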
      ExternalSorter's spill method is then called; it writes the collection to disk as a sorted file and appends that file to the spills array.

    /**
       * Spill our in-memory collection to a sorted file that we can merge later.
       * We add this file into `spilledFiles` to find it later.
       *
       * @param collection whichever collection we're using (map or buffer)
       */
      override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
        val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
        val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
        spills += spillFile
      }
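    The comparator passed to destructiveSortedWritablePartitionedIterator orders records by partition id and, when an aggregator or ordering is defined, by key within each partition. A simplified sketch of such a comparator (not the exact Spark code, which falls back to comparing keys by hash when no explicit ordering is given):

    import java.util.Comparator

    // Simplified sketch: order (partitionId, key) pairs by partition first,
    // then by key hash inside the partition.
    def partitionKeyComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val byPartition = Integer.compare(a._1, b._1)
        if (byPartition != 0) byPartition
        else Integer.compare(a._2.hashCode(), b._2.hashCode())
      }
    }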
    

    The next step is to merge these spilled files, which have all been recorded in the spills array.

    The merge process

    When an iterator or a file is requested, all the SpilledFiles and the data still sitting in memory (not yet spilled) are merged together.
      As seen above, SortShuffleWriter then calls ExternalSorter's writePartitionedFile method, whose job is to write all the data into a single file on disk and return the per-partition lengths used to build the index.

    /**
       * Write all the data added into this ExternalSorter into a file in the disk store. This is
       * called by the SortShuffleWriter.
       *
       * @param blockId block ID to write to. The index file will be blockId.name + ".index".
       * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
       */
      def writePartitionedFile(
          blockId: BlockId,
          outputFile: File): Array[Long] = {
    
        // Track location of each range in the output file
        val lengths = new Array[Long](numPartitions)
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics().shuffleWriteMetrics)
        // All data is still in memory; write it to the file directly
        if (spills.isEmpty) {
          // Case where we only have in-memory data
          val collection = if (aggregator.isDefined) map else buffer
          val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
          while (it.hasNext) {
            val partitionId = it.nextPartition()
            while (it.hasNext && it.nextPartition() == partitionId) {
              it.writeNext(writer)
            }
            val segment = writer.commitAndGet()
            lengths(partitionId) = segment.length
          }
        } else {
          // We must perform merge-sort; get an iterator by partition and write everything directly.
      // Spills have occurred, so spilled files must be merged with the in-memory data
          for ((id, elements) <- this.partitionedIterator) {
            if (elements.hasNext) {
              for (elem <- elements) {
                writer.write(elem._1, elem._2)
              }
              val segment = writer.commitAndGet()
              lengths(id) = segment.length
            }
          }
        }
    
        writer.close()
        context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
    
        lengths
      }
    

    When spills have occurred, the many small spill files have to be merged. This happens through the partitionedIterator method, which in turn calls merge. Let's look at merge:

    /**
       * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
       * inside each partition. This can be used to either write out a new file or return data to
       * the user.
       *
       * Returns an iterator over all the data written to this object, grouped by partition. For each
       * partition we then have an iterator over its contents, and these are expected to be accessed
       * in order (you can't "skip ahead" to one partition without reading the previous one).
       * Guaranteed to return a key-value pair for each partition, in order of partition ID.
       */
      private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
          : Iterator[(Int, Iterator[Product2[K, C]])] = {
    // Create a SpillReader for each spilled file
        val readers = spills.map(new SpillReader(_))
        val inMemBuffered = inMemory.buffered
        (0 until numPartitions).iterator.map { p =>
          val inMemIterator = new IteratorForPartition(p, inMemBuffered)
      // Combine the in-memory iterator with the iterators over the spill files
          val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
          if (aggregator.isDefined) {
        // Aggregation path
            // Perform partial aggregation across partitions
            (p, mergeWithAggregation(
              iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
          } else if (ordering.isDefined) {
        // Sort path
            // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
            // sort the elements without trying to merge them
            (p, mergeSort(iterators, ordering.get))
          } else {
            (p, iterators.iterator.flatten)
          }
        }
      }
    

    This method merges, for each reduce-side partition, the in-memory data with the data from the spill files, applying aggregation and/or sorting when required, and finally returns pairs of (reduce partition id, iterator over that partition's data). After the merge-sorted data has been written to the final file, the offset of each partition must be persisted so that each reducer can later fetch its own slice. Writing the offsets is straightforward: the partition-length array obtained earlier is turned into offsets and written to the index file; a minimal sketch of that layout follows, after which we look at the mergeSort method.
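    To make the index file concrete, here is an illustrative helper (not Spark's code) that turns the per-partition lengths into the cumulative offsets an index file stores; IndexShuffleBlockResolver persists such offsets as a sequence of longs.

    // Illustrative only: offset(i) .. offset(i+1) delimits partition i in the data file.
    def partitionOffsets(lengths: Array[Long]): Array[Long] =
      lengths.scanLeft(0L)(_ + _)

    // Example: three partitions of 100, 0 and 250 bytes.
    val offsets = partitionOffsets(Array(100L, 0L, 250L))
    // offsets: Array(0, 100, 100, 350); reducer 2 reads bytes [100, 350) of the data file.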

     /**
       * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys.
       */
      private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
          : Iterator[Product2[K, C]] =
      {
        val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
        type Iter = BufferedIterator[Product2[K, C]]
        val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
          // Use the reverse of comparator.compare because PriorityQueue dequeues the max
          override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
        })
        heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
        new Iterator[Product2[K, C]] {
          override def hasNext: Boolean = !heap.isEmpty
    
          override def next(): Product2[K, C] = {
            if (!hasNext) {
              throw new NoSuchElementException
            }
            val firstBuf = heap.dequeue()
            val firstPair = firstBuf.next()
            if (firstBuf.hasNext) {
              heap.enqueue(firstBuf)
            }
            firstPair
          }
        }
      }
    

    This method relies on scala.collection.mutable.PriorityQueue: each call to next() dequeues the buffered iterator whose head element is currently the smallest according to the comparator, so the spill files end up merged in (partitionId, key) order into one final data file. The accompanying index file is then written by shuffleBlockResolver.writeIndexFileAndCommit (seen in SortShuffleWriter.write above), using the partition lengths returned by writePartitionedFile. A small standalone example of the same k-way merge pattern follows.
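    As a standalone illustration of this k-way merge pattern (plain Scala, not Spark code), here is how a priority queue of buffered iterators merges several already-sorted sequences:

    import scala.collection.mutable

    // Merge several sorted iterators into one sorted iterator,
    // mirroring the heap-of-buffered-iterators pattern in mergeSort.
    def kWayMerge(iterators: Seq[Iterator[Int]]): Iterator[Int] = {
      type Iter = BufferedIterator[Int]
      // PriorityQueue dequeues the maximum, so reverse the ordering to get the minimum.
      val heap = new mutable.PriorityQueue[Iter]()(Ordering.by[Iter, Int](_.head).reverse)
      heap.enqueue(iterators.filter(_.hasNext).map(_.buffered): _*)
      new Iterator[Int] {
        override def hasNext: Boolean = heap.nonEmpty
        override def next(): Int = {
          val first = heap.dequeue()
          val value = first.next()
          if (first.hasNext) heap.enqueue(first)
          value
        }
      }
    }

    // kWayMerge(Seq(Iterator(1, 4, 7), Iterator(2, 5), Iterator(3, 6))).toList
    // => List(1, 2, 3, 4, 5, 6, 7)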
       In the next post, we will look at the shuffle reader side of sort-based shuffle.
