美文网首页
Spark Shuffle

Spark Shuffle

作者: wangdy12 | 来源:发表于2018-07-14 21:44 被阅读0次

    Spark是一种MapReduce框架的实现,与Hadoop一样,具有Shuffle阶段

    Shuffle是连接map和reduce之间的桥梁,它将map(ShuffleMapTask)的输出对应到reduce(ShuffleMapTask或ResultTask)输入中,这期间涉及到序列化反序列化、网络数据传输以及磁盘读写,可能还会有压缩解压,加密解密等,是非常耗时的操作

    Shuffle有三个模块组成

    • ShuffleManager:shuffle管理器,所有的shuffle过程都需要先向shuffleManger注册,决定采取何种具体的shuffle方式
    • Shuffle Write:map端进行shuffle的写操作,每个shuffleMapTask将计算结果通过shuffleWriter写出到磁盘中
    • Shuffle Read:reduce端进行shuffle的读操作,即下游的Task通过shuffleReader获取上游写出的数据

    Spark Shuffle演进

    • Spark 1.0及以前只有Hash Shuffle
    • Spark 1.1+,引入Sort Shuffle
    • Spark 1.2+,默认使用Sort Shuffle
    • Spark 1.4+,引入Tungsten Sort或者叫做Unsafe Shuffle
    • Spark 2.0+,统一使用Sort Shuffle

    Tungsten Project 钨丝计划,是对Spark进行优化的一个项目,旨在提升CPU和内存的效率,tungsten-sort中涉及的内存分配,底层可以通过sun.misc.Unsafe类直接获取内存实现

    Hash Shuffle

    在map阶段(ShuffleMapTask),每个ShuffleMapTask都会为下游stage的每个partition写一个临时文件,它的问题在于创建的文件数目过多,现在已经不再使用

    如果有M个ShuffleMapTask,下游stage有R个ShuffleMapTask或ResultTask,理论上会生成M * R个数据文件

    涉及多个小文件的随机读取,硬盘的性能容易称为瓶颈,为了解决文件过多的问题,后来加入了Consolidate Files机制(文件合并机制),运行在同一个core的ShuffleMapTask,第一个会创建R个文件,后续task只是追加写入到已经创建的文件中,当每个task分配1个core时,文件总数为total-executor-cores * R

    Sort Shuffle

    ShuffleMapTask不再为每个Reducer生成一个单独的文件,而是将所有的结果写到一个文件里,同时产生一个索引index文件,减少了文件的数量,文件中的记录首先是按照分区的partition id顺序排列,index文件记录的各个分区对应文件内的偏移,文件总数是2 * M,Reduce阶段可以通过索引,获取相关数据,这样降低了随机磁盘IO与缓存的开销

    Sort Based Shuffle的缺点是必须要进行排序,至少是要按照分区排序,如果指定了keyOrdering才需要在分区内部根据数据的key再次排序,相较于hash shuffle这是额外的性能消耗

    它对应于SortShuffleManager,也是目前唯一的shuffle管理器

    SortShuffleManager

    SortShuffleManagerSparkContext初始化过程中的SparkEnv类初始化,创建ShuffleManager时初始化,当然也可以通过spark.shuffle.manager指定一种自定义的ShuffleManager

        val shortShuffleMgrNames = Map(
          "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
          "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
        val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
        val shuffleMgrClass =
          shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
        val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
    

    在RDD创建过程中,如果对应为宽依赖,即ShuffleDependency,该依赖初始化过程中会通过SparkContext获取一个ShuffleId,然后通过ShuffleManager注册Shuffle,根据不同的情况返回不同的ShuffleHandle

      override def registerShuffle[K, V, C](
          shuffleId: Int,
          numMaps: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
        if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
          // reduce端分区数量少于spark.shuffle.sort.bypassMergeThreshold,没有map端的聚合操作
          // 不再进行归并排序,直接写numPartitions个文件,最后连接到一起,避免了序列化和反序列化,但是缓存需要较高
          new BypassMergeSortShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          // 以序列化的形式缓存输出
          new SerializedShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else {
          // 以反序列化的形式缓存
          new BaseShuffleHandle(shuffleId, numMaps, dependency)
        }
      }
    

    ShuffleMapTask中的runTask(context: TaskContext)函数会通过shuffleManager获取ShuffleWriter,对RDD元素进行写出

    var writer: ShuffleWriter[Any, Any] = null
    try {
        val manager = SparkEnv.get.shuffleManager
        writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
        writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        writer.stop(success = true).get
    }
    

    SortShuffleManager根据ShuffleHandle的不同类型,初始化对应的Shuffle Writer

      override def getWriter[K, V](
          handle: ShuffleHandle,
          mapId: Int,
          context: TaskContext): ShuffleWriter[K, V] = {
        //记录shuffle id => 该shuffle产生输出的mapper数目
        numMapsForShuffle.putIfAbsent(
          handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
        val env = SparkEnv.get
        handle match {
          case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
            new UnsafeShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              context.taskMemoryManager(),
              unsafeShuffleHandle,
              mapId,
              context,
              env.conf)
          case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
            new BypassMergeSortShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              bypassMergeSortHandle,
              mapId,
              context,
              env.conf)
          case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
            new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
        }
      }
    

    ShuffledRDDcompute方法中获取shuffle结果,此外还有CoGroupedRDD等RDD也会请求获取Reader

      override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
        val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
        SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
          .read()
          .asInstanceOf[Iterator[(K, C)]]
      }
    

    SortShuffleManagergetReader实现

      override def getReader[K, C](
          handle: ShuffleHandle,
          startPartition: Int,
          endPartition: Int,
          context: TaskContext): ShuffleReader[K, C] = {
        new BlockStoreShuffleReader(
          handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
      }
    

    Shuffle Writer

    共有三种类型的ShuffleWriter

    • BypassMergeSortShuffleWriter:带hash风格的基于sort的shuffle机制,在不进行map端聚合,且reduce端的分区数目不大于阈值(默认200)时优先使用,每个ShuffleMapTask内部同时创建打开Partitioner指定的分区数目个文件,每条record根据分区器,分别写入到不同的文件内,然后创建一个新的文件,默认使用NIO将每个分区文件的数据顺序合并到该文件后删除,最终再同时产生一个记录分区数据偏移的index文件
    • UnsafeShuffleWriter:没有map端聚合,reduce端的分区数目小于等于16777216,序列化器支持重定位的情况下使用,依靠ShuffleExternalSorter对串行化的数据依据partitionId排序,文件内数据按照分区ID顺序排列,如果之前内存不足,会写出多个数据文件,这些文件最终会合并为一个文件,同时产生一个index文件,详细说明
    • SortShuffleWriter:不满足以上两中情况就使用的最基本的写出方式,将数据写入到ExternalSorter,可以进行聚合操作,内存不足时可以先写出到磁盘,最终进行按照分区顺序,不同文件同个分区的数据以及内存中的数据再合并/排序,写出到一个文件中,并产生一个index文件详细说明

    Shuffle Reader

    只有一种ShuffleReader:BlockStoreShuffleReader

    MapOutputTracker中获取上游shuffle数据,通过网络获取远程数据块,本地数据直接读取,如果有需要在对数据进行聚合和排序,最终返回数据的一个迭代器详细说明

    Hadoop shuffle

    本质上与Spark shuffle的原理是一致的,但是在具体实现上有差别

    MapReduce Shuffle Spark Shuffle
    map端写出 在内存中构造一个缓冲区(默认100MB),超过则溢写,最终合并写出到本地 BypassMergeSortShuffleWriter直接写出,另外两种类型会存储在内存中,直到内存不足(这里内存相对较大)再溢写,最终同样合并写出到本地
    map端数据的顺序 分区内部的数据同样是有序的 除非需要进行map端合并,分区内部不进行排序
    copy reduce在每个map完成后立即开始复制它们的输出,即reduce端的任务不是等map端结束才启动的 map,reduce分别归属不同的Stage,Stage有明确的先后顺序,必须等到map task全部完成,才会启动reduce人进行拉取
    内存使用 一般内存都不大,如果内存不足会进行写出,最后进行合并 通常内存较大,有专门的内存管理器,还可以通过配置允许使用堆外内存,数据尽可能的存在内存中

    Spark中shuffle数据直接写出,每个文件对应的缓冲器大小默认为32KB,通过spark.shuffle.file.buffer设定

    spill merge过程是否是两次IO
    拉取的文件是直接位于内存还是写入到磁盘

    Shuffle and sort in MapReduce

    Hadoop MapReduce Shuffle map过程的数据在写磁盘时,task首先将数据写出到环形缓冲区(默认大小100MB,mapreduce.task.io.sort.mb),达到阈值就启动后台线程将数据溢出写到磁盘,此时map的输出会继续写入缓存,如果缓存被填满,map会被阻塞直到溢写过程完成

    写出到磁盘之前,后台线程首先要划分分区,在每个分区内部,后台线程按照键值进行内存中排序,如果存在combiner函数,就在排序后的数据上进行结合

    每次内存缓冲区达到溢出阈值,会新建一个溢出文件,最后所有的溢出文件被合并成一个已分区且已排序的输出文件,如果至少存在3个溢出文件,那么combiner就会在输出文件写到磁盘之前再次运行

    reduce端因为map task可能在不同时间完成,因此reduce task在每个map完成后立即开始复制它们的输出,这称为reduce任务的复制阶段copy phase,reduce task具有少量的复制线程(默认是5,mapreduce.reduce.shuffle.parallelcopies),可以并行获取map输出,获取的数据优先放置在内存中,否则写出到磁盘,随着拷贝的增多,后台线程将他们合并为更大的有序文件,方便之后的merger

    当所有的map输出都复制过来以后,reduce task进入sort phase或者叫做merger phase,将map的输出合并,并保证顺序,这里可能涉及到多轮次合并(合并因子默认为10,mapreduce.task.io.sort.factor),在此过程中会省略最后一轮次合并结果写入磁盘的过程,直接将数据输入reduce函数,即reduce phase

    合并因子为10,有效合并40个文件片段的方式

    相关文章

      网友评论

          本文标题:Spark Shuffle

          本文链接:https://www.haomeiwen.com/subject/bvcupftx.html