Introduction
This article explains how to quickly set up a Flink development environment. The author makes the following assumptions:
- The reader has Docker installed and has a basic understanding of docker and docker-compose
- The reader is familiar with MySQL
If either assumption does not hold, please fill in that background on your own first.
This article runs one Flink JobManager, one Flink TaskManager and a MySQL instance inside Docker, then uses Flink SQL to create a job that syncs data from table 1 to table 2 inside the MySQL database.
Detailed steps
Environment setup
- Create a test folder and, inside it, create a docker-compose.yaml file
version: "2.2"
services:
  jobmanager:
    container_name: jobmanager
    # image: apache/flink:1.14.6
    image: qiujiahong/flink:1.14.6_py
    networks:
      proxy:
        ipv4_address: 172.16.0.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    container_name: taskmanager
    # image: apache/flink:1.14.6
    image: qiujiahong/flink:1.14.6_py
    networks:
      proxy:
        ipv4_address: 172.16.0.12
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  # sql-client:
  #   image: apache/flink:1.14.6
  #   command: bin/sql-client.sh
  #   depends_on:
  #     - jobmanager
  #   environment:
  #     - |
  #       FLINK_PROPERTIES=
  #       jobmanager.rpc.address: jobmanager
  #       rest.address: jobmanager
  mysql:
    image: qiujiaihong/example-mysql:1.1
    ports:
      - "3306:3306"
    networks:
      proxy:
        ipv4_address: 172.16.0.13
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
networks:
  proxy:
    ipam:
      config:
        - subnet: 172.16.0.0/24
- In the test folder, open a terminal and start the containers
# start the stack
docker-compose up
# shut it down when you are finished
# docker-compose down
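Optionally, you can confirm that the cluster came up by querying the JobManager's REST API from the host. The sketch below is not part of the tutorial; the file name check_cluster.py is just an example, and it assumes port 8081 is published as in docker-compose.yaml above.
check_cluster.py
# Minimal sketch: query Flink's REST /overview endpoint to confirm the
# JobManager is reachable and the TaskManager has registered.
import json
import urllib.request

with urllib.request.urlopen("http://localhost:8081/overview", timeout=5) as resp:
    overview = json.load(resp)

print("flink version:", overview.get("flink-version"))
print("taskmanagers :", overview.get("taskmanagers"))
print("total slots  :", overview.get("slots-total"))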
- Prepare the MySQL data
docker exec -it jobmanager bash
mycli -h172.16.0.13 -uroot -p123456
create database demo;
use demo;
# create the source table
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");
# create the target table
CREATE TABLE dproducts (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512));
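If you prefer to check the demo data from the host rather than from inside the container, here is a minimal sketch. It is not part of the tutorial and assumes pymysql is installed locally (pip install pymysql) and that port 3306 is published as in docker-compose.yaml.
check_mysql.py
# Minimal sketch: count the rows in demo.products from the host machine.
import pymysql

conn = pymysql.connect(host="127.0.0.1", port=3306,
                       user="root", password="123456", database="demo")
try:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM products")
        print("products rows:", cur.fetchone()[0])  # 9 rows after the INSERT above
finally:
    conn.close()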
Testing from the command line
- Use sql-client.sh to create the Flink source and sink tables and submit the Flink job.
docker exec -it jobmanager bash ./bin/sql-client.sh
SET execution.checkpointing.interval = 3s;
-- create the source table
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.0.13',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'demo',
'table-name' = 'products'
);
-- create the target table
CREATE TABLE dproducts (
id BIGINT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.0.13:3306/demo?useSSL=false',
'table-name' = 'dproducts',
'username' = 'root',
'password' = '123456'
);
-- after this statement runs, you can log in to MySQL to check the contents of the dproducts table
INSERT INTO dproducts SELECT p.id, p.name, p.description FROM products AS p;
Local debugging with Python
- Read data from MySQL table 1 and print it with a print sink; create the program file
demo.py
from pyflink.table import EnvironmentSettings, TableEnvironment
print('step 01')
# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
print('step 02')
# table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar;file:/opt/flink/lib/mysql-connector-java-8.0.30.jar")
print('step 03')
# 2. create source Table
table_env.execute_sql("""
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.0.13',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'demo',
'table-name' = 'products'
)
""")
# 3. create sink Table
table_env.execute_sql("""
CREATE TABLE dproducts (
id BIGINT,
name STRING,
description STRING
) WITH (
'connector' = 'print'
)
""")
print('step 04')
table_env.execute_sql("INSERT INTO dproducts SELECT p.id, p.name, p.description FROM products AS p").wait()
print('step 06')
- Run the test program
# copy the program into the container
docker cp ./demo.py jobmanager:/opt/flink
# run it; the source table's rows are printed directly to the console
docker exec -it jobmanager python demo.py
In theory, the Python program can also be run directly on the local development machine (with the required dependencies installed in advance); a minimal sketch of that local setup follows.
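The sketch below assumes apache-flink==1.14.6 has been installed with pip and that the two connector jars have been downloaded locally; the jar paths are placeholders, adjust them to your machine.
demo_local.py
# Hedged sketch for running outside the container: the connector jars are not
# on Flink's classpath locally, so point pipeline.jars at local copies.
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-mysql-cdc-2.2.1.jar;"
    "file:///path/to/mysql-connector-java-8.0.30.jar")

# ...then run the same CREATE TABLE / INSERT statements as demo.py above,
# replacing 172.16.0.13 with 127.0.0.1 since MySQL's port is published to the host.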
- Read data from MySQL table 1 and sink it into MySQL table 2; create the program file
demo1.py
from pyflink.table import EnvironmentSettings, TableEnvironment
print('step 01')
# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
print('step 02')
table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar;file:/opt/flink/lib/mysql-connector-java-8.0.30.jar")
print('step 03')
# 2. create source Table
table_env.execute_sql("""
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.0.13',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'demo',
'table-name' = 'products'
)
""")
# 3. create sink table
table_env.execute_sql("""
CREATE TABLE dproducts (
id BIGINT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.0.13:3306/demo?useSSL=false',
'table-name' = 'dproducts',
'username' = 'root',
'password' = '123456'
)
""")
# 'driver' = 'com.mysql.cj.jdbc.Driver',
# 'driver' = 'com.mysql.jdbc.Driver',
print('step 04')
table_env.execute_sql("INSERT INTO dproducts SELECT p.id, p.name, p.description FROM products AS p").wait()
print('step 06')
- Before testing, clear out any old data in the target table:
docker exec -it jobmanager bash
mycli -h172.16.0.13 -uroot -p123456
use demo;
truncate table dproducts;
- Run the program
# copy the program into the container
docker cp ./demo1.py jobmanager:/opt/flink
# run the program; it keeps running and does not exit. Once it is running, log in to MySQL to check the contents of the dproducts table
docker exec -it jobmanager python demo1.py
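As an alternative to logging in with mycli, you can compare the two tables from the host while the job keeps running, reusing the same pymysql pattern as the earlier sketch (again assuming pymysql is installed locally).
verify_sync.py
# Minimal sketch: compare row counts of the source and target tables.
import pymysql

conn = pymysql.connect(host="127.0.0.1", port=3306,
                       user="root", password="123456", database="demo")
try:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM products")
        src = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM dproducts")
        dst = cur.fetchone()[0]
    print(f"products={src}, dproducts={dst}")  # counts converge once the CDC job catches up
finally:
    conn.close()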
In theory, this Python program can also be run directly on the local development machine (with the required dependencies installed in advance, as sketched after demo.py above).
Other ways to submit jobs
# submit locally on the Flink server
./bin/flink run --python demo.py
# submit a PyFlink job from a directory, using --pyModule to specify the entry module:
./bin/flink run \
--pyModule table.word_count \
--pyFiles examples/python/table
# submit a PyFlink job to a specific JobManager:
./bin/flink run \
--jobmanager <jobmanagerHost>:8081 \
--python examples/python/table/word_count.py
For more ways to submit jobs, see the official documentation.