美文网首页
Spark计算中的数据倾斜

Spark计算中的数据倾斜

作者: 天之見證 | 来源:发表于2022-01-13 16:35 被阅读0次

    本文的讨论场景限定在spark计算引擎,但是并不局限于spark,相关的讨论可以迁移到其他的计算引擎
    Spark计算中什么是数据倾斜?
    所有的数据倾斜,以task粒度来说,就是单个task的数据条数,相比较其他task的数据条数大很多倍,具体我们可以通过task summary看出来,max的时候,读取的数据特别多

    除了一种极端情况,数据条数差异不大,但是由于单个字段大小的不同,可能出现数据容量差异很大,可能一个task中1000条数据是100k,在另一个task中1000条数据是100m,这里不讨论这种情况

    数据倾斜.png

    那么为什么会有这种情况出现呢?我们分2种场景讨论:

    1. 初始直接从存储层读数据

    一般来讲,这一层不会出现读数据倾斜,因为使用hash分片来读数据,hash可以保证每个分区读取数据条数的稳定
    但是如果是带有谓词下推的读取数据的话,会出现数据倾斜,这种情况无可厚非,而且不需要特别解决,因为后续的数据处理过程中,数据的再次shffule会磨平这种情况,所以不需要特别关注

    2. 任务计算中

    任务计算过程中,由于涉及到shuffle,导致数据会重分区,一些场景下容易造成数据倾斜

    2.1 group by场景

    group by的时候,如果使用的key有明显的倾斜倾向就会出现数据倾斜
    例如某款应用的用户信息的详情表有如下字段:

    -- user_tb
    city_id: string  //用户所在城市id,例如深圳,广州
    user_id: bigint  // 用户id
    
    select city_id, count(1)
    from user_tb
    group by city_id
    

    执行以上SQL的时候,spark会以city的hash做分区,可以想见如果某个分区里面有北京这样的城市的话,分区里面的数据会特别多
    这种情况下的数据倾斜可以添加随机前缀+二次聚合来处理

    2.2 join 场景

    2.2.1 双表

    双表join涉及到3个stage,以下面是sql为例

    select user_tb.user_id, city_info.city_name
    from user_tb
    left join city_info
    on user_tb.city_id = city_info.id
    

    2个stage是分别读取user_tbcity_info的阶段,经历2个读文件的阶段,这个时候不会有数据倾斜,但是读完文件之后,由于下一个步骤是shuffle,所以2个表都要进行shuffle write,并且以city_id的hash来重分区,这时候就类似上面group by的场景了,会出现某些分区特别大,真正第3个stage来join数据的时候就可以看出差异了,具体可以参看上面数据倾斜的截图

    在这种情况下,由于city_info比较小,可以采用mapjoin的方式,避免掉shuffle过程,直接一个stage就处理数据了

    2.2.2 三表

    为什么单拿出三表join的场景呢, 因为大多数情况下的时候可以将其看作2次双表join来处理,但是有些特殊的情况下,第二次的双表join的时候,数据的情况会有些变化,导致不好排查

    以下面的sql为例:

    select coalesce(user_df.id, user_di.id) id, other_user.info
    from user_df --用户全量表
    full join user_di --用户增量表
    on user_df.id = user_di.id
    left join other_user  --其他用户信息
    on user_id.id = other_user.id
    

    上面的写法是没有问题的,在第一个join的时候不会有数据倾斜
    但是在第二个join的时候会有问题,示意图如下


    三表join.jpg

    比较难理解的是后面user_di的那部分数据会被补成null
    由于user_dfuser_difull join来关联的,关联之后,我们如果输出user_idid字段,可以看到会有很多是null的,然后在这种场景下面再去join other_user表的时候,user_id.id就并不是它原来的值了,使用这个时候的user.id做分区的时候会有1个分区出现数据倾斜,并且这个分区是有很多null的,量级差不多就是user_dfuser_id的差集了,要解决这种数据也很容易,修改join的条件为

    select coalesce(user_df.id, user_di.id) id, other_user.info
    from user_df --用户全量表
    full join user_di --用户增量表
    on user_df.id = user_di.id
    left join other_user  --其他用户信息
    on coalesce(user_df.id, user_di.id) = other_user.id
    

    最后一个on的调整将null的数据补充上去,就可以解决这个问题了

    相关文章

      网友评论

          本文标题:Spark计算中的数据倾斜

          本文链接:https://www.haomeiwen.com/subject/vmgncrtx.html