Flink 使用介绍相关文档目录
环境信息
- Hadoop 3.1.1
- Flink 1.17.2
- Hudi 0.15.0
- MySQL 5.7.x
MySQL开启Binlog
具体步骤参见:Flink 使用之 MySQL CDC
编译Hudi
首先clone HUDI项目。
git clone https://github.com/apache/hudi.git
切换到release-0.15.0分支。然后执行如下编译命令:
mvn clean package -Dflink1.17 -Dscala2.12 -Dspark3.3 -DskipTests -Pflink-bundle-shade-hive3
编译完毕后输出的hudi-flink1.17-bundle-0.15.0.jar
位于hudi/packaging/hudi-flink-bundle/target
中。复制走备用。
Flink配置
下载Flink 1.17.2二进制包解压到服务器任意目录备用。
配置checkpoint
编辑$FLINK_HOME/conf/flink-conf.yaml
文件,加入如下配置启用checkpoint。示例checkpoint间隔时间为3s。
execution.checkpointing.interval: 3s
添加依赖
添加如下依赖到$FLINK_HOME/lib
目录中。
- flink-sql-connector-mysql-cdc-3.1.0.jar 点我下载
- hudi-flink1.17-bundle-0.15.0.jar
- mysql-connector-java-8.0.27.jar 点我下载
操作演示
下面以student表为例,演示MySQL CDC到Hudi全过程。
student表有3个字段:
- id int类型
- name varchar类型
- score int类型
首先我们创建MySQL表,并写入初始数据:
create table student(
id int,
name varchar(50),
score int
);
insert into student values(1, 'Paul', 123456),(2, 'Kate', 654321),(3, 'Peter', 222222);
接下来进入Flink SQL Client,创建student CDC表和hudi_student表,然后插入CDC表的内容到Hudi表中。需要执行如下SQL:
CREATE TABLE student (
id int,
name string,
score int,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'demo',
'table-name' = 'student',
'scan.incremental.snapshot.chunk.key-column' = 'id'
);
CREATE TABLE hudi_student (
id int,
name string,
score int,
PRIMARY KEY(id) NOT ENFORCED
) with (
'connector' = 'hudi',
'path' = 'hdfs:///hudi_student',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'num_commits',
'compaction.delta_commits' = '5'
);
insert into hudi_student select * from student;
需要注意的是,如果MySQL表没有指定主键约束,CDC表的属性必须要添加'scan.incremental.snapshot.chunk.key-column'配置项用来指定主键。
最后执行:
select * from hudi_student;
可以查看同步到hudi_student
表中的全量数据。
id name score
1 Paul 123456
2 Kate 654321
3 Peter 222222
新增/修改/删除同步演示
新增数据
在MySQL控制台执行:
insert into student values(4, Tom, 444444);
等待一段时间之后,在Flink SQL Client执行:
select * from hudi_student;
得到查询结果:
id name score
1 Paul 123456
2 Kate 654321
3 Peter 222222
4 Tom 444444
新的数据已经追加到了Hudi表中。
修改数据
在MySQL控制台执行:
update student set score=333333 where id=2;
等待一段时间之后,在Flink SQL Client执行:
select * from hudi_student;
得到查询结果:
id name score
1 Paul 123456
2 Kate 333333
3 Peter 222222
4 Tom 444444
发现id为2的数据已经修改。
删除数据
在MySQL控制台执行:
delete from student where id=2;
等待一段时间之后,在Flink SQL Client执行:
select * from hudi_student;
得到查询结果:
id name score
1 Paul 123456
3 Peter 222222
4 Tom 444444
发现id为2的数据已经删除。
MySQL CDC实时增量入湖的配置和演示过程到此结束。
附录
Oracle CDC情况说明
经本人试验Oracle CDC入湖存在如下问题:
Oracle CDC使用Flink SQL的方式只能获取到最后一条数据。无法读取到初始的全量数据。应为Flink CDC bug所致。参见:https://developer.aliyun.com/ask/623649。
Oracle CDC的延迟非常高。即便是按照文档添加了:
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'
效果仍然不好。
Oracle CDC增量修改的数据同步到Hudi表之后会导致Hudi表数据消失,原因未知。
本人结论,不建议使用Oracle CDC实时增量入湖,现阶段问题较多。
网友评论