美文网首页
SparkSQL应用的一个排错和重构

SparkSQL应用的一个排错和重构

作者: 天之見證 | 来源:发表于2022-11-01 14:48 被阅读0次

    故事背景

    数据处理逻辑:

    1. 将一个json的数组从map结构里面扣出来
    2. 然后将json数组里面的每一个元素和map结构里面的其他元素重新组成一个新的map,存入一个新表

    实现方式:采用SparkSQL实现(Spark 3.1.2)
    问题:数据少了很多

    原始代码逻辑

    insert overwrite table iceberg_table
    select
        id, 
        map_concat(map_filter(config, (k, v) -> k != 'items'), elem)
    from (
        select 
            id, 
            config,
            from_json(config['items'], 'array<map<string,string>>') list
        from table
    ) as a
    LATERAL view explode(list) as elem
    

    怎么发现问题的

    1. 总体条数的减少
    2. 条数不少,但是config这个mapkv个数的减少,采用sum(size(config))

    问题

    explode的问题

    explode的参数如果是null或者数组为空的时候,整行记录都会被清除掉,而不是用一个null来补充数据,而且如果你缺省设置为 array()也是不行的,因为这个数组为空,这行数据还是会被清除掉的

    map_concat的问题

    map_concat如果传入的后续参数里面有null的话,整个函数的返回值也是为null

    总结

    基于以上2点的问题,导致了数据的大量减少

    调整之后的逻辑

    初版

    insert overwrite table iceberg_table
    select
        id, 
        map_concat(map_filter(config, (k, v) -> k != 'items'), COALESCE(elem, map()))
    from (
        select 
            id, 
            config,
            from_json(config['items'], 'array<map<string,string>>') list
        from table
    ) as a
    LATERAL view 
        explode(COALESCE(if(size(list)=0, null, list), array(null))) as elem
    

    这版可以看到,我们需要在2个地方进行修改

    终版

    切入点:将2处修改合并到一处

    insert overwrite table iceberg_table
    select
        id, 
        map_concat(map_filter(config, (k, v) -> k != 'items'), elem)
    from (
        select 
            id, 
            config,
            from_json(config['items'], 'array<map<string,string>>') list
        from table
    ) as a
    LATERAL view 
    explode(COALESCE(if(size(list)=0, null, list), array(map())) as elem
    

    相关文章

      网友评论

          本文标题:SparkSQL应用的一个排错和重构

          本文链接:https://www.haomeiwen.com/subject/vewrtdtx.html