图片出处:https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md image.png
1.ShuffleMapTasks
image.png image.png image.png image.pngdirver像executor发送LaunchTask
image.png
executor接收到序列化好的task后,反序列化后执行task
image.png
CoarseGrainedExecutorBackend
里的executor实际上是Executor
类
new TaskRunner来执行task
image.png image.png image.png
所以最后是调用ShuffleMapTask.runTask()
,并且通过SparkEnv
获取shuffleManager,SparkEnv初始化了很多重要的组件
shuffleManager获取writer,默认的writer是HashShuffleManager
,获取HashShuffleWriter
调用write方法
这里有个调优参数
spark.shuffle.consolidateFiles
image.png
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
blockManager.getDiskWriter
这里可以看出,使用了NIO的文件api
image.png
回到executor的run方法中
image.png image.png
这里分析
env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
写数据逻辑
image.png
spill到磁盘的逻辑
image.png image.png image.png
复制副本操作
image.png image.png
最后写完数据后,上报给driver的MapStatusTracker
driver端收到后
image.png image.png image.png image.png image.png image.png image.png image.png
2.ResultTask
image.png image.png image.png3.ShuffledRDD
image.png image.png image.png通过HashShuffleReader来读取数据
image.png image.png image.png image.png image.png image.png
网友评论