美文网首页
007-docker部署Flink测试PyFlink提交任务

007-docker部署Flink测试PyFlink提交任务

作者: 7ming | 来源:发表于2022-08-04 17:59 被阅读0次

docker部署 和 Pyflink环境准备

创建网络
FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
docker network create flink-network
启动一个 job_manager 和一个 task_manager
docker run \
    --rm \
    --name=jobmanager \
    --network flink-network \
    --publish 8099:8081 \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:latest jobmanager
    
docker run \
    -d \
    --name=taskmanager \
    --network flink-network \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:latest taskmanager
访问

访问 ip:8099 查看 Web UI

Pyflink环境
  • jdk11
  • python3.7
  • 安装 pyflink
pip3 install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple some-package

实例

pyflink代码
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

df = pd.DataFrame() # 一个pandas.DataFrame
table = table_env.from_pandas(df)


query = ''' select * from %s '''
table_out = table_env.sql_query(query % (table,table))
pdf = table_out.to_pandas()
print(pdf)
本地测试
  • 直接执行即可

提交到flink集群

  • 下载 flink (用于执行命令行)
wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz --no-check-certificate
tar -zxvf flink-1.14.3-bin-scala_2.12.tgz -C ./
  • 修改配置 指定python所在位置
    flink/conf/flink-conf.yaml
python.client.executable: /usr/local/python3/bin/python3
  • 提交任务
flink run --jobmanager ip:8099 -Dexecution.runtime-mode=BATCH --python flink_demo.py
  • 在 WebUI 查看任务
    ip:8099

相关文章

网友评论

      本文标题:007-docker部署Flink测试PyFlink提交任务

      本文链接:https://www.haomeiwen.com/subject/pfwpwrtx.html