美文网首页
Flink Watermark 传递

Flink Watermark 传递

作者: 邵红晓 | 来源:发表于2020-05-18 14:09 被阅读0次

    Watermark致力于解决数据的乱序和延迟问题,触发window窗口中数据的计算,多channel,source取最小值,对KeyedStream做Window,如果并发度是n,在集群中就有n个WinwowOperator的instance
    Window尽心尽力提供各种数据切分机制,在无限流上切分出有限流,进行计算
    State勤勤恳恳记录中间状态,并在数据恢复等场景承担着重要的角色

    -- OneInputStreamTask.invoke()
          |   
          +----> StreamTask.init 
          |      把StreamTaskNetworkOutput/StreamTaskNetworkInput聚合StreamOneInputProcessor
          +----> StreamTask.runMailboxLoop
          |      从 StreamTask.runMailboxLoop 开始,下面是一层层的调用关系
          -----> StreamTask.processInput()
          -----> StreamTask.inputProcessor.processInput()
          -----> StreamOneInputProcessor.processInput
          -----> input.emitNext(output)
          -----> StreamTaskNetworkInput.emitNext()
          -----> StreamTaskNetworkInput.processElement()
    
    
    1. 下面是处理普通 Record  
    -- StreamTaskNetworkInput.processElement()  
          |   
          | 下面都是一层层的调用关系
          -----> output.emitRecord(recordOrMark.asRecord())
          -----> StreamTaskNetworkOutput.emitRecord()
          -----> operator.processElement(record)
                 进入具体算子 processElement 的处理,比如StreamFlatMap.processElement
          -----> StreamFlatMap.processElement(record)
          -----> userFunction.flatMap()
     
    
    2. 下面是处理 Watermark
    -- StreamTaskNetworkInput.processElement()  
          |   
          | 下面都是一层层的调用关系
          -----> StatusWatermarkValve.inputWatermark()
          -----> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels()
          -----> output.emitWatermark()
          -----> StreamTaskNetworkOutput.emitWatermark()
          -----> operator.processWatermark(watermark) 
          -----> KeyedProcessOperator.processWatermark(watermark) 
                 具体算子processWatermark处理,如WindowOperator/KeyedProcessOperator.processWatermark 
                 最终进入基类AbstractStreamOperator.processWatermark
          -----> AbstractStreamOperator.processWatermark(watermark) 
          -----> timeServiceManager.advanceWatermark(mark); 第一步处理watermark
                 output.emitWatermark(mark) 第二步将watermark发送到下游
          -----> InternalTimeServiceManager.advanceWatermark   
          -----> 下面看看第一步处理watermark  
          -----> InternalTimerServiceImpl.advanceWatermark   
                 逻辑timer时间小于watermark的都应该被触发回调。从eventTimeTimersQueue从小到大取timer,如果小于传入的water mark,那么说明这个window需要触发。注意watermarker是没有key的,所以当一个watermark来的时候是会触发所有timer,而timer的key是不一定的,所以这里一定要设置keyContext,否则就乱了
          -----> triggerTarget.onEventTime(timer);
                 triggerTarget是具体operator对象,open时通过InternalTimeServiceManager.getInternalTimerService传递到HeapInternalTimerService  
          -----> KeyedProcessOperator.onEeventTime()
                 调用用户实现的keyedProcessFunction.onTimer去做具体事情。对于window来说也是调用onEventTime或者onProcessTime来从key和window對應的状态中的数据发送到windowFunction中去计算并发送到下游节点  
          -----> invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
          -----> userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
    
    
    -- DataStream 设置定时发送Watermark,是加了个chain的TimestampsAndPeriodicWatermarksOperator
    -- StreamTaskNetworkInput.processElement()        
          -----> TimestampsAndPeriodicWatermarksOperator.processElement
                 会调用AssignerWithPeriodicWatermarks.extractTimestamp提取event time
                 然后更新StreamRecord的时间
          -----> WindowOperator.processElement
                 在windowAssigner.assignWindows时以element的timestamp作为assign时间
    

    参考
    https://www.cnblogs.com/rossiXYZ/p/12345969.html
    https://blog.csdn.net/xianzhen376/article/details/90415350

    相关文章

      网友评论

          本文标题:Flink Watermark 传递

          本文链接:https://www.haomeiwen.com/subject/ecuuohtx.html