Flink 使用之配置与调优

作者: AlienPaul | 来源:发表于2019-11-26 14:55 被阅读0次

    Flink 使用介绍相关文档目录

    Flink 使用介绍相关文档目录

    Flink常用参数配置

    端口地址

    jobmanger.rpc.address jm的地址。

    jobmanager.rpc.port jm的端口号。

    web.port Flink web ui的端口号。

    jobmanager.archive.fs.dir 将已完成的任务归档存储的目录。

    history.web.port 基于web的history server的端口号。

    historyserver.archive.fs.dir history server的归档目录。该配置必须包含jobmanager.archive.fs.dir配置的目录,以便history server能够读取到已完成的任务信息。

    historyserver.archive.fs.refresh-interval 刷新存档作业目录时间间隔。

    fs.hdfs.hadoopconf hadoop配置文件路径(已被废弃,建议使用HADOOP_CONF_DIR环境变量

    yarn.application-attempts job失败尝试次数,主要是指job manager的重启尝试次数。该值不应该超过yarn-site.xml中的yarn.resourcemanager.am.max-attemps的值。

    资源配置

    jobmanager.heap.mb jm的堆内存大小。不建议配的太大,1-2G足够。

    taskmanager.heap.mb tm的堆内存大小。大小视任务量而定。需要存储任务的中间值,网络缓存,用户数据等。

    taskmanager.network.memory.max 网络缓冲区最大内存大小。

    taskmanager.network.memory.min 网络缓冲区最小内存大小。

    taskmanager.network.memory.fraction 网络缓冲区使用的内存占据总JVM内存的比例。如果配置了taskmanager.network.memory.maxtaskmanager.network.memory.min,本配置项会被覆盖。

    taskmanager.memory.task.off-heap.size:任务使用的堆外内存。

    taskmanager.memory.flink.size:总flink内存。适用于standalone模式。

    taskmanager.memory.process.size:总进程内存。适用于container模式(yarn,k8s等)。

    taskmanager.memory.managed.size:managed内存。流任务可用于rockdb转台后端。批任务可以用于排序,hash表和中间结果缓存。

    taskmanager.numberOfTaskSlots slot数量。在yarn模式使用的时候会受到yarn.scheduler.maximum-allocation-vcores值的影响。此处指定的slot数量如果超过yarn的maximum-allocation-vcores,flink启动会报错。在yarn模式,flink启动的task manager个数可以参照如下计算公式:

    num_of_tm = ceil(parallelism / slot)
    

    即并行度除以slot个数,结果向上取整。

    parallelsm.default 任务默认并行度,如果任务未指定并行度,将采用此设置。

    Flink内存关系图

    参见:
    https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#framework-memory

    Flink 运行batch job尽量配置更大的managed memory

    状态后端

    state.backend 存储和检查点的后台存储。可选值为rocksdb filesystem hdfs。

    state.backend.fs.checkpointdir 检查点数据文件和元数据的默认目录。

    state.checkpoints.dir 保存检查点目录。

    state.savepoints.dir save point的目录。

    state.checkpoints.num-retained 保留最近检查点的数量。

    state.backend.incremental 增量存储。

    Akka

    akka.ask.timeout Job Manager和Task Manager通信连接的超时时间。如果网络拥挤经常出现超时错误,可以增大该配置值。

    akka.watch.heartbeat.interval 心跳发送间隔,用来检测task manager的状态。

    akka.watch.heartbeat.pause 如果超过该时间仍未收到task manager的心跳,该task manager 会被认为已挂掉。

    Netty

    taskmanager.network.netty.server.numThreads和taskmanager.network.netty.client.numThreads:netty的客户端和服务端的线程数目,默认为taskmanager.numberOfTaskSlots

    taskmanager.network.netty.client.connectTimeoutSec:taskmanager的客户端连接超时的时间,默认为120s

    taskmanager.network.netty.sendReceiveBufferSize:netty的发送和接收的缓冲区大小

    taskmanager.network.netty.transport:netty的传输方式,默认为nio。可选nio和epoll

    Flink HA(Job Manager)的配置

    high-availability: zookeeper 使用zookeeper负责HA实现
    high-availability.zookeeper.path.root: /flink flink信息在zookeeper存储节点的名称
    high-availability.zookeeper.quorum: zk1,zk2,zk3 zookeeper集群节点的地址和端口
    high-availability.storageDir: hdfs://nameservice/flink/ha/ job manager元数据在文件系统储存的位置,zookeeper仅保存了指向该目录的指针。

    Flink metrics 监控相关配置

    下面例子中为Prometheus监控的相关配置

    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9250-9260
    

    缓冲区超时配置

    该配置项可以平衡吞吐量和延迟。设置值的解释如下:

    • -1:直到缓冲区填满才发送,最大化吞吐量
    • 0:一有数据就立刻发送,最小化延迟
    • 其他值:在配置时间后发送,如果在配置时间内缓冲区填满,则立刻发送

    设置方法:

    env.setBufferTimeout(xxx)
    

    提交应用时参数配置

    注意:per-job模式或者application模式可以为某个作业单独指定JM和TM的资源消耗。资源的消耗情况应该以能扛住高峰时段的数据处理压力为准。可提前对集群进行压测,记录极限情况的资源使用量。

    JobManager内存

    yarn-session:-jm 2048
    yarn-cluster:-yjm 2048

    TaskManager内存

    yarn-session:-tm 2048
    yarn-cluster:-ytm 2048

    每个TaskManager 的slot个数

    yarn-session:-s 8
    yarn-cluster:-ys 8

    通过系统变量方式配置

    还可以在提交作业的时候使用-D参数配置。支持的参数如下:

    -Dyarn.application.queue=test \ 指定yarn队列
    -Djobmanager.memory.process.size=2048mb \ 指定JM的总进程大小
    -Dtaskmanager.memory.process.size=2048mb \ 指定每个TM的总进程大小
    -Dtaskmanager.numberOfTaskSlots=2 \ 指定每个TM的slot数
    

    Kafka相关调优配置

    linger.ms/batch.size 这两个配置项配合使用,可以在吞吐量和延迟中得到最佳的平衡点。batch.size是kafka producer发送数据的批量大小,当数据量达到batch size的时候,会将这批数据发送出去,避免了数据一条一条的发送,频繁建立和断开网络连接。但是如果数据量比较小,导致迟迟不能达到batch.size,为了保证延迟不会过大,kafka不能无限等待数据量达到batch.size的时候才发送。为了解决这个问题,引入了linger.ms配置项。当数据在缓存中的时间超过linger.ms时,无论缓存中数据是否达到批量大小,都会被强制发送出去。
    ack 数据源是否需要kafka得到确认。all表示需要收到所有ISR节点的确认信息,1表示只需要收到kafka leader的确认信息,0表示不需要任何确认信息。该配置项需要对数据精准性和延迟吞吐量做出权衡。

    Kafka topic分区数和Flink并行度的关系

    Flink Kafka source的并行度需要和kafka topic的分区数一致。最大化利用kafka多分区topic的并行读取能力。由于一个Kafka分区只能被一个消费者消费,因此一定要确保Flink Kafka source的并行度不要大于Kafka分区数,否则有些计算资源会空闲。如果并行度和分区数相同配置后,消费数据的速度仍然跟不上生产数据的速度,需要加大Kafka的分区数。

    同理,如果Sink端也是Kafka,sink的平行度尽量和Kafka分区数一致。

    Yarn相关调优配置

    yarn.scheduler.maximum-allocation-vcores
    yarn.scheduler.minimum-allocation-vcores

    Flink单个task manager的slot数量必须介于这两个值之间

    yarn.scheduler.maximum-allocation-mb
    yarn.scheduler.minimum-allocation-mb

    Flink的job manager 和task manager内存不得超过container最大分配内存大小。

    yarn.nodemanager.resource.cpu-vcores yarn的虚拟CPU内核数,建议设置为物理CPU核心数的2-3倍,如果设置过少,会导致CPU资源无法被充分利用,跑任务的时候CPU占用率不高。

    数据倾斜问题

    几个解决套路:

    • 尽量使用大的并行度
    • 避免使用Map作为分区字段数据类型,同样避免使用String
    • 避免使用windowAll等无法并发操作的算子
    • 使用合适的分区方法

    分区方法有如下几种:

    • shuffle:随机分区
    • rebalance:轮询
    • rescale:每个源的数据分散给下游,类似于一对多,这点和rebalance不同
    • broadcast:广播
    • 自定义分区,一个例子如下所示:
    stream.partitionCustom(new Partitioner[String] {
      override def partition(key: String, numPartitions: Int): Int = {
        key.hashCode % numPartitions
      }
    }, s => s.charAt(0).toString)
    

    这个例子取首字母作为key,使用key的hashCode和分区数取余后的值作为分区编号。

    并行度配置

    并行度一般配置为CPU核心数的2-3倍。

    并行度配置的几个层次如下所示。从上到下作用范围依次增大,但是上面的配置可以覆盖下面的配置。

    • 算子层次。算子的setParallelism方法
    • 执行环境层次。env的setParallelism方法
    • 客户端层次。提交任务时候的-p参数
    • 系统层次。flink-conf.yaml配置文件,parallelism.default配置项。

    JVM参数配置

    以增加GC日志为例,修改"conf/flink-conf.yaml配置文件的env.java.opts参数,增加:

    -Xloggc:/path/to/gc_log.log
    -XX:+PrintGCDetails 
    -XX:-OmitStackTraceInFastThrow 
    -XX:+PrintGCTimeStamps 
    -XX:+PrintGCDateStamps 
    -XX:+UseGCLogFileRotation 
    -XX:NumberOfGCLogFiles=10 
    -XX:GCLogFileSize=50M
    

    Checkpoint

    Checkpoint周期性进行。如果checkpoint操作耗时比checkpoint间隔时间还长,在上一个checkpoint未完成的时候,即便到了下一个checkpoint触发时间,新的checkpoint操作不会立即开始。只有在前一个checkpoint完成之后下一个checkpoint才能开始。这种情况下checkpoint会一直进行下去,严重影响系统性能。

    为了避免这种情况,可以指定checkpoint之间的最小时间间隔。方法如下:

    StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
    

    注意:可以在应用中通过配置CheckpointConfig,可以允许多个checkpoint过程同步执行。

    除此之外,checkpoint还支持更多的配置:

    // 开启Checkpoint,设置间隔时间
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10));
    // 配置 Checkpoint
    CheckpointConfig checkpointConf = env.getCheckpointConfig();
    // 启用EXACTLY_ONCE模式,使用Unaligned Checkpoint,保证数据精准一次投送,但会略微增大延迟
    // 启用AT_LEAST_ONCE模式,barrier不会对齐,投送数据可能会重复,但是延迟很低
    checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 最小间隔时间,上面已介绍过
    checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(10))
    // 超时时间。如果超过这个时间checkpoint操作仍未完成,checkpoint会被废弃
    checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
    // 作业取消后checkpoint仍然保留(需要人工清理)
    checkpointConf.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

    使用RocksDB backend

    RocksDB支持增量checkpoint,相比全量checkpoint而言耗时更短。

    启用增量checkpoint方法,在flink-conf.yaml中增加:

    state.backend.incremental: true
    

    也可以在代码中设置:

    RocksDBStateBackend backend = new RocksDBStateBackend(checkpointDirURI, true);
    env.setStateBackend(backend);
    

    RocksDB调优参见: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA

    调整SSTable的block和block cache

    state.backend.rocksdb.block.blocksize
    state.backend.rocksdb.block.cache-size

    经过实践这两个参数值对checkpoint性能影响较大。

    使用全局参数

    可以通过全局参数的方式,将参数从JobManager传递给各个TaskManager。

    在JobManager中注册全局参数(ParameterTool是可序列化的):

    env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
    

    TaskManager中,通过Rich函数中使用如下方式获取全局参数:

    ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    

    使用分布式缓存

    上一节介绍了如何为各个TaskManager传递参数。这里的分布式缓存则用于向各个TaskManager分发文件。

    注意:AsyncFunction不支持分布式缓存,直接使用会抛出异常。

    首先,需要在JobManager注册需要分发的文件。注册的文件由JobManager发送给各个TaskManager,保存在TM运行环境的临时目录中。

    val env = ExecutionEnvironment.getExecutionEnvironment
    
    env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")
    

    然后TaskManager使用的时候,通过Rich函数拉取文件:

    getRuntimeContext.getDistributedCache.getFile("b.txt")
    

    反压分析

    性能瓶颈测试方法

    测试反压可以快速的定位流处理系统的性能瓶颈所在。先在Kafka中积攒一批数据,然后在使用Flink消费,就好比水库泄洪,很容易找到下游性能薄弱的环节。

    反压的可能原因

    反压的原因可能会有:

    • 短时间的负载高峰,超过流处理系统极限
    • 下游sink负载变大,数据无法及时输出
    • GC压力过大,停顿时间太长
    • 某个算子作业过于复杂,执行耗时较长
    • 集群网络波动,上游传递给下游的网络通道受阻

    定位方式

    定位之前禁用掉OperatorChain,这样原本chain到一起的多个算子会分开,方便我们更精细的定位性能瓶颈。

    • 看页面:查看Flink Web UI中对应算子的Back Pressure页面,如果各个SubTask显示的结果为High,说明该算子存在反压情况。
    • 看监控:查看算子的inPoolUsage监控项。如果数值过高,说明存在反压。

    找到反压算子之后,我们可以使用Flame Graph火焰图,来分析每个方法调用的耗时,从而找到耗时较长的方法。

    开启火焰图的方法:

    flink-conf.yaml中配置。

    参数 默认值 含义
    rest.flamegraph.enabled false 是否开启火焰图
    rest.flamegraph.cleanup-interval 10min 统计信息的缓存清除时间
    rest.flamegraph.delay-between-samples 50 ms 构建 FlameGraph 的单个堆栈跟踪样本之间的延迟
    rest.flamegraph.num-samples 100 构建flamegraph的采样数
    rest.flamegraph.refresh-interval 1 min 火焰图刷新的时间间隔
    rest.flamegraph.stack-depth 100 创建FlameGraphs 的堆栈跟踪的最大深度

    一些通用的方法:

    • 优化反压算子的业务逻辑代码
    • 调用外部系统使用AsyncFunction
    • 增大TM的内存资源
    • 增大反压算子的并行度
    • 减少反压上游算子的并行度

    分区空闲检测

    Barrier对齐环节(一个算子有多个input)需要收集齐各个input的watermark才放行数据,如果某一个input的数据量很少,导致该input迟迟收不到watermark,则整个数据链路会被阻塞。

    为了解决这个问题,Flink提供了分区空闲检测功能,如果某个input在一段时间内没有收到数据,会被标记为空闲。在barrier对齐环节,这个input上面的watermark会被忽略。

    配置分区空闲判定时间的方法如下:

    SourceFunction.assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .forBoundedOutOfOrderness(Duration.ofMinutes(2))
                            .withIdleness(Duration.ofMinutes(5))
    );
    

    作业提交方式

    参考链接:YARN | Apache Flink

    Flink作业提交的方式分为application模式,per-job模式和session模式。

    per-job模式

    在Yarn创建一个Flink集群,然后在提交任务客户端所在机器本地运行作业jar的main方法,提交生成的JobGraph到Flink集群的JobManager。如果附带--detached参数,提交的作业被accept的时候,客户端会停止运行(命令行不用一直开着,生产环境必须。开发测试时可不带--detached参数,通过命令行查看运行日志)。

    实际生产中推荐使用此模式。每个作业都使用单独的Flink集群,它们消耗的资源是互相隔离的,一个作业的崩溃不会影响到其他作业。

    flink run -t yarn-per-job --detached /path/to/job.jar
    

    session模式

    首先启动Flink Yarn Session,它是一个常驻与Yarn的Flink集群。启动成功后,无论是否有作业执行,或者作业是否执行完毕,该session始终保持运行。启动yarn session的方法如下:

    export HADOOP_CLASSPATH=`hadoop classpath`
    ./bin/yarn-session.sh --detached
    

    yarn-session支持的相关参数解释:

    • -d/--detached: Detach模式,session启动成功后client停止运行。不用保持控制台一直开启。
    • -nm: Application名称
    • -jm: Job Manager 容器的内存
    • -tm: Task Manager 容器的内存
    • -t: 传送文件至集群,使用相对路径。程序中读取文件仍使用相对路径
    • -qu: 指定使用的Yarn队列

    提交作业到Yarn session:

    flink run -t yarn-session \
      -Dyarn.application.id=application_XXXX_YY \
      /path/to/job.jar
    

    停止Flink Yarn session可以通过Yarn UI的kill按钮。当然也可以通过如下方式:

    echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
    

    注意:一个Flink Yarn Session可以同时跑多个Flink作业。

    application模式

    和per-job模式类似,提交一次任务会在Yarn运行一个Flink集群。不同之处为作业jar包的main方法在Yarn集群的JobManager上运行,而不是提交作业的client端运行。作业执行完毕后,Flink on yarn集群会被关闭。

    flink run-application -t yarn-application /path/to/job.jar
    

    application模式的好处是Flink yarn集群可以直接从HDFS上查找并下载作业jar以及所需依赖,避免了从client机器上传。

    flink run-application -t yarn-application \
        -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
        hdfs://myhdfs/jars/my-application.jar
    

    其中yarn.provided.lib.dirs为Flink作业所需依赖包的地址。

    相关文章

      网友评论

        本文标题:Flink 使用之配置与调优

        本文链接:https://www.haomeiwen.com/subject/mweswctx.html