DataSet与DataStream的区别、使用
- DataSet同DataStream从其接口封装、真实计算Operator有很大的差别,Dataset的实现在
flink-java
module中,而DataStream的实现在flink-streaming-java
中; - DataSet: 批式处理,其接口封装类似于Spark的Dataset,支持丰富的函数操作,比如map/fliter/join/cogroup等;
- 数据源创建初始数据集,例如来自文件或Java集合等静态数据;
- 所有的操作为Operator的子类,实现具体逻辑,比如Join逻辑是在JoinOperator中实现;
- DataStram: 流式处理,其结构封装实现输入流的处理,其也实现了丰富的函数支持;
- 所有的操作为StreamOperator的子类,实现具体逻辑,比如Join逻辑是在
IntervalJoinOperator
中实现的;
- 所有的操作为StreamOperator的子类,实现具体逻辑,比如Join逻辑是在
当前Flink中的使用中主要还是使用DataStream的较多,网上查看DataSet的使用场景少一点;
StreamOperator的具体实现类介绍
所有的StreamOperator都应该集成如下两个类型的接口实现:
- AbstractStreamOperator: 该抽象类实现了StreamOperator的一些具体方法,如果包含了用户自定义函数应该继承
AbstractUdfStreamOperator
; - OneInputStreamOperator/TwoInputStreamOperator: 具体的Operator应该是其中之一;
AbstractStreamOperator
- 生命周期相关方法:
- setup:
- 调用链是invoke(StreamTask) -> constructor(OperatorChain) -> setup;
- 在调用setup方法时,StreamTask已经在各个TaskManager节点上,StreamTask在启动是会创建OperatorChain,OperatorChain会一一调用所包含的算子的setup方法;
- 在setup方法中,通过StreamTask的Environment和UserCodeClassLoader成员变量进行初始化。
- open和close方法是空方法,由继承的类实现;
- dispose方法是算子生命周期的最后一环,当StreamTask被取消或者出现异常时调用,负责释放与operator状态相关的资源。
- setup:
- 状态管理方法:
- snapshotState
- 调用链是triggerCheckpoint(CheckpointCoordinater) -> triggerCheckpoint(Execution) -(AkkaMsg: TriggerCheckpoint)-> (triggerCheckpointBarrier)TaskManager->checkpointStreamOperator(StreamTask) -> snapshotState;
- 从这条链路可以看到checkpoint是由Master节点发起,通过akka触发TaskManager对所有StreamTask做checkpoint,然后最后由算子执行snapshot;
- 在这一层将TimerService做备份,防止触发器丢失(后续分章节再梳理);
- 并且调用operatorStateBackend和keyedStateBackend的snapshot方法将stateBackend的备份到用户指定的文件系统;
- initializeState是空方法,留给子类去覆盖,作用是从checkpoint中恢复状态;
- getPartitionedState(无namespace)方法创建了一个partitioned state的句柄,使得聚合类的方法可以操作状态,并且这些状态会在snapshotState被调用的时候被checkpoint(AbstractKeyedStateBackend的注释里提到)。在该方法中,调用了重载方法(参数有namespace和相应Serializer)并且传入VoidNamespace相应参数;
- getOrCreateKeyedState,该方法会被WindowOperator调用,由于同一个key不同的window会对应不同的值,所以每个Window就是一个namespace,在处理具体Element前不仅要切换Key,还要切换namespace。本质上与上一个方法的区别就在于取得的state是否有namespace;
- notifyCheckpointComplete方法调用了keyStateBackend的同名方法,通知其执行checkpoint完成之后的逻辑。
- snapshotState
OneInputStreamOperator/TwoInputStreamOperator
- 这两个接口非常类似,本质上就是处理流上存在的三种元素StreamRecord,Watermark和LatencyMarker;
- 一个用作单流输入,一个用作双流输入;
- 除了StreamSource以外的所有Stream算子都必须实现并且只能实现其中一个接口。
OneInputStreamOperator
- StreamFilter/StreamMap/StreamFlatMap
- 算子在实现的processElement分别调用传入的FilterFunction,MapFunction, FlatMapFunction的udf将element传到下游;
- 其中StreamFlatMap用到了TimestampedCollector,它是output的一层封装,将timestamp加入到StreamRecord中发送到下游;
- StreamGroupedReduce/StreamGroupedFold
- 算子相似的点是都涉及到了
状态操作
(均为ValueState
状态), 所以在覆盖open方法时通过创建一个状态的描述符以及调用AbstractStreamOperator实现的getPartitionedState方法获取了一个stateBackend的操作句柄; - 在processElement方法中借助这个句柄获取当前状态值,在用UDF将新的元素聚合进去并更新状态值,最后输出到下游;
- 不同的是Fold的输出类型可能不一样(所以实现了OutputTypeConfigurable接口的setOutputType方法),并且有初始值。
- 算子相似的点是都涉及到了
- ProcessOperator/LegacyKeyedProcessOperator/KeyedProcessOperator
- 前两者提供了对ProcessFunction的支持, KeyedProcessOperator提供了对KeyedProcessFunction的支持;
- ProcessFunction是比较灵活的UDF,允许用户通过在processElement的时候可以通过传入的Conext(这个Conext是由算子实现的)操作TimerService,注册ProcessingTimeTimer和EventTimeTimer,并且通过实现方法onTimer就可以在Timer被触发的时候执行回调的逻辑;
- 其中ProcessOperator对ProcessFunction的实现是只支持获取currentProcessingTime和wartermark,不支持Timer的注册。
- LegacyKeyedProcessOperator对ProcessFunction的实现是支持Timer的注册的。同时它实现了Triggerable接口中的onEventTime和onProcessingTime方法,这两个方法是由InternalTimerService调用的回调方法,在这两个方法中,会去调用用户在ProcessFunction中实现的onTimer方法来实现定时触发的。LegacyKeyedProcessOperator现在被标记为Deprecated,由KeyedProcessOperator类替代,但是由于很多Function是基于ProcessFunction实现的,而KeyedProcessFunction在1.5.2中还没有什么实现类,所以还是会经常被用到。
- KeyedProcessOperator实现与LegacyKeyedProcessOperator非常类似,不过UDF是KeyedProcessFunction。
- StreamSink
- 除了在processElement方法中调用SinkFunction方法外,还提供了SimpleContext,可以获取processingTime,watermark和element的时间戳;
- GenericWriteAheadSink
- 提供了一个可以被实现为Exactly once的sink的抽象类,这边不展开研究。* AsyncWaitOperator提供了异步处理的能力,是一个比较特殊的算子,对元素的处理和备份恢复都比较特殊;
- element的输出通过一个Emitter对象来实现。有机会单独针对这个算子写一篇文章。
- TimestampsAndPeriodicWatermarksOperator/TimestampsAndPunctuatedWatermarksOperator
- 通过TimestampAssigner提取timestamp并生按照规则生成watermark。
- windowOperator
- 可能是flink实现的最复杂的operator;
TwoInputStreamOperator
- CoStreamMap/CoStreamFlatMap
- 基本与单流的逻辑没什么区别,只是针对两个流的Function做类似的处理。
- IntervalJoinOperator
- 对双流的元素根据提供的ProcessJoinFunction做内连接,并且每个元素都有失效时间;
- 在processElement方法中,每当一个流的元素到达,会将它加入对应流的buffer,并且遍历另一个流的buffer找到所有join的选项;
- 最后再根据失效时间注册一个状态清理的Timer防止buffer无限增长;
- 此外它还实现了ProcessJoinFunction的Conext抽象类,提供了获取左右两个流元素timestamp的功能;
- CoProcessOperator/KeyedCoProcessOperator
- 本质上与单流的处理也没有什么区别,但是提供了双流之间共享状态的可能;
- CoProcessOperator也被用来实现NonWindowJoin;
- CoBroadcastWithKeyedOperator/CoBroadcastWithNonKeyedOperator
- 提供了对(Keyed)BroadcastProcessFunction的支持,和CoProcess有一些类似,只是Broadcast的Stream只有读权限,没有写权限。并且可以通过context直接获得BroadcastState。
参考:
网友评论