一般的join如果两边的join条件是一一对应是性能比较好的情况,但是当遇到join条件两边存在多行对应一行或者多行对应多行的情况。就会出现M行join N行 结果就会产生笛卡尔积M*N行。如果M和N都不大还好。通过shuffle到磁盘一样能计算出来。但是如果M和N都很大而且不止两表,有很多表同时join。那么将会出现final stage之前将shuffle write M*N*K*P... 无限扩张的数据集。而且根本没法优化。
如下DAG:
cross join本来只有百万级别的数据量,经过cross join后 数据量达到百亿千亿级别。将耗费大量时间去shuffle write,也会导致大量executor磁盘消耗。
join条件皆为非对等join刚开始可能会想如果将左表进行collect操作 先减少主表的数据量就会减少整个DAG的数据量。但是collect之后只能在map/foreach函数进行接下来的操作。就会导致整个DAG产生大量的小job,能算了 但是特别慢而且不优雅。
因为经过join多表才产生的cross join,那么不妨把多表进行单独的俩表join,就算产生cross join 数据量也是可控的。
但是spark的DAG是根据整体算子构建最后的action算子才会触发计算,那么无论何种方式进行DAG构建,最终还是会汇总到一起再计算。还是会产生多表cross join。
那么问题就来到如何提前触发俩表join的提前计算了。
众所周知repartition可以触发shuffle,但是shuffle只是改变分区数,shuffle=提前计算吗?
于是我就试了一试。果然。。。
俩表之间cross joinrepartition触发了shuffle,相当于触发了提前计算。大大减少了数据量 而且最后还能以对等join进行连接。
网友评论