spark-shuffle v4

作者: LancerLin_LX | 来源:发表于2020-07-17 20:21 被阅读0次

    spark shuffle

    Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂
    在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有自己的shuffle实现过程。原文链接:https://blog.csdn.net/zhanglh046/article/details/78360762

    总的来说,spark跟MR的shuffle并没有多大区别,都涉及到map(写数据的阶段),跟reduce(读数据阶段)。

    spark shuffle 执行流程

    本文通过源码分析spark shuffle的执行过程,以及相关参数的调优。

    通过分析spark 提交的源码,我们可以知道,最终调用的是org.apache.spark.scheduler.TaskrunTask方法,而Task有2个子类,ShuffleMapTask(write(也可能存在先read后write,最后阶段是write)相当于MR中的Map阶段)跟ResultTask(开始阶段是read,相当于MR中的Reduce阶段)

    image.png
    image.png

    shuffle write阶段

    image.png

    首先,shuffle write分为write buffer、spill disk、merge file 三个阶段,分别对应上图的1,2,3

    • write buffer: 将数据写到缓存buffer中,如果容量大于0.7,则会申请扩容buffer,如果能够申请到内存,就扩容继续写buffer
    • sort and spill: 内存不足的时候,就会把缓存的数据排序后spill 到磁盘中
    • merge file: 经过1,2阶段后,会产生多个spill后的文件,以及buffer内存中的数据,将内存以及磁盘的数据,排序后合并成1个分区且排序的大文件(数据文件),跟一个索引文件(描述数据文件),reducer会根据索引文件,拉取数据文件所需要的数据。

    shuffle read阶段

    image.png

    shuffle read阶段,其实就是读取数据的阶段,你可以理解成,client向server发送请求,下载数据。主要是从client读取数据的过程,超时、并发度、异常重试等方面入手,server端则通过调整处理的并发数方面入手。

    • Shuffle read过程
      Reduce解析需要读取的数据,封装成一个个请求,向每个mergeData file读取相应的数据。

    ExternalShuffleService

    Spark 的 Executor 节点不仅负责数据的计算,还涉及到数据的管理。如果发生了 shuffle 操作,Executor 节点不仅需要生成 shuffle 数据,还需要负责处理读取请求。如果 一个 Executor 节点挂掉了,那么它也就无法处理 shuffle 的数据读取请求了,它之前生成的数据都没有意义了。

    为了解耦数据计算和数据读取服务,Spark 支持单独的服务来处理读取请求。这个单独的服务叫做 ExternalShuffleService,运行在每台主机上,管理该主机的所有 Executor 节点生成的 shuffle 数据。有读者可能会想到性能问题,因为之前是由多个 Executor 负责处理读取请求,而现在一台主机只有一个 ExternalShuffleService 处理请求,其实性能问题不必担心,因为它主要消耗磁盘和网络,而且采用的是异步读取,所以并不会有性能影响。

    解耦之后,如果 Executor 在数据计算时不小心挂掉,也不会影响 shuffle 数据的读取。而且Spark 还可以实现动态分配,动态分配是指空闲的 Executor 可以及时释放掉。


    image.png

    spark shuffle 参数调优

    为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。除了没有other空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。所以,开启堆外内存对于调优非常重要

    • spark.shuffle.io.preferDirectBufs:是否优先使用堆外内存
    • spark.memory.offHeap.enabled: 是否启用堆外内存
    • spark.memory.offHeap.size: 设置堆外内存大小

    shuffle write阶段参数调优

    提高task可用内存,减少spill文件

    • spark.executor.memory

    shuffle read阶段参数调优

    提高拉取的并行度

    • spark.sql.shuffle.partitions
    • spark.shuffle.file.buffer
      提高拉取的数据量
    • spark.reducer.maxSizeInFlight
    • spark.reducer.maxReqsInFlight
    • spark.reducer.maxBlocksInFlightPerAddress
    • spark.reducer.maxReqSizeShuffleToMem
    • spark.shuffle.io.clientThreads
      提高重试、超时增加拉取数据的容错性
    • spark.shuffle.io.maxRetries
    • spark.shuffle.io.retryWait

    ExternalShuffleService参数调优

    ExternalShuffleService本质是一个基于Netty写的Netty服务,所以相关调优就是对Netty参数的调优,主要有以下这些参数,具体调整,需要根据实际情况做出相应的调整,提高服务稳定性。

    服务启动时处理请求的线程数,默认是服务器的cores * 2
    spark.shuffle.io.serverThreads
    spark.shuffle.io.receiveBuffer
    spark.shuffle.io.backLog
    spark.shuffle.io.sendBuffer

    实战

    核心ETL任务调优

    • 首先确认调优目标:比原来的执行时间减少50%以上
    • 统计出每个ETL任务的数据量,根据上文的调优方向,write、read和ExternalShuffleService各个阶段的调优方向
    • 经过大量的测试验证,最终我们发现,数据量在1亿左右使用1T1P120C的使用配置,对比原来的资源,资源占用增加1倍,耗时减少90%
    • 3-5亿数据,使用1T1P120C资源使用配置,资源增加20%~60%,时间减少55%
    • 28亿数据,使用3T3P600C资源使用减少40%,速度提升3.8倍

    T: 使用的内存 1T=1024G
    P: 配置spark.sql.shuffle.partitions,1P=1000
    C: cpu cores数量

    ETL任务调优效果

    image.png

    参考链接
    https://blog.csdn.net/zhanglh046/article/details/78360762
    https://github.com/JerryLead/SparkInternals
    https://www.cnblogs.com/itboys/p/9201750.html
    https://www.dazhuanlan.com/2019/12/19/5dfb2a10d780d/
    https://blog.csdn.net/pre_tender/article/details/101517789

    相关文章

      网友评论

        本文标题:spark-shuffle v4

        本文链接:https://www.haomeiwen.com/subject/oyrnhktx.html