1.场景
通过 Flink CDC 将一张 MySQL 表的数据变更实时同步到另一张 MySQL 表
2. 准备条件
Flink 1.12.4
Mysql 5.7
CDC 1.2
3. 依赖(Flink lib 目录下的 jar 包)
[root@basenode lib]# ll
总用量 350476
-rw-r--r-- 1 root root 661717 9月 25 21:01 fastjson-1.2.78.jar
-rw-r--r-- 1 root root 194725 9月 25 20:18 flink-connector-jdbc_2.11-1.12.4.jar
-rw-r--r-- 1 root root 27135783 9月 25 20:38 flink-connector-mysql-cdc-1.2.0.jar
-rw-r--r-- 1 501 games 89597 5月 11 05:03 flink-csv-1.12.4.jar
-rw-r--r-- 1 501 games 114594794 5月 11 05:07 flink-dist_2.11-1.12.4.jar
-rw-r--r-- 1 root root 81363 9月 25 20:18 flink-hadoop-compatibility_2.12-1.12.0.jar
-rw-r--r-- 1 root root 136663 9月 25 21:00 flink-json-1.12.0.jar
-rw-r--r-- 1 501 games 134826 5月 11 05:03 flink-json-1.12.4.jar
-rw-r--r-- 1 root root 43317025 9月 25 20:18 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 501 games 7709741 10月 8 2020 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root 38101480 9月 25 20:18 flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
-rw-r--r-- 1 501 games 36096225 5月 11 05:06 flink-table_2.11-1.12.4.jar
-rw-r--r-- 1 root root 118412 9月 25 20:40 flink-table-api-java-bridge_2.11-1.12.4.jar
-rw-r--r-- 1 501 games 40258604 5月 11 05:06 flink-table-blink_2.11-1.12.4.jar
-rw-r--r-- 1 root root 822850 9月 25 20:50 flink-table-common-1.12.4.jar
-rw-r--r-- 1 root root 23265394 9月 25 20:18 iceberg-flink-runtime-0.12.0.jar
-rw-r--r-- 1 root root 23083607 9月 25 20:18 iceberg-hive-runtime-0.12.0.jar
-rw-r--r-- 1 501 games 67114 2月 21 2020 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 501 games 276771 2月 21 2020 log4j-api-2.12.1.jar
-rw-r--r-- 1 501 games 1674433 2月 21 2020 log4j-core-2.12.1.jar
-rw-r--r-- 1 501 games 23518 2月 21 2020 log4j-slf4j-impl-2.12.1.jar
-rw-r--r-- 1 root root 1007502 9月 25 20:18 mysql-connector-java-5.1.47.jar
[root@basenode lib]# pwd
/opt/module/flink/flink-1.12.4/lib
[root@basenode lib]#
3.1 配置 MySQL(开启 row 格式的 binlog,修改 my.cnf 后需重启 MySQL 生效)
[root@basenode ~]# vi /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
max_allowed_packet=1024M
log-bin=mysql-bin
server-id=180
binlog-format=row
binlog-do-db=test
expire_logs_days=30
## 4.Mysql 建表
### 4.1 原表
```sql
CREATE TABLE `Flink_iceberg` (
`id` bigint(64) NOT NULL,
`name` varchar(64) DEFAULT NULL,
`age` int(20) DEFAULT NULL,
`dt` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
```
### 4.2 目标表
```sql
CREATE TABLE `Flink_iceberg-cdc` (
`id` bigint(64) NOT NULL,
`name` varchar(64) DEFAULT NULL,
`age` int(20) DEFAULT NULL,
`dt` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
```
5. 进入flink sql
./sql-client.sh embedded
5.1 flink sql cdc 连接
create table Flink_icebergcdc05(id bigint, name string, age int,dt string)
with(
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.180',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'Flink_iceberg'
);
5.2 flink mysql 连接
需要将 id 标注为主键:JDBC sink 在定义了主键时以 upsert 方式写入,才能正确应用 CDC 源表产生的更新和删除
create table Flink_iceberg07(id bigint primary key, name string, age int,dt string)
with(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.180:3306/test',
'username'='root',
'password'='123456',
'table-name' = 'Flink_iceberg-cdc',
'sink.buffer-flush.max-rows'='1',
'sink.buffer-flush.interval'='0'
);
5.3 插入数据
insert into Flink_iceberg07 select * from Flink_icebergcdc05;
6. 整体操作
Flink SQL> create table Flink_icebergcdc05(id bigint, name string, age int,dt string)
> with(
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.168.1.180',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'test',
> 'table-name' = 'Flink_iceberg'
> );
[INFO] Table has been created.
Flink SQL> create table Flink_iceberg07(id bigint primary key, name string, age int,dt string)
> with(
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.168.1.180:3306/test',
> 'username'='root',
> 'password'='123456',
> 'table-name' = 'Flink_iceberg-cdc',
> 'sink.buffer-flush.max-rows'='1',
> 'sink.buffer-flush.interval'='0'
> );
[INFO] Table has been created.
Flink SQL> select * from Flink_iceberg07;
[INFO] Result retrieval cancelled.
Flink SQL> insert into Flink_iceberg07 select * from Flink_icebergcdc05;
[INFO] Submitting SQL update statement to the cluster...
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 1a86e138627179f6f44dd332871e39df
7. 成功(查询目标表,验证数据已同步)
Flink SQL> insert into Flink_iceberg07 select * from Flink_icebergcdc05;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 325c988dd896e261c167f90e18b5f879
Flink SQL> select * from Flink_iceberg07;
SQL Query Result (Table)
Table program finished. Page: Last of 1 Updated: 23:35:03.544
id name age dt
10002 flink-cdc-update 22 2021-09-25
10011 flink-mysql 19 2021-09-24
10012 flink-mysqA 19 2021-09-24
网友评论