extractTimestamp in Flink Watermarks


Author: kaiker | Published: 2021-09-18 18:16
    • When the job runs on event time, window assignment uses the timestamp returned by your extractTimestamp.
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                // Parse with the static helpers instead of the deprecated boxing constructors
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
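                private long maxOutofOrderness = 5 * 1000; // tolerated out-of-orderness: 5 seconds, in milliseconds
                // Start above Long.MIN_VALUE so that currentMaxTs - maxOutofOrderness cannot underflow before the first record arrives
                private long currentMaxTs = Long.MIN_VALUE + maxOutofOrderness;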
    
                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTs - maxOutofOrderness);
                }
    
                @Override
                public long extractTimestamp(SensorReading sensorReading, long l) {
                    // Read the timestamp of the current record
                    long currentTs = sensorReading.getTimestamp();
                    // Update the largest timestamp seen so far
                    currentMaxTs = Math.max(currentMaxTs, currentTs);
                    // Return the record's own timestamp
                    return currentTs;
                }
            });
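
To make the arithmetic concrete, here is a minimal plain-Java sketch (no Flink dependency) of the same bounded-out-of-orderness idea. The class name `BoundedOutOfOrdernessSketch`, the 15-second window size, and the sample timestamps are assumptions chosen for illustration; the window-start formula is a simplified tumbling-window calculation with zero offset, not a copy of Flink's window assigner.

```java
// Hypothetical stand-in for the anonymous assigner above; not a Flink class.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Offset the start value so the subtraction in currentWatermark()
        // cannot underflow before any record has been seen.
        this.currentMaxTs = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Tracks the largest timestamp seen and returns the record's own timestamp.
    long extractTimestamp(long eventTs) {
        currentMaxTs = Math.max(currentMaxTs, eventTs);
        return eventTs;
    }

    // The watermark trails the largest timestamp by the tolerated lateness.
    long currentWatermark() {
        return currentMaxTs - maxOutOfOrderness;
    }

    // Start of the tumbling window (zero offset) that contains the timestamp.
    static long tumblingWindowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch assigner = new BoundedOutOfOrdernessSketch(5_000L);
        long windowSize = 15_000L; // assumed window size for the demo
        long[] eventTimes = {10_000L, 12_000L, 11_000L, 20_000L}; // made-up sample data
        for (long ts : eventTimes) {
            long extracted = assigner.extractTimestamp(ts);
            long start = tumblingWindowStart(extracted, windowSize);
            System.out.println("ts=" + extracted
                    + " window=[" + start + ", " + (start + windowSize) + ")"
                    + " watermark=" + assigner.currentWatermark());
        }
    }
}
```

Note that the out-of-order record (11000 after 12000) still lands in the window determined by its own timestamp, while the watermark does not move backwards.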
    
    • assignTimestampsAndWatermarks is passed an anonymous class that implements AssignerWithPeriodicWatermarks; you can simply think of it as passing in an object.
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
            int inputParallelism = this.getTransformation().getParallelism();
            AssignerWithPeriodicWatermarks<T> cleanedAssigner = (AssignerWithPeriodicWatermarks)this.clean(timestampAndWatermarkAssigner);
            TimestampsAndPeriodicWatermarksOperator<T> operator = new TimestampsAndPeriodicWatermarksOperator(cleanedAssigner);
            return this.transform("Timestamps/Watermarks", this.getTransformation().getOutputType(), operator).setParallelism(inputParallelism);
        }
    
    • The TimestampsAndPeriodicWatermarksOperator class is where that object's extractTimestamp gets called.
    public void processElement(StreamRecord<T> element) throws Exception {
            long newTimestamp = ((AssignerWithPeriodicWatermarks)this.userFunction).extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
            this.output.collect(element.replace(element.getValue(), newTimestamp));
        }
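
The decision made inside processElement can be sketched in plain Java as follows. This is a simplified illustration, not Flink code: `TimestampOperatorSketch`, `assignTimestamp`, and the sample line `sensor_1,1547718199,35.8` are hypothetical, and a nullable `Long` stands in for the record's "has a timestamp or not" state.

```java
import java.util.function.ToLongBiFunction;

// Hypothetical helper that mirrors the operator's call into the user function.
final class TimestampOperatorSketch {
    // Value handed to the extractor when the record carries no timestamp yet.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Passes the previous timestamp (or Long.MIN_VALUE if absent) to the
    // extractor and returns whatever timestamp the extractor decides on.
    static <T> long assignTimestamp(T value,
                                    Long previousTimestamp,
                                    ToLongBiFunction<T, Long> extractor) {
        long previous = previousTimestamp != null ? previousTimestamp : NO_TIMESTAMP;
        return extractor.applyAsLong(value, previous);
    }

    public static void main(String[] args) {
        // Assumed input format: "sensorId,timestamp,temperature"
        ToLongBiFunction<String, Long> extractor =
                (line, previous) -> Long.parseLong(line.split(",")[1]);

        // No previous timestamp: the extractor still produces one from the payload.
        System.out.println(assignTimestamp("sensor_1,1547718199,35.8", null, extractor));
        // An existing timestamp is overwritten by the extracted one.
        System.out.println(assignTimestamp("sensor_1,1547718199,35.8", 42L, extractor));
    }
}
```

The second argument of extractTimestamp is therefore only a hint about the record's earlier timestamp; the value your function returns is what the record carries downstream.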
    
    • The StreamRecord used there is the wrapper around each piece of data flowing through the stream.
    public final class StreamRecord<T> extends StreamElement {
        private T value;
        private long timestamp;
        private boolean hasTimestamp;
    
        public StreamRecord(T value) {
            this.value = value;
        }
    
        public StreamRecord(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = true;
        }
    
    ... ...
    }
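
As a rough illustration of how such a wrapper behaves when the operator calls replace on it, here is a simplified stand-in. `SimpleRecord` is a hypothetical class written for this note; it only models the three fields shown above and assumes that replace updates the record in place and marks it as timestamped.

```java
// Hypothetical, simplified stand-in for a timestamped record wrapper.
final class SimpleRecord<T> {
    private T value;
    private long timestamp;
    private boolean hasTimestamp;

    // A record created without a timestamp.
    SimpleRecord(T value) {
        this.value = value;
    }

    // A record created with a timestamp.
    SimpleRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = true;
    }

    T getValue() {
        return value;
    }

    boolean hasTimestamp() {
        return hasTimestamp;
    }

    // Reports Long.MIN_VALUE when no timestamp has been set (an assumption of this sketch).
    long getTimestamp() {
        return hasTimestamp ? timestamp : Long.MIN_VALUE;
    }

    // Overwrites value and timestamp in place and returns the same object.
    SimpleRecord<T> replace(T newValue, long newTimestamp) {
        this.value = newValue;
        this.timestamp = newTimestamp;
        this.hasTimestamp = true;
        return this;
    }

    public static void main(String[] args) {
        SimpleRecord<String> record = new SimpleRecord<>("sensor_1");
        System.out.println(record.hasTimestamp());   // no timestamp yet
        record.replace(record.getValue(), 1_000L);   // what the operator does with the extracted timestamp
        System.out.println(record.hasTimestamp() + " " + record.getTimestamp());
    }
}
```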
    


Article link: https://www.haomeiwen.com/subject/ifnrgltx.html