Flink-ContinuousProcessingTimeTr

Author: 风筝flying | Published: 2021-01-21 09:39

    Background

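    At work I had a requirement to window data by day and to trigger the window computation at a fixed interval, using ProcessingTime semantics. During testing I found a problem with ContinuousProcessingTimeTrigger: it does not fire when the window reaches its EndTime.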

    Test

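    For the local test I generate synthetic records of the form (category, count, time). The job computes a per-minute total and triggers the window computation every 10 seconds. After each trigger it immediately evicts all elements that were just processed, and the running total is kept in state.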

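    import java.util.Random;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Note: dateTime.transferLongToDate(pattern, millis) is the author's own helper (not shown here);
    // it formats an epoch-millisecond timestamp using the given pattern.
    public class demo2 {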
        private static class DataSource extends RichParallelSourceFunction<Tuple3<String,Integer,String>>{
            private volatile boolean isRunning=true;
            @Override
            public void run(SourceContext<Tuple3<String,Integer,String>> ctx) throws Exception{
                Random random=new Random();
                while(isRunning){
                    Thread.sleep((getRuntimeContext().getIndexOfThisSubtask()+1)*1000*8);
                    // random.nextInt(1) always returns 0, so every record gets the key "类别A" ("category A").
                    String key="类别"+(char)('A'+random.nextInt(1));
                    int value=random.nextInt(10)+1;
                    String dt=dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS",System.currentTimeMillis());
                    System.out.println(String.format("Emits\t(%s,%d,%s)",key,value,dt));
                    ctx.collect(new Tuple3<>(key,value,dt));
                }
            }
            @Override
            public void cancel(){
                isRunning=false;
            }
        }
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            DataStream<Tuple3<String,Integer,String>> ds =env.addSource(new DataSource());
            SingleOutputStreamOperator<String> res=ds
                    .keyBy(
                    (KeySelector<Tuple3<String, Integer,String>, String>) in -> in.f0
            )
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                    .evictor(CountEvictor.of(0,true))
                    .process(new ProcessWindowFunction<Tuple3<String, Integer,String>, String, String, TimeWindow>() {
                        private static final long serialVersionUID = 3091075666113786631L;
                        private ValueState<Integer> valueState;
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            ValueStateDescriptor<Integer> desc=new ValueStateDescriptor<>("value_state",Integer.class);
                            valueState=getRuntimeContext().getState(desc);
                            super.open(parameters);
                        }
                        @Override
                        public void process(String tuple, Context context, Iterable<Tuple3<String, Integer,String>> iterable, Collector<String> collector) throws Exception {
                            // Test output: the time of each window firing
                            System.out.println("trigger:"+dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS",context.currentProcessingTime()));
                            int res=0;
                            if(valueState.value()!=null){
                                res=valueState.value();
                            }
                            for(Tuple3<String, Integer,String> val:iterable){
                                res+=val.f1;
                            }
                            valueState.update(res);
                            String out=dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss",context.window().getStart())+
                                    ","+tuple.toString()+":"+valueState.value();
                            collector.collect(out);
                        }
                        @Override
                        public void clear(Context context) throws Exception {
                            // Time at which the state is cleared
                            System.out.println("Start Clear:"+dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS",System.currentTimeMillis()));
                            valueState.clear();
                            super.clear(context);
                        }
                    });
            res.process(new ProcessFunction<String, Object>() {
                @Override
                public void processElement(String s, Context context, Collector<Object> collector) throws Exception {
                    System.out.println(s);
                }
            });
            env.execute();
        }
    }
    

    Running the program produces the following output:


    image.png

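    The figure above shows that the window fired at the 30-, 40-, and 50-second marks and emitted the correct running total each time, but it did not fire at the moment the window ended.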

    Locating the problem

    Reading the source code

    • Field declarations
    public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
        private static final long serialVersionUID = 1L;
    
        private final long interval;
    
        /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
        private final ReducingStateDescriptor<Long> stateDesc =
                new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
    

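    interval is the firing interval passed in by the caller. stateDesc is the descriptor of a ReducingState; Min() is the ReduceFunction it uses, which keeps the smallest of the timestamps added to the state.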
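To make the role of Min() concrete, here is a minimal sketch in plain Java, with no Flink dependency, of a reducing state that collapses every added value into the minimum. The class MinReducingSketch is hypothetical and only imitates the add/get/clear behaviour; Flink's ReducingState stores its value in a state backend rather than in a field.

```java
import java.util.function.BinaryOperator;

/** Hypothetical stand-in for a ReducingState<Long> whose reduce function keeps the minimum. */
final class MinReducingSketch {
    private final BinaryOperator<Long> reduce = Long::min;
    private Long value; // null until the first add, like an empty state

    void add(long v) {
        // The first value is stored as-is; later values are reduced with the current one.
        value = (value == null) ? Long.valueOf(v) : reduce.apply(value, v);
    }

    Long get() {
        return value;
    }

    void clear() {
        value = null;
    }

    public static void main(String[] args) {
        MinReducingSketch fireTimestamp = new MinReducingSketch();
        fireTimestamp.add(40_000L);
        fireTimestamp.add(30_000L);
        fireTimestamp.add(50_000L);
        System.out.println(fireTimestamp.get()); // 30000
    }
}
```

This is why the trigger calls clear() before add(time + interval): adding a later timestamp to a state that still holds an earlier one would leave the earlier value in place.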

    • The onElement method
        @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    
            timestamp = ctx.getCurrentProcessingTime();
    
            if (fireTimestamp.get() == null) {
                long start = timestamp - (timestamp % interval);
                long nextFireTimestamp = start + interval;
    
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
    
                fireTimestamp.add(nextFireTimestamp);
                return TriggerResult.CONTINUE;
            }
            return TriggerResult.CONTINUE;
        }
    

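    The onElement method initializes the window's first fire time: if no fire timestamp is stored yet, it rounds the current processing time down to a multiple of interval, adds one interval, and registers a timer for that time.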
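The alignment arithmetic in onElement can be checked in isolation. The sketch below is plain Java with hypothetical names (FirstFireAlignment, nextFireTimestamp); the timestamps are small millisecond offsets chosen for readability rather than real epoch values.

```java
final class FirstFireAlignment {
    /** Mirrors the arithmetic in onElement: round down to a multiple of interval, then add one interval. */
    static long nextFireTimestamp(long now, long interval) {
        long start = now - (now % interval);
        return start + interval;
    }

    public static void main(String[] args) {
        long interval = 10_000L; // 10 seconds in milliseconds

        // An element arriving 23.456 s after a minute boundary schedules the first fire at 30 s.
        System.out.println(nextFireTimestamp(23_456L, interval)); // 30000

        // An element arriving exactly on a multiple of the interval schedules the next multiple.
        System.out.println(nextFireTimestamp(30_000L, interval)); // 40000
    }
}
```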

    • The onProcessingTime method
        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    
            if (fireTimestamp.get().equals(time)) {
                fireTimestamp.clear();
                fireTimestamp.add(time + interval);
                ctx.registerProcessingTimeTimer(time + interval);
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    

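    onProcessingTime is the callback invoked when a processing-time timer fires. It reads the current fire time from state and compares it with the timer's time. If the two are equal, it clears the state, stores the next fire time, registers a timer for that time, and finally fires the window computation.
    From this code my guess was that the last fireTimestamp, and the time passed to ctx.registerProcessingTimeTimer, lies beyond the window's end time, so no final computation is triggered when the window ends.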

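    • Verifying with test code
      Based on the source of ContinuousProcessingTimeTrigger, create a new class MyContinuousProcessingTimeTrigger and modify its onProcessingTime method so that it prints the next fire time: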
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
            if (fireTimestamp.get().equals(time)) {
                fireTimestamp.clear();
                fireTimestamp.add(time + interval);
                ctx.registerProcessingTimeTimer(time + interval);
                System.out.println("nextFireTime:"+dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS",time+this.interval));
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    

    Then use MyContinuousProcessingTimeTrigger in the test code. The output is:


    image.png

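    The first two registered times, at 40 and 50 seconds, fire correctly. The timer registered for 17:00:00 never fires, because the window has already closed by then (the window closes at 16:59:59.999).
    With the root cause confirmed, the next step is to fix it.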

    Solution

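    Fixing this again means digging into the source. In the window operator's processing code we find the following (the snippet reads timer.getTimestamp() and evictingWindowState, so it appears to come from the processing-time timer callback of the evicting window operator):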

            if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
                clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
            }
        private void clearAllState(
                W window,
                ListState<StreamRecord<IN>> windowState,
                MergingWindowSet<W> mergingWindows) throws Exception {
            windowState.clear();
            triggerContext.clear();
            processContext.window = window;
            processContext.clear();
            if (mergingWindows != null) {
                mergingWindows.retireWindow(window);
                mergingWindows.persist();
            }
        }
    

    There is a cleanup time: when a timer's timestamp equals it, the window's state is cleared. Next, look at the isCleanupTime method:

        /**
         * Returns {@code true} if the given time is the cleanup time for the given window.
         */
        protected final boolean isCleanupTime(W window, long time) {
            return time == cleanupTime(window);
        }
        /**
         * Returns the cleanup time for a window, which is
         * {@code window.maxTimestamp + allowedLateness}. In
         * case this leads to a value greater than {@link Long#MAX_VALUE}
         * then a cleanup time of {@link Long#MAX_VALUE} is
         * returned.
         *
         * @param window the window whose cleanup time we are computing.
         */
        private long cleanupTime(W window) {
            if (windowAssigner.isEventTime()) {
                long cleanupTime = window.maxTimestamp() + allowedLateness;
                return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
            } else {
                return window.maxTimestamp();
            }
        }
    

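    For non-EventTime semantics, cleanupTime is simply the window's end time, window.maxTimestamp(). The window operator registers its own cleanup timer for that time, so the trigger's onProcessingTime is also called with time equal to window.maxTimestamp(). That gives us the fix:
    modify the onProcessingTime method in MyContinuousProcessingTimeTrigger: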

        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
            // Fire one last time when the timer is the window's end (cleanup) time.
            if (time == window.maxTimestamp()) {
                return TriggerResult.FIRE;
            }
            if (fireTimestamp.get().equals(time)) {
                fireTimestamp.clear();
                fireTimestamp.add(time + interval);
                ctx.registerProcessingTimeTimer(time + interval);
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    

    Test result:


    image.png

    The window now fires when it ends and emits the correct total.
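The effect of the added branch can also be reproduced without Flink. The following is a simplified sketch in plain Java under stated assumptions: FixedTriggerSketch is a hypothetical name, the TreeSet stands in for the operator's processing-time timer queue, the cleanup timer at maxTimestamp is assumed to be registered by the window operator, and times are millisecond offsets from the window start.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class FixedTriggerSketch {
    /** Returns the fire times of the modified trigger for a window ending at windowEnd. */
    static List<Long> fireTimes(long windowEnd, long firstElementTime, long interval) {
        long maxTimestamp = windowEnd - 1;      // a time window's maxTimestamp() is end - 1
        TreeSet<Long> timers = new TreeSet<>(); // pending timers, served in time order
        timers.add(maxTimestamp);               // cleanup timer (assumed registered by the operator)

        long fireTimestamp = firstElementTime - (firstElementTime % interval) + interval;
        timers.add(fireTimestamp);              // first timer, as registered in onElement

        List<Long> fires = new ArrayList<>();
        while (!timers.isEmpty()) {
            long time = timers.pollFirst();
            if (time == maxTimestamp) {
                fires.add(time);                // the added branch: FIRE at the window's end
                break;                          // the window is then cleared; later timers are dropped
            }
            if (time == fireTimestamp) {
                fireTimestamp = time + interval; // store and register the next fire time
                timers.add(fireTimestamp);
                fires.add(time);
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        System.out.println(fireTimes(60_000L, 23_456L, 10_000L)); // [30000, 40000, 50000, 59999]
    }
}
```

Checking for maxTimestamp first means the window fires only once if a periodic timer ever coincides with the window's end.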


          Article link: https://www.haomeiwen.com/subject/jwlczktx.html