美文网首页实时数据相关
flink ProcessFunction简介,实战,流程分析

flink ProcessFunction简介,实战,流程分析

作者: 岳过山丘 | 来源:发表于2019-04-15 19:36 被阅读10次

    1.ProcessFunction介绍

    1.1 ProcessFunction基本构成

    ProcessFunction是一个低级流处理操作,可以访问所有(非循环)流应用程序的基本构建块:

    • 事件(流元素)
    • state(容错,一致,仅在keyed stream上)
    • 定时器(事件时间和处理时间,仅限keyed stream)
    image.png

    ProcessFunction可以被看作是一个可以访问keyed state和定时器的FlatMapFunction。它可以对输入流中接收的每个事件进行调用处理。

    状态

    对于容错的state,ProcessFunction可以访问Flink的keyed state,可以通过其访问 RuntimeContext,类似于其他有状态函数访问keyed state的方式。

    time

    定时器允许应用程序对processing timeevent time.
    的变化作出反应。每次调用该函数processElement(...)都会获得一个Context对象,该对象可以访问元素的事件时间戳和TimerService。的TimerService可用于注册为将来事件- /处理-时刻回调。

    触发时间

    达到计时器的特定时间时,将onTimer(...)调用该方法。在event time上注册时间为T的timer,一旦watermark大于或等于T,就会触发onTimer。在该调用期间,所有状态再次限定为创建计时器的key的状态,允许计时器操纵keyed state

    注意如果要访问键控状态和计时器,则必须应用ProcessFunction键控流:

    stream.keyBy(...).process(new MyProcessFunction())
    

    2.实战 机器宕机告警

    2.1 需求

    filebeat采集机器数据到kafka,对于某台机器2分钟内没有新数据流入kafka,则判定这台机器宕机。

    2.2 Main函数

      StreamExecutionEnvironment env = ...; //
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //使用eventTime
    
          DataSource<MetricEvent> source ;//读取kafka
          SingleOutputStreamOperator<SimplifyMetricEvent> data =  source.flatMap(new FirstFilterFunction()).assignTimestampsAndWatermarks(new MetricLagWatermarkExtractor()); 
         int timeLimit=2*60*1000;
       SingleOutputStreamOperator<SimplifyMetricEvent> 
       data.keyBy(SimplifyMetricEvent::getIp).process(new OutageFunction(timeLimit)).print();
    
    

    注意:MetricLagWatermarkExtractor 一定要对异常数据做处理,比如时间戳大于当前时间,这个时间戳就不能作为watermark,否则后续onTimer方法的调用时间就不准确。

    2.3 ProcessFunction

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    
    
    public class OutageFunction extends KeyedProcessFunction<String, SimplifyMetricEvent, SimplifyMetricEvent> {
       private ValueState<SimplifyMetricEvent> state;
       private int delay;
       @Override
       public void open(Configuration configuration) {
           TypeInformation<SimplifyMetricEvent> info = TypeInformation.of(new TypeHint<SimplifyMetricEvent>() {
           });
           TypeInformation<Boolean> resolveInfo = TypeInformation.of(new TypeHint<Boolean>() {
           });
           state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", info));
       }
    
       public OutageFunction(int delay) {
           this.delay = delay;
       }
    
    
       @Override
       public void processElement(SimplifyMetricEvent simplifyMetricEvent, Context ctx, Collector<SimplifyMetricEvent> collector) throws Exception {
           SimplifyMetricEvent current = state.value();
    
           if (current == null) {
               current = new SimplifyMetricEvent(simplifyMetricEvent.getClusterName(), simplifyMetricEvent.getHostIp(),
                       simplifyMetricEvent.getTimestamp(), simplifyMetricEvent.getResolve(), System.currentTimeMillis());
           }
           current.setTimestamp(simplifyMetricEvent.getTimestamp());
           current.setSystemTimestamp(System.currentTimeMillis());
           state.update(current);
           ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay);
    
       }
    
       @Override
       public void onTimer(long timestamp, OnTimerContext ctx, Collector<SimplifyMetricEvent> out) throws Exception {
           // get the state for the key that scheduled the timer
           //获取计划定时器的key的状态
           SimplifyMetricEvent result = state.value();
           // 检查是否是过时的定时器或最新的定时器
           if (result != null && timestamp >= result.getSystemTimestamp() + delay) {
               out.collect(result);      //宕机发生,往下游发送事件
               ctx.timerService().registerEventTimeTimer(timestamp + delay);//注册下一个宕机事件
               result.setSystemTimestamp(timestamp); 
               state.update(result);
           }
       }
    }
    
    

    ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay);就是定义一个事件触发器,触发的时间是current.getSystemTimestamp() + delay。到达该时间则调用
    onTimer(long timestamp, OnTimerContext ctx, Collector<SimplifyMetricEvent> out)

    3.流程解析

    1

    data:SingleOutputStreamOperator 调用keyBy形成 KeyedStream,调用process

    @Internal
        public <R> SingleOutputStreamOperator<R> process(
                KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
                TypeInformation<R> outputType) {
    
            KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
            return transform("KeyedProcess", outputType, operator);
        }
    

    keyedProcessFunction 就是上边我们自定义的OutageFunction
    这里生成的 KeyedProcessOperator

    2

    public class KeyedProcessOperator<K, IN, OUT>
            extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
            implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
    

    KeyedProcessOperator实现了 Triggerable

            @Override  //实现Triggerable
        public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
            collector.setAbsoluteTimestamp(timer.getTimestamp());
            invokeUserFunction(TimeDomain.EVENT_TIME, timer);
        }
    
        @Override //实现Triggerable
        public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
            collector.eraseTimestamp();
            invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
        }
    
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            collector.setTimestamp(element);
            context.element = element;
            userFunction.processElement(element.getValue(), context, collector);
            context.element = null;
        }
    
        private void invokeUserFunction(
                TimeDomain timeDomain,
                InternalTimer<K, VoidNamespace> timer) throws Exception {
            onTimerContext.timeDomain = timeDomain;
            onTimerContext.timer = timer;
            userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
            onTimerContext.timeDomain = null;
            onTimerContext.timer = null;
        }
    

    userFunction.processElement(element.getValue(), context, collector);
    userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
    userFunction就是我们上面的OutageFunction
    这里看到在onEventTime或者onProcessingTime方法调用的时候才会调用userFunction.onTimer。那么 onEventTime 什么时候触发呢?

    3.以onEventTime为例

    image.png
    进入到InternalTimerServiceImpl
    public void advanceWatermark(long time) throws Exception {
            currentWatermark = time;
    
            InternalTimer<K, N> timer;
    
            while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
                eventTimeTimersQueue.poll();
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onEventTime(timer);
            }
        }
    

    也就是说InternalTimerServiceImpl调用advanceWatermark时我们的onEventTime方法才调用。而advanceWatermark方法的入参time是当前operator的watermark所代表的时间。那么什么时候调用advanceWatermark呢?这个等下再看。
    这个方法里面的eventTimeTimersQueue

           /**
         * Event time timers that are currently in-flight.
         */
        private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
    

    当我们调用时ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay);
    就是调用

        @Override
        public void registerEventTimeTimer(N namespace, long time) {
            eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }
    

    向里eventTimeTimersQueue存储TimerHeapInternalTimer(包含key,timestamp等)。
    当调用advanceWatermark时,更新currentWatermark,从eventTimeTimersQueue里peek出timer,判断当前watermark的时间是否大于timer里的时间,若大于,则从队列里弹出这个timer调用 triggerTarget.onEventTime(timer)
    也就是调用 KeyedProcessOperator.onEventTime,最终调用到里我们自定义OutageFunctiononTimer方法。

    3.总结一下

    如果我们的env用的是 TimeCharacteristic.EventTime,那么我们自定义的 KeyedProcessFunctiononTimer触发时间是这个算子的watermark时间大于 ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay)注册的时间时才会触发。
    注意因为这里的触发时间和watermark强相关,在上游算子assignTimestampsAndWatermarks时一定正确处理wartermark的值。

    todo:

    什么时候调用InternalTimerServiceImpladvanceWatermark呢?

    相关文章

      网友评论

        本文标题:flink ProcessFunction简介,实战,流程分析

        本文链接:https://www.haomeiwen.com/subject/izfbbqtx.html