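Flink concepts
Streaming features supported by Flink
- Exactly-once semantics.
- Windowing by event time, by element (message) count, and by processing time, plus session windows.
- State storage (stateful processing).
- Checkpointing.
- High throughput and low latency.
- Three deployment modes: local mode, standalone cluster mode, and YARN/Mesos mode.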
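The count-based window in the list above is the least self-explanatory item. Below is a minimal plain-Java sketch (no Flink dependency) of a per-key count tumbling window. It assumes integer elements and a sum as the window function, and CountWindowSketch and add() are hypothetical names, not Flink API. Elements are buffered per key; every size elements for a key, that key's buffer is aggregated and then purged, so each key's windows are counted independently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency) of a per-key count tumbling window:
// elements are buffered per key, and every `size` elements the buffer is
// aggregated (summed here) and then purged.
class CountWindowSketch {
    private final int size;
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    /** Adds one element; returns the window sum if this element completed a window, else null. */
    Integer add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return null; // window for this key is not full yet
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        buffer.clear(); // purge: the next window for this key starts empty
        return sum;
    }
}
```

With size 3, the third element for key "a" returns the sum of that key's three elements; elements for other keys do not count toward it.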
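Time in Flink
Flink supports three notions of time: event time (when the event actually occurred, usually carried in the record itself), processing time (the system clock of the machine running the operator), and ingestion time (the time the event enters Flink at the source).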
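How to select one of these three time characteristics in Flink (this is the pre-1.12 API; since Flink 1.12, setStreamTimeCharacteristic is deprecated and event time is the default):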
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
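To see why the time characteristic matters, here is a minimal plain-Java sketch (not Flink code) that counts the same events in 1-hour tumbling windows, once by event time and once by processing (arrival) time. The Event type and its two fields are hypothetical. An event that happened at 00:50 but arrived at 01:10 lands in the 00:00 window under event time and in the 01:00 window under processing time.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink dependency): the same events counted in 1-hour
// tumbling windows, once by event time and once by processing (arrival) time.
class TimeCharacteristicSketch {
    static final long HOUR_MS = 60 * 60 * 1000L;

    // Hypothetical event type: when it happened vs. when the operator saw it.
    static class Event {
        final long eventTime;
        final long arrivalTime;

        Event(long eventTime, long arrivalTime) {
            this.eventTime = eventTime;
            this.arrivalTime = arrivalTime;
        }
    }

    /** Counts events per window start; useEventTime selects which timestamp drives the window. */
    static Map<Long, Integer> countPerHour(List<Event> events, boolean useEventTime) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            long ts = useEventTime ? e.eventTime : e.arrivalTime;
            long windowStart = ts - Math.floorMod(ts, HOUR_MS); // start of the hour containing ts
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }
}
```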
Event time & watermarks
Figure: in-order stream / out-of-order stream
Figure: parallel streams
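A watermark with timestamp t tells downstream operators that event time has reached t, i.e. no more elements with a timestamp at or before t are expected on that stream. When an operator has several parallel input channels, its own event-time clock is the minimum of the latest watermarks from all channels, and it forwards a new watermark only when that minimum advances. Below is a minimal plain-Java sketch of that rule (no Flink dependency), assuming a fixed number of input channels; WatermarkTracker and onWatermark() are hypothetical names.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Plain-Java sketch (no Flink dependency) of how an operator with several
// input channels derives its own watermark: the minimum over all channels.
class WatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /**
     * Records a watermark from one input channel. Returns the operator's new
     * watermark if the minimum over all channels advanced, else empty.
     */
    OptionalLong onWatermark(int channel, long watermark) {
        // Watermarks on a single channel never move backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

The slowest channel holds back the whole operator: channel 0 can report watermark 10, but nothing is forwarded until channel 1 reports a watermark as well.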
Generating timestamps and watermarks
- Directly in the source function
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (/* condition */) {
        MyType next = getNext();
        // attach the element's own event timestamp
        ctx.collectWithTimestamp(next, next.getEventTimestamp());
        // emit a watermark whenever the element carries one
        if (next.hasWatermarkTime()) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
        }
    }
}
- With a timestamp and watermark assigner
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
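Flink provides two kinds of assigners (both interfaces were deprecated in Flink 1.11 in favor of WatermarkStrategy):
- Periodic watermarks
extractTimestamp is called for every incoming event, but getCurrentWatermark is called only periodically (the interval is set with ExecutionConfig.setAutoWatermarkInterval). A returned watermark is emitted only if it is non-null and larger than the previous one.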
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp;
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
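- Punctuated watermarks (generated based on properties of the events)
For each incoming event, extractTimestamp is called first, immediately followed by checkAndGetNextWatermark, which decides whether to emit a watermark: returning null emits nothing, and a non-null watermark is emitted only if it is larger than the previous one.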
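import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }
    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // emit a watermark only for elements that carry a marker; null means "no watermark"
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}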
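The two assigner kinds differ mainly in when the watermark is computed. The plain-Java sketch below (no Flink dependency) replays the same events through both. The periodic variant mirrors the BoundedOutOfOrdernessGenerator logic above and is polled once every few elements, standing in for the periodic timer; the punctuated variant checks after every element and emits only for elements carrying a marker. The polling interval, the Event fields, and the class and method names are assumptions for illustration. In both variants a watermark is emitted only if it is larger than the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency) comparing periodic and punctuated
// watermark generation over the same sequence of events.
class WatermarkStrategiesSketch {
    // Hypothetical event: a timestamp plus an optional "watermark marker" flag.
    static class Event {
        final long timestamp;
        final boolean marker;

        Event(long timestamp, boolean marker) {
            this.timestamp = timestamp;
            this.marker = marker;
        }
    }

    /** Periodic: track the max timestamp per element, but compute a watermark only every pollEvery elements. */
    static List<Long> periodic(List<Event> events, long maxOutOfOrderness, int pollEvery) {
        List<Long> emitted = new ArrayList<>();
        long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness; // avoids underflow in the subtraction below
        long lastWatermark = Long.MIN_VALUE;
        for (int i = 0; i < events.size(); i++) {
            // extractTimestamp(): called for every element
            currentMaxTimestamp = Math.max(currentMaxTimestamp, events.get(i).timestamp);
            if ((i + 1) % pollEvery == 0) {
                // getCurrentWatermark(): called only periodically
                long watermark = currentMaxTimestamp - maxOutOfOrderness;
                if (watermark > lastWatermark) {
                    emitted.add(watermark);
                    lastWatermark = watermark;
                }
            }
        }
        return emitted;
    }

    /** Punctuated: check every element, emit a watermark only when it carries a marker. */
    static List<Long> punctuated(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        long lastWatermark = Long.MIN_VALUE;
        for (Event e : events) {
            // checkAndGetNextWatermark(): called right after extractTimestamp()
            if (e.marker && e.timestamp > lastWatermark) {
                emitted.add(e.timestamp);
                lastWatermark = e.timestamp;
            }
        }
        return emitted;
    }
}
```

The punctuated variant reacts immediately to markers but pays for a check on every element; the periodic variant amortizes watermark computation over time at the cost of some extra latency.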
Flink windows
Flink supports four kinds of windows: tumbling windows, sliding windows, session windows, and global windows.
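Which window an element lands in is a function of its timestamp. The plain-Java sketch below (no Flink dependency) computes it for the first three window types, assuming millisecond timestamps, windows aligned to epoch 0 with no offset, and half-open intervals [start, end); the class and method names are hypothetical. A tumbling window gives each timestamp exactly one window. A sliding window gives it every window of length size that contains it (size / slide windows when size is a multiple of slide). Session windows have no fixed boundaries, so each element opens an interval [ts, ts + gap) and overlapping or touching intervals are merged. Global windows are left out: all elements of a key go into one window, which only fires through a custom trigger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of window assignment.
// Windows are half-open intervals [start, end) in milliseconds, aligned to epoch 0.
class WindowAssignmentSketch {

    /** Tumbling: every timestamp belongs to exactly one window. Returns {start, end}. */
    static long[] tumbling(long timestamp, long size) {
        long start = timestamp - Math.floorMod(timestamp, size);
        return new long[] {start, start + size};
    }

    /** Sliding: a timestamp belongs to every window [start, start + size) that contains it. */
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    /** Session: each element opens [ts, ts + gap); overlapping or touching intervals merge. */
    static List<long[]> sessions(long[] timestamps, long sessionGap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        if (sorted.length == 0) {
            return result;
        }
        long start = sorted[0];
        long end = sorted[0] + sessionGap;
        for (int i = 1; i < sorted.length; i++) {
            long ts = sorted[i];
            if (ts <= end) {
                end = ts + sessionGap; // element falls inside the session: extend it
            } else {
                result.add(new long[] {start, end}); // gap too large: close the session
                start = ts;
                end = ts + sessionGap;
            }
        }
        result.add(new long[] {start, end});
        return result;
    }
}
```

For example, with size 10 s and slide 5 s, timestamp 7 s falls into [5 s, 15 s) and [0 s, 10 s).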
Triggers
The Trigger interface has five methods that allow a trigger to react to different events:
- onElement() is called for each element that is added to a window.
- onEventTime() is called when a registered event-time timer fires.
- onProcessingTime() is called when a registered processing-time timer fires.
- onMerge() is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
- clear() performs any action needed upon removal of the corresponding window.
Two things to note about these methods:
1) The first three decide how to act on their invocation event by returning a TriggerResult, which is one of:
   - CONTINUE: do nothing
   - FIRE: trigger the computation
   - PURGE: clear the elements in the window
   - FIRE_AND_PURGE: trigger the computation, then clear the elements in the window
2) Any of these methods can be used to register processing-time or event-time timers for future actions.
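To make the four TriggerResult actions concrete, here is a plain-Java sketch (no Flink dependency) of a single window with a trigger and the operator logic that applies its results. The nested TriggerResult enum is a local stand-in that only mirrors the names of Flink's values; the class, its methods, the sum as window function, and the rule "fire early every N elements" are assumptions for illustration. onElement() returns FIRE when the element count reaches N (emit a partial result, keep the contents), and onEventTime() returns FIRE_AND_PURGE once the watermark reaches the window's max timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of how a window operator acts on trigger results.
class TriggerSketch {
    // Local stand-in that only mirrors the names of Flink's TriggerResult values.
    enum TriggerResult { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    private final long windowMaxTimestamp; // last event-time instant covered by the window
    private final int earlyFireCount;      // hypothetical rule: fire early every N elements
    private final List<Integer> contents = new ArrayList<>();
    private int countSinceFire = 0;
    final List<Integer> outputs = new ArrayList<>(); // one entry per FIRE

    TriggerSketch(long windowMaxTimestamp, int earlyFireCount) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.earlyFireCount = earlyFireCount;
    }

    /** Like onElement(): called for every element added to the window. */
    TriggerResult onElement(int value) {
        contents.add(value);
        countSinceFire++;
        if (countSinceFire >= earlyFireCount) {
            countSinceFire = 0;
            return TriggerResult.FIRE; // partial result, contents are kept
        }
        return TriggerResult.CONTINUE;
    }

    /** Like onEventTime(): called when the watermark advances past a registered timer. */
    TriggerResult onEventTime(long watermark) {
        return watermark >= windowMaxTimestamp ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    /** What the window operator does with each result. */
    void apply(TriggerResult result) {
        if (result == TriggerResult.FIRE || result == TriggerResult.FIRE_AND_PURGE) {
            int sum = 0;
            for (int v : contents) {
                sum += v;
            }
            outputs.add(sum); // "trigger the computation"
        }
        if (result == TriggerResult.PURGE || result == TriggerResult.FIRE_AND_PURGE) {
            contents.clear(); // "clear the elements in the window"
        }
        // CONTINUE: do nothing
    }
}
```

Feeding elements 1, 2, 4 with N = 2 produces an early result of 3 after the second element; when the watermark reaches the window's max timestamp the window fires again with 7 and is emptied.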