美文网首页
基于binlog构建拉链表

基于binlog构建拉链表

作者: HenlyX | 来源:发表于2021-05-25 18:22 被阅读0次

    前言:缓慢变化知识点回顾

    缓慢变化分三种类型:

    type1:当数据发生变化时,直接覆盖旧值

    type2:当数据发生变化时,新增一行

    这里的DWID 也就是我们日常项目使用的代理键CustomerKey

    type3:当数据发生变化时,新增一列

    传统的拉链表分类(按照实现时的方式):

    增量拉链表:

            1.适用范围:         1).有增量字段可以识别数据更新,且业务数据变动时,增量时间会更新;2).数据不存在物理删除

            2.实现方式:         2).基于mysql表的增量数据实现,此处我们假设增量数据存放于stg_a001_table1,全量表存放于ods_a001_table1,uuid为构造字段,具体逻辑为,md5(concat(nvl(a),nvl(b)...)),也就是拉链比对字段空处理后拼接,再MD5的值

    则具体实现可体现为:

    select

        oat.ods_start_dt,

        if(sat.id is not null and oat.uuid<>sat.uuid,'${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新数据进行关链

        oat.uuid,

        oat.其它字段

    from ods_a001_table1 oat

    left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

    where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

    union all

    select

        '${ymd}' as ods_start_dt,

        '99991231' as ods_end_dt, -- 新增和更新数据进行开链

        sat.uuid,

        sat.其它字段

    from stg_a001_table1 sat

    where sat.ymd='${ymd}'

    全量拉链表:

     1.适用范围:         1).无增量字段可以识别数据更新;2).数据存在物理删除

     2.实现方式:         2).基于mysql表的增量数据实现,此处我们假设同步全量数据存放于stg_a001_table1,全量表存放于ods_a001_table1

    select

        oat.ods_start_dt,

        case when sat.id is null or (sat.id is not null and oat.uuid<>sat.uuid),'${ymd}',oat.ods_end_dt) as ods_end_dt, -- 对更新和删除进行关链

        oat.uuid,

        oat.其它字段

    from ods_a001_table1 oat

    left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

    where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

    union all

    select

        '${ymd}' as oat.ods_start_dt,

        '99991231' as ods_end_dt, -- 对插入和更新的开链

        sat.uuid,

        sat.其它字段

    from stg_a001_table1 sat

    left join ods_a001_table1 oat on oat.id=sat.id and oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'-- stg_a001_table1最好设置为分区表

    where sat.ymd='${ymd}'

    and (oat.id is null or (oat.id is not null and oat.uuid<>sat.uuid))

    基于binlog的实现

    binlog的数据我们通过canel已经写入kafka,所以此处只需要通过flume去实时消费对应的topic并写入hdfs即可。

    1.对每天路径下的数据进行去重

    1).创建外表tmp_stg_a001_table1映射到hdfs上的增量数据存储路径

    2).通过row_number函数,按照主键,对数据进行去重,只保留最后一条数据,将数据写入表stg_a001_table1

    2.基于去重数据对数据进行拉链处理( binlog_type为每条数据的操作类型 D标识删除 I为插入、U为更新)

    select

        oat.ods_start_dt,

        if((sat.id is not null and oat.uuid<>sat.uuid) or sat.binlog_type='D','${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新和删除数据进行关链

        oat.uuid,

        oat.其它字段

    from ods_a001_table1 oat

    left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

    where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

    union all

    select

        '${ymd}' as ods_start_dt,

        '99991231' as ods_end_dt, -- 新增和更新数据进行开链

        sat.uuid,

        sat.其它字段

    from stg_a001_table1 sat

    where sat.ymd='${ymd}' and sat.binlog_type<>'D' -- 当日新增且当日删除的数据无需处理

    相关文章

      网友评论

          本文标题:基于binlog构建拉链表

          本文链接:https://www.haomeiwen.com/subject/xoqksltx.html