美文网首页
8.3 Shuffle 过程之 ShuffleManager

8.3 Shuffle 过程之 ShuffleManager

作者: GongMeng | 来源:发表于2018-12-01 15:16 被阅读0次

    1. 概述

    shuffleManager是一个trait类型, 定义了几个基本的接口用于实现.主要目的是实现shuffle相关信息从executor到driver的注册和汇总等等

    我们看到一个方法registerShuffle, 前面在DAG计算时. 我们记得有两种依赖, 一种是narrowDependency, 一种是shuffleDependency, 这里注册的就是Shuffle Dependency, 源码在之前的章节也出现过.

    trait ShuffleManager {
      def registerShuffle[K, V, C](
          shuffleId: Int,
          numMaps: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle
      def getWriter[K, V](
        handle: ShuffleHandle,
        mapId: Int,
        context: TaskContext): ShuffleWriter[K, V]
      def getReader[K, C](
        handle: ShuffleHandle,
        startPartition: Int,
        endPartition: Int,
        context: TaskContext): ShuffleReader[K, C]
      def unregisterShuffle(shuffleId: Int): Boolean
      def shuffleBlockResolver: ShuffleBlockResolver
      def stop(): Unit
    }
    

    它在spark1.6里面有两个基本实现, 一个是HashShuffleManager, 另外一个是SortShuffleManager

    2. ShuffleManager的实现和具体使用.

    很多人都详细写了HashShuffle, SortShuffle的实现, 以及它们是如何压缩中间文件的数量以便更快的实现数据从一个Node 到另外一个Node的转移, 从而做到了让Spark的MR模型比Hadoop2.0的MR模型要快.

    下面的内容和介绍大部分来自Cloudera团队开发SortShuffle时的说明, 熟悉的忽略即可

    2.1 什么是shuffle

    Shuffle

    在分布式计算中, 不得不让数据在node之间转移, 等同于把数据从一批机器洗到另外一批机器. reduceByKey的时候, 必须让同一个pairRDD中同样key的数据进入同一个机器, 这就需要shuffle. Spark中RDD如果到前一层RDD的依赖是shuffleDependency, 就说明需要执行shuffle操作, 划分stage了.
    Shuffle过程也是Stage和Stage之间的分割线

    2.2 Basic Shuffle

    传统的shuffle过程, 每个Map进程, 为每个Reduce进程维护一个文件, 把结果写进去. 这样假设说我们有M个Map进程, R个Reduce进程, 又恰好数据非常分散, 每个Map进程都包含了后续每个Reduce进程可能用到的数据. 那么整个集群需要需要维护M*R个文件. 对于存储系统来说, 它带来了小文件的问题. 这意味着更多的磁盘IO操作, 非连续写, 内存分页碎片问题, 这些都是小文件高负载时存储系统面对的问题. 传统文件系统的解决方案也很简单, 就是类似EXT文件系统的日志写, 让小文件表现成一个连续的日志即可.


    BasicShuffle

    https://issues.apache.org/jira/browse/SPARK-2503

    2.3 Hash Shuffle

    同一台机器上的Map在写shuffle文件的时候, 把数据append到reduce需要的key的bucket中. 这样M就被去掉了, 每个机器维护R个文件即可. 在这个地方我们解决了小文件的问题, 但是不同进程append同一个文件涉及到一些同步过程和很多优化过程.

    Hash Shuffle

    https://issues.apache.org/jira/browse/SPARK-751

    2.4 Sort Shuffle

    更进一步的, 非常类似日志文件系统的策略

    • 每个Map过程会产生一个shuffle结果文件, 和一个index文件, index文件指出了结果文件中, start-end放的是哪个Reduce需要的key. 这样每台计算节点为了M个文件对好存信息, 最后可以把这M个排序好的文件Merge到一起成为一个大的排序好的带索引的文件.
    • 在真正执行中就变成了, 如果R比较小, spark.shuffle.sort.bypassMergeThreshold = 200, 就走Hash Sort路线. 如果R比较大了, 就走Sort Shuffle路线, 每个MapTask产生一个结果数据, 最后再Merge到一起.
      这个merge过程并不是在写的时候发生的, 而是在有机器过来拉数据的时候触发的!
      Sort Shuffle

    2.5 Tungsten Sort Shuffle

    NODE: 根据更新日志, 这个方案到2.x.x还是在序列化上存在一定的BUG
    实现的思路

    https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
    钨丝计划并不是单单对Shuffle进行优化, 实际上这个持续到今天的计划是对整个资源调度进行优化, 让更多的序列化的字节码在机器间流动, 让CPU和内存都能够被框架直接分配. 对我来说, 这意味着Spark越来越接近MPP的方案.也许有一天, 会把Spark变成了一个java的MPP + Cgroup, 大量的CPU分片和内存分片都是直接在内核里做

    相关文章

      网友评论

          本文标题:8.3 Shuffle 过程之 ShuffleManager

          本文链接:https://www.haomeiwen.com/subject/yglhcqtx.html