3. Table API: Moving a Table from MySQL to MySQL

Author: Nick_4438 | Published: 2022-09-07 19:26

Database preparation

    create table demo.source
    (
        ID   int auto_increment
            primary key,
        data varchar(50) not null
    );
    
    create table demo.dest_table
    (
        ID   int auto_increment
            primary key,
        data varchar(50) not null
    );
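
The copy job only has something to move once `demo.source` holds a few rows. A minimal sketch for seeding it, assuming a DB-API MySQL driver that uses `%s` placeholders (PyMySQL and mysql-connector-python both do). The helper name `build_seed_insert` and the sample values are hypothetical and not part of the original article:

```python
def build_seed_insert(values):
    """Return (sql, params) for a multi-row INSERT into demo.source.

    ID is auto_increment, so only the `data` column is supplied.
    """
    if not values:
        raise ValueError("at least one value is required")
    for value in values:
        # `data` is declared as varchar(50) NOT NULL in the DDL above.
        if value is None or len(value) > 50:
            raise ValueError(f"invalid data value: {value!r}")
    # One "(%s)" group per row; the driver fills in the values safely.
    placeholders = ", ".join(["(%s)"] * len(values))
    sql = f"INSERT INTO demo.source (data) VALUES {placeholders}"
    return sql, tuple(values)


sql, params = build_seed_insert(["alpha", "beta", "gamma"])
print(sql)
print(params)
```

With an open connection, the pair would typically be passed to `cursor.execute(sql, params)` and followed by a commit.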
    

Data processing

The script below reads one MySQL table and copies its rows into another MySQL table, adding 1 to each `id` along the way.

    # -*- coding: utf-8 -*-
    
    
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col
    
    # 1. Create the TableEnvironment
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    
    
    # 2. Create the source table
    table_env.execute_sql("""
        CREATE TABLE source (
            id INT,
            data STRING
        ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://10.201.103.144:3308/demo?useSSL=false&useUnicode=true&characterEncoding=utf8',
        'connector.table' = 'source',
        'connector.username' = 'crcpasepd',
        'connector.password' = 'Crcportal_123',
        'connector.write.flush.interval' = '1s',
        'connector.driver' = 'com.mysql.cj.jdbc.Driver'
        )
    """)
    
    # 3. Create the sink table
    table_env.execute_sql("""
        CREATE TABLE dest_table (
            id INT,
            data STRING
        ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://10.201.103.144:3308/demo?useSSL=false&useUnicode=true&characterEncoding=utf8',
        'connector.table' = 'dest_table',
        'connector.username' = 'crcpasepd',
        'connector.password' = 'Crcportal_123',
        'connector.write.flush.interval' = '1s',
        'connector.driver' = 'com.mysql.cj.jdbc.Driver'
        )
    """)
    
    # 4. Query the source table and apply a transformation
    # Build the query with the Table API:
    source_table = table_env.from_path("source")
    # Or build it with SQL:
    # source_table = table_env.sql_query("SELECT * FROM source")
    # Derive the result table from source_table
    result_table = source_table.select(col("id") + 1, col("data"))
    
    # Write the result into dest_table and wait for the job to finish
    result_table.execute_insert("dest_table").wait()
    # Or emit the results with SQL (this variant copies the rows unchanged):
    # table_env.execute_sql("INSERT INTO dest_table SELECT * FROM source").wait()
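
The two `CREATE TABLE` statements in the script differ only in the table name, and both use the legacy `connector.*` option keys. Newer Flink releases document a shorter key set for the JDBC connector (`connector`, `url`, `table-name`, `username`, `password`, `driver`, `sink.buffer-flush.interval`). The following is a minimal sketch of a helper that renders the DDL with those keys. The helper name `jdbc_table_ddl` and the placeholder credentials are hypothetical, and which key style your Flink version accepts is an assumption to check against its documentation:

```python
def jdbc_table_ddl(flink_table, url, db_table, username, password,
                   driver="com.mysql.cj.jdbc.Driver", flush_interval="1s"):
    """Render a Flink SQL CREATE TABLE statement for a JDBC-backed table."""
    options = {
        "connector": "jdbc",
        "url": url,
        "table-name": db_table,
        "username": username,
        "password": password,
        "driver": driver,
        "sink.buffer-flush.interval": flush_interval,
    }
    # Single quotes inside option values are escaped by doubling them.
    rendered = ",\n".join(
        "    '{}' = '{}'".format(key, str(value).replace("'", "''"))
        for key, value in options.items()
    )
    # Backticks quote the table name in case it clashes with a SQL keyword.
    return (
        f"CREATE TABLE `{flink_table}` (\n"
        "    id INT,\n"
        "    data STRING\n"
        ") WITH (\n"
        f"{rendered}\n"
        ")"
    )


url = ("jdbc:mysql://10.201.103.144:3308/demo"
       "?useSSL=false&useUnicode=true&characterEncoding=utf8")
# "demo_user" / "demo_password" are placeholders, not real credentials.
source_ddl = jdbc_table_ddl("source", url, "source", "demo_user", "demo_password")
sink_ddl = jdbc_table_ddl("dest_table", url, "dest_table", "demo_user", "demo_password")
print(source_ddl)
```

Each rendered string could then be passed to `table_env.execute_sql(...)` in place of the hand-written statements in steps 2 and 3.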
    
    
    
    
