美文网首页
Flink cdc同步mysql

Flink cdc同步mysql

作者: wudl | 来源:发表于2021-09-25 23:37 被阅读0次

    1.场景

    两张mysql 的表通过cdc 进行同步
    

    2. 准备条件

    Flink 1.12.4
    Mysql 5.7
    CDC 1.2

    3 依赖

    [root@basenode lib]# ll
    总用量 350476
    -rw-r--r-- 1 root root     661717 9月  25 21:01 fastjson-1.2.78.jar
    -rw-r--r-- 1 root root     194725 9月  25 20:18 flink-connector-jdbc_2.11-1.12.4.jar
    -rw-r--r-- 1 root root   27135783 9月  25 20:38 flink-connector-mysql-cdc-1.2.0.jar
    -rw-r--r-- 1  501 games     89597 5月  11 05:03 flink-csv-1.12.4.jar
    -rw-r--r-- 1  501 games 114594794 5月  11 05:07 flink-dist_2.11-1.12.4.jar
    -rw-r--r-- 1 root root      81363 9月  25 20:18 flink-hadoop-compatibility_2.12-1.12.0.jar
    -rw-r--r-- 1 root root     136663 9月  25 21:00 flink-json-1.12.0.jar
    -rw-r--r-- 1  501 games    134826 5月  11 05:03 flink-json-1.12.4.jar
    -rw-r--r-- 1 root root   43317025 9月  25 20:18 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    -rw-r--r-- 1  501 games   7709741 10月  8 2020 flink-shaded-zookeeper-3.4.14.jar
    -rw-r--r-- 1 root root   38101480 9月  25 20:18 flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
    -rw-r--r-- 1  501 games  36096225 5月  11 05:06 flink-table_2.11-1.12.4.jar
    -rw-r--r-- 1 root root     118412 9月  25 20:40 flink-table-api-java-bridge_2.11-1.12.4.jar
    -rw-r--r-- 1  501 games  40258604 5月  11 05:06 flink-table-blink_2.11-1.12.4.jar
    -rw-r--r-- 1 root root     822850 9月  25 20:50 flink-table-common-1.12.4.jar
    -rw-r--r-- 1 root root   23265394 9月  25 20:18 iceberg-flink-runtime-0.12.0.jar
    -rw-r--r-- 1 root root   23083607 9月  25 20:18 iceberg-hive-runtime-0.12.0.jar
    -rw-r--r-- 1  501 games     67114 2月  21 2020 log4j-1.2-api-2.12.1.jar
    -rw-r--r-- 1  501 games    276771 2月  21 2020 log4j-api-2.12.1.jar
    -rw-r--r-- 1  501 games   1674433 2月  21 2020 log4j-core-2.12.1.jar
    -rw-r--r-- 1  501 games     23518 2月  21 2020 log4j-slf4j-impl-2.12.1.jar
    -rw-r--r-- 1 root root    1007502 9月  25 20:18 mysql-connector-java-5.1.47.jar
    [root@basenode lib]# pwd
    /opt/module/flink/flink-1.12.4/lib
    [root@basenode lib]# 
    
    

    3.1 配置mysql

    [root@basenode ~]# vi /etc/my.cnf
    
    # For advice on how to change settings please see
    # http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
    
    [mysqld]
    max_allowed_packet=1024M
    log-bin=mysql-bin
    server-id=180
    binlog-format=row
    
    binlog-do-db=test
    
    expire_logs_days=30
    ## 4.Mysql  建表
      ### 4.1 原表
    ```bash
    CREATE TABLE `Flink_iceberg` (
      `id` bigint(64) NOT NULL,
      `name` varchar(64) DEFAULT NULL,
      `age` int(20) DEFAULT NULL,
      `dt` varchar(64) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1
    

    4.2 目标表

    CREATE TABLE `Flink_iceberg-cdc` (
      `id` bigint(64) NOT NULL,
      `name` varchar(64) DEFAULT NULL,
      `age` int(20) DEFAULT NULL,
      `dt` varchar(64) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1
    

    5. 进入flink sql

    ./sql-client.sh embedded
    

    5.1 flink sql cdc 连接

    create table Flink_icebergcdc05(id bigint, name string, age int,dt string)
    with(
      'connector' = 'mysql-cdc',
      'hostname' = '192.168.1.180',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = 'test',
      'table-name' = 'Flink_iceberg'
    );
    

    5.2 flink mysql 连接

    标注为主键

    create table Flink_iceberg07(id bigint primary key, name string, age int,dt string)
    with(
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://192.168.1.180:3306/test',
        'username'='root',
        'password'='123456',
        'table-name' = 'Flink_iceberg-cdc',
         'sink.buffer-flush.max-rows'='1',
          'sink.buffer-flush.interval'='0'
    );
    

    5.3 插入数据

    insert into Flink_iceberg07 select * from Flink_icebergcdc05;
    

    6. 整体操作

    Flink SQL> create table Flink_icebergcdc05(id bigint, name string, age int,dt string)
    > with(
    >   'connector' = 'mysql-cdc',
    >   'hostname' = '192.168.1.180',
    >   'port' = '3306',
    >   'username' = 'root',
    >   'password' = '123456',
    >   'database-name' = 'test',
    >   'table-name' = 'Flink_iceberg'
    > );
    [INFO] Table has been created.
    
    
    Flink SQL> create table Flink_iceberg07(id bigint primary key, name string, age int,dt string)
    > with(
    >     'connector' = 'jdbc',
    >     'url' = 'jdbc:mysql://192.168.1.180:3306/test',
    >     'username'='root',
    >     'password'='123456',
    >     'table-name' = 'Flink_iceberg-cdc',
    >  'sink.buffer-flush.max-rows'='1',
    >   'sink.buffer-flush.interval'='0'
    > );
    [INFO] Table has been created.
    
    
    link SQL> select * from Flink_iceberg07;
    [INFO] Result retrieval cancelled.
    
    Flink SQL> insert into Flink_iceberg07 select * from Flink_icebergcdc05;
    [INFO] Submitting SQL update statement to the cluster...
    Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
    [INFO] Table update statement has been successfully submitted to the cluster:
    Job ID: 1a86e138627179f6f44dd332871e39df
    
    
    
    

    7.成功

    Flink SQL> insert into Flink_iceberg07 select * from Flink_icebergcdc05;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] Table update statement has been successfully submitted to the cluster:
    Job ID: 325c988dd896e261c167f90e18b5f879
    
    
    Flink SQL> select * from Flink_iceberg07;
    
                                                                                              SQL Query Result (Table)                                                                                          
     Table program finished.                                                                      Page: Last of 1                                                                         Updated: 23:35:03.544 
    
                            id                      name                       age                        dt
                         10002          flink-cdc-update                        22                2021-09-25
                         10011               flink-mysql                        19                2021-09-24
                         10012               flink-mysqA                        19                2021-09-24
    
    
    
    

    相关文章

      网友评论

          本文标题:Flink cdc同步mysql

          本文链接:https://www.haomeiwen.com/subject/kdbfnltx.html