美文网首页
Flink 优化及问题

Flink 优化及问题

作者: 专职掏大粪 | 来源:发表于2020-05-07 14:08 被阅读0次

问题排查

  • 看反压:背压高的下游oprator就是瓶颈
  • 关注checkpoint时长
    checkpoint时长一定程度影响系统吞吐
  • 看核心指标
    延迟指标、吞吐等
  • 资源使用率

常见问题

  1. json序列化反序列化
    通常在source和sink的task上,在指标上没有体现,容易被忽略(取消operator chain,查看反压)
  2. 数据倾斜
    数据倾斜影响系统吞吐
  3. 频繁GC
    内存比例分配不合理导致频繁gc,影响吞吐甚至tm失联
  4. MAP和SET的hash冲突
    有hashmap和hashset 随着负载因子的增高,引起插入和查询性能下降
  5. 和低速的系统交互
    和低速的外部系统交互,mysql、hbase (增加应用缓存、积攒批次处理避免单条查询)
  6. 大窗口
    1)窗口size大、数据量大
    2)滑动窗口size和step步长比例较大 eg size=5min,step=1s 同一条数据查法很多窗口的计算

数据倾斜影响

  • 数据热点:数据集中在某些task中,数据不平衡
  • GC频繁:过多的数据在jvm,导致内存资源短缺,触发频繁gc
  • 吞吐下降,数据延迟增大
  • 系统崩溃 过长的gc会导致tm和jm失联,系统崩溃

数据倾斜

1.源头
数据源消费不均匀,调整并行度(eg kakfa 分区 3个kafka分区 两个并行度 导致其中一个task处理2个kafka分区 另一个处理一个分区)
解决办法:通常source的并法度是kafka分区的整数倍

2.聚合场景


image.png

解决办法:两段聚合的方式(局部聚合+全局聚合)
方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数(注意:随机数范为选择为下游并发度),比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

内存调优

image.png

Flink中的总内存由JVM堆、manager memory和network buffers构成
对于容器化部署,总内存还可以包括一个容器预留内存

所有其他内存组件都是在启动Flink进程之前从total memory中计算出来的。启动后,manager memory和network buffers在某些情况下根据进程内可用的JVM内存进行调整
total memory 由taskmanager.heap.size指定
在Yarn or Mesos部署时,是请求容器大小

Container cut-off

引入截断是为了适应其他类型的内存消耗,而这些消耗在这个内存模型中没有考虑,例如RocksDB本机内存、JVM开销等。它也是一个安全余量,以防止容器超出其内存限制并被容器管理器杀死。
containerized.heap-cutoff-ratio :默认为0.25(占总内存的)

堆外内存调优

  • NetworkBuffer
    taskmanager.network.memory.fraction:默认是0.1
    taskmanager.network.memory.min:默认是64M
    taskmanager.network.memory.max: 默认是1G
    一般taskmanager.network.memory.fraction是0.1或小于0.1 根据使用情况调整

  • ManagerBuffer :
    taskmanager.memory.off-heap:true(默认是false不开启)
    taskmanager.memory.fraction(默认是0.7)
    考虑到流计算过程中ManagerBuffer没有使用,可以taskmanager.memory.fraction调整小于0.3

堆内内存调优
flink运行在jvm上,Flink使用的 Parallel Scavenge垃圾回收器可以改为G1

env.java.opts= -server -XX:+UseG1GC - XX:MaxGCPauseMillis=300 -XX:+PrintGCDetails

内存模型的例子

TM总内存8G, cutoff:容器的预留内存(k8s、yarn)为8G * 0.25

taskmanager.memory.fraction设置,例如
0.8的值意味着TM为内部数据缓冲区保留了80%的内存(堆内或堆外,取决于taskmanager.memory.off . heap),将20%的空闲内存留给TM的堆,供用户定义的函数创建的对象使用。此参数仅在taskmanager.memory未设置是生效。
managed = (total - network) x fraction
heap = total - managed (if off-heap) - network
network = Min(max, Max(min, fraction x total)

  • 作业failover的常见原因
    jobmanager
    zk访问超时
    长时间GC
    资源问题
    主机故障(磁盘等)
  • taskmanager
    上下游异常
    数据问题
    runtime异常
    主机异常

延迟问题处理

  • 延迟与吞吐
    确定延时节点及时间
  • 反压分析
    找到反压节点
    指标分析
    查看一段时间的指标
    堆栈
    找到指定节点jvm进程、分析jstack等信息
    相关日志
    查看taskmanager日志是否有异常

作业性能问题

  • 延时与吞吐
    延时指标
    tps吞吐
    节点输入输出
  • 反压
    找出反压源节点
    节点连接方式 shuffle、rebanlance、hash
    节点个并发情况
    业务逻辑、是否有正则、外部系统访问等

作业性能问题

  • 指标
    gc时间
    gc次数
    state checkpoint性能
    外部系统访问延时
  • 堆栈
    节点所在taskmanager进程
    查看线程PID


    image.png
  • 常见处理方式

  • 调整节点并发
    性能瓶颈问题增加并发

  • 调整节点资源
    增加节点内存 cpu资源

  • 拆分节点
    将chain在一起消耗资源较多的operator分开,增加并发

  • 作业集群优化
    主键设置 数据去重 数据倾斜
    GC参数
    jobmanager参数

image.png

堆栈
failover信息补全,需要到job mamaner 中看更详细的日志

  • 建立指标系统
    延迟和吞吐是flink最重要的指标
    tps 每秒有多少数据进入系统 消费是否能跟上生产
  • 如何查看日志
    yarn的container 日志和查看jobmanamger taskmanager 的日志
1588821660971.png image.png image.png image.png

参考
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
https://blog.csdn.net/nazeniwaresakini/article/details/104220120?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-8

相关文章

网友评论

      本文标题:Flink 优化及问题

      本文链接:https://www.haomeiwen.com/subject/khyoghtx.html