
PyFlink Cluster Deployment

Author: Coder小咚 | Published 2022-11-07 16:06

    Preface

    As covered in earlier posts, Flink computes user profiles in real time and needs to call an algorithm library written in Python. Both of the previous approaches suffered from assorted problems, so we finally decided to develop with PyFlink.

    Introducing a new technology inevitably means stepping into all kinds of pitfalls. Many of the commands were copied straight from the official docs, yet they still failed. There was nothing for it but to work through the errors one by one; this post summarizes the problems encountered over the past two days of deploying PyFlink.

    Deployment Script

    run.sh

    #!/usr/bin/env bash
    
    unset PYTHONPATH
    
    export PYTHONPATH="/home/work/python3.7.1"
    export FLINK_HOME="/home/work/flink-1.15.3"
    
    if [ ! -f realtime_calc_label.zip ];then
        zip -q -r ./realtime_calc_label.zip ./*
    fi
    
    # Without this, the alias below would not take effect inside the script
    shopt -s expand_aliases
    
    alias python=/home/work/python3.7.1/bin/python3
    
    /home/work/flink-1.15.3/bin/flink run \
    --detached \
    -t yarn-per-job \
    -Dyarn.application.name=flink_user_profile \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=3096m \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -Dyarn.application.queue=t0 \
    -Dpython.systemenv.enabled=false \
    -p 24 \
    -pyarch /home/work/python3.7.1/python3.7.1.zip,./realtime_calc_label.zip \
    -pyclientexec ./python3.7.1.zip/bin/python3 \
    -pyexec ./python3.7.1.zip/bin/python3 \
    -pyfs ./realtime_calc_label.zip \
    --python ./label_calc_stream.py \
    --jarfile jars/flink-sql-connector-kafka-1.15.2.jar
    
    • In the same directory as run.sh are the zip of the whole project, realtime_calc_label.zip, the PyFlink entry point label_calc_stream.py, and the jars folder holding the Java jar dependencies
    • -pyclientexec and -pyexec both use relative paths, because Flink copies the required resources into a temporary directory when the job is submitted; these options also have programmatic counterparts, shown in the sketch after this list
    • The YARN cluster already runs many PySpark jobs, and now PyFlink jobs as well, so supporting multiple Python versions is unavoidable. It is best to package your own Python environment and submit it together with the job: run cd /home/work/python3.7.1 && zip -q -r ./python3.7.1.zip ./* to package the environment
    • A Python 2.x package on PYTHONPATH conflicts with a package inside our python3.7.1.zip, and the released Flink 1.15.2 has a bug in handling this case, so Flink had to be upgraded from 1.15.2 to 1.15.3. After checking with the Flink community, the official 1.15.3 release was at least two weeks away, so we downloaded the release-1.15 branch source from GitHub and compiled it ourselves
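
    For reference, the -pyarch, -pyexec, and -pyfs options have programmatic counterparts on the execution environment. A minimal sketch (the paths mirror run.sh and are assumptions):

    from pyflink.datastream import StreamExecutionEnvironment
    
    env = StreamExecutionEnvironment.get_execution_environment()
    
    # counterpart of -pyarch: ship the zipped interpreter with the job
    env.add_python_archive('/home/work/python3.7.1/python3.7.1.zip')
    # counterpart of -pyexec: the worker-side interpreter, relative to the extracted archive
    env.set_python_executable('python3.7.1.zip/bin/python3')
    # counterpart of -pyfs: make the project code importable on the workers
    env.add_python_file('./realtime_calc_label.zip')
    # the client-side interpreter (-pyclientexec) is controlled by the
    # python.client.executable option or the PYFLINK_CLIENT_EXECUTABLE env var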

    Compiling Python

    As mentioned above, it is best to ship the required Python dependencies along with the PyFlink job. To keep the package small, we build a clean, minimal Python environment ourselves (the commands below use Python 3.9.0 as an example; the deployed environment was a 3.7.1 build produced the same way):

    cd /home/work
    
    mkdir python3.9.0
    # Note: in a private cloud without direct internet access, download the tarball
    # manually from the official site and upload it with a transfer tool
    wget https://www.python.org/ftp/python/3.9.0/Python-3.9.0.tgz
    
    tar -xzf Python-3.9.0.tgz
    
    cd Python-3.9.0
    
    ./configure --prefix=/home/work/python3.9.0  --with-ssl
    
    make
    
    make install
    
    • Install the dependencies
    # Upgrade pip
    /home/work/python3.7.1/bin/python3 -m pip install --upgrade --force-reinstall pip
    
    /home/work/python3.7.1/bin/python3 -m pip install apache-flink==1.15.2
    # Used by the business code to read configuration files
    /home/work/python3.7.1/bin/python3 -m pip install configparser==5.0.0
    
    /home/work/python3.7.1/bin/python3 -m pip install protobuf
    
    • Package the environment
    # Run inside the Python install directory to create the zip
    zip -q -r ./python3.7.1.zip ./*
    
    # Alternatively, package it as a tar.gz
    tar -czvf ./python3.7.1.tgz ./*
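
    One gotcha: the zip must be created from inside the environment directory (./*), not by zipping the directory itself, otherwise every entry gains an extra top-level folder and run.sh's relative path ./python3.7.1.zip/bin/python3 no longer resolves. A minimal sanity check of the archive layout (the archive path is an assumption):

    import zipfile

    ARCHIVE = '/home/work/python3.7.1/python3.7.1.zip'

    with zipfile.ZipFile(ARCHIVE) as zf:
        names = zf.namelist()
        # the interpreter must sit at bin/python3 relative to the zip root
        assert 'bin/python3' in names, 'interpreter missing or extra top-level dir present'
        # apache-flink must be baked into the packaged environment
        assert any('site-packages/pyflink/' in n for n in names), 'apache-flink not packaged'
    print('archive layout looks correct')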
    

    Compiling Flink

    • Shell environment setup on macOS: edit the ~/.zshrc file
    JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home
    SCALA_HOME=/Users/david.dong/soft/scala-2.12.13
    MAVEN_HOME=/Users/david.dong/soft/apache-maven-3.6.3
    
    export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$MAVEN_HOME/bin:$PATH:.
    
    • Check which Java environment the current macOS session uses
    # Run on the command line: /usr/libexec/java_home
    david.dong@MacBook-Pro ~ % /usr/libexec/java_home
    /Users/david.dong/Library/Java/JavaVirtualMachines/corretto-11.0.14.1/Contents/Home
    
    # Note that this can differ from what the java -version command shows
    david.dong@MacBook-Pro ~ % java -version
    java version "1.8.0_241"
    Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
    Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
    
    • Compile the Flink source
    # Clone the source
    git clone https://github.com/apache/flink.git
    # The released 1.15.2 has the bug and the fix is already merged for 1.15.3, so switch to the release-1.15 branch
    git checkout release-1.15
    # Comment out the test-related modules in the source, then run the build; the built distribution lands in flink/flink-dist/target/
    mvn clean install -DskipTests -Drat.skip=true
    

    PyFlink Code

    • Project structure


      project structure
    • Main logic

    import json
    import sys
    from pyflink.common import Types
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
    from udf_lib.calc_label_func import CalcLabelFunc
    
    sys.path.append(".")
    
    
    def label_calc(profile):
        env = StreamExecutionEnvironment.get_execution_environment()
        # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
        # env.add_jars(
        #     "file:///Users/dengpengfei/PycharmProjects/realtime_calc_label/src/jars/flink-sql-connector-kafka-1.15.2.jar")
    
        job_conf = {'env': profile}
        env.get_config().set_global_job_parameters(job_conf)
    
        kafka_consumer = FlinkKafkaConsumer(
            topics='change_user_preview',
            deserialization_schema=SimpleStringSchema(),
            properties={'bootstrap.servers': '127.0.0.1:9092',
                        'group.id': 'offline',
                        'auto.offset.reset': 'earliest'})
    
        source_ds = env.add_source(kafka_consumer).name('source_kafka')
    
        map_df = source_ds.map(lambda row: json.loads(row))
    
        process_df = map_df.key_by(lambda row: row['unique_id']).flat_map(CalcLabelFunc(), output_type=Types.STRING()).name('flat_map_calc')
    
        kafka_producer = FlinkKafkaProducer(
            topic='change_label_preview',
            serialization_schema=SimpleStringSchema(),
            producer_config={'bootstrap.servers': '127.0.0.1:9092'})
    
        process_df.add_sink(kafka_producer).name('sink_kafka')
    
        env.execute('label_calc')
    
    
    if __name__ == '__main__':
        run_env = 'preview'
        if len(sys.argv) > 1:
            run_env = sys.argv[1]
        label_calc(run_env)
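
    For a quick local test without run.sh, the connector jar can be registered on the environment instead of being passed via --jarfile, as the commented-out call above hints. A minimal sketch (the jar path is an assumption; add it right after creating env in label_calc):

    # local testing only: ship the Kafka connector jar with the job
    # (on YARN, run.sh passes it via --jarfile instead)
    env.add_jars('file:///home/work/realtime_calc_label/jars/flink-sql-connector-kafka-1.15.2.jar')

    After that, python label_calc_stream.py preview runs the pipeline against a local Kafka.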
    
    • UDF function
    import json
    import sys
    import time
    from pyflink.datastream import RuntimeContext, FlatMapFunction
    from pymongo import MongoClient
    from calc_lib.online_calc import OnlineCalc
    from db_utils.mysql_util import get_sql_conn, get_dict_data_sql
    
    sys.path.append(".")
    
    
    class CalcLabelFunc(FlatMapFunction):
    
        def __init__(self):
            self.env = None
            self.mongo = None
            self.mysql_conf = None
            self.online_calc = None
            self.cur_time_stamp = None
    
        def open(self, runtime_context: RuntimeContext):
            # pick up the 'env' value set via set_global_job_parameters in label_calc_stream.py;
            # without this, self.env stays None when the database names are formatted below
            self.env = runtime_context.get_job_parameter('env', 'preview')
            self.mongo = MongoClient('mongodb://localhost:27017')
            self.mysql_conf = {
                'host': '127.0.0.1',
                'username': 'root',
                'password': '123456',
                'db': 'user_profile'
            }
            self.cur_time_stamp = 0
    
        def flat_map(self, value):
            # update conf
            if time.time() - self.cur_time_stamp > 60 * 3:
                self.cur_time_stamp = time.time()
                self.update_conf()
    
            unique_id = value['unique_id']
            entity_type = value['entity_type']
            version = value['version']
    
            pp_db = 'pp_{}_{}_{}'.format(entity_type, version, self.env)
            pp_doc = self.mongo[pp_db]['pp_table'].find_one({'unique_id': unique_id})
    
            profile_db = 'profile_{}_{}_{}'.format(entity_type, version, self.env)
            profile_doc = self.mongo[profile_db]['profile_table'].find_one({'unique_id': unique_id})
    
            if pp_doc:
                if not profile_doc:
                    profile_doc = {}
                profile_new = self.online_calc.calc(pp_doc, profile_doc)
                self.mongo[profile_db]['profile_table'].replace_one({'unique_id': unique_id}, profile_new, upsert=True)
                profile_new['entity_type'] = entity_type
                return [json.dumps(profile_new)]
            else:
                return []
    
        def close(self):
            self.mongo.close()
    
        def update_conf(self):
            con, cursor = get_sql_conn(self.mysql_conf)
            strategy_data = get_dict_data_sql(cursor, 'SELECT * FROM calc_strategy')
            relation_data = get_dict_data_sql(cursor, 'SELECT * FROM table_relation')
            con.close()
    
            self.online_calc = OnlineCalc(strategy_data, relation_data)
    
            print('update config ...')
    
    • MySQL utility module
    import pymysql
    
    
    def get_sql_conn(conf):
        """
        Get a database connection and a cursor
        """
        conn = pymysql.connect(host=conf['host'], user=conf['username'], password=conf['password'], db=conf['db'])
        cursor = conn.cursor()
        return conn, cursor
    
    
    def get_index_dict(cursor):
        """
        Map each column name of the result set to its positional index
        """
        index_dict = dict()
        index = 0
        for desc in cursor.description:
            index_dict[desc[0]] = index
            index = index + 1
        return index_dict
    
    
    def get_dict_data_sql(cursor, sql):
        """
        Execute a SQL statement, fetch the results, and convert each row from the default tuple into a dict keyed by column name
        """
        cursor.execute(sql)
        data = cursor.fetchall()
        index_dict = get_index_dict(cursor)
        res = []
        for datai in data:
            resi = dict()
            for indexi in index_dict:
                resi[indexi] = datai[index_dict[indexi]]
            res.append(resi)
        return res
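
    A minimal usage sketch of the helpers above (connection values are the same placeholders used in the UDF):

    conf = {'host': '127.0.0.1', 'username': 'root', 'password': '123456', 'db': 'user_profile'}
    conn, cursor = get_sql_conn(conf)
    rows = get_dict_data_sql(cursor, 'SELECT * FROM calc_strategy')
    # each element of rows is a dict keyed by column name instead of a bare tuple
    conn.close()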
    

    The main problem in this deployment was that the job submission command changed in the newer Flink version. We also discovered a Flink 1.15.2 bug; the fix was to upgrade the version and add the configuration python.systemenv.enabled=false:

    https://issues.apache.org/jira/browse/FLINK-29479

    https://github.com/apache/flink/pull/21110
