https://blog.csdn.net/weixin_35353187/article/details/84303518
数据倾斜发生的原理 :
在进行shuffle的时候,必须将各个节点上相同的Key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或者join操作。如果某个key对应的数据量特别大的话,会发生数据倾斜。比如大部分key对应的10条数据,但个别key却对应了100万条数据,那么大部分task会只分配到10条数据,而个别task可能会分配了100万数据。整个spark作业的运行进度是由运行时间最长的那个task决定的。
- 数据倾斜会发生在数据开发的各个环节中,通过日志和查看监控数据可以看见:
- 用Hive算数据的时候reduce阶段卡在99.99%(Hive的数据倾斜,一般都发生在Sql中Group和On上,而且和数据逻辑绑定比较深。)
- 用SparkStreaming做实时算法时候,一直会有executor出现OOM的错误,但是其余的executor内存使用率却很低。数据倾斜有一个关键因素是数据量大,可以达到千亿级。
- 万恶的shuffle
数据倾斜只会发生在shuffle中,下面是常用的可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,一旦触发,所有相同key的值就会拉到一个或几个节点上,就容易发生单点问题。
image.png- 从业务计角度来理解数据倾斜
我们在某一天在北京和上海两个城市做了强力的推广,结果可能是这两个城市的订单量增长了10000%,其余城市的数据量不变。然后我们要统计不同城市的订单情况,这样,一做group操作,可能直接就数据倾斜了。
技术上如何解决?
1、增加jvm内存,这适用于唯一值非常少,极少数值有非常多的记录值的情况(唯一值少于几千),这种情况下,往往只能通过硬件的手段来进行调优,增加jvm内存可以显著的提高运行效率。因为分布相比2还算均匀一些。
2、增加reduce的个数,这适用于唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一的情况,我们知道,这种情况下,最容易造成的结果就是大量相同key被partition到一个分区,从而一个reduce执行了大量的工作,而如果我们增加了reduce的个数,这种情况相对来说会减轻很多,毕竟计算的节点多了,就算工作量还是不均匀的,那也要小很多。
3、自定义分区,这需要用户自己继承partition类,指定分区策略,这种方式效果比较显著。
4、重新设计key(哈希算法),有一种方案是在map阶段时给key加上一个随机数,有了随机数的key就不会被大量的分配到同一节点(小几率),待到reduce后再把随机数去掉即可。
image.png5、使用combinner合并,combinner是在map阶段,reduce之前的一个中间阶段,在这个阶段可以选择性的把大量的相同key数据先进行一个合并,可以看做是local reduce,然后再交给reduce来处理,这样做的好处很多,即减轻了map端向reduce端发送的数据量(减轻了网络带宽),也减轻了map端和reduce端中间的shuffle阶段的数据拉取数量(本地化磁盘IO速率),推荐使用这种方法。
网友评论