背景
项目初期我们基于canal实现了设备信息的准实时同步。基于MySQL binlog,将mysql中的设备变更信息,实时同步到OLAP数据库ClickHouse,以应对实时分析的场景。canal社区非常不活跃,很多bug都无人修复,上线运行的这段时间,也踩过了很多坑。基本上都是通过重启来解决的。
这段时间,在梳理项目的各个组件,打算把canal替换为debezium。其实,个人还是比较偏重直接使用ClickHouse的MaterializeMySQL引擎来实现数据的增量同步。奈何截止到 21.6版本此引擎暂不支持按照表来过滤,社区已经在规划此功能,希望尽快完成。
官方的架构图
debezium是基于Kafka Connect实现的。Kafka Connect 是一个框架,用于将数据传入和传出 Apache Kafka。整体架构图如下:
其实就是抽象出一层任务执行调度平台。支持节点的高可用、弹性扩容、故障引起的任务再平衡。部署模式有两种,一种是单机模式,一种是集群模式。生产环境肯定选择的集群模式。此模式需要引入:kafka connect、zookeeper,相对来说是比较重的。其还支持embed模式,通过java api的方式集成到现有的功能模块中。这块确实是我们所需要的,我们计划采用此集成方式,将canal替换。
新建项目
新建spring boot工程,使用注解PostConstruct启动引擎,PreDestroy停止引擎。生产中肯定不能是单点部署,所以我们选择使用kafka来存储offset和database.history信息。代码示例如下:
@Service
public class CDCServiceDeveziumImpl implements CDCService{
DebeziumEngine<ChangeEvent<String, String>> engine;
@PostConstruct
public void startEngine() throws IOException {
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
//文件存储
//props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
//props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
//kafka
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");
props.setProperty("offset.storage.topic", "cdc_offsets");
props.setProperty("bootstrap.servers", "172.30.xxx.xx:9092");
props.setProperty("offset.storage.partitions", "1");
props.setProperty("offset.storage.replication.factor", "1");
props.setProperty("offset.flush.interval.ms", "60000");
//props.setProperty("schema.whitelist", "xxx_[0-1]"); // 库.表名
props.setProperty("table.whitelist", "xxx_[0-1]\\.yyy_([1-9]|[12][0-9]|3[0-1])"); // 库.表名
/* begin connector properties */
props.setProperty("database.hostname", "172.30.xxx.xxx");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "root");
props.setProperty("database.password", "xxxxyyy");
props.setProperty("database.server.id", "85744");
props.setProperty("database.server.name", "device-connector");
props.setProperty("snapshot.mode", "schema_only");
//文件存储
//props.setProperty("database.history","io.debezium.relational.history.FileDatabaseHistory");
//props.setProperty("database.history.file.filename","/path/to/storage/dbhistory.dat");
//kafka
props.setProperty("database.history","io.debezium.relational.history.KafkaDatabaseHistory");
props.setProperty("database.history.kafka.topic", "cdc_db_history");
props.setProperty("database.history.kafka.bootstrap.servers", "172.30.xxx.xx:9092");
// Create the engine with this configuration ...
engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying(record -> {
notifying(record);
}).build();
// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
}
@PreDestroy
public void stopEngine() {
if (engine != null) {
try {
engine.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void notifying(ChangeEvent<String, String> record) {
System.out.println(record);
}
}
微服务启动后会自动监听binlog日志。
存在的问题
kafka的连接有很好的重试机制,比如暂时由于某些原因连不上kafka服务端,客户端会一直重试的。但是DebeziumEngine 对于mysql连接这就快重试机制不健全。DebeziumEngine是通过在xxErrorHandler.java类中进行判断的。比较如下两个类
debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlErrorHandler.java
debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerErrorHandler.java
网络断开后异常信息如下:
网络恢复后无法正常监听binlog,重启后功能正常。
如何解决
已给社区提了缺陷,待反馈。如果不行,计划自行修改MySqlErrorHandler的代码,将相关异常判断加到逻辑中。
网友评论