美文网首页
Spark踩坑vlog——join时shuffle的大坑

Spark踩坑vlog——join时shuffle的大坑

作者: 小段DSH12138 | 来源:发表于2021-07-25 21:39 被阅读0次

    业务背景

        项目中将两个表进行join,一个大表,一个小表,在平时200 executor-core * 20G executor-memory的资源下跑的挺好的,随着业务数据的增加,有一天,这个任务就跑不出来了,重试5次每次都失败,最后任务报错;
        报错时,俩表情况如下:大表的数据量约为278亿,1TB左右,另一个的数据量约为480万,4GB左右;通过DAG图发现,任务卡在俩表join的那个stage上;

    报错信息

    1.spark sql实现报错

        当使用SparkSQL对俩表进行join时,报错为:
    org.apache.spark.shuffle.MetadataFetchFailedException:Missing an output location for shuffle 0
    以及
    org.apache.spark.shuffle.FetchFailedException:Failed to connect to hostname:port

    2.rdd实现报错

        当使用rdd对俩表进行join时,报错为:
    WARN TaskSetManager:Lost task 17.1 in stage 4.1:java.io.FileNotFoundException:一个文件/目录
    以及
    org.apache.spark.shuffle.FetchFailedException:Error in opening FileSegmentManagedBuffer

    梳理过程

        通过看这些错哈,能发现是在join时产生的shuffle出了问题,那么我们对shuffle进行分析一下:shuffle过程分为两部分shuffle write和shuffle read;
        其中shuffle write就相当于write in local memory,这个过程中的分区数,是由上一个阶段的rdd分区数来决定的;shuffle read就是把数据读出来,然后在根据其对应的key进行reduce,得到结果;
        分析了这两个过程,发现报错中,Missing an output location for shuffle 0还有其他的报错,原因都是因为,在一个task执行对应数据计算的时候,太大了,最终失败,导致心跳检测无法检测到或者是超过了connection-time,所以最终找不到这个task及结果;那么我们还是得从降低这个task中处理的数据来入手。

    优化策略

        首先,减小分区数据的最直接的办法,就是将整个数据都变小,所以尽可能在shuffle前,把改filter掉的数据全都过滤掉;其次,最粗暴的办法就是直接增大每个executor-memory,直接增大每个task的可支配内存;接着,通过“少量多次”的思想,把shuffle时的数据,分成尽可能多的、合理的分区数,官方建议分区数为程序总executor-core的2~3倍;最后,那就是投机取巧一点的办法了,想办法避开shuffle,如:使用map side join或broadcast join来避免shuffle过程;
        小tips:两表join时,将较小的表放在右边,这样会将小表读进内存,与接下来的大表匹配;hive则相反;
        总结一下:

    • 1.增大资源
    • 2.filter
    • 3.调整shuffle partition的数量
    • 4.是否数据倾斜
    • 5.map side join或broadcast join

    具体实施效果

        此次出现的bug,不适用于方案2、5,因为那些数据都是要用的,并且数据量太大,不支持广播等;

    方案1

        把整个程序的资源增加到executor-core=3; executor-num=100; executor-memory=30g; driver-memory=20g最终的结果还是在join时的最后两个task上失败了;

    方案3

        使用了SparkSQL,在Spark SQL中设置shuffle时的分区时,应该设置参数spark.sql.shuffle.partition,这个参数默认为200,按照官方建议的总程序executor-core的2~3倍,设置值为800;
        并且为了更好的将数据打散,将两个表单独select出来后,小表的分区从自身的200 repartition到800,大表则按照自身的3200参与计算;
        发现运行结果,最终卡在join最后的4个task上,4个task失败,导致最终失败;

    方案4

        通过查看,发现最终失败的几个task,各自计算的memory都为17.8G及以上,因此最终还是数据倾斜,调整数据倾斜的方式有很多,因为我想早点睡觉,之后如果翻看到的同学,就去翻翻前面的文章吧。
        最终定位到此次数据倾斜的原因是因为,两个表的join字段的数据类型不一致,大表的关联字段为String型,小表的关联字段为bigint型;在关联前,对小表执行cast(bigint to string),然后再join,并加上以上方案的行为,之后的task分区就变得均匀多了,成功运行~

    相关文章

      网友评论

          本文标题:Spark踩坑vlog——join时shuffle的大坑

          本文链接:https://www.haomeiwen.com/subject/dryimltx.html