美文网首页
[Flink State] State究竟保存在哪里?

[Flink State] State究竟保存在哪里?

作者: LZhan | 来源:发表于2019-12-09 08:52 被阅读0次
    1.前言

    从源码解析State的保存过程,上一篇从task和operator出发说明了保存state的过程,到最后是由算子调用snapshot方法,进行state的快照操作。那么state究竟保存在哪里?

    <1> State Backend简介

    • 状态后端定义了流式应用程序状态如何存储和checkpoint的。不同的状态后端以不同的方式来存储其状态,并且使用不同的数据结构来保存正在运行的应用程序的状态。
    • 例如,MemoryStateBackend将工作state保存在TaskManager的内存中,并将checkpoint数据存储在JobManager的内存中。后端是轻量级的,没有其他依赖关系,但是可用性不高并且仅支持小状态。
    • FsStateBackend将工作state保存在TaskManager的内存中,并将checkpoint数据存储在文件系统中。
    • RocksDBStateBackend将工作state保存在RocksDB中,并且默认将checkpoint数据存在文件系统中,类似FsStateBackend。

    关于 Raw Bytes Storage and Backends

    • StateBackend提供服务给raw bytes storage,keyed state和operator state。
    • raw bytes存储(通过CheckpointStreamFactory)是一个基础服务以可容错的方式简单存储。该服务通过JobManager来存储checkpoint数据和恢复元数据,通常也可以提供给keyed-和operator状态后端来存储checkpoint数据。
    • StateBackend创建的AbstractKeyedStateBackend和OperatorStateBackend定义如何保持keyed和operator状态。他们也定义了如何将state给checkpoint(经常使用raw bytes storage通过CheckpointStreamFactory)。但是也有可能例如Keyed state backend只是实现了键/值存储,并且不需要在原始字节存储中存储任何内容检查点。

    总结:StateBackend主要是针对raw bytes storage(即checkpoint),keyed state和operator state来提供功能的,其中checkpoint数据的存储则是通过CheckpointStreamFactory,而state存储,针对keyedState是通过AbstractKeyedStateBackend,针对operatorState是通过OperatorStateBackend。

    <2>

    - MemoryStateBackend FsStateBackend RocksDBStateBackend
    CheckpointStream MemCheckpointStreamFactory FsCheckpointStreamFactory FsCheckpointStreamFactory
    SavepointStream MemCheckpointStreamFactory FsSavepointStreamFactory FsSavepointStreamFactory
    KeyedStateBackend HeapKeyedStateBackend HeapKeyedStateBackend RocksDBKeyedStateBackend
    OperatorStateBackend DefaultOperatorStateBackend DefaultOperatorStateBackend DefaultOperatorStateBackend

    RocksDBStateBackend的构造函数可以传入一个AbstractStateBackend,否则默认采用FsStateBackend。

    可以看到,从OperatorState的角度来讲,目前Flink只有一个实现,即DefaultOperatorStateBackend,它将List风格的State保存在内存中。
    从KeyedState的角度来讲,目前有两种实现,HeapKeyedStateBackend将state保存在内存中,而RocksDbKeyedStateBackend将State保存在TM本地的RocksDB中。相对而言,前者在内存中,速度会快,效率高,但一方面会限制state的大小,另一方面也会造成JVM自己的内存问题;后者在本地文件中,就会涉及序列化和反序列化,效率不及前者,但可以保存的state的大小会很大。

    从checkpoint和savepoint的角度来看,Memory工厂方法都保存在内存中,显然不能在生产环境使用,而Fs工厂方法和RocksDb工厂方法,则统一都放在文件系统中,比如HDFS。


    2.源码分析

    从上图中3,4两行可以看到,具体用来存储state的有三种HeapKeyedStateBackend,RocksDBKeyedStateBackend和DefaultOperatorStateBackend。

    DefaultOperatorStateBackend
    (1)


    通过上面可以看到,状态是以Map方式来存储的

    operator的ListSate的实现类PartitionableListState,OperatorState都保存在内存中,本质上还是一个ArrayList。

    (2)snapshot方法

    该snapshotStrategy是AbstractSnapshotStrategy<OperatorStateHandle>,而AbstractSnapshotStrategy有三种实现类:

    DefaultOperatorStateBackendSnapshotStrategy中的snapshot方法:
    该snapshot方法中,主要是对registeredOperatorStatesregisteredBroadcastStates的snapshot。

    第一步:


    针对所有注册的state进行deepCopy,为了防止在checkpoint的时候数据结构又被修改,deepCopy其实是通过序列化和反序列化的过程;

    第二步:
    异步写入State和MetaInfo,先创建CheckpointStateOutputStream,通过调用factory的createCheckpointStateOutputStream方法,这个factory是哪种类型的呢?这个是由定义的状态后端所决定的。之后会返回相应的OperatorstateHandle用作restore的过程。

    第三步:
    在StreamTask触发checkpoint的时候会将一个Task中所有的operator触发一次snapshot,触发部分就是上面1,2两个步骤,其中第二步是会返回一个RunnableFuture,在触发之后会提交一个AsyncSnapshotCallable异步任务,会阻塞一直等到checkpoint的Future,其实就是去调用这个方法AbstractAsyncIOCallable, 直到完成之后OperatorState会返回一个OperatorStateHandle,这个地方和后文的keyedState返回的handle不一样。

    相关文章

      网友评论

          本文标题:[Flink State] State究竟保存在哪里?

          本文链接:https://www.haomeiwen.com/subject/iiikgctx.html