Shuffle

Author: 搞什么呀 | Published 2018-02-02 14:39

# Introduction

Shuffle is one of the core parts of Spark's execution engine, and many optimizations are built around it, so it is worth understanding. Stages are split at wide dependencies, and shuffle is the bridge between two adjacent stages. It consists of two halves: shuffle write and shuffle read. Let's look at each in turn.

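As a quick illustration (a minimal example of my own, not from the original post), a wide dependency such as the one introduced by reduceByKey is what splits a job into two stages, with shuffle write on the map side and shuffle read on the reduce side:

```
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: reduceByKey introduces a ShuffleDependency, so the job is split into
// a map stage (shuffle write) and a reduce stage (shuffle read).
val conf = new SparkConf().setMaster("local[2]").setAppName("shuffle-demo")
val sc = new SparkContext(conf)
val pairs = sc.parallelize(Seq("a", "b", "a", "c", "b"), numSlices = 2).map(w => (w, 1))
val counts = pairs.reduceByKey(_ + _)      // stage boundary: a ShuffledRDD is created here
println(counts.toDebugString)              // the lineage shows the ShuffledRDD / stage split
counts.collect().foreach(println)
sc.stop()
```
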
# Shuffle Write

1. In ShuffleMapTask's runTask method you can see the following snippet:

```
var writer: ShuffleWriter[Any, Any] = null
try {
  val manager = SparkEnv.get.shuffleManager
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
  // ... (catch/finally error handling omitted)
```

2. Depending on whether the dependency needs map-side aggregation, which serializer it uses, and a few other conditions, a different writer is chosen:

```
handle match {
  case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
    new UnsafeShuffleWriter(
      env.blockManager,
      shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
      context.taskMemoryManager(),
      unsafeShuffleHandle,
      mapId,
      context,
      env.conf)
  case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
    new BypassMergeSortShuffleWriter(
      env.blockManager,
      shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
      bypassMergeSortHandle,
      mapId,
      context,
      env.conf)
  case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
    new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
```

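Which handle you get here was decided earlier in SortShuffleManager.registerShuffle. Roughly (paraphrased from memory; check the source of your Spark version for the exact conditions): the bypass path requires no map-side combine and at most spark.shuffle.sort.bypassMergeThreshold reduce partitions; the serialized (tungsten) path additionally requires a serializer that supports relocation of serialized records, no aggregation, and fewer than about 16.7 million partitions; everything else falls back to the sort-based path. A sketch of these rules:

```
// Sketch of the handle/writer selection rules (paraphrased, not the exact Spark code).
// `bypassMergeThreshold` corresponds to spark.shuffle.sort.bypassMergeThreshold (default 200).
def chooseWriter(mapSideCombine: Boolean,
                 hasAggregator: Boolean,
                 numPartitions: Int,
                 serializerSupportsRelocation: Boolean,
                 bypassMergeThreshold: Int = 200): String = {
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold) {
    "BypassMergeSortShuffleWriter"   // one temp file per reduce partition, concatenated at the end
  } else if (serializerSupportsRelocation && !hasAggregator && numPartitions <= (1 << 24)) {
    "UnsafeShuffleWriter"            // serialized (tungsten) sort shuffle
  } else {
    "SortShuffleWriter"              // general path, backed by ExternalSorter
  }
}
```
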
3. Let's look at SortShuffleWriter first. Depending on whether map-side combine is needed, it builds a different ExternalSorter and then calls sorter.insertAll:

```
sorter = if (dep.mapSideCombine) {
  require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
  new ExternalSorter[K, V, C](
    context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
  // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
  // care whether the keys get sorted in each partition; that will be done on the reduce side
  // if the operation being run is sortByKey.
  new ExternalSorter[K, V, V](
    context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
```

4. Inside insertAll:

```
if (shouldCombine) {
  // Combine values in-memory first using our AppendOnlyMap
  val mergeValue = aggregator.get.mergeValue
  val createCombiner = aggregator.get.createCombiner
  var kv: Product2[K, V] = null
  val update = (hadValue: Boolean, oldValue: C) => {
    if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
  }
  while (records.hasNext) {
    addElementsRead()
    kv = records.next()
    map.changeValue((getPartition(kv._1), kv._1), update)
    maybeSpillCollection(usingMap = true)
  }
} else {
  // ... (no map-side combine: each record is inserted into the buffer instead)
}
```

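The update closure above is the heart of map-side combine: for a key seen for the first time, createCombiner builds the initial combiner; for a key already present, mergeValue folds the next value in. A toy illustration of the same pattern (my own, using an ordinary mutable Map instead of Spark's PartitionedAppendOnlyMap):

```
import scala.collection.mutable

// Toy illustration (not Spark code) of the createCombiner / mergeValue pattern.
val createCombiner: Int => Int = v => v        // first value seen for a key
val mergeValue: (Int, Int) => Int = _ + _      // fold the next value into the combiner

val combined = mutable.Map.empty[String, Int]
val records = Iterator("a" -> 1, "b" -> 1, "a" -> 1)
for ((k, v) <- records) {
  combined(k) = combined.get(k) match {
    case Some(old) => mergeValue(old, v)       // hadValue == true
    case None      => createCombiner(v)        // hadValue == false
  }
}
println(combined)                              // Map(a -> 2, b -> 1)
```
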
5. maybeSpillCollection: depending on whether map-side combine is used, the data lives in a different in-memory structure (PartitionedAppendOnlyMap when combining, PartitionedPairBuffer otherwise); this method estimates its size and spills it when necessary:

```
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }
  // ...
}
```

6. maybeSpill checks whether the in-memory collection has grown past the current memory threshold; if it has, spill() is called, which sorts the data by partition id (and by key, when an ordering or aggregation is defined) and writes it to disk:

```
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  spills += spillFile
}
```

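For context, maybeSpill itself is defined in the Spillable trait. Roughly, every 32 records it compares the collection's estimated size against its current memory threshold, tries to acquire more execution memory, and spills only if the collection still does not fit. A simplified sketch of that decision (paraphrased, not the exact Spark code; the acquireMemory and spill parameters stand in for Spark's memory-manager calls):

```
// Simplified sketch of Spillable.maybeSpill's decision logic (paraphrased, not exact).
def maybeSpillSketch(elementsRead: Long,
                     currentMemory: Long,
                     currentThreshold: Long,
                     acquireMemory: Long => Long,   // returns how much memory was actually granted
                     spill: () => Unit): Boolean = {
  var threshold = currentThreshold
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= threshold) {
    // Ask for roughly double the current size before deciding to spill.
    val amountToRequest = 2 * currentMemory - threshold
    threshold += acquireMemory(amountToRequest)
    shouldSpill = currentMemory >= threshold        // spill if we still don't fit
  }
  if (shouldSpill) {
    spill()                                         // sort by partition (and key) and write to disk
  }
  shouldSpill
}
```
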
7. After all records have been processed, the remaining in-memory data and the spill files are merge-sorted into a single data file on disk, together with an index file that records each partition's offset:

```
private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
    : Iterator[Product2[K, C]] =
{
  val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
  type Iter = BufferedIterator[Product2[K, C]]
  val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
    // Use the reverse of comparator.compare because PriorityQueue dequeues the max
    override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
  })
  heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
  new Iterator[Product2[K, C]] {
    override def hasNext: Boolean = !heap.isEmpty
    override def next(): Product2[K, C] = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val firstBuf = heap.dequeue()
      val firstPair = firstBuf.next()
      if (firstBuf.hasNext) {
        heap.enqueue(firstBuf)
      }
      firstPair
    }
  }
}
```

As you can see, a PriorityQueue drives the merge: the in-memory data and each spill file are wrapped as iterators and pushed onto the queue. Because PriorityQueue dequeues its maximum element, the comparator is reversed, so each dequeue returns the iterator whose head is smallest under the comparator; that head is emitted, the iterator is put back if it still has elements, and the merged stream is written to disk, producing a single globally ordered file.

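The same k-way merge works on any set of already-sorted iterators. Below is a small standalone example of my own (not Spark code) that uses the identical reversed-ordering trick:

```
import scala.collection.mutable

// Standalone k-way merge over already-sorted iterators, using the same reversed-ordering
// trick as mergeSort above (scala's PriorityQueue dequeues the maximum element).
def kWayMerge[T](sorted: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  type Iter = BufferedIterator[T]
  val heap = new mutable.PriorityQueue[Iter]()(Ordering.by[Iter, T](_.head)(ord.reverse))
  heap.enqueue(sorted.filter(_.hasNext).map(_.buffered): _*)
  new Iterator[T] {
    override def hasNext: Boolean = heap.nonEmpty
    override def next(): T = {
      val it = heap.dequeue()
      val value = it.next()
      if (it.hasNext) heap.enqueue(it)   // put the iterator back if it has more elements
      value
    }
  }
}

// Merging three sorted runs, just as the in-memory data and spill files are merged.
println(kWayMerge(Seq(Iterator(1, 4, 7), Iterator(2, 5), Iterator(3, 6, 8))).toList)
// List(1, 2, 3, 4, 5, 6, 7, 8)
```
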
8. An index file is generated recording each partition's offset, so that shuffle read can locate its data:

```
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    val out = new DataOutputStream(
      new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
    Utils.tryWithSafeFinally {
      // We take in lengths of each block, need to convert it to offsets.
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
      // ... (the rest commits the index and data files)
```

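The index file is simply numPartitions + 1 longs, with offset(0) = 0 and offset(i + 1) = offset(i) + lengths(i). The sketch below (my own illustration, not Spark's reader code) shows how a reader can use it to locate partition reduceId inside the single data file:

```
import java.io.{DataInputStream, File, FileInputStream}

// Sketch: read offsets reduceId and reduceId + 1 from the index file; partition reduceId
// occupies the byte range [start, end) of the shuffle data file.
def partitionByteRange(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8)   // each offset is one Long (8 bytes)
    val start = in.readLong()
    val end = in.readLong()
    (start, end)
  } finally {
    in.close()
  }
}
```
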
# Shuffle Read

Shuffle read is triggered from ShuffledRDD's compute method:

```
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}

// based on the partition id, look up the corresponding partition's shuffle data via the blockManager
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
```

The sorting and merging that follow are essentially the same as on the shuffle write side.

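For completeness, here is a rough sketch of what the reader does with the fetched records (my own paraphrase with illustrative signatures, not Spark's actual API): the fetched blocks are deserialized, combined with the dependency's aggregator if one is defined, and sorted when dep.keyOrdering is defined (Spark uses an ExternalSorter for this, which can spill, just like the write side):

```
// Rough sketch of the read-side post-processing; names and signatures are illustrative only.
def readSideSketch[K, V, C](fetched: Iterator[(K, V)],
                            combine: Option[Iterator[(K, V)] => Iterator[(K, C)]],
                            keyOrdering: Option[Ordering[K]]): Iterator[(K, C)] = {
  val combined: Iterator[(K, C)] = combine match {
    case Some(f) => f(fetched)                                   // reduce-side aggregation
    case None    => fetched.asInstanceOf[Iterator[(K, C)]]       // no aggregator: V == C
  }
  keyOrdering match {
    case Some(ord) => combined.toSeq.sortBy(_._1)(ord).iterator  // Spark spills via ExternalSorter here
    case None      => combined
  }
}
```
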