美文网首页
spark源码阅读——shuffle写

spark源码阅读——shuffle写

作者: WJL3333 | 来源:发表于2018-08-17 17:44 被阅读11次

    groupByKey这个操作一般会产生两个RDD:

    • (map操作)MapPartitionsRDD
    • (隐式转换之后聚合)ShuffledRDD
    def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
          : RDD[(K, Iterable[T])] = withScope {
        val cleanF = sc.clean(f)
        this.map(t => (cleanF(t), t)).groupByKey(p)
      }
    

    RDD -> Stage -> TaskSet -> Task 过程略

    Stage

    DAGScheduler根据RDD依赖关系拆分成Stage

    /**
       * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
       * previously run stage generated the same shuffle data, this function will copy the output
       * locations that are still available from the previous shuffle to avoid unnecessarily
       * regenerating data.
       */
      def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
          ...
        val stage = new ShuffleMapStage(
          id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
          ...
        if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
          ...
          mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
        }
        stage
      }
    

    创建过程中实际上向MapOutputTracker注册了Shuffle,让整个集群获知这个Shuffle。

    MapOutputTracker

    这个类是用来记录每个shuffle产生的输出的,集群中Driver端(主)和Executor端(从)都存在这个对象,实际上形成了一个master-slave模型,用来同步和传递信息。

    • Driver端,MapOutputTrackerMaster每个Task完成产生的输出文件信息都会被上报给Driver端并保存在这个对象中。

    • Executor端,MapOutputTrackerWorker 在任务运行的过程中,如果需要依赖之前shuffle的输出则这个Worker会先查看本地的Shuffle输出缓存,如果缓存没有则向master查询相关信息,知道了以来的输出文件的位置信息后方便下一步的读取。

    这里内部是有一个优化的

    // The size at which we use Broadcast to send the map output statuses to the executors
      private val minSizeForBroadcast =
        conf.getSizeAsBytes("spark.shuffle.mapOutput.minSizeForBroadcast", "512k").toInt
    

    如果这个Shuffle输出信息过大,则集群内部改用广播变量的方式进行传递

    Task

    ShuffleMapStage会被拆分成一组ShuffleMapTask
    随后这个Task会被发到Executor上运行

    private[spark] class ShuffleMapTask(
        stageId: Int,
        stageAttemptId: Int,
        taskBinary: Broadcast[Array[Byte]],
        partition: Partition,
        @transient private var locs: Seq[TaskLocation],
        localProperties: Properties,
        serializedTaskMetrics: Array[Byte],
        jobId: Option[Int] = None,
        appId: Option[String] = None,
        appAttemptId: Option[String] = None)
      extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
        serializedTaskMetrics, jobId, appId, appAttemptId)
    

    说明一下参数
    taskBinary每个被拆分的task的任务信息(RDD和依赖信息),实际上是每个算子操作产生的RDD,只不过这里被序列化并被当成广播变量直接发到集群中来了

    任务信息实际上是 (RDD[_], ShuffleDependency[_, _, _])

    这个ShuffleDependency创建出来的时候会向ShuffleManager注册shuffle的信息。并返回一个handle(句柄,后面用这个),这个handle也被序列化到了任务信息中。

    ShuffleManager

    SparkEnv中包含了很多在任务运行时需要的对象。
    ShuffleManager便是负责Shuffle过程的
    主要功能

    • 注册一个Shuffle(用来唯一标识一个Shuffle)并获得一个handle
    • 输出Shuffle文件(一般输出到本机,作为临时文件)
    • 读取Shuffle输出的文件(其他任务以来Shuffle结果的时候,需要获取依赖的Shuffle信息,这是集群环境,数据不一定在本机上)

    执行过程

    Executor收到LaunchTask信息之后反序列化信息,之后运行了runTask方法

    override def runTask(context: TaskContext): MapStatus = {
          ...
        val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
          ...
        var writer: ShuffleWriter[Any, Any] = null
        try {
          val manager = SparkEnv.get.shuffleManager
          writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
          writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
          writer.stop(success = true).get
        } catch {
          ...
        }
      }
    

    这个方法向ShuffleManager 要了一个Writer直接把计算结果输出到本地文件整个Task就运算完了。

    rdd.iterator -> rdd.compute

    final def iterator(split: Partition, context: TaskContext): Iterator[T]
    
    def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
          : RDD[(K, Iterable[T])] = withScope {
        val cleanF = sc.clean(f)
        this.map(t => (cleanF(t), t)).groupByKey(p)
      }
    

    也就是说这里把之前的MapPartitionsRDD的运算结果直接输出到了本地文件中。

    SortShuffleWriter

    这个对象从SortShuffleManager中获取的,作用很简单,写东西到本地文件,上报状态给OutputTrackerMaster方便其他任务获取输出结果。

    override def write(records: Iterator[Product2[K, V]]): Unit = {
        sorter = 
        ...
          new ExternalSorter[K, V, C](
            context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        ...
        sorter.insertAll(records)
        ...
    
        val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
        val tmp = Utils.tempFileWith(output)
        try {
          val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
          val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
          shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
          mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
        } finally {
          if (tmp.exists() && !tmp.delete()) {
            logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
          }
        }
      }
    
    

    代码上实际上初始化了一个ExternalSorter如果这次Shuffle支持mapsidejoin的话则在map阶段会聚合一次,然后根据参数判断是否需要排序,输出处理过后会按照分区输出结果到临时文件中,之后会写一个index文件,并聚合之前按照分区分散的文件(原来有很多个文件,现在拼接在一块,如何区分新文件中的每一块呢?记录原来文件的长度作为index文件标记即可,之后按照index,seek游标到指定位置读取即可)

    结束后会生成一个MapStatus对象,会被Executor发送回Driver,保存在MapOutputTrackerMater的数据结构中。

    相关文章

      网友评论

          本文标题:spark源码阅读——shuffle写

          本文链接:https://www.haomeiwen.com/subject/scftiftx.html