美文网首页
Flink自定义触发器

Flink自定义触发器

作者: 和平菌 | 来源:发表于2018-10-30 16:27 被阅读0次

    上一篇分享中介绍了Flink完成数据统计的例子,在最后提到了自定义的统计触发器,这一篇分享主要介绍一下自定义的触发器如何来实现。

    一、触发器的作用
    触发器的作用就是我们在窗口中,什么时候来触发我们的聚合方法。主要涉及到的就是聚合计算(AggregateFunction)中的
    OUT getResult(ACC var1);
    这两个方法

    比如我们想要在1个小时为单位的时间窗口里,达到每分钟来刷新数据的目的,那我们就必须每分钟都要触发一次getResult方法,来把数据发送到下一个处理节点(一般来说都是Sink-->保存数据的节点)

    二、触发器的的实现
    实现很简单,只需要继承Trigger<Object, W>类,实现它的方法即可
    例如,我们需要一个带步长的触发器:

    class ProcessTimeTrigger<W extends Window> extends Trigger<Object, W>
    private final long interval;
    private ProcessTimeTrigger(long interval) {
            this.interval = interval;
        }
    
    

    方法调用时机如下:
    onElement()方法,每个元素被添加到窗口时调用
      
    onEventTime()方法,当一个已注册的事件时间计时器启动时调用
      onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
      
    onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
      *最后一个clear()方法执行任何需要清除的相应窗口
    上面的方法中有两个需要注意的地方:
    1)第一、三通过返回一个TriggerResult来决定如何操作调用他们的事件,这些操作可以是下面操作中的一个;
    CONTINUE:什么也不做
    FIRE:触发计算
    PURGE:清除窗口中的数据
    FIRE_AND_PURGE:触发计算并清除窗口中的数据

    三、自定义注册定时触发器
    我们在需要在onElement中注册一个定时触发的任务

    @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    
            timestamp = ctx.getCurrentProcessingTime();
    
            if (fireTimestamp.get() == null) {
                long start = timestamp - (timestamp % interval);
                long nextFireTimestamp = start + interval;
    
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
    
                fireTimestamp.add(nextFireTimestamp);
                return TriggerResult.CONTINUE;
            }
            return TriggerResult.CONTINUE;
        }
    

    根据步长来注册下次执行的时间

    然后

    @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    
            if (fireTimestamp.get().equals(time)) {
                fireTimestamp.clear();
                fireTimestamp.add(time + interval);
                ctx.registerProcessingTimeTimer(time + interval);
                return TriggerResult.FIRE;
            } else if(window.maxTimestamp() == time) {
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    

    在onProcessingTime的时候如果步长和当前的执行时间一致,则触发计算
    并再注册下一次的触发时间,直到窗口结束。

    相关文章

      网友评论

          本文标题:Flink自定义触发器

          本文链接:https://www.haomeiwen.com/subject/ouigtqtx.html