2020-01-03
时间属性
image.pngFlink支持不同的时间语义,核心是 Processing Time 和 Event Time(Row Time)
image.png
一般情况下,当你的应用遇到某些问题要从上一个 checkpoint 或者 savepoint 进行重放,如果希望结果完全相同,就只能用 Event Time。如果接受结果不同,则可以用 Processing Time。Processing Time 的一个常见的用途是,根据现实时间来统计整个系统的吞吐量。
如果单条数据之间是乱序,我们就考虑对于整个序列进行更大程度的离散化
一个 watermark 本质上就代表了这个 watermark 所包含的 timestamp 数值,表示以后到来的数据已经再也没有小于或等于这个时间的了
Flink 支持两种 watermark 生成方式。第一种是在 SourceFunction 中产生,相当于把整个的 timestamp 分配和 watermark 生成的逻辑放在流处理应用的源头。我们可以在 SourceFunction 里面通过这两个方法产生 watermark:
- 通过 collectWithTimestamp 方法发送一条数据,其中第一个参数就是我们要发送的数据,第二个参数就是这个数据所对应的时间戳;也可以调用 emitWatermark 去产生一条 watermark,表示接下来不会再有时间戳小于等于这个数值记录。
- 另外,有时候我们不想在 SourceFunction 里生成 timestamp 或者 watermark,或者说使用的 SourceFunction 本身不支持,我们还可以在使用 DataStream API 的时候指定,调用的 DataStream.assignTimestampsAndWatermarks 这个方法,能够接收不同的 timestamp 和 watermark 的生成器。
在分配 timestamp 和生成 watermark 的过程,虽然在 SourceFunction 和 DataStream 中都可以指定,但是还是建议生成的工作越靠近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据是否乱序。Flink 内部提供了很好的机制去保证这些 timestamp 和 watermark 被正确地传递到下游的节点。
watermark传播策略基本上遵循这三点。
- watermark 会以广播的形式在算子之间进行传播。比如说上游的算子连接了三个下游的任务,它会把自己当前的收到的 watermark 以广播的形式传到下游。
- 如果在程序里面收到了一个 Long.MAX_VALUE 这个数值的 watermark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就是一个终止的标志。
- 对于单流而言,这个策略比较好理解,而对于有多个输入的算子,watermark 的计算就有讲究了,一个原则是:单输入取其大,多输入取小。
传播是幂等的
watermark处理过程:
- 一个算子的实例在收到 watermark 的时候,首先要更新当前的算子时间,这样的话在 ProcessFunction 里方法查询这个算子时间的时候,就能获取到最新的时间。
- 第二步它会遍历计时器队列,这个计时器队列就是我们刚刚说到的 timer,你可以同时注册很多 timer,Flink 会把这些 Timer 按照触发时间放到一个优先队列中。
- 第三步 Flink 得到一个时间之后就会遍历计时器的队列,然后逐一触发用户的回调逻辑。 通过这种方式,Flink 的某一个任务就会将当前的 watermark 发送到下游的其他任务实例上,从而完成整个 watermark 的传播,从而形成一个闭环。
state
keyed state
- 只能应用于 KeyedStream 的函数与操作中,例如 Keyed UDF, window state
- keyed state 是已经分区 / 划分好的,每一个 key 只能属于某一个 keyed state
operator state
- 称为 non-keyed state,每一个 operator state 都仅与一个 operator 的实例绑定。
- 常见的 operator state 是 source state,例如记录当前 source 的 offset
Managed State:由 Flink 管理的 state,刚才举例的所有 state 均是 managed state
Raw State:Flink 仅提供 stream 可以进行存储数据,对 Flink 而言 raw state 只是一些 bytes
state 是 Checkpoint 进行持久化备份的主要角色
exactly once 需要一个input buffer将数据缓存
image.pngFlink 的 Checkpoint 机制只能保证 Flink 的计算过程可以做到 EXACTLY ONCE,端到端的 EXACTLY ONCE 需要 source 和 sink 支持
Savepoint
用户通过命令触发,由用户管理其创建与删除
标准化格式存储,允许作业升级或者配置变更
用户在恢复时需要提供用于恢复作业状态的 savepoint 路径
Externalized Checkpoint
Checkpoint 完成时,在用户给定的外部持久化存储保存
当作业 FAILED(或者 CANCELED)时,外部存储的 Checkpoint 会保留下来
用户在恢复时需要提供用于恢复的作业状态的 Checkpoint 路径
架构图
image.png
用户通过 DataStream API、DataSet API、SQL 和 Table API 编写 Flink 任务,它会生成一个 JobGraph。JobGraph 是由 source、map()、keyBy()/window()/apply() 和 Sink 等算子组成的。当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。
image.pngjob manager
- 将 JobGraph 转换成 Execution Graph,最终将 Execution Graph 拿来运行
- Scheduler 组件负责 Task 的调度
- Checkpoint Coordinator 组件负责协调整个任务的 Checkpoint,包括 Checkpoint 的开始和完成
- 通过 Actor System 与 TaskManager 进行通信
- 其它的一些功能,例如 Recovery Metadata,用于进行故障恢复时,可以从 Metadata 里面读取数据。
image.pngtask manager
主要组件
- Memory & I/O Manager,即内存 I/O 的管理
- Network Manager,用来对网络方面进行管理
- Actor system,用来负责网络的通信
TaskManager 被分成很多个 TaskSlot,每个任务都要运行在一个 TaskSlot 里面,TaskSlot 是调度资源里的最小单位。
image.pngstandalone
- 在 Master 进程中,Standalone ResourceManager 的作用是对资源进行管理。当用户通过 Flink Cluster Client 将 JobGraph 提交给 Master 时,JobGraph 先经过 Dispatcher。
- 当 Dispatcher 收到客户端的请求之后,生成一个 JobManager。接着 JobManager 进程向 Standalone ResourceManager 申请资源,最终再启动 TaskManager。
- TaskManager 启动之后,会有一个注册的过程,注册之后 JobManager 再将具体的 Task 任务分发给这个 TaskManager 去执行。
作业流程解析
image.png image.png
DataStream 中物理分组方式包括:
- Global: 上游算子将所有记录发送给下游算子的第一个实例。
- Broadcast: 上游算子将每一条记录发送给下游算子的所有实例。
- Forward:只适用于上游算子实例数与下游算子相同时,每个上游算子实例将记录发送给下游算子对应的实例。
- Shuffle:上游算子对每条记录随机选择一个下游算子进行发送。
- Rebalance:上游算子通过轮询的方式发送数据。
- Rescale:当上游和下游算子的实例数为 n 或 m 时,如果 n < m,则每个上游实例向 ceil(m/n) 或 floor(m/n) 个下游实例轮询发送数据;如果 n > m,则 floor(n/m) 或 ceil(n/m) 个上游实例向下游实例轮询发送数据。
- PartitionCustomer:当上述内置分配方式不满足需求时,用户还可以选择自定义分组方式
当我们调用 DataStream#map 算法时,Flink 在底层会创建一个 Transformation 对象,这一对象就代表我们计算逻辑图中的节点。它其中就记录了我们传入的 MapFunction,也就是 UDF(User Define Function)。随着我们调用更多的方法,我们创建了更多的 DataStream 对象,每个对象在内部都有一个 Transformation 对象,这些对象根据计算依赖关系组成一个图结构,就是我们的计算图。后续 Flink 将对这个图结构进行进一步的转换,从而最终生成提交作业所需要的 JobGraph
客户端方式
image.png
取消任务
- cancel() 调用,立即调用作业算子的 cancel() 方法,以尽快取消它们。如果算子在接到 cancel() 调用后没有停止,Flink 将开始定期中断算子线程的执行,直到所有算子停止为止。
- stop() 调用,是更优雅的停止正在运行流作业的方式。stop() 仅适用于 Source 实现了 StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 Source 都将接收 stop() 方法调用。直到所有 Source 正常关闭时,作业才会正常结束。这种方式,使作业正常处理完所有作业。
window
Window 方法接收的输入是一个WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 Window 中(一条数据可能同时分发到多个 Window 中),Flink 提供了几种通用的 WindowAssigner:tumbling window(窗口间的元素无重复),sliding window(窗口间的元素可能重复),session window 以及 global window。如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。
Evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,更详细的描述可以参考 org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter 两个方法。Flink 提供了如下三种通用的 evictor:
- CountEvictor 保留指定数量的元素
- DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 threshold,判断是否删除一个元素。
- TimeEvictor设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值。
Evictor 是可选的方法,如果用户不选择,则默认没有。
Trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的 Trigger,如果默认的 Trigger 不能满足你的需求,则可以自定义一个类,继承自 Trigger 即可,我们详细描述下 Trigger 的接口以及含义:
- onElement() 每次往 window 增加一个元素的时候都会触发
- onEventTime() 当 event-time timer 被触发的时候会调用
- onProcessingTime() 当 processing-time timer 被触发的时候会调用
- onMerge() 对两个 trigger 的 state 进行 merge 操作
- clear() window 销毁的时候被调用
上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:
- CONTINUE 不做任何事情
- FIRE 触发 window
- PURGE 清空整个 window 的元素并销毁窗口
- FIRE_AND_PURGE 触发窗口,然后销毁窗口
time
指定watermark后允许的最大延迟,使用side output可以获取到这些数据
image.png
需要注意的是,设置了 allowedLateness 之后,迟到的数据也可能触发窗口,对于 Session window 来说,可能会对窗口进行合并,产生预期外的行为
Window 中的的元素同样是通过 State 进行维护,然后由 Checkpoint 机制保证 Exactly Once 语义
image.png image.png image.png- ValueState 存储单个值,比如 Wordcount,用 Word 当 Key,State 就是它的 Count。这里面的单个值可能是数值或者字符串,作为单个值,访问接口可能有两种,get 和 set。在 State 上体现的是 update(T) / T value()。
- MapState 的状态数据类型是 Map,在 State 上有 put、remove 等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。
- ListState 状态数据类型是 List,访问接口如 add、update 等。
- ReducingState 和 AggregatingState 与 ListState 都是同一个父类,但状态数据类型上是单个值,原因在于其中的 add 方法不是把当前的元素追加到列表中,而是把当前元素直接更新进了 Reducing 的结果中。
- AggregatingState 的区别是在访问接口,ReducingState 中 add(T)和 T get() 进去和出来的元素都是同一个类型,但在 AggregatingState 输入的 IN,输出的是 OUT。
image.png image.png
image.png
image.png
image.png
SQL示例
https://ververica.cn/developers/flink-sql-programming-practice/
group aggregate
SELECT psgCnt, COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;
window aggregate
SELECT
toAreaId(lon, lat) AS area,
TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) and isStart
GROUP BY
toAreaId(lon, lat),
TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
image.png
建议对 Group Aggregate 的作业配上 State TTL 的配置
chain operator
通过OperatorChain这个类来将多个operator链在一起形成一个新的operator
可以形成chain的条件:
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
- 上下游节点都在同一个 slot group 中(下面会解释 slot group)
- 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
- 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
- 两个节点间数据分区方式是 forward(参考理解数据流的分区)
- 用户没有禁用 chain
主要通过OperatorChain的ChainingOutput实现性能优化
可以调用startNewChain()开启新chain
可以调用disableChaining()表明该operator不参与chain
createChainedOperator方法里递归调用了createOutputCollector,所以chained operators实际上是从下游往上游去反向一个个创建和setup的。以word count为例,chained operators为:StreamGroupedReduce - StreamFilter - StreamSink,而实际初始化顺序则相反:StreamSink - StreamFilter - StreamGroupedReduce。
在OperatorChain类中,headOperator为StreamGroupedReduce。createOutputCollector的调用过程如下:
createOutputCollector(operatorConfig=<StreamGroupedReduce config>, ...)
--> chainedOpConfig = <StreamFilter config>
--> createChainedOperator(chainedOpConfig=<StreamFilter config>)
--> createOutputCollector(<StreamFilter config>)
--> chainedOpConfig = <StreamSink config>
--> createChainedOperator(<StreamSink config>)
--> createOutputCollector(<StreamSink config>)
--> chainedOpConfig = null, 返回BroadcastingOutputCollector
--> StreamSink.setup(<output=BroadcastingOutputCollector>)
--> return CopyingChainingOutput
--> output = CopyingChainingOutput
--> StreamFilter.setup(<output=CopyingChainingOutput>)
--> return CopyingChainingOutput
--> output = CopyingChainingOutput
--> headOperator.setup(<output=CopyingChainingOutput>)
如果operator chain中只有一个operator,在word count的例子中,在StreamSource之后的flatMap,就是这种情况,它不能跟后面的操作chain在一起。
首先OperatorChain构造函数中的chainedConfigs会为空,因为下游没有跟它chain在一起的operator。接下来看下它的chainEntryPoint:
在createOutputCollector方法中,由于没有chained outputs,因此会直接返回RecordWriterOutput,即headOperator的output就直接交给record writer输出了。
code reading
注解
@FunctionalInterface
- 加上标注,则会触发JavaCompiler的检查。
- Markable接口
- 可用于lambda类型使用
- 唯一的抽象方法,有且仅有一个
代码
SlotSharingGroup
是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。相应的,还有一个 CoLocationGroup
类用来强制将 subtasks 放到同一个 slot 中。CoLocationGroup
主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上
默认情况下,所有的operator都属于默认的共享组default,也就是说默认情况下所有的operator都是可以共享一个slot的。而当所有input operators具有相同的slot共享组时,该operator会继承这个共享组。为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(...).slotSharingGroup("group1");就强制指定了filter的slot共享组为group1。
抽象类Slot定义了该槽位属于哪个TaskManager(instance)的第几个槽位(slotNumber),属于哪个Job(jobID)等信息。最简单的情况下,一个slot只持有一个task,也就是SimpleSlot的实现。复杂点的情况,一个slot能共享给多个task使用,也就是SharedSlot的实现。SharedSlot能包含其他的SharedSlot,也能包含SimpleSlot。所以一个SharedSlot能定义出一棵slots树。
关于Flink调度,有两个非常重要的原则我们必须知道:(1)同一个operator的各个subtask是不能呆在同一个SharedSlot中的,例如FlatMap[1]和FlatMap[2]是不能在同一个SharedSlot中的。(2)Flink是按照拓扑顺序从Source一个个调度到Sink的。例如WordCount(Source并行度为1,其他并行度为2),那么调度的顺序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]。
网友评论