Preface
This article targets database CDC (change data capture) scenarios and explores the technical feasibility of writing to Hive in real time with the latest Flink 1.12 release. A local IDEA demo program is provided below for reference.
1. Overall Approach
Database CDC (change data capture) is a technique that captures change data from a database in real time and, after processing it (or sometimes without any processing), applies it to a target system. To capture changes in real time, this article introduces Debezium as the database connector. Debezium provides connectors for MongoDB, MySQL, PostgreSQL, SQL Server, Oracle, Db2, Cassandra and Vitess (the Oracle, Db2, Cassandra and Vitess connectors are still incubating), and it publishes the full snapshot data and the incremental changes of these databases to Kafka topics. On the processing side, you only need to create a Kafka consumer and subscribe to the corresponding topics to obtain the data. Flink itself also offers Flink SQL CDC (contributed by Wu Chong (伍翀, 云邪) of Alibaba and others; see his blog for details), but it is not shipped in the Flink 1.12 release; support for MySQL, PostgreSQL and other databases is expected in 1.13.
When Debezium starts for the first time, it scans the whole table, emits the schema information together with a full snapshot of the data, and then captures change events (incremental data) in real time so that the source and the target stay consistent. Before the snapshot is sent, the schema information of the databases and tables is published to a history topic, and one topic is created for each table, named <connector name>.<database name>.<table name>, where the connector name is chosen by the developer. This per-table topic carries both the snapshot data and the incremental changes.
(Figure: table schema information captured by Debezium)
(Figure: change data captured by Debezium)
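To make the change-event format concrete, below is a minimal, hypothetical sketch (not part of the original setup) that parses one Debezium change event with Jackson; jackson-databind is already among the dependencies in the next section. The before/after/op/ts_ms fields are the standard Debezium envelope, but whether they sit under a "payload" wrapper depends on the Kafka Connect converter settings, so the paths may need adjusting for your topics.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DebeziumEventSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // Abridged example event; op codes: c = create, u = update, d = delete, r = snapshot read
        String value = "{\"payload\":{\"before\":null,"
                + "\"after\":{\"id\":1,\"log\":\"hello\"},"
                + "\"op\":\"c\",\"ts_ms\":1609459200000}}";

        JsonNode payload = MAPPER.readTree(value).get("payload");
        String op = payload.get("op").asText();
        // For deletes the row image is carried in "before", otherwise in "after"
        JsonNode row = "d".equals(op) ? payload.get("before") : payload.get("after");

        System.out.println("op=" + op + ", row=" + row);
    }
}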
2. Adding the Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sequence-file</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.22</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.12.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-metastore</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libfb303</artifactId>
        <version>0.9.3</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
3. Creating the Execution Environment
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(1);
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Checkpointing must be enabled so that the streaming Hive/filesystem sink can commit finished files and partitions
streamEnv.enableCheckpointing(60000);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
4. Registering the Hive Catalog
Register a Hive catalog so that Flink can read from and write to Hive:
String name = "hiveCatalog"; // catalog name, a unique identifier
String defaultDatabase = "test"; // default database
String hiveConfDir = "D:\\"; // directory containing hive-site.xml
String version = "3.1.2"; // Hive version
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);
5. Connecting the Kafka Source
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// Create the Kafka consumer; JSONKeyValueDeserializationSchema(true) parses key, value and metadata into an ObjectNode
FlinkKafkaConsumer<ObjectNode> flinkKafkaConsumer = new FlinkKafkaConsumer<ObjectNode>("topic", new JSONKeyValueDeserializationSchema(true), properties);
flinkKafkaConsumer.setStartFromEarliest(); // start from the earliest record possible
// Add the Kafka consumer as a data source
DataStream<ObjectNode> stream = streamEnv.addSource(flinkKafkaConsumer);
6. Business Logic
String[] fieldNames = {"id", "log", "op"};
TypeInformation[] types = {Types.STRING, Types.STRING, Types.STRING};
// Business-specific transformation, not covered in detail here; one possible implementation of GetIncDataMap is sketched below
SingleOutputStreamOperator<Row> mappedStream = stream.map(new GetIncDataMap(), new RowTypeInfo(types, fieldNames));
// Register the Flink stream as a temporary table
tableEnv.createTemporaryView("kafkaRow", mappedStream);
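The article does not show GetIncDataMap, so here is one hypothetical shape for it, assuming the Kafka record value carries a Debezium-style envelope: it extracts the id and log columns plus the op flag from the ObjectNode produced by JSONKeyValueDeserializationSchema and emits them as a three-field Row matching the fieldNames/types above. Adjust the JSON paths to your actual topic format.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.types.Row;

// Hypothetical sketch of GetIncDataMap; depending on your Flink packaging, ObjectNode may come
// from the shaded Jackson package (used here) or directly from com.fasterxml.jackson.
public class GetIncDataMap implements MapFunction<ObjectNode, Row> {

    @Override
    public Row map(ObjectNode node) throws Exception {
        // JSONKeyValueDeserializationSchema wraps each record as {"key": ..., "value": ..., "metadata": ...}
        JsonNode value = node.get("value");
        // Unwrap the Debezium envelope if the converter adds a "payload" layer
        JsonNode payload = value.has("payload") ? value.get("payload") : value;

        String op = payload.has("op") ? payload.get("op").asText() : "c";
        // For deletes the row image is in "before"; otherwise use "after"
        JsonNode image = "d".equals(op) ? payload.get("before") : payload.get("after");

        Row row = new Row(3);
        row.setField(0, image.get("id").asText());
        row.setField(1, image.get("log").asText());
        row.setField(2, op);
        return row;
    }
}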
7. Executing the SQL
Insert the Kafka stream table into Hive:
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // use the Hive dialect for the DDL below
tableEnv.executeSql("DROP TABLE IF EXISTS hivelogtab").print();
tableEnv.executeSql("CREATE TABLE hivelogtab ( id STRING,log STRING,op STRING)").print();
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); // switch back to the default Flink dialect for the query/insert
// CloseableIterator<Row> result = tableEnv.sqlQuery("SELECT id,log,op FROM kafkaRow").execute().collect();
// while(result.hasNext()){
// System.out.println(result.next());
// }
TableResult tableResult = tableEnv.executeSql("INSERT INTO hiveCatalog.test.hivelogtab SELECT id,log,op FROM kafkaRow");
streamEnv.execute("job");
8. Testing
(Figure: source data in MySQL)
(Figure: data synchronized to the Hive target)
Note:
Because Hive is a data warehouse built for analytical workloads, it does not support update or delete operations. In a CDC scenario, however, the operations on the source database are not limited to inserts, so the changes can be handled in the following way (see the architecture and practice of the Meituan data warehouse platform); a sketch of the idea follows the figure below.
(Figure: architecture diagram; image from the web, to be removed upon request)
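As an illustration of this append-plus-merge-on-read idea, the sketch below rebuilds the latest snapshot per primary key with ROW_NUMBER() and drops rows whose last operation was a delete. It assumes an extra ts change-time column that the demo table above does not have, and it reuses the placeholder catalog parameters from section 4; it is one possible shape of the approach, not part of the demo program.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class LatestSnapshotSketch {

    public static void main(String[] args) {
        // Batch environment for the offline merge query (hypothetical; parameters reuse the placeholders above)
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment batchEnv = TableEnvironment.create(settings);
        batchEnv.registerCatalog("hiveCatalog",
                new HiveCatalog("hiveCatalog", "test", "D:\\", "3.1.2"));
        batchEnv.useCatalog("hiveCatalog");

        // Keep only the most recent change per primary key and filter out deleted rows;
        // assumes a `ts` change-time column in hivelogtab.
        batchEnv.executeSql(
                "SELECT id, log FROM ("
              + "  SELECT id, log, op,"
              + "         ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rn"
              + "  FROM test.hivelogtab"
              + ") t WHERE rn = 1 AND op <> 'd'").print();
    }
}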
TO DO:
Automatic parsing of the DDL statements in the schema information (including resolving data-type and keyword differences between source and target).
Follow the WeChat official account HEY DATA for more discussion.