Integrating Flink CDC with the Hudi data lake and syncing to Hive


Author: wudl | Published 2022-02-18 00:43

1. Versions

Component | Version
hudi      | 0.10.0
flink     | 1.13.5
hive      | 3.1.0

2. Goal: stream MySQL changes through Flink CDC into Hudi, and sync the results to Hive

An introduction to Flink CDC
A Flink CDC 1.2 example
A Flink CDC 2.0 example

3. Required Flink jars

The key addition is flink-connector-mysql-cdc-2.0.2.jar. The full contents of the Flink lib directory:

-rw-r--r-- 1 root root   7802399 2月  16 00:36 doris-flink-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root    249571 2月  16 00:36 flink-connector-jdbc_2.12-1.13.5.jar
-rw-r--r-- 1 root root    359138 2月  16 00:36 flink-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root  30087268 2月  17 22:12 flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 root root     92315 2月  16 00:36 flink-csv-1.13.5.jar
-rw-r--r-- 1 root root 106535830 2月  16 00:36 flink-dist_2.12-1.13.5.jar
-rw-r--r-- 1 root root    148127 2月  16 00:36 flink-json-1.13.5.jar
-rw-r--r-- 1 root root  43317025 2月  16 00:36 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 root root   7709740 2月  16 00:36 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root   3674116 2月  16 00:36 flink-sql-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root  35051557 2月  16 00:35 flink-table_2.12-1.13.5.jar
-rw-r--r-- 1 root root  38613344 2月  16 00:36 flink-table-blink_2.12-1.13.5.jar
-rw-r--r-- 1 root root  62447468 2月  16 00:36 hudi-flink-bundle_2.12-0.10.0.jar
-rw-r--r-- 1 root root  17276348 2月  16 00:36 hudi-hadoop-mr-bundle-0.10.0.jar
-rw-r--r-- 1 root root    207909 2月  16 00:36 log4j-1.2-api-2.16.0.jar
-rw-r--r-- 1 root root    301892 2月  16 00:36 log4j-api-2.16.0.jar
-rw-r--r-- 1 root root   1789565 2月  16 00:36 log4j-core-2.16.0.jar
-rw-r--r-- 1 root root     24258 2月  16 00:36 log4j-slf4j-impl-2.16.0.jar
-rw-r--r-- 1 root root    724213 2月  16 00:36 mysql-connector-java-5.1.9.jar
[root@node01 lib]# pwd
/opt/module/flink/flink-1.13.5/lib
[root@node01 lib]# 

4. Scenario

(diagram of the scenario omitted)

5. Steps

1. Create the MySQL table and enable the binlog.
2. Create the Flink CDC source table in Flink SQL.
3. Create a view over the source table.
4. Create the sink table, backed by Hudi, with automatic sync to Hive.
5. Query the view and insert its rows into the sink table; Flink then executes the insert as a continuously running background job.

5.1 Enable the MySQL binlog

Add the following to the MySQL server configuration (my.cnf):

server-id=162
log-bin=mysql-bin
#sync-binlog=1
# databases excluded from the binlog
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
binlog-ignore-db=mysql
binlog_format=ROW
expire_logs_days=30
binlog_row_image=full
# databases to include (uncomment to restrict)
#binlog-do-db=test

Restart MySQL for the changes to take effect: service mysqld restart
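Once restarted, the settings can be verified from any MySQL client (the variable names are standard MySQL; the reported file and offset will differ per server):

```sql
-- Confirm the binlog is on and row-based, as Flink CDC requires
SHOW VARIABLES LIKE 'log_bin';        -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expected: ROW
SHOW MASTER STATUS;                   -- current binlog file and offset
```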

5.2 Create the MySQL table

CREATE TABLE `Flink_cdc` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
  `name` VARCHAR(64) NULL,
  `age` INT NULL,
  `birthday` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
  `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
INSERT INTO `wudldb`.`Flink_cdc`(name, age) VALUES ('flink', 18);
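Once the Flink job of step 5.6 is running, any further DML on this table is picked up from the binlog without restarting anything; for example (illustrative statements against the same table):

```sql
-- Later changes flow through the CDC pipeline automatically
INSERT INTO `wudldb`.`Flink_cdc`(name, age) VALUES ('hudi', 20);
UPDATE `wudldb`.`Flink_cdc` SET age = 19 WHERE name = 'flink';
```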

5.3 Create the Flink CDC source table in Flink SQL

Flink SQL> CREATE TABLE source_mysql (
   id BIGINT PRIMARY KEY NOT ENFORCED,
   name STRING,
   age INT,
   birthday TIMESTAMP(3),
   ts TIMESTAMP(3)
 ) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '192.168.1.162',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'server-time-zone' = 'Asia/Shanghai',
 'debezium.snapshot.mode' = 'initial',
 'database-name' = 'wudldb',
 'table-name' = 'Flink_cdc'
 );
[INFO] Execute statement succeed.

5.4 Create a view over the CDC table

Flink SQL> create view view_source_flinkcdc_mysql 
> AS 
> SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql;
[INFO] Execute statement succeed.
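The DATE_FORMAT call derives a yyyyMMdd partition value from each row's birthday. As a standalone illustration in Flink SQL:

```sql
-- The partition expression applied to a literal timestamp
SELECT DATE_FORMAT(TIMESTAMP '2022-02-18 00:43:00', 'yyyyMMdd') AS part;
-- part = '20220218'
```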

5.5 Create the sink table, backed by Hudi, with automatic Hive sync

Flink SQL> CREATE TABLE flink_cdc_sink_hudi_hive(
> id bigint ,
> name string,
> age int,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3),
> part VARCHAR(20),
> primary key(id) not enforced
> )
> PARTITIONED BY (part)
> with(
> 'connector'='hudi',
> 'path'= 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive', 
> 'table.type'= 'MERGE_ON_READ',
> 'hoodie.datasource.write.recordkey.field'= 'id', 
> 'write.precombine.field'= 'ts',
> 'write.tasks'= '1',
> 'write.rate.limit'= '2000', 
> 'compaction.tasks'= '1', 
> 'compaction.async.enabled'= 'true',
> 'compaction.trigger.strategy'= 'num_commits',
> 'compaction.delta_commits'= '1',
> 'changelog.enabled'= 'true',
> 'read.streaming.enabled'= 'true',
> 'read.streaming.check-interval'= '3',
> 'hive_sync.enable'= 'true',
> 'hive_sync.mode'= 'hms',
> 'hive_sync.metastore.uris'= 'thrift://node02.com:9083',
> 'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',
> 'hive_sync.table'= 'flink_cdc_sink_hudi_hive',
> 'hive_sync.db'= 'db_hive',
> 'hive_sync.username'= 'root',
> 'hive_sync.password'= '123456',
> 'hive_sync.support_timestamp'= 'true'
> );
[INFO] Execute statement succeed.

5.6 Query the view and insert its rows into the sink table

Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive SELECT id, name,age,birthday, ts, part FROM view_source_flinkcdc_mysql ;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c618c9f528b9793adf4418640bb2a0fc

5.7 Check the running Flink job

(screenshot of the Flink web UI showing the running job omitted)

6. Integrating Hudi with Hive

Copy hudi-hadoop-mr-bundle-0.10.0.jar into Hive's lib directory, then restart the Hive services (metastore and HiveServer2).
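The copy itself might look like this; the source path is from this cluster's layout, and the Hive install directory is an assumption, so adjust both to your environment:

```shell
# Make the Hudi input formats visible to Hive (destination path assumed)
cp /opt/module/flink/flink-1.13.5/lib/hudi-hadoop-mr-bundle-0.10.0.jar \
   /opt/module/hive/lib/

# Then restart the metastore and HiveServer2 so the jar is loaded;
# the exact restart command depends on how your Hive deployment is managed.
```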

6.1 Connect to Hive and check the tables synced from Hudi

0: jdbc:hive2://node01.com:2181,node02.com:21> show tables;
INFO  : Compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.016 seconds
INFO  : Executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.012 seconds
INFO  : OK
+------------------------------+
|           tab_name           |
+------------------------------+
| flink_cdc_sink_hudi_hive_ro  |
| flink_cdc_sink_hudi_hive_rt  |
+------------------------------+

6.2 Query

For a MERGE_ON_READ table, Hudi registers two Hive tables: _ro (read-optimized, serving compacted base files only) and _rt (real-time, merging base files with pending log files). Query the read-optimized table:

0: jdbc:hive2://node01.com:2181,node02.com:21> select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro;
INFO  : Compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:id, type:bigint, comment:null), FieldSchema(name:name, type:string, comment:null), FieldSchema(name:age, type:int, comment:null), FieldSchema(name:birthday, type:bigint, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.124 seconds
INFO  : Executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro
INFO  : Completed executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.029 seconds
INFO  : OK
+-----+--------+------+----------------+
| id  |  name  | age  |    birthday    |
+-----+--------+------+----------------+
| 1   | flink  | 18   | 1645142397000  |
+-----+--------+------+----------------+
1 row selected (0.278 seconds)
0: jdbc:hive2://node01.com:2181,node02.com:21> 
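The birthday column comes back as epoch milliseconds because the synced Hive column type is bigint (see the "Returning Hive schema" line above). It can be converted in the query itself; from_unixtime is a standard Hive UDF, and the rendered time depends on the session time zone:

```sql
-- Milliseconds -> seconds, then to a readable timestamp string
SELECT from_unixtime(CAST(1645142397000 / 1000 AS BIGINT));
```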

Overall result


(screenshot of the end-to-end result omitted)

7. A pitfall encountered along the way

This setup needs flink-connector-mysql-cdc-2.0.2.jar rather than flink-sql-connector-mysql-cdc-2.0.2.jar.
With the wrong jar in place, the first query fails as follows:

Flink SQL> select * from users_source_mysql;


Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/connect/data/Schema
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.getDeclaredMethod(Class.java:2128)
    at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
    at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:597)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:457)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:378)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:117)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:934)
    at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:67)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:795)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
    at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
    at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
    at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
    at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
    at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.data.Schema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 69 more

Shutting down the session...
done.
[root@node01 bin]# 
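The root cause is that org.apache.kafka.connect.data.Schema (used by the Debezium engine embedded in the connector) is missing from the classpath. Whether a given connector jar bundles that class can be checked by listing the jar's contents (assumes unzip is available; run it in the Flink lib directory):

```shell
# A matching entry means the class is bundled; an empty result means it
# must be provided by some other jar on the classpath.
unzip -l flink-connector-mysql-cdc-2.0.2.jar | grep 'kafka/connect/data/Schema'
```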
