Flink State

Author: 天之見證 | Published 2019-07-08 23:15
This article walks through four aspects of state in Flink:

  1. State creation
  2. State cleanup (TTL / clear)
  3. State storage (distributed)
  4. State recovery

Flink distinguishes two kinds of state:

  1. Keyed State (always scoped to a key; usable only on a KeyedStream)
  2. Operator State (tied to a parallel operator instance, i.e. to the operator's parallelism)

Flink keeps state in one of two forms:

  1. Managed (stored in data structures managed by Flink's runtime)
  2. Raw (kept in the operator's own data structures; at checkpoint time it is serialized into a byte array, so Flink knows nothing about its internal type)

The number of key groups equals the configured maximum parallelism.
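
A key group is the smallest unit in which keyed state can be redistributed when a job is rescaled, and the maximum parallelism fixes how many of them exist. A minimal sketch of pinning it explicitly (the values 128 and 4 are illustrative, not from the original text):

import org.apache.flink.streaming.api.scala._

object KeyGroupSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The maximum parallelism determines the number of key groups once and for all;
    // changing it later invalidates existing savepoints for this job.
    env.setMaxParallelism(128) // keyed state is hashed into 128 key groups
    env.setParallelism(4)      // each of the 4 subtasks is assigned a contiguous range of key groups
  }
}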

1. Keyed State

1.1 Managed Keyed State

State type | Description | API
ValueState<T> | a single value | update, value
ListState<T> | a list of values | add, addAll, get, update
ReducingState<T> | a single value, the aggregate of all elements added to the state | add(T)
AggregatingState<IN, OUT> | a single value; like ReducingState, but the aggregated output type may differ from the input type | add(IN)
MapState<UK, UV> | a map of key-value pairs | put, putAll, get, entries, keys, values

All of these state types additionally support the clear operation.

State is accessed using the RuntimeContext, so it is only possible in rich functions
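
As a minimal sketch of that access pattern (the function name, state name, and the choice of MapState here are illustrative, not part of the original example):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration

// Remembers the last value seen per category in a MapState; intended to run on a KeyedStream.
class LastValuePerCategory extends RichMapFunction[(String, Long), (String, Long)] {

  private var lastSeen: MapState[String, Long] = _

  override def open(parameters: Configuration): Unit = {
    // The descriptor carries the state name plus key and value type information
    val descriptor = new MapStateDescriptor[String, Long](
      "last-seen", createTypeInformation[String], createTypeInformation[Long])
    lastSeen = getRuntimeContext.getMapState(descriptor)
  }

  override def map(in: (String, Long)): (String, Long) = {
    lastSeen.put(in._1, in._2) // put / putAll / get / entries / keys / values, plus clear, are available
    in
  }
}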

1.2 Implementing a window operation with state

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.createTypeInformation

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object ExampleCountWindowAverage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromCollection(List(
      (1L, 3L),
      (1L, 5L),
      (1L, 7L),
      (1L, 4L),
      (1L, 2L)
    )).keyBy(_._1)
      .flatMap(new CountWindowAverage())
      .print()
    // the printed output will be (1,4) and (1,5)

    env.execute("ExampleManagedState")
  }
}


1.3 State TTL

  1. State can be cleaned up automatically once its time-to-live (TTL) expires
  2. The TTL is tracked per entry: for collection state (e.g. a list or map), every element carries its own TTL

By default expired entries are only removed lazily, when the state is read; the cleanup strategies below remove them more eagerly (a configuration sketch follows the table).

Cleanup strategy | Description
cleanupFullSnapshot | drops expired entries when a full snapshot (checkpoint or savepoint) of the state is taken; local state only shrinks after a restore from such a snapshot
cleanupInBackground | lets the chosen state backend remove expired entries in the background using its default strategy
cleanupInRocksdbCompactFilter | removes expired entries while RocksDB compacts its data, via a compaction filter (RocksDB state backend only)
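
A minimal sketch of enabling TTL on a state descriptor with the StateTtlConfig builder (the one-day TTL, the state name, and the chosen options are illustrative):

import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

object TtlSketch {
  val ttlConfig: StateTtlConfig = StateTtlConfig
    .newBuilder(Time.days(1))                                              // entries expire one day after the last write
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)             // the TTL is refreshed on creation and on every write
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // expired values are never handed back to user code
    .cleanupFullSnapshot()                                                 // additionally drop expired entries from full snapshots
    .build()

  // Attach the TTL configuration to the descriptor before the state is acquired
  val descriptor = new ValueStateDescriptor[String]("last-access", classOf[String])
  descriptor.enableTimeToLive(ttlConfig)
}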

1.4 Using state from the DataStream API

Stateless API | Stateful variant | Managed keyed state used
map | mapWithState | ValueState
flatMap | flatMapWithState | ValueState

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
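
The flatMapWithState row of the table works the same way; a sketch in the spirit of the corresponding example in the docs linked below, continuing the stream defined above and mirroring the mapWithState logic:

val runningCounts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .flatMapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      // emit the per-key total accumulated so far, then fold the current value into the state
      case Some(c) => ( Seq((in._1, c)), Some(c + in._2) )
      case None    => ( Seq((in._1, 0)), Some(in._2) )
    })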

2. Operator State

2.0 State redistribution: even-split and union

Even-split redistribution:

  1. Each operator returns a List of state elements
  2. The whole state is logically a concatenation of all lists
  3. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators

Union redistribution:

  1. Each operator returns a List of state elements
  2. The whole state is logically a concatenation of all lists
  3. On restore/redistribution, each operator gets the complete list of state elements
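
The two modes correspond directly to the two ways a list state can be requested from the OperatorStateStore inside initializeState(); a minimal sketch using a stub CheckpointedFunction (the class and state names are illustrative):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// Stub whose only purpose is to contrast the two redistribution modes of operator list state.
class RedistributionModes extends CheckpointedFunction {

  @transient private var evenSplitState: ListState[Long] = _
  @transient private var unionState: ListState[Long] = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Even-split redistribution: on restore each subtask receives a sub-list of the global list
    evenSplitState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[Long]("even-split-offsets", createTypeInformation[Long]))

    // Union redistribution: on restore each subtask receives the complete global list
    unionState = context.getOperatorStateStore.getUnionListState(
      new ListStateDescriptor[Long]("union-offsets", createTypeInformation[Long]))
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // nothing to snapshot in this sketch
  }
}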

2.1 Hooking into checkpoints

  1. Whenever a checkpoint has to be performed, snapshotState() is called

  2. The counterpart, initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actually recovering from an earlier checkpoint

initializeState() is therefore:

  1. called when the parallel function instance is created during distributed execution, and
  2. the place where the state recovery logic lives.

2.1.1 The following example adds a checkpoint hook to a sink via CheckpointedFunction:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.mutable.ListBuffer

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {
  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  private val bufferedElements = ListBuffer[(String, Int)]()


  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    bufferedElements += value
    if (bufferedElements.size == threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if (context.isRestored) { // this branch is taken only when recovering from an earlier checkpoint
      for(element <- checkpointedState.get()) {
        bufferedElements += element
      }
    }
  }
}
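
A hypothetical way to wire this sink into a job (the threshold value is illustrative):

// `stream` is assumed to be a DataStream[(String, Int)], e.g. the `counts` stream from section 1.4
stream.addSink(new BufferingSink(threshold = 100))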

2.1.2 The following example adds a checkpoint hook to a source via ListCheckpointed:

The checkpoint lock must be held while updating state or emitting elements, so that a snapshot always sees a consistent pair of offset and output.

import java.util

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.collection.JavaConverters._

class CounterSource
  extends RichParallelSourceFunction[Long]
    with ListCheckpointed[Long] {

  @volatile
  private var isRunning = true

  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock
    while (isRunning) {
      lock.synchronized({
        ctx.collect(offset)
        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = {
    util.Collections.singletonList(offset)
  }

  override def restoreState(state: util.List[Long]): Unit = {
    for (s <- state.asScala) { // asScala (JavaConverters) is required to iterate a java.util.List
      offset = s
    }
  }
}

ref:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html
