美文网首页
Flink Watermark 传递

Flink Watermark 传递

作者: 邵红晓 | 来源:发表于2020-05-18 14:09 被阅读0次

Watermark致力于解决数据的乱序和延迟问题,触发window窗口中数据的计算,多channel,source取最小值,对KeyedStream做Window,如果并发度是n,在集群中就有n个WinwowOperator的instance
Window尽心尽力提供各种数据切分机制,在无限流上切分出有限流,进行计算
State勤勤恳恳记录中间状态,并在数据恢复等场景承担着重要的角色

-- OneInputStreamTask.invoke()
      |   
      +----> StreamTask.init 
      |      把StreamTaskNetworkOutput/StreamTaskNetworkInput聚合StreamOneInputProcessor
      +----> StreamTask.runMailboxLoop
      |      从 StreamTask.runMailboxLoop 开始,下面是一层层的调用关系
      -----> StreamTask.processInput()
      -----> StreamTask.inputProcessor.processInput()
      -----> StreamOneInputProcessor.processInput
      -----> input.emitNext(output)
      -----> StreamTaskNetworkInput.emitNext()
      -----> StreamTaskNetworkInput.processElement()


1. 下面是处理普通 Record  
-- StreamTaskNetworkInput.processElement()  
      |   
      | 下面都是一层层的调用关系
      -----> output.emitRecord(recordOrMark.asRecord())
      -----> StreamTaskNetworkOutput.emitRecord()
      -----> operator.processElement(record)
             进入具体算子 processElement 的处理,比如StreamFlatMap.processElement
      -----> StreamFlatMap.processElement(record)
      -----> userFunction.flatMap()
 

2. 下面是处理 Watermark
-- StreamTaskNetworkInput.processElement()  
      |   
      | 下面都是一层层的调用关系
      -----> StatusWatermarkValve.inputWatermark()
      -----> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels()
      -----> output.emitWatermark()
      -----> StreamTaskNetworkOutput.emitWatermark()
      -----> operator.processWatermark(watermark) 
      -----> KeyedProcessOperator.processWatermark(watermark) 
             具体算子processWatermark处理,如WindowOperator/KeyedProcessOperator.processWatermark 
             最终进入基类AbstractStreamOperator.processWatermark
      -----> AbstractStreamOperator.processWatermark(watermark) 
      -----> timeServiceManager.advanceWatermark(mark); 第一步处理watermark
             output.emitWatermark(mark) 第二步将watermark发送到下游
      -----> InternalTimeServiceManager.advanceWatermark   
      -----> 下面看看第一步处理watermark  
      -----> InternalTimerServiceImpl.advanceWatermark   
             逻辑timer时间小于watermark的都应该被触发回调。从eventTimeTimersQueue从小到大取timer,如果小于传入的water mark,那么说明这个window需要触发。注意watermarker是没有key的,所以当一个watermark来的时候是会触发所有timer,而timer的key是不一定的,所以这里一定要设置keyContext,否则就乱了
      -----> triggerTarget.onEventTime(timer);
             triggerTarget是具体operator对象,open时通过InternalTimeServiceManager.getInternalTimerService传递到HeapInternalTimerService  
      -----> KeyedProcessOperator.onEeventTime()
             调用用户实现的keyedProcessFunction.onTimer去做具体事情。对于window来说也是调用onEventTime或者onProcessTime来从key和window對應的状态中的数据发送到windowFunction中去计算并发送到下游节点  
      -----> invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
      -----> userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);


-- DataStream 设置定时发送Watermark,是加了个chain的TimestampsAndPeriodicWatermarksOperator
-- StreamTaskNetworkInput.processElement()        
      -----> TimestampsAndPeriodicWatermarksOperator.processElement
             会调用AssignerWithPeriodicWatermarks.extractTimestamp提取event time
             然后更新StreamRecord的时间
      -----> WindowOperator.processElement
             在windowAssigner.assignWindows时以element的timestamp作为assign时间

参考
https://www.cnblogs.com/rossiXYZ/p/12345969.html
https://blog.csdn.net/xianzhen376/article/details/90415350

相关文章

网友评论

      本文标题:Flink Watermark 传递

      本文链接:https://www.haomeiwen.com/subject/ecuuohtx.html