Apache Flink® — Stateful Computations over Data Streams
![](https://img.haomeiwen.com/i10186629/044548dc74e23511.png)
![](https://img.haomeiwen.com/i10186629/759977b0caf3b138.png)
WINDOWS
- Tumbling windows
- Sliding windows
- Session windows (gap-based)
- Global windows
![](https://img.haomeiwen.com/i10186629/74a43bc51126e024.png)
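To make the difference between tumbling and sliding windows concrete, here is a minimal plain-Java sketch of how a timestamp could be mapped to window start times. The class and method names (`WindowSketch`, `tumblingStart`, `slidingStarts`) are made up for illustration and are not Flink API; Flink's own assigners also support offsets, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: maps a timestamp to window start times.
final class WindowSketch {

    /** Start of the single tumbling window of length sizeMs that contains timestampMs. */
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /**
     * Starts of all sliding windows (length sizeMs, advancing by slideMs)
     * that contain timestampMs, newest window first.
     */
    static List<Long> slidingStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards while the window [start, start + sizeMs) still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

An event belongs to exactly one tumbling window, but to `sizeMs / slideMs` sliding windows when the size is a multiple of the slide. For example, with a 10-second size and a 5-second slide, an event at 7 000 ms falls into the windows starting at 5 000 ms and 0 ms.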
Event Time
Event Time / Processing Time / Ingestion Time
![](https://img.haomeiwen.com/i10186629/7d550fa573a19387.png)
Event Time and Watermarks
Watermarks deal with out-of-order events.
The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (that is, events with timestamps older than or equal to the watermark).
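Watermarks are only one way of dealing with out-of-order events. Most watermark strategies are heuristic (that is, built on intuition or experience) and trade latency against completeness. With no delay, results are less complete; with a longer delay, results are more complete, and in the extreme case the job degenerates into batch processing.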
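The latency versus completeness trade-off can be seen in a small plain-Java sketch of a bounded-out-of-orderness heuristic: the watermark trails the largest timestamp seen so far by a fixed bound. The class `BoundedOutOfOrdernessSketch` is hypothetical and only mirrors the idea behind Flink's built-in strategy of the same name; it is not Flink's implementation.

```java
// Hypothetical, framework-free sketch of a bounded-out-of-orderness watermark heuristic.
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Chosen so that the initial watermark is Long.MIN_VALUE without overflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Records an event and returns true if it is late, i.e. its timestamp is <= the current watermark. */
    boolean onEvent(long timestampMs) {
        boolean late = timestampMs <= currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
        return late;
    }

    /** Watermark(t): no further events with timestamp <= t are expected. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }
}
```

With a bound of 2 seconds, an event stamped 8 000 ms that arrives after one stamped 10 000 ms is still on time, while one stamped 7 000 ms counts as late. A larger bound loses fewer events but delays every event-time result by the same amount.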
![](https://img.haomeiwen.com/i10186629/7864392de4467a53.png)
![](https://img.haomeiwen.com/i10186629/9995d63a22d3046a.png)
Watermarks in Parallel Streams
![](https://img.haomeiwen.com/i10186629/f4b78529b110e6cc.png)
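When an operator reads from several parallel inputs, its event time is the minimum of the watermarks received on those inputs, and it never moves backwards. The following plain-Java sketch illustrates that rule; `InputWatermarkTracker` is a hypothetical name and the code is a simplification, not Flink's internal implementation.

```java
import java.util.Arrays;

// Hypothetical sketch: combines per-channel watermarks into one operator watermark.
final class InputWatermarkTracker {

    private final long[] channelWatermarks;
    private long operatorWatermark = Long.MIN_VALUE;

    /** numChannels must be at least 1. */
    InputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Applies a watermark from one input channel and returns the operator's event time afterwards. */
    long onWatermark(int channel, long watermark) {
        // A channel's watermark never goes backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The operator's event time only advances once every channel has advanced.
        operatorWatermark = Math.max(operatorWatermark, min);
        return operatorWatermark;
    }
}
```

One consequence is that a single slow or idle input holds back event time for the whole operator.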
State
Keyed State and Operator State
There are two basic kinds of state in Flink: Keyed State and Operator State.
Keyed State
Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.
You can think of Keyed State as Operator State that has been partitioned, or sharded, with exactly one state-partition per key. Each keyed-state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>.
Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.
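The two-step mapping from key to Key Group to parallel instance can be sketched in plain Java as follows. `KeyGroupSketch` and its methods are hypothetical names. To my understanding Flink additionally scrambles the key's hash code before taking the modulo, which this simplified version omits, so the concrete group numbers will differ from what Flink computes.

```java
// Hypothetical, simplified sketch of key -> key group -> operator instance.
final class KeyGroupSketch {

    /** Key group of a key; depends only on the key and maxParallelism, never on the current parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Parallel instance that owns a key group: each instance gets a contiguous range of groups. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

Because a key's group is fixed once the maximum parallelism is chosen, rescaling only changes which instance owns which range of groups. With a maximum parallelism of 128, key group 32 belongs to instance 1 at parallelism 4 and to instance 2 at parallelism 8, and its state moves as one unit.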
Operator State
With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka Connector is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
The Operator State interfaces support redistributing state among parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.
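Two redistribution schemes described in the Flink documentation are even-split (the concatenated list of state elements is divided among the new instances) and union (every new instance receives the full list). The plain-Java sketch below illustrates both on lists of state elements. `RedistributionSketch` is a hypothetical name, and the round-robin order used for the even split is an assumption made for illustration; Flink's actual assignment order may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of operator-state redistribution on rescaling.
final class RedistributionSketch {

    /** Even-split: concatenate all state elements, then deal them out across the new instances. */
    static <T> List<List<T>> evenSplit(List<List<T>> oldState, int newParallelism) {
        List<List<T>> result = emptyLists(newParallelism);
        int next = 0;
        for (List<T> instanceState : oldState) {
            for (T element : instanceState) {
                result.get(next % newParallelism).add(element);
                next++;
            }
        }
        return result;
    }

    /** Union: every new instance receives the complete list and keeps what it needs. */
    static <T> List<List<T>> union(List<List<T>> oldState, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> instanceState : oldState) {
            all.addAll(instanceState);
        }
        List<List<T>> result = emptyLists(newParallelism);
        for (List<T> target : result) {
            target.addAll(all);
        }
        return result;
    }

    private static <T> List<List<T>> emptyLists(int count) {
        List<List<T>> lists = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }
}
```

In the Kafka example, each state element would be one partition with its offset. Scaling from two consumers holding `[p0, p1]` and `[p2, p3]` to three consumers with the even split leaves every partition with exactly one owner.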
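    import java.util.Map;

    import org.apache.flink.api.common.functions.RichFilterFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;

    /**
     * Drops records whose SKU ID was already seen within the last 60 seconds.
     * MapState is keyed state, so this filter must run on a KeyedStream (after keyBy).
     * UserBehaviorBO is the author's own class and is not shown here.
     */
    public class DeduplicateFilter extends RichFilterFunction<UserBehaviorBO> {

        // SKU ID -> time it was first seen, in epoch seconds
        private transient MapState<Integer, Long> duplicateMap;

        @Override
        public boolean filter(UserBehaviorBO userBehaviorBO) throws Exception {
            if (userBehaviorBO == null) {
                return false;
            }
            Integer id = userBehaviorBO.getSkuId();
            if (duplicateMap.contains(id)) {
                return false; // duplicate within the TTL
            }
            long nowSeconds = System.currentTimeMillis() / 1000;
            duplicateMap.put(id, nowSeconds);
            // Debug output: age in seconds of every entry that has not expired yet
            for (Map.Entry<Integer, Long> entry : duplicateMap.entries()) {
                System.out.println("state key: " + entry.getKey()
                        + " age(s): " + (nowSeconds - entry.getValue()));
            }
            return true;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<Integer, Long> descriptor = new MapStateDescriptor<>(
                    "duplicate", TypeInformation.of(Integer.class), TypeInformation.of(Long.class));
            System.out.println("open state");
            // Entries expire 60 s after creation or last write and are never returned once expired.
            StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(60))
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    // Incremental cleanup (heap state backends): check up to 5 entries per state access
                    .cleanupIncrementally(5, false)
                    .build();
            descriptor.enableTimeToLive(stateTtlConfig);
            this.duplicateMap = getRuntimeContext().getMapState(descriptor);
        }
    }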
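The TTL settings used above (`OnCreateAndWrite` plus `NeverReturnExpired`) can be hard to reason about inside a running job, so here is a framework-free sketch of the same deduplication semantics with an explicit clock. `TtlDedupSketch` is a hypothetical class, not Flink API, and it ignores background cleanup: expired entries are simply treated as absent when read.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, framework-free model of TTL-based deduplication.
final class TtlDedupSketch {

    private final long ttlMs;
    // id -> time of the last write; reads never refresh it (OnCreateAndWrite).
    private final Map<Integer, Long> lastWrite = new HashMap<>();

    TtlDedupSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Returns true if the id should pass the filter at time nowMs. */
    boolean accept(int id, long nowMs) {
        Long writtenAt = lastWrite.get(id);
        // An entry is visible only while it has not expired (NeverReturnExpired).
        boolean live = writtenAt != null && nowMs - writtenAt < ttlMs;
        if (live) {
            return false; // duplicate within the TTL
        }
        lastWrite.put(id, nowMs);
        return true;
    }
}
```

Because only writes reset the timer, a steady stream of duplicates does not keep an entry alive: with a 60-second TTL, the same ID is let through again 60 seconds after it was first accepted.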