A Look at the join Operation on Flink DataStream


Author: go4it | Published: 2019-01-10 14:04

    This article looks at how the join operation on Flink's DataStream is implemented.

    Example

    stream.join(otherStream)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)
    
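    • Calling join with another stream returns a JoinedStreams. Its where operation builds a Where object, Where.equalTo builds an EqualTo, and EqualTo.window builds a WithWindow. WithWindow carries the windowAssigner and lets you set the trigger, evictor, and allowedLateness; it also provides the apply operation that produces the joined stream.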
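    The chain works because each step returns a different type that only exposes the next step. The sketch below models that staged-builder idea over in-memory lists, using plain JDK types. MiniJoin and its nested classes are hypothetical names, not Flink classes, and the window stage is left out on purpose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical in-memory model of the staged builder; not Flink's JoinedStreams.
class MiniJoin<T1, T2> {
    private final List<T1> input1;
    private final List<T2> input2;

    MiniJoin(List<T1> input1, List<T2> input2) {
        this.input1 = input1;
        this.input2 = input2;
    }

    // Stage 1: fix the key of the first input; only equalTo is available next.
    <K> Where<K> where(Function<T1, K> keySelector1) {
        return new Where<>(keySelector1);
    }

    class Where<K> {
        private final Function<T1, K> keySelector1;

        Where(Function<T1, K> keySelector1) {
            this.keySelector1 = keySelector1;
        }

        // Stage 2: fix the key of the second input; only apply is available next.
        EqualTo equalTo(Function<T2, K> keySelector2) {
            return new EqualTo(keySelector2);
        }

        class EqualTo {
            private final Function<T2, K> keySelector2;

            EqualTo(Function<T2, K> keySelector2) {
                this.keySelector2 = keySelector2;
            }

            // Final stage: emit one result per pair of elements with equal keys.
            <R> List<R> apply(BiFunction<T1, T2, R> joinFunction) {
                List<R> out = new ArrayList<>();
                for (T1 a : input1) {
                    for (T2 b : input2) {
                        if (keySelector1.apply(a).equals(keySelector2.apply(b))) {
                            out.add(joinFunction.apply(a, b));
                        }
                    }
                }
                return out;
            }
        }
    }

    public static void main(String[] args) {
        List<String> joined =
                new MiniJoin<String, String>(Arrays.asList("a1", "b2"), Arrays.asList("a9", "c3"))
                        .where(s -> s.substring(0, 1))
                        .equalTo(s -> s.substring(0, 1))
                        .apply((x, y) -> x + "+" + y);
        System.out.println(joined); // prints [a1+a9]
    }
}
```

    Only "a1" and "a9" share the key "a", so they form the single joined pair. Calling apply before equalTo does not compile, which is the same ordering guarantee the JoinedStreams chain gives.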

    DataStream.join

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    @Public
    public class DataStream<T> {
        //......
    
        /**
         * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
         * and window can be specified.
         */
        public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
            return new JoinedStreams<>(this, otherStream);
        }
    
        //......
    }
    
    • DataStream provides the join method, which starts a join operation and returns a JoinedStreams.

    JoinedStreams

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

    @Public
    public class JoinedStreams<T1, T2> {
    
        /** The first input stream. */
        private final DataStream<T1> input1;
    
        /** The second input stream. */
        private final DataStream<T2> input2;
    
        public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
            this.input1 = requireNonNull(input1);
            this.input2 = requireNonNull(input2);
        }
    
        public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
            requireNonNull(keySelector);
            final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
            return where(keySelector, keyType);
        }
    
        public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
            requireNonNull(keySelector);
            requireNonNull(keyType);
            return new Where<>(input1.clean(keySelector), keyType);
        }
    
        //......
    }
    
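    • JoinedStreams mainly provides the where operation, which builds a Where object. The one-argument overload derives the key's TypeInformation with TypeExtractor and delegates to the two-argument overload.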

    Where

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

        @Public
        public class Where<KEY> {
    
            private final KeySelector<T1, KEY> keySelector1;
            private final TypeInformation<KEY> keyType;
    
            Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
                this.keySelector1 = keySelector1;
                this.keyType = keyType;
            }
    
            public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
                requireNonNull(keySelector);
                final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
                return equalTo(keySelector, otherKey);
            }
    
            public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
                requireNonNull(keySelector);
                requireNonNull(keyType);
    
                if (!keyType.equals(this.keyType)) {
                    throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
                            "first key = " + this.keyType + " , second key = " + keyType);
                }
    
                return new EqualTo(input2.clean(keySelector));
            }
    
            //......
    
        }
    
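    • Where mainly provides the equalTo operation, which builds an EqualTo object. It throws an IllegalArgumentException if the key type of the second input differs from that of the first.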

    EqualTo

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

            @Public
            public class EqualTo {
    
                private final KeySelector<T2, KEY> keySelector2;
    
                EqualTo(KeySelector<T2, KEY> keySelector2) {
                    this.keySelector2 = requireNonNull(keySelector2);
                }
    
                /**
                 * Specifies the window on which the join operation works.
                 */
                @PublicEvolving
                public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                    return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
                }
            }
    
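    • EqualTo provides the window operation, which builds a WithWindow object whose trigger, evictor, and allowedLateness are initially null.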

    WithWindow

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

        @Public
        public static class WithWindow<T1, T2, KEY, W extends Window> {
    
            private final DataStream<T1> input1;
            private final DataStream<T2> input2;
    
            private final KeySelector<T1, KEY> keySelector1;
            private final KeySelector<T2, KEY> keySelector2;
            private final TypeInformation<KEY> keyType;
    
            private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
    
            private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
    
            private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
    
            private final Time allowedLateness;
    
            private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;
    
            @PublicEvolving
            protected WithWindow(DataStream<T1> input1,
                    DataStream<T2> input2,
                    KeySelector<T1, KEY> keySelector1,
                    KeySelector<T2, KEY> keySelector2,
                    TypeInformation<KEY> keyType,
                    WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                    Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                    Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                    Time allowedLateness) {
    
                this.input1 = requireNonNull(input1);
                this.input2 = requireNonNull(input2);
    
                this.keySelector1 = requireNonNull(keySelector1);
                this.keySelector2 = requireNonNull(keySelector2);
                this.keyType = requireNonNull(keyType);
    
                this.windowAssigner = requireNonNull(windowAssigner);
    
                this.trigger = trigger;
                this.evictor = evictor;
    
                this.allowedLateness = allowedLateness;
            }
    
            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                        windowAssigner, newTrigger, evictor, allowedLateness);
            }
    
            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                        windowAssigner, trigger, newEvictor, allowedLateness);
            }
    
            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, evictor, newLateness);
            }
    
            public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
                TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    JoinFunction.class,
                    0,
                    1,
                    2,
                    TypeExtractor.NO_INDEX,
                    input1.getType(),
                    input2.getType(),
                    "Join",
                    false);
    
                return apply(function, resultType);
            }
    
            @PublicEvolving
            @Deprecated
            public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
                return (SingleOutputStreamOperator<T>) apply(function);
            }
    
            public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                //clean the closure
                function = input1.getExecutionEnvironment().clean(function);
    
                coGroupedWindowedStream = input1.coGroup(input2)
                    .where(keySelector1)
                    .equalTo(keySelector2)
                    .window(windowAssigner)
                    .trigger(trigger)
                    .evictor(evictor)
                    .allowedLateness(allowedLateness);
    
                return coGroupedWindowedStream
                        .apply(new FlatJoinCoGroupFunction<>(function), resultType);
            }
    
            @PublicEvolving
            @Deprecated
            public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                return (SingleOutputStreamOperator<T>) apply(function, resultType);
            }
    
            public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
                TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    FlatJoinFunction.class,
                    0,
                    1,
                    2,
                    new int[]{2, 0},
                    input1.getType(),
                    input2.getType(),
                    "Join",
                    false);
    
                return apply(function, resultType);
            }
    
            @PublicEvolving
            @Deprecated
            public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
                return (SingleOutputStreamOperator<T>) apply(function);
            }
    
            public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                //clean the closure
                function = input1.getExecutionEnvironment().clean(function);
    
                coGroupedWindowedStream = input1.coGroup(input2)
                    .where(keySelector1)
                    .equalTo(keySelector2)
                    .window(windowAssigner)
                    .trigger(trigger)
                    .evictor(evictor)
                    .allowedLateness(allowedLateness);
    
                return coGroupedWindowedStream
                        .apply(new JoinCoGroupFunction<>(function), resultType);
            }
    
            @PublicEvolving
            @Deprecated
            public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                return (SingleOutputStreamOperator<T>) apply(function, resultType);
            }
    
            @VisibleForTesting
            Time getAllowedLateness() {
                return allowedLateness;
            }
    
            @VisibleForTesting
            CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
                return coGroupedWindowedStream;
            }
        }
    
    • WithWindow holds the windowAssigner and lets you set the trigger, evictor, and allowedLateness; each of these setters returns a new WithWindow. It provides the apply operation, and the with operations are marked as deprecated.
    • apply accepts either a JoinFunction or a FlatJoinFunction. Internally it calls DataStream.coGroup to create a CoGroupedStreams, passes along its own where and equalTo keySelectors, windowAssigner, trigger, evictor, and allowedLateness, and finally calls apply on the resulting CoGroupedStreams.WithWindow.
    • The apply method of CoGroupedStreams.WithWindow takes a different parameter from that of JoinedStreams.WithWindow: it accepts a CoGroupFunction. JoinedStreams.WithWindow.apply therefore wraps the JoinFunction or FlatJoinFunction in a CoGroupFunction (JoinCoGroupFunction for a JoinFunction, FlatJoinCoGroupFunction for a FlatJoinFunction) before passing it on.

    JoinFunction

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/JoinFunction.java

    @Public
    @FunctionalInterface
    public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
    
        /**
         * The join method, called once per joined pair of elements.
         *
         * @param first The element from first input.
         * @param second The element from second input.
         * @return The resulting element.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        OUT join(IN1 first, IN2 second) throws Exception;
    }
    
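    • JoinFunction extends Function and Serializable and defines the join method. It has inner-join semantics, because it is only called once per joined pair of elements; if you need an outer join, use a CoGroupFunction instead.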
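    To make the inner versus outer distinction concrete, here is a small plain-JDK sketch. The two methods only mimic the shapes of JoinFunction (one call per pair) and CoGroupFunction (one call per pair of groups); OuterJoinSketch and its method names are illustrative assumptions and nothing in it is a Flink class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-JDK sketch; the method shapes mimic JoinFunction and CoGroupFunction,
// but nothing here is a Flink class.
class OuterJoinSketch {

    // Pair-wise join: the body runs once per (first, second) pair, so an empty
    // side means zero calls and zero results (inner-join semantics).
    static List<String> innerJoin(Iterable<String> first, Iterable<Integer> second) {
        List<String> out = new ArrayList<>();
        for (String name : first) {
            for (Integer score : second) {
                out.add(name + "=" + score);
            }
        }
        return out;
    }

    // Group-wise coGroup: the function sees both groups for a key, including an
    // empty one, so it can emit a padded record (left outer join).
    static List<String> leftOuterCoGroup(Iterable<String> first, Iterable<Integer> second) {
        List<String> out = new ArrayList<>();
        for (String name : first) {
            boolean matched = false;
            for (Integer score : second) {
                matched = true;
                out.add(name + "=" + score);
            }
            if (!matched) {
                out.add(name + "=null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Collections.singletonList("alice");
        List<Integer> noScores = Collections.emptyList();
        System.out.println(innerJoin(names, noScores));        // prints []
        System.out.println(leftOuterCoGroup(names, noScores)); // prints [alice=null]
    }
}
```

    The group-wise version can react to an empty second group, which a pair-wise function never gets the chance to do.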

    FlatJoinFunction

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/FlatJoinFunction.java

    @Public
    @FunctionalInterface
    public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {
    
        /**
         * The join method, called once per joined pair of elements.
         *
         * @param first The element from first input.
         * @param second The element from second input.
         * @param out The collector used to return zero, one, or more elements.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
    }
    
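    • FlatJoinFunction also extends Function and Serializable and defines a join method with inner-join semantics; use a CoGroupFunction if you need an outer join. Unlike JoinFunction.join, its join method takes an extra Collector parameter through which it can emit zero, one, or more elements, hence the Flat in the name.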
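    The effect of the extra Collector parameter can be shown with a short plain-JDK sketch. Here java.util.function.Consumer stands in for Flink's Collector, and FlatJoinSketch with its join and run methods is a made-up example, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-JDK sketch: Consumer<String> stands in for Flink's Collector<OUT>.
class FlatJoinSketch {

    // Shaped like FlatJoinFunction.join: results go to the collector, so one
    // pair may produce nothing, one record, or several records.
    static void join(String word, Integer times, Consumer<String> out) {
        for (int i = 0; i < times; i++) {
            out.accept(word);
        }
    }

    // Runs join for a single pair and returns everything it emitted.
    static List<String> run(String word, int times) {
        List<String> collected = new ArrayList<>();
        join(word, times, collected::add);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run("x", 0)); // prints []
        System.out.println(run("x", 1)); // prints [x]
        System.out.println(run("x", 3)); // prints [x, x, x]
    }
}
```

    A function shaped like JoinFunction.join must return exactly one value per pair, so filtering out a pair or fanning it out is only possible with the collector-based shape.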

    CoGroupedStreams

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

    @Public
    public class CoGroupedStreams<T1, T2> {
        //......
    
        @Public
        public static class WithWindow<T1, T2, KEY, W extends Window> {
            private final DataStream<T1> input1;
            private final DataStream<T2> input2;
    
            private final KeySelector<T1, KEY> keySelector1;
            private final KeySelector<T2, KEY> keySelector2;
    
            private final TypeInformation<KEY> keyType;
    
            private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
    
            private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
    
            private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
    
            private final Time allowedLateness;
    
            private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
    
            protected WithWindow(DataStream<T1> input1,
                    DataStream<T2> input2,
                    KeySelector<T1, KEY> keySelector1,
                    KeySelector<T2, KEY> keySelector2,
                    TypeInformation<KEY> keyType,
                    WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                    Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                    Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                    Time allowedLateness) {
                this.input1 = input1;
                this.input2 = input2;
    
                this.keySelector1 = keySelector1;
                this.keySelector2 = keySelector2;
                this.keyType = keyType;
    
                this.windowAssigner = windowAssigner;
                this.trigger = trigger;
                this.evictor = evictor;
    
                this.allowedLateness = allowedLateness;
            }
    
            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                        windowAssigner, newTrigger, evictor, allowedLateness);
            }
    
            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                        windowAssigner, trigger, newEvictor, allowedLateness);
            }
    
            @PublicEvolving
            public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                        windowAssigner, trigger, evictor, newLateness);
            }
    
            public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
    
                TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
                    function,
                    input1.getType(),
                    input2.getType(),
                    "CoGroup",
                    false);
    
                return apply(function, resultType);
            }
    
            public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
                //clean the closure
                function = input1.getExecutionEnvironment().clean(function);
    
                UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
                UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
    
                DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                        .map(new Input1Tagger<T1, T2>())
                        .setParallelism(input1.getParallelism())
                        .returns(unionType);
                DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                        .map(new Input2Tagger<T1, T2>())
                        .setParallelism(input2.getParallelism())
                        .returns(unionType);
    
                DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
    
                // we explicitly create the keyed stream to manually pass the key type information in
                windowedStream =
                        new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                        .window(windowAssigner);
    
                if (trigger != null) {
                    windowedStream.trigger(trigger);
                }
                if (evictor != null) {
                    windowedStream.evictor(evictor);
                }
                if (allowedLateness != null) {
                    windowedStream.allowedLateness(allowedLateness);
                }
    
                return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
            }
    
            //......
    
        }
    
        //......
    }
    
    • The overall class structure of CoGroupedStreams closely mirrors JoinedStreams: CoGroupedStreams provides where to build a Where object, Where provides equalTo to build an EqualTo, EqualTo provides window to build a WithWindow, and WithWindow lets you set the trigger, evictor, and allowedLateness and provides apply. One difference is that apply on CoGroupedStreams.WithWindow accepts a CoGroupFunction, whereas apply on JoinedStreams.WithWindow accepts a JoinFunction or FlatJoinFunction.
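    The apply method shown above tags each input as a TaggedUnion, unions the two tagged streams, keys the result with a UnionKeySelector, and windows it. The plain-JDK sketch below models that "tag, union, group by key, split by tag" sequence on lists. Tagged, entry, and coGroupAll are hypothetical names, windowing is omitted, and the final split by tag is an assumption about what CoGroupWindowFunction does, since that class is not part of the excerpt.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK sketch; Tagged is a hypothetical stand-in for TaggedUnion<T1, T2>.
class TaggedUnionSketch {

    // Exactly one of the two fields is non-null, which records the origin.
    static final class Tagged {
        final Map.Entry<String, String> one;
        final Map.Entry<String, Integer> two;

        Tagged(Map.Entry<String, String> one, Map.Entry<String, Integer> two) {
            this.one = one;
            this.two = two;
        }

        // Counterpart of a union key selector: use the key of whichever side is set.
        String key() {
            return one != null ? one.getKey() : two.getKey();
        }
    }

    // Small helper to build keyed elements for the demo.
    static <V> Map.Entry<String, V> entry(String key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    static List<String> coGroupAll(List<Map.Entry<String, String>> input1,
                                   List<Map.Entry<String, Integer>> input2) {
        // Tag both inputs and union them into one list.
        List<Tagged> union = new ArrayList<>();
        for (Map.Entry<String, String> e : input1) {
            union.add(new Tagged(e, null));
        }
        for (Map.Entry<String, Integer> e : input2) {
            union.add(new Tagged(null, e));
        }

        // Group the unioned elements by key (no windowing in this sketch).
        Map<String, List<Tagged>> byKey = new LinkedHashMap<>();
        for (Tagged t : union) {
            byKey.computeIfAbsent(t.key(), k -> new ArrayList<>()).add(t);
        }

        // Split each group by tag and hand both sides to a coGroup-style step.
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Tagged>> group : byKey.entrySet()) {
            List<String> ones = new ArrayList<>();
            List<Integer> twos = new ArrayList<>();
            for (Tagged t : group.getValue()) {
                if (t.one != null) {
                    ones.add(t.one.getValue());
                } else {
                    twos.add(t.two.getValue());
                }
            }
            out.add(group.getKey() + ":" + ones + "/" + twos);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> in1 = Arrays.asList(entry("a", "x"), entry("b", "y"));
        List<Map.Entry<String, Integer>> in2 = Arrays.asList(entry("a", 1), entry("a", 2));
        System.out.println(coGroupAll(in1, in2)); // prints [a:[x]/[1, 2], b:[y]/[]]
    }
}
```

    Merging both inputs into one tagged type is what lets a single keyed, windowed stream carry elements from two differently typed sources.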

    CoGroupFunction

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java

    @Public
    @FunctionalInterface
    public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
    
        /**
         * This method must be implemented to provide a user implementation of a
         * coGroup. It is called for each pair of element groups where the elements share the
         * same key.
         *
         * @param first The records from the first input.
         * @param second The records from the second.
         * @param out A collector to return elements.
         *
         * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
         *                   and may trigger the recovery logic.
         */
        void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
    }
    
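    • CoGroupFunction extends Function and Serializable and defines the coGroup method, which can be used to implement an outer join. Its parameters are Iterables, whereas the join methods of JoinFunction and FlatJoinFunction take single elements.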

    WrappingFunction

    flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/operators/translation/WrappingFunction.java

    @Internal
    public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
    
        private static final long serialVersionUID = 1L;
    
        protected T wrappedFunction;
    
        protected WrappingFunction(T wrappedFunction) {
            this.wrappedFunction = wrappedFunction;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            FunctionUtils.openFunction(this.wrappedFunction, parameters);
        }
    
        @Override
        public void close() throws Exception {
            FunctionUtils.closeFunction(this.wrappedFunction);
        }
    
        @Override
        public void setRuntimeContext(RuntimeContext t) {
            super.setRuntimeContext(t);
    
            FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
        }
    
        public T getWrappedFunction () {
            return this.wrappedFunction;
        }
    }
    
    • WrappingFunction extends AbstractRichFunction and overrides its open, close, and setRuntimeContext methods to forward these lifecycle calls to the wrappedFunction.
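    The forwarding pattern can be sketched with plain JDK types. Lifecycle, Wrapper, and Recording are hypothetical stand-ins rather than Flink classes. The instanceof check is an assumption about how FunctionUtils treats functions that have no lifecycle of their own, since its source is not part of the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK sketch of lifecycle forwarding; none of these types are Flink classes.
class LifecycleForwardingSketch {

    // Hypothetical stand-in for the open/close part of a rich function.
    interface Lifecycle {
        void open();
        void close();
    }

    // In the spirit of WrappingFunction: forwards lifecycle calls to the
    // function it wraps and adds no behavior of its own.
    static class Wrapper<T> implements Lifecycle {
        protected final T wrapped;

        Wrapper(T wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void open() {
            // Assumption: only wrapped functions that have a lifecycle are called.
            if (wrapped instanceof Lifecycle) {
                ((Lifecycle) wrapped).open();
            }
        }

        @Override
        public void close() {
            if (wrapped instanceof Lifecycle) {
                ((Lifecycle) wrapped).close();
            }
        }
    }

    // A wrapped function that records which lifecycle calls reached it.
    static class Recording implements Lifecycle {
        final List<String> events = new ArrayList<>();

        @Override
        public void open() {
            events.add("open");
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    static List<String> demo() {
        Recording inner = new Recording();
        Wrapper<Recording> wrapper = new Wrapper<>(inner);
        wrapper.open();
        wrapper.close();
        return inner.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [open, close]
    }
}
```

    Without this forwarding, a user function with setup logic in open would never be initialized once it is hidden inside the wrapper.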

    JoinCoGroupFunction

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

        /**
         * CoGroup function that does a nested-loop join to get the join result.
         */
        private static class JoinCoGroupFunction<T1, T2, T>
                extends WrappingFunction<JoinFunction<T1, T2, T>>
                implements CoGroupFunction<T1, T2, T> {
            private static final long serialVersionUID = 1L;
    
            public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
                super(wrappedFunction);
            }
    
            @Override
            public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
                for (T1 val1: first) {
                    for (T2 val2: second) {
                        out.collect(wrappedFunction.join(val1, val2));
                    }
                }
            }
        }
    
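    • JoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface. It iterates over the first collection and, for each element, iterates over the second collection, calling wrappedFunction.join on each pair and emitting the result.
    • JoinedStreams defines JoinCoGroupFunction as a private static class. JoinedStreams.WithWindow.apply uses it to wrap the JoinFunction before calling apply on CoGroupedStreams.WithWindow.
    • The join method defined by JoinFunction takes two single elements, whereas the coGroup method of JoinCoGroupFunction takes two Iterables.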
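    The adapter idea, turning a pair-wise function into a group-wise one with a nested loop, can be sketched in plain JDK code. The CoGroup interface and wrap method below are hypothetical stand-ins for CoGroupFunction and JoinCoGroupFunction, with BiFunction playing the role of JoinFunction and Consumer the role of Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Plain-JDK sketch of the nested-loop adapter; not Flink code.
class NestedLoopAdapterSketch {

    // Hypothetical stand-in for CoGroupFunction<IN1, IN2, O>.
    interface CoGroup<A, B, R> {
        void coGroup(Iterable<A> first, Iterable<B> second, Consumer<R> out);
    }

    // Adapts a pair-wise join to the group-wise shape with a nested loop.
    static <A, B, R> CoGroup<A, B, R> wrap(BiFunction<A, B, R> join) {
        return (first, second, out) -> {
            for (A a : first) {
                // The second Iterable is traversed once per element of the first.
                for (B b : second) {
                    out.accept(join.apply(a, b));
                }
            }
        };
    }

    static List<String> demo() {
        List<String> results = new ArrayList<>();
        CoGroup<String, Integer, String> fn = wrap((s, n) -> s + n);
        fn.coGroup(Arrays.asList("a", "b"), Arrays.asList(1, 2, 3), results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a1, a2, a3, b1, b2, b3]
    }
}
```

    A group of n elements on one side and m on the other yields n × m calls to the wrapped function, so the cost per key and window grows with the product of the two group sizes.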

    FlatJoinCoGroupFunction

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

        /**
         * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
         */
        private static class FlatJoinCoGroupFunction<T1, T2, T>
                extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
                implements CoGroupFunction<T1, T2, T> {
            private static final long serialVersionUID = 1L;
    
            public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
                super(wrappedFunction);
            }
    
            @Override
            public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
                for (T1 val1: first) {
                    for (T2 val2: second) {
                        wrappedFunction.join(val1, val2, out);
                    }
                }
            }
        }
    
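    • FlatJoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface. It iterates over the first collection and, for each element, iterates over the second collection, calling wrappedFunction.join on each pair and passing along the Collector, so the wrapped function emits its results itself.
    • JoinedStreams defines FlatJoinCoGroupFunction as a private static class. JoinedStreams.WithWindow.apply uses it to wrap the FlatJoinFunction before calling apply on CoGroupedStreams.WithWindow.
    • The join method defined by FlatJoinFunction takes two single elements plus a Collector, whereas the coGroup method of FlatJoinCoGroupFunction takes two Iterables plus a Collector.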

    Summary

    • DataStream provides the join method, which starts a join operation and returns a JoinedStreams. JoinedStreams mainly provides where to build a Where object, Where mainly provides equalTo to build an EqualTo, and EqualTo provides window to build a WithWindow. WithWindow holds the windowAssigner, lets you set the trigger, evictor, and allowedLateness, and provides the apply operation.
    • apply accepts either a JoinFunction or a FlatJoinFunction. Internally it calls DataStream.coGroup to create a CoGroupedStreams, passes along its own where and equalTo keySelectors, windowAssigner, trigger, evictor, and allowedLateness, and finally calls apply on the resulting CoGroupedStreams.WithWindow. JoinFunction and FlatJoinFunction both extend Function and Serializable and define a join method with inner-join semantics; if you need an outer join, use a CoGroupFunction. The difference between the two is that FlatJoinFunction.join takes an extra Collector parameter through which it can emit zero, one, or more elements, hence the Flat in the name.
    • The apply method of CoGroupedStreams.WithWindow accepts a CoGroupFunction, unlike that of JoinedStreams.WithWindow. JoinedStreams.WithWindow.apply therefore wraps the JoinFunction or FlatJoinFunction in a CoGroupFunction (JoinCoGroupFunction for a JoinFunction, FlatJoinCoGroupFunction for a FlatJoinFunction) and then calls apply on CoGroupedStreams.WithWindow. Both wrappers extend WrappingFunction, which extends AbstractRichFunction and overrides open, close, and setRuntimeContext to manage the wrappedFunction, and both implement the coGroup method of the CoGroupFunction interface. The only difference is what they wrap: because the latter wraps a FlatJoinFunction, its call to join also passes the out parameter.

