1、配置时间特性
- ProcessingTime,指算子根据处理机器的系统时钟决定数据流当前的时间。无须等待水位线来驱动事件时间前进。
- EventTime,蒜子根据数据自身包含的信息决定当前时间。依靠水位线声明某个时间间隔内所有时间戳都已经接受时,事件时间窗口才触发
- IngestionTime,指定每个接受的记录都把数据源算子的处理时间作为事件时间的时间戳并自动生成水位线。
默认情况下是处理时间,设置其他时间特性使用
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
1.1 分配时间戳和生成水位线
- 水位线是当前事件携带的一个时间戳,用来触发计时器(比如窗口),通常比当前事件的时间早一些,告知系统该事件来时,只处理更早一个时间点之前的数据。
周期性水位分配器
- 系统默认200毫秒发一个水位线
- Flink会根据设定的间隔,调用getCurrentWatermark()方法,如果该方法返回值非空,且它的时间戳大于上一个水位线的时间戳,那么算子就会发出一个新的水位线
- 也可以使用BoundedOutOfOrdernessTimeStampExtractor来替代下面这段
env.getConfig().setAutoWatermarkInterval(100);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
})
// 乱序数据设置时间戳和watermark
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
private final long maxOutBoundary = 3 * 1000L;
private long currentMaxTimestamp = Integer.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutBoundary);
}
@Override
public long extractTimestamp(SensorReading sensorReading, long l) {
//获取当前记录的时间戳
long currentTs = sensorReading.getTimestamp() * 1000L;
// 更新最大的时间戳
currentMaxTimestamp = Math.max(currentMaxTimestamp, currentTs);
// 返回记录的时间戳
return currentTs;
}
});
定点水位线分配器
- 接口中的checkAndGetNextWatermark()方法会在针对每个事件的extractTimestamp()方法后立刻调用。决定是否生成新水位线,如果该方法返回一个非空、且大于之前值的水位线。算子就会将这个新水位线发出
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
1.2 水位线、延迟及完整性
- 水位线可用于平衡延迟和结果的完整性。
- 它们控制着在执行某些计算前需要等待数据到达的时间。
2、处理函数
- 处理函数可以访问记录的时间戳和水位线,并支持注册在将来某个特定时间出发的计时器
2.1 处理函数接口
- 以KeyedProcessFunction为例,其中processElement会对每个到来的记录进行处理,其中的Context可以访问到水位等信息,第一个参数是输入,第三个是输出。
- 在processElement中可以注册计数器registerProcessingTimeTimer,然后在onTimer这里写触发时干什么
dataStream.keyBy("id")
.process( new MyProcess() )
.print();
public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
ValueState<Long> tsTimerState;
@Override
public void open(Configuration parameters) throws Exception {
tsTimerState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-timer", Long.class));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value.getId().length());
// context
// Timestamp of the element currently being processed or timestamp of a firing timer.
ctx.timestamp();
// Get key of the element being processed.
ctx.getCurrentKey();
// ctx.output();
ctx.timerService().currentProcessingTime();
ctx.timerService().currentWatermark();
// 在5处理时间的5秒延迟后触发
ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 5000L);
tsTimerState.update(ctx.timerService().currentProcessingTime() + 1000L);
// ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
// 删除指定时间触发的定时器
// ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
System.out.println(timestamp + " 定时器触发");
ctx.getCurrentKey();
// ctx.output();
ctx.timeDomain();
}
@Override
public void close() throws Exception {
tsTimerState.clear();
}
}
2.2 时间服务和计时器
- Context和onTimerContext对象中的TimerService提供了一下方法:
- currentProcessingTime()
- currentWatermark()
- registerProcessingTimeTimer(long timestamp)
- registerEventTimeTimer(long timestamp)
- deleteProcessingTimeTimer(long timestamp)
- deleteEventTimeTimer(long timestamp)
- 计时器处罚会调用onTimer()回调函数
- 系统对于processElement()和onTimer()两个方法的调用时同步的
- 每个键值可以有多个计时器,但具体到每个时间戳只能有一个。
- 默认会将计时器的时间戳放到一个优先队列中。
- 所有计时器会和其他状态一起写入检查点。
2.3 副输出
- 可以发出多条数据流
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
// 判断温度,大于30度,高温流输出到主流;小于低温流输出到侧输出流
if (value.getTemperature() > 30) {
out.collect(value);
} else {
ctx.output(lowTempTag, value);
}
}
});
highTempStream.getSideOutput(lowTempTag).print("low-temp");
3、窗口算子
- 基于时间的窗口分配器会根据元素时间的时间戳或当前处理时间将其分配到一个或多个窗口,每个时间窗口都有一个开始时间戳和结束时间戳
- 所有内置的窗口分配器都提供了一个默认的触发器,一旦时间超过了窗口的结束时间就会触发窗口计算
- 时间区间是左闭右开的
3.1 滚动窗口
滚动窗口不会重叠,滑动窗口可以重叠,会话窗口是根据没有收到信息的间隔来划定窗口。滚动窗口比较常用。https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_622-timewindow
这里使用.window就可以创建默认窗口了,类型为TimeWindow。
DataStream<Integer> resultStream = dataStream.keyBy("id")
.window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
3.2 在窗口上应用的函数
aggregateFunction
- 以增量方式应用于窗口内的元素,状态只有一个值
- 第一个参数是输入,第三个是输出,第二个是累加用于存储中间值的类型
DataStream<Integer> resultStream = dataStream.keyBy("id")
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
// 新建的累加器
@Override
public Integer createAccumulator() {
return 0;
}
// 每个数据在上次的基础上累加
@Override
public Integer add(SensorReading value, Integer accumulator) {
return accumulator + 1;
}
// 返回结果值
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
// 分区合并结果(TimeWindow一般用不到,SessionWindow可能需要考虑合并)
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
});
ProcessWindowFunction
- 就是窗口的处理函数,可以获取到环境信息
SingleOutputStreamOperator<SensorReading> minTempStream = keyStream
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
private final long maxOutBoundary = 4 * 1000L;
private long currentMaxTimestamp = Integer.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutBoundary);
}
@Override
public long extractTimestamp(SensorReading sensorReading, long l) {
//获取当前记录的时间戳
long currentTs = sensorReading.getTimestamp() * 1000L;
// 更新最大的时间戳
currentMaxTimestamp = Math.max(currentMaxTimestamp, currentTs);
// 返回记录的时间戳
return currentTs;
}
})
.keyBy(SensorReading::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.process(new ProcessWindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<SensorReading, SensorReading, String, TimeWindow>.Context context, Iterable<SensorReading> iterable, Collector<SensorReading> collector) throws Exception {
System.out.println(context.currentWatermark());
}
});
3.3 自定义窗口算子
- 这个就是自定义窗口,不实用现成的窗口函数
- 主要的几个组件是分配器、触发器和移除器

分配器 windowAssigner
- 会返回0个或多个窗口对象
- 会提供默认的触发器
new WindowAssigner<SensorReading, Window>() {
@Override
public Collection<Window> assignWindows(SensorReading sensorReading, long l, WindowAssignerContext windowAssignerContext) {
// long starttime = l - (l % windowsize);
// long endtime = starttime + endtime;
// collector.singletonList(new TimeWindow(starttime, endtime));
}
@Override
public Trigger<SensorReading, Window> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {
return EventTimeTrigger.create();
}
@Override
public TypeSerializer<Window> getWindowSerializer(ExecutionConfig executionConfig) {
return null;
}
@Override
public boolean isEventTime() {
return false;
}
}
触发器
- 定义何时对窗口进行计算并发出结果
- 默认触发器会在处理时间或水位线超过了窗口结束边界的时间戳时触发
- 每次触发器调用都会生成一个TriggerResult,可以是以下几个值:CONTINUE(什么都不做)、FIRE(发出结果)、PURGE(清楚窗口内容并删除窗口自身及元数据)、FIRE_AND_PURGE
- trigger本身还可以操纵context去注册计时器(是计时器不是触发器,就是ontimer的那个东西)
SingleOutputStreamOperator<PageViewCount> uvStream = dataStream
.filter(data -> "pv".equals(data.getBehavior()))
.timeWindowAll(Time.hours(1))
.trigger(new MyTrigger())
.process(new UvCountResultWithBloomFliter());
public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
@Override
public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
// 每一条数据来到,直接触发窗口计算,并且直接清空窗口
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
}
}
移除器
- 用于在窗口执行计算前或后从窗口中删除元素
new Evictor<SensorReading, TimeWindow>() {
@Override
public void evictBefore(Iterable<TimestampedValue<SensorReading>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
}
@Override
public void evictAfter(Iterable<TimestampedValue<SensorReading>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
}
}
4、基于时间的双流Join
- 基于时间间隔的Join,对两条流中拥有相同键值以及彼此之间时间戳不超过某一指定间隔的事件进行Join
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});
- 基于窗口的join
input1.join(input2)
.where() // 左流限制条件
.equalTo() // 右流限制条件
.window()
.apply() // 具体join的joinFunction
5、处理迟到的数据
5.1 用副输出收集迟到的数据
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};
// 基于事件时间的开窗聚合,统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.sideOutputLateData(outputTag)
.minBy("temperature");
minTempStream.print("minTemp");
minTempStream.getSideOutput(outputTag).print("late");
env.execute();
5.2 基于迟到数据更新结果
- allowedLateness 指定这个之后水位线超过窗口结束时间仍会保留数据 这里指定时间这么久
- 会触发多次计算,水位线到了会触发,迟到数据来了也会触发
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(outputTag)
.minBy("temperature");
网友评论