Flink SQL: MySQL CDC to a Hudi table, then output to Kafka

Author: wudl | Published 2022-09-05 19:23

    1. Versions

    MySQL        Flink    Kafka     Hudi
    5.7.20-log   1.13.5   2.0.0.3   0.10

    2. Architecture

    (figure: flink-hudi-kaka.png, showing the pipeline MySQL binlog -> Flink CDC -> Hudi on HDFS -> Kafka)

    3. The Flink SQL MySQL CDC table

    3.1 MySQL table structure

    CREATE TABLE `Flink_cdc` (
      `id` bigint(64) NOT NULL AUTO_INCREMENT,
      `name` varchar(64) DEFAULT NULL,
      `age` int(20) DEFAULT NULL,
      `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=69 DEFAULT CHARSET=utf8mb4;
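
    Before the CDC connector can read this table, the MySQL instance needs row-based binlog, and a few sample rows make the pipeline easier to verify end to end. A quick sketch (the checks are standard MySQL; the inserted values are made up for this walkthrough):

    -- Binlog must be on and in ROW format for the Debezium-based CDC connector
    SHOW VARIABLES LIKE 'log_bin';        -- expect ON
    SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW

    -- Illustrative test rows; birthday and ts fall back to their CURRENT_TIMESTAMP defaults
    INSERT INTO wudldb.Flink_cdc (name, age) VALUES ('zhangsan', 21), ('lisi', 22);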
    

    3.2 The Flink SQL MySQL CDC source table

    Flink SQL> CREATE TABLE source_mysql (
    >    id BIGINT PRIMARY KEY NOT ENFORCED,
    >    name STRING,
    >    age INT,
    >    birthday TIMESTAMP(3),
    >    ts TIMESTAMP(3)
    >  ) WITH (
    >  'connector' = 'mysql-cdc',
    >  'hostname' = '192.168.1.162',
    >  'port' = '3306',
    >  'username' = 'root',
    >  'password' = '123456',
    >  'server-time-zone' = 'Asia/Shanghai',
    >  'debezium.snapshot.mode' = 'initial',
    >  'database-name' = 'wudldb',
    >  'table-name' = 'Flink_cdc'
    >  );
    > 
    [INFO] Execute statement succeed.
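
    With the source table registered, a quick query confirms the initial snapshot is readable (an optional check, not part of the original transcript):

    Flink SQL> SELECT * FROM source_mysql;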
    
    

    3.3 Create the Hudi sink table

    
    Flink SQL>  CREATE TABLE flink_cdc_sink_hudi_hive_wudl(
    > id bigint ,
    > name string,
    > age int,
    > birthday TIMESTAMP(3),
    > ts TIMESTAMP(3),
    > part STRING,
    > primary key(id) not enforced
    > )
    > PARTITIONED BY (part)
    > with(
    > 'connector'='hudi',
    > 'path'= 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive_wudl', 
    > 'table.type'= 'MERGE_ON_READ',
    > 'hoodie.datasource.write.recordkey.field'= 'id', 
    > 'write.precombine.field'= 'ts',
    > 'write.tasks'= '1',
    > 'write.rate.limit'= '2000', 
    > 'compaction.tasks'= '1', 
    > 'compaction.async.enabled'= 'true',
    > 'compaction.trigger.strategy'= 'num_commits',
    > 'compaction.delta_commits'= '1',
    > 'changelog.enabled'= 'true',
    > 'read.streaming.enabled'= 'true',
    > 'read.streaming.check-interval'= '3',
    > 'hive_sync.enable'= 'true',
    > 'hive_sync.mode'= 'hms',
    > 'hive_sync.metastore.uris'= 'thrift://node02.com:9083',
    > 'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',
    > 'hive_sync.table'= 'flink_cdc_sink_hudi_hive_wudl',
    > 'hive_sync.db'= 'db_hive',
    > 'hive_sync.username'= 'root',
    > 'hive_sync.password'= '123456',
    > 'hive_sync.support_timestamp'= 'true'
    > );
    [INFO] Execute statement succeed.
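
    With hive_sync enabled on a MERGE_ON_READ table, Hudi registers two query views in the Hive metastore, conventionally suffixed _ro (read-optimized) and _rt (real-time). A quick check from beeline, assuming those default names:

    -- beeline, connected to jdbc:hive2://node02.com:10000
    SHOW TABLES IN db_hive;
    -- expect: flink_cdc_sink_hudi_hive_wudl_ro, flink_cdc_sink_hudi_hive_wudl_rt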
    
    
    

    3.4 Insert the CDC table data into the Hudi table

    Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive_wudl SELECT id, name,age,birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql ;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 8a6e4869c43e57d57357c1767e7c2b38
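
    The INSERT keeps running as a streaming job, so rows written to MySQL after submission also flow into the Hudi table on the next commit. For example, on the MySQL side (illustrative values):

    INSERT INTO wudldb.Flink_cdc (name, age) VALUES ('wangwu', 23);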
    
    
    

    4. Verify the data

    (figure: flink cdc hudi 表2022.png, showing the CDC rows landed in the Hudi table)
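
    Since read.streaming.enabled is 'true' on the sink table, it can also be queried from the SQL client to watch rows arrive (a sketch of such a check, not from the original transcript):

    Flink SQL> SELECT * FROM flink_cdc_sink_hudi_hive_wudl;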

    5. Batch: writing the Hudi table out to Kafka
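
    The Hudi source below does not enable streaming reads, so it is scanned as a bounded source. The SQL client can optionally be switched to batch execution first (assuming the Flink 1.13 SET syntax):

    Flink SQL> SET 'execution.runtime-mode' = 'batch';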

    5.1 Create the Hudi source table

    Flink SQL> CREATE TABLE hudi_flink_kafka_source (
    > id bigint ,
    > name string,
    > age int,
    > birthday TIMESTAMP(3),
    > ts TIMESTAMP(3),
    > part STRING,
    > primary key(id) not enforced
    > )
    > PARTITIONED BY (part) 
    > WITH (
    >   'connector' = 'hudi',
    >   'path'= 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive_wudl', 
    >   'table.type' = 'MERGE_ON_READ',
    >   'write.operation' = 'upsert',
    >   'hoodie.datasource.write.recordkey.field'= 'id',
    >   'write.precombine.field' = 'ts',
    >   'write.tasks'= '1',
    >   'compaction.tasks' = '1', 
    >   'compaction.async.enabled' = 'true', 
    >   'compaction.trigger.strategy' = 'num_commits', 
    >   'compaction.delta_commits' = '1'
    >   );
    
    

    5.2 Create the Kafka sink table

    Flink SQL> CREATE TABLE kakfa_sink6 (
    > id bigint ,
    > name string,
    > age int,
    > birthday TIMESTAMP(3),
    > ts TIMESTAMP(3)
    > ) WITH (
    >   'connector' = 'kafka',
    >   'topic' = 'wudl2022flink03',
    >   'properties.bootstrap.servers' = '192.168.1.161:6667',
    >   'properties.group.id' = 'wudl20220905',
    >   'format' = 'json',
    >   'json.fail-on-missing-field' = 'false',
    >   'json.ignore-parse-errors' = 'true'
    > );
    [INFO] Execute statement succeed.
    
    Flink SQL> INSERT INTO kakfa_sink6  SELECT id, name,age,birthday, ts FROM hudi_flink_kafka_source ;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 005ee1b8011319d235c6485c2abb3efb
    
    
    
    

    6. Inspect the output data

    (figure: flink hudi-kafka.png, showing messages arriving on the wudl2022flink03 topic)
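
    Because the Kafka table from 5.2 uses the json format, the topic contents can also be read back directly in the SQL client (an optional check, not part of the original transcript):

    Flink SQL> SELECT * FROM kakfa_sink6;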

    7. Time conversion functions

    7.1 LOCALTIMESTAMP: getting the current system time in Flink SQL

    Flink SQL> select     DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss');
    +----+--------------------------------+
    | op |                         EXPR$0 |
    +----+--------------------------------+
    | +I |            2022-09-05 19:19:42 |
    +----+--------------------------------+
    Received a total of 1 row
    
     # TO_TIMESTAMP: converting a formatted string back to TIMESTAMP
     
    
    Flink SQL> select TO_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss'));
    +----+-------------------------+
    | op |                  EXPR$0 |
    +----+-------------------------+
    | +I | 2022-09-05 19:20:30.000 |
    +----+-------------------------+
    Received a total of 1 row
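
    The same DATE_FORMAT pattern is what produced the part partition value in the INSERT from section 3.4. For example (output described for the session date above):

    Flink SQL> SELECT DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMdd');
    -- returns 20220905 as a STRING, suitable for a date-based partition column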
    
    
    
