美文网首页Flink
CDC之Debezium小探

CDC之Debezium小探

作者: 淡淡的小番茄 | 来源:发表于2021-07-08 17:03 被阅读0次

    背景

    项目初期我们基于canal实现了设备信息的准实时同步。基于MySQL binlog,将mysql中的设备变更信息,实时同步到OLAP数据库ClickHouse,以应对实时分析的场景。canal社区非常不活跃,很多bug都无人修复,上线运行的这段时间,也踩过了很多坑。基本上都是通过重启来解决的。

    这段时间,在梳理项目的各个组件,打算把canal替换为debezium。其实,个人还是比较偏重直接使用ClickHouse的MaterializeMySQL引擎来实现数据的增量同步。奈何截止到 21.6版本此引擎暂不支持按照表来过滤,社区已经在规划此功能,希望尽快完成。

    官方的架构图

    debezium是基于Kafka Connect实现的。Kafka Connect 是一个框架,用于将数据传入和传出 Apache Kafka。整体架构图如下:

    其实就是抽象出一层任务执行调度平台。支持节点的高可用、弹性扩容、故障引起的任务再平衡。部署模式有两种,一种是单机模式,一种是集群模式。生产环境肯定选择的集群模式。此模式需要引入:kafka connect、zookeeper,相对来说是比较重的。其还支持embed模式,通过java api的方式集成到现有的功能模块中。这块确实是我们所需要的,我们计划采用此集成方式,将canal替换。

    新建项目

    新建spring boot工程,使用注解PostConstruct启动引擎,PreDestroy停止引擎。生产中肯定不能是单点部署,所以我们选择使用kafka来存储offset和database.history信息。代码示例如下:

    @Service

    public class CDCServiceDeveziumImpl implements CDCService{

    DebeziumEngine<ChangeEvent<String, String>> engine;

    @PostConstruct

    public void startEngine() throws IOException {

    // Define the configuration for the Debezium Engine with MySQL connector...

    final Properties props = new Properties();

    props.setProperty("name", "engine");

    props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

    //文件存储

    //props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");

    //props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");

    //kafka

    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");

    props.setProperty("offset.storage.topic", "cdc_offsets");

    props.setProperty("bootstrap.servers", "172.30.xxx.xx:9092");

    props.setProperty("offset.storage.partitions", "1");

    props.setProperty("offset.storage.replication.factor", "1");

    props.setProperty("offset.flush.interval.ms", "60000");

    //props.setProperty("schema.whitelist", "xxx_[0-1]"); // 库.表名

    props.setProperty("table.whitelist", "xxx_[0-1]\\.yyy_([1-9]|[12][0-9]|3[0-1])"); // 库.表名

    /* begin connector properties */

    props.setProperty("database.hostname", "172.30.xxx.xxx");

    props.setProperty("database.port", "3306");

    props.setProperty("database.user", "root");

    props.setProperty("database.password", "xxxxyyy");

    props.setProperty("database.server.id", "85744");

    props.setProperty("database.server.name", "device-connector");

    props.setProperty("snapshot.mode", "schema_only");

    //文件存储

    //props.setProperty("database.history","io.debezium.relational.history.FileDatabaseHistory");

    //props.setProperty("database.history.file.filename","/path/to/storage/dbhistory.dat");

    //kafka

    props.setProperty("database.history","io.debezium.relational.history.KafkaDatabaseHistory");

    props.setProperty("database.history.kafka.topic", "cdc_db_history");

    props.setProperty("database.history.kafka.bootstrap.servers", "172.30.xxx.xx:9092");

    // Create the engine with this configuration ...

            engine = DebeziumEngine.create(Json.class)

            .using(props)

            .notifying(record -> {

            notifying(record);

            }).build();

        // Run the engine asynchronously ...

        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(engine);

    }

    @PreDestroy

    public void stopEngine() {

    if (engine != null) {

                try {

    engine.close();

    } catch (IOException e) {

    e.printStackTrace();

    }

            }

    }

    @Override

    public void notifying(ChangeEvent<String, String> record) {

    System.out.println(record);

    }

    }

    微服务启动后会自动监听binlog日志。

    存在的问题

    kafka的连接有很好的重试机制,比如暂时由于某些原因连不上kafka服务端,客户端会一直重试的。但是DebeziumEngine 对于mysql连接这就快重试机制不健全。DebeziumEngine是通过在xxErrorHandler.java类中进行判断的。比较如下两个类

    debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlErrorHandler.java

    debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerErrorHandler.java

    网络断开后异常信息如下:

    网络恢复后无法正常监听binlog,重启后功能正常。

    如何解决

    已给社区提了缺陷,待反馈。如果不行,计划自行修改MySqlErrorHandler的代码,将相关异常判断加到逻辑中。

    相关文章

      网友评论

        本文标题:CDC之Debezium小探

        本文链接:https://www.haomeiwen.com/subject/ppstpltx.html