美文网首页
Spark1.3.1Shuffle源码分析

Spark1.3.1Shuffle源码分析

作者: LancerLin_LX | 来源:发表于2018-06-17 17:02 被阅读0次
    image.png
    图片出处:https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md image.png

    1.ShuffleMapTasks

    image.png image.png image.png image.png

    dirver像executor发送LaunchTask


    image.png

    executor接收到序列化好的task后,反序列化后执行task


    image.png

    CoarseGrainedExecutorBackend里的executor实际上是Executor

    image.png

    new TaskRunner来执行task


    image.png image.png image.png

    所以最后是调用ShuffleMapTask.runTask(),并且通过SparkEnv获取shuffleManager,SparkEnv初始化了很多重要的组件

    image.png

    shuffleManager获取writer,默认的writer是HashShuffleManager,获取HashShuffleWriter调用write方法

    image.png image.png image.png
    这里有个调优参数spark.shuffle.consolidateFiles
    image.png

    val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)

    image.png

    blockManager.getDiskWriter

    image.png image.png

    这里可以看出,使用了NIO的文件api


    image.png

    回到executor的run方法中


    image.png image.png

    这里分析
    env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

    image.png image.png image.png image.png

    写数据逻辑


    image.png

    spill到磁盘的逻辑


    image.png image.png image.png

    复制副本操作


    image.png image.png

    最后写完数据后,上报给driver的MapStatusTracker

    image.png image.png

    driver端收到后


    image.png image.png image.png image.png image.png image.png image.png image.png

    2.ResultTask

    image.png image.png image.png

    3.ShuffledRDD

    image.png image.png image.png

    通过HashShuffleReader来读取数据


    image.png image.png image.png image.png image.png image.png

    相关文章

      网友评论

          本文标题:Spark1.3.1Shuffle源码分析

          本文链接:https://www.haomeiwen.com/subject/awkbeftx.html