美文网首页Hadoop
Spark sql 小表join大表优化,用filter方法代替

Spark sql 小表join大表优化,用filter方法代替

作者: maozicb | 来源:发表于2019-06-11 22:00 被阅读0次

     优化spark代码的有一条是避免使用会产生shuffle 的算法,比如 join。对于习惯了写sql的人来说,使用spark sql 来分析数据,和常规的关系型数据库写sql的感觉差不多。spark.sql("select  * from tab1 , tab2 where x=y and z<100")  ,分析就写完了,多方便。当然有时候对于这样的写法,性能不一定是最好的,我在学习一个项目的时候,在最后的优化阶段,看着网上的各种优化方法,结合自己的项目看看该如何调到最优。


    图1

    发现这个job用的时间蛮多的,里面对应的shuffle 还不少对应代码的里的 

    sparkSession.sql("select b.* from global_temp.topTenClick a , user_visit_action b where a.session_id=b.session_id")

    有join 产生,这是一个一千行的小表和一个五千万左右的大表join,网上搜了一些资料看看怎么优化,发现一处写的不错

    这个例子里面用的是map操作来避免join,我实际使用中发现map写完执行后报Encoders 相关的错误,我使用了filter进行替换。

    val topTenClickBroadCastArray= topTenClickBroadCast.value.map(row=> row.getString(0)).collect()

    val topTenClickSession = sparkSession.table("user_visit_action").filter( row => topTenClickBroadCastArray.contains(row.getString(2)))


    这样join是避免了,但我运行之后发现job执行时间更长了,一时郁闷

    shuffle没有之后,为什么执行时间边长了? 

    我想了半天,一开始以为是序列化问题,单独修改了序列 化的参数 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")来使用spark专用序列化,事实表明没用。

    后来发现job执行时间基本都在compuing time上,跟序列化肯定没关系,都花在计算上面了。

    后来我把注意力放在了代还join仅有的一行代码上, topTenClickBroadCastArray.contains(***),我用的是collect方法直接得到的 topTenClickBroadCastArray这个,它属于Array,array是一个带下标的集合,我搜索了一下Array的contain方法是否可以优化,

    发现了这个文章,写的不错,是我想要的

    val topTenClickBroadCastArray= topTenClickBroadCast.value.map(row=> row.getString(0)).collect().toSet

    用toSet 把Array转换成Set后,性能上去了很多。


    从2.3分钟(138秒)到34秒,性能提升4倍多,还是很可观的。

    相关文章

      网友评论

        本文标题:Spark sql 小表join大表优化,用filter方法代替

        本文链接:https://www.haomeiwen.com/subject/aricfctx.html