美文网首页flink
Process Function (Low-level Oper

Process Function (Low-level Oper

作者: 小C菜鸟 | 来源:发表于2018-03-07 22:44 被阅读41次

    原文地址


    The ProcessFunction

    ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本组件:

    • Events(流元素)
    • state (容错, 一致性,只在KeyedStream上)
    • timers (事件时间和处理时间,只在KeyedStream上)

    可以将ProcessFunction看做是具备访问keyed状态和定时器的FlatMapFunction。它通过invoked方法处理从输入流接收到的每个事件。

    对于容错状态,ProcessFunction通过RuntimeContext访问Flink的keyed状态,类似于有状态的函数访问keyed状态。
    定时器允许应用程序基于处理时间和事件时间响应变化。每次调用函数的 processElement(...)方法获得一个Context对象,它可以访问元素的事件时间戳,和对TimerService的访问。TimerService可以用来为将来的事件/处理时间注册回调。当定时器的达到定时时间时,会调用onTimer(...) 方法。
    注意:您想访问keyed状态和定时器,则必须在键控流上应用ProcessFunction:

    stream.keyBy(...).process(new MyProcessFunction())
    

    Low-level Joins

    为了在两个输入上实现低级别的操作,应用可以使用CoProcessFunction。。该函数绑定到两个不同的输入,并为不同的输入记录分别单独调用processElement1(…)和processElement2(…)。
    实现低级别join通常遵循以下模式:

    • 为一个(或两个)输入创建一个状态对象。
    • 当从输入源收到元素时,更新状态
    • 当从输入源收到元素时,探测状态并生成join的结果。

    例如,当为客户数据保存状态时,你可能会join客户数据和财务交易。If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.

    Example

    下面的示例为每个键进行计数,并当超过一分钟(事件时间)没有更新key的计数,则下发key/计数对:

    • 数量,key和最后一次更新时间都存储在ValueState中,它由key隐式关联。
    • 对于每个记录,ProcessFunction将数量加一并且设置最后一次更新时间
    • function将会在一分钟后(事件时间)调度回调
    • 每当回调执行时,它检查回调的事件时间戳和存储的最后一次更新时间,如果匹配(也就是说,一分钟内没有更新)则下发key/count对

    注意:这个简单的示例可以通过会话窗口实现。我们在这里使用ProcessFunction来说明它提供的基本模式。

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
    import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
    import org.apache.flink.util.Collector;
    
    
    // the source data stream
    DataStream<Tuple2<String, String>> stream = ...;
    
    // apply the process function onto a keyed stream
    DataStream<Tuple2<String, Long>> result = stream
        .keyBy(0)
        .process(new CountWithTimeoutFunction());
    
    /**
     * The data type stored in the state
     */
    public class CountWithTimestamp {
    
        public String key;
        public long count;
        public long lastModified;
    }
    
    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
    
        /** The state that is maintained by this process function */
        private ValueState<CountWithTimestamp> state;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }
    
        @Override
        public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
    
            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }
    
            // update the state's count
            current.count++;
    
            // set the state's timestamp to the record's assigned event time timestamp
            current.lastModified = ctx.timestamp();
    
            // write the state back
            state.update(current);
    
            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }
    
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
    
            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();
    
            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }
    

    注意:在Flink1.4.0之前,当调用来自处理时间的定时器时,ProcessFunction.onTimer()方法设置当前处理时间为事件时间戳。这种行为非常微妙,用户可能不会注意到。嗯,这是有害的,因为处理时间是不确定的并且不与水位对齐。此外,用户实现的逻辑依赖于这个错误的时间戳很可能是无意的错误。因此我们决定修复它。在升级到1.4.0时,使用这个错误事件时间戳的Flink作业将会失败,用户应该将他们的工作调整为正确的逻辑。

    相关文章

      网友评论

        本文标题:Process Function (Low-level Oper

        本文链接:https://www.haomeiwen.com/subject/ipzsfftx.html