美文网首页
Flink1.10—ProcessFunction使用的问题及分

Flink1.10—ProcessFunction使用的问题及分

作者: 小胡子哥灬 | 来源:发表于2020-06-07 01:20 被阅读0次

    ProcessFunction是Flink的low-level Stream Api,我们在使用Flink的API进行作业时,不外乎关心以下三种东西:

    • events(也就是流数据,streaming elements)
    • state(状态)
    • timer(时间)

    ProccessFunction就是这样一种API,它提供了所有能访问以上三种东西的接口。需要注意的是,如果要访问keyed state,则必须使用keyedstream:

    stream.keyBy(...).process(new MyProcessFunction())
    

    ProccessFunction就像是一个RichMapFunction + Timer的组合体,但功能更强大。比如当你用RichMapFunction+Timer时,state的访问就是一个问题,你无法在除map以外的其它方法访问keyed state。
    在我刚刚开始用ProccessFunction的时候,遇到些问题,这里记录下。

    为什么定时任务不触发

    相信大家刚开始使用ProccessFunction的时候,应该都先看了Flink官网的Demo,不知道大家有没有运行成功,我反正没有运行成功过,定时任务也不触发。我测试用到的kafka数据非常简单就两字段:userId和viewTime。ProccessFunction里的主要逻辑如下:

    public void processElement(UserAction value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
          current = new CountWithTimestamp();
        }
        // update the state's count
        current.count++;
        // ctx.timestamp()方法返回你的事件时间
        current.lastModified = ctx.timestamp();
        // 更新状态
        state.update(current);
        // 一分钟后调度计算(执行onTimer方法)
        ctx.timerService().registerEventTimeTimer(currentWarter + 60000);
      }
    
      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
        CountWithTimestamp result = state.value();
        if (timestamp == result.lastModified + 60000) {
          out.collect(new Tuple2<>(key.f0, result.count));
        }
      }
    

    那为什么没有触发onTimer呢?没办法,只能看源代码了,因应之前有阅读过Flink1.6各个算子之前的数据传输的代码的经验,直觉告诉我,应该在StreamInputProcessor.java类里找原因,不过现在我用的是flink1.10,所以我还得去Flink1.10里找,代码差不多,也是从StreamInputProcessor里找,只不过在Flink1.10里它变成了一个接口类,不过没关系,大同小异。在StreamTaskNetworkInput.java类中的一个关键方法processElement:

    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
            if (recordOrMark.isRecord()){
                output.emitRecord(recordOrMark.asRecord());
            } else if (recordOrMark.isWatermark()) {
                statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
            } else if (recordOrMark.isLatencyMarker()) {
                output.emitLatencyMarker(recordOrMark.asLatencyMarker());
            } else if (recordOrMark.isStreamStatus()) {
                statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
            } else {
                throw new UnsupportedOperationException("Unknown type of StreamElement");
            }
        }
    

    从上面可以看到,Flink在处理元素数据的时候,有4个分支判断,即是判断StreamElement属于哪种类型:

    • StreamRecord - 代表一条流数据
    • WaterMark - 水位线,本身就是一个时间戳,指示元素timestamp小等于watermark的值都已经到了,算子通过调用。
    • StreamStatus - 流状态,包含IDLE和ACTIVE两种状态。
    • LatencyMarker - 一个特殊的mark,用于判断数据的延迟情况。

    其中recordOrMark.isWatermark()分支就是用于判断是否为一个wartermark, 当我们在程序中给元素打上watermark的时候,程序就会进入这个判断,处理新的watermark,当新的watermark大于旧的watermark时,会覆盖旧的watermark,并且如果有定时任务,则触发onTimer(新的watermark大于等于定时的timer),如下:

    //AbstractStreamOperator.java
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }
    
    //InternalTimerServiceImpl.java
    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;
    
        InternalTimer<K, N> timer;
           //从队列里取出定时任务,并且判断时间是否小于等于当前的watermark
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }
    

    给程序加上WaterMark

    因为我ProccessFunctio里使用的是ctx.timerService().registerEventTimeTimer,基于事件时间的timer,所以要使ProccessFunction里的定时任务能够触发,我们还需要如下两步:

    1. 把Time Characteristic设置为EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    1. 给数据加上watermark
    stream
    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<UserAction>() {
            private long currentMaxTimestamp = 0L;
            private long maxOutOfOrderness = 10000L;//最大允许的乱序时间是10s
            private Watermark watermark = null;
            @Override
            public long extractTimestamp(UserAction element, long previousElementTimestamp) {
                    long timestamp = element.viewTime;
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            }
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                    watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    return watermark;
            }
    })
    .keyBy("userId")
    .process(new CountWithTimeoutFunction())
    .print();
    

    再次运行程序,把断点放在onTimer()里,依次发送数据:

    shizc,2020-06-04 16:25:00
    shizc,2020-06-04 16:26:00
    

    当发送完第二条数据的时候,断点走到了onTimer(),但当继续执行时,由于我们程序里有这样一个判断:

    // timestamp是触发timer时的那个点,
    if (timestamp == result.lastModified + 60000) {
          // emit the state on timeout
          out.collect(new Tuple2<>(key.f0, result.count));
    }
    

    我们发现程序永远也进入不了这个判断,具体原因是:

    1. 基于event-time的定时任务需要下一个元素的watermark高于当前的定时任务,比如数据当前的timestamp是2020-06-04 16:25:00,设置60s后触发onTimer,那么定时任务就是2020-06-04 16:26:00,如果想要onTimer触发,就必须要有一条数的timestamp是大于等于2020-06-04 16:26:00。
    2. 当满足上面 1 的条件时,由于以下代码的原因导致timestamp == result.lastModified + 6000永远不成立:
    ......
    // 
    current.lastModified = ctx.timestamp();
    // 更新状态
    state.update(current);
    // 一分钟后调度计算(执行onTimer方法)
    ctx.timerService().registerEventTimeTimer(currentWarter + 60000);
    

    比如:我们第一条数据是(shizc,2020-06-04 16:25:00),那么current.lastModified = ctx.timestamp() == 2020-06-04 16:25:00,执行registerEventTimeTimer后,我期望在2020-06-04 16:26:00执行onTimer,所以我发送了第二条数据(shizc,2020-06-04 16:26:00),断点走到了onTimer():
    此时onTimer方法里的timestamp=2020-06-04 16:26:00,而result.lastModified=2020-06-04 16:26:00,所以if (timestamp == result.lastModified + 60000),永远不会成立。这里应该是官网demo的bug,不应该去数据里的timestamp,应该取的是用前一个watermark来注册timer。

    使用WaterMark注册Timer

    我们修改下CountWithTimeoutFunction里的processElement代码,并加上些日志输出,代码如下:

    @Override
      public void processElement(UserAction value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
       ...
       // 取前一个watermark,基于前一个watermark的60s后,触发onTimer计算。而不是基于timestamp。
        long prevWaterMark = ctx.timerService().currentWatermark();
        LOG.info("前一个watermark: {}({})", prevWaterMark, DateFormatUtils.format(prevWaterMark, "yyyy-MM-dd HH:mm:ss"));
        current.lastModified = prevWaterMark;
        // write the state back
        state.update(current);
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
      }
    
      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        CountWithTimestamp result = state.value();
        long prevWaterMark = result.lastModified;
        LOG.info("onTimer触发时间: {}({}), 前一个watermark: {}({})", timestamp, DateFormatUtils.format(timestamp, "yyyy-MM-dd HH:mm:ss"),  prevWaterMark, DateFormatUtils.format(prevWaterMark, "yyyy-MM-dd HH:mm:ss"));
        Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
        if (timestamp == result.lastModified + 60000) {
          out.collect(new Tuple2<>(key.f0, result.count));
        }
      }
    

    代码里关键的部分是long prevWaterMark = ctx.timerService().currentWatermark();,我们应该取前一个watermark,基于前一个watermark的60s后,触发onTimer计算。而不是基于timestamp。
    然后我们依次发送测试数据:

    shizc,2020-06-04 16:25:00
    shizc,2020-06-04 16:26:00
    shizc,2020-06-04 16:28:00
    

    得到的输出如下:

    前一个watermark: -10000(1970-01-01 07:59:50)
    onTimer触发时间: 50000(1970-01-01 08:00:50), 前一个watermark: -10000(1970-01-01 07:59:50)
    (shizc,1)
    前一个watermark: 1591259090000(2020-06-04 16:24:50)
    onTimer触发时间: 1591259150000(2020-06-04 16:25:50), 前一个watermark: 1591259090000(2020-06-04 16:24:50)
    (shizc,2)
    前一个watermark: 1591259150000(2020-06-04 16:25:50)
    onTimer触发时间: 1591259210000(2020-06-04 16:26:50), 前一个watermark: 1591259150000(2020-06-04 16:25:50)
    (shizc,3)
    

    通过日志可以看出,数据已经是正常输出了。

    使用ProccessTime注册Timer

    接着来看下基于ProccessTimer注册Timer和基于EventTime的Timer有什么不同:

    public void registerProcessingTimeTimer(N namespace, long time) {
            //取出head,
            InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
            if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            //如果head不为空,则取head的timestamp作为下一个triggertime
                long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
                // 查检当前的time是否小于nextTrigger,如果小于则直接把当前的时间设置定时任务。
                if (time < nextTriggerTime) {
                    if (nextTimer != null) {
                        nextTimer.cancel(false);
                    }
                    // 真正设置定时任务
                    nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
                }
            }
        }
    
        public void registerEventTimeTimer(N namespace, long time) {
            //直接加入到队列,等待下一个watermark到达后,再取出判断执行。 
            eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }
    

    可以看出proccessTime和eventTime的定时任务还是有很大差别的,proccessTime是先与队首判断,如果小于队首,则直接设置定时任务,否则加入队列(里面具体没看,不过应该是有排序的);而eventTime的是直接加入到队列,等待下一个watermark的触发,再取出判断执行。

    proccessTimeTimer的执行逻辑是从头开始执行的,当你注册了三个timer,并不是已经都为三个timer设置了定时任务,而是先为时间最小的那个设置了定时任务,等待时间触发后,再设置第二个定时任务,如下:

    private void onProcessingTime(long time) throws Exception {
            // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
            // inside the callback.
            nextTimer = null;
    
            InternalTimer<K, N> timer;
    
            while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
                processingTimeTimersQueue.poll();
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onProcessingTime(timer);
            }
    
            if (timer != null && nextTimer == null) {
                // 真正设置定时任务
                nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
            }
    }
    

    所以我们不担心timer多,而是担心timer多而间隔小。所以基于proccessTime的timer我们可以这样改进:

    long currentProccessTime = ctx.timerService().currentProcessingTime();
        if(current.lastModified == 0 || current.lastModified + 60000 <= currentProccessTime) {
          current.lastModified = currentProccessTime;
          // write the state back
          state.update(current);
          ctx.timerService().registerProcessingTimeTimer(current.lastModified + 60000);
        } else {
          state.update(current);
    }
    

    每隔一分钟设置一次timer,onTimer()里就不需要在有if (timestamp == result.lastModified + 60000)的判断了,因为我们已经控制了timer的个数,同样基于eventTime的timer也可以这样改进。

    运行代码,每隔60s输出数据:

    onTimer触发时间: 1591344468660(2020-06-05 16:07:48), processingTime: 1591344408660(2020-06-05 16:06:48)
    (shizc,2)
    
    onTimer触发时间: 1591344499806(2020-06-05 16:08:19), processingTime: 1591344439806(2020-06-05 16:07:19)
    (flink,2)
    

    以上就是我使用ProccessFunction遇到的一些问题,希望对大家有所帮助,若有不正确的地方欢迎纠正 ~~.

    新的发现

    就以上的问题,我在flink jira上提了个issue: FLINK-19167,我把我遇到的问题描述后,Flink大神们也都参与了讨论:

    discuss.png

    最后Dawid Wysakowicz 大神指出了问题的根源: 这个demo只有在多个key(多个window)的情况下才能正确运行,单个key的情况下,会存在lastModified的值被覆盖的情况。举个例子:
    有3条数据:(key1, 1), (key1,2),(key1,60001), 当window收到第一条数据时,注册一个基于eventtime的timer,watermark为1 + 6000, 当第三条数据进来时,先把lastModified更新为60001,此时触发onTimer, timestamp == lastModified,都同为60001,所以 if 的判断不成立。
    如果把(key1, 60001)的key改成key2, 那么前面的key1的所有timer都会触发。是因为key1的lastModified不会被更新,但是watermark大于它所注册的wartermark,所以会触发。

    timestamp: 60001, lastModified: 1
    timestamp: 60002, lastModified: 2
    (key1,2)
    

    所以由以上得出的结论是: 基于eventTime的 每一个key的timer,它们是同一个流,同一个流也就会受到watermark的增长而影响到(可以想象成 他们在同一条时间轴上)

    Flink的大神们也都说了,官网的例子的确有没有考虑到的地方,只是一个很简单的例子,多个key没问题,单个key不生效。

    The end !

    相关文章

      网友评论

          本文标题:Flink1.10—ProcessFunction使用的问题及分

          本文链接:https://www.haomeiwen.com/subject/tuxgzhtx.html