一. 数据源准备
建表:
-- MySQL source table for the CDC test: an auto-increment integer key plus a nullable name.
CREATE TABLE `mysql_cdc` (
    `id`   INT(11)      NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(100) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB;
写存储过程批量插入数据:
DELIMITER //
-- p5: fill mysql_cdc with 10,000,000 rows, id = 1..10000000 and name = 'test<id>'.
-- Rows are committed in batches instead of one implicit transaction per row,
-- which avoids a commit for every single INSERT when autocommit is on.
-- NOTE: START TRANSACTION implicitly commits any transaction the caller already has open.
CREATE PROCEDURE p5()
BEGIN
    DECLARE l_n1 INT DEFAULT 1;
    -- Number of rows written per transaction.
    DECLARE l_batch_size INT DEFAULT 10000;

    START TRANSACTION;
    WHILE l_n1 <= 10000000 DO
        INSERT INTO mysql_cdc (id, name) VALUES (l_n1, CONCAT('test', l_n1));
        -- Close the current batch and open the next one every l_batch_size rows.
        IF l_n1 % l_batch_size = 0 THEN
            COMMIT;
            START TRANSACTION;
        END IF;
        SET l_n1 = l_n1 + 1;
    END WHILE;
    -- Commit whatever remains in the last (possibly empty) batch.
    COMMIT;
END;
//
DELIMITER ;
二. Flink SQL客户端操作
启动yarn session
内存尽量多指定,不然会报 OOM 的错误
# Start a YARN session in detached mode (-d), passing 8192 as both the
# JobManager (-jm) and TaskManager (-tm) memory setting.
$FLINK_HOME/bin/yarn-session.sh -jm 8192 -tm 8192 -d 2>&1 &
# Open the Flink SQL client in embedded mode, passing yarn-session as the -s argument.
# NOTE(review): this line uses a hard-coded install path while the line above uses
# $FLINK_HOME; confirm both point at the same Flink installation.
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
Flink SQL操作:
-- Set the checkpoint interval to 10 seconds.
SET 'execution.checkpointing.interval' = '10sec';
-- Flink source table that reads test.mysql_cdc on host hp8 through the mysql-cdc connector.
CREATE TABLE flink_mysql_cdc8 (
    id   BIGINT NOT NULL,
    name VARCHAR(100),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'hp8',
    'port'          = '3306',
    'username'      = 'root',
    'password'      = 'abc123',
    'database-name' = 'test',
    'table-name'    = 'mysql_cdc',
    -- server-id is given as a range rather than a single value
    'server-id'     = '5409-5415',
    'scan.incremental.snapshot.enabled' = 'true'
);
-- Show query results in tableau mode, then count the rows seen through the CDC source table.
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT COUNT(*) FROM flink_mysql_cdc8;
-- Hudi sink table stored under the HDFS path below, with id as the record key.
CREATE TABLE flink_hudi_mysql_cdc8 (
    id   BIGINT NOT NULL,
    name VARCHAR(100),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'hudi',
    'path'       = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc8',
    'table.type' = 'COPY_ON_WRITE',
    -- NOTE(review): changelog mode and async compaction are presumably aimed at
    -- MERGE_ON_READ tables; confirm they have the intended effect with COPY_ON_WRITE.
    'changelog.enabled' = 'true',
    'hoodie.datasource.write.recordkey.field' = 'id',
    -- NOTE(review): the precombine field is the name column, not a timestamp or
    -- version column; confirm this ordering is what is wanted for updated rows.
    'write.precombine.field' = 'name',
    'compaction.async.enabled' = 'false'
);
-- Copy rows from the CDC source table into the Hudi table.
-- Columns are listed explicitly on both sides so the mapping does not depend on
-- the column order of either table definition.
INSERT INTO flink_hudi_mysql_cdc8 (id, name)
SELECT
    id,
    name
FROM flink_mysql_cdc8;
-- Count the rows currently readable from the Hudi table.
SELECT COUNT(*) FROM flink_hudi_mysql_cdc8;
三. 查看运行情况
如果是生产环境,可以指定一个较高的并行度,我这个地方因为是测试环境,并行度指定为1
image.png
去掉 checkpoint 并将并行度由 1 调整到 4 后,速度提升了几十倍(注意:Hudi 的 Flink 写入通常在 checkpoint 成功时才提交数据,去掉 checkpoint 后请确认数据确实已提交并可见)
-- Set the default parallelism to 4.
SET 'table.exec.resource.default-parallelism' = '4';
image.png
image.png
网友评论