Hudi Series 11: Writing MySQL Data into Hudi with Flink CDC

Author: 只是甲 | Published: 2023-01-31 15:45

    1. Download the dependency

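    Download flink-sql-connector-mysql-cdc-2.2.1.jar and upload it to the $FLINK_HOME/lib directory, then restart the Flink cluster and the SQL client so that the connector is loaded.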

    2. Prepare the source data in MySQL

    use test;
    DROP TABLE IF EXISTS mysql_cdc;
    
    create table mysql_cdc
    (
        id   int NOT NULL AUTO_INCREMENT,
        name varchar(100),
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='mysql cdc table';
    
    insert into mysql_cdc(id, name) values (1,'test1');
    insert into mysql_cdc(id, name) values (2,'test2');
    insert into mysql_cdc(id, name) values (3,'test3');
    insert into mysql_cdc(id, name) values (4,'test4');
    insert into mysql_cdc(id, name) values (5,'test5');
    insert into mysql_cdc(id, name) values (6,'test6');
    insert into mysql_cdc(id, name) values (7,'test7');
    insert into mysql_cdc(id, name) values (8,'test8');
    insert into mysql_cdc(id, name) values (9,'test9');
    insert into mysql_cdc(id, name) values (10,'test10');
    insert into mysql_cdc(id, name) values (11,'test11');
    insert into mysql_cdc(id, name) values (12,'test12');
    insert into mysql_cdc(id, name) values (13,'test13');
    insert into mysql_cdc(id, name) values (14,'test14');
    insert into mysql_cdc(id, name) values (15,'test15');
    insert into mysql_cdc(id, name) values (16,'test16');
    insert into mysql_cdc(id, name) values (17,'test17');
    insert into mysql_cdc(id, name) values (18,'test18');
    insert into mysql_cdc(id, name) values (19,'test19');
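
    The mysql-cdc connector reads MySQL's binlog, and the post assumes the server is already set up for that. A quick check to run in MySQL before starting the Flink job; the required values (binlog on, ROW format, FULL row image) follow the Flink CDC MySQL connector's setup documentation, since the server configuration itself is not shown in this post. The post connects as root; the second part sketches a dedicated account instead, where the user name flink_cdc and its password are hypothetical placeholders.

    ```sql
    -- Binlog prerequisites for the mysql-cdc connector (run in MySQL).
    SHOW VARIABLES LIKE 'log_bin';           -- should be ON
    SHOW VARIABLES LIKE 'binlog_format';     -- should be ROW
    SHOW VARIABLES LIKE 'binlog_row_image';  -- should be FULL

    -- Optional: a dedicated CDC account instead of root.
    -- 'flink_cdc' and 'change_me' are hypothetical placeholders.
    CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'change_me';
    GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
        ON *.* TO 'flink_cdc'@'%';
    FLUSH PRIVILEGES;
    ```

    With such an account, the 'username' and 'password' options in the step-three table definition would point to it instead of root.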
    

    3. Create a Flink SQL mapping table with the Flink CDC MySQL connector

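    Code:
    Note that server-id is set to a range of server IDs (5400-5408) rather than a single fixed value. With a single fixed value, the job kept failing with: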

    io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connect
    
    -- Hudi's Flink writer commits data when a checkpoint completes, so checkpointing must be enabled.
    set execution.checkpointing.interval=10sec;
    
    CREATE TABLE flink_mysql_cdc5 (
        id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
        name varchar(100)
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'hp8',
        'port' = '3306',
        'username' = 'root',
        'password' = 'abc123',
        'database-name' = 'test',
        'table-name' = 'mysql_cdc',
        'server-id' = '5400-5408',
        'scan.incremental.snapshot.enabled'='true'
    );
    
    set sql-client.execution.result-mode=tableau;
    
    select * from flink_mysql_cdc5;
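
    Why the range helps, as far as the Flink CDC MySQL connector documentation describes it: the connector joins the MySQL server as a replica, so every server ID it uses must be unique among all clients currently reading the binlog. With 'scan.incremental.snapshot.enabled' on, each parallel source reader takes its own ID from the range, so the range should hold at least as many IDs as the source parallelism. Every query on flink_mysql_cdc5 inherits the same range from the table definition, though, so the preview select above and the insert job in step five may still collide if both run at the same time. A sketch of giving a query its own, disjoint range with a Flink dynamic table options hint (the sub-range 5401-5404 is an arbitrary choice, and hints must be enabled on Flink versions where they are off by default):

    ```sql
    -- Allow /*+ OPTIONS(...) */ hints (off by default on older Flink versions).
    set table.dynamic-table-options.enabled=true;

    -- Preview query that reads the binlog with server IDs 5401-5404 only.
    -- Giving the step-five insert another disjoint range (for example 5405-5408)
    -- the same way keeps the two jobs from sharing an ID.
    select * from flink_mysql_cdc5 /*+ OPTIONS('server-id'='5401-5404') */;
    ```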
    

    Test output:

    [Screenshot: output of the query above]

    4. Create a Hudi table with the Flink SQL Hudi connector

    Code:

    CREATE TABLE flink_hudi_mysql_cdc5(
        id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
        name varchar(100)
      ) WITH (
       'connector' = 'hudi',
       'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc5',
       'table.type' = 'MERGE_ON_READ',
       'changelog.enabled' = 'true',
       'hoodie.datasource.write.recordkey.field' = 'id',
       'write.precombine.field' = 'name',
       'compaction.async.enabled' = 'false'
    );
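
    The option 'changelog.enabled' = 'true' tells Hudi to keep the intermediate changes of a record (update-before, update-after and delete) rather than only its latest value; per the Hudi Flink documentation this is best effort, because compaction eventually merges the changes of a record into one. One way to watch those changes is a streaming read in a second SQL client session, started now and left running through steps five and six. The CREATE TABLE above has to be run again in that session, since the default in-memory catalog is per session. A sketch using Hudi's read options as hints; 'earliest' as the start commit is accepted by recent Hudi releases, and if your version rejects it, a commit time in yyyyMMddHHmmss format can be given instead:

    ```sql
    -- Allow /*+ OPTIONS(...) */ hints (off by default on older Flink versions).
    set table.dynamic-table-options.enabled=true;
    set sql-client.execution.result-mode=tableau;

    -- Continuously read flink_hudi_mysql_cdc5 from its first commit and poll
    -- for new commits every 4 seconds. With changelog.enabled, the op column
    -- can show -U/+U/-D rows, not only +I.
    select * from flink_hudi_mysql_cdc5
    /*+ OPTIONS('read.streaming.enabled'='true',
                'read.start-commit'='earliest',
                'read.streaming.check-interval'='4') */;
    ```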
    

    5. Insert the data from the CDC table into the Hudi table

    insert into flink_hudi_mysql_cdc5 select * from flink_mysql_cdc5;
    
    select * from flink_hudi_mysql_cdc5 ;
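
    The Hudi Flink writer commits data only when a checkpoint completes, which is why step three sets a 10-second checkpoint interval; right after the insert job starts, the batch read above may therefore show no rows yet. Once a checkpoint has completed, a simple consistency check is to compare row counts on both sides; at this point both should be 19, the number of rows inserted in step two.

    ```sql
    -- In MySQL:
    select count(*) from mysql_cdc;

    -- In the Flink SQL client: in streaming mode, tableau output shows the
    -- count as a changelog, and its last value is the one to compare.
    select count(*) from flink_hudi_mysql_cdc5;
    ```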
    

    The data also lands on HDFS, under the table path hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc5.

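    6. Test inserts, deletes and updates

    Run the following statements against the source table in MySQL; the changes should flow through the CDC job into the Hudi table.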

    insert into mysql_cdc(id, name) values (20,'test20');
    delete from mysql_cdc where id = 1;
    update mysql_cdc set name ='test2-updated' where id = 2;
    update mysql_cdc set name ='test3-updated' where id = 3;
    delete from mysql_cdc where id = 4;
    
    [Screenshot: test result]
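
    After the five statements above, MySQL holds 18 rows: the original 19, plus id 20, minus ids 1 and 4, with ids 2 and 3 renamed. A sketch of checking that the Hudi table converged to the same state: once the next checkpoint has completed, both queries below should return the same three rows, (2, 'test2-updated'), (3, 'test3-updated') and (20, 'test20'), with no row for ids 1 and 4.

    ```sql
    -- In MySQL: source state of the keys touched in step six.
    select * from mysql_cdc where id in (1, 2, 3, 4, 20) order by id;

    -- In the Flink SQL client: the same keys read from the Hudi table.
    -- (No ORDER BY here, since streaming mode does not support sorting
    -- on a non-time column.)
    select * from flink_hudi_mysql_cdc5 where id in (1, 2, 3, 4, 20);
    ```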
