Apache Flink® — Stateful Computations over Data Streams

- Global Window

Event Time
Event Time / Processing Time / Ingestion Time

Event Time and Watermarks
The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older than or equal to the watermark).
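The rule "no more elements with t' <= t" can be illustrated with a small plain-Java model. This is only a sketch and not Flink's API: the class name, the fixed out-of-orderness bound, and the choice to derive the watermark as "largest timestamp seen minus the bound" are all assumptions made for illustration.

```java
// Toy model of event-time progress. NOT Flink's API; all names are hypothetical.
final class WatermarkSketch {
    // Assumed bound on how far out of order events may arrive.
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /**
     * Records an event timestamp. Returns true if the event is late, i.e. its
     * timestamp t' satisfies t' <= t for the watermark t in effect on arrival.
     */
    boolean observe(long timestamp) {
        boolean late = timestamp <= currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return late;
    }

    /** Current watermark t: event time is assumed to have reached t. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing observed yet
        }
        return maxTimestampSeen - maxOutOfOrderness;
    }
}
```

With a bound of 5, observing timestamps 10 and then 7 moves the watermark to 5 without flagging either event; a subsequent event with timestamp 5 is reported as late because it is not newer than the watermark.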

Watermarks in Parallel Streams

Keyed State and Operator State
There are two basic kinds of state in Flink: Keyed State and Operator State.
Keyed State
Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.
You can think of Keyed State as Operator State that has been partitioned, or sharded, with exactly one state-partition per key. Each keyed-state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>.
Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.
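The two-step mapping from key to Key Group to parallel instance can be sketched as follows. This is a simplified illustration, not Flink's code: the class and method names are hypothetical, and the plain `hashCode()` modulo used here is a stand-in (Flink applies an additional hash function before taking the modulo). The point is that a key's Key Group depends only on the maximum parallelism, while the owning instance depends on the current parallelism.

```java
// Simplified sketch of key -> Key Group -> operator instance. Names are hypothetical.
final class KeyGroupSketch {
    /** Key Group of a key; depends only on maxParallelism, never on the current parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        // Assumption: plain hashCode() as a stand-in for the real hash function.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Parallel instance that owns a Key Group; each instance gets a contiguous range. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With a maximum parallelism of 128 and a parallelism of 4, Key Groups 0 to 31 land on instance 0, 32 to 63 on instance 1, and so on. Rescaling to 8 instances moves whole Key Groups between instances but never changes which Key Group a key belongs to.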
Operator State
With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka Connector is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
The Operator State interfaces support redistributing state among parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.
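One possible redistribution scheme is an even split: the per-instance state lists are treated as one logical list, which is then divided among the new parallel instances. The plain-Java sketch below models that idea under stated assumptions; the class and method names are hypothetical, the round-robin dealing order is an illustrative choice, and none of this is Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of even-split redistribution of list-style operator state. Names are hypothetical.
final class OperatorStateRescaleSketch {
    /**
     * Concatenates the state lists of all old instances, then deals the
     * elements out round-robin to newParallelism new instances.
     */
    static <T> List<List<T>> redistributeEvenly(List<List<T>> oldState, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldState) {
            all.addAll(part);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }
}
```

For the Kafka example, two old instances holding the partition offsets [p0, p1, p2] and [p3, p4] would, after rescaling to three instances, hold [p0, p3], [p1, p4], and [p2] in this model.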
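import java.util.Iterator;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
// Drops records whose SKU id is already in keyed MapState; must run on a KeyedStream.
public class DeduplicateFilter extends RichFilterFunction<UserBehaviorBO> {
    private transient MapState<Integer, Long> duplicateMap;
    @Override
    public boolean filter(UserBehaviorBO userBehaviorBO) throws Exception {
        if (null == userBehaviorBO) {
            return false;
        }
        Integer id = userBehaviorBO.getSkuId();
        if (duplicateMap.contains(id)) {
            return false;
        } else {
            // Remember when this id was first seen (epoch seconds).
            duplicateMap.put(id, System.currentTimeMillis() / 1000);
            // Debug output: age in seconds of every entry currently held in state.
            Iterator<Integer> it = duplicateMap.keys().iterator();
            while (it.hasNext()) {
                int key = it.next();
                System.out.println("state key :" + key + " ttl:" + (System.currentTimeMillis() / 1000 - duplicateMap.get(key)));
            }
            return true;
        }
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<Integer, Long> descriptor = new MapStateDescriptor<Integer, Long>("duplicate", TypeInformation.of(Integer.class), TypeInformation.of(Long.class));
        System.out.println("open state");
        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(60))
                .cleanupIncrementally(5, false).build();
        // The TTL only takes effect once it is enabled on the descriptor.
        descriptor.enableTimeToLive(stateTtlConfig);
        this.duplicateMap = this.getRuntimeContext().getMapState(descriptor);
    }
}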
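The intended effect of a 60-second TTL on the deduplication map can be modelled without Flink. The sketch below is an assumption-laden illustration, not the state backend's behavior in detail: the class name is hypothetical, time is passed in explicitly so the logic is deterministic, and an entry is assumed to expire 60 seconds after it was last written (rejected duplicates do not write, so they do not extend the entry's lifetime).

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of TTL-based deduplication for a single key. Names are hypothetical.
final class TtlDedupSketch {
    private final long ttlSeconds;
    // id -> time (in seconds) at which the entry was last written
    private final Map<Integer, Long> writtenAt = new HashMap<>();

    TtlDedupSketch(long ttlSeconds) {
        this.ttlSeconds = ttlSeconds;
    }

    /** Returns true if the id should pass the filter at time nowSeconds. */
    boolean accept(int id, long nowSeconds) {
        Long last = writtenAt.get(id);
        if (last != null && nowSeconds - last < ttlSeconds) {
            return false; // still within the TTL: treat as duplicate
        }
        writtenAt.put(id, nowSeconds); // first sighting, or the old entry has expired
        return true;
    }
}
```

In this model an id first seen at second 0 is rejected at seconds 30 and 59, and accepted again at second 60, which starts a new 60-second window.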