美文网首页Flink源码解析
一文搞懂 Flink Timer

一文搞懂 Flink Timer

作者: shengjk1 | 来源:发表于2020-08-27 20:20 被阅读0次

    什么是 Timer

    顾名思义就是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 key 和 timestamp 只有一个与之对应的 timer。timer 本质上是通过 ScheduledThreadPoolExecutor.schedule 来实现的

    Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.

    真实的事件样例,告诉我 只要 key 不同是不会并发修改的,如果一直都是完全相同的 key ,比如我的 key 一直都是 1,完全是会并发修改的。key 的重复性越高,并发修改的可能性就越大,除非 rocksdb 自身保证同一个每个 key ( rocksdb key ) 的事务。

    Timer 的使用

    public class KeyedProcessFunctionImp extends KeyedProcessFunction<String, Tuple2<String, Object>, Tuple2<String, String>> {
            @Override
        public void open(Configuration parameters) throws Exception {
        }
        
        @Override
        public void close() throws Exception {
        }
        
        @Override
        public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<Tuple2<String, String>> collector) throws Exception {
            System.out.println("注册一个 timer");
            long currentProcessingTime = context.timerService().currentProcessingTime() / 1000 * 1000 + 60 * 1000;
            context.timerService().registerProcessingTimeTimer(currentProcessingTime);
                
        }
        
        
        @Override
        //TODO timer 与 process 同时发生
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
            System.out.println("我是一个 timer");
            }
        }
    

    Timer的存储

    Timer 会存储到 key state backend 中,并且会做 checkpoint ,失败会恢复。

    Timer的源码分析

    context.timerService().registerProcessingTimeTimer(currentProcessingTime); 会直接调用 InternalTimerServiceImpl.registerProcessingTimeTimer 方法

    public void registerProcessingTimeTimer(N namespace, long time) {
            InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
            if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
                long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
                // check if we need to re-schedule our timer to earlier
                if (time < nextTriggerTime) {
                    if (nextTimer != null) {
                        nextTimer.cancel(false);
                    }
                    //registerProcessingTimeTimer 定时调用 onProcessingTimer 调用,
                    // 最终调用 triggerTarget.onProcessingTimer,比如 windowOperator.onProcessingTimer
                    // ScheduledThreadPoolExecutor.schedule
                    nextTimer = processingTimeService.registerTimer(time, this);
                }
            }
        }
    

    processingTimeTimersQueue 可以保证相同的 key 和 time 对应的 timer 只会注册一次。我们以 rocksdb 为例看细节

    @Override
        // 按照时间戳的顺序添加的,时间戳越大优先级越低
        public boolean add(@Nonnull E toAdd) {
            
            //会依据条件将 rocksdb 中 store 的 timer 存储到 orderedCache 中
            checkRefillCacheFromStore();
    
            final byte[] toAddBytes = serializeElement(toAdd);
    
            // 默认 128
            // orderedCache 通过 treeSet 进行操作的
            final boolean cacheFull = orderedCache.isFull();
    
            if ((!cacheFull && allElementsInCache) ||
                LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {
    
                if (cacheFull) {
                    // we drop the element with lowest priority from the cache
                    orderedCache.pollLast();
                    // the dropped element is now only in the store
                    allElementsInCache = false;
                }
    
                if (orderedCache.add(toAddBytes)) {
                    // write-through sync
                    addToRocksDB(toAddBytes);
                    if (toAddBytes == orderedCache.peekFirst()) {
                        peekCache = null;
                        return true;
                    }
                }
            } else {
                // we only added to the store
                addToRocksDB(toAddBytes);
                allElementsInCache = false;
            }
            return false;
        }
    

    orderedCache 是通过 treeSet 来实现的,所以 time + key + namespace (非window 是固定不变的) 为 treeMap 的 key 。新来的 timer 除了添加到 orderedCache 外还会添加到 rocksdb。
    添加完成之后,就正式开始注册 定时任务了。当定时任务开始执行时,调用

    @Override
        // registerProcessingTimeTimer 定时调用 onProcessingTime
        // time 设定的那个 timestamp
        public void onProcessingTime(long time) throws Exception {
            // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
            // inside the callback.
            nextTimer = null;
    
            InternalTimer<K, N> timer;
    
            // 小于这个 time 的所有 timer 都会被触发
            while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
                processingTimeTimersQueue.poll();
                keyContext.setCurrentKey(timer.getKey());
                // windowOperator onProcessingTime
                // 自己定义的 timer
                triggerTarget.onProcessingTime(timer);
            }
    
            if (timer != null && nextTimer == null) {
                //再次创建 timer
                nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
            }
        }
    

    至此的话,自己写的 Timer 方法就被执行了。
    当然还有一些更细节的东西,比如 timer restore ,timer snapshot ,startTimerService 等读者自己可以依需查看

    Timer的其他情况

    Window Operator 的 Timer 与此类似

    相关文章

      网友评论

        本文标题:一文搞懂 Flink Timer

        本文链接:https://www.haomeiwen.com/subject/hiehsktx.html