(3) Setting Up a Flink Development Environment with Docker


Author: Nick_4438 | Published: 2022-11-13 19:42

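    Introduction

    This article explains how to quickly set up a Flink development environment. It makes the following assumptions:

    • The reader has Docker installed and has some familiarity with docker and docker-compose
    • The reader is familiar with MySQL

    If either assumption does not hold, please read up on the topic first.

    The setup runs one Flink JobManager, one Flink TaskManager, and one MySQL instance in Docker, then uses Flink SQL to create a job that synchronizes data from one MySQL table (products) to another (dproducts).

    Detailed steps

    Preparing the environment

    • Create a folder named test and, inside it, create a docker-compose.yml file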
    version: "2.2"
    services:
      jobmanager:
        container_name: jobmanager
        # image: apache/flink:1.14.6
        image: qiujiahong/flink:1.14.6_py
        networks:
          proxy:
            ipv4_address: 172.16.0.11
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager        
    
      taskmanager:
        container_name: taskmanager
        # image: apache/flink:1.14.6
        image: qiujiahong/flink:1.14.6_py
        networks:
          proxy:
            ipv4_address: 172.16.0.12
        depends_on:
          - jobmanager
        command: taskmanager
        scale: 1
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            taskmanager.numberOfTaskSlots: 2        
      # sql-client:
      #   image: apache/flink:1.14.6
      #   command: bin/sql-client.sh
      #   depends_on:
      #     - jobmanager
      #   environment:
      #     - |
      #       FLINK_PROPERTIES=
      #       jobmanager.rpc.address: jobmanager
      #       rest.address: jobmanager     
      mysql:
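        # NOTE: this namespace is spelled differently from "qiujiahong" in the Flink image above; confirm the image name before pulling
        image: qiujiaihong/example-mysql:1.1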
        ports:
          - "3306:3306"
        networks:
          proxy:
            ipv4_address: 172.16.0.13
        environment:
          - MYSQL_ROOT_PASSWORD=123456
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw
    networks:
      proxy:
        ipam:
          config:
          - subnet: 172.16.0.0/24
    
    • Open a terminal in the test folder and start the containers
    # Start
    docker-compose up
    # Stop
    # docker-compose down
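
Once the containers are up, the JobManager on port 8081 serves both the web UI and the Flink REST API, whose `/overview` endpoint reports how many TaskManagers have registered. The following is a minimal sketch for checking this from the host, assuming the `8081:8081` port mapping from the compose file above. The helper names `fetch_overview` and `cluster_is_ready` are hypothetical and not part of Flink.

```python
import json
import urllib.request


def fetch_overview(base_url: str = "http://localhost:8081") -> dict:
    """Fetch the cluster overview JSON from the Flink REST API.

    Assumes the JobManager REST port is mapped to localhost:8081.
    """
    with urllib.request.urlopen(f"{base_url}/overview", timeout=5) as resp:
        return json.load(resp)


def cluster_is_ready(overview: dict, min_taskmanagers: int = 1) -> bool:
    """Return True once at least min_taskmanagers have registered."""
    return overview.get("taskmanagers", 0) >= min_taskmanagers
```

Calling `cluster_is_ready(fetch_overview())` after `docker-compose up` should return `True` once the TaskManager has registered with the JobManager. If the JobManager is not reachable yet, `fetch_overview` raises an `OSError` subclass.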
    
    
    • Prepare the MySQL data
    docker exec -it jobmanager  bash
    mycli -h172.16.0.13 -uroot -p123456
    
    create database demo;
    use demo;
    
    # Create the source table
    CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
    ALTER TABLE products AUTO_INCREMENT = 101;
    INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");
    
    
    
    # Create the target table
    CREATE TABLE dproducts (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
    
    
    

    Testing from the command line

    • Use sql-client.sh to create the Flink tables that map to the MySQL tables and to start the Flink job.
    docker exec -it jobmanager  bash ./bin/sql-client.sh
    
    SET execution.checkpointing.interval = 3s;
    -- Create the source table
    CREATE TABLE products (
        id INT,
        name STRING,
        description STRING,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '172.16.0.13',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'demo',
        'table-name' = 'products'
      );
    -- Create the target table
    CREATE TABLE dproducts (
      id BIGINT,
      name STRING,
      description STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://172.16.0.13:3306/demo?useSSL=false',
       'table-name' = 'dproducts',
       'username' = 'root',
       'password' = '123456'
    );
    -- After running the statement below, log in to MySQL and check the contents of the dproducts table
    INSERT INTO dproducts SELECT  p.id, p.name, p.description FROM products AS p;
    

    Debugging locally with Python

    • Read from the MySQL source table and print the rows through a print sink. Create the program file demo.py
    from pyflink.table import EnvironmentSettings, TableEnvironment
    
    
    print('step 01')
    # 1. create a TableEnvironment
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    print('step 02')
    
    # table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar;file:/opt/flink/lib/mysql-connector-java-8.0.30.jar")
    print('step 03')
    
    # 2. create source Table
    table_env.execute_sql("""
        CREATE TABLE products (
            id INT,
            name STRING,
            description STRING,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'mysql-cdc',
            'hostname' = '172.16.0.13',
            'port' = '3306',
            'username' = 'root',
            'password' = '123456',
            'database-name' = 'demo',
            'table-name' = 'products'
        )
    """)
    
    # 3. create sink Table
    table_env.execute_sql("""
        CREATE TABLE dproducts (
            id BIGINT,
            name STRING,
            description STRING
        ) WITH (
            'connector' = 'print'
        )
    """)
    
    print('step 04')
    # The CDC source is unbounded, so wait() blocks until the job is cancelled
    table_env.execute_sql("INSERT INTO dproducts SELECT p.id, p.name, p.description FROM products AS p").wait()
    print('step 05')
    
    
    • Run the test program
    # Copy the program into the container
    docker cp ./demo.py jobmanager:/opt/flink
    # Run it; the rows of the source table are printed directly to the terminal
    docker exec -it jobmanager python demo.py
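
The connector options in the `WITH (...)` clauses are the same in the SQL client session and in the Python programs. As an illustration only, they could be generated from a single place. The helper names below (`render_with_clause`, `cdc_source_options`, `jdbc_sink_options`) are hypothetical, and the connection values are copied from this article's compose file.

```python
# Connection settings copied from the docker-compose file in this article.
MYSQL = {
    "hostname": "172.16.0.13",
    "port": "3306",
    "username": "root",
    "password": "123456",
}


def render_with_clause(options: dict) -> str:
    """Render a Flink SQL WITH (...) clause from connector options.

    Values are inserted as-is, so this sketch does not escape single quotes.
    """
    body = ",\n".join(f"    '{key}' = '{value}'" for key, value in options.items())
    return f"WITH (\n{body}\n)"


def cdc_source_options(database: str, table: str) -> dict:
    """Options for a mysql-cdc source table."""
    return {"connector": "mysql-cdc", **MYSQL, "database-name": database, "table-name": table}


def jdbc_sink_options(database: str, table: str) -> dict:
    """Options for a JDBC sink table on the same MySQL server."""
    return {
        "connector": "jdbc",
        "url": f"jdbc:mysql://{MYSQL['hostname']}:{MYSQL['port']}/{database}?useSSL=false",
        "table-name": table,
        "username": MYSQL["username"],
        "password": MYSQL["password"],
    }
```

For example, `render_with_clause(cdc_source_options("demo", "products"))` produces the option list used for the products source table, which can then be appended to the column definitions before passing the statement to `execute_sql`.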
    

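    In theory, the Python program can also be run directly on the local development machine, provided the required dependencies are installed beforehand.

    • Read from the MySQL source table and sink into the MySQL target table. Create the program file demo1.py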
    from pyflink.table import EnvironmentSettings, TableEnvironment
    
    
    print('step 01')
    # 1. create a TableEnvironment
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    print('step 02')
    
    table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar;file:/opt/flink/lib/mysql-connector-java-8.0.30.jar")
    print('step 03')
    
    # 2. create source Table
    table_env.execute_sql("""
        CREATE TABLE products (
            id INT,
            name STRING,
            description STRING,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'mysql-cdc',
            'hostname' = '172.16.0.13',
            'port' = '3306',
            'username' = 'root',
            'password' = '123456',
            'database-name' = 'demo',
            'table-name' = 'products'
        )
    """)
    # 3. create sink table
    table_env.execute_sql("""
        CREATE TABLE dproducts (
            id BIGINT,
            name STRING,
            description STRING,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://172.16.0.13:3306/demo?useSSL=false',
            'table-name' = 'dproducts',
            'username' = 'root',
            'password' = '123456'
        )
    """)
    # 'driver' = 'com.mysql.cj.jdbc.Driver',
    # 'driver' = 'com.mysql.jdbc.Driver',
    
    print('step 04')
    # The CDC source is unbounded, so wait() blocks until the job is cancelled
    table_env.execute_sql("INSERT INTO dproducts SELECT p.id, p.name, p.description FROM products AS p").wait()
    print('step 05')
    
    
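    • Clear the target table before testing:
    docker exec -it jobmanager bash
    mycli -h172.16.0.13 -uroot -p123456
    truncate table demo.dproducts;
    
    • Run the program
    # Copy the program into the container
    docker cp ./demo1.py jobmanager:/opt/flink
    # Run the program. It does not exit; while it is running, log in to MySQL to check the contents of the dproducts table
    docker exec -it jobmanager python demo1.py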
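
demo1.py sets `pipeline.jars` so that the job can find the CDC connector and the MySQL driver. Flink expects this option as a semicolon-separated list of URLs. The sketch below builds that string from plain file paths; `pipeline_jars` is a hypothetical helper name, and the jar paths are the ones used in demo1.py.

```python
from urllib.parse import quote


def pipeline_jars(*jar_paths: str) -> str:
    """Join absolute jar paths into a semicolon-separated list of file:// URLs."""
    for path in jar_paths:
        if not path.startswith("/"):
            raise ValueError(f"expected an absolute path, got {path!r}")
    # quote() percent-encodes characters such as spaces but keeps "/" intact
    return ";".join("file://" + quote(path) for path in jar_paths)


jars = pipeline_jars(
    "/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar",
    "/opt/flink/lib/mysql-connector-java-8.0.30.jar",
)
```

The resulting string can be passed as the second argument of `set_string("pipeline.jars", jars)` in place of the hand-written value.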
    

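    As with demo.py, this program can in theory also be run directly on the local development machine once the required dependencies are installed.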

    Other ways to submit

    # Submit locally on the Flink server
    ./bin/flink run --python demo.py
    
    # Submit a PyFlink job from a directory, using --pyModule to specify the entry module:
    ./bin/flink run \
          --pyModule table.word_count \
          --pyFiles examples/python/table
    
    # Submit a PyFlink job to a specific JobManager:
    ./bin/flink run \
          --jobmanager <jobmanagerHost>:8081 \
          --python examples/python/table/word_count.py
    

    For more submission methods, see the official Flink documentation.
