本文根据的是flink1.12和flink1.13社区文章及分享整理。个人根据社区相关学习理解整理,仅供参考。
流批一体架构
image.pngimage.png image.png image.png
A.flink 1.11 及之前
- 统一了Tabel/SQL API & Planner
- 统一shuffle架构
B.flink1.12
优化总结:
1.DataStreamAPI 批执行模式
2.流批统一Source&Sink API
3.Pipeline Region scheduler
1.DataStream API 批执行模式
背景:flink 中虽然上层Table/Sql已经流批统一,但底层api仍是分开的,DataStream和DataSet。
因为批处理是流处理的特例,所以讲两种合并成统一的API,这样的好处是:
a. 具有好的复用性,作业可以在流和批这两种执行模式之间自由地切换,而无需重写任何代码。因此,用户可以复用同一个作业,来处理实时数据和历史数据。
b.维护简单,统一的 API 意味着流和批可以共用同一组 connector,维护同一套代码,并能够轻松地实现流批混合执行,例如 backfilling 之类的场景。
考虑到这些优点,社区已朝着流批统一的 DataStream API 迈出了第一步:支持高效的批处理(FLIP-134)。
从长远来看,这意味着 DataSet API 将被弃用(FLIP-131),其功能将被包含在 DataStream API 和 Table API / SQL 中。
■ 有限流上的批处理
您已经可以使用 DataStream API 来处理有限流(例如文件)了,但需要注意的是,运行时并不“知道”作业的输入是有限的。为了优化在有限流情况下运行时的执行性能,新的 BATCH 执行模式,对于聚合操作,全部在内存中进行,且使用 sort-based shuffle(FLIP-140)和优化过的调度策略(请参见 Pipelined Region Scheduling 了解更多详细信息)。因此,DataStream API 中的 BATCH 执行模式已经非常接近 Flink 1.12 中 DataSet API 的性能。有关性能的更多详细信息,请查看 FLIP-140。
在 Flink 1.12 中,默认执行模式为 STREAMING,要将作业配置为以 BATCH 模式运行,可以在提交作业的时候,设置参数 execution.runtime-mode:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
或者通过编程的方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
2. 流批统一Source & Sink API
- 1.11版本已经支持了 source connector 工作在流批两种模式下。
TODO - 1.12支持了对Data Sink API的重构。
现有只支持FileSInkConnector。替换现有的StreamingFileSink Connector。
新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 what和 how:
SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);
Committer 和 GlobalCommitter,封装了如何处理 committables。
框架会负责 when 和 where:即在什么时间,以及在哪些机器或进程中 commit。
3. Pipelined Region 调度 (FLIP-119)
1). 1.12之前方案
(1)前提:两种模式:
a. pipelined result: 数据顺序一个一个的消费
b.blocking result: 在上游所有数据生成完成才开始执行。
(2)版本1.12之前是流批分开的
- sreaming: (使用的是a方式)
- batch: (stage内部使用的是pipelined,stage之间使用的是blocking)
这种方式优点:a.只用调度有数据的stage,所以更高效。 b.stage fail可以单独重启,不需要重新计算其他stage。
image.png(3) before 1.19 调度策略
统一的调度器需要对每个阶段,包括流处理和批处理,都要好的资源调度。1.12之前采用的是不同的调度策略,分别解决流批问题。
a. “all at once”
立刻执行,用于流处理。对于批处理,立刻执行可能会影响资源利用率,可能导致资源预先分配,等待上游数据而导致资源浪费。
b."lazy from sources"
对于批处理,使用懒加载方式,即input数据准备好之后再分配后续operator的资源。
这个策略独立运行在每个子任务中,所以不会识别同时在运行的所有subtask.
举例: image.pngA 是批数据,B是流表,C是需要join。
slot=1:B-C chain,那么C因为A未完成而无法执行。flink会尝试部署A,因为没有slot导致job失败。
slot=2:这时可用,flink能部署A,job能成功执行,但是当A在执行的时候,第一个slot会被B和C占用而浪费资源。
失败情况: 如果B→C失败,我们不用再重新执行A,但是1.9之前是不支持的。
社区为支持流批统一,设计了一个统一的调度和失败策略,Pipelined region scheduling.
2). pilelined region scheduling
image.png新调度策略在开始substask之前,通过分析ExecutionGraph,识别出pipelined region。
region内部使用的是pipelined方式,外部使用的是blocking方式。
(1)调度策略:
在region内,消费者需要不断消费生产的数据,以保证生产者不被block,并且避免背压。因此region的所有子任务必须被调度,失败是整体重启,同时运行。
图中r1→( r2,r3)→ r4,如果jobmanager有足够资源,那么在上游数据finished之后,将尽可能的执行更多的下游region。子任务执行是根据region分配的,要么成功,要么失败。
(2)失败策略
当然子任务失败,那么region重启,重新消费输入数据。如果一些输入数据丢失,那么flink会重新执行上游生产region。
好处:
1.可以在有限资源情况下,尽可能的执行批任务。
2.可以提高资源利用率并消除死锁。
参考:https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html
C. flink1.13
优化总结:
1.大规模作业调度优化
2.sort-Merge Shuffle
3.有限作业一致性保证
1.大规模作业调度
背景:
image.png由于在创建图的时候,边会存储对象,那么在大规模作业调度时,会占用大量内存。
引入
A. 在ExecutionGraph中有两种,一种是pointwise模式(一对一或一对多),还有一种是alltoall(多堆多)
B. 什么情况是pointwise模式?
partition 分区方式
image.png
参考:区别https://blog.csdn.net/lvwenyuan_1/article/details/103722226
代码中:
forward Edge
ForwardPartitioner 和RescalePartitioner 属于pointwise模式,其他的均属于多对多。
C. 针对这两种方式,将多消费者合成消费组,减少对象创建,将O(n)变成了O(1)
image.png image.png
2.sort-merge shuffle
中间数据是如何保存和读取的?
在1.10以前实现了统一shuffle机制,参考:https://ververica.cn/developers/shuffle-mechanism/
背景:
针对批作业,在数据shuffle的优化。
上游跑完写中间文件
节省资源,不需要上游和下游同时起来。
失败不需要重新执行。
flink 默认的shuffle,给每个下游输出单独文件。
- 大量小文件
- 内存浪费,每个文件至少用1个buffer
- 下游数据读取产生大量随机I/O
新方案:sort shuffle
image.png-
先写缓冲区,把数据按照不同的下游分组,最后写入文件
image.png
(1)申请固定大小缓冲区,避免缓冲区随着规模增大而增大
(2)数据写入缓冲区,在缓冲区满的时候会对数据进行排序(合并分区),然后写入单独文件。后边数据接着写到文件后边。文件有多个段,每个段内有序。
没有采用外排序,merge不划算。
-
下游上层做I/O调度,下游读取是通过一个调度器。
image.png
image.png
3. 有限作业一致性保证
image.png背景:有限流不能做checkpoint,无法保证一致性。
优化:
image.png
所有subtask结束,只存标记
部分subtask结束,会存储剩下部分的数据。
结束语义整理:
数据有限正常结束
savepoint结束
endofinput 通知,统一做checkpoint,保证最后数据一定会提交到系统中。
stopwithsavepoint,不同统一做checkpoint,
正常结束,认为任务不再重启,调用endofinput,提交最后数据。
stop-with-savepoint,通过savepoint结束,后期会重启,不会提交最后数据
stop-with-savepoint --drain ,通过savepoint结束,后期不会重启,调用endofinput,提交最后数据。
参考:https://developer.aliyun.com/live/246712?spm=a2c6h.12873639.0.0.2f9612a824wQIq
网友评论