docker部署 和 Pyflink环境准备
创建网络
FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
docker network create flink-network
启动一个 job_manager 和一个 task_manager
docker run \
--rm \
--name=jobmanager \
--network flink-network \
--publish 8099:8081 \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:latest jobmanager
docker run \
-d \
--name=taskmanager \
--network flink-network \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:latest taskmanager
访问
访问 ip:8099 查看 Web UI
Pyflink环境
- jdk11
- python3.7
- 安装 pyflink
pip3 install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple some-package
实例
pyflink代码
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
df = pd.DataFrame() # 一个pandas.DataFrame
table = table_env.from_pandas(df)
query = ''' select * from %s '''
table_out = table_env.sql_query(query % (table,table))
pdf = table_out.to_pandas()
print(pdf)
本地测试
- 直接执行即可
提交到flink集群
- 下载 flink (用于执行命令行)
wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz --no-check-certificate
tar -zxvf flink-1.14.3-bin-scala_2.12.tgz -C ./
- 修改配置 指定python所在位置
flink/conf/flink-conf.yaml
python.client.executable: /usr/local/python3/bin/python3
- 提交任务
flink run --jobmanager ip:8099 -Dexecution.runtime-mode=BATCH --python flink_demo.py
- 在 WebUI 查看任务
ip:8099
网友评论