美文网首页
Flink中DataSet/DataStream的区别

Flink中DataSet/DataStream的区别

作者: 分裂四人组 | 来源:发表于2019-03-31 23:11 被阅读0次

    DataSet与DataStream的区别、使用

    • DataSet同DataStream从其接口封装、真实计算Operator有很大的差别,Dataset的实现在flink-javamodule中,而DataStream的实现在flink-streaming-java中;
    • DataSet: 批式处理,其接口封装类似于Spark的Dataset,支持丰富的函数操作,比如map/fliter/join/cogroup等;
      • 数据源创建初始数据集,例如来自文件或Java集合等静态数据;
      • 所有的操作为Operator的子类,实现具体逻辑,比如Join逻辑是在JoinOperator中实现;
    • DataStram: 流式处理,其结构封装实现输入流的处理,其也实现了丰富的函数支持;
      • 所有的操作为StreamOperator的子类,实现具体逻辑,比如Join逻辑是在IntervalJoinOperator中实现的;

    当前Flink中的使用中主要还是使用DataStream的较多,网上查看DataSet的使用场景少一点;

    StreamOperator的具体实现类介绍

    所有的StreamOperator都应该集成如下两个类型的接口实现:

    • AbstractStreamOperator: 该抽象类实现了StreamOperator的一些具体方法,如果包含了用户自定义函数应该继承AbstractUdfStreamOperator;
    • OneInputStreamOperator/TwoInputStreamOperator: 具体的Operator应该是其中之一;

    AbstractStreamOperator

    • 生命周期相关方法:
      • setup:
        • 调用链是invoke(StreamTask) -> constructor(OperatorChain) -> setup;
        • 在调用setup方法时,StreamTask已经在各个TaskManager节点上,StreamTask在启动是会创建OperatorChain,OperatorChain会一一调用所包含的算子的setup方法;
        • 在setup方法中,通过StreamTask的Environment和UserCodeClassLoader成员变量进行初始化。
      • open和close方法是空方法,由继承的类实现;
      • dispose方法是算子生命周期的最后一环,当StreamTask被取消或者出现异常时调用,负责释放与operator状态相关的资源。
    • 状态管理方法:
      • snapshotState
        • 调用链是triggerCheckpoint(CheckpointCoordinater) -> triggerCheckpoint(Execution) -(AkkaMsg: TriggerCheckpoint)-> (triggerCheckpointBarrier)TaskManager->checkpointStreamOperator(StreamTask) -> snapshotState;
        • 从这条链路可以看到checkpoint是由Master节点发起,通过akka触发TaskManager对所有StreamTask做checkpoint,然后最后由算子执行snapshot;
        • 在这一层将TimerService做备份,防止触发器丢失(后续分章节再梳理);
        • 并且调用operatorStateBackend和keyedStateBackend的snapshot方法将stateBackend的备份到用户指定的文件系统;
      • initializeState是空方法,留给子类去覆盖,作用是从checkpoint中恢复状态;
      • getPartitionedState(无namespace)方法创建了一个partitioned state的句柄,使得聚合类的方法可以操作状态,并且这些状态会在snapshotState被调用的时候被checkpoint(AbstractKeyedStateBackend的注释里提到)。在该方法中,调用了重载方法(参数有namespace和相应Serializer)并且传入VoidNamespace相应参数;
      • getOrCreateKeyedState,该方法会被WindowOperator调用,由于同一个key不同的window会对应不同的值,所以每个Window就是一个namespace,在处理具体Element前不仅要切换Key,还要切换namespace。本质上与上一个方法的区别就在于取得的state是否有namespace;
      • notifyCheckpointComplete方法调用了keyStateBackend的同名方法,通知其执行checkpoint完成之后的逻辑。

    OneInputStreamOperator/TwoInputStreamOperator

    • 这两个接口非常类似,本质上就是处理流上存在的三种元素StreamRecord,Watermark和LatencyMarker;
    • 一个用作单流输入,一个用作双流输入;
    • 除了StreamSource以外的所有Stream算子都必须实现并且只能实现其中一个接口。
    OneInputStreamOperator
    • StreamFilter/StreamMap/StreamFlatMap
      • 算子在实现的processElement分别调用传入的FilterFunction,MapFunction, FlatMapFunction的udf将element传到下游;
      • 其中StreamFlatMap用到了TimestampedCollector,它是output的一层封装,将timestamp加入到StreamRecord中发送到下游;
    • StreamGroupedReduce/StreamGroupedFold
      • 算子相似的点是都涉及到了状态操作(均为ValueState状态), 所以在覆盖open方法时通过创建一个状态的描述符以及调用AbstractStreamOperator实现的getPartitionedState方法获取了一个stateBackend的操作句柄;
      • 在processElement方法中借助这个句柄获取当前状态值,在用UDF将新的元素聚合进去并更新状态值,最后输出到下游;
      • 不同的是Fold的输出类型可能不一样(所以实现了OutputTypeConfigurable接口的setOutputType方法),并且有初始值。
    • ProcessOperator/LegacyKeyedProcessOperator/KeyedProcessOperator
      • 前两者提供了对ProcessFunction的支持, KeyedProcessOperator提供了对KeyedProcessFunction的支持;
      • ProcessFunction是比较灵活的UDF,允许用户通过在processElement的时候可以通过传入的Conext(这个Conext是由算子实现的)操作TimerService,注册ProcessingTimeTimer和EventTimeTimer,并且通过实现方法onTimer就可以在Timer被触发的时候执行回调的逻辑;
      • 其中ProcessOperator对ProcessFunction的实现是只支持获取currentProcessingTime和wartermark,不支持Timer的注册。
      • LegacyKeyedProcessOperator对ProcessFunction的实现是支持Timer的注册的。同时它实现了Triggerable接口中的onEventTime和onProcessingTime方法,这两个方法是由InternalTimerService调用的回调方法,在这两个方法中,会去调用用户在ProcessFunction中实现的onTimer方法来实现定时触发的。LegacyKeyedProcessOperator现在被标记为Deprecated,由KeyedProcessOperator类替代,但是由于很多Function是基于ProcessFunction实现的,而KeyedProcessFunction在1.5.2中还没有什么实现类,所以还是会经常被用到。
      • KeyedProcessOperator实现与LegacyKeyedProcessOperator非常类似,不过UDF是KeyedProcessFunction。
    • StreamSink
      • 除了在processElement方法中调用SinkFunction方法外,还提供了SimpleContext,可以获取processingTime,watermark和element的时间戳;
    • GenericWriteAheadSink
      • 提供了一个可以被实现为Exactly once的sink的抽象类,这边不展开研究。* AsyncWaitOperator提供了异步处理的能力,是一个比较特殊的算子,对元素的处理和备份恢复都比较特殊;
      • element的输出通过一个Emitter对象来实现。有机会单独针对这个算子写一篇文章。
    • TimestampsAndPeriodicWatermarksOperator/TimestampsAndPunctuatedWatermarksOperator
      • 通过TimestampAssigner提取timestamp并生按照规则生成watermark。
    • windowOperator
      • 可能是flink实现的最复杂的operator;
    TwoInputStreamOperator
    • CoStreamMap/CoStreamFlatMap
      • 基本与单流的逻辑没什么区别,只是针对两个流的Function做类似的处理。
    • IntervalJoinOperator
      • 对双流的元素根据提供的ProcessJoinFunction做内连接,并且每个元素都有失效时间;
      • 在processElement方法中,每当一个流的元素到达,会将它加入对应流的buffer,并且遍历另一个流的buffer找到所有join的选项;
      • 最后再根据失效时间注册一个状态清理的Timer防止buffer无限增长;
      • 此外它还实现了ProcessJoinFunction的Conext抽象类,提供了获取左右两个流元素timestamp的功能;
    • CoProcessOperator/KeyedCoProcessOperator
      • 本质上与单流的处理也没有什么区别,但是提供了双流之间共享状态的可能;
      • CoProcessOperator也被用来实现NonWindowJoin;
    • CoBroadcastWithKeyedOperator/CoBroadcastWithNonKeyedOperator
      • 提供了对(Keyed)BroadcastProcessFunction的支持,和CoProcess有一些类似,只是Broadcast的Stream只有读权限,没有写权限。并且可以通过context直接获得BroadcastState。

    参考:

    相关文章

      网友评论

          本文标题:Flink中DataSet/DataStream的区别

          本文链接:https://www.haomeiwen.com/subject/kjjpbqtx.html