美文网首页
flink spark strom 反压机制

flink spark strom 反压机制

作者: 邵红晓 | 来源:发表于2019-08-02 18:13 被阅读0次

strom 反压

实现原理

Storm 是通过监控 Bolt 中的接收队列负载情况,如果超过高水位值就会将反压信息写到 Zookeeper ,Zookeeper 上的 watch 会通知该拓扑的所有 Worker 都进入反压状态,最后 Spout 停止发送 tuple

  • 问题
    1、停止发送,等待系统回复,再次高速生产,然后再次停止发送,造成往数据流颠簸
    【图示】


    image.png

spark 反压

  • sparkstreaming 是以微批次模拟流失处理,设置batch
    val ssc = new StreamingContext(sparkConf, Seconds(3))
  • 如果processtime大于intervaltime=3s,就会导致内存溢出问题

解决办法

  1. 静态限速 spark 1.5 以前
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition","10")
  2. 开启动态反压
    .set("spark.streaming.backpressure.enabled","true")

实现原理

在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息. Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate通过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator(继承自RateLimiter).


image.png

流量控制点

当Receiver开始接收数据时,会通过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走,包装成block. 在将数据存放入currentBuffer之时,要获取许可(令牌)。如果获取到许可就可以将数据存入buffer, 否则将被阻塞,进而阻塞Receiver从数据源拉取数据。

其令牌投放采用令牌桶机制进行, 原理如下图所示:

image.png
令牌桶机制:

大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。

flink 反压

  • 在flink1.5 以前flink依靠tcp feedback实现网络流控
    存在问题:
    1.流控实现链路太长
    2.由于依赖tcp本身流控,所以task任务触发反压机制,会直接阻塞tcp网络通信,造成无法正常通信,比如
    checkpoint barrier 无法通过tcp发出
  • 在flink1.5 以后flink在inputchannel层实现了tcp的反压机制,避免在tcp层阻塞

链路演示

  • consumer: socket => netty => inputchannel => recoder reader
  • producter: recoder writer => resultSubPartition => netty => socket
  • 内存分配链条 : ic(input channel) full => local buffer pool full => network buffer pool full => disable netty autoread(off -heap - memory)


    image.png

flink 反压原理

  • product [1,2,3] send window size=3
  • consumer [1,2,3,4,5] reveive window size = 5
  1. product 发送3个,consumer 消费1个,reveive window size 剩余3个,发送ack,可接受数量3
  2. product接受到ack,滑动window,再发送3个,comsumer此时已经慢了,可接受数量0个,发送ack和可接受数量0个
  3. product接受到ack,不在发送0个给consumer,并且周期性探测

代码实现

您可以使用以下配置键配置JobManager的样本数:

web.backpressure.refresh-interval:刷新时间间隔(默认值:60000,1分钟)。
web.backpressure.num-samples:用于确定backpressure的堆栈跟踪样本数(默认值:100个)。
web.backpressure.delay-between-samples:跟踪样本延迟时间(默认值:50  ms)。

flink 反压机制不一定触发

image.png
  • 如果sink到kafka,kafka实现了网络流控,所以会在从storage反压传递到上游
  • 如果sink到es,es只会报错,所以反压机制不一定触发

相关文章

网友评论

      本文标题:flink spark strom 反压机制

      本文链接:https://www.haomeiwen.com/subject/rhcrdctx.html