美文网首页Flink学习指南Flink专题
Flink Streaming状态处理(Working with

Flink Streaming状态处理(Working with

作者: 尼小摩 | 来源:发表于2019-02-04 17:13 被阅读137次

    键State和操作State (Keyed State and Operator State)

    Flink中有两种基本的状态:键状态(Keyed State)和操作状态( Operator State)。

    键状态(Keyed State)

    键状态(Keyed State)是与键相关的,只能在KeyedStream的函数和操作符中使用。

    可以把键状态(Keyed State)理解成已经分区的操作状态(Operator State)或者分片,每个键只有一个状态分区。每个键状态(Keyed State)在逻辑上绑定到<parallel-operator-instance, key>的唯一组合,由于每个键是键操作符的一个并行实例,可以将其简单地理解为<operator, key>。

    键状态(Keyed State)进一步组合成所谓的键组(Key Groups)键组(Key Groups)是Flink重新分配键状态(Keyed State)的原子单元;键组(Key Groups)的数量与定义的最大并行度完全相同。在执行过程中,每个键操作符的并行实例都使用一个或多个键组的键。

    操作符状态(Operator State)

    对于操作符状态(Operator State),每个操作符状态(Operator State)都绑定一个并行操作符实例。 Kafka Connector是在Flink中使用操作符状态(Operator State)一个很好的例子。Kafka消费者的每个并行实例都维护一个topic分区和偏移量(offset)的映射作为其操作符状态(Operator State)。

    当并行度发生改变时,操作符状态(Operator State)接口支持在并行操作符实例之间重新分布状态。

    原始和管理状态(Raw and Managed State)

    键状态(Keyed State)和操作符状态(Operator State)有两种形式:管理状态和原始状态。

    管理状态(Managed State)表示在Flink运行时约束的数据结构,比如内部的哈希表或者RocksDB。例如:ValueState, ListState。Flink在运行时对状态进行编码,并将其写入检查点(checkpoint)。

    原始状态(Raw State)是状态操作符保存在自己的数据结构中。当触发检查点时,它们只将字节序列写入检查点。Flink不知道状态的数据结构,只看到原始字节。

    所有datastream函数都可以使用管理状态,但是原始状态接口只能在实现操作符时使用。 建议使用管理状态(而不是原始状态),因为使用管理状态,Flink能够在并行度改变时自动重新分发状态,并且更好的内存管理。

    注意:如果您的管理状态需要自定义序列化逻辑,请参阅 corresponding guide ,以确保未来的兼容性。Flink的默认序列化不需要特殊处理。

    使用管理键状态(Using Managed Keyed State)

    管理键状态接口提供对不同类型状态的访问,这些状态的作用域都是当前输入元素的键。这意味着这种类型的状态只能在KeyedStream上使用,可以通过stream.keyBy(…)创建。

    我们首先看一下Flink中不同类型的可用状态,然后了解如何在程序中使用它们。可用状态为:

    • ValueState<T>: 保存一个可以修改和获取的值(如前所述,该值的作用域为input元素的key,因此操作的每个键可能都有一个值)。修改值可以使用update(T),获取值可以使用T value()

    • ListState<T>: 存储一个元素列表。可以追加元素,并且可以从当前存储的所有元素中获取一个可迭代(Iterable)的元素。添加元素使用add(T)addAll(List<T>),获取元素可以使用Iterable<T> get()。还可以使用update(list <T>)修改并覆盖现有列表。

    • ReducingState<T>: 存储一个值,该值表示添加到该状态所有值的聚合。类似于ListState,添加元素使用add(T)通过ReduceFunction聚合。

    -AggregatingState<IN, OUT>: 存储一个值,该值表示添加到该状态的所有值的聚合。 与ReducingState相反,聚合类型添加到该状态的元素可以有不同类型。与ListState相同,但是使用add(IN)添加元素使用指定的AggregateFunction进行聚合。

    • FoldingState<T, ACC>: 存储一个值,该值表示添加到该状态的所有值的聚合。与ReducingState相反,聚合类型添加到该状态的元素可以有不同类型。类似于ListState,但是使用add(T)添加元素使用指定的FoldFunction被折叠成一个聚合。
    • MapState<UK, UV>: 保存了一个映射列表。可以在状态中添加键-值对,并可以从当前存储的所有map中获取一个可迭代的元素。使用put(UK, UV)putAll(Map<UK, UV>)添加映射。获取值可以使用get(UK)。获取mappings, keys和values的可迭代数据可以分别使用entry()、keys()和values()。

    所有类型的状态都有一个clear()方法,用于清除当前活动键(即输入元素的键)的状态。

    注意,FoldingStateFoldingStateDescriptor在Flink 1.4中已经被弃用,将来会被完全删除。请使用AggregatingStateAggregatingStateDescriptor

    注意,第一,这些状态对象仅可以与状态进行交互。状态不仅可以存储在内部,也可以存储在磁盘或其他地方。第二,从状态获得的值取决于输入元素的键。因此,如果键不同,那么在一次函数调用中获得的值可能与另一次调用中的值不同。

    要获得状态句柄,必须创建StateDescriptor。它保存了状态的名称(在下面可以看到可以创建多个状态,必须具有惟一的名称,以便可以引用它们),状态保存的值的类型,可能还有用户指定的函数,如ReduceFunction。根据要检索的状态类型,可以创建ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、FoldingStateDescriptor或MapStateDescriptor

    使用RuntimeContext访问状态,因此只能在rich函数中访问。有关这方面的信息,请参阅这里,稍后将看到一个示例。在RichFunction中可用的RuntimeContext有以下几种访问状态的方法:

    • ValueState<T> getState(ValueStateDescriptor<T>)
    • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
    • ListState<T> getListState(ListStateDescriptor<T>)
    • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
    • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
    • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

    这是一个FlatMapFunction的例子,展示了所有元素如何组合在一起:

    class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
    
      private var sum: ValueState[(Long, Long)] = _
    
      override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    
        // access the state value
        val tmpCurrentSum = sum.value
    
        // If it hasn't been used before, it will be null
        val currentSum = if (tmpCurrentSum != null) {
          tmpCurrentSum
        } else {
          (0L, 0L)
        }
    
        // update the count
        val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
    
        // update the state
        sum.update(newSum)
    
        // if the count reaches 2, emit the average and clear the state
        if (newSum._1 >= 2) {
          out.collect((input._1, newSum._2 / newSum._1))
          sum.clear()
        }
      }
    
      override def open(parameters: Configuration): Unit = {
        sum = getRuntimeContext.getState(
          new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
        )
      }
    }
    
    
    object ExampleCountWindowAverage extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
    
      env.fromCollection(List(
        (1L, 3L),
        (1L, 5L),
        (1L, 7L),
        (1L, 4L),
        (1L, 2L)
      )).keyBy(_._1)
        .flatMap(new CountWindowAverage())
        .print()
      // the printed output will be (1,4) and (1,5)
    
      env.execute("ExampleManagedState")
    }
    
    

    这个例子实现了一个简单的计数窗口。我们按第一个字段键元组(在本例中所有的键都是1)。该函数在ValueState中存储计数和一个正在运行的和。一旦计数达到2,它将发出平均值并清除状态,以便我们从0开始。注意,如果在第一个字段中有不同值的元组,那么对于每个不同的输入键,将保持不同的状态值。

    状态生存时间(State Time-To-Live (TTL))

    可以将生存时间(TTL)分配给任何类型的键状态。如果已配置TTL且状态值已过期,将以最佳方式清理存储值,下面将对此进行更详细的讨论。

    所有状态集合类型每个条目都支持TTLs。这意味着列表元素和映射项可以独立过期。

    为了使用状态TTL,首先必须构建一个StateTtlConfig配置对象。然后TTL可以通过传递配置在任何状态描述符中启用:

    import org.apache.flink.api.common.state.StateTtlConfig
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.common.time.Time
    
    val ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build
        
    val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
    stateDescriptor.enableTimeToLive(ttlConfig)
    

    配置有几个选项需要考虑:
    newBuilder方法的第一个参数是必选的,用于设置生存时长的值。
    更新类型在状态TTL刷新时配置(默认情况下为OnCreateAndWrite):

    • StateTtlConfig.UpdateType.OnCreateAndWrite - 只有在创建和写入时访问
    • StateTtlConfig.UpdateType.OnReadAndWrite - 在读取时访问

    状态可见性配置如果未清除过期值,则在读取访问时是否返回过期值(默认情况下,NeverReturnExpired):

    • StateTtlConfig.StateVisibility.NeverReturnExpired - 过期的值永远不会返回
    • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用返回

    NeverReturnExpired的情况下,过期状态的行为就像不再存在一样,即使仍然需要删除。该选项对于数据在TTL之后对于不可读访问的用例非常有用,例如,处理敏感资料的应用。

    另一个选项ReturnExpiredIfNotCleanedUp允许在清理之前返回过期状态。

    说明

    • 状态后端存储最后一次修改的时间戳和值,这意味着启用该特性会增加状态存储的消耗。堆状态后端(Heap state backend)使用Java对象的状态对象引用和内存中的原始long值。RocksDB状态后端为每个存储值、列表项或映射项添加8字节。

    • 目前只支持处理时间的TTLs。

    • 试图恢复状态(以前在没有TTL的情况下配置的状态),使用启用TTL的描述符或反之,将导致兼容性失败和statmigrationexception异常。
    • 只有当值序列化能够处理空值时,TTL的映射状态当前才支持空值。如果序列化器不支持null值,可以使用NullableSerializer对其进行包装,代价是在序列化形式中增加一个字节。

    过期状态的清理(Cleanup of Expired State)

    目前,过期值只有在显式读取时才会被删除,例如,通过调用valuestat .value()。

    **注意: **这意味着默认情况下,如果未读取过期状态就不会删除它,这可能导致状态不断增长。这可能会在未来的版本中改变。

    此外,可以在获取完整状态快照时激活清理,将减少其大小。当前实现不会清理本地状态,但从上一个快照恢复时,它不会包含已删除的过期状态。可以在StateTtlConfig中配置:

    val ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .cleanupFullSnapshot
        .build
    

    此选项不适用于RocksDB状态后端中的增量检查点。

    以后还会添加更多的策略在后台自动清理过期状态。

    在Scala DataStream API中声明(State in the Scala DataStream API)

    除了上面描述的接口之外,Scala API还为KeyedStream上具有单个ValueState的有状态map()或flatMap()函数提供了快捷方式。用户函数在一个选项中获取ValueState的当前值,并且必须返回一个更新后的值,该值将用于更新状态。

    val stream: DataStream[(String, Int)] = ...
    
    val counts: DataStream[(String, Int)] = stream
      .keyBy(_._1)
      .mapWithState((in: (String, Int), count: Option[Int]) =>
        count match {
          case Some(c) => ( (in._1, c), Some(c + in._2) )
          case None => ( (in._1, 0), Some(in._2) )
        })
    
    

    使用管理操作符状态(Using Managed Operator State)

    要使用托管理操作符状态,有状态函数可以实现更通用的CheckpointedFunction接口,也可以实现listcheckpoint <T extends Serializable>接口。

    CheckpointedFunction

    CheckpointedFunction接口通过不同的重新分配方案提供对非键状态的访问。它需要实现两种方法:

    void snapshotState(FunctionSnapshotContext context) throws Exception;
    
    void initializeState(FunctionInitializationContext context) throws Exception;
    

    每当必须执行检查点时,都会调用snapshotState()。对应的initializeState()在每次初始化用户定义的函数时调用,可以是在函数第一次初始化时调用,也可以是在函数实际从较早的检查点恢复时调用。因此,initializeState()不仅是初始化不同类型状态的地方,也包括状态恢复逻辑。

    目前,支持List样式的管理操作符状态。状态是一个可序列化对象的列表,彼此独立,因此在重新扫描时有资格进行重新分发。换句话说,这些对象是可以重新分布非键状态的最佳粒度。根据状态访问方法的不同,定义了以下重分发方案:

    • **Even-split redistribution: ** 每个操作符返回一个状态元素列表。整个状态在逻辑上是串联所有列表。在恢复/重新分发时,该列表被平均地分成尽可能多的并行操作符子列表。每个操作符获取一个子列表,该子列表可以是空的,也可以包含一个或多个元素。例如,如果并行度为1,则操作符的检查点状态包含元素element1和element2。当并行度增加到2时,element1可能会出现在运算符实例0中,而element2会出现在运算符实例1中。

    • Union redistribution: 每个操作符返回一个状态元素列表。整个状态在逻辑上是串联所有列表。在恢复/重新分发时,每个操作符都获得状态元素的完整列表。

    下面是一个有状态SinkFunction的例子,它使用CheckpointedFunction将元素发送到外部之前缓冲它们。它演示了基本的均分重分发列表状态:

    class BufferingSink(threshold: Int = 0)
      extends SinkFunction[(String, Int)]
        with CheckpointedFunction {
    
      @transient
      private var checkpointedState: ListState[(String, Int)] = _
    
      private val bufferedElements = ListBuffer[(String, Int)]()
    
      override def invoke(value: (String, Int)): Unit = {
        bufferedElements += value
        if (bufferedElements.size == threshold) {
          for (element <- bufferedElements) {
            // send it to the sink
          }
          bufferedElements.clear()
        }
      }
    
      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        checkpointedState.clear()
        for (element <- bufferedElements) {
          checkpointedState.add(element)
        }
      }
    
      override def initializeState(context: FunctionInitializationContext): Unit = {
        val descriptor = new ListStateDescriptor[(String, Int)](
          "buffered-elements",
          TypeInformation.of(new TypeHint[(String, Int)]() {})
        )
    
        checkpointedState = context.getOperatorStateStore.getListState(descriptor)
    
        if(context.isRestored) {
          for(element <- checkpointedState.get()) {
            bufferedElements += element
          }
        }
      }
    
    }
    

    initializeState方法以FunctionInitializationContext作为参数。用于初始化非键状态“containers”。这是ListState类型的容器,其中非键状态对象将在检查点上存储。

    注意状态是如何初始化的,类似于键状态,使用一个StateDescriptor,其中包含状态名和关于状态持有的值的类型的信息:

    val descriptor = new ListStateDescriptor[(String, Long)](
        "buffered-elements",
        TypeInformation.of(new TypeHint[(String, Long)]() {})
    )
    
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
    

    状态访问方法的命名约定包含其重新分布模式及其状态结构。例如,要在还原时使用具有union重分发方案的list state,使用getUnionListState(descriptor)访问状态。如果方法名不包含重分发模式,例如getListState(descriptor),它仅仅意味着将使用基本的均分重分发模式。

    在初始化容器之后,我们使用上下文的isrestore()方法检查失败后是否正在恢复。如果是true,即正在恢复,则应用恢复逻辑。

    如修改后的BufferingSink代码所示,状态初始化期间恢复的数据保存在一个ListState变量中,以备将来在snapshotState()中使用。在那里,ListState将清除前一个检查点包含的所有对象,然后被我们想要检查的新选项填满。

    另外,键状态也可以在initializeState()方法中初始化。可以使用FunctionInitializationContext来完成。

    ListCheckpointed

    ListCheckpointed接口是CheckpointedFunction的一个更有限的变体,它只支持列表样式的状态,在恢复时使用均分重分发方案。它还需要实现两种方法:

    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    
    void restoreState(List<T> state) throws Exception;
    

    snapshotState()上,操作应该向检查点返回一个对象列表,而restoreState()必须在恢复时处理这个列表。如果状态不可重分区,则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)

    有状态的源函数(Stateful Source Functions)

    与其他操作符相比,有状态源需要更多的关注。为了更新状态和输出集合的原子性(用于故障/恢复上的精确一次语义),用户需要从源上下文获取一个锁。

    Scala
    class CounterSource
           extends RichParallelSourceFunction[Long]
           with ListCheckpointed[Long] {
    
      @volatile
      private var isRunning = true
    
      private var offset = 0L
    
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        val lock = ctx.getCheckpointLock
    
        while (isRunning) {
          // output and state update are atomic
          lock.synchronized({
            ctx.collect(offset)
    
            offset += 1
          })
        }
      }
    
      override def cancel(): Unit = isRunning = false
    
      override def restoreState(state: util.List[Long]): Unit =
        for (s <- state) {
          offset = s
        }
    
      override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
        Collections.singletonList(offset)
    
    }
    

    当Flink完全确认检查点时,一些操作可能需要这些信息来与外部世界进行通信。在本例中,请参见org.apache.flink.runtime.state.CheckpointListener接口。

    相关文章

      网友评论

        本文标题:Flink Streaming状态处理(Working with

        本文链接:https://www.haomeiwen.com/subject/jpnpsqtx.html