前言:缓慢变化知识点回顾
缓慢变化分三种类型:
type1:当数据发生变化时,直接覆盖旧值
type2:当数据发生变化时,新增一行
这里的DWID 也就是我们日常项目使用的代理键CustomerKey
type3:当数据发生变化时,新增一列
传统的拉链表分类(按照实现时的方式):
增量拉链表:
1.适用范围: 1).有增量字段可以识别数据更新,且业务数据变动时,增量时间会更新;2).数据不存在物理删除
2.实现方式: 2).基于mysql表的增量数据实现,此处我们假设增量数据存放于stg_a001_table1,全量表存放于ods_a001_table1,uuid为构造字段,具体逻辑为,md5(concat(nvl(a),nvl(b)...)),也就是拉链比对字段空处理后拼接,再MD5的值
则具体实现可体现为:
select
oat.ods_start_dt,
if(sat.id is not null and oat.uuid<>sat.uuid,'${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新数据进行关链
oat.uuid,
oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
'${ymd}' as ods_start_dt,
'99991231' as ods_end_dt, -- 新增和更新数据进行开链
sat.uuid,
sat.其它字段
from stg_a001_table1 sat
where sat.ymd='${ymd}'
全量拉链表:
1.适用范围: 1).无增量字段可以识别数据更新;2).数据存在物理删除
2.实现方式: 2).基于mysql表的增量数据实现,此处我们假设同步全量数据存放于stg_a001_table1,全量表存放于ods_a001_table1
select
oat.ods_start_dt,
case when sat.id is null or (sat.id is not null and oat.uuid<>sat.uuid),'${ymd}',oat.ods_end_dt) as ods_end_dt, -- 对更新和删除进行关链
oat.uuid,
oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
'${ymd}' as oat.ods_start_dt,
'99991231' as ods_end_dt, -- 对插入和更新的开链
sat.uuid,
sat.其它字段
from stg_a001_table1 sat
left join ods_a001_table1 oat on oat.id=sat.id and oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'-- stg_a001_table1最好设置为分区表
where sat.ymd='${ymd}'
and (oat.id is null or (oat.id is not null and oat.uuid<>sat.uuid))
基于binlog的实现
binlog的数据我们通过canel已经写入kafka,所以此处只需要通过flume去实时消费对应的topic并写入hdfs即可。
1.对每天路径下的数据进行去重
1).创建外表tmp_stg_a001_table1映射到hdfs上的增量数据存储路径
2).通过row_number函数,按照主键,对数据进行去重,只保留最后一条数据,将数据写入表stg_a001_table1
2.基于去重数据对数据进行拉链处理( binlog_type为每条数据的操作类型 D标识删除 I为插入、U为更新)
select
oat.ods_start_dt,
if((sat.id is not null and oat.uuid<>sat.uuid) or sat.binlog_type='D','${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新和删除数据进行关链
oat.uuid,
oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
'${ymd}' as ods_start_dt,
'99991231' as ods_end_dt, -- 新增和更新数据进行开链
sat.uuid,
sat.其它字段
from stg_a001_table1 sat
where sat.ymd='${ymd}' and sat.binlog_type<>'D' -- 当日新增且当日删除的数据无需处理
网友评论