Watermark
致力于解决数据的乱序和延迟问题,触发window窗口中数据的计算,多channel,source取最小值,对KeyedStream做Window,如果并发度是n,在集群中就有n个WinwowOperator的instance
Window
尽心尽力提供各种数据切分机制,在无限流上切分出有限流,进行计算
State勤勤恳恳记录中间状态,并在数据恢复等场景承担着重要的角色
-- OneInputStreamTask.invoke()
|
+----> StreamTask.init
| 把StreamTaskNetworkOutput/StreamTaskNetworkInput聚合StreamOneInputProcessor
+----> StreamTask.runMailboxLoop
| 从 StreamTask.runMailboxLoop 开始,下面是一层层的调用关系
-----> StreamTask.processInput()
-----> StreamTask.inputProcessor.processInput()
-----> StreamOneInputProcessor.processInput
-----> input.emitNext(output)
-----> StreamTaskNetworkInput.emitNext()
-----> StreamTaskNetworkInput.processElement()
1. 下面是处理普通 Record
-- StreamTaskNetworkInput.processElement()
|
| 下面都是一层层的调用关系
-----> output.emitRecord(recordOrMark.asRecord())
-----> StreamTaskNetworkOutput.emitRecord()
-----> operator.processElement(record)
进入具体算子 processElement 的处理,比如StreamFlatMap.processElement
-----> StreamFlatMap.processElement(record)
-----> userFunction.flatMap()
2. 下面是处理 Watermark
-- StreamTaskNetworkInput.processElement()
|
| 下面都是一层层的调用关系
-----> StatusWatermarkValve.inputWatermark()
-----> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels()
-----> output.emitWatermark()
-----> StreamTaskNetworkOutput.emitWatermark()
-----> operator.processWatermark(watermark)
-----> KeyedProcessOperator.processWatermark(watermark)
具体算子processWatermark处理,如WindowOperator/KeyedProcessOperator.processWatermark
最终进入基类AbstractStreamOperator.processWatermark
-----> AbstractStreamOperator.processWatermark(watermark)
-----> timeServiceManager.advanceWatermark(mark); 第一步处理watermark
output.emitWatermark(mark) 第二步将watermark发送到下游
-----> InternalTimeServiceManager.advanceWatermark
-----> 下面看看第一步处理watermark
-----> InternalTimerServiceImpl.advanceWatermark
逻辑timer时间小于watermark的都应该被触发回调。从eventTimeTimersQueue从小到大取timer,如果小于传入的water mark,那么说明这个window需要触发。注意watermarker是没有key的,所以当一个watermark来的时候是会触发所有timer,而timer的key是不一定的,所以这里一定要设置keyContext,否则就乱了
-----> triggerTarget.onEventTime(timer);
triggerTarget是具体operator对象,open时通过InternalTimeServiceManager.getInternalTimerService传递到HeapInternalTimerService
-----> KeyedProcessOperator.onEeventTime()
调用用户实现的keyedProcessFunction.onTimer去做具体事情。对于window来说也是调用onEventTime或者onProcessTime来从key和window對應的状态中的数据发送到windowFunction中去计算并发送到下游节点
-----> invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
-----> userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-- DataStream 设置定时发送Watermark,是加了个chain的TimestampsAndPeriodicWatermarksOperator
-- StreamTaskNetworkInput.processElement()
-----> TimestampsAndPeriodicWatermarksOperator.processElement
会调用AssignerWithPeriodicWatermarks.extractTimestamp提取event time
然后更新StreamRecord的时间
-----> WindowOperator.processElement
在windowAssigner.assignWindows时以element的timestamp作为assign时间
参考
https://www.cnblogs.com/rossiXYZ/p/12345969.html
https://blog.csdn.net/xianzhen376/article/details/90415350
网友评论