我们长久以来一直使用Spark作为离线和近实时计算框架,几乎承担了主要业务中所有的计算任务。最近Flink逐渐兴起,我们通过查阅资料和参加Meetup,了解到Flink在实时计算方面确实比Spark有优势。我们正准备涉足实时业务,实时数仓、实时推荐这些东西总是要有的,快速上手Flink势在必行。
Flink的官方文档比较详尽,并且设计理念与Spark多有相通,理解成本较低。Flink有多种集群部署方式(Local/Standalone/YARN/K8s/Mesos等等),考虑到已经有现成的YARN和ZooKeeper集群,所以直接配置Flink on YARN。
下图示出Flink on YARN的基本原理,可见与Spark on YARN非常相似。
首先设定Hadoop路径的环境变量,Flink需要用它来获取HDFS、YARN的配置信息。
~ vim /etc/profile
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH/lib/hadoop/etc/hadoop
然后编辑flink-conf.yaml,其中包含了Flink的基础配置。
- 资源参数
# JobManager堆内存
jobmanager.heap.mb: 1024
# TaskManager堆内存
taskmanager.heap.mb: 2048
# 每个TaskManager上task slot数目
taskmanager.numberOfTaskSlots: 4
# 默认并行度
parallelism.default: 12
以上四项只是给出默认配置,实际执行作业时都可以用对应的命令行参数(-jm、-tm、-s、-p)修改。
- 高可用配置
# 开启基于ZK的高可用
high-availability: zookeeper
# ZK集群(即所谓Quorum)地址
high-availability.zookeeper.quorum: ha1:2181,ha2:2181,ha3:2181
# Flink在ZK存储中的根节点
high-availability.zookeeper.path.root: /flink
# JobManager元数据的持久化位置,必须是可靠存储
high-availability.storageDir: hdfs://mycluster/flink/ha/
# 程序启动时的最大尝试次数
# 应当与YARN ApplicationMaster的最大尝试次数(yarn.resourcemanager.am.max-attempts)相同
yarn.application-attempts: 4
注意YARN ApplicationMaster的最大尝试次数(yarn.resourcemanager.am.max-attempts)默认值仅为2,使得作业容错率很低,因此预先把它修改为4,或者更大些。
- StateBackend默认配置
# StateBackend类型
# 可选jobmanager(JM本身)/filesystem(外部文件系统)/rocksdb(自带的RocksDB数据库)
state.backend: filesystem
# 检查点目录
state.checkpoints.dir: hdfs://mycluster/flink-checkpoints
# 保存点目录(比检查点更重量级,一般手动操作,用于重启恢复)
state.savepoints.dir: hdfs://mycluster/flink-savepoints
选择filesystem或者rocksdb的话,可靠性比较高。对于轻量级的、逻辑不复杂的任务,可以选择jobmanager。程序中也能通过StreamExecutionEnvironment.setStateBackend()方法来指定。
- 额外的JVM参数
env.java.opts: -server -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError
这个类似于spark-submit中的extraJavaOptions。
Flink on YARN有两种执行模式。
- Session模式:通过yarn-session.sh创建一个持续运行的Flink Session,其中已经分配好了JobManager、TaskManager及所需资源,提交作业时,相当于提交给Session。
- Single job模式:通过flink run脚本每次提交单个作业,设定JobManager为yarn-cluster,由YARN单独分配资源,类似于spark-submit的yarn-cluster部署模式。生产环境一般用这种模式,下面是示例脚本。
/opt/flink-1.5.1/bin/flink run \
# 分离模式运行 (-d)
--detached \
# 指定JobManager (-m)
--jobmanager yarn-cluster \
# YARN Application的名称 (-ynm)
--yarnname "test-calendar-rt" \
# 分配的YARN Container数量 (-yn)
--yarncontainer 3 \
# JobManager内存 (-yjm)
--yarnjobManagerMemory 1024 \
# TaskManager内存 (-ytm)
--yarntaskManagerMemory 2048 \
# 每个TaskManager上task slot数目 (-ys)
--yarnslots 3 \
# 并行度 (-p)
--parallelism 9 \
# 用户程序的入口类 (-c)
--class com.xyz.bigdata.rt.flink.CalendarRecordETLTest \
/var/projects/rt/flink-test-0.1-jar-with-dependencies.jar
本来想在这篇文章里把今天搞的一个Demo(RocketMQ→Flink Streaming→HBase)也贴出来的,但总感觉与集群配置不相关。这套逻辑不久之后就要投入生产环境,到时候再来详细写自定义Source/Sink、检查点配置、事件时间、水印等话题吧。
网友评论