美文网首页Flink专题
Flink Streaming:Process函数(低级别操作)

Flink Streaming:Process函数(低级别操作)

作者: 尼小摩 | 来源:发表于2019-02-15 18:38 被阅读90次

    ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本构建块:

    • events (流元素)
    • state (容错,一致性,只在Keyed Stream)
    • timers (事件时间和处理时间, 只在keyed stream)

    ProcessFunction可以看作是一个具有键控状态(keyed state)和计时器(timers)访问权的FlatMapFunction。它通过调用输入流中接收的每个事件来处理事件。对于容错状态,ProcessFunction允许访问Flink的键状态,可以通过RuntimeContext访问,类似于其他有状态函数访问键状态的方式。

    计时器(timers)允许应用程序对处理时间(processing time)和事件时间(event time)中的变更作出响应。processElement(…)函数的每次调用都获得一个上下文(Context )对象,该对象可以访问元素的事件时间戳TimerServiceTimerService可用于为将来的事件时间/处理时间注册回调。当到达计时器的特定时间时,将调用onTimer(…)方法。在调用期间,所有状态的作用域再次限定为创建计时器的键,允许计时器操作键控状态。

    注意:如果你想访问键控状态和定时器,你必须在一个键控流上应用ProcessFunction:

    stream.keyBy(...).process(new MyProcessFunction())
    

    要实现对两个输入的低级操作,应用程序可以使用CoProcessFunction(协处理器功能)。这个函数绑定了两个不同的输入,并获取对processElement1(…)processElement2(…)的单独调用,以获取来自两个不同输入的记录。

    实现低级连接通常遵循以下模式:

    • 为一个(或两个)输入创建状态对象。
    • 从输入接收元素时更新状态。
    • 从其他输入接收元素后,探测状态并生成join的结果。

    例如,将客户数据与金融交易join起来,同时保留客户数据的状态。如果在无序事件中,当客户数据流的水印超过交易时间时,可以使用计时器计算并发出交易的连接。

    例子

    下面的示例维护每个键的计数,并在没有更新该键的情况下,每分钟过去(在事件时间中)时发出键/计数对:

    • 计数、键和最后修改时间戳存储在ValueState中,这是键的隐式作用域。
    • 对于每个记录,ProcessFunction递增计数器并设置最后修改时间戳。
    • 该函数还将回调时间安排在一分钟以后(在事件时间内)
    • 在每次回调时,会检查回调的事件时间戳和存储计数的最后一次修改时间,如果它们匹配,则发出键/计数(在该分钟内没有更新)。

    这个简单的示例可以用会话窗口实现。在这里使用ProcessFunction来说明它提供的基本模式。

    import org.apache.flink.api.common.state.ValueState
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.functions.ProcessFunction.Context
    import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
    import org.apache.flink.util.Collector
    
    // the source data stream
    val stream: DataStream[Tuple2[String, String]] = ...
    
    // apply the process function onto a keyed stream
    val result: DataStream[Tuple2[String, Long]] = stream
      .keyBy(0)
      .process(new CountWithTimeoutFunction())
    
    /**
      * The data type stored in the state
      */
    case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
    
    /**
      * The implementation of the ProcessFunction that maintains the count and timeouts
      */
    class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
    
      /** The state that is maintained by this process function */
      lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
        .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
    
    
      override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
        // initialize or retrieve/update the state
    
        val current: CountWithTimestamp = state.value match {
          case null =>
            CountWithTimestamp(value._1, 1, ctx.timestamp)
          case CountWithTimestamp(key, count, lastModified) =>
            CountWithTimestamp(key, count + 1, ctx.timestamp)
        }
    
        // write the state back
        state.update(current)
    
        // schedule the next timer 60 seconds from the current event time
        ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
      }
    
      override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
        state.value match {
          case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
            out.collect((key, count))
          case _ =>
        }
      }
    }
    

    在Flink 1.4.0之前,当从处理时计时器调用ProcessFunction.onTimer()方法时,它将当前处理时间设置为事件时间戳。这种行为非常微妙,用户可能不会注意到。它是有害的,因为处理时间戳是不确定的,并且与水印不一致。此外,用户实现的逻辑依赖于这个错误的时间戳,这很可能是无心的错误。所以我们决定修复它。当升级到1.4.0时,使用这个错误事件时间戳的Flink作业将失败,用户应该使其作业适应正确的逻辑。

    KeyedProcessFunction

    KeyedProcessFunction作为ProcessFunction的扩展,在onTimer(…)方法中提供对定时器键的访问。

    override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
      var key = ctx.getCurrentKey
      // ...
    }
    

    定时器(Timers)

    这两种类型的计时器:处理时间和事件时间都由TimerService在内部维护并排队等待执行。

    TimerService根据每个键和时间戳删除重复计时器,每个键和时间戳最多有一个计时器。如果为相同的时间戳注册了多个计时器,则只调用onTimer()方法一次。

    注意:Flink同步调用onTimer()processElement()方法。因此,用户不必担心状态的并发修改。

    故障容错(Fault Tolerance)

    计时器与应用程序的状态都具有容错和检查点功能。在故障恢复或从保存点启动应用程序时,计时器将被恢复。

    注意:在恢复之前应该立即触发的检查点处理时间计时器。这可能发生在应用程序从故障中恢复或从保存点启动时。

    注意:计时器是异步检查点,除了RocksDB后端/增量快照/基于堆的计时器的组合之外(将使用FLINK-10026解析)。注意,大量计时器会增加时间检查点,因为计时器是检查点状态的一部分。有关如何减少计时器数量的建议,请参阅“计时器合并(Timer Coalescing)”一节。

    计时器合并(Timer Coalescing)

    由于Flink对每个键和时间戳只维护一个计时器,可以通过降低计时器的精度来合并计时器,从而减少计时器的数量。

    对于1秒的定时器精度(事件或处理时间),你可以把目标时间四舍五入到整秒。计时器的启动时间最多提前1秒,但不会晚于要求的毫秒精度。因此,每个键和秒最多有一个计时器。

    val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
    ctx.timerService.registerProcessingTimeTimer(coalescedTime)
    

    由于事件时间定时器只触发带有水印的情况,也可以使用当前的水印调度并结合这些定时器与下一个水印:

    val coalescedTime = ctx.timerService.currentWatermark + 1
    ctx.timerService.registerEventTimeTimer(coalescedTime)
    

    计时器也可以停止和删除如下:

    停止处理时间(processing-time)计时器:
    val timestampOfTimerToStop = ...
    ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
    
    停止事件时间(event-time)计时器:
    val timestampOfTimerToStop = ...
    ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
    

    如果没有注册具有给定时间戳的计时器,则停止计时器无效。

    相关文章

      网友评论

        本文标题:Flink Streaming:Process函数(低级别操作)

        本文链接:https://www.haomeiwen.com/subject/jcbmeqtx.html