0. 一些问题
- window中的数据保存在什么地方
- 一个window操作有几大组件
1. Evictor
是否对窗口中的数据进行计算由 Trigger 来决定, Evictor 则可以决定在计算前后对窗口中的元素进行剔除。
具体API如下:
/**
 * Removes elements from a window's contents around the invocation of the window function.
 *
 * @param <T> type of the elements that can be evicted
 * @param <W> type of {@link Window} this evictor is applied to
 */
public interface Evictor<T, W extends Window> extends Serializable {

    /**
     * Evicts elements before the window function is called.
     *
     * @param elements the elements currently in the window
     * @param size the number of elements in {@code elements}
     * @param window the window being evaluated
     * @param evictorContext context giving access to time and metrics
     */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * Evicts elements after the window function has been called.
     *
     * @param elements the elements currently in the window
     * @param size the number of elements in {@code elements}
     * @param window the window being evaluated
     * @param evictorContext context giving access to time and metrics
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * A context object that is given to {@link Evictor} methods.
     */
    interface EvictorContext {

        /**
         * Returns the current processing time.
         */
        long getCurrentProcessingTime();

        /**
         * Returns the metric group for this {@link Evictor}. This is the same metric
         * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
         * function.
         *
         * <p>You must not call methods that create metric objects
         * (such as {@link MetricGroup#counter(int)} multiple times but instead call once
         * and store the metric object in a field.
         */
        MetricGroup getMetricGroup();

        /**
         * Returns the current watermark time.
         */
        long getCurrentWatermark();
    }
}
2. Trigger
决定什么时候对窗口中的数据进行计算,并发送给下游
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
// 作用于窗口中的每个元素, 其返回结果来判断是否要将结果流至下游
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 当设置了定时器的时候会触发该操作, 其中ctx可以用来注册回调函数, 具体参见下面的TriggerContext
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* Returns true if this trigger supports merging of trigger state and can therefore
* be used with a
* {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
*
* <p>If this returns {@code true} you must properly implement
* {@link #onMerge(Window, OnMergeContext)}
*/
public boolean canMerge() {
return false;
}
/**
* Called when several windows have been merged into one window by the
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
*/
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
// 清除当前trigger对于给定窗口保留的状态
public abstract void clear(W window, TriggerContext ctx) throws Exception;
/**
* A context object that is given to {@link Trigger} methods to allow them to register timer
* callbacks and deal with state.
*/
public interface TriggerContext {
/**
* Returns the current processing time.
*/
long getCurrentProcessingTime();
/**
* Returns the metric group for this {@link Trigger}. This is the same metric
* group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
* function.
*
* <p>You must not call methods that create metric objects
* (such as {@link MetricGroup#counter(int)} multiple times but instead call once
* and store the metric object in a field.
*/
MetricGroup getMetricGroup();
/**
* Returns the current watermark time.
*/
long getCurrentWatermark();
// 注册回调
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
// 删除回调
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
// 获取状态数据
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
/**
* Extension of {@link TriggerContext} that is given to
* {@link Trigger#onMerge(Window, OnMergeContext)}.
*/
public interface OnMergeContext extends TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
}
TriggerResult.java
/**
 * Result of a trigger callback, expressed as the combination of two independent
 * flags: whether to fire and whether to purge.
 */
public enum TriggerResult {

    /** Neither flag is set. */
    CONTINUE(false, false),

    /** Both the fire flag and the purge flag are set. */
    FIRE_AND_PURGE(true, true),

    /** Only the fire flag is set. */
    FIRE(true, false),

    /** Only the purge flag is set. */
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean shouldFire, boolean shouldPurge) {
        this.fire = shouldFire;
        this.purge = shouldPurge;
    }

    /**
     * @return {@code true} if the fire flag is set for this result
     */
    public boolean isFire() {
        return this.fire;
    }

    /**
     * @return {@code true} if the purge flag is set for this result
     */
    public boolean isPurge() {
        return this.purge;
    }
}
3. WindowAssigner
用于将某个元素分配到一个或多个窗口中
/**
 * Assigns zero or more windows to an element.
 *
 * @param <T> type of the elements this assigner can handle
 * @param <W> type of {@link Window} this assigner produces
 */
public abstract class WindowAssigner<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Returns the windows that the given element is assigned to.
     *
     * @param element the element to assign
     * @param timestamp the timestamp of the element
     * @param context context in which the assigner operates
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this assigner.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a serializer for the windows produced by this assigner.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns {@code true} if elements are assigned to windows based on event time.
     */
    public abstract boolean isEventTime();

    /**
     * A context provided to the {@link WindowAssigner} that allows it to query the
     * current processing time.
     *
     * <p>This is provided to the assigner by its containing
     * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
     * which, in turn, gets it from the containing
     * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
     */
    public abstract static class WindowAssignerContext {

        /**
         * Returns the current processing time.
         */
        public abstract long getCurrentProcessingTime();
    }
}
4. WindowedStream
一个 WindowedStream 有4个组成部分:
/**
 * Excerpt of the windowed stream showing its four building blocks: the keyed
 * input, the window assigner, the trigger and the evictor.
 *
 * @param <T> type of the elements in the stream
 * @param <K> type of the key
 * @param <W> type of {@link Window} the elements are assigned to
 */
public class WindowedStream<T, K, W extends Window> {

    /** Keyed stream on which the windowing is applied. */
    private final KeyedStream<T, K> input;

    /** Assigner that maps elements to windows. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** Trigger that controls window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** Evictor that removes elements around window evaluation; {@code null} if none is set. */
    private Evictor<? super T, ? super W> evictor;
}
以 reduce 的实现为例, 可以看到根据有无 evictor 分成了2种 operator 来处理:
/**
 * Applies the given reduce function together with a window function to each window
 * and returns the transformed stream.
 *
 * <p>Without an evictor the elements are kept in a {@code ReducingState} and handled by
 * a {@code WindowOperator}; with an evictor they are kept in a {@code ListState} and
 * handled by an {@code EvictingWindowOperator}. Both states are registered under the
 * name {@code "window-contents"}.
 *
 * @param reduceFunction the reduce function; must not be a {@code RichFunction}
 * @param function the window function applied to the reduced value
 * @param resultType type information for the result type of the window function
 * @return the stream that results from applying the operator to the input
 * @throws UnsupportedOperationException if {@code reduceFunction} is a {@code RichFunction}
 */
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,
        WindowFunction<T, R, K, W> function,
        TypeInformation<R> resultType) {

    if (reduceFunction instanceof RichFunction) {
        throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
    }

    // Clean the closures of both user functions.
    function = input.getExecutionEnvironment().clean(function);
    reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

    final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
    final KeySelector<T, K> keySelector = input.getKeySelector();

    final OneInputStreamOperator<T, R> windowOperator;

    if (evictor == null) {
        // No evictor: state holds the reduced value only.
        ReducingStateDescriptor<T> reducingStateDescriptor = new ReducingStateDescriptor<>(
                "window-contents",
                reduceFunction,
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        windowOperator = new WindowOperator<>(
                windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySelector,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                reducingStateDescriptor,
                new InternalSingleValueWindowFunction<>(function),
                trigger,
                allowedLateness,
                lateDataOutputTag);
    } else {
        // Evictor present: every stream record is kept in list state.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> recordSerializer =
                (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        ListStateDescriptor<StreamRecord<T>> listStateDescriptor =
                new ListStateDescriptor<>("window-contents", recordSerializer);

        windowOperator = new EvictingWindowOperator<>(
                windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySelector,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                listStateDescriptor,
                new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
                trigger,
                evictor,
                allowedLateness,
                lateDataOutputTag);
    }

    return input.transform(opName, resultType, windowOperator);
}
4.1 EvictingWindowOperator
和其一起关联的还有相关的状态 ListState
EvictingWindowOperator 也是 WindowOperator 的一种实现
/**
 * Called when the operator is initialized. Besides the parent's setup this creates
 * the evictor context and initializes {@code evictingWindowState}.
 */
public void open() throws Exception {
    super.open();

    evictorContext = new EvictorContext(null, null);

    evictingWindowState =
            (InternalListState<K, W, StreamRecord<IN>>) getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
}
处理元素时会涉及到窗口的合并操作, 最终会执行如下逻辑:
// Excerpt from inside a loop that is not shown here (see the `continue` below).
// The window's elements are stored in evictingWindowState, with the window as namespace.
evictingWindowState.setCurrentNamespace(window);
evictingWindowState.add(element);
// Ask the trigger how to react to the newly added element.
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
// contents holds the elements currently stored for the window
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
emitWindowContents(window, contents, evictingWindowState);
}
/**
 * Evaluates a window: evicts elements, runs the user function on what is left,
 * evicts again and writes the surviving elements back into the window state.
 *
 * @param window the window being emitted
 * @param contents the stream records currently stored for the window
 * @param windowState the list state backing the window; rewritten at the end
 */
private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions: view the records as timestamped values.
    final Function<StreamRecord<IN>, TimestampedValue<IN>> toTimestampedValue =
            new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
                @Override
                public TimestampedValue<IN> apply(StreamRecord<IN> streamRecord) {
                    return TimestampedValue.from(streamRecord);
                }
            };
    FluentIterable<TimestampedValue<IN>> recordsWithTimestamp =
            FluentIterable.from(contents).transform(toTimestampedValue);

    // Evict elements before the user function runs.
    final int sizeBeforeFunction = Iterables.size(recordsWithTimestamp);
    evictorContext.evictBefore(recordsWithTimestamp, sizeBeforeFunction);

    // Strip the timestamps again so the user function sees plain values.
    final Function<TimestampedValue<IN>, IN> toPlainValue =
            new Function<TimestampedValue<IN>, IN>() {
                @Override
                public IN apply(TimestampedValue<IN> timestampedValue) {
                    return timestampedValue.getValue();
                }
            };
    FluentIterable<IN> projectedContents = recordsWithTimestamp.transform(toPlainValue);

    processContext.window = triggerContext.window;

    // Run the user function and emit its output.
    userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);

    // Evict elements after the user function; note that this operates on
    // recordsWithTimestamp, not on projectedContents.
    final int sizeAfterFunction = Iterables.size(recordsWithTimestamp);
    evictorContext.evictAfter(recordsWithTimestamp, sizeAfterFunction);

    // Work around to fix FLINK-4369: remove the evicted elements from the windowState.
    // This is inefficient, but there is no other way to remove elements from ListState,
    // which is an AppendingState.
    windowState.clear();
    for (TimestampedValue<IN> remaining : recordsWithTimestamp) {
        windowState.add(remaining.getStreamRecord());
    }
}
4.2 WindowOperator
和其一起关联的还有相关的状态 ReducingState
其中一些具体的实现和上面介绍的 EvictingWindowOperator 相似, 不再赘述
ref:
- https://zhuanlan.zhihu.com/p/61445308
- flink 1.9 源码
网友评论