    0. 一些问题

    1. window中的数据保存在什么地方
    2. 一个window操作有几大组件

    1. Evictor

    是否对窗口中的数据进行计算由Trigger来决定, Evictor 则可以决定在计算前后对窗口中的元素进行剔除


    public interface Evictor<T, W extends Window> extends Serializable {
        // 在调用窗口函数之前剔除元素
        void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
        // 在调用窗口函数之后剔除元素
        void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
        // A context object that is given to {@link Evictor} methods.
      interface EvictorContext {
             * Returns the current processing time.
            long getCurrentProcessingTime();
             * Returns the metric group for this {@link Evictor}. This is the same metric
             * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
             * function.
             * <p>You must not call methods that create metric objects
             * (such as {@link MetricGroup#counter(int)} multiple times but instead call once
             * and store the metric object in a field.
            MetricGroup getMetricGroup();
             * Returns the current watermark time.
            long getCurrentWatermark();

    2. Trigger


    public abstract class Trigger<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = -4104633972991191369L;
      // 作用于窗口中的每个元素, 其返回结果来判断是否要将结果流至下游
        public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
        // 当设置了定时器的时候会触发该操作, 其中ctx可以用来注册回调函数, 具体参见下面的TriggerContext
        public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
        public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
         * Returns true if this trigger supports merging of trigger state and can therefore
         * be used with a
         * {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
         * <p>If this returns {@code true} you must properly implement
         * {@link #onMerge(Window, OnMergeContext)}
        public boolean canMerge() {
            return false;
         * Called when several windows have been merged into one window by the
         * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
        public void onMerge(W window, OnMergeContext ctx) throws Exception {
            throw new UnsupportedOperationException("This trigger does not support merging.");
        // 清除当前trigger对于给定窗口保留的状态
        public abstract void clear(W window, TriggerContext ctx) throws Exception;
         * A context object that is given to {@link Trigger} methods to allow them to register timer
         * callbacks and deal with state.
        public interface TriggerContext {
             * Returns the current processing time.
            long getCurrentProcessingTime();
             * Returns the metric group for this {@link Trigger}. This is the same metric
             * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
             * function.
             * <p>You must not call methods that create metric objects
             * (such as {@link MetricGroup#counter(int)} multiple times but instead call once
             * and store the metric object in a field.
            MetricGroup getMetricGroup();
             * Returns the current watermark time.
            long getCurrentWatermark();
            // 注册回调
            void registerProcessingTimeTimer(long time);
            void registerEventTimeTimer(long time);
            // 删除回调
            void deleteProcessingTimeTimer(long time);
            void deleteEventTimeTimer(long time);
            // 获取状态数据
            <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
         * Extension of {@link TriggerContext} that is given to
         * {@link Trigger#onMerge(Window, OnMergeContext)}.
        public interface OnMergeContext extends TriggerContext {
            <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);


    // 两种状态的组合情况
    public enum TriggerResult {
        CONTINUE(false, false),
        FIRE_AND_PURGE(true, true),
        FIRE(true, false),
        PURGE(false, true);
        private final boolean fire;
        private final boolean purge;
        TriggerResult(boolean fire, boolean purge) {
            this.purge = purge;
            this.fire = fire;
        public boolean isFire() {
            return fire;
        public boolean isPurge() {
            return purge;

    3. WindowAssigner


    public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = 1L;
        // 返回某个元素上所赋予的窗口
        public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
        // 返回默认的trigger
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
        public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
        // 该窗口中的元素是否是依据eventTime来赋予的
        public abstract boolean isEventTime();
         * A context provided to the {@link WindowAssigner} that allows it to query the
         * current processing time.
         * <p>This is provided to the assigner by its containing
         * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
         * which, in turn, gets it from the containing
         * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
        public abstract static class WindowAssignerContext {
            public abstract long getCurrentProcessingTime();

    4. WindowedStream

    一个WindowedStream 的有4个组成部分:

    public class WindowedStream<T, K, W extends Window> {
        /** The keyed data stream that is windowed by this stream. */
        private final KeyedStream<T, K> input;
        /** The window assigner. */
        private final WindowAssigner<? super T, W> windowAssigner;
        /** The trigger that is used for window evaluation/emission. */
        private Trigger<? super T, ? super W> trigger;
        /** The evictor that is used for evicting elements before window evaluation. */
        private Evictor<? super T, ? super W> evictor;

    reduce 的实现为例, 可以看到通过有无evictor 分成了2种operator来处理

    public <R> SingleOutputStreamOperator<R> reduce(
      ReduceFunction<T> reduceFunction,
      WindowFunction<T, R, K, W> function,
      TypeInformation<R> resultType) {
      if (reduceFunction instanceof RichFunction) {
        throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
        //clean the closures
      function = input.getExecutionEnvironment().clean(function);
      reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
      final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
      KeySelector<T, K> keySel = input.getKeySelector();
      OneInputStreamOperator<T, R> operator;
      if (evictor != null) {
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> streamRecordSerializer =
        (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
        ListStateDescriptor<StreamRecord<T>> stateDesc =
        new ListStateDescriptor<>("window-contents", streamRecordSerializer);
        operator =
        new EvictingWindowOperator<>(windowAssigner,
          new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
      } else {
        ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
        operator =
        new WindowOperator<>(windowAssigner,
          new InternalSingleValueWindowFunction<>(function),
      return input.transform(opName, resultType, operator);

    4.1 EvictingWindowOperator

    和其一起关联的还有相关的状态 ListState

    EvictingWindowOperator 也是 WindowOperator 的一种实现

    // 在operator初始化的时候调用, 可以看出evictingWindowState会在这里初始化
    public void open() throws Exception {
        evictorContext = new EvictorContext(null, null);
        evictingWindowState = (InternalListState<K, W, StreamRecord<IN>>)
                getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);

    其中会涉及到窗口的合并操作, 最终调用

    // 可以看到窗口的数据都是保存在evictingWindowState里面的
    // contents 为窗口中的元素
    TriggerResult triggerResult = triggerContext.onElement(element);
    if (triggerResult.isFire()) {
      Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
      if (contents == null) {
        // if we have no state, there is nothing to do
      emitWindowContents(window, contents, evictingWindowState);
    private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
        // Work around type system restrictions...
      FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
      .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
        public TimestampedValue<IN> apply(StreamRecord<IN> input) {
          return TimestampedValue.from(input);
      // 剔除数据
      evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
      FluentIterable<IN> projectedContents = recordsWithTimestamp
      .transform(new Function<TimestampedValue<IN>, IN>() {
        public IN apply(TimestampedValue<IN> input) {
          return input.getValue();
      processContext.window = triggerContext.window;
      // 执行用户的函数并输出数据
      userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
      // 剔除数据并将其余数据放入windowState中,注意这里使用的是recordsWithTimestamp
      // 而不是 projectedContents
      evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
        //work around to fix FLINK-4369, remove the evicted elements from the windowState.
        //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
      for (TimestampedValue<IN> record : recordsWithTimestamp) {

    4.2 WindowOperator

    和其一起关联的还有相关的状态 ReducingState

    其中一些具体的实现和上面介绍的 EvictingWindowOperator 相似, 不再赘述


