美文网首页
Spark SortShuffleWriter

Spark SortShuffleWriter

作者: wangdy12 | 来源:发表于2018-07-14 21:49 被阅读0次

    这是三种ShuffleWriter中最通用的情况,对应BaseShuffleHandle,此时可以在map端进行数据合并,否则不向排序工具ExternalSorter传入排序相关参数,只会根据key值获取对应的分区id,来划分数据,不会在分区内排序,如果结果需要排序,例如sortByKey,会在reduce端获取shuffle数据后进行

      override def write(records: Iterator[Product2[K, V]]): Unit = {
        //根据是否在map端进行数据合并初始化ExternalSorter
        sorter = if (dep.mapSideCombine) {
          require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
          new ExternalSorter[K, V, C](
            context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        } else {
          // 不进行聚合,就不进行排序,如果有需要reduce端再进行排序
          new ExternalSorter[K, V, V](
            context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
        }
        sorter.insertAll(records)
    
        // shuffle输出文件
        val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
        val tmp = Utils.tempFileWith(output)
        try {
          val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
          //sorter中的数据写出到该文件中
          val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
          // 写出对应的index文件,纪录每个Partition对应的偏移量
          shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
          //shuffleWriter的返回结果
          mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
        } finally {
          if (tmp.exists() && !tmp.delete()) {
            logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
          }
        }
      }
    

    ExternalSorter[K, V, C]

    继承关系

    MemoryConsumer (org.apache.spark.memory)
      Spillable (org.apache.spark.util.collection)
        ExternalSorter (org.apache.spark.util.collection)
    

    对[K,V]键值对数据排序,支持聚合,可以按照键值,转换为(K, C)类型的数据

    首先将键值K根据分区器分配到相应的分区中,如果需要聚合数据,就对每个分区中的键值进行排序,否则,则类型C必须等于V,不会进行分区内部排序

    参数:

    • aggregator 聚合器 - 可选,用于合并数据
    • partitioner 分区器 - 优先按分区id排序
    • ordering 排序 - 可选,为每个分区内的键值排序
    • serializer 序列化器 - 写出数据到磁盘时使用

    只有在确实需要输出的K为有序的时候才提供Ordering。如果map端的ShuffleMapTask不需要合并,排序参数传递Null,以避免额外排序;另一方面,如果确实想要合并,有Ordering参数会更加高效,否则使用的hash值排序,然后hash值相同再判定key值本身是否相同进行合并

    1. 实例化一个ExternalSorter
    2. 用一组记录调用insertAll()
    3. 调用writePartitionedFile()写出一个文件,作为Shuffle输出,也支持调用iterator方法,返回排序合并过的所有元素

    类在内部工作原理如下:

    • 重复填充内存数据缓冲区,如果要按K组合,使用PartitionedAppendOnlyMap,否则使用PartitionedPairBuffer。在这些缓冲区中,按分区ID对元素进行排序,然后也可能通过K值来排序。为避免每个K值多次调用分区器获取分区id,将分区ID与每条记录的K一起存储,作为实际的键值
    • 当每个缓冲区达到内存限制时,会将其写出(spill)到一个文件中,这个文件内存会按分区id顺序写出,如果需要进行聚合,排序时会先比较id,相同时再按K值或K值的哈希码(没有传递K的比较器的时候)排序
    • 当用户请求迭代器或文件输出时,溢出文件将以及剩余的内存数据合写成一个有序的文件。如果需要按K值进行聚合,可以使用排序参数决定顺序,或者先根据键值的哈希码排序,然后将相同hash的相互比较,如果相等就进行合并
    • 最后调用stop()来删除所有中间文件

    源码分析:

    Aggregator

    Aggregator内部包含三个函数,分别用来初始化C,合并V到C,合并C,在map端合并时会用到

    case class Aggregator[K, V, C] (
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C)
    
    存储对象的数据结构
    @volatile private var map = new PartitionedAppendOnlyMap[K, C]
    @volatile private var buffer = new PartitionedPairBuffer[K, C]
    

    PartitionedAppendOnlyMap底层是一个AppendOnlyMap类型的简单哈希表存储结构,特点是只会添加keys,不会删除key,数据存储在数组private var data = new Array[AnyRef](2 * capacity)中,每对key和value连续存放,PartitionedAppendOnlyMap对此进行了封装,它的键值是Tuple2类型的(Int, K),值是类型C,其中Int表示的是分区id,如果key值冲突,采用线性探测再散列处理冲突,同时混入了SizeTracker特质,能够评估自身大致占用的内存数量

    PartitionedPairBuffer底层直接是一个数组存储数据,数据类型与PartitionedAppendOnlyMap相同,将每条记录顺序插入即可,在不需要进行map端聚合的时候使用,也混入了SizeTracker特质,用于评估自身大致占用的内存数量,其初始容量为64,即128长度的AnyRef类型数组,实际相邻存储((partitionId, K), V)

    比较函数

    排序使用的是TimSort,源自归并排序和插入排序

    没有定义聚合使用WritablePartitionedPairCollection.partitionComparator,只比较分区id
    如果定义了排序,使用WritablePartitionedPairCollection.partitionKeyComparator,先比较分区,后比较Key值

    比较key值时,如果传入了自定义的Key值比较器,就是用该比较器进行Partition内部的比较,否则使用hash值比较,后续需要进行聚合时判断相同hash的key是否相同

    // ExternalSorter 类
    private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
        override def compare(a: K, b: K): Int = {
          val h1 = if (a == null) 0 else a.hashCode()
          val h2 = if (b == null) 0 else b.hashCode()
          if (h1 < h2) -1 else if (h1 == h2) 0 else 1
        }
      })
    
    private def comparator: Option[Comparator[K]] = {
      if (ordering.isDefined || aggregator.isDefined) {
        Some(keyComparator)
      } else {
        None
      }
    }
    
    // WritablePartitionedPairCollection类,两种存储结构都混入了该伴生特质,如果Comparator[K]非空,先比较键值中的分区id,相等时比较实际的key
    def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
      new Comparator[(Int, K)] {
        override def compare(a: (Int, K), b: (Int, K)): Int = {
          val partitionDiff = a._1 - b._1
          if (partitionDiff != 0) {
            partitionDiff
          } else {
            keyComparator.compare(a._2, b._2)
          }
        }
      }
    }
    
    //只比较分区
    def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        a._1 - b._1
      }
    }
    
    写入数据

    具体调用:

      def insertAll(records: Iterator[Product2[K, V]]): Unit = {
        val shouldCombine = aggregator.isDefined
    
        if (shouldCombine) {
          // Combine values in-memory first using our AppendOnlyMap
          // 定义update函数,根据不同情况,决定是合并V值到C中,还是创建一个新的C
          val mergeValue = aggregator.get.mergeValue
          val createCombiner = aggregator.get.createCombiner
          var kv: Product2[K, V] = null
          val update = (hadValue: Boolean, oldValue: C) => {
            if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
          }
          while (records.hasNext) {
            //计数,元素数目加1
            addElementsRead()
            kv = records.next()
            //写入数据
            map.changeValue((getPartition(kv._1), kv._1), update)
            //决定是否写出数据
            maybeSpillCollection(usingMap = true)
          }
        } else {
          // Stick values into our buffer
          while (records.hasNext) {
            addElementsRead()
            val kv = records.next()
            buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
            maybeSpillCollection(usingMap = false)
          }
        }
      }
    

    每次插入数据以后,都会调用maybeSpillCollection判断是否需要将数据写入磁盘,内部会调用maybeSpill,当底层存储的数据的集合内存不足申请内存时,通过spill(collection: C)写出

    spill写出

    SpillableExternalSorter的父类,当前集合中的记录数目是elementsRead,估计大小为currentMemory,当集合中记录条数为32的整数倍,且估计大小大于内存阈值时,先尝试申请两倍内存大小的空间,如果成功增大阈值不进行落盘,如果内存不足时,则写出集合中的数据到磁盘中,内存阈值初始大小通过spark.shuffle.spill.initialMemoryThreshold设定,初始为5MB

    如果内存足够,即MemoryManager始终能够提供足够大小的内存空间,一般就不需要落盘,除非达到了底层数组存储数目的上限

      protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
        var shouldSpill = false
        // 集合中元素的数目为32的整数倍,且大于内存阈值时,判断是否需要写出
        if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
          // 内存扩容为当前两倍时需要额外的空间
          val amountToRequest = 2 * currentMemory - myMemoryThreshold
          // 确保有足够的空间,可以增加阈值,否则进行写出
          val granted = acquireMemory(amountToRequest)
          myMemoryThreshold += granted
          shouldSpill = currentMemory >= myMemoryThreshold
        }
        // 元素数目大于阈值,强制写出,通过"spark.shuffle.spill.numElementsForceSpillThreshold"设定,默认是Long.MaxValue
        shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
        // 实际进行写出,调用ExternalSorter的spill方法
        if (shouldSpill) {
          _spillCount += 1
          logSpillage(currentMemory)
          spill(collection)
          _elementsRead = 0
          _memoryBytesSpilled += currentMemory
          releaseMemory()
        }
        shouldSpill
      }
    
    private val spills = new ArrayBuffer[SpilledFile]
    
      override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
        //传入比较器,对内存数据进行排序,返回一个排序后结果的迭代器
        val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
        //写出到文件
        val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
        spills += spillFile
      }
    

    写出函数spillMemoryIteratorToDisk通过DiskBlockManager获得一个文件,以及对应的DiskBlockObjectWriter,缓存是32K,这里分批(batch,通过spark.shuffle.spill.batchSize设定,默认10000纪录算一批)写出串行化后的数据,同时记录相关信息,封装返回,返回对象为SpilledFile(File, blockId, 每个batch写出的数据量大小 : Array[Long], 每个分区元素数目 : Array[Long])

    最终写出数据

    写出添加到ExternalSorter的所有数据:

      def writePartitionedFile(
          blockId: BlockId,
          outputFile: File): Array[Long] = {
    
        // 纪录每个分区的数据长度
        val lengths = new Array[Long](numPartitions)
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics().shuffleWriteMetrics)
    
        if (spills.isEmpty) {
          // 只有内存中的数据,没有溢出文件,那么顺序写出,记录每个分区大小即可
          val collection = if (aggregator.isDefined) map else buffer
          val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
          while (it.hasNext) {
            val partitionId = it.nextPartition()
            while (it.hasNext && it.nextPartition() == partitionId) {
              it.writeNext(writer)
            }
            val segment = writer.commitAndGet()
            lengths(partitionId) = segment.length
          }
        } else {
          // 归并排序,把所有文件和当前内存中的数据合并,然后写出
          // 读取时是按照partition顺序逐个处理,一个分区一个分区的合并
          for ((id, elements) <- this.partitionedIterator) {
            if (elements.hasNext) {
              for (elem <- elements) {
                writer.write(elem._1, elem._2)
              }
              val segment = writer.commitAndGet()
              lengths(id) = segment.length
            }
          }
        }
    
        writer.close()
        ...
        lengths
      }
    

    这里的核心是partitionedIterator,将已排序的文件序列和内存中的数据合并,返回Iterator[(Int, Iterator[Product2[K, C]])]迭代器,按分区分组,对每个分区,都有一个遍历其内容的迭代器,按顺序访问数据

      private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
          : Iterator[(Int, Iterator[Product2[K, C]])] = {
        // 文件读取器
        val readers = spills.map(new SpillReader(_))
        val inMemBuffered = inMemory.buffered
        //逐个分区处理
        (0 until numPartitions).iterator.map { p =>
          // 内存中的数据已经按照partition排序好了,所有可以通过迭代器顺序输出
          val inMemIterator = new IteratorForPartition(p, inMemBuffered)
          // 合并该分区ID下,文件数据迭代器和内存数据迭代器,类型为 Seq[Iterator[Product2[K, C]]]
          val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
          if (aggregator.isDefined) {
            // key值相同进行聚合
            (p, mergeWithAggregation(
              iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
          } else if (ordering.isDefined) {
            // 没有聚合,但指定顺序的情况
            (p, mergeSort(iterators, ordering.get))
          } else {
            (p, iterators.iterator.flatten)
          }
        }
      }
    
    • mergeSort:指定ordering时,算法采用的是经典的将多个有序数据,合并为一个有序序列的算法,底层采用优先级队列,将多个迭代器作为元素,根据迭代器内的第一个元素作为大小比较的依据,读取时将元素最小的迭代器出队列,取出元素,再加入队列
    • 什么都没有指定时,将Seq[Iterator[Product2[K, C]]]直接转换为Iterator[Product2[K, C]]

    如果指定了聚合函数,实现细节:

    • 如果指定了ordering时,采用mergeSort,然后顺序读取时,key相同进行合并
    • 如果没有指定顺序,依然采用mergeSort,不过是使用hash值进行比较的,读取时,将key的hash值相同的记录作为一批,同时读取处理,将key值实际相同的进行合并,最后顺序输出即可
      private def mergeWithAggregation(
          iterators: Seq[Iterator[Product2[K, C]]],
          mergeCombiners: (C, C) => C,
          comparator: Comparator[K],
          totalOrder: Boolean)
          : Iterator[Product2[K, C]] =
      {
        if (!totalOrder) {
          // 没有提供自定义的排序方法,只是部分有序,使用hash值排序,
          // 存在hash值相同,key值不同的情况
          new Iterator[Iterator[Product2[K, C]]] {
            //对此partition内的数据,根据K的hash值进行排序,底层是优先队列实现
            val sorted = mergeSort(iterators, comparator).buffered
    
            // Buffers reused across elements to decrease memory allocation
            val keys = new ArrayBuffer[K]
            val combiners = new ArrayBuffer[C]
    
            override def hasNext: Boolean = sorted.hasNext
    
            override def next(): Iterator[Product2[K, C]] = {
              if (!hasNext) {
                throw new NoSuchElementException
              }
              // 清空缓存
              keys.clear()
              combiners.clear()
              // 获取第一个数据,不一定是sorter中的第一个数据
              val firstPair = sorted.next()
              // 添加到缓存
              keys += firstPair._1
              combiners += firstPair._2
              val key = firstPair._1
              // 获取下一个hash值相同的数据
              while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) {
                // 当前数据
                val pair = sorted.next()
                var i = 0
                var foundKey = false
                // 遍历缓存,如果key值与当前数据相同,进行合并
                while (i < keys.size && !foundKey) {
                  if (keys(i) == pair._1) {
                    combiners(i) = mergeCombiners(combiners(i), pair._2)
                    foundKey = true
                  }
                  i += 1
                }
                // 没有找到当前数据key值相同的记录,将当前记录添加到缓存
                if (!foundKey) {
                  keys += pair._1
                  combiners += pair._2
                }
              }
    
              //返回迭代器Iterator[Product2[K, C],他们key的hash值都相等
              keys.iterator.zip(combiners.iterator)
            }
          }.flatMap(i => i)//把二次迭代展开
        } else {
          // 定义了ordering,先进行归并排序,然后key值相同的简单合并起来
          new Iterator[Product2[K, C]] {
            val sorted = mergeSort(iterators, comparator).buffered
    
            override def hasNext: Boolean = sorted.hasNext
    
            override def next(): Product2[K, C] = {
              if (!hasNext) {
                throw new NoSuchElementException
              }
              val elem = sorted.next()
              val k = elem._1
              var c = elem._2
              while (sorted.hasNext && sorted.head._1 == k) {
                val pair = sorted.next()
                c = mergeCombiners(c, pair._2)
              }
              (k, c)
            }
          }
        }
    

    相关文章

      网友评论

          本文标题:Spark SortShuffleWriter

          本文链接:https://www.haomeiwen.com/subject/qfhupftx.html