Hudi Series 11: Using Flink CDC to Write MySQL Data into Hu

Author: 只是甲 | Published: 2023-01-31 15:45

1. Download the dependency package

Download flink-sql-connector-mysql-cdc-2.2.1.jar and upload it to the $FLINK_HOME/lib directory.

2. Prepare the source data

use test;
DROP TABLE IF EXISTS mysql_cdc;

create table mysql_cdc
(
id          int NOT NULL AUTO_INCREMENT ,
name    varchar(100),
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='mysql cdc 表';

insert into mysql_cdc(id, name) values (1,'test1');
insert into mysql_cdc(id, name) values (2,'test2');
insert into mysql_cdc(id, name) values (3,'test3');
insert into mysql_cdc(id, name) values (4,'test4');
insert into mysql_cdc(id, name) values (5,'test5');
insert into mysql_cdc(id, name) values (6,'test6');
insert into mysql_cdc(id, name) values (7,'test7');
insert into mysql_cdc(id, name) values (8,'test8');
insert into mysql_cdc(id, name) values (9,'test9');
insert into mysql_cdc(id, name) values (10,'test10');
insert into mysql_cdc(id, name) values (11,'test11');
insert into mysql_cdc(id, name) values (12,'test12');
insert into mysql_cdc(id, name) values (13,'test13');
insert into mysql_cdc(id, name) values (14,'test14');
insert into mysql_cdc(id, name) values (15,'test15');
insert into mysql_cdc(id, name) values (16,'test16');
insert into mysql_cdc(id, name) values (17,'test17');
insert into mysql_cdc(id, name) values (18,'test18');
insert into mysql_cdc(id, name) values (19,'test19');
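
Before pointing Flink CDC at this table, it may help to confirm that the MySQL server is ready for binlog-based capture. The following is a minimal sketch based on the prerequisites listed in the Flink CDC MySQL connector documentation. The account name flink_cdc and its password are hypothetical placeholders; the article itself connects as root.

```sql
-- Run in the MySQL client.
-- The MySQL CDC connector reads the binlog, so binary logging should be ON
-- and the binlog format should be ROW.
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

-- Optional: a dedicated account instead of root.
-- 'flink_cdc' and 'change_me' are hypothetical placeholders.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'change_me';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
```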

3. Create a Flink SQL mapping table with the Flink CDC MySQL connector

Here I set server-id to a range of IDs. When I previously used a single fixed ID, the job kept failing with:

io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connect

Code:

set execution.checkpointing.interval=10sec;

CREATE TABLE flink_mysql_cdc5 (
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hp8',
    'port' = '3306',
    'username' = 'root',
    'password' = 'abc123',
    'database-name' = 'test',
    'table-name' = 'mysql_cdc',
    'server-id' = '5400-5408',
    'scan.incremental.snapshot.enabled'='true'
);

set sql-client.execution.result-mode=tableau;

select * from flink_mysql_cdc5;

Test record:

(screenshot: image.png)
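
The server-id error above arises because every binlog client must present a server id that is unique within the MySQL cluster. With incremental snapshot enabled, each parallel source reader takes one id from the configured range, so the range needs at least as many ids as the source parallelism. The sketch below assumes a parallelism of 4, a value chosen only for illustration, and shows one way to inspect the MySQL side.

```sql
-- In the Flink SQL client: the range 5400-5408 holds 9 ids,
-- so a source parallelism of up to 9 fits. 4 is an illustrative value.
set parallelism.default=4;

-- In the MySQL client: the server's own id, which the range must not include.
SHOW VARIABLES LIKE 'server_id';

-- In the MySQL client: binlog clients currently connected
-- (Flink CDC readers and real replicas both show up as 'Binlog Dump' threads).
-- This view does not display the server id each client is using.
SELECT id, user, host, command
FROM information_schema.processlist
WHERE command LIKE 'Binlog Dump%';
```

If several CDC jobs read from the same MySQL instance, giving each job its own non-overlapping range is a simple way to avoid the conflict.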

4. Create the Hudi table with the Flink SQL Hudi connector

Code:

CREATE TABLE flink_hudi_mysql_cdc5(
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
   'connector' = 'hudi',
   'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc5',
   'table.type' = 'MERGE_ON_READ',
   'changelog.enabled' = 'true',
   'hoodie.datasource.write.recordkey.field' = 'id',
   'write.precombine.field' = 'name',
   'compaction.async.enabled' = 'false'
);
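
The table above is read as a one-off snapshot each time it is queried. As a sketch of an alternative, a second Flink table can be mapped onto the same Hudi path with streaming read turned on, so that a SELECT keeps emitting new commits. The table name flink_hudi_mysql_cdc5_stream and the 4-second interval are assumptions for illustration and are not part of the original setup.

```sql
-- Hypothetical variant of the table above: same path and schema,
-- plus streaming-read options.
CREATE TABLE flink_hudi_mysql_cdc5_stream(
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
   'connector' = 'hudi',
   'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc5',
   'table.type' = 'MERGE_ON_READ',
   'changelog.enabled' = 'true',
   -- continuous read instead of a one-off snapshot
   'read.streaming.enabled' = 'true',
   -- seconds between checks for new commits (illustrative value)
   'read.streaming.check-interval' = '4'
);
```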

5. Insert data from the CDC table into the Hudi table

insert into flink_hudi_mysql_cdc5 select * from flink_mysql_cdc5;

select * from flink_hudi_mysql_cdc5 ;
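
The Hudi Flink writer commits data when a checkpoint completes, so the query above may return nothing until the first checkpoint (10 seconds with the interval set earlier) has finished. A quick way to check that the initial snapshot arrived in full is to compare row counts on both sides:

```sql
-- In the MySQL client: rows in the source table (ids 1 through 19 at this point).
SELECT COUNT(*) FROM mysql_cdc;

-- In the Flink SQL client: rows visible in the Hudi table.
-- The two counts should agree once a checkpoint has completed.
select count(*) from flink_hudi_mysql_cdc5;
```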

The data is also present on HDFS.

6. Test inserts, deletes, and updates

insert into mysql_cdc(id, name) values (20,'test20');
delete from mysql_cdc where id = 1;
update mysql_cdc set name ='test2-updated' where id = 2;
update mysql_cdc set name ='test3-updated' where id = 3;
delete from mysql_cdc where id = 4;

(screenshot: image.png)
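
To check that the changes propagated, the affected ids can be queried on both sides. This is a small sketch that uses only the tables defined above:

```sql
-- In the MySQL client: the rows touched by the statements above.
SELECT id, name FROM mysql_cdc WHERE id IN (1, 2, 3, 4, 20) ORDER BY id;
-- ids 1 and 4 are gone; the remaining rows are
--   2   test2-updated
--   3   test3-updated
--   20  test20

-- In the Flink SQL client: the same filter against the Hudi table,
-- which should converge to the same three rows after the next checkpoint.
select id, name from flink_hudi_mysql_cdc5 where id in (1, 2, 3, 4, 20);
```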
