美文网首页FlinkFlink
flink 流批统一优化整理

flink 流批统一优化整理

作者: ZYvette | 来源:发表于2021-06-07 14:56 被阅读0次

    本文根据的是flink1.12和flink1.13社区文章及分享整理。个人根据社区相关学习理解整理,仅供参考。

    流批一体架构

    image.png
    image.png image.png image.png

    A.flink 1.11 及之前

    • 统一了Tabel/SQL API & Planner
    • 统一shuffle架构

    B.flink1.12

    优化总结:
    1.DataStreamAPI 批执行模式
    2.流批统一Source&Sink API
    3.Pipeline Region scheduler

    1.DataStream API 批执行模式

    背景:flink 中虽然上层Table/Sql已经流批统一,但底层api仍是分开的,DataStream和DataSet。

    因为批处理是流处理的特例,所以讲两种合并成统一的API,这样的好处是:

    a. 具有好的复用性,作业可以在流和批这两种执行模式之间自由地切换,而无需重写任何代码。因此,用户可以复用同一个作业,来处理实时数据和历史数据。

    b.维护简单,统一的 API 意味着流和批可以共用同一组 connector,维护同一套代码,并能够轻松地实现流批混合执行,例如 backfilling 之类的场景。

    考虑到这些优点,社区已朝着流批统一的 DataStream API 迈出了第一步:支持高效的批处理(FLIP-134)。

    从长远来看,这意味着 DataSet API 将被弃用(FLIP-131),其功能将被包含在 DataStream API 和 Table API / SQL 中。

    ■ 有限流上的批处理
    您已经可以使用 DataStream API 来处理有限流(例如文件)了,但需要注意的是,运行时并不“知道”作业的输入是有限的。为了优化在有限流情况下运行时的执行性能,新的 BATCH 执行模式,对于聚合操作,全部在内存中进行,且使用 sort-based shuffle(FLIP-140)和优化过的调度策略(请参见 Pipelined Region Scheduling 了解更多详细信息)。因此,DataStream API 中的 BATCH 执行模式已经非常接近 Flink 1.12 中 DataSet API 的性能。有关性能的更多详细信息,请查看 FLIP-140。

    在 Flink 1.12 中,默认执行模式为 STREAMING,要将作业配置为以 BATCH 模式运行,可以在提交作业的时候,设置参数 execution.runtime-mode:

    $ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
    

    或者通过编程的方式:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeMode.BATCH); 
    

    2. 流批统一Source & Sink API

    • 1.11版本已经支持了 source connector 工作在流批两种模式下。
      TODO
    • 1.12支持了对Data Sink API的重构。
    image.png

    现有只支持FileSInkConnector。替换现有的StreamingFileSink Connector。

    新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 whathow

    SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);

    Committer 和 GlobalCommitter,封装了如何处理 committables。

    框架会负责 whenwhere:即在什么时间,以及在哪些机器或进程中 commit。

    3. Pipelined Region 调度 (FLIP-119)

    1). 1.12之前方案

    (1)前提:两种模式:

    a. pipelined result: 数据顺序一个一个的消费

    b.blocking result: 在上游所有数据生成完成才开始执行。

    (2)版本1.12之前是流批分开的

    • sreaming: (使用的是a方式)
    image.png
    • batch: (stage内部使用的是pipelined,stage之间使用的是blocking)

    这种方式优点:a.只用调度有数据的stage,所以更高效。 b.stage fail可以单独重启,不需要重新计算其他stage。

    image.png

    (3) before 1.19 调度策略

    统一的调度器需要对每个阶段,包括流处理和批处理,都要好的资源调度。1.12之前采用的是不同的调度策略,分别解决流批问题。

    a. “all at once”

    立刻执行,用于流处理。对于批处理,立刻执行可能会影响资源利用率,可能导致资源预先分配,等待上游数据而导致资源浪费。

    b."lazy from sources"

    对于批处理,使用懒加载方式,即input数据准备好之后再分配后续operator的资源。

    这个策略独立运行在每个子任务中,所以不会识别同时在运行的所有subtask.

    举例: image.png

    A 是批数据,B是流表,C是需要join。

    slot=1:B-C chain,那么C因为A未完成而无法执行。flink会尝试部署A,因为没有slot导致job失败。

    slot=2:这时可用,flink能部署A,job能成功执行,但是当A在执行的时候,第一个slot会被B和C占用而浪费资源。

    失败情况: 如果B→C失败,我们不用再重新执行A,但是1.9之前是不支持的。

    社区为支持流批统一,设计了一个统一的调度和失败策略,Pipelined region scheduling.

    2). pilelined region scheduling

    image.png

    新调度策略在开始substask之前,通过分析ExecutionGraph,识别出pipelined region。

    region内部使用的是pipelined方式,外部使用的是blocking方式。

    (1)调度策略:

    在region内,消费者需要不断消费生产的数据,以保证生产者不被block,并且避免背压。因此region的所有子任务必须被调度,失败是整体重启,同时运行。

    图中r1→( r2,r3)→ r4,如果jobmanager有足够资源,那么在上游数据finished之后,将尽可能的执行更多的下游region。子任务执行是根据region分配的,要么成功,要么失败。

    (2)失败策略

    当然子任务失败,那么region重启,重新消费输入数据。如果一些输入数据丢失,那么flink会重新执行上游生产region。

    好处:

    1.可以在有限资源情况下,尽可能的执行批任务。

    2.可以提高资源利用率并消除死锁。

    参考:https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html

    C. flink1.13

    优化总结:
    1.大规模作业调度优化
    2.sort-Merge Shuffle
    3.有限作业一致性保证

    1.大规模作业调度

    背景:
    image.png

    由于在创建图的时候,边会存储对象,那么在大规模作业调度时,会占用大量内存。

    引入

    A. 在ExecutionGraph中有两种,一种是pointwise模式(一对一或一对多),还有一种是alltoall(多堆多)

    B. 什么情况是pointwise模式?

    partition 分区方式

    image.png
    参考:区别https://blog.csdn.net/lvwenyuan_1/article/details/103722226

    代码中:


    forward Edge

    ForwardPartitioner 和RescalePartitioner 属于pointwise模式,其他的均属于多对多。

    C. 针对这两种方式,将多消费者合成消费组,减少对象创建,将O(n)变成了O(1)


    image.png image.png

    2.sort-merge shuffle

    中间数据是如何保存和读取的?

    在1.10以前实现了统一shuffle机制,参考:https://ververica.cn/developers/shuffle-mechanism/

    1. flink 网络流控和反压机制:https://ververica.cn/developers/advanced-tutorial-2-analysis-of-network-flow-control-and-back-pressure/

    背景:
    针对批作业,在数据shuffle的优化。
    上游跑完写中间文件
    节省资源,不需要上游和下游同时起来。
    失败不需要重新执行。

    image.png

    flink 默认的shuffle,给每个下游输出单独文件。

    • 大量小文件
    • 内存浪费,每个文件至少用1个buffer
    • 下游数据读取产生大量随机I/O

    新方案:sort shuffle

    image.png
    1. 先写缓冲区,把数据按照不同的下游分组,最后写入文件


      image.png

    (1)申请固定大小缓冲区,避免缓冲区随着规模增大而增大
    (2)数据写入缓冲区,在缓冲区满的时候会对数据进行排序(合并分区),然后写入单独文件。后边数据接着写到文件后边。文件有多个段,每个段内有序。

    没有采用外排序,merge不划算。

    1. 下游上层做I/O调度,下游读取是通过一个调度器。


      image.png
      image.png

    参考:https://wiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink

    3. 有限作业一致性保证

    image.png

    背景:有限流不能做checkpoint,无法保证一致性。

    优化:


    image.png

    所有subtask结束,只存标记
    部分subtask结束,会存储剩下部分的数据。

    结束语义整理:
    数据有限正常结束
    savepoint结束

    image.png

    endofinput 通知,统一做checkpoint,保证最后数据一定会提交到系统中。

    stopwithsavepoint,不同统一做checkpoint,
    正常结束,认为任务不再重启,调用endofinput,提交最后数据。
    stop-with-savepoint,通过savepoint结束,后期会重启,不会提交最后数据
    stop-with-savepoint --drain ,通过savepoint结束,后期不会重启,调用endofinput,提交最后数据。

    image.png

    参考:https://developer.aliyun.com/live/246712?spm=a2c6h.12873639.0.0.2f9612a824wQIq

    相关文章

      网友评论

        本文标题:flink 流批统一优化整理

        本文链接:https://www.haomeiwen.com/subject/ufbcsltx.html