前言
前面讲到flink实时计算用户画像的功能,flink需要调用python写的算法库,之前的两种方案都存在各种各样的问题,最后决定采用pyflink来开发。
一门新技术的引入必定面临各种坑,期间很多命令都是照着官网写的,但是还是报错。没办法,只能花时间一个一个的解决,本篇就总结下这两天部署pyflink过程中遇到的问题。
部署脚本
run.sh
#!/usr/bin/env bash
unset PYTHONPATH
export PYTHONPATH="/home/work/python3.7.1"
export FLINK_HOME="/home/work/flink-1.15.3"
if [ ! -f realtime_calc_label.zip ];then
zip -q -r ./realtime_calc_label.zip ./*
fi
# 不加这个alias命令会失效
shopt -s expand_aliases
alias python=/home/work/python3.7.1/bin/python3
/home/work/flink-1.15.3/bin/flink run \
--detached \
-t yarn-per-job \
-Dyarn.application.name=flink_user_profile \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=3096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dyarn.application.queue=t0 \
-Dpython.systemenv.enabled=false \
-p 24 \
-pyarch /home/work/python3.7.1/python3.7.1.zip,./realtime_calc_label.zip \
-pyclientexec ./python3.7.1.zip/bin/python3 \
-pyexec ./python3.7.1.zip/bin/python3 \
-pyfs ./realtime_calc_label.zip \
--python ./label_calc_stream.py \
--jarfile jars/flink-sql-connector-kafka-1.15.2.jar
- 与run.sh同级目录下有整个项目的打包文件realtime_calc_label.zip,pyflink程序入口label_calc_stream.py,依赖Java的jar包文件夹jars
- -pyclientexec、-pyexec采用的都是相对路径,因为flink作业提交的时候会把需要的资源都拷贝到临时目录下
- 由于yarn集群运行着很多pyspark任务,这次由要运行pyflink任务,所以不可避免需要支持多种版本的python环境,所以我们最好打包上自己的python环境和作业一起提交,这里需要执行cd /home/work/python3.7.1 & zip -q -r ./python3.7.1.zip ./* 把python环境打包
- 由于PYTHONPATH下python2.x的包跟自己python3.7.1.zip中有个包是冲突的,目前发布的flink版本1.15.2对于这种情况的处理存在bug,这里需要把flink从1.15.2升级到1.15.3,与flink社区沟通后得知最快2周之后才能发布1.15.3版本,这里手动从github中下载flink的release-1.15 build源码进行编译安装
编译python
前面讲到提交pyflink代码,最好携带上自己需要的python依赖,为了尽量使包体小一些,这里自己打包一份纯洁版的python环境,命令如下:
cd /home/work
mkdir python3.9.0
#注意如果在私有云环境下,不能直接联网下载,可手动上官网下载,再通过工具上传
wget https://www.python.org/ftp/python/3.9.0/Python-3.9.0.tgz
cd Python-3.9.0
./configure --prefix=/home/work/python3.9.0 --with-ssl
make
make install
- 下载依赖
# 更新pip版本
/home/work/python3.7.1/bin/python3 -m pip install --upgrade --force pip
/home/work/python3.7.1/bin/python3 -m pip install apache-flink==1.15.2
# 业务代码中用到,读取配置文件的
/home/work/python3.7.1/bin/python3 -m pip install configparser==5.0.0
/home/work/python3.7.1/bin/python3 -m pip install protobuf
- 打包依赖
# 在python目录下执行命令打zip包
zip -q -r ./python3.7.1.zip ./*
# 也可以打成gz包
tar -czvf ./python3.7.1.tgz ./*
编译Flink
- mac下shell环境配置。编辑~/.zshrc文件
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home
SCALA_HOME=/Users/david.dong/soft/scala-2.12.13
MAVEN_HOME=/Users/david.dong/soft/apache-maven-3.6.3
export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$MAVEN_HOME/bin:$PATH:.
- 查看mac当前环境下所使用的java环境
# 命令行输入: /usr/libexec/java_home
david.dong@MacBook-Pro ~ % /usr/libexec/java_home
/Users/david.dong/Library/Java/JavaVirtualMachines/corretto-11.0.14.1/Contents/Home
# 会发现跟采用:java -version命令看到的不一样
david.dong@MacBook-Pro ~ % java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
- 编译flink源码
# 下载源码
git clone https://github.com/apache/flink.git
# 由于已发布的版本1.15.2有bug,但是在1.15.3会修复,代码已经修改,所以切换到release-1.15分支
git checkout -b release-1.15
# 在源码中注释掉test相关的module,然后执行编译命令,编译后的安装包存放在flink/flink-dist/target/目录下
mvn clean install -DskipTests -Drat.skip=true
PyFlink代码
-
项目结构
project structure -
主逻辑
import json
import sys
from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from udf_lib.calc_label_func import CalcLabelFunc
sys.path.append(".")
def label_calc(profile):
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
# env.add_jars(
# "file:///Users/dengpengfei/PycharmProjects/realtime_calc_label/src/jars/flink-sql-connector-kafka-1.15.2.jar")
job_conf = {'env': profile}
env.get_config().set_global_job_parameters(job_conf)
kafka_consumer = FlinkKafkaConsumer(
topics='change_user_preview',
deserialization_schema=SimpleStringSchema(),
properties={'bootstrap.servers': '127.0.0.1:9092',
'group.id': 'offline',
'auto.offset.reset': 'earliest'})
source_ds = env.add_source(kafka_consumer).name('source_kafka')
map_df = source_ds.map(lambda row: json.loads(row))
process_df = map_df.key_by(lambda row: row['unique_id']).flat_map(CalcLabelFunc(), output_type=Types.STRING()).name('flat_map_calc')
kafka_producer = FlinkKafkaProducer(
topic='change_label_preview',
serialization_schema=SimpleStringSchema(),
producer_config={'bootstrap.servers': '127.0.0.1:9092'})
process_df.add_sink(kafka_producer).name('sink_kafka')
env.execute('label_calc')
if __name__ == '__main__':
run_env = 'preview'
if len(sys.argv) > 1:
run_env = sys.argv[1]
label_calc(run_env)
- udf函数
import json
import sys
import time
from pyflink.datastream import RuntimeContext, FlatMapFunction
from pymongo import MongoClient
from calc_lib.online_calc import OnlineCalc
from db_utils.mysql_util import get_sql_conn, get_dict_data_sql
sys.path.append(".")
class CalcLabelFunc(FlatMapFunction):
def __init__(self):
self.env = None
self.mongo = None
self.mysql_conf = None
self.online_calc = None
self.cur_time_stamp = None
def open(self, runtime_context: RuntimeContext):
self.mongo = MongoClient('mongodb://localhost:27017')
self.mysql_conf = {
'host': '127.0.0.1',
'username': 'root',
'password': '123456',
'db': 'user_profile'
}
self.cur_time_stamp = 0
def flat_map(self, value):
# update conf
if time.time() - self.cur_time_stamp > 60 * 3:
self.cur_time_stamp = time.time()
self.update_conf()
unique_id = value['unique_id']
entity_type = value['entity_type']
version = value['version']
pp_db = 'pp_{}_{}_{}'.format(entity_type, version, self.env)
pp_doc = self.mongo[pp_db]['pp_table'].find_one({'unique_id': unique_id})
profile_db = 'profile_{}_{}_{}'.format(entity_type, version, self.env)
profile_doc = self.mongo[profile_db]['profile_table'].find_one({'unique_id': unique_id})
if pp_doc:
if not profile_doc:
profile_doc = {}
profile_new = self.online_calc.calc(pp_doc, profile_doc)
self.mongo[profile_db]['profile_table'].replace_one({'unique_id': unique_id}, profile_new, True)
profile_new['entity_type'] = entity_type
return [json.dumps(profile_new)]
else:
return []
def close(self):
self.mongo.close()
def update_conf(self):
con, cursor = get_sql_conn(self.mysql_conf)
strategy_data = get_dict_data_sql(cursor, 'SELECT * FROM calc_strategy')
relation_data = get_dict_data_sql(cursor, 'SELECT * FROM table_relation')
con.close()
self.online_calc = OnlineCalc(strategy_data, relation_data)
print('update config ...')
- mysql 工具类
import pymysql
def get_sql_conn(conf):
"""
获取数据库连接
"""
conn = pymysql.connect(host=conf['host'], user=conf['username'], password=conf['password'], db=conf['db'])
cursor = conn.cursor()
return conn, cursor
def get_index_dict(cursor):
"""
获取数据库对应表中的字段名
"""
index_dict = dict()
index = 0
for desc in cursor.description:
index_dict[desc[0]] = index
index = index + 1
return index_dict
def get_dict_data_sql(cursor, sql):
"""
运行sql语句,获取结果,并根据表中字段名,转化成dict格式(默认是tuple格式)
"""
cursor.execute(sql)
data = cursor.fetchall()
index_dict = get_index_dict(cursor)
res = []
for datai in data:
resi = dict()
for indexi in index_dict:
resi[indexi] = datai[index_dict[indexi]]
res.append(resi)
return res
结
本次部署主要遇到的问题是使用的flink的新版本,提交命令发生了变化。然后也发现了一个flink1.15.2的bug,解决方式是升级版本,添加配置 python.systemenv.enabled=false
网友评论