美文网首页spark
StateStore源码解析

StateStore源码解析

作者: 阿猫阿狗Hakuna | 来源:发表于2018-09-05 17:18 被阅读7次

    一.Trait StateStore

    版本化key-value store的基本trait。“StateStore”的每个实例都代表一个状态数据的特定版本。实例由StateStoreProvider提供。


    image.png

    有以下两个实现:


    image.png

    变量:

    1.独一无二的标识


    image.png

    2.提交更新之前,此store中存放数据的版本


    image.png

    方法:

    1.获得一个非空key的当前值


    image.png

    2.对一个非空的key赋新值。具体实现必须知道参数中的UnsafeRow可以被重用,且必须拷贝数据以进行持久化。


    image.png

    3.移除一个非空的key


    image.png

    4.通过可选的start和end获取key-value对


    image.png

    5.提交store中所有的更新,并且返回一个新版本。具体实现应该确保在提交之后不会有更多的更新(put、remove)以免应用不当。


    image.png

    6.中止store中所有的更新。


    image.png

    7.返回一个包含StateStore中所有key-value对的迭代器。


    image.png

    8.StateStore的当前指标


    image.png

    9.是否所有的更新都已提交


    image.png

    二.Trait StateStoreProvider

    提供一个表示状态数据版本的StateStore实例
    提供者及其提供存储的生命周期如下:
    (1)第一批流查询的数据在executor上被执行时,一个StateStoreProvider在一个executor中被创建,每个独一无二的StateStoreId都会创建一个。所有后续的批次重用这个实例直到查询结束。
    (2)每批流数据通过调用getStore(version)请求状态数据的特定版本,该版本返回StateStore的实例,通过该实例可以访问所需版本的数据。provider负责用上下文信息(如key-value模式)填充该存储。
    (3)在流查询停止后,已经创建的provider实例被延迟处理掉。


    image.png

    有以下两个实现:


    image.png

    方法:

    1.返回此provider生成的StateStore的ID。应与init()中传递的相同。


    image.png

    2.通过SQL操作符初始化provider的上下文信息。通过反射创建StateStoreProvider的实例后,将实现调用此方法。


    image.png

    3.当从executor卸载provider实例时调用


    image.png

    4.返回指定版本状态数据的StateStore的实例


    image.png

    三.具体实现:HDFSBackedStateStoreProvider

    image.png

    StateStoreProvider和StateStore的具体实现,其中所有数据都由hdfs兼容的文件系统中的文件支持。对store所有的更新都必须以事务的方式完成,每一组更新都会对store的版本进行递增。这些版本可用于在正确版本的state上重新执行更新(通过rdd操作中的重试),并重新生成存储版本。

    如何使用:

    要更新stateStore中的数据,需要执行以下操作步骤。
    1.获取正确的store


    image.png
    image.png

    容错模型:

    1.提交之前,每一组更新都被写入一个delta文件
    2.state store负责管理、折叠和清理这些delta文件
    3.多次提交相同版本的更新可能会互相覆盖,一次性保证取决于多次尝试是否具有相同的更新和底层文件系统的覆盖语义。
    4.后台文件维护确保存储的最后版本总是可恢复的,以确保重新执行RDD操作重新应用正确的过去版本。

    接下来看看这个类:

    image.png

    用一个concurrentHashMap来存key-value对。
    其中实现了一些方法:


    image.png

    比较重要的commit方法:


    image.png

    点进去commitUpdates方法:


    image.png

    进入到putStateIntoStateCacheMap方法,发现一句:


    image.png

    查看loadedMaps:


    image.png

    原来是创建了个TreeMap来存放每个版本的数据,key为版本号,value就是每个版本自己的map。

    四.IncrementalExecution

    image.png

    QueryExecution的继承,将给定的LogicalPlan增量化执行。在每次执行之间保持状态。

    image.png

    获取StateStore的数量

    进入SHUFFLE_PARITIONS

    image.png

    可以看到,partition的数量由参数spark.sql.shuffle.partitions控制,如果不设置,默认为200.
    接着往下看

    image.png

    获得下一个有状态操作的state信息,进入StatefulOperatorStateInfo看看

    image.png

    这个方法只返回了一些字符串信息,但在下文通过这个信息调用了StateStoreSaveExec方法

    image.png

    这个方法创建了stateManager来管理stateStore,对stateStore进行具体的get,put等操作

    image.png

    接着调用了doExecute方法

    image.png

    在这个方法里有这样一句

    image.png

    进入mapPartitionsWithStateStore中

    image.png

    将RDD中的每个分区与StateStore中的数据进行映射

    这里的参数中有一个storeCoordinator

    image.png

    进入StateStoreCoordinatorRef

    image.png

    引用StateStoreCoordinator,可用于在所有executor之间协调StateStore的实例,并获取它们的位置进行作业调度。

    以下是StateStoreCoordinator类

    image.png

    这个创建了一个Map来存储StateStoreProviderId和host,executorId的存储关系。

    在上文调用StateStoreSaveExec方法时,调用了StateStoreRestoreExec方法

    image.png

    对于每个输入元祖,将计算key并将StateStore中的值添加到流。

    image.png

    由此可知,State Store是分布式的存储在所有Executor上的。

    相关文章

      网友评论

        本文标题:StateStore源码解析

        本文链接:https://www.haomeiwen.com/subject/drtzwftx.html