前期介绍到,我们项目初期Flink部署使用的是standalone模式。但是此模式缺点很多:
1、资源利用率低
taskmanager、slot实现都是规划创建好。如果不用资源也一直占用着。
2、无法做到资源隔离
只是简单的对内存资源进行简单划分
3、job调度优先级无法保证
我们本打算采用Flink on k8s的方案,但是考虑到k8s虚拟了一层网络,性能肯定有损耗。而且对运维团队要求非常高。
最终,我们采用社区比较成熟的方案flink on yarn。成熟是别人的成熟,坑还是我们自己的坑。
根据自己的理解,画了个简单的图:
session模式下,需要预先执行创建session的命令,具体如下:
/data/flink-1.12.0/bin/yarn-session.sh -tm 3456 -nm cuiot-1 -d
启动一个名称为cuiot-1的session,其中每个tm的内存为3456M,具体的参数其他参数,会读取/data/flink-1.12.0/conf/flink-conf.yaml
的参数配置。
yarn-session命令行的其它参数,可以通过/data/flink-1.12.0/bin/yarn-session.sh --help命令来查看
flink-config.yaml配置文件中,需要端口配置为:rest.port注释掉。将rest.bind-port配置放开。
flink里的session对应yarn中的application,session启动的时候,会将flink的依赖包上传到hdfs。详情如下:
yarn hdfs装完之后。可以通过如下地址访问管理控制台:
yarn:
http://node1:8099/cluster/apps/RUNNING
hdoop:
http://node1:50070/dfshealth.html#tab-overview
hdfs:
http://node1:50070/explorer.html#/
端口和地址信息是在配置文件中指定的,具体参见:
/data/hadoop-3.1.4/etc/hadoop
core-site.xml、hdfs-site.xml、yarn-site.xml。
重点介绍下Session模式,也是我们项目中的模式。
我们flink的版本为1.12.0,之前使用的是1.11.2。由于数据转发我们使用的table sql,且kafka的source和sink处理部分,无法开起并行度。只能升级到1.12.0版本。
flink yarn session启动好了,相应的yarn application也就启动好了。一个application对应一个jobmanager。可以将yarn里面的application理解为jobmanager的代理。flink yarn session模式下,我们提交任务的时候,都是与yarn去交互。相应的flink已经做了集成。可以通过执行如下命令来启动一个job:
/data/flink-1.11.2/bin/flink run -C "http://node1:50070/webhdfs/v1/MyPattern185.jar?op=OPEN&namenoderpcaddress=ns&offset=0" -yid application_1608555068158_0002 /data/rule/cuiot-rule-cep-0.8.0.jar -prkId 185 -d
通过-yid指定application id,通过-C指定依赖包。需要注意的是-C参数里面的jar依赖,需要保证所有节点都能访问到。我们使用的on yarn模式,自然的将依赖jar上传到hdfs中。只有提交任务的时候,才会动态的创建taskmanager,每个taskmanager的相关配置都是读取的flink配置文件的属性,当然你也可以通过命令行额外指定。job执行完成后,taskmanager会自动释放的。
网友评论