1. 背景
数据增量同步是 ETL 关键功能,在全量同步后,持续增量同步,保证数据的完整,正确和时效,通常有两种方式实现,双写和 CDC
双写 优点,实现简单, 写入源库同时写入目标库;缺点,代码侵入,影响正常业务
CDC 优点,无侵入,读取数据库 log,获取数据变更;缺点,复杂,需要引入 CDC 组件,从数据变更(表/行/字段变更)到目标增量变更(通常是 DTO)需要复杂的映射
Cdc 组件本身通用设计,支持扩展 redis,elasticsearch 等数据库同步
本文包括两部分,cdc 组件设计和关系/图增量同步设计
2. 参考和术语
CDCchange data capture 数据变更抓获
RBT 基于规则的转换组件https://xie.infoq.cn/edit/77c1a56d807abadefb83a944e
3. 分布式 SETL 模块和规划
下图介绍 SETL 模块和规划
setl-rbt 全量同步组件,datax 组件,接入分布式调度,实现高性能的全量同步
setl-cdc cdc 增量同步 datax 组件,接入分布式时间槽实现高可靠增量,后续规划接入 kafka connect
setl-stream 研发中,流式 etl,引入 kafka connect,实现高吞吐低延时的增量同步
config-center 配置中心,datax 原生使用本地文件配置,配置中心摆脱本地文件限制,实现分布式系统的必要基础设施
sanner schema 扫描,辅助数据的同步
4. Debezium CDC 原理
Debezium cdc 组件,支持多种关系数据库,如 sqlserver,db2, oracle,mysql 等,也支持 newsql, 如 Cassandra,mangodb,抽取并解释数据库日志获取数据变更,支持以事件/监听模式消费事件,debezium 支持 ack 机制,实现事件消费的可靠性
下图是 dbz 主要模块
Apidbz 引擎定义,目前只有一个实现,EmbeddedEngine
Embeddeddbz 引擎的实现,顾名思义,可以嵌入到用户的代码,轻量级的,称为 api 方式
Serverdbz 预定义的事件分发服务,开箱即用,实现分发变更事件到 redis,plusar 和其他云消息服务,称为 dbz engine server 方式,但其依赖 Quarkus,一个对标 spring cloud 框架,如果你已使用 spring cloud 构建你的服务,可能对该方式不是很感兴趣
connector官方推荐的 deploy 方式,如下图,接入 kafka connect,connector 作为 kafka connect 的 source,捕获源数据库变更,以事件方式推动到 kafka,订阅事件的 sink,写入到目标库,该方式充分利用 kafka 特性
前 2 种方式没有接入 kafka,使用的是相同的 connector,但也依赖 kafka 类,如事件,消费接口等
5. Dbz cdc 开发框架
上图是 dbz cdc sdk,虚线框是 dbz 的类,其他属于 estl-data-cdc 组件
DbzEngineClientConfiguration构建和初始化 dbz 引擎客户端,支持读取本地配置文件,配置中心
DebeziumEngineClientdbz 引擎客户端,负责启动/停止 dbz 引擎
DispatchChangeEventConsumerImplcdc 事件消费实现,接收原生事件,负责批量 ack
DispatchChangeEventListenerImpl/ChangEventListener setl-cdc 组件
1)接收原生变更事件,读取和解释到 RowDataBean,使用内部模型,如,操作类型,数据的 before 和 after,解耦对 dbz 的依赖,也更方便后续使用
2)分类变更事件,目前只有行数据(row)事件, 过滤掉 query 操作,分发给 ChangeEventRowListener
BinlogSync ChangeEventRowListener 实现,实现订阅服务,依据 ChangeEventHandler 设置感兴趣的 db.table 和增删改类型调用处理器
开发人员实现自己的同步业务只需实现ChangeEventHandler 即可,sdk 有效地屏蔽 dbz cdc 的内部机制
6. 关系/图(neo4j)增量同步设计
增量同步是 cdc 开发框架应用,cdc 告诉我们哪个库哪个表哪行哪些字段变更,但增量同步的目标库,如,neo4j,elasticsearch 存储相当于 dto,因此需要映射计算框架,从表行字段的变更映射为变更的 dto
Ø 配置和执行
上图是 CDC 模型,包括 CDC 定义,映射 Action 设计,属于 rb-transformer 模块
CDC cdc 配置模型,同时负责选择和执行适用 cdc 动作
CdcAction cdc 动作,数据变更映射到目标库变更的逻辑实现
CdcRule cdc 动作是否适用数据变更,CdcRule 返回布尔值
Ø datax reader
Cdc 动作和变更事件处理器 datax 实现,该实现接入到 datax reader,使用 datax 的 reader->transform->writer 机制,cdc 发出数据变更事件,cdc 动作负责映射为目标变更,transform 规则转换,最后到 writer,目标变更写入目标库
Ø 同步实现时序
同步实现时序展示数据变更事件分发,CDCAction 执行
关键步骤:
1.2/1.21 ChangeConsumer 负责批量 ack
1.3/1.4 解释变更事件,并转化为内部 RowDataBean 对象,解耦 dbz
1.5 BinlogSync 是 ChangeEventRowListener 实现,从这开始进入同步业务
1.6/1.7 查找对表变更感兴趣的 cdc,表粒度较大,初步筛选,1.12 精确筛选
1.12/1.16 onConditional 通过规则返回 action 是否适用(RowDataBean),若适用执行
1.15 CdcReaderAction 依据 RowDataBean,源数据变更映射为目标变更,同步到目标库
Ø 关系/图 CDC 配置示例
1) 场景
上图 cdc 示例 schema,覆盖 3 类场景
1. 连接表关系变更 film->film_category->category
2. 外键关系变更 customer->store
3. 主从表多对一变更 store-address 对应图库的 store 顶点
2) 配置示例解释
Cdc 挂在 rbt 转换下,两者并无直接关系,这样设计主要方便 cdc 编写时参考
Tables 标签该 cdc 相关的表,可设置多个表
Insert/update/delete 标签 分别对应增删改分类的 cdc 动作,实际上,insert 的事件可以产生 update 的 action,该分类只是管理维度
Action 标签 数据变更对应的动作,映射源变更为目标变更
上图展示源 customer 表 insert 引起目标图库两个动作
1) 新增的 customer 顶点
2) 如果新增 customer 带有 store 外键,新增 customer->store 关系,适用规则
r.afterField(store_id)!=null,插入数据 store_id 不为空
上图源 customer 变更引起目标变更,
规则 !(r.updates().size()==2 &&r.isUpdate('store_id'))*
判断数据变更除了 store_id 还有其他字段变更才出发,store_id 变更,引起后面两个动作,删除旧关系,新增新关系,关系 rule 是一样的,参数来源不一样,删除的 store_id 来源于 before,新增来源于 after
*数据行 lastUpdate 必然变更,所以 r.updates().size()==2
上图删除变更映射,customer 删除,首先删除关系,再删除节点
Ø 退出策略(TBD)
Cdc 是事件流,事件源源不断产生,认为是永续执行,目前 cdc 接入 datax,定时调度执行,因此需要一个机制退出,待下次定时调度再执行,例如,执行时长,未接收到事件时长等
7. 架构质量设计
Ø可靠性数据变更不丢失,数据变更事件至少消费一次,但允许重复处理
依赖 ack 机制,只有事件处理完毕 ack,源头事件偏移,否则,事件重发,事件处理器需要识别事件是否处理,保证幂等性
Ø高可用变更事件处理是按顺序,只能单个线程按顺序处理,高可用是主备架构,处理节点失效,备用节点激活无缝接上失效节点
这里涉及两个关键点,处理状态恢复;处理节点失效发现,备用节点激活
Ø datax writer 数据变更事件允许重复处理,writer 需要考虑幂等性
附录
Dbz 引擎配置示例
网友评论