1. 通过Flink Sql 将mysql 的数据同步到Mysql 中
套路
官网示例:
// create a TableEnvironment for batch or streaming execution
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// create a Table object from a Table API query
Table table2 = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...
mysqlk 同步到Mysql 中 总结为:
准备环境 ----> 准备源表 -----> 准备目标表 ----> (查询原表插入目标表)
2. 我的实现 原表 ---> 目标表 -----> 查询原表插入到目标表
package com.wudl.flink.examples;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @ClassName : FlinkSqlMysqlToMySql
* @Description : Flink sql-mysql
* @Author :wudl
* @Date: 2021-08-24 23:28
*/
public class FlinkSqlMysqlToMySql02 {
public static void main(String[] args) {
String driverClass = "com.mysql.jdbc.Driver";
String dbUrl = "jdbc:mysql://192.168.1.180:3306/MyFlink";
String userNmae = "root";
String passWord = "123456";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEvn = StreamTableEnvironment .create(env,settings);
//1. 指定方言
tableEvn.getConfig().setSqlDialect(SqlDialect.DEFAULT);
String schema = "id INT ,name STRING";
String source_table = "student";
String flink_sink_table = "SinkStudent";
TableResult inputTable = tableEvn.executeSql("CREATE TABLE sourceTable (" +
"id int ," +
"name varchar" +
") " +
"WITH (" +
"'connector' = 'jdbc'," +
"'url' = '" + dbUrl + "'," +
"'table-name' = '"+source_table +"'," +
" 'username' = '" + userNmae + "'," +
" 'password' = '" + passWord + "'" +
" )");
TableResult outPutTable = tableEvn.executeSql("CREATE TABLE outTable (" +
"id int ," +
"name varchar" +
") " +
"WITH (" +
"'connector' = 'jdbc'," +
"'url' = '" + dbUrl + "'," +
"'table-name' = '"+flink_sink_table+"'," +
" 'username' = '" + userNmae + "'," +
" 'password' = '" + passWord + "'" +
" )");
String sql = " select id,name from sourceTable";
Table ResultTable = tableEvn.sqlQuery(sql);
// 使用追加流 多次同步防止 数据重复
DataStream<Tuple2<Boolean, Row>> resultDS = tableEvn.toRetractStream(ResultTable, Row.class);
tableEvn.executeSql("insert into outTable select * from "+ResultTable);
tableEvn.executeSql("insert into outTable select * from sourceTable" );
System.out.println("数据写入mysql成功");
}
}
Flink-sql mysql-mysql.png
3. 需要注意的是:
数据类型的对应
也就是Flink 中建的表对应到Mysql 表中的数据类型
官网链接 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
下一个版本加批量 加视图 查询拉去 。
网友评论