1. Scenario: import data from an HBase table into MySQL
Official documentation references:
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/hbase.html
2. Required pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.wudlflink12</groupId>
<artifactId>wudl-flink-12</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Repository locations: aliyun, apache, cloudera, and spring-plugin -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>spring-plugin</id>
<url>https://repo.spring.io/plugins-release/</url>
</repository>
</repositories>
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.11</scala.version>
<flink.version>1.12.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.3</version>
</dependency>
<!-- Scala language dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.11</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Blink planner (the default since Flink 1.11) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink connectors -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>-->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<artifactId>flink-streaming-java_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<!-- <exclusion>-->
<!-- <artifactId>flink-core</artifactId>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- </exclusion>-->
<exclusion>
<artifactId>flink-java</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<!--<version>8.0.20</version>-->
</dependency>
<!-- High-performance async component: Vert.x -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-redis-client</artifactId>
<version>3.9.0</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<!-- Reference: https://blog.csdn.net/f641385712/article/details/84109098 -->
<!--<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>0.9.3</version>
<type>pom</type>
<scope>provided</scope>
</dependency>-->
<!--<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3. Creating the tables
3.1 MySQL table
Note the table charset: utf8mb4, so that Chinese characters are stored correctly.
CREATE TABLE `wudlHbase` (
  `id` int(11) DEFAULT NULL,
  `name` varchar(64) CHARACTER SET latin1 DEFAULT NULL,
  `address` varchar(64) CHARACTER SET latin1 DEFAULT NULL,
  `age` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Then change the charset of the string columns (only `name` and `address` are character columns; `id` and `age` stay INT to match the Flink JDBC sink schema below):
ALTER TABLE `wudlHbase` MODIFY `name` VARCHAR(64) CHARACTER SET utf8;
ALTER TABLE `wudlHbase` MODIFY `address` VARCHAR(64) CHARACTER SET utf8;
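Before wiring up Flink, it can be worth confirming that Chinese characters survive a round trip through this table. A minimal JDBC sketch, assuming the same host, database, and credentials that appear in the Flink JDBC DDL later in this post (adjust for your environment); it only relies on the mysql-connector-java driver already declared in the pom:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlCharsetCheck {
    public static void main(String[] args) throws Exception {
        // Same JDBC URL options as the Flink JDBC sink; UTF-8 is forced on the connection.
        String url = "jdbc:mysql://192.168.1.180:3306/test?useUnicode=true&characterEncoding=UTF-8";
        try (Connection conn = DriverManager.getConnection(url, "root", "123456")) {
            // Insert a row containing Chinese characters ...
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO wudlHbase (id, name, address, age) VALUES (?, ?, ?, ?)")) {
                ps.setInt(1, 1);
                ps.setString(2, "test");
                ps.setString(3, "广东省");
                ps.setInt(4, 10);
                ps.executeUpdate();
            }
            // ... and read it back; the address should not come back as '???' or mojibake.
            try (PreparedStatement ps = conn.prepareStatement("SELECT address FROM wudlHbase WHERE id = 1");
                 ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getString("address"));
                }
            }
        }
    }
}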
3.2 HBase table structure
hbase(main):003:0> desc 'wudlHbase'
Table wudlHbase is ENABLED
wudlHbase
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.0686 seconds
hbase(main):004:0>
Table data:
hbase(main):002:0> scan 'wudlHbase'
ROW COLUMN+CELL
10000 column=cf:address, timestamp=1638633405738, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x810
10000 column=cf:age, timestamp=1638633405738, value=10
10000 column=cf:name, timestamp=1638633405738, value=hdfs0
10001 column=cf:address, timestamp=1638633405741, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x811
10001 column=cf:age, timestamp=1638633405741, value=11
10001 column=cf:name, timestamp=1638633405741, value=hdfs1
10002 column=cf:address, timestamp=1638633405743, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x812
10002 column=cf:age, timestamp=1638633405743, value=12
10002 column=cf:name, timestamp=1638633405743, value=hdfs2
10003 column=cf:address, timestamp=1638633405746, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x813
10003 column=cf:age, timestamp=1638633405746, value=13
10003 column=cf:name, timestamp=1638633405746, value=hdfs3
10004 column=cf:address, timestamp=1638633405748, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x814
10004 column=cf:age, timestamp=1638633405748, value=14
10004 column=cf:name, timestamp=1638633405748, value=hdfs4
10005 column=cf:address, timestamp=1638633405750, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x815
10005 column=cf:age, timestamp=1638633405750, value=15
10005 column=cf:name, timestamp=1638633405750, value=hdfs5
10006 column=cf:address, timestamp=1638633405752, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x816
10006 column=cf:age, timestamp=1638633405752, value=16
10006 column=cf:name, timestamp=1638633405752, value=hdfs6
10007 column=cf:address, timestamp=1638633405753, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x817
10007 column=cf:age, timestamp=1638633405753, value=17
10007 column=cf:name, timestamp=1638633405753, value=hdfs7
10008 column=cf:address, timestamp=1638633405755, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x818
10008 column=cf:age, timestamp=1638633405755, value=18
10008 column=cf:name, timestamp=1638633405755, value=hdfs8
10009 column=cf:address, timestamp=1638633405757, value=\xE5\xB9\xBF\xE4\xB8\x9C\xE7\x9C\x819
10009 column=cf:age, timestamp=1638633405757, value=19
10009 column=cf:name, timestamp=1638633405757, value=hdfs9
10 row(s)
Took 0.1297 seconds
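Before going through the Flink connector, a quick way to confirm that the table is reachable from Java is a plain hbase-client Get. A minimal sketch, assuming the same ZooKeeper address used in the Flink DDL below (192.168.1.161:2181); it relies on the hbase-client dependency from the pom:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same ZooKeeper quorum as 'zookeeper.quorum' in the Flink HBase DDL below.
        conf.set("hbase.zookeeper.quorum", "192.168.1.161");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("wudlHbase"))) {
            // Fetch one of the rows shown in the scan above.
            Result result = table.get(new Get(Bytes.toBytes("10000")));
            String name = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")));
            String address = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("address")));
            String age = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("age")));
            System.out.println(name + " / " + address + " / " + age);
        }
    }
}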
4. Code implementation: write the HBase data to MySQL
package com.wudl.flink.source;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author :wudl
 * @date :Created in 2021-12-14 23:28
 * @description: Read the HBase table via the Flink HBase connector and write it into MySQL via the JDBC connector.
 * @modified By:
 * @version: 1.0
 */
public class HbaseSouce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // HBase source table: the column family 'cf' is mapped to a ROW type.
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE testWudlHbase (" +
                        " rowkey STRING," +
                        " cf ROW<name STRING, address STRING, age STRING>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'hbase-2.2'," +
                        " 'table-name' = 'wudlHbase'," +
                        " 'zookeeper.quorum' = '192.168.1.161:2181'" +
                        ")");

        // MySQL sink table via the JDBC connector.
        TableResult outPutTable = tableEnv.executeSql(
                "CREATE TABLE MySqlwudlHbase (" +
                        " id INT," +
                        " name STRING," +
                        " address STRING," +
                        " age INT" +
                        ") WITH (" +
                        " 'connector' = 'jdbc'," +
                        " 'url' = 'jdbc:mysql://192.168.1.180:3306/test?useUnicode=true&characterEncoding=UTF-8'," +
                        " 'table-name' = 'wudlHbase'," +
                        " 'username' = 'root'," +
                        " 'password' = '123456'" +
                        ")");

        // tableEnv.executeSql("SELECT cast(rowkey as INT) id, cf.name name, cf.address address, cast(cf.age as BIGINT) age FROM testWudlHbase");
        // Cast rowkey and age to INT to match the MySQL schema, then insert.
        tableEnv.executeSql("INSERT INTO MySqlwudlHbase SELECT cast(rowkey as INT) id, cf.name name, cf.address address, cast(cf.age as INT) age FROM testWudlHbase");
    }
}
5. Check the results in MySQL

6. Reading the HBase data back on the client:
package com.wudl.flink.source;

import com.wudl.flink.bean.HbaseUser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * @author :wudl
 * @date :Created in 2021-12-14 23:28
 * @description: Read the HBase table with a Flink SQL query and collect the rows on the client.
 * @modified By:
 * @version: 1.0
 */
public class HbaseSouce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // HBase source table: the column family 'cf' is mapped to a ROW type.
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE testWudlHbase (" +
                        " rowkey STRING," +
                        " cf ROW<name STRING, address STRING, age STRING>," +
                        " PRIMARY KEY (rowkey) NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'hbase-2.2'," +
                        " 'table-name' = 'wudlHbase'," +
                        " 'zookeeper.quorum' = '192.168.1.161:2181'" +
                        ")");

        // MySQL sink table via the JDBC connector (not used in this read-only example, kept for completeness).
        TableResult outPutTable = tableEnv.executeSql(
                "CREATE TABLE MySqlwudlHbase (" +
                        " id INT," +
                        " name STRING," +
                        " address STRING," +
                        " age INT" +
                        ") WITH (" +
                        " 'connector' = 'jdbc'," +
                        " 'url' = 'jdbc:mysql://192.168.1.180:3306/test?useUnicode=true&characterEncoding=UTF-8'," +
                        " 'table-name' = 'wudlHbase'," +
                        " 'username' = 'root'," +
                        " 'password' = '123456'" +
                        ")");

        // tableEnv.executeSql("SELECT cast(rowkey as INT) id, cf.name name, cf.address address, cast(cf.age as BIGINT) age FROM testWudlHbase");
        // tableEnv.executeSql("INSERT INTO MySqlwudlHbase SELECT cast(rowkey as INT) id, cf.name name, cf.address address, cast(cf.age as INT) age FROM testWudlHbase");

        // Equivalent to an HBase scan
        Table table = tableEnv.sqlQuery("SELECT * FROM testWudlHbase");
        // Execute the query
        TableResult executeResult = table.execute();
        // Get the result iterator
        CloseableIterator<Row> collect = executeResult.collect();
        // Output: once print() or the Consumer below runs, the iterator is consumed; use only one of the two.
        // executeResult.print();
        List<HbaseUser> hbaseUsers = new ArrayList<>();
        collect.forEachRemaining(new Consumer<Row>() {
            @Override
            public void accept(Row row) {
                // Field 0 is the rowkey, field 1 is the cf ROW (its toString() joins name, address, age with commas).
                String rowkey = String.valueOf(row.getField(0));
                String cf = String.valueOf(row.getField(1));
                String[] hUser = cf.split(",");
                hbaseUsers.add(new HbaseUser(hUser[0], hUser[1]));
            }
        });
        System.out.println("................");
        for (HbaseUser um : hbaseUsers) {
            System.out.println(um);
        }
    }
}
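The com.wudl.flink.bean.HbaseUser class imported above is not shown in the original post. A minimal sketch of what it might look like, assuming it only carries the name and address passed to its two-argument constructor; Lombok (already in the pom) generates the constructor, getters, and toString():

package com.wudl.flink.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Hypothetical POJO for the rows collected from HBase; the field names are an assumption
 * based on the two-argument call new HbaseUser(name, address) above.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HbaseUser {
    private String name;
    private String address;
}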
