美文网首页
flink 问题总结(9)registerProcessingT

flink 问题总结(9)registerProcessingT

作者: ZYvette | 来源:发表于2020-07-24 18:04 被阅读0次

    请移步 flink问题总结目录 (不断更新中)
    姐妹篇 registerEventTimeTimer

    开篇

    我在开发过程中发现,注册相同时间的registerProcessingTimeTimer,不是每个都会触发执行,也不是只会执行一次,为什么这样,我觉得很奇怪,所以翻看了源码,本篇文章就是对这个疑问做讲解。

    前期

    我写了个测试代码,结果发现会有上述问题。
    代码如下:

        public static void processingTimeWindow() throws Exception {
            long ct=System.currentTimeMillis();
            System.out.println(ct);// 打印我触发的时间
            StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Long> source = e
                    .addSource(new SourceFunction<Long>() {
                        private volatile boolean stop = false;
    // 数据源是当前时间,一共有1000条数据
                        @Override
                        public void run(SourceContext<Long> ctx) throws Exception {
                            for(int i=0;i<200;i++){
                                ctx.collect(System.currentTimeMillis());
                            }
                        }
    
                        @Override
                        public void cancel() {
                            stop = true;
                        }
                    }).setParallelism(1);
            e.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // optional for Processing time
            source.keyBy(v->v/1000).process(new KeyedProcessFunction<Long, Long, Long>() {
                private ValueState<Integer> itemState;
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    ValueStateDescriptor<Integer> itemsStateDesc = new ValueStateDescriptor<>(
                            "itemState-state",
                            Integer.class);
                    itemState = getRuntimeContext().getState(itemsStateDesc);
                }
    
                @Override
                public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
    // 每条数据会存入state,并用同一个时间ct出发定时器。
                    int val=(itemState.value()==null)?0:itemState.value();
                    itemState.update(val+1);
                    ctx.timerService().registerProcessingTimeTimer(ct);
                }
    
                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
    //出发定时器时打印state, 触发时间,key
                    System.out.println(itemState.value());
                    System.out.println(timestamp+"——"+ctx.getCurrentKey());
                    System.out.println();
                }
    
                @Override
                public void close() throws Exception {
                    super.close();
                }
    
            }).setParallelism(1);
            e.execute();
        }
    

    执行结果如下:

    1595578833772                              // ct
    56                                                    // state
    1595578833772——1595578846          // 触发time  和key
    
    129
    1595578833772——1595578846
    
    197
    1595578833772——1595578846
    
    200
    1595578833772——1595578846
    

    主要步骤是:

    1. souce: 一批数据,由当前时间构成。200条, 可是自己修改大小,现象一样
    2. keyby: 根据 时间/1000 keyby ,可以认为是1秒一个窗口
    3. process: 存储状态累加,触发打印

    分析

    上结果可以看出,200条数据都被分到了1595578846 这个key里,同时触发执行的time都是1595578833772

    在没有看结果之前,一般观点:

    观点1. 认为使用processing time,那么我用的时间到了就会触发,所以应该不会执行,因为是过期时间。
    观点2. 认为我调用了200次的registerProcessingTimeTimer,那么就应该是200次
    观点3. 认为processing time 应该只执行一次,因为触发时间都是相同的。

    但是现象是:

    1. 输入的时间和实际执行的时间没关系
    2. 触发时间相同的话,不会执行200次,但是可能会执行不只1次。

    源码阅读:

    A. 整体注册方法

    首先来看下:

    1. registerProcessingTimeTimer 方法:
        @Override
        public void registerProcessingTimeTimer(N namespace, long time) {
    // 注册timer之前会先从队列头去除oldHead
            InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    // 判断如果新的time调价到队列成功的话:
            if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
    // 判断oldHead 触发时间 如果是空的会改为最大值
                long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
                // check if we need to re-schedule our timer to earlier
    // 如果新添加的time 比oldHead 的触发时间小的话:
                if (time < nextTriggerTime) {
    // 如果nextTimer 不是空的,就会取消这次定时,当然已经执行的话不会取消。
                    if (nextTimer != null) {
                        nextTimer.cancel(false);
                    }
    // 将新的time注册为nextTimer先执行。
                    nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
                }
            }
        }
    

    上述可以理解为:

    1. 取队头的timer
    2. 如果本次time的任务添加到队里成功,且如果本次time小于队头的这个触发时间,就将nextTimer 任务取消(已经执行除外),同时马上将本次的time作为下次触发的Timer。

    B. 新timer 添加队列部分的代码

    那么我们来看,time添加队列成功的条件是什么?

        /**
         * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
         * no such element is already contained (determined by {@link #equals(Object)}).
         *
         * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
         * Only returns <code>false</code> iff the head element was not changed by this operation.
         */
        @Override
        public boolean add(@Nonnull T element) {
            return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
        }
    
    // 获取key对应的keygroup
        private HashMap<T, T> getDedupMapForElement(T element) {
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
                keyExtractor.extractKeyFromElement(element),
                totalNumberOfKeyGroups);
            return getDedupMapForKeyGroup(keyGroup);
        }
    
    // return previous value, or null if none
        @Override
        public V putIfAbsent(K key, V value) {
            return putVal(hash(key), key, value, true, true);
        }
    
    

    代码讲解

    1. processingTimeTimersQueue 是一个优先队列,查看他的排序规则,是根据time来排序的,也就是说注册timer的time越小就会在前边。
    2. 添加分两步:
      a. getDedupMapForElement(element).putIfAbsent(element, element) == null:
      这个是首先获取key 对应的hashmap,也就是keygroup对应的存储time的hashmap。将元素添加并返回旧的元素。就是的元素是否是null。也就是判断是否存在相同time的timer。不存在,则符合添加条件。
      b. super.add(element)
      将元素存入heapPriorityQueue, 存入该新的timer,成功是true。

    结论: 如果time相同就不会添加成功,那么也就不会触发Timer 。
    似乎前边的观点3应该是对的只执行一次,触发时间到了就触发。那么实际为啥执行的和我们想的不一样呢,我们来看看registerTimer 方法。

    C. 实际注册Timer的方法:registerTimer

    /**
         * Registers a task to be executed no sooner than time {@code timestamp}, but without strong
         * guarantees of order.
         *
         * @param timestamp Time when the task is to be enabled (in processing time)
         * @param callback    The task to be executed
         * @return The future that represents the scheduled task. This always returns some future,
         *         even if the timer was shut down
         */
        @Override
        public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {
    
            long delay = ProcessingTimeServiceUtil.getProcessingTimeDelay(timestamp, getCurrentProcessingTime());
    
            // we directly try to register the timer and only react to the status on exception
            // that way we save unnecessary volatile accesses for each timer
            try {
                return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException e) {
                final int status = this.status.get();
                if (status == STATUS_QUIESCED) {
                    return new NeverCompleteFuture(delay);
                }
                else if (status == STATUS_SHUTDOWN) {
                    throw new IllegalStateException("Timer service is shut down");
                }
                else {
                    // something else happened, so propagate the exception
                    throw e;
                }
            }
        }
    

    代码讲解:
    a. return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
    这个方法是实际调度方法,他的执行时间实在delay的时长之后。
    delay的计算方法:

        /**
         * Returns the remaining delay of the processing time specified by {@code processingTimestamp}.
         *
         * @param processingTimestamp the processing time in milliseconds
         * @param currentTimestamp the current processing timestamp; it usually uses
         *        {@link ProcessingTimeService#getCurrentProcessingTime()} to get
         * @return the remaining delay of the processing time
         */
        public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
    
            // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
            // T says we won't see elements in the future with a timestamp smaller or equal to T.
            // With processing time, we therefore need to delay firing the timer by one ms.
            return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
        }
    

    delay=Math.max(processingTimestamp - currentTimestamp, 0) + 1;
    所以实际是执行时间-当前时间的差加上1ms。
    那么当执行时间小于当前时间的时候,Timer会在1ms后被调度。
    所以说明实际执行时间还没到就会等待到达执行时间,如果已经过了就会在1ms后马上执行。

    D. Timer 时间到达执行操作

    再看一下到达执行时间,做了什么操作:

        private void onProcessingTime(long time) throws Exception {
            // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
            // inside the callback.
            nextTimer = null;
    
            InternalTimer<K, N> timer;
    
            while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
                processingTimeTimersQueue.poll();// 从队列中去除,同时从keygroup的map中也会去除。
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onProcessingTime(timer);
            }
    
            if (timer != null && nextTimer == null) {
                nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
            }
        }
    

    a. 循环找到小于等于当前触发time的Timer。从队列去除(包括从hashmap中去除),设置当前key,并执行实际onTimer方法。
    b. 如果当前timer和nextTimer为空,则设置nextTimer。

    总结一下: 当Timer到达的时候同时队列里可能有小于等于的Timer(包括自己)。将这些Timer都取出同时执行用户的OnTimer 方法。并且在队里还有Timer且nextTimer为空的时候,注册下一次Timer。

    总结

    我们阅读源码之后解释上述三个观点及实际现象。

    观点1. 认为使用processing time,那么我用的时间到了就会触发,所以应该不会执行,因为是过期时间。
    观点2. 认为我调用了200次的registerProcessingTimeTimer,那么就应该是200次
    观点3. 认为processing time 应该只执行一次,因为触发时间都是相同的。

    1. 正常情况processing time到了就会触发,但是在processtime注册Timer时已经超过当前时间就会马上执行。
    2. 因为注册Timer时按照key逻辑分区的,所以每一个区里如果有重复的话,是会去重的。但是在Timer执行之后会将该Timer从队列中移除(包括map中移除),所以队列中就没有该Timer,那么新来的时间如果还是上次的time就会被认为是新的time,从而再次出发Timer。因此触发执行的次数不是200,但是也不是只有1次。

    相关文章

      网友评论

          本文标题:flink 问题总结(9)registerProcessingT

          本文链接:https://www.haomeiwen.com/subject/bswjlktx.html