8.2 The Shuffle Process: MapOutputTracker

Author: GongMeng | Published 2018-11-26 17:38

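    1. Overview

    MapOutputTracker tracks the output of intermediate stages so that the upstream data is ready for the shuffle that follows. The handles to that data are managed by BlockManager, which also estimates its size.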

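    MapOutputTrackerMaster runs on the driver.
    MapOutputTrackerWorker runs on each executor.
    The two communicate through an RPC endpoint, MapOutputTrackerMasterEndpoint, registered with Spark's RPC environment (Akka-based in older Spark releases).

    The MapOutputTracker lives in the SparkEnv structure, which is part of SparkContext, so any code holding sc can reach it.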

    2. Basic concepts of MapOutputTrackerMasterEndpoint

    As an RPC endpoint, it handles two messages:

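    • GetMapOutputStatuses(shuffleId: Int)
      Takes the shuffleId, then checks whether the serialized shuffle status structure to be sent back exceeds the maximum RPC message size (the Akka frame size in older releases; 128 MB by default)

    • StopMapOutputTracker
      Stops this MapOutputTracker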
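
The two messages above can be sketched as a small pattern match. This is only an illustration under stated assumptions: `TrackerMessage`, `EndpointSketch`, and the `Either` reply are hypothetical stand-ins, not Spark's actual endpoint, and the size limit is passed in as a plain byte count.

```scala
// Hypothetical, simplified stand-ins; these are not Spark's real classes.
sealed trait TrackerMessage
final case class GetMapOutputStatuses(shuffleId: Int) extends TrackerMessage
case object StopMapOutputTracker extends TrackerMessage

final class EndpointSketch(
    serializedStatuses: Map[Int, Array[Byte]], // shuffleId -> serialized statuses
    maxMessageSizeBytes: Int) {

  @volatile private var stopped = false
  def isStopped: Boolean = stopped

  /** Returns Right(bytes) to send back, or Left(reason) when no reply can be sent. */
  def receive(msg: TrackerMessage): Either[String, Array[Byte]] = msg match {
    case GetMapOutputStatuses(shuffleId) =>
      serializedStatuses.get(shuffleId) match {
        case None =>
          Left(s"no map output registered for shuffle $shuffleId")
        case Some(bytes) if bytes.length > maxMessageSizeBytes =>
          // The reply would not fit in a single RPC message.
          Left(s"map output statuses were ${bytes.length} bytes, " +
            s"which exceeds the limit of $maxMessageSizeBytes bytes")
        case Some(bytes) =>
          Right(bytes)
      }
    case StopMapOutputTracker =>
      stopped = true
      Right(Array.emptyByteArray)
  }
}
```

Modelling the messages as a sealed trait lets the compiler warn if a message type is left unhandled.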

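    3. The abstract MapOutputTracker class

    3.1 Initialization and maintained structures

    MapOutputTracker is an abstract class with two concrete implementations, one for the master and one for the worker. Each maintains its own mapping table: the master keeps the global table, while a worker only needs to keep a local one.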
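
As a rough illustration of the global table versus the local one, here is a minimal sketch. It assumes hypothetical names (`StatusSketch`, `TrackerSketch`, `MasterSketch`, `WorkerSketch`) and replaces the RPC round trip with a direct method call, so it only shows the shape of the design, not Spark's implementation.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a map output status: where the output lives and
// how many bytes it holds for each reduce partition.
final case class StatusSketch(location: String, sizes: Array[Long])

abstract class TrackerSketch {
  // Shared shape, different role: source of truth on the master, cache on the worker.
  protected val mapStatuses = new ConcurrentHashMap[Int, Array[StatusSketch]]()
  def getStatuses(shuffleId: Int): Option[Array[StatusSketch]]
}

final class MasterSketch extends TrackerSketch {
  // Driver side: the global table, filled in as map tasks report their output.
  def registerMapOutputs(shuffleId: Int, statuses: Array[StatusSketch]): Unit = {
    mapStatuses.put(shuffleId, statuses)
    ()
  }
  def getStatuses(shuffleId: Int): Option[Array[StatusSketch]] =
    Option(mapStatuses.get(shuffleId))
}

final class WorkerSketch(master: MasterSketch) extends TrackerSketch {
  var remoteFetches = 0 // counts simulated round trips to the driver

  // Executor side: answer from the local table, ask the master only on a miss.
  def getStatuses(shuffleId: Int): Option[Array[StatusSketch]] =
    Option(mapStatuses.get(shuffleId)).orElse {
      remoteFetches += 1
      val fetched = master.getStatuses(shuffleId)
      fetched.foreach(s => mapStatuses.put(shuffleId, s))
      fetched
    }
}
```

A second lookup for the same shuffle on the worker is served from its local table without contacting the master.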

    The class declaration and its doc comment are as follows:

    /**
     * Class that keeps track of the location of the map output of
     * a stage. This is abstract because different versions of MapOutputTracker
     * (driver and executor) use different HashMap to store its metadata.
     */
    private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    
    • Reference to the master's RPC endpoint, which lives on the driver
      /** Set to the MapOutputTrackerMasterEndpoint living on the driver. */
      var trackerEndpoint: RpcEndpointRef = _
    
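    • Handles to the ShuffleMapTask outputs, keyed by shuffle ID: from such a handle one can reach the task, then the status and blocks it produced, and from there fetch or operate on the data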
      /**
       * This HashMap has different behavior for the driver and the executors.
       *
       * On the driver, it serves as the source of map outputs recorded from ShuffleMapTasks.
       * On the executors, it simply serves as a cache, in which a miss triggers a fetch from the
       * driver's corresponding HashMap.
       *
       * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
       * thread-safe map.
       */
      protected val mapStatuses: Map[Int, Array[MapStatus]]
    
    • Epoch counter and its lock
      /**
       * Incremented every time a fetch fails so that client nodes know to clear
       * their cache of map output locations if this happens.
       */
      protected var epoch: Long = 0
      protected val epochLock = new AnyRef
    
    • Records which data is currently being fetched
      /** Remembers which map output locations are currently being fetched on an executor. */
      private val fetching = new HashSet[Int]
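
To make the roles of `epoch`, `epochLock`, and `fetching` more concrete, the sketch below shows one way an executor-side cache could combine them. It is an assumption-laden illustration: `WorkerCacheSketch` and `remoteFetch` are hypothetical, cached values are plain strings, and `remoteFetch` is assumed to return a non-null value.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical executor-side cache; `remoteFetch` stands in for the RPC to the driver.
final class WorkerCacheSketch(remoteFetch: Int => String) {
  private val cache = new ConcurrentHashMap[Int, String]() // shuffleId -> locations
  private val fetching = mutable.HashSet.empty[Int]
  private var epoch: Long = 0L
  private val epochLock = new AnyRef
  val remoteCalls = new AtomicInteger(0)

  /** Drops every cached entry when a newer epoch is seen (for example after a fetch failure). */
  def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      cache.clear()
    }
  }

  def get(shuffleId: Int): String = {
    var result = cache.get(shuffleId)
    if (result == null) {
      fetching.synchronized {
        // Another thread is already fetching this shuffle: wait for it to finish.
        while (fetching.contains(shuffleId)) fetching.wait()
        result = cache.get(shuffleId)
        // Still missing, so this thread becomes the fetcher.
        if (result == null) fetching += shuffleId
      }
      if (result == null) {
        try {
          remoteCalls.incrementAndGet()
          result = remoteFetch(shuffleId)
          cache.put(shuffleId, result)
        } finally {
          // Always release the marker and wake up waiters, even if the fetch failed.
          fetching.synchronized {
            fetching -= shuffleId
            fetching.notifyAll()
          }
        }
      }
    }
    result
  }
}
```

With this arrangement, several tasks asking for the same shuffle at once trigger a single remote request, and a newer epoch forces the next lookup to go back to the driver.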
    

    3.2 Important methods

    • getMapSizesByExecutorId
      Returns basic information about the shuffle blocks: their status, location, and size
      /**
       * Called from executors to get the server URIs and output sizes for each shuffle block that
       * needs to be read from a given reduce task.
       *
       * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
       *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
       *         describing the shuffle blocks that are stored at that block manager.
       */
      def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
          : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
        getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
      }
    
      /**
       * Called from executors to get the server URIs and output sizes for each shuffle block that
       * needs to be read from a given range of map output partitions (startPartition is included but
       * endPartition is excluded from the range).
       *
       * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
       *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
       *         describing the shuffle blocks that are stored at that block manager.
       */
      def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
          : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
        logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
        val statuses = getStatuses(shuffleId)
        // Synchronize on the returned array because, on the driver, it gets mutated in place
        statuses.synchronized {
          return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
        }
      }
    
    • getStatuses(shuffleId: Int)
      Returns the basic status of the shuffle output
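
getMapSizesByExecutorId delegates the real work to MapOutputTracker.convertMapStatuses, whose body is not shown in this article. The following is a minimal sketch of what such a conversion might look like, assuming hypothetical stand-in types (`BlockIdSketch`, `MapStatusSketch`), a plain string for the block manager location, and a simple `require` where the real code would raise a fetch-failure error.

```scala
// Hypothetical stand-ins for the shuffle block id and the map status.
final case class BlockIdSketch(shuffleId: Int, mapId: Int, reduceId: Int)
final case class MapStatusSketch(location: String, sizes: Array[Long])

object ConvertSketch {
  /**
   * Groups the blocks for partitions [startPartition, endPartition) by the
   * location that stores them. The index of a status in the array is its map id.
   */
  def convert(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Array[MapStatusSketch]): Map[String, Seq[(BlockIdSketch, Long)]] = {
    val blocks = for {
      (status, mapId) <- statuses.toSeq.zipWithIndex
      part <- startPartition until endPartition
    } yield {
      // A null entry means the map output location is unknown (simplified handling).
      require(status != null, s"missing output location for shuffle $shuffleId, map $mapId")
      (status.location, (BlockIdSketch(shuffleId, mapId, part), status.sizes(part)))
    }
    blocks.groupBy(_._1).map { case (loc, entries) => loc -> entries.map(_._2) }
  }
}
```

Calling `ConvertSketch.convert(shuffleId, reduceId, reduceId + 1, statuses)` mirrors the single-partition overload shown above, which passes `reduceId + 1` as the exclusive end of the range.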
