美文网首页Flink
CDC之Debezium小探

CDC之Debezium小探

作者: 淡淡的小番茄 | 来源:发表于2021-07-08 17:03 被阅读0次

背景

项目初期我们基于canal实现了设备信息的准实时同步。基于MySQL binlog,将mysql中的设备变更信息,实时同步到OLAP数据库ClickHouse,以应对实时分析的场景。canal社区非常不活跃,很多bug都无人修复,上线运行的这段时间,也踩过了很多坑。基本上都是通过重启来解决的。

这段时间,在梳理项目的各个组件,打算把canal替换为debezium。其实,个人还是比较偏重直接使用ClickHouse的MaterializeMySQL引擎来实现数据的增量同步。奈何截止到 21.6版本此引擎暂不支持按照表来过滤,社区已经在规划此功能,希望尽快完成。

官方的架构图

debezium是基于Kafka Connect实现的。Kafka Connect 是一个框架,用于将数据传入和传出 Apache Kafka。整体架构图如下:

其实就是抽象出一层任务执行调度平台。支持节点的高可用、弹性扩容、故障引起的任务再平衡。部署模式有两种,一种是单机模式,一种是集群模式。生产环境肯定选择的集群模式。此模式需要引入:kafka connect、zookeeper,相对来说是比较重的。其还支持embed模式,通过java api的方式集成到现有的功能模块中。这块确实是我们所需要的,我们计划采用此集成方式,将canal替换。

新建项目

新建spring boot工程,使用注解PostConstruct启动引擎,PreDestroy停止引擎。生产中肯定不能是单点部署,所以我们选择使用kafka来存储offset和database.history信息。代码示例如下:

@Service

public class CDCServiceDeveziumImpl implements CDCService{

DebeziumEngine<ChangeEvent<String, String>> engine;

@PostConstruct

public void startEngine() throws IOException {

// Define the configuration for the Debezium Engine with MySQL connector...

final Properties props = new Properties();

props.setProperty("name", "engine");

props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

//文件存储

//props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");

//props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");

//kafka

props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");

props.setProperty("offset.storage.topic", "cdc_offsets");

props.setProperty("bootstrap.servers", "172.30.xxx.xx:9092");

props.setProperty("offset.storage.partitions", "1");

props.setProperty("offset.storage.replication.factor", "1");

props.setProperty("offset.flush.interval.ms", "60000");

//props.setProperty("schema.whitelist", "xxx_[0-1]"); // 库.表名

props.setProperty("table.whitelist", "xxx_[0-1]\\.yyy_([1-9]|[12][0-9]|3[0-1])"); // 库.表名

/* begin connector properties */

props.setProperty("database.hostname", "172.30.xxx.xxx");

props.setProperty("database.port", "3306");

props.setProperty("database.user", "root");

props.setProperty("database.password", "xxxxyyy");

props.setProperty("database.server.id", "85744");

props.setProperty("database.server.name", "device-connector");

props.setProperty("snapshot.mode", "schema_only");

//文件存储

//props.setProperty("database.history","io.debezium.relational.history.FileDatabaseHistory");

//props.setProperty("database.history.file.filename","/path/to/storage/dbhistory.dat");

//kafka

props.setProperty("database.history","io.debezium.relational.history.KafkaDatabaseHistory");

props.setProperty("database.history.kafka.topic", "cdc_db_history");

props.setProperty("database.history.kafka.bootstrap.servers", "172.30.xxx.xx:9092");

// Create the engine with this configuration ...

        engine = DebeziumEngine.create(Json.class)

        .using(props)

        .notifying(record -> {

        notifying(record);

        }).build();

    // Run the engine asynchronously ...

    ExecutorService executor = Executors.newSingleThreadExecutor();

    executor.execute(engine);

}

@PreDestroy

public void stopEngine() {

if (engine != null) {

            try {

engine.close();

} catch (IOException e) {

e.printStackTrace();

}

        }

}

@Override

public void notifying(ChangeEvent<String, String> record) {

System.out.println(record);

}

}

微服务启动后会自动监听binlog日志。

存在的问题

kafka的连接有很好的重试机制,比如暂时由于某些原因连不上kafka服务端,客户端会一直重试的。但是DebeziumEngine 对于mysql连接这就快重试机制不健全。DebeziumEngine是通过在xxErrorHandler.java类中进行判断的。比较如下两个类

debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlErrorHandler.java

debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerErrorHandler.java

网络断开后异常信息如下:

网络恢复后无法正常监听binlog,重启后功能正常。

如何解决

已给社区提了缺陷,待反馈。如果不行,计划自行修改MySqlErrorHandler的代码,将相关异常判断加到逻辑中。

相关文章

  • CDC之Debezium小探

    背景 项目初期我们基于canal实现了设备信息的准实时同步。基于MySQL binlog,将mysql中的设备变更...

  • debezium关于cdc的使用(下)

    博文原址:debezium关于cdc的使用(下) 简介 debezium在debezium关于cdc的使用(上)中...

  • kafka+flink任务级顺序保证

    顺序保证难点 本文主要分析 CDC 业务场景中任务级顺序保证,技术选型为:debezium、kafka、flink...

  • 数据库数据变动实时监听

    使用Debezium、Postgres和Kafka进行数据实时采集(CDC) 码匠君 9小时前 1. 背景 一直在...

  • Debezium 初次使用

    世间的一切都可以用时间来解决 前言: 因项目需要,需要调研debezium,也就是捕获数据更改(CDC)。翻阅了网...

  • Debezium同步MySQL网络异常问题小探

    背景 Debezium引擎在进行数据同步的时候,如果期间发生了异常问题。会有专门的异常处理类来出来,MySQL异常...

  • 数据同步工具 debezium

    数据同步工具 debezium 简介 Kafka Connect 配置 安装 debezium 插件 分发 辅助脚...

  • SIDDHI(西迪) CDC配置

    CDC介绍 关于CDC的介绍可参考 cdc(程序CDC类)_百度百科 (baidu.com)[https://ba...

  • 增量数据同步工具Debezium介绍

    Debezium能做什么 RedHat开源的Debezium是一个将多种数据源实时变更数据捕获,形成数据流输出的开...

  • 112

    本文来自Tencent CDC,转载请注明作者信息及出处。Tencent CDC(http://cdc.tence...

网友评论

    本文标题:CDC之Debezium小探

    本文链接:https://www.haomeiwen.com/subject/ppstpltx.html