window源码

作者: edd72e125c98 | 来源:发表于2018-06-12 09:27 被阅读79次

    WindowOperator

    Context

    定义了各种 registerEventTimeTimer,registerEventTimeTimer还有deleteXXX, 其本质是调用internalTimerService的同名方法来增删timer

    提供onElement,onProcessingTime,onEventTime给上层(WindowOperator call), 内部就是包装了trigger的同名方法, trigger的方法由业务实现

    HeapInternalTimerService

    HeapInternalTimerService :> InternalTimerService, 主要就是维护两个queue,eventTimeTimersQueue和eventTimeTimersQueue
    这两个queue的内部元素是timer

    windowOperator implement了OneInputStreamOperator

    1. processElement() , 主要做了把element存到partitioned state里(给onEventTime和onProcessTime或自己去消费),再根据其中call context.onElement的结果来决定fire来消费
      首先用windowAssigner找到element对应的windows
        Collection<W> elementWindows = windowAssigner.assignWindows(
                element.getValue(), element.getTimestamp(), windowAssignerContext);
    

    mergeWindow逻辑跳过。。。直接看如何处理

    for (W window: elementWindows) {
    
                    // drop if the window is already late, 处理迟到数据
                    if (isLate(window)) {
                        continue;
                    }
                            // 找到每个窗口对应的partitionedState并存入
                AppendingState<IN, ACC> windowState =
                            getPartitionedState(window, windowSerializer, windowStateDescriptor);
                    windowState.add(element.getValue());
                          // 更新context
                    context.key = key;
                    context.window = window;
                          // call onElement
                    TriggerResult triggerResult = context.onElement(element);
    
                    if (triggerResult.isFire()) {
                        ACC contents = windowState.get();
                        if (contents == null) {
                            continue;
                        }
                      //执行window function  
                        emitWindowContents(window, contents);
                    }
    
                    if (triggerResult.isPurge()) {
                        windowState.clear();
                    }
                                // 注册cleanup timer, 会在onEventTime和onProcessTime中读到这个timer并clean
                    registerCleanupTimer(window);
                }
    
    1. processWatermark() 这个方法在一个父类的父类 AbstractUdfStreamOperator中实现了,自己没实现。
      这个方法call了timerService的advanceWatermark, 其中timerService call了 WindowOperatoronEventTime()
    public void advanceWatermark(long time) throws Exception {
            currentWatermark = time;
    
            InternalTimer<K, N> timer;
    
    //不大于,水印用于表示小于该时间戳的元素都已到达,所以所有不大于水印的触发时间戳都该被触发
            while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    
                Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
                timerSet.remove(timer);
                eventTimeTimersQueue.remove();
    
                keyContext.setCurrentKey(timer.getKey());
    //这边调用了onEventTime
                triggerTarget.onEventTime(timer);
            }
    

    processWatermark在OperatorChain.ChainningOutput.emitWatermark中被call。

    windowOperator implement了triggerable,2方法onEventTime()onProcessingTime()。都是处理timer的
    1.onEventTime(InternalTimer timer)在HeapInternalTimerService。advanceWatermark中调用(触发watermark以下的timer时)。
    onEventTime内部 callcontext.onEventTime()(触发trigger.onEventTime) 来决定触发window计算, 和执行cleanup window的逻辑cleanAllState()

    2.onProcessingTime(InternalTimer timer) call context.onProcessingTime
    和onEventTime类似, 触发trigger.onProcessTime和cleanAllState
    上层HeapInternalTimerService.onProcessTime call windowOperator.onProcessTime()

    //不大于表示小于该时间戳的元素都已到达,所以所有不大于此times的触发时间戳timer都该被触发
            while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    
                Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
    
                timerSet.remove(timer);
                processingTimeTimersQueue.remove();
    
                keyContext.setCurrentKey(timer.getKey());
    // triggerTarget  就是 windowOperator
                triggerTarget.onProcessingTime(timer);
            }
    
    EvictingWindowOperator

    EvictingWindowOperator :> WindowOperator

    在计算前写call evictor.evictorBefore

    AbstractKeyedTimePanes

    优化window到pane里, 但目前deprecate, 未来会重做
    WindowedStream reduce()apply()(fold不支持)中 createFastTimeOperatorIfValid()
    根据
    if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class(or TumblingAlignedProcessingTimeWindows) && trigger == null && evictor == null)来判断是否可以使用带pane优化的operator (AccumulatingProcessingTimeWindowOperatorAggregatingProcessingTimeWindowOperator

    否则, 使用简单的 windowOperator和EvitorWindowOperator

    相关文章

      网友评论

        本文标题:window源码

        本文链接:https://www.haomeiwen.com/subject/jbebxxtx.html