数据库准备
-- Source table read by the PyFlink copy job.
-- `if not exists` makes this setup script safe to re-run; the explicit utf8mb4
-- charset keeps non-ASCII text in `data` intact regardless of the server's
-- default character set.
create table if not exists demo.source
(
    ID   int         not null auto_increment,
    data varchar(50) not null,
    primary key (ID)
) default charset = utf8mb4;
-- Destination table written by the PyFlink copy job (same layout as demo.source).
-- `if not exists` makes this setup script safe to re-run; the explicit utf8mb4
-- charset keeps non-ASCII text in `data` intact regardless of the server's
-- default character set.
create table if not exists demo.dest_table
(
    ID   int         not null auto_increment,
    data varchar(50) not null,
    primary key (ID)
) default charset = utf8mb4;
数据处理
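下面演示如何使用 PyFlink 从 MySQL 读取一张表，将每行的 id 加 1 后，把数据写入另一张表。运行前需要把 flink-connector-jdbc 和 MySQL JDBC 驱动（Connector/J）的 jar 包加入 Flink 的 classpath。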
# -*- coding: utf-8 -*-
"""Copy every row of MySQL table ``demo.source`` into ``demo.dest_table`` with PyFlink.

On the way through, each row's ``id`` is incremented by 1 and ``data`` is
copied unchanged.

Connection settings can be overridden through the environment variables
``MYSQL_JDBC_URL``, ``MYSQL_USERNAME`` and ``MYSQL_PASSWORD``; otherwise the
demo values below are used.
NOTE(review): the fallback password is committed in plain text -- it should be
removed from source control and the credential rotated.

The ``flink-connector-jdbc`` jar and the MySQL Connector/J jar must be on the
Flink classpath; that setup is not part of this script.
"""
import os

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

# JDBC connection settings shared by the source and the sink table.
JDBC_URL = os.environ.get(
    "MYSQL_JDBC_URL",
    "jdbc:mysql://10.201.103.144:3308/demo?useSSL=false&useUnicode=true&characterEncoding=utf8",
)
JDBC_USERNAME = os.environ.get("MYSQL_USERNAME", "crcpasepd")
JDBC_PASSWORD = os.environ.get("MYSQL_PASSWORD", "Crcportal_123")
JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"


def _sql_string(value):
    """Return ``value`` as a Flink SQL string literal (embedded quotes doubled)."""
    return "'" + value.replace("'", "''") + "'"


def _jdbc_table_ddl(flink_table, mysql_table, extra_options=None):
    """Build the Flink ``CREATE TABLE`` statement for one MySQL table.

    Both MySQL tables share the same layout (``ID`` int primary key, ``data``
    varchar), so the Flink schema is shared; only the names and any extra
    connector options differ.

    :param flink_table: name under which the table is registered in Flink.
    :param mysql_table: table name inside the database selected by the JDBC URL.
    :param extra_options: optional mapping of additional connector options.
    :return: the DDL string, ready for ``TableEnvironment.execute_sql``.
    """
    options = {
        "connector": "jdbc",
        "url": JDBC_URL,
        "table-name": mysql_table,
        "username": JDBC_USERNAME,
        "password": JDBC_PASSWORD,
        "driver": JDBC_DRIVER,
    }
    options.update(extra_options or {})
    with_clause = ",\n".join(
        "    {} = {}".format(_sql_string(key), _sql_string(value))
        for key, value in options.items()
    )
    return (
        # The table name is backtick-quoted in case it collides with an SQL
        # keyword (e.g. SOURCE).
        "CREATE TABLE `{}` (\n"
        "    id INT,\n"
        "    data STRING,\n"
        # Mirrors the MySQL primary key. On the sink it puts the JDBC connector
        # in upsert mode, so re-running the job does not fail on duplicate keys.
        # NOT ENFORCED because Flink does not own or validate the data.
        "    PRIMARY KEY (id) NOT ENFORCED\n"
        ") WITH (\n"
        "{}\n"
        ")"
    ).format(flink_table, with_clause)


# 1. Create the TableEnvironment.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 2. Register the source table (reads MySQL table `source`).
table_env.execute_sql(_jdbc_table_ddl("source", "source"))

# 3. Register the sink table (writes MySQL table `dest_table`). The flush
#    interval is a sink-only option, so it is not set on the source.
table_env.execute_sql(
    _jdbc_table_ddl("dest_table", "dest_table", {"sink.buffer-flush.interval": "1s"})
)

# 4. Query the source table and transform it.
# Using the Table API:
source_table = table_env.from_path("source")
# Or, equivalently, using SQL:
# source_table = table_env.sql_query("SELECT id, data FROM `source`")

# Increment id by 1; the alias keeps the column name aligned with the sink schema.
result_table = source_table.select((col("id") + 1).alias("id"), col("data"))

# 5. Write the result into dest_table and block until the job finishes.
result_table.execute_insert("dest_table").wait()
# Equivalent pure-SQL pipeline:
# table_env.execute_sql("INSERT INTO dest_table SELECT id + 1, data FROM `source`").wait()
网友评论