Shuffle Flow: Source Code Analysis


Author: 专职掏大粪 | Published 2021-09-25 20:16

    Shuffle write phase

    DAGScheduler.submitMissingTasks

      val tasks: Seq[Task[_]] = try {
          val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
          stage match {
            case stage: ShuffleMapStage =>
              stage.pendingPartitions.clear()
              partitionsToCompute.map { id =>
                val locs = taskIdToLocations(id)
                val part = partitions(id)
                stage.pendingPartitions += id
                new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
                  taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                  Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
              }
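
    For orientation, here is a minimal job that produces exactly this stage split: the reduceByKey below introduces a ShuffleDependency, so the DAGScheduler builds one ShuffleMapStage (whose tasks are the ShuffleMapTasks created above) followed by a ResultStage. The app name and input path are hypothetical.

        import org.apache.spark.{SparkConf, SparkContext}

        object ShuffleDemo {
          def main(args: Array[String]): Unit = {
            val sc = new SparkContext(
              new SparkConf().setAppName("shuffle-demo").setMaster("local[2]"))
            val counts = sc.textFile("/tmp/words.txt")    // hypothetical input path
              .flatMap(_.split(" "))
              .map(word => (word, 1))
              .reduceByKey(_ + _)    // ShuffleDependency => ShuffleMapStage + ResultStage
            counts.collect()         // the action that triggers submitMissingTasks
            sc.stop()
          }
        }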
    
    
    

    ShuffleMapTask

        try {
          runTask(context)
        }
    

    ShuffleMapTask.runTask

        dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
    
    

    shuffleWriterProcessor (the write processor)

        val manager = SparkEnv.get.shuffleManager
        writer = manager.getWriter[Any, Any](
          dep.shuffleHandle,
          mapId,
          context,
          createMetricsReporter(context))
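
    After the writer is obtained, the processor feeds it the map task's records and collects the resulting MapStatus. A rough sketch of the rest of the flow (paraphrased from ShuffleWriteProcessor.write, not the verbatim source):

        // feed the map task's partition data into the chosen writer
        writer.write(
          rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        // stop(success = true) finalizes the output and yields the MapStatus that is
        // later registered with the driver's MapOutputTracker for the read phase
        val mapStatus = writer.stop(success = true)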
    

    ShuffleManager



    SortShuffleManager.getWriter

       handle match {
          // the handle type (chosen in registerShuffle) selects the writer implementation
          case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
            // serialized (Tungsten) shuffle writer
            new UnsafeShuffleWriter(
              env.blockManager,
              context.taskMemoryManager(),
              unsafeShuffleHandle,
              mapId,
              context,
              env.conf,
              metrics,
              shuffleExecutorComponents)
          case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
            new BypassMergeSortShuffleWriter(
              env.blockManager,
              bypassMergeSortHandle,
              mapId,
              env.conf,
              metrics,
              shuffleExecutorComponents)
          case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
            new SortShuffleWriter(
              shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
        }
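
    Which writer a task gets therefore follows from the handle registered for its ShuffleDependency. Some illustrative cases (assuming an active SparkContext sc as in the earlier sketch; the exact outcome also depends on the configured serializer and on spark.shuffle.sort.bypassMergeThreshold, 200 by default):

        val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))

        // map-side combine required => BaseShuffleHandle => SortShuffleWriter
        pairs.reduceByKey(_ + _).count()

        // no map-side combine and 10 <= bypassMergeThreshold (200)
        // => BypassMergeSortShuffleHandle => BypassMergeSortShuffleWriter
        pairs.groupByKey(10).count()

        // no map-side combine, 1000 > 200, and a serializer that supports relocation
        // (e.g. Kryo via spark.serializer) => SerializedShuffleHandle => UnsafeShuffleWriter
        pairs.partitionBy(new org.apache.spark.HashPartitioner(1000)).count()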
    

    ShuffleDependency.registerShuffle

      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, this)
    

    SortShuffleManager.registerShuffle

      override def registerShuffle[K, V, C](
          shuffleId: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
        // Can the merge-sort be bypassed?
        // 1. the dependency must not need map-side aggregation (no combiner, i.e.
        //    not reduceByKey / aggregateByKey style operators)
        // 2. the number of reduce partitions must be <= spark.shuffle.sort.bypassMergeThreshold
        //    (200 by default); if both hold, the bypass path is taken
        if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
          // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
          // need map-side aggregation, then write numPartitions files directly and just concatenate
          // them at the end. This avoids doing serialization and deserialization twice to merge
          // together the spilled files, which would happen with the normal code path. The downside is
          // having multiple files open at a time and thus more memory allocated to buffers.
          new BypassMergeSortShuffleHandle[K, V](
            shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
          // Can map outputs be kept in memory in serialized form?
          // 1. the serializer must support relocation of serialized objects (Kryo does)
          // 2. the dependency must not need map-side aggregation
          // 3. numPartitions must not exceed MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE (16777215 + 1)
          new SerializedShuffleHandle[K, V](
            shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else {
          // Otherwise, buffer map outputs in a deserialized form:
          // the "base" handle takes the regular merge-sort path (SortShuffleWriter)
          new BaseShuffleHandle(shuffleId, dependency)
        }
      }
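
    The two checks referenced above can be paraphrased roughly as follows (a simplified sketch of SortShuffleWriter.shouldBypassMergeSort and SortShuffleManager.canUseSerializedShuffle, not the verbatim source):

        import org.apache.spark.{ShuffleDependency, SparkConf}

        // bypass: no map-side aggregation and at most
        // spark.shuffle.sort.bypassMergeThreshold (default 200) reduce partitions
        def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean =
          !dep.mapSideCombine &&
            dep.partitioner.numPartitions <=
              conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)

        // serialized (Tungsten) shuffle: the serializer must support relocation of
        // serialized objects (Kryo does), no map-side aggregation, and at most
        // 16777215 + 1 output partitions
        def canUseSerializedShuffle(dep: ShuffleDependency[_, _, _]): Boolean =
          dep.serializer.supportsRelocationOfSerializedObjects &&
            !dep.mapSideCombine &&
            dep.partitioner.numPartitions <= (1 << 24)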
    
    

    SortShuffleWriter.write

      override def write(records: Iterator[Product2[K, V]]): Unit = {
        // choose the sorter: records are buffered/sorted (and possibly combined) before writing
        sorter = if (dep.mapSideCombine) {
          new ExternalSorter[K, V, C](
            context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        } else {
          // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
          // care whether the keys get sorted in each partition; that will be done on the reduce side
          // if the operation being run is sortByKey.
          new ExternalSorter[K, V, V](
            context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
        }
        sorter.insertAll(records)
    
        // Don't bother including the time to open the merged output file in the shuffle write time,
        // because it just opens a single file, so is typically too fast to measure accurately
        // (see SPARK-3570).
        val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
          dep.shuffleId, mapId, dep.partitioner.numPartitions)
        // write each partition's sorted output through the map output writer
        sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
        val partitionLengths = mapOutputWriter.commitAllPartitions().getPartitionLengths
        mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
      }
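
    Inside insertAll the branch mirrors the same mapSideCombine decision: with an aggregator each incoming value is merged into an in-memory PartitionedAppendOnlyMap, otherwise the record is only appended to a PartitionedPairBuffer, and either structure is spilled to disk when it grows too large. A rough sketch of that loop (paraphrased from ExternalSorter.insertAll, not verbatim):

        if (aggregator.isDefined) {
          // map-side combine: merge each value into the entry keyed by (partition, key)
          val createCombiner = aggregator.get.createCombiner
          val mergeValue = aggregator.get.mergeValue
          while (records.hasNext) {
            val kv = records.next()
            map.changeValue((getPartition(kv._1), kv._1),
              (hadValue, oldValue) =>
                if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2))
            maybeSpillCollection(usingMap = true)   // spill the map if it is too large
          }
        } else {
          // no aggregation: just append (partition, key, value) to the buffer
          while (records.hasNext) {
            val kv = records.next()
            buffer.insert(getPartition(kv._1), kv._1, kv._2)
            maybeSpillCollection(usingMap = false)
          }
        }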
    

    Shuffle read phase

    DAGScheduler.submitMissingTasks

    
        case stage: ResultStage =>
              partitionsToCompute.map { id =>
                val p: Int = stage.partitions(id)
                val part = partitions(p)
                val locs = taskIdToLocations(id)
                new ResultTask(stage.id, stage.latestInfo.attemptNumber,
                  taskBinary, part, locs, id, properties, serializedTaskMetrics,
                  Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
                  stage.rdd.isBarrier())
              }
    

    ResultTask.runTask

    func(context, rdd.iterator(partition, context))
    
     final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        if (storageLevel != StorageLevel.NONE) {
          getOrCompute(split, context)
        } else {
          computeOrReadCheckpoint(split, context)
        }
      }
    
      private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
      {
        if (isCheckpointedAndMaterialized) {
          firstParent[T].iterator(split, context)
        } else {
          compute(split, context)
        }
      }
    

    ShuffledRDD.compute

      override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
        val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
        val metrics = context.taskMetrics().createTempShuffleReadMetrics()
        // get a reader for this partition's shuffle blocks and read the records
        SparkEnv.get.shuffleManager.getReader(
          dep.shuffleHandle, split.index, split.index + 1, context, metrics)
          .read()
          .asInstanceOf[Iterator[(K, C)]]
      }
    

    BlockStoreShuffleReader.read
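
    getReader returns a BlockStoreShuffleReader, and its read() drives the whole fetch side. In outline (paraphrased, not the verbatim source):

        // paraphrased outline of BlockStoreShuffleReader.read:
        // 1. ask MapOutputTracker where the map outputs for reduce partitions
        //    [startPartition, endPartition) live, then fetch those blocks with a
        //    ShuffleBlockFetcherIterator (local reads plus remote network fetches)
        // 2. wrap every fetched block in a deserialization stream and flatten the
        //    streams into a single Iterator[(K, C)]
        // 3. if dep.aggregator is defined, combine the values (or combiners) per key
        // 4. if dep.keyOrdering is defined, sort the result with an ExternalSorter
        // 5. hand the final iterator back to ShuffledRDD.compute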
